-
-
Save bruvio/4a25a773b32b191bf149ff1b91b1d885 to your computer and use it in GitHub Desktop.
Example Python script that creates an Amazon Athena table over JSON records stored in S3 and then queries it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import boto3 | |
| import time | |
| import json | |
| import pprint | |
| import sys | |
# Defaults
# S3 prefix where Athena writes query result files.
query_output = 's3://platform-prd-my-athena-output-bucket/outputs'
# NOTE(review): `pp` is not referenced elsewhere in this script.
pp = pprint.PrettyPrinter(indent=2)
# Shared out-parameter: run_athena_query stores the latest QueryExecutionId
# under 'execution_id' so callers can fetch results afterwards.
queryparams = {'execution_id': ''}
# Athena client created once at import time and reused by every function.
athena = boto3.client('athena')
# functions
def run_athena_query(query, queryparams, client=None, output_location=None,
                     poll_interval=0.2):
    """Start an Athena query and block until it reaches a terminal state.

    ``queryparams`` is mutated in place: its 'execution_id' key is set to the
    QueryExecutionId, so the caller can fetch results afterwards. The return
    value carries the final status.

    Args:
        query: SQL text to execute.
        queryparams: dict that receives the 'execution_id' key.
        client: Athena client to use; defaults to the module-level ``athena``.
        output_location: S3 URI for result files; defaults to the
            module-level ``query_output``.
        poll_interval: seconds to sleep between status polls (default 0.2).

    Returns:
        The terminal state string: 'SUCCEEDED', 'FAILED' or 'CANCELLED'.
    """
    if client is None:
        client = athena
    if output_location is None:
        output_location = query_output
    print("Executing query:\n{0}".format(query))
    response = client.start_query_execution(
        # The original passed the undefined name `ddl_query` here, which
        # raised NameError on every call; the caller's query is intended.
        QueryString=query,
        ResultConfiguration={'OutputLocation': output_location},
    )
    execution_id = response['QueryExecutionId']
    queryparams['execution_id'] = execution_id
    # Poll until Athena reports one of the terminal states.
    while True:
        stats = client.get_query_execution(QueryExecutionId=execution_id)
        status = stats['QueryExecution']['Status']['State']
        if status in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            return status
        time.sleep(poll_interval)
# Print the results of the query execution
def print_results(execution_id, client=None):
    """Fetch the results of an Athena query and print them as indented JSON.

    Makes a single get_query_results call; any NextToken in the response is
    not followed, so only the first page of results is printed.

    Args:
        execution_id: QueryExecutionId of the query whose results to print.
        client: Athena client to use; defaults to the module-level ``athena``.
    """
    if client is None:
        client = athena
    results = client.get_query_results(QueryExecutionId=execution_id)
    # print() with one argument behaves the same on Python 2 and 3.
    print(json.dumps(results, sort_keys=True, indent=4))
def _run_or_exit(query):
    """Run *query*, then print its results, or exit with status 1 on failure.

    Clears the shared ``queryparams['execution_id']`` before running. If the
    query's terminal state is not 'SUCCEEDED', that state is printed and the
    process exits with status 1.
    """
    queryparams['execution_id'] = ''
    status = run_athena_query(query, queryparams)
    if status != 'SUCCEEDED':
        # The original printed an undefined name (`ret`) after the DDL step,
        # so a failed CREATE TABLE raised NameError instead of exiting cleanly.
        print(status)
        sys.exit(1)
    print_results(queryparams['execution_id'])


def main():
    """Create SPC_TABLE over the JSON input bucket, then count rows per cuisine."""
    # External table over the JSON records in the input bucket.
    # Alternative JSON SerDe: 'org.openx.data.jsonserde.JsonSerDe'.
    create_table = r'''CREATE EXTERNAL TABLE IF NOT EXISTS SPC_TABLE (
        id INT,
        cuisine STRING,
        ingredients ARRAY<STRING>
    )
    ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
    LOCATION 's3://platform-prd-my-athena-input-bucket/'
    '''
    _run_or_exit(create_table)

    # Now interrogate the data
    count_by_cuisine = r'''
    SELECT
        cuisine,
        COUNT(*) AS cnt
    FROM SPC_TABLE
    GROUP BY
        cuisine
    '''
    _run_or_exit(count_by_cuisine)
# end functions

# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment