-
-
Save msgeorgem/824c06816872d56e0de5c9a0f299b636 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import google.auth | |
| from google.cloud import bigquery | |
| from google.cloud import bigquery_storage_v1beta1 | |
| import datetime | |
| import gspread | |
| import urllib.request | |
| from oauth2client.service_account import ServiceAccountCredentials | |
def nytaxi_pubsub(event, context):
    """Cloud Function entry point: build a daily NY-taxi data mart and mirror it to Google Sheets.

    Part 1 aggregates the warehouse BigQuery table into a dated data-mart
    table and loads the same rows into a pandas DataFrame.
    Part 2 appends that DataFrame to a Google Sheets spreadsheet.

    Args:
        event: Pub/Sub trigger payload (unused; required by the trigger signature).
        context: Cloud Functions event metadata (unused).
    """
    # Date suffix for the data-mart table, e.g. '20210101'.
    today = datetime.date.today().strftime('%Y%m%d')

    # Explicitly create a credentials object. This allows you to use the same
    # credentials for both the BigQuery and BigQuery Storage clients, avoiding
    # unnecessary API calls to fetch duplicate authentication tokens.
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )

    # FIX: the original fetched `credentials` but never passed them to the
    # clients, defeating the stated purpose above. Pass them explicitly.
    bq_client = bigquery.Client(credentials=credentials, project=project_id)
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=credentials
    )

    # Daily aggregation per passenger_count. {today} is interpolated as a
    # bare numeric literal (e.g. 20210101), matching the original behavior.
    query = f"""
    SELECT
        {today} AS date
        , passenger_count
        , COUNT(*) AS ride_count
        , SUM(passenger_count) AS total_passenger_count
        , SUM(fare_amount) AS total_fare_amount
        , SUM(tip_amount) AS total_tip_amount
        , SUM(total_amount) AS total_amount
    FROM < Original NY taxi data table in BigQuery >
    --WHERE ride_month = {today}
    GROUP BY passenger_count
    ORDER BY passenger_count
    """

    # Destination table is dated so each run produces a fresh snapshot.
    destination_dataset = 'DataMart_NYTaxi_per_customer'
    destination_table = f"{project_id}.{destination_dataset}.DataMart_NYTaxi_per_customer_{today}"

    # Drop any previous table with the same name so the query can recreate it.
    bq_client.delete_table(destination_table, not_found_ok=True)

    # Run the query into the destination table, then pull the result set into
    # a DataFrame via the BigQuery Storage API (faster than the REST path).
    query_job = bq_client.query(
        query,
        job_config=bigquery.QueryJobConfig(destination=destination_table),
    )
    res_df = query_job.result().to_dataframe(bqstorage_client=bqstorage_client)

    # 2nd. Part - Load the data frame to Google Sheets.
    # The service-account JSON is downloaded once from GCP Storage and cached
    # in /tmp (the only writable path inside a Cloud Function instance).
    if not os.path.isfile('/tmp/service_account.json'):
        urllib.request.urlretrieve(
            "< Path to .json with service account credentials stored in GCP Storage>",
            "/tmp/service_account.json",
        )

    # FIX: renamed from `client`, which shadowed the BigQuery client above.
    gs_client = gspread.service_account(filename='/tmp/service_account.json')
    sheet = gs_client.open("DataMart_NYTaxi_per_customer").sheet1

    # Only when the Google Sheets file is new (writes the header row too):
    # sheet.update([res_df.columns.values.tolist()] + res_df.values.tolist())
    # When the sheet already has a header, insert data rows starting at row 2.
    sheet.insert_rows(res_df.values.tolist(), 2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment