Itt project_id arg van, és JÓ a UI-ról kimásolható string a querybe (59-es sor).
Ez egy külön jobot csinál; az IAM-ban a service accountnak kell a
bigquery.datasets.create ÉS a bigquery.jobs.create jog.
| """A streaming workflow for BigQuery to BigTable transfer | |
| """ | |
| # pytype: skip-file | |
| from apache_beam.options.pipeline_options import PipelineOptions | |
| from apache_beam.io.gcp.bigtableio import WriteToBigTable | |
| from google.cloud.bigtable import row | |
| import datetime | |
| import apache_beam as beam | |
| import argparse | |
| import logging | |
class CreateRowFn(beam.DoFn):
    """Turn each element read from BigQuery into a Bigtable ``DirectRow``.

    The row key, column family, qualifier and value are fixed placeholders;
    the incoming element is only logged and not yet used for the mutation.
    """

    def process(self, bigquery_row):
        """Yield one ``DirectRow`` mutation for the given element.

        Args:
            bigquery_row: One element produced by the upstream BigQuery read.
                It is logged but does not influence the emitted row.

        Yields:
            A ``DirectRow`` keyed ``"myrowkey"`` carrying the single cell
            ``cf1:field1 = 'value1'`` stamped with the current UTC time.
        """
        # TODO: use attributes from the message instead of the placeholders.
        logging.info('BigQuery row: %s', bigquery_row)
        direct_row = row.DirectRow(row_key="myrowkey")
        # Pass a timezone-aware UTC datetime: the Bigtable client interprets
        # a naive datetime as UTC, so a naive local now() would be shifted by
        # the worker's UTC offset.
        direct_row.set_cell(
            'cf1',
            'field1',
            'value1',
            timestamp=datetime.datetime.now(datetime.timezone.utc))
        yield direct_row
def run(argv=None):
    """Parse the command line, then build and run the transfer pipeline.

    The pipeline reads the result of a BigQuery query, converts every element
    with ``CreateRowFn`` and writes the resulting rows to a Bigtable table.
    Running the query creates a separate BigQuery job, so the service account
    needs both the ``bigquery.datasets.create`` and ``bigquery.jobs.create``
    permissions.

    Args:
        argv: Command-line arguments to parse. ``None`` lets argparse fall
            back to ``sys.argv[1:]``. Arguments not declared here are passed
            on to Beam as pipeline options.
    """
    # Kept as the default so existing invocations behave exactly as before.
    # The bracketed table reference is legacy SQL syntax.
    default_query = (
        "SELECT title FROM "
        "[ems-plugins.sap_cc_products_test_dataset.sap_cc_products_test_table]")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--bigquery_project',
        required=True,
        help='BigQuery Project Id')
    parser.add_argument(
        '--query',
        default=default_query,
        help='BigQuery query whose result is transferred to Bigtable. '
        'Defaults to the sample products table query.')
    parser.add_argument(
        '--bigtable_project',
        required=True,
        help='The Bigtable project ID, this can be different than your '
        'Dataflow project')
    parser.add_argument(
        '--bigtable_instance',
        required=True,
        help='The Bigtable instance ID')
    parser.add_argument(
        '--bigtable_table',
        required=True,
        help='The Bigtable table ID in the instance.')

    # Everything argparse does not recognise is forwarded to Beam.
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    # Leaving the ``with`` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'QueryTable' >> beam.io.ReadFromBigQuery(
             method='DIRECT_READ',
             project=known_args.bigquery_project,
             query=known_args.query)
         | 'BigQuery row to BigTable row object' >> beam.ParDo(CreateRowFn())
         | 'Writing row object to BigTable' >> WriteToBigTable(
             project_id=known_args.bigtable_project,
             instance_id=known_args.bigtable_instance,
             table_id=known_args.bigtable_table))
if __name__ == '__main__':
    # Script entry point: raise the root logger to INFO, then start the
    # pipeline with the process's own command-line arguments.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()