问题导读
1.PatternProcessFunction是哪个版本引入的?
2.对于模式检测中超时部分处理使用什么接口?
3.如何创建PatternStream?
上一篇:
Flink难点:彻底明白CEP7,匹配跳过策略【api】
http://www.aboutyun.com/forum.php?mod=viewthread&tid=27334
指定要查找的模式序列后,是时候将其应用于输入流以检测潜在匹配。 要针对模式序列运行事件流,必须创建PatternStream。 给定输入流输入,模式pattern和可选的比较器comparator ,用于在EventTime的情况下对具有相同时间戳的事件进行排序或在同一时刻到达,通过调用以下方法创建PatternStream:
[mw_shl_code=scala,true]val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)[/mw_shl_code]
[mw_shl_code=java,true]DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // optional
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);[/mw_shl_code]
根据使用情况,输入流可以是keyed或non-keyed。
注意在non-keyed流上应用模式将导致并行度等于1的作业。
从模式中选择
获得PatternStream后,可以将转换(transformation )应用于检测到的事件序列。 建议的方法是使用PatternProcessFunction。
PatternProcessFunction有一个processMatch方法,为每个匹配的事件序列调用该方法。 它以Map <String,List <IN >>的形式接收匹配,其中key是模式序列中每个模式的名称,值是该模式的所有已接受事件的列表(IN是你的类型) 输入元素)。 给定匹配的事件按时间戳排序。 返回每个模式的接受事件列表,原因是当使用循环模式(例如,oneToMany()和times())时,对于给定模式可以接受多个事件。
[mw_shl_code=bash,true]class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
@Override
public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
IN startEvent = match.get("start").get(0);
IN endEvent = match.get("end").get(0);
out.collect(OUT(startEvent, endEvent));
}
}[/mw_shl_code]
PatternProcessFunction提供对Context对象的访问。 由于它,可以访问与时间相关的特征,例如currentProcessingTime或当前匹配的时间戳(这是分配给匹配的最后一个元素的时间戳)。 有关更多信息,请参阅时间背景 通过这种情况,人们也可以将结果输出到侧输出。
处理超时部分模式
每当模式具有通过within关键字附加的窗口长度时,部分事件序列可能因为超过窗口长度而被丢弃。 要对超时的部分匹配进行操作,可以使用TimedOutPartialMatchHandler接口。 该接口应该以mixin风格使用。 这意味着还可以使用PatternProcessFunction实现此接口。 TimedOutPartialMatchHandler提供了额外的processTimedOutMatch方法,该方法将在每个超时部分匹配时调用。
[mw_shl_code=bash,true]class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
@Override
public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
...
}
@Override
public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
IN startEvent = match.get("start").get(0);
ctx.output(outputTag, T(startEvent));
}
}[/mw_shl_code]
注意processTimedOutMatch不提供对main 输出的一次访问。 仍然可以通过Context对象通过侧输出emit 结果。
便捷API
前面提到的PatternProcessFunction是在Flink 1.8中引入的,从那时起它就是与匹配进行交互的推荐方式。 仍然可以使用旧样式API,如select / flatSelect,它将在内部转换为PatternProcessFunction。
[mw_shl_code=scala,true]val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val outputTag = OutputTag[String]("side-output")
val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){
(pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
out.collect(TimeoutEvent())
} {
(pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
out.collect(ComplexEvent())
}
val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)[/mw_shl_code]
[mw_shl_code=java,true]PatternStream<Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
outputTag,
new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {
public void timeout(
Map<String, List<Event>> pattern,
long timeoutTimestamp,
Collector<TimeoutEvent> out) throws Exception {
out.collect(new TimeoutEvent());
}
},
new PatternFlatSelectFunction<Event, ComplexEvent>() {
public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception {
out.collect(new ComplexEvent());
}
}
);
DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);[/mw_shl_code]
最新经典文章,欢迎关注公众号
加入About云知识星球,获取更多实用资料
|
|