分享

Flink难点:彻底明白CEP7,匹配检测

pig2 2019-6-17 15:30:32 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 9943
问题导读

1.PatternProcessFunction是哪个版本引入的?
2.对于模式检测中超时部分处理使用什么接口?
3.如何创建PatternStream?


上一篇:
Flink难点:彻底明白CEP7,匹配跳过策略【api】
http://www.aboutyun.com/forum.php?mod=viewthread&tid=27334


指定要查找的模式序列后,是时候将其应用于输入流以检测潜在匹配。 要针对模式序列运行事件流,必须创建PatternStream。 给定输入流输入,模式pattern和可选的比较器comparator ,用于在EventTime的情况下对具有相同时间戳的事件进行排序或在同一时刻到达,通过调用以下方法创建PatternStream:
[mw_shl_code=scala,true]val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)[/mw_shl_code]
[mw_shl_code=java,true]DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // optional

PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);[/mw_shl_code]
根据使用情况,输入流可以是keyed或non-keyed。
注意在non-keyed流上应用模式将导致并行度等于1的作业。

从模式中选择
获得PatternStream后,可以将转换(transformation )应用于检测到的事件序列。 建议的方法是使用PatternProcessFunction。

PatternProcessFunction有一个processMatch方法,为每个匹配的事件序列调用该方法。 它以Map <String,List <IN >>的形式接收匹配,其中key是模式序列中每个模式的名称,值是该模式的所有已接受事件的列表(IN是你的类型) 输入元素)。 给定匹配的事件按时间戳排序。 返回每个模式的接受事件列表,原因是当使用循环模式(例如,oneToMany()和times())时,对于给定模式可以接受多个事件。

[mw_shl_code=bash,true]class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        IN startEvent = match.get("start").get(0);
        IN endEvent = match.get("end").get(0);
        out.collect(OUT(startEvent, endEvent));
    }
}[/mw_shl_code]
PatternProcessFunction提供对Context对象的访问。 由于它,可以访问与时间相关的特征,例如currentProcessingTime或当前匹配的时间戳(这是分配给匹配的最后一个元素的时间戳)。 有关更多信息,请参阅时间背景 通过这种情况,人们也可以将结果输出到侧输出

处理超时部分模式
每当模式具有通过within关键字附加的窗口长度时,部分事件序列可能因为超过窗口长度而被丢弃。 要对超时的部分匹配进行操作,可以使用TimedOutPartialMatchHandler接口。 该接口应该以mixin风格使用。 这意味着还可以使用PatternProcessFunction实现此接口。 TimedOutPartialMatchHandler提供了额外的processTimedOutMatch方法,该方法将在每个超时部分匹配时调用。
[mw_shl_code=bash,true]class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        ...
    }

    @Override
    public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
        IN startEvent = match.get("start").get(0);
        ctx.output(outputTag, T(startEvent));
    }
}[/mw_shl_code]

注意processTimedOutMatch不提供对main 输出的一次访问。 仍然可以通过Context对象通过侧输出emit 结果。

便捷API

前面提到的PatternProcessFunction是在Flink 1.8中引入的,从那时起它就是与匹配进行交互的推荐方式。 仍然可以使用旧样式API,如select / flatSelect,它将在内部转换为PatternProcessFunction。
[mw_shl_code=scala,true]val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){
    (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
        out.collect(TimeoutEvent())
} {
    (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
        out.collect(ComplexEvent())
}

val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)[/mw_shl_code]
[mw_shl_code=java,true]PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
    outputTag,
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {
        public void timeout(
                Map<String, List<Event>> pattern,
                long timeoutTimestamp,
                Collector<TimeoutEvent> out) throws Exception {
            out.collect(new TimeoutEvent());
        }
    },
    new PatternFlatSelectFunction<Event, ComplexEvent>() {
        public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception {
            out.collect(new ComplexEvent());
        }
    }
);

DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);[/mw_shl_code]

最新经典文章,欢迎关注公众号




加入About云知识星球,获取更多实用资料


已有(2)人评论

跳转到指定楼层
zuliangzhu 发表于 2019-10-10 13:39:10
写的非常棒,学习了
回复

使用道具 举报

金瞳 发表于 2019-12-9 16:45:09
1.PatternProcessFunction是哪个版本引入的?
- PatternProcessFunction是在Flink 1.8中引入的

2.对于模式检测中超时部分处理使用什么接口?
- 继承TimedOutPartialMatchHandler->实现processTimedOutMatch方法

3.如何创建PatternStream?
- val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)
- input : DataStream[Event] 输入流
- pattern : Pattern[Event, _] 模式pattern
- comparator : EventComparator[Event]  比较器 用于处理eventTime排序
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条