Skip to content

Instantly share code, notes, and snippets.

@bruvio
Forked from stephenconnolly1/athena-example.py
Created May 21, 2021 19:43
Show Gist options
  • Select an option

  • Save bruvio/4a25a773b32b191bf149ff1b91b1d885 to your computer and use it in GitHub Desktop.

Select an option

Save bruvio/4a25a773b32b191bf149ff1b91b1d885 to your computer and use it in GitHub Desktop.
Example Python script to create athena table from some JSON records and query it
import boto3
import time
import json
import pprint
import sys
# Defaults
query_output = 's3://platform-prd-my-athena-output-bucket/outputs'
pp = pprint.PrettyPrinter(indent=2)
queryparams = {}
queryparams['execution_id']=''
athena = boto3.client('athena')
# functions
# queryparams is mutable, so that execution_id has to be returned to the caller for further processing
def run_athena_query (query, queryparams):
print "Executing query:\n{0}".format(query)
response = athena.start_query_execution(
QueryString=ddl_query,
ResultConfiguration={
'OutputLocation': query_output
}
)
execution_id = response['QueryExecutionId']
queryparams['execution_id'] = execution_id
status = ''
while True:
stats = athena.get_query_execution(QueryExecutionId=execution_id)
status = stats['QueryExecution']['Status']['State']
if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
return status
time.sleep(0.2) # 200ms
# Print the results of the query execution
def print_results(execution_id):
results = athena.get_query_results(QueryExecutionId=execution_id)
print json.dumps(results, sort_keys=True, indent=4)
def main():
query = r'''CREATE EXTERNAL TABLE IF NOT EXISTS SPC_TABLE (
id INT,
cuisine STRING,
ingredients ARRAY<STRING>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://platform-prd-my-athena-input-bucket/'
'''
# 'org.openx.data.jsonserde.JsonSerDe'
queryparams['execution_id']=''
if run_athena_query(query, queryparams) != 'SUCCEEDED' :
print ret
sys.exit(1)
print_results(queryparams['execution_id'])
# Now interrogate the data
query = r'''
SELECT
cuisine,
COUNT (*) as cnt
FROM SPC_TABLE
GROUP BY
cuisine
'''
queryparams['execution_id']=''
ret = run_athena_query(query, queryparams)
if ret != 'SUCCEEDED' :
print ret
sys.exit(1)
print_results(queryparams['execution_id'])
# end functions
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment