Created
October 17, 2023 18:25
-
-
Save rjtshrm/7bef9be3bb71dceb19cbde0c70aa7cfd to your computer and use it in GitHub Desktop.
Revisions
-
rjtshrm created this gist
Oct 17, 2023. There are no files selected for viewing.
"""Example: patch a DataHub dataset by adding a custom property.

Builds a patch MCP (Metadata Change Proposal) with ``DatasetPatchBuilder``
and emits it to a DataHub GMS instance over REST. The GMS endpoint is read
from the ``DATAHUB_GMS_HOST`` / ``DATAHUB_GMS_PORT`` environment variables.
"""
import logging
import os
from typing import Union

from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
    """Return an emitter for sending metadata to DataHub.

    The return annotation allows either a REST or a Kafka emitter, but this
    example only ever constructs a REST emitter pointed at the GMS endpoint
    derived from environment variables.

    Returns:
        DataHubRestEmitter: emitter targeting
        ``http://$DATAHUB_GMS_HOST:$DATAHUB_GMS_PORT``.
    """
    # NOTE: if either env var is unset, os.getenv returns None and the URL
    # will literally contain "None" — confirm both are set before running.
    gms_endpoint = f'http://{os.getenv("DATAHUB_GMS_HOST")}:{os.getenv("DATAHUB_GMS_PORT")}'
    return DataHubRestEmitter(gms_server=gms_endpoint)


def main() -> None:
    """Emit a patch adding the ``custom_test`` property to one dataset."""
    dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:athena,ota_genesys.conversations,PROD)"

    # The emitter is a context manager; `with` guarantees it is closed even
    # if emission fails. `.build()` may yield multiple MCPs, so emit each.
    with get_emitter() as emitter:
        for patch_mcp in (
            DatasetPatchBuilder(dataset_urn)
            .add_custom_property("custom_test", "test_prop")
            .build()
        ):
            emitter.emit(patch_mcp)

    # Fixed: the original message claimed cluster_name/retention_time were
    # added, but the code above only adds the custom_test property.
    # Lazy %-style args avoid formatting when INFO logging is disabled.
    log.info("Added custom_test property to dataset %s", dataset_urn)


if __name__ == "__main__":
    main()