@emrantalukder
Created August 18, 2023 18:50
Produce AVRO Messages with RecordNameStrategy and Sink to MongoDB
kafka-avro-console-producer \
--bootstrap-server $BOOTSTRAP_SERVER \
--producer.config client.config \
--topic orders-avro \
--property value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy \
--property auto.register.schemas=true \
--property schema.registry.url=$SR_URL \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info=$SR_KEY:$SR_SECRET \
--property value.schema.file=orders-avro-schema.json < sample-data.json
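The orders-avro-schema.json file itself is not included in this gist. Based on the fields in sample-data.json, it would look roughly like the sketch below; the record name `Order` (and the absence of a namespace) is an assumption, not something the gist confirms:

```json
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "number", "type": "long"},
    {"name": "date", "type": "long"},
    {"name": "shipping_address", "type": "string"},
    {"name": "subtotal", "type": "double"},
    {"name": "tax", "type": "double"},
    {"name": "grand_total", "type": "double"},
    {"name": "shipping_cost", "type": "double"}
  ]
}
```

Note that with RecordNameStrategy, the record's full name (here, `Order`) becomes the Schema Registry subject.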
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=<BOOTSTRAP_SERVER>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<SECRET>';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=<SR URL>
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR KEY>:<SR SECRET>
curl -X POST \
-H "Content-Type: application/json" \
--data '
{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy",
    "connection.uri": "mongodb://user:pass@localhost:27017/",
    "database": "quickstart",
    "collection": "topicData",
    "topics": "orders-avro",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "<SR URL>",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "<SR KEY>:<SR SECRET>",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy"
  }
}
' \
http://localhost:8083/connectors -w "\n"
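Before POSTing a payload like this to the Connect REST API, it is easy to sanity-check that the JSON parses and carries the settings the MongoDB sink requires. A minimal sketch (the `required` list covers only the core sink settings, not every option used above):

```python
import json

# The connector creation payload, as sent to http://localhost:8083/connectors.
payload = """
{"name": "mongo-sink",
 "config": {
   "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
   "connection.uri": "mongodb://user:pass@localhost:27017/",
   "database": "quickstart",
   "collection": "topicData",
   "topics": "orders-avro",
   "value.converter": "io.confluent.connect.avro.AvroConverter"
 }
}
"""

connector = json.loads(payload)
config = connector["config"]

# Core settings the MongoDB sink connector cannot run without.
required = ["connector.class", "connection.uri", "database", "collection", "topics"]
missing = [key for key in required if key not in config]
print("missing keys:", missing)
```

Once created, `curl http://localhost:8083/connectors/mongo-sink/status` reports whether the connector and its tasks are RUNNING.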
{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.00,"tax":10.00,"grand_total":120.00,"shipping_cost":0.00}
{"number":2,"date":18501,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":5.00,"tax":0.53,"grand_total":6.53,"shipping_cost":1.00}
{"number":3,"date":18502,"shipping_address":"5014 Pinnickinick Street, Portland, WA. 97205","subtotal":93.45,"tax":9.34,"grand_total":102.79,"shipping_cost":0.00}
{"number":4,"date":18503,"shipping_address":"4082 Elmwood Avenue, Tempe, AX. 85281","subtotal":50.00,"tax":1.00,"grand_total":51.00,"shipping_cost":0.00}
{"number":5,"date":18504,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":33.00,"tax":3.33,"grand_total":38.33,"shipping_cost":2.00}
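The sample data is internally consistent: for every order, grand_total = subtotal + tax + shipping_cost (the `date` field is an integer, days since the Unix epoch, as Avro commonly encodes dates). A small check that reads the records as JSON lines:

```python
import json

# The five records from sample-data.json, one JSON object per line.
sample_data = """\
{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.00,"tax":10.00,"grand_total":120.00,"shipping_cost":0.00}
{"number":2,"date":18501,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":5.00,"tax":0.53,"grand_total":6.53,"shipping_cost":1.00}
{"number":3,"date":18502,"shipping_address":"5014 Pinnickinick Street, Portland, WA. 97205","subtotal":93.45,"tax":9.34,"grand_total":102.79,"shipping_cost":0.00}
{"number":4,"date":18503,"shipping_address":"4082 Elmwood Avenue, Tempe, AX. 85281","subtotal":50.00,"tax":1.00,"grand_total":51.00,"shipping_cost":0.00}
{"number":5,"date":18504,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":33.00,"tax":3.33,"grand_total":38.33,"shipping_cost":2.00}
"""

for line in sample_data.strip().splitlines():
    order = json.loads(line)
    # grand_total should equal subtotal + tax + shipping_cost (rounded to cents).
    expected = round(order["subtotal"] + order["tax"] + order["shipping_cost"], 2)
    assert expected == order["grand_total"], f"order {order['number']} totals disagree"
```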

emrantalukder commented Aug 18, 2023

This gist shows the configuration I used to verify that the TopicRecordNameStrategy and RecordNameStrategy subject naming strategies work with the MongoDB sink connector.

This test used version 1.10.1 (mongodb/kafka-connect-mongodb:1.10.1) of the MongoDB Connector for Kafka Connect:
MongoDB Connector (Source and Sink)

The same value.subject.name.strategy must be configured both when records are produced and when they are consumed; otherwise the consumer looks up the schema under a different subject and deserialization fails.
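The effect of the three subject naming strategies can be sketched with a small helper. Confluent's actual implementations live in its Java serializers; the function below only mirrors their documented naming rules, using this gist's topic and an assumed record name `Order`:

```python
def subject_name(strategy: str, topic: str, record_fullname: str, is_key: bool = False) -> str:
    """Mirror Confluent's documented Schema Registry subject naming rules."""
    suffix = "key" if is_key else "value"
    if strategy == "TopicNameStrategy":
        return f"{topic}-{suffix}"           # default: one subject per topic
    if strategy == "RecordNameStrategy":
        return record_fullname               # one subject per record type, topic-independent
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_fullname}"  # one subject per record type per topic
    raise ValueError(f"unknown strategy: {strategy}")

# With RecordNameStrategy the subject is just the Avro record's full name,
# which is why producer and consumer must agree on the strategy:
print(subject_name("TopicNameStrategy", "orders-avro", "Order"))        # orders-avro-value
print(subject_name("RecordNameStrategy", "orders-avro", "Order"))       # Order
print(subject_name("TopicRecordNameStrategy", "orders-avro", "Order"))  # orders-avro-Order
```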
