Skip to content

Instantly share code, notes, and snippets.

@picadoh
Last active January 2, 2021 00:45
Show Gist options
  • Select an option

  • Save picadoh/b6c0ba6fe2d368a8f413e3fa9ac9a172 to your computer and use it in GitHub Desktop.

Select an option

Save picadoh/b6c0ba6fe2d368a8f413e3fa9ac9a172 to your computer and use it in GitHub Desktop.

Revisions

  1. picadoh revised this gist Jun 17, 2017. 2 changed files with 2 additions and 2 deletions.
    2 changes: 1 addition & 1 deletion WordCounter.java
    Original file line number Diff line number Diff line change
    @@ -33,7 +33,7 @@ private static Properties getProperties() {
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Counter");
    props.put("group.id", "test-group");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-counter");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return props;
    }
    2 changes: 1 addition & 1 deletion docker-compose.yml
    Original file line number Diff line number Diff line change
    @@ -9,7 +9,7 @@ services:
    ports:
    - "9092:9092"
    environment:
    KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
    KAFKA_ADVERTISED_HOST_NAME: 0.0.0.0
    KAFKA_CREATE_TOPICS: "words-topic:1:1,counts-topic:1:1"
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
  2. picadoh revised this gist Oct 6, 2016. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions WordCounter.java
    Original file line number Diff line number Diff line change
    @@ -24,6 +24,8 @@ public static void main(String[] args) {

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    kafkaStreams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }

    private static Properties getProperties() {
  3. picadoh revised this gist Oct 6, 2016. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion WordCounter.java
    Original file line number Diff line number Diff line change
    @@ -20,7 +20,7 @@ public static void main(String[] args) {
    .countByKey(stringSerde, "Counts")
    .toStream()
    .map((word, count) -> new KeyValue<>(word, word + ":" + count))
    .to(stringSerde, stringSerde, "count-topic");
    .to(stringSerde, stringSerde, "counts-topic");

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    kafkaStreams.start();
  4. picadoh revised this gist Oct 5, 2016. 1 changed file with 0 additions and 4 deletions.
    4 changes: 0 additions & 4 deletions WordCounter.java
    Original file line number Diff line number Diff line change
    @@ -5,13 +5,10 @@
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    import java.util.Properties;

    import static java.util.Arrays.asList;

    public class WordCounter {

    public static void main(String[] args) {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    Serde<String> stringSerde = Serdes.String();
    @@ -38,5 +35,4 @@ private static Properties getProperties() {
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return props;
    }

    }
  5. picadoh revised this gist Oct 5, 2016. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion WordCounter.java
    Original file line number Diff line number Diff line change
    @@ -34,7 +34,7 @@ private static Properties getProperties() {
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Counter");
    props.put("group.id", "test-group");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-counter");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localdocker:9092");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return props;
    }
  6. picadoh renamed this gist Oct 5, 2016. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  7. picadoh revised this gist Oct 5, 2016. 1 changed file with 0 additions and 5 deletions.
    5 changes: 0 additions & 5 deletions pom.xml
    Original file line number Diff line number Diff line change
    @@ -24,11 +24,6 @@
    </build>

    <dependencies>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
  8. picadoh revised this gist Oct 5, 2016. 1 changed file with 2 additions and 4 deletions.
    6 changes: 2 additions & 4 deletions StreamCounter.java
    Original file line number Diff line number Diff line change
    @@ -31,12 +31,10 @@ public static void main(String[] args) {

    private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Word-Counter");
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Counter");
    props.put("group.id", "test-group");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-counter");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-counter");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localdocker:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localdocker:2181");
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return props;
    }
  9. picadoh created this gist Oct 5, 2016.
    44 changes: 44 additions & 0 deletions StreamCounter.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,44 @@
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    import java.util.Properties;

    import static java.util.Arrays.asList;

    public class WordCounter {

    public static void main(String[] args) {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    Serde<String> stringSerde = Serdes.String();

    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    kStreamBuilder.stream(stringSerde, stringSerde, "words-topic")
    .flatMapValues(text -> asList(text.split(" ")))
    .map((key, word) -> new KeyValue<>(word, word))
    .countByKey(stringSerde, "Counts")
    .toStream()
    .map((word, count) -> new KeyValue<>(word, word + ":" + count))
    .to(stringSerde, stringSerde, "count-topic");

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    kafkaStreams.start();
    }

    private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Word-Counter");
    props.put("group.id", "test-group");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-counter");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localdocker:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localdocker:2181");
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return props;
    }

    }
    16 changes: 16 additions & 0 deletions docker-compose.yml
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,16 @@
    version: '2'
    services:
    zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
    - "2181:2181"
    kafka:
    image: wurstmeister/kafka:0.10.0.1
    ports:
    - "9092:9092"
    environment:
    KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
    KAFKA_CREATE_TOPICS: "words-topic:1:1,counts-topic:1:1"
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
    - /var/run/docker.sock:/var/run/docker.sock
    39 changes: 39 additions & 0 deletions pom.xml
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,39 @@
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kstreaming</groupId>
    <artifactId>kstreaming</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.5.1</version>
    <configuration>
    <source>1.8</source>
    <target>1.8</target>
    <encoding>UTF-8</encoding>
    </configuration>
    </plugin>
    </plugins>
    </build>

    <dependencies>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.1</version>
    </dependency>
    </dependencies>

    </project>