数据经过了window和WindowAssigner之后,已经被分配到不同的窗口里。
我们要通过窗口函数,在每个窗口上对窗口内的数据进行处理。
窗口函数主要分为两种,一种是增量计算,如reduce和aggregate,一种是全量计算,如process。
1.增量计算
增量计算指的是窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中
当数据流中的新元素流入后,ReduceFunction将中间结果和新流入数据两两合一,生成新的数据替换之前的状态数据。
AggregateFunction也是一种增量计算窗口函数,也只保存了一个中间状态数据。
- public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
- // 在一次新的aggregate发起时,创建一个新的Accumulator,Accumulator是我们所说的中间状态数据,简称ACC
- // 这个函数一般在初始化时调用
- ACC createAccumulator();
- // 当一个新元素流入时,将新元素与状态数据ACC合并,返回状态数据ACC
- ACC add(IN value, ACC accumulator);
-
- // 将两个ACC合并
- ACC merge(ACC a, ACC b);
- // 将中间数据转成结果数据
- OUT getResult(ACC accumulator);
- }
复制代码
输入类型是IN,输出类型是OUT,中间状态数据是ACC,这样复杂的设计主要是为了解决输入类型、中间状态和输出类型不一致的问题,
同时ACC可以自定义,我们可以在ACC里构建我们想要的数据结构。
比如我们要计算一个窗口内某个字段的平均值,那么ACC中要保存总和以及个数,下面是一个平均值的示例:
- 这几个函数的工作流程如下图所示。在计算之前要创建一个新的ACC,这时ACC还没有任何实际表示意义,
- 当有新数据流入时,Flink会调用add方法,更新ACC,并返回最新的ACC,
- ACC是一个中间状态数据。当有一些跨节点的ACC融合时,Flink会调用merge,生成新的ACC。
- 当所有的ACC最后融合为一个ACC后,Flink调用getResult生成结果
复制代码
2. 全量计算
全量计算指的是窗口先缓存该窗口所有元素,等到触发条件后对窗口内的全量元素执行计算。
ProcessWindowFunction要对窗口内的全量数据都缓存。
在Flink所有API中,process算子以及其对应的函数是最底层的实现,使用这些函数能够访问一些更加底层的数据,比如直接操作状态等。
ProcessWindowFunction相比AggregateFunction和ReduceFunction的应用场景更广,能解决的问题也更复杂。
但ProcessWindowFunction需要将窗口中所有元素作为状态存储起来,这将占用大量的存储资源,尤其是在数据量大窗口多的场景下,使用不慎可能导致整个程序宕机。
比如,每天的数据在TB级,我们需要Slide为十分钟Size为一小时的滑动窗口,这种设置会导致窗口数量很多,而且一个元素会被复制好多份分给每个所属的窗口,这将带来巨大的内存压力。
触发器(Trigger)【何时计算】
触发器(Trigger)决定了何时启动Window Function来处理窗口中的数据以及何时将窗口内的数据清理。
增量计算窗口函数对每个新流入的数据直接进行聚合,Trigger决定了在窗口结束时将聚合结果发送出去;全量计算窗口函数需要将窗口内的元素缓存。
Trigger决定了在窗口结束时对所有元素进行计算然后将结果发送出去。每个窗口都有一个默认的Trigger。
如果我们有一些个性化的触发条件,比如窗口中遇到某些特定的元素、元素总数达到一定数量或窗口中的元素到达时满足某种特定的模式时,我们可以自定义一个Trigger。
我们甚至可以在Trigger中定义一些提前计算的逻辑,比如在Event Time语义中,虽然Watermark还未到达,但是我们可以定义提前计算输出的逻辑,以快速获取计算结果,获得更低的延迟。
触发器决定了窗口何时会被触发计算,Flink 中开发人员需要在 window 类型的操作之后才能调用 trigger 方法传入触发器定义。
Flink 中的触发器定义需要继承并实现 Trigger 接口,该接口有以下方法:
- onElement(): 每个被添加到窗口中的元素都会被调用
- onEventTime(): 当事件时间定时器触发时会被调用,比如watermark到达
- onProcessingTime(): 当处理时间定时器触发时会被调用,比如时间周期触发
- onMerge(): 当两个窗口合并时两个窗口的触发器状态将会被调动并合并
- clear(): 执行需要清除相关窗口的事件
|