import datetime
import os
import urllib.request

import google.auth
import gspread
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
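# Assumed PyPI dependencies for the imports above: google-cloud-bigquery,
# google-cloud-bigquery-storage (which provides the v1beta1 client), gspread,
# and pandas (used by to_dataframe below).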
def nytaxi_pubsub(event, context):
    # Part 1 - Query the data-warehouse BigQuery table, materialize the result
    # as a data mart BigQuery table, and load the same contents into a pandas
    # DataFrame.
    today = datetime.date.today().strftime('%Y%m%d')
    # Explicitly create a credentials object so the same credentials are shared
    # by the BigQuery and BigQuery Storage clients, avoiding unnecessary API
    # calls to fetch duplicate authentication tokens.
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    # Instantiate the BigQuery and BigQuery Storage clients for the project.
    client = bigquery.Client(credentials=credentials, project=project_id)
    bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=credentials
    )
    # Define the query to run.
    query = f"""
        SELECT
            {today} AS date
            , passenger_count
            , COUNT(*) AS ride_count
            , SUM(passenger_count) AS total_passenger_count
            , SUM(fare_amount) AS total_fare_amount
            , SUM(tip_amount) AS total_tip_amount
            , SUM(total_amount) AS total_amount
        FROM < Original NY taxi data table in BigQuery >
        --WHERE ride_month = {today}
        GROUP BY passenger_count
        ORDER BY passenger_count
    """
    # Define the BigQuery destination table.
    destination_dataset = 'DataMart_NYTaxi_per_customer'
    destination_table = f"{project_id}.{destination_dataset}.DataMart_NYTaxi_per_customer_{today}"
    # Delete the target table if one already exists.
    client.delete_table(destination_table, not_found_ok=True)
    # Run the query, write the result to the data mart table, and pull the same
    # rows into a pandas DataFrame via the BigQuery Storage API.
    query_job = client.query(
        query, job_config=bigquery.QueryJobConfig(destination=destination_table)
    )
    res_df = query_job.result().to_dataframe(bqstorage_client=bqstorageclient)
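    # Alternative sketch: instead of the delete_table() call above, pass
    # write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE in the
    # QueryJobConfig to overwrite the destination table in one step.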
    # Part 2 - Load the DataFrame into Google Sheets.
    # Instantiate a Sheets service-account client. Beforehand, create a service
    # account JSON key and save it somewhere in GCP Storage.
    if not os.path.isfile('/tmp/service_account.json'):
        urllib.request.urlretrieve(
            "< Path to .json with service account credentials stored in GCP Storage>",
            '/tmp/service_account.json',
        )
    # Bind to a new name so the BigQuery client above is not shadowed.
    gspread_client = gspread.service_account(filename='/tmp/service_account.json')
    sheet = gspread_client.open("DataMart_NYTaxi_per_customer").sheet1
    # Only when the Google Sheets file is new:
    # sheet.update([res_df.columns.values.tolist()] + res_df.values.tolist())
    # When the Google Sheets file already has a header row:
    sheet.insert_rows(res_df.values.tolist(), 2)
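
# Minimal local smoke-test sketch, assuming Application Default Credentials
# are available (e.g. via `gcloud auth application-default login`) and the
# placeholders above have been filled in. Cloud Functions passes a Pub/Sub
# event dict and a context object, but nytaxi_pubsub uses neither, so stubs
# are enough here.
if __name__ == "__main__":
    nytaxi_pubsub({'data': ''}, None)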