pig2 发表于 2019-6-17 16:04:44

Flink难点:彻底明白CEP8,CEP库中的时间、例子、版本说明


问题导读

1.对于事件延迟,cep是如何处理的?
2.对于cep编程,本文举了一个什么例子?
3.从较旧的Flink版本迁移有哪些需要注意?

上一篇:
Flink难点:彻底明白CEP8,模式检测
http://www.aboutyun.com/forum.php?mod=viewthread&tid=27335


CEP库中的时间

处理事件时间的延迟
在CEP中,元素处理的顺序很重要。 为了保证在事件时间工作时元素以正确的顺序处理,传入元素最初放在缓冲区中,元素按照时间戳按升序排序,当水印到达时,此缓冲区中的所有元素都包含在 处理小于水印的时间戳。 这意味着水印之间的元素按事件时间顺序处理。

注意:在事件时间工作时,库假定水印的正确性。

为了保证水印中的元素按事件时间顺序处理,Flink的CEP库假定水印的正确性,并将其视为时间戳小于上次看到的水印的后期元素。 后期元素不会被进一步处理。 此外,可以指定sideOutput标记来收集最后看到的水印之后的后期元素,可以像这样使用它。


val patternStream: PatternStream = CEP.pattern(input, pattern)

val lateDataOutputTag = OutputTag("late-data")

val result: SingleOutputStreamOperator = patternStream
      .sideOutputLateData(lateDataOutputTag)
      .select{
          pattern: Map] => ComplexEvent()
      }

val lateData: DataStream = result.getSideOutput(lateDataOutputTag)
PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream
    .sideOutputLateData(lateDataOutputTag)
    .select(
      new PatternSelectFunction<Event, ComplexEvent>() {...}
    );

DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);

时间context
在PatternProcessFunction以及IterativeCondition中,用户可以访问实现TimeContext的上下文,如下所示:
/**
* Enables access to time related characteristics such as current processing time or timestamp of
* currently processed element. Used in {@link PatternProcessFunction} and
* {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
*/
@PublicEvolving
public interface TimeContext {

      /**
         * Timestamp of the element currently being processed.
         *
         * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this
         * will be set to the time when event entered the cep operator.
         */
      long timestamp();

      /** Returns the current processing time. */
      long currentProcessingTime();
}
此上下文使用户可以访问已处理事件的时间特征(IterativeCondition中的传入记录和PatternProcessFunction情况下的匹配)。 调用TimeContext#currentProcessingTime总是为提供当前处理时间的值,此调用应优先于例如 调用System.currentTimeMillis()。

在TimeContext #timestamp()的情况下,返回的值等于EventTime的指定时间戳。 在ProcessingTime中,这将等于所述事件进入cep运算符的时间点(或者在PatternProcessFunction的情况下生成匹配时)。 这意味着该值在多次调用该方法时将保持一致。


例子
以下示例检测事件的key数据流上的模式start,middle(name =“error”) - > end(name =“critical”)。 事件由其ID键入,并且有效模式必须在10秒内发生。 整个处理是在事件时间完成的。
val env : StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input : DataStream = ...

val partitionedInput = input.keyBy(event => event.getId)

val pattern = Pattern.begin("start")
.next("middle").where(_.getName == "error")
.followedBy("end").where(_.getName == "critical")
.within(Time.seconds(10))

val patternStream = CEP.pattern(partitionedInput, pattern)

val alerts = patternStream.select(createAlert(_))
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
      @Override
      public Integer getKey(Event value) throws Exception {
                return value.getId();
      }
});

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
      .next("middle").where(new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event value) throws Exception {
                        return value.getName().equals("error");
                }
      }).followedBy("end").where(new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event value) throws Exception {
                        return value.getName().equals("critical");
                }
      }).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
      @Override
      public Alert select(Map<String, List<Event>> pattern) throws Exception {
                return createAlert(pattern);
      }
});

从较旧的Flink版本迁移(1.3之前版本)

迁移到1.4+
在Flink-1.4中,CEP库与<= Flink 1.2的向后兼容性被删除。 遗憾的是,无法恢复曾经使用1.2.x运行的CEP作业


迁移到1.3.x.
Flink-1.3中的CEP库附带了许多新功能,这些功能导致了API的一些变化。在这里,我们描述了您需要对旧CEP作业进行的更改,以便能够使用Flink-1.3运行它们。进行这些更改并重新编译作业后,将能够从使用旧版本作业的保存点恢复执行,即无需重新处理过去的数据。

所需的更改是:


[*]更改条件(where(...)子句中的条件)以扩展SimpleCondition类,而不是实现FilterFunction接口。


[*]更改select(...)和flatSelect(...)方法的参数,以期望与每个模式关联事件列表(Java中的List,Scala中的Iterable)。这是因为通过添加循环模式,多个输入事件可以匹配单个(循环)模式。


[*]Flink 1.1和1.2中的followBy()暗示了非确定性松弛连续性(见此处)。在Flink 1.3中,这已经改变并且followBy()意味着放松的连续性,而如果需要非确定性的松弛连续性,则应该使用followAyAny()。



最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg


About云 VIP会员套餐介绍http://www.aboutyun.com/forum.php?mod=viewthread&tid=27305
加入About云知识星球,获取更多实用资料
http://www.aboutyun.com/data/attachment/forum/201906/10/162109faj7o1z2qobr83jd.png

金瞳 发表于 2019-12-9 17:01:21

1.对于事件延迟,cep是如何处理的?
- 水印处理,传入元素最初放在缓冲区中,此缓冲区中的所有元素都包含在 处理小于水印的时间戳,元素按照时间戳按升序排序, 此外,可以指定sideOutput标记来收集最后看到的水印之后的后期元素。

2.对于cep编程,本文举了一个什么例子?
- 举了一个CEP系统接收一个基于EventTime的数据流,通过模式匹配保证程序有效模式在10秒内的例子

3.从较旧的Flink版本迁移有哪些需要注意?
- 迁移到1.4+:CEP库与<= Flink 1.2的向后兼容性被删除。无法恢复曾经使用1.2.x运行的CEP作业
- 迁移到1.3.x.:
    更改where条件;
    更改select(...)和flatSelect(...)方法的参数;
    Flink 1.1和1.2中的followBy()改成followAyAny()
页: [1]
查看完整版本: Flink难点:彻底明白CEP8,CEP库中的时间、例子、版本说明