Created August 18, 2023 18:50
Produce AVRO Messages with RecordNameStrategy and Sink to MongoDB
# Assumes BOOTSTRAP_SERVER, SR_URL, SR_KEY and SR_SECRET are set in the shell.
kafka-avro-console-producer \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  --producer.config client.config \
  --topic orders-avro \
  --property value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy \
  --property auto.register.schemas=true \
  --property schema.registry.url="$SR_URL" \
  --property basic.auth.credentials.source=USER_INFO \
  --property basic.auth.user.info="$SR_KEY:$SR_SECRET" \
  --property value.schema.file=orders-avro-schema.json < sample-data.json
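The producer command reads its value schema from orders-avro-schema.json, which is not included in this gist. The following is only a sketch of what such a file could look like, inferred from the field names in the sample data. The namespace `com.example`, the record name `Order`, and every field type are assumptions, not the author's schema.

```shell
# Write a hypothetical Avro schema for the order records.
# Namespace, record name and field types are assumptions inferred from sample-data.json.
cat > orders-avro-schema.json <<'EOF'
{
  "type": "record",
  "namespace": "com.example",
  "name": "Order",
  "fields": [
    {"name": "number", "type": "long"},
    {"name": "date", "type": "int"},
    {"name": "shipping_address", "type": "string"},
    {"name": "subtotal", "type": "double"},
    {"name": "tax", "type": "double"},
    {"name": "grand_total", "type": "double"},
    {"name": "shipping_cost", "type": "double"}
  ]
}
EOF
```

With RecordNameStrategy the schema is registered under the fully qualified record name, so under these assumptions the subject would be com.example.Order rather than orders-avro-value.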
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=<BOOTSTRAP_SERVER>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<SECRET>';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=<SR URL>
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR KEY>:<SR SECRET>
# Assumes SR_URL, SR_KEY and SR_SECRET are set in the shell. The '"$VAR"' sequences
# step out of the single-quoted JSON body so that the shell can expand the variables.
curl -X POST \
  -H "Content-Type: application/json" \
  --data '
  {"name": "mongo-sink",
   "config": {
     "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
     "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy",
     "connection.uri":"mongodb://user:pass@localhost:27017/",
     "database":"quickstart",
     "collection":"topicData",
     "topics":"orders-avro",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "io.confluent.connect.avro.AvroConverter",
     "value.converter.schema.registry.url": "'"$SR_URL"'",
     "value.converter.basic.auth.credentials.source": "USER_INFO",
     "value.converter.basic.auth.user.info": "'"$SR_KEY:$SR_SECRET"'",
     "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy"
   }
  }
  ' \
  http://localhost:8083/connectors -w "\n"
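After the connector is created, its state can be read from the Kafka Connect REST endpoint /connectors/mongo-sink/status. As a rough sketch, the hypothetical helper below (not part of the gist) counts how many connector and task entries in that status document report a state other than RUNNING. It uses only grep and assumes every state appears as a "state":"..." pair.

```shell
# Hypothetical helper: reads a Kafka Connect status document on stdin and
# prints how many "state" entries (connector plus tasks) are not RUNNING.
count_not_running() {
  # grep -c exits non-zero when the count is 0, so "|| true" keeps the status clean.
  grep -Eo '"state" *: *"[A-Z_]+"' | grep -vc 'RUNNING' || true
}

# Example call against the worker used above (needs a running Connect worker):
# curl -s http://localhost:8083/connectors/mongo-sink/status | count_not_running
```

A result of 0 suggests the connector and all of its tasks are running. Any other value means the full status output is worth reading, since failed tasks include a stack trace there.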
{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.00,"tax":10.00,"grand_total":120.00,"shipping_cost":0.00}
{"number":2,"date":18501,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":5.00,"tax":0.53,"grand_total":6.53,"shipping_cost":1.00}
{"number":3,"date":18502,"shipping_address":"5014 Pinnickinick Street, Portland, WA. 97205","subtotal":93.45,"tax":9.34,"grand_total":102.79,"shipping_cost":0.00}
{"number":4,"date":18503,"shipping_address":"4082 Elmwood Avenue, Tempe, AX. 85281","subtotal":50.00,"tax":1.00,"grand_total":51.00,"shipping_cost":0.00}
{"number":5,"date":18504,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":33.00,"tax":3.33,"grand_total":38.33,"shipping_cost":2.00}
This gist shows the configuration I used to verify that the TopicRecordNameStrategy and RecordNameStrategy subject name strategies work.
This test used version 1.10.1 of the MongoDB Connector for Kafka Connect (mongodb/kafka-connect-mongodb:1.10.1):
MongoDB Connector (Source and Sink)
The value.subject.name.strategy must be the same when the records are produced and when they are consumed.
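The strategy has to match because it determines the Schema Registry subject used for the value schema. A minimal sketch of the naming rules for the three built-in strategies follows. The helper name `subject_for` and the record name `com.example.Order` are hypothetical and used only for illustration.

```shell
# Hypothetical helper: prints the value subject that each built-in
# subject name strategy derives from a topic and a fully qualified record name.
subject_for() {
  strategy=$1
  topic=$2
  record=$3
  case "$strategy" in
    TopicNameStrategy)       printf '%s-value\n' "$topic" ;;
    RecordNameStrategy)      printf '%s\n' "$record" ;;
    TopicRecordNameStrategy) printf '%s-%s\n' "$topic" "$record" ;;
    *) echo "unknown strategy: $strategy" >&2; return 1 ;;
  esac
}

subject_for TopicNameStrategy orders-avro com.example.Order
subject_for RecordNameStrategy orders-avro com.example.Order
subject_for TopicRecordNameStrategy orders-avro com.example.Order
```

The subjects that actually exist can be listed with the Schema Registry REST API, for example `curl -u "$SR_KEY:$SR_SECRET" "$SR_URL/subjects"`, to confirm which name the producer registered.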