Skip to content

Instantly share code, notes, and snippets.

@morzzz007
Last active July 14, 2022 09:06
Show Gist options
  • Select an option

  • Save morzzz007/6a90c5ab8a2295313be6ecd2353fe801 to your computer and use it in GitHub Desktop.

Select an option

Save morzzz007/6a90c5ab8a2295313be6ecd2353fe801 to your computer and use it in GitHub Desktop.
A batch workflow for BigQuery to BigTable transfer

Ez nem tud queryzni, csak végigmegy az egész táblán; a service accountnak elég a BigQuery Read Session User jog az IAM-ban.

FONTOS:

A --table_spec ems-plugins:sap_cc_products_test_dataset.sap_cc_products_test_table

!!!! KETTOSPONTTAL !!!!

működik csak a projekt neve után. Amit a Cloud Console UI a vágólapra másol, abban PONT van a projekt neve után, és az NEM jó.

"""A batch workflow for BigQuery to BigTable transfer
"""
# pytype: skip-file
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from google.cloud.bigtable import row
import datetime
import apache_beam as beam
import argparse
import logging
class CreateRowFn(beam.DoFn):
    """Converts one BigQuery row into a Bigtable ``DirectRow``.

    The row key, column family, column qualifier and cell value are
    parameterized, with defaults equal to the original hard-coded
    placeholders, so existing ``CreateRowFn()`` callers are unchanged.

    NOTE(review): the written cell is still a static placeholder — it does
    not yet use any attribute of ``bigquery_row``. Derive the row key and
    value from the BigQuery row fields for real use.
    """

    def __init__(self,
                 row_key='myrowkey',
                 column_family='cf1',
                 column='field1',
                 value='value1'):
        super().__init__()
        self.row_key = row_key
        self.column_family = column_family
        self.column = column
        self.value = value

    def process(self, bigquery_row):
        """Yields a single DirectRow for the given BigQuery row (a dict).

        Args:
            bigquery_row: one record read from BigQuery.

        Yields:
            google.cloud.bigtable.row.DirectRow: the row to write.
        """
        # Use logging rather than print() so output reaches the Dataflow
        # worker logs instead of stdout.
        logging.info('BigQuery row: %s', bigquery_row)
        direct_row = row.DirectRow(row_key=self.row_key)
        # The Bigtable client interprets naive datetimes as UTC, so pass an
        # explicitly UTC-aware timestamp to avoid a local-time skew.
        direct_row.set_cell(
            self.column_family,
            self.column,
            self.value,
            timestamp=datetime.datetime.now(datetime.timezone.utc))
        yield direct_row
def run(argv=None):
    """Builds and runs the BigQuery -> Bigtable transfer pipeline.

    Args:
        argv: optional list of command-line arguments; when ``None``,
            ``sys.argv`` is used by argparse.
    """
    parser = argparse.ArgumentParser()
    # All four flags are mandatory; declare them data-driven to keep the
    # argument definitions in one place.
    flag_specs = (
        ('--table_spec',
         'BigQuery Table Spec in project-id:dataset_id.table_id format'),
        ('--bigtable_project',
         'The Bigtable project ID, this can be different than your '
         'Dataflow project'),
        ('--bigtable_instance', 'The Bigtable instance ID'),
        ('--bigtable_table', 'The Bigtable table ID in the instance.'),
    )
    for flag, help_text in flag_specs:
        parser.add_argument(flag, required=True, help=help_text)

    known_args, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=options) as pipeline:
        # DIRECT_READ uses the BigQuery Storage Read API (no query — a full
        # table scan; requires the BigQuery Read Session User role).
        bq_rows = pipeline | 'ReadTable' >> beam.io.ReadFromBigQuery(
            method='DIRECT_READ', table=known_args.table_spec)
        bt_rows = (
            bq_rows
            | 'BigQuery row to BigTable row object' >> beam.ParDo(CreateRowFn()))
        _ = (
            bt_rows
            | 'Writing row object to BigTable' >> WriteToBigTable(
                project_id=known_args.bigtable_project,
                instance_id=known_args.bigtable_instance,
                table_id=known_args.bigtable_table))
if __name__ == '__main__':
    # Raise the root logger to INFO so pipeline progress is visible in the
    # Dataflow worker logs before handing control to run().
    logging.getLogger().setLevel(logging.INFO)
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment