Skip to content

Instantly share code, notes, and snippets.

@morzzz007
Created July 14, 2022 09:11
Show Gist options
  • Select an option

  • Save morzzz007/913b2bdecd63f3d138269f237c242618 to your computer and use it in GitHub Desktop.

Select an option

Save morzzz007/913b2bdecd63f3d138269f237c242618 to your computer and use it in GitHub Desktop.
A streaming workflow for transferring data from BigQuery to Bigtable using a query string

Here a project_id argument is used, and a query string copied straight from the BigQuery UI works in the query (see line 59).

Fontos:

This launches a separate BigQuery job, so in IAM the service account needs both the bigquery.datasets.create AND the bigquery.jobs.create permissions.

"""A streaming workflow for BigQuery to BigTable transfer
"""
# pytype: skip-file
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from google.cloud.bigtable import row
import datetime
import apache_beam as beam
import argparse
import logging
class CreateRowFn(beam.DoFn):
    """Converts each BigQuery result row into a Bigtable ``DirectRow``.

    NOTE(review): the row key and cell contents are hard-coded, so every
    input element writes to the same Bigtable row ("myrowkey") and later
    writes overwrite earlier ones. For a real transfer, derive the key and
    value from ``bigquery_row`` (e.g. ``bigquery_row['title']``).
    """

    def process(self, bigquery_row):
        """Yields one ``DirectRow`` per input element.

        Args:
            bigquery_row: one element emitted by ReadFromBigQuery —
                presumably a dict mapping column name to value; confirm
                against the upstream read.
        """
        # Debug aid: echo each incoming row to the worker log/stdout.
        print(bigquery_row)
        direct_row = row.DirectRow(row_key="myrowkey")
        direct_row.set_cell(
            'cf1',     # column family
            'field1',  # column qualifier
            'value1',  # constant placeholder cell value
            timestamp=datetime.datetime.now())
        yield direct_row
def run(argv=None):
    """Builds and runs the BigQuery -> Bigtable transfer pipeline.

    Args:
        argv: optional argument list (defaults to ``sys.argv``). Known
            flags are consumed here; everything else is forwarded to
            Beam's ``PipelineOptions``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--bigquery_project',
        required=True,
        help='BigQuery Project Id')
    parser.add_argument(
        '--bigtable_project',
        required=True,
        help='The Bigtable project ID, this can be different than your '
        'Dataflow project')
    parser.add_argument(
        '--bigtable_instance',
        required=True,
        help='The Bigtable instance ID')
    parser.add_argument(
        '--bigtable_table',
        required=True,
        help='The Bigtable table ID in the instance.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as p:
        # NOTE(review): the query hard-codes a table reference in
        # legacy-SQL bracket syntax; ReadFromBigQuery interprets it
        # against the given project. Consider parameterizing the table
        # instead of baking it into the source.
        (p
         | 'QueryTable' >> beam.io.ReadFromBigQuery(
             method='DIRECT_READ',
             project=known_args.bigquery_project,
             query="SELECT title FROM [ems-plugins.sap_cc_products_test_dataset.sap_cc_products_test_table]")
         | 'BigQuery row to BigTable row object' >> beam.ParDo(CreateRowFn())
         | 'Writing row object to BigTable' >> WriteToBigTable(
             project_id=known_args.bigtable_project,
             instance_id=known_args.bigtable_instance,
             table_id=known_args.bigtable_table))
if __name__ == '__main__':
    # Surface INFO-level pipeline logs when run as a script.
    logging.getLogger().setLevel(logging.INFO)
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment