Skip to content

Instantly share code, notes, and snippets.

@stonegao
Forked from mardambey/KafkaEmbedded.scala
Created May 11, 2012 03:11
Show Gist options
  • Select an option

  • Save stonegao/2657294 to your computer and use it in GitHub Desktop.

Select an option

Save stonegao/2657294 to your computer and use it in GitHub Desktop.

Revisions

  1. @mardambey mardambey created this gist May 10, 2012.
    60 changes: 60 additions & 0 deletions KafkaEmbedded.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,60 @@
    import java.util.Properties
    import kafka.server.KafkaServer
    import kafka.server.KafkaConfig
    import kafka.producer.ProducerConfig
    import kafka.producer.Producer
    import kafka.message.Message
    import kafka.producer.ProducerData
    import kafka.consumer.ConsumerConfig
    import kafka.consumer.Consumer
    import kafka.utils.Utils
    import kafka.consumer.SimpleConsumer
    import kafka.api.FetchRequest

    object KafkaEmbedded extends App {

    val props = new Properties()
    props.setProperty("hostname", "localhost")
    props.setProperty("port", "9090");
    props.setProperty("brokerid", "1")
    props.setProperty("log.dir", "/tmp/embeddedkafka/")
    props.setProperty("enable.zookeeper", "false")

    val server = new KafkaServer(new KafkaConfig(props))
    server.startup()

    val prodProps = new Properties()
    prodProps.setProperty("producer.type", "async")
    prodProps.setProperty("queue.time", "2000")
    prodProps.setProperty("queue.size", "100")
    prodProps.setProperty("batch.size", "10")
    prodProps.setProperty("broker.list", "1:localhost:9090")

    val prodConfig = new ProducerConfig(prodProps)
    val prod = (new Producer[String, Message](prodConfig))

    for(i <- 1 to 200) {
    prod.send(new ProducerData("TEST",new Message("testing 1 2 3".getBytes)))
    }

    val cons = new SimpleConsumer("localhost", 9090, 100, 1024)
    var offset = 0L

    var i = 0
    while (true) {
    val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)

    for (msg <- cons.fetch(fetchRequest)) {
    i = i + 1
    println("consumed [ " + i + "]: offset = " + msg.offset + ", payload = " + Utils.toString(msg.message.payload, "UTF-8"))
    offset = msg.offset
    }
    }

    sys.addShutdownHook({
    prod.close()
    cons.close()
    server.shutdown()
    server.awaitShutdown()
    })
    }