Skip to content

Instantly share code, notes, and snippets.

@rjtshrm
Created October 17, 2023 18:25
Show Gist options
  • Select an option

  • Save rjtshrm/7bef9be3bb71dceb19cbde0c70aa7cfd to your computer and use it in GitHub Desktop.

Select an option

Save rjtshrm/7bef9be3bb71dceb19cbde0c70aa7cfd to your computer and use it in GitHub Desktop.

Revisions

  1. rjtshrm created this gist Oct 17, 2023.
    32 changes: 32 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,32 @@
    import logging
    from typing import Union
    import os

    from datahub.configuration.kafka import KafkaProducerConnectionConfig
    from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
    from datahub.emitter.mce_builder import make_dataset_urn
    from datahub.emitter.rest_emitter import DataHubRestEmitter
    from datahub.specific.dataset import DatasetPatchBuilder

    # Request INFO as the root logging level, then create this module's logger.
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)


    # Get a REST emitter for the DataHub GMS server. The return annotation also
    # admits a Kafka emitter, but only the REST emitter is constructed here.
    def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
        """Return a ``DataHubRestEmitter`` for the GMS endpoint set in the environment.

        The endpoint is built as ``http://<DATAHUB_GMS_HOST>:<DATAHUB_GMS_PORT>``
        from the two environment variables of those names.

        Raises:
            ValueError: if either environment variable is unset or empty, since
                the endpoint would otherwise silently become ``http://None:None``.
        """
        gms_host = os.getenv("DATAHUB_GMS_HOST")
        gms_port = os.getenv("DATAHUB_GMS_PORT")

        # Fail early with a clear message instead of building a bogus URL.
        missing = [
            var_name
            for var_name, var_value in (
                ("DATAHUB_GMS_HOST", gms_host),
                ("DATAHUB_GMS_PORT", gms_port),
            )
            if not var_value
        ]
        if missing:
            raise ValueError(
                "Missing required environment variable(s): " + ", ".join(missing)
            )

        gms_endpoint = f"http://{gms_host}:{gms_port}"
        return DataHubRestEmitter(gms_server=gms_endpoint)


    # URN of the dataset whose custom properties are patched.
    dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:athena,ota_genesys.conversations,PROD)"

    # Custom property to add; held in named values so the log line below
    # always reports what was actually sent.
    custom_property_key = "custom_test"
    custom_property_value = "test_prop"

    with get_emitter() as emitter:
        # Emit every patch proposal produced by the builder.
        for patch_mcp in (
            DatasetPatchBuilder(dataset_urn)
            .add_custom_property(custom_property_key, custom_property_value)
            .build()
        ):
            emitter.emit(patch_mcp)


    # The previous message named cluster_name/retention_time, which this script
    # never sets; report the property that was really added.
    log.info(f"Added {custom_property_key} property to dataset {dataset_urn}")