Skip to content

Instantly share code, notes, and snippets.

@punneng
Last active June 24, 2018 11:53
Show Gist options
  • Select an option

  • Save punneng/576c557dd06354ab3bf09a7e75336d41 to your computer and use it in GitHub Desktop.

Select an option

Save punneng/576c557dd06354ab3bf09a7e75336d41 to your computer and use it in GitHub Desktop.
package producer
import (
"fmt"
"os"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
var producer *kafka.Producer
func InitKafka() error {
var err error
producer, err = kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": os.Getenv("BOOTSTRAP_SERVERS")})
return err
}
func Produce(topics string, message string) error {
deliveryChan := make(chan kafka.Event)
err := producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny},
Value: []byte(message),
}, deliveryChan)
if err != nil {
fmt.Printf("Produce failed: %v\n", err)
}
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
close(deliveryChan)
return m.TopicPartition.Error
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment