flink触发当前窗口计算的前提是下一条数据在当前窗口的结束时间之后,我现在想要实现一个功能是每进来一条数据都去统计当前窗口的数据,我自定义的触发器好像不起作用,请问这个怎么处理呀?我新手,刚刚学习,不太懂,求教
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.FIRE;
}
}
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
public void onMerge(TimeWindow window, OnMergeContext ctx) {
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
|
|