Skip to content

Instantly share code, notes, and snippets.

@rjtshrm
Created October 17, 2023 18:25
Show Gist options
  • Select an option

  • Save rjtshrm/7bef9be3bb71dceb19cbde0c70aa7cfd to your computer and use it in GitHub Desktop.

Select an option

Save rjtshrm/7bef9be3bb71dceb19cbde0c70aa7cfd to your computer and use it in GitHub Desktop.
# Standard-library imports.
import logging
from typing import Union
import os
# DataHub SDK imports: Kafka emitter pieces are imported for the type hint on
# get_emitter(), but this example only ever constructs the REST emitter.
from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder
# Module-level logger; basicConfig makes INFO-level messages visible when run as a script.
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Get an emitter, either REST or Kafka, this example shows you both
def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
    """Build a DataHub emitter from environment configuration.

    Reads DATAHUB_GMS_HOST and DATAHUB_GMS_PORT from the environment and
    returns a REST emitter pointed at that GMS endpoint. The return type
    also admits a Kafka emitter, but this example constructs only the
    REST variant.

    Returns:
        A DataHubRestEmitter connected to http://<host>:<port>.
    """
    # NOTE(review): os.getenv returns None for unset vars, which would yield
    # "http://None:None" — ensure both env vars are set before running.
    gms_endpoint = f'http://{os.getenv("DATAHUB_GMS_HOST")}:{os.getenv("DATAHUB_GMS_PORT")}'
    return DataHubRestEmitter(gms_server=gms_endpoint)
# URN of the target dataset (Athena platform, PROD environment).
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:athena,ota_genesys.conversations,PROD)"

# Emit a patch MCP that adds a single custom property to the dataset.
# The emitter is used as a context manager so its connection is closed cleanly.
with get_emitter() as emitter:
    for patch_mcp in (
        DatasetPatchBuilder(dataset_urn)
        .add_custom_property("custom_test", "test_prop")
        .build()
    ):
        emitter.emit(patch_mcp)

# Bug fix: the original message claimed "cluster_name, retention_time" were
# added, but the patch above only adds the "custom_test" property.
log.info(f"Added custom_test property to dataset {dataset_urn}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment