Skip to content

Instantly share code, notes, and snippets.

@adriaanm
Created April 13, 2017 22:25
Show Gist options
  • Select an option

  • Save adriaanm/b0104d88ba86e918d4464ce30b829606 to your computer and use it in GitHub Desktop.

Select an option

Save adriaanm/b0104d88ba86e918d4464ce30b829606 to your computer and use it in GitHub Desktop.

Revisions

  1. adriaanm created this gist Apr 13, 2017.
    74 changes: 74 additions & 0 deletions WordCountDemo.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,74 @@
    /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */


    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.KeyValue
    import org.apache.kafka.streams.StreamsConfig
    import org.apache.kafka.streams.kstream.KStreamBuilder
    import org.apache.kafka.streams.kstream.KStream
    import org.apache.kafka.streams.kstream.KTable
    import java.util.{Locale, Properties}

    /**
    * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
    * that computes a simple word occurrence histogram from an input text.
    *
    * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
    * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
    * is an updated count of a single word.
    *
    * Before running this example you must create the input topic and the output topic (e.g. via
    * bin/kafka-topics.sh --create ...), and write some data to the input topic (e.g. via
    * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
    */
    object WordCountDemoScala {

    def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)

    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    // Note: To re-run the demo, you need to use the offset reset tool:
    // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    import scala.collection.JavaConverters._

    val builder = new KStreamBuilder
    val source: KStream[String, String] = builder.stream("streams-file-input")
    val counts: KTable[String, java.lang.Long] = source.
    flatMapValues[String] { _.toLowerCase(Locale.getDefault).split(" ").toList.asJava }.
    map[String, String] { (key, value) => new KeyValue(value, value) }.
    groupByKey.count("Counts")

    // need to override value serde to Long type
    counts.to(Serdes.String, Serdes.Long, "streams-wordcount-output")

    val streams = new KafkaStreams(builder, props)
    streams.start()
    // usually the stream application would be running forever,
    // in this example we just let it run for some time and stop since the input data is finite.
    Thread.sleep(5000L)
    streams.close()
    }
    }
    7 changes: 7 additions & 0 deletions build.sbt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,7 @@
    name := "kafka-wordcount"

    version := "1.0"

    scalaVersion := "2.12.1"

    libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.10.2.0"