// Counts non-bot page views per record key in one-minute tumbling windows and
// publishes each windowed count to the "traffic-alerts" topic.
//
// NOTE(review): the records written downstream are keyed by Windowed<K> with a
// Long value; confirm the configured default serdes can handle both, or pass an
// explicit Produced.with(...) to to().
// NOTE(review): stream(...) is called without explicit key/value types; confirm
// that isBot accepts the value type inferred here.
builder.stream("page-views")
    // Filter on the KStream: a grouped stream (result of groupByKey) exposes
    // only aggregation operations, so bots must be dropped before grouping.
    .filter((page, pageView) -> !isBot(pageView))
    .groupByKey()
    // windowedBy needs a window definition, not a bare Duration.
    // ofSizeWithNoGrace requires Kafka Streams 3.0+; on older clients use
    // TimeWindows.of(Duration.ofMinutes(1)) instead.
    .windowedBy(
        org.apache.kafka.streams.kstream.TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .count()
    .toStream()
    .to("traffic-alerts");