Flink难点:彻底明白CEP8,CEP库中的时间、例子、版本说明
问题导读
1.对于事件延迟,cep是如何处理的?
2.对于cep编程,本文举了一个什么例子?
3.从较旧的Flink版本迁移有哪些需要注意?
上一篇:
Flink难点:彻底明白CEP8,模式检测
http://www.aboutyun.com/forum.php?mod=viewthread&tid=27335
CEP库中的时间
处理事件时间的延迟
在CEP中,元素处理的顺序很重要。 为了保证在事件时间工作时元素以正确的顺序处理,传入元素最初放在缓冲区中,元素按照时间戳按升序排序,当水印到达时,此缓冲区中的所有元素都包含在 处理小于水印的时间戳。 这意味着水印之间的元素按事件时间顺序处理。
注意:在事件时间工作时,库假定水印的正确性。
为了保证水印中的元素按事件时间顺序处理,Flink的CEP库假定水印的正确性,并将其视为时间戳小于上次看到的水印的后期元素。 后期元素不会被进一步处理。 此外,可以指定sideOutput标记来收集最后看到的水印之后的后期元素,可以像这样使用它。
val patternStream: PatternStream = CEP.pattern(input, pattern)
val lateDataOutputTag = OutputTag("late-data")
val result: SingleOutputStreamOperator = patternStream
.sideOutputLateData(lateDataOutputTag)
.select{
pattern: Map] => ComplexEvent()
}
val lateData: DataStream = result.getSideOutput(lateDataOutputTag)
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};
SingleOutputStreamOperator<ComplexEvent> result = patternStream
.sideOutputLateData(lateDataOutputTag)
.select(
new PatternSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);
时间context
在PatternProcessFunction以及IterativeCondition中,用户可以访问实现TimeContext的上下文,如下所示:
/**
* Enables access to time related characteristics such as current processing time or timestamp of
* currently processed element. Used in {@link PatternProcessFunction} and
* {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
*/
@PublicEvolving
public interface TimeContext {
/**
* Timestamp of the element currently being processed.
*
* <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this
* will be set to the time when event entered the cep operator.
*/
long timestamp();
/** Returns the current processing time. */
long currentProcessingTime();
}
此上下文使用户可以访问已处理事件的时间特征(IterativeCondition中的传入记录和PatternProcessFunction情况下的匹配)。 调用TimeContext#currentProcessingTime总是为提供当前处理时间的值,此调用应优先于例如 调用System.currentTimeMillis()。
在TimeContext #timestamp()的情况下,返回的值等于EventTime的指定时间戳。 在ProcessingTime中,这将等于所述事件进入cep运算符的时间点(或者在PatternProcessFunction的情况下生成匹配时)。 这意味着该值在多次调用该方法时将保持一致。
例子
以下示例检测事件的key数据流上的模式start,middle(name =“error”) - > end(name =“critical”)。 事件由其ID键入,并且有效模式必须在10秒内发生。 整个处理是在事件时间完成的。
val env : StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val input : DataStream = ...
val partitionedInput = input.keyBy(event => event.getId)
val pattern = Pattern.begin("start")
.next("middle").where(_.getName == "error")
.followedBy("end").where(_.getName == "critical")
.within(Time.seconds(10))
val patternStream = CEP.pattern(partitionedInput, pattern)
val alerts = patternStream.select(createAlert(_))
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> input = ...
DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
@Override
public Integer getKey(Event value) throws Exception {
return value.getId();
}
});
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
return createAlert(pattern);
}
});
从较旧的Flink版本迁移(1.3之前版本)
迁移到1.4+
在Flink-1.4中,CEP库与<= Flink 1.2的向后兼容性被删除。 遗憾的是,无法恢复曾经使用1.2.x运行的CEP作业
迁移到1.3.x.
Flink-1.3中的CEP库附带了许多新功能,这些功能导致了API的一些变化。在这里,我们描述了您需要对旧CEP作业进行的更改,以便能够使用Flink-1.3运行它们。进行这些更改并重新编译作业后,将能够从使用旧版本作业的保存点恢复执行,即无需重新处理过去的数据。
所需的更改是:
[*]更改条件(where(...)子句中的条件)以扩展SimpleCondition类,而不是实现FilterFunction接口。
[*]更改select(...)和flatSelect(...)方法的参数,以期望与每个模式关联事件列表(Java中的List,Scala中的Iterable)。这是因为通过添加循环模式,多个输入事件可以匹配单个(循环)模式。
[*]Flink 1.1和1.2中的followBy()暗示了非确定性松弛连续性(见此处)。在Flink 1.3中,这已经改变并且followBy()意味着放松的连续性,而如果需要非确定性的松弛连续性,则应该使用followAyAny()。
最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg
About云 VIP会员套餐介绍http://www.aboutyun.com/forum.php?mod=viewthread&tid=27305
加入About云知识星球,获取更多实用资料
http://www.aboutyun.com/data/attachment/forum/201906/10/162109faj7o1z2qobr83jd.png
1.对于事件延迟,cep是如何处理的?
- 水印处理,传入元素最初放在缓冲区中,此缓冲区中的所有元素都包含在 处理小于水印的时间戳,元素按照时间戳按升序排序, 此外,可以指定sideOutput标记来收集最后看到的水印之后的后期元素。
2.对于cep编程,本文举了一个什么例子?
- 举了一个CEP系统接收一个基于EventTime的数据流,通过模式匹配保证程序有效模式在10秒内的例子
3.从较旧的Flink版本迁移有哪些需要注意?
- 迁移到1.4+:CEP库与<= Flink 1.2的向后兼容性被删除。无法恢复曾经使用1.2.x运行的CEP作业
- 迁移到1.3.x.:
更改where条件;
更改select(...)和flatSelect(...)方法的参数;
Flink 1.1和1.2中的followBy()改成followAyAny()
页:
[1]