Skip to content

Instantly share code, notes, and snippets.

@boseoladipo
Last active July 16, 2020 10:18
Show Gist options
  • Select an option

  • Save boseoladipo/313c2c7e85333da7218ab74061d36bb9 to your computer and use it in GitHub Desktop.

Select an option

Save boseoladipo/313c2c7e85333da7218ab74061d36bb9 to your computer and use it in GitHub Desktop.

Revisions

  1. boseoladipo revised this gist Jul 16, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion pipeline.py
    Original file line number Diff line number Diff line change
    @@ -23,7 +23,7 @@
    use_standard_sql=True)) \
    | 'Transform: RequestData' >> beam.ParDo(FetchEventsFn(payload))

    events |'Output: WriteToBigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
    events |'Output: WriteToBigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
    destination_table,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
  2. boseoladipo revised this gist Jul 16, 2020. 1 changed file with 19 additions and 0 deletions.
    19 changes: 19 additions & 0 deletions pipeline.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,22 @@
    payload = {
    "customer_ids": {"cookie": ""},
    "event_types": [
    "campaign",
    "first_session",
    "session_start",
    "session_end",
    "reserve_car",
    "view item",
    "view_car",
    "view_category",
    "add item to cart",
    "preorder",
    "autoprenuer_signup",
    "book_inspection",
    "set_city",
    "set_location",
    ]}

    events = p \
    | 'Input: ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(
    query=source_query,
  3. boseoladipo created this gist Jul 16, 2020.
    14 changes: 14 additions & 0 deletions pipeline.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,14 @@
    events = p \
    | 'Input: ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(
    query=source_query,
    use_standard_sql=True)) \
    | 'Transform: RequestData' >> beam.ParDo(FetchEventsFn(payload))

    events |'Output: WriteToBigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
    destination_table,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )

    result = p.run()