Last active
January 2, 2021 00:45
-
-
Save picadoh/b6c0ba6fe2d368a8f413e3fa9ac9a172 to your computer and use it in GitHub Desktop.
Revisions
-
picadoh revised this gist
Jun 17, 2017 . 2 changed files with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -33,7 +33,7 @@ private static Properties getProperties() { props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Counter"); props.put("group.id", "test-group"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-counter"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); return props; } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -9,7 +9,7 @@ services: ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 0.0.0.0 KAFKA_CREATE_TOPICS: "words-topic:1:1,counts-topic:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: -
picadoh revised this gist
Oct 6, 2016 . 1 changed file with 2 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -24,6 +24,8 @@ public static void main(String[] args) { KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig); kafkaStreams.start(); Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close)); } private static Properties getProperties() { -
picadoh revised this gist
Oct 6, 2016 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -20,7 +20,7 @@ public static void main(String[] args) { .countByKey(stringSerde, "Counts") .toStream() .map((word, count) -> new KeyValue<>(word, word + ":" + count)) .to(stringSerde, stringSerde, "counts-topic"); KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig); kafkaStreams.start(); -
picadoh revised this gist
Oct 5, 2016 . 1 changed file with 0 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -5,13 +5,10 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.WallclockTimestampExtractor; import java.util.Properties; import static java.util.Arrays.asList; public class WordCounter { public static void main(String[] args) { StreamsConfig streamsConfig = new StreamsConfig(getProperties()); Serde<String> stringSerde = Serdes.String(); @@ -38,5 +35,4 @@ private static Properties getProperties() { props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); return props; } } -
picadoh revised this gist
Oct 5, 2016 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -34,7 +34,7 @@ private static Properties getProperties() { props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Counter"); props.put("group.id", "test-group"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-counter"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092"); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); return props; } -
picadoh renamed this gist
Oct 5, 2016 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
picadoh revised this gist
Oct 5, 2016 . 1 changed file with 0 additions and 5 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -24,11 +24,6 @@ </build> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> -
picadoh revised this gist
Oct 5, 2016 . 1 changed file with 2 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -31,12 +31,10 @@ public static void main(String[] args) { private static Properties getProperties() { Properties props = new Properties(); props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Counter"); props.put("group.id", "test-group"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-counter"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localdocker:9092"); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); return props; } -
picadoh created this gist
Oct 5, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,44 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.WallclockTimestampExtractor; import java.util.Properties; import static java.util.Arrays.asList; public class WordCounter { public static void main(String[] args) { StreamsConfig streamsConfig = new StreamsConfig(getProperties()); Serde<String> stringSerde = Serdes.String(); KStreamBuilder kStreamBuilder = new KStreamBuilder(); kStreamBuilder.stream(stringSerde, stringSerde, "words-topic") .flatMapValues(text -> asList(text.split(" "))) .map((key, word) -> new KeyValue<>(word, word)) .countByKey(stringSerde, "Counts") .toStream() .map((word, count) -> new KeyValue<>(word, word + ":" + count)) .to(stringSerde, stringSerde, "count-topic"); KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig); kafkaStreams.start(); } private static Properties getProperties() { Properties props = new Properties(); props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Word-Counter"); props.put("group.id", "test-group"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-counter"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localdocker:9092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localdocker:2181"); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); return props; } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,16 @@ version: '2' services: zookeeper: image: wurstmeister/zookeeper:3.4.6 ports: - "2181:2181" kafka: image: wurstmeister/kafka:0.10.0.1 ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100 KAFKA_CREATE_TOPICS: "words-topic:1:1,counts-topic:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,39 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kstreaming</groupId> <artifactId>kstreaming</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.10.0.1</version> </dependency> </dependencies> </project>