Skip to content

Instantly share code, notes, and snippets.

Created November 27, 2015 13:55
Show Gist options
  • Select an option

  • Save anonymous/d0578a4d27768a75bea4 to your computer and use it in GitHub Desktop.

Select an option

Save anonymous/d0578a4d27768a75bea4 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class MyTrigger<W extends TimeWindow> implements Trigger<Object, W> { //extends TimeWindow not Window
private static final long serialVersionUID = 1L;
private final long maxCount;
private MyTrigger(long maxCount) {
this.maxCount = maxCount;
}
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
OperatorState count = ctx.getKeyValueState("count", Long.valueOf(0L));
long currentCount = ((Long)count.value()).longValue() + 1L;
count.update(Long.valueOf(currentCount));
if(currentCount >= this.maxCount) {
count.update(Long.valueOf(0L));
return TriggerResult.FIRE_AND_PURGE;
} else {
ctx.registerProcessingTimeTimer(window.getEnd()); // not reach count trigger, register processing timer
return TriggerResult.CONTINUE;
}
}
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE; //process trigger
}
public String toString() {
return "CountTrigger(" + this.maxCount + ")";
}
public static <W extends TimeWindow> MyTrigger<W> of(long maxCount) { //extends TimeWindow not Window
return new MyTrigger(maxCount);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment