Created October 17, 2023 18:25
-
-
Save rjtshrm/7bef9be3bb71dceb19cbde0c70aa7cfd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| from typing import Union | |
| import os | |
| from datahub.configuration.kafka import KafkaProducerConnectionConfig | |
| from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig | |
| from datahub.emitter.mce_builder import make_dataset_urn | |
| from datahub.emitter.rest_emitter import DataHubRestEmitter | |
| from datahub.specific.dataset import DatasetPatchBuilder | |
# Send INFO-and-above records to the console via the root logger, then take
# a logger named after this module for the script's own messages.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
| # Get an emitter, either REST or Kafka, this example shows you both | |
| def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]: | |
| gms_endpoint = f'http://{os.getenv("DATAHUB_GMS_HOST")}:{os.getenv("DATAHUB_GMS_PORT")}' | |
| return DataHubRestEmitter(gms_server=gms_endpoint) | |
# Target dataset. make_dataset_urn builds the same URN string that was
# previously hand-written:
# urn:li:dataset:(urn:li:dataPlatform:athena,ota_genesys.conversations,PROD)
dataset_urn = make_dataset_urn(
    platform="athena", name="ota_genesys.conversations", env="PROD"
)

# The custom property to attach. It is named once so the patch and the log
# line below cannot disagree (the old message named properties that were
# never added).
property_key = "custom_test"
property_value = "test_prop"

with get_emitter() as emitter:
    # build() returns the patch MCPs for the accumulated changes; emit each.
    for patch_mcp in (
        DatasetPatchBuilder(dataset_urn)
        .add_custom_property(property_key, property_value)
        .build()
    ):
        emitter.emit(patch_mcp)

log.info(
    "Added custom property %s=%s to dataset %s",
    property_key,
    property_value,
    dataset_urn,
)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.