Flink 1.11 对 Checkpoint 的优化
Checkpoint 如何实现的
Flink 的 checkpoint 是基于 Chandy-Lamport 算法,实现了一个分布式一致性的存储快照算法。
这里我们假设一个简单的场景来描述 checkpoint 具体过程是怎样的。
场景是:假如现在 kafka 只有一个分区,数据是每个 app 发过来的日志,我们统计每个 app 的 PV。
Flink 的 checkpoint coordinator (JobManager 的一部分)会周期性的在流事件中插入一个 barrier 事件(栅栏),用来隔离不同批次的事件,如下图红色的部分。
下图中有两个 barrier ,checkpoint barrier n-1 处的 barrier 是指 Job 从开始处理到 barrier n -1 所有的状态数据,checkpoint barrier n 处的 barrier 是指 Job 从开始处理到 barrier n 所有的状态数据。
回到刚刚计算 PV 的场景,当 Source Task 接受到 JobManager 的编号为 chk-100 的 Checkpoint 触发请求后,发现自己恰好接收到了 offset 为(0,1000)【表示分区0,offset 为1000】处的数据,所以会往 offset 为(0,1000)数据之后,(0,1001)数据之前安插一个 barrier,然后自己开始做快照,把 offset (0,1000)保存到状态后端中,向 CheckpointCoordinator报告自己快照制作情况,同时向自身所有下游算子广播该barrier。如下图:
当下游计算的算子收到 barrier 后,会看是否收到了所有输入流的 barrier,我们现在只有一个分区,Source 算子只有一个实例,barrier 到了就是收到了所有的输入流的 barrier。
开始把本次的计算结果(app1,1000),(app2,5000)写到状态存储之中,向 CheckpointCoordinator 报告自己快照制作情况,同时向自身所有下游算子广播该barrier。
当 Operator 2 收到栅栏后,会触发自身进行快照,把自己当时的状态存储下来,向 CheckpointCoordinator 报告 自己快照制作情况。因为这是一个 sink ,状态存储成功后,意味着本次 checkpoint 也成功了。
Barrier 对齐
上面我们举的例子是 Source Task 实例只有一个的情况,在输入流的算子有多个实例的情况下,会有一个概念叫 Barrier 对齐。
可以看上面的第一张图,有两个输入流,一个是上面的数字流,一个是下面的字母流。
数字流的 barrier 在 1 后面,字母流的 barrier 在 e 后面。当上面的 barrier 到达 operator 之后,必须要等待下面的数字流的 barrier 也到达,此时数字流后面过来的数据会被缓存到缓冲区。这就是 barrier 对齐的过程。
看上面的第二张图,当数字流的 barrier 到达后,意味着输入流的所有实例的 barrier 都到达了,此时开始处理 到第三张图的时候,处理完毕,自身做快照,然后把缓冲区的 pending 数据都发出去,把 checkpoint barrier n 继续往下发送。
Flink 1.11 对 Checkpoint 的优化
从上图的对齐过程,我们可以发现,在进行对齐的过程中,算子是不会再接着处理数据了,一定要等到对齐动作完成之后,才能继续对齐。也就是上图中的数字流的 barrier 到达之后,需要去等待字母流的 barrier 事件。
这其中会有一个阻塞的过程。在大多数情况下运行良好,然而当作业出现反压时,阻塞式的 Barrier 对齐反而会加剧作业的反压,甚至导致作业不稳定。
首先, Chandy-Lamport 分布式快照的结束依赖于 Marker 的流动,而反压则会限制 Marker 的流动,导致快照的完成时间变长甚至超时。无论是哪种情况,都会导致 Checkpoint 的时间点落后于实际数据流较多。
这时作业的计算进度是没有被持久化的,处于一个比较脆弱的状态,如果作业出于异常被动重启或者被用户主动重启,作业会回滚丢失一定的进度。如果 Checkpoint 连续超时且没有很好的监控,回滚丢失的进度可能高达一天以上,对于实时业务这通常是不可接受的。更糟糕的是,回滚后的作业落后的 Lag 更大,通常带来更大的反压,形成一个恶性循环。
所以在 Flink 1.11 版本中,引入了一个 Unaligned Checkpointing 的模块,主要功能是,在 barrier 到达之后,不必等待所有的输入流的 barrier,而是继续处理数据。
然后把第一次到达的 barrier 之后的所有数据也放到 checkpoint 里面,在下一次计算的时候,会合并上次保存的数据以及流入的数据后再计算。这样会大大加快 Barrier 流动的速度,降低 checkpoint 整体的时长。
页:
[1]