鉴于 pipelined 的数据流产出和消费同时发生,Flink 需要保证 pipelined 边相连的上下游节点同时运行。由 pipelined 边相连的节点构成了一个 region,被称为 Pipelined Region (以下简称 region)。在 Flink 中,region 是任务调度和 Failover 的基本单位。在调度的过程中,同一 region 内的所有 Task 节点都会被同时调度,而整个拓扑中所有 region 会按照拓扑顺序逐一进行调度。
目前在 Flink 的调度层面有两种 region:逻辑层面的 Logical Pipelined Region 以及执行调度层面的 Scheduling Pipelined Region。逻辑 region 由逻辑拓扑 (JobGraph) 中的节点 JobVertex 构成,而执行 region 则由执行拓扑 (ExecutionGraph) 中的节点 ExecutionVertex 构成。类似于 ExecutionVertex 基于 JobVertex 计算产生,执行 region 是由逻辑 region 计算得到的,如图 4 所示。
在构建 region 的过程中会遇到一个问题:region 之间可能存在环形依赖。对于当前 region,当且仅当其所消费的上游 region 都产出全部数据后才能进行调度。如果两个 region 之间存在环形依赖,那么就会出现调度死锁:两个 region 都需要等对方完成才能调度,最终两个 region 都无法被调度起来。因此,Flink 通过 Tarjan 强连通分量算法来发现环形依赖,并将具有环形依赖的 region 合并成一个 region,这样就能解决调度死锁的问题。Tarjan 强连通分量算法需要遍历拓扑内的所有边,而对于全连接的分发模式来说,其边的数量为 O(N2),因此算法整体的计算复杂度为 O(N2),随着规模变大会显著增长,从而影响大规模作业初始化的时间。
为了加快 region 的构建速度,我们可以基于逻辑拓扑和执行拓扑之间的关联进行优化。鉴于一个执行 region 只能由一个逻辑 region 中的节点派生,不会出现跨 region 的情况,Flink 在初始化作业时只需要遍历所有逻辑 region 并逐一转换成执行 region 即可。转换的方式跟分发模式相关。如果逻辑 region 内的节点间有任何全连接边,则整个逻辑 region 可以直接转换成一个执行 region。
如果全连接边采用的是 pipelined 数据交换,所有与之相连的上下游节点都必须同时运行,也就是说全连接边所连接的所有 region 都要合并成一个 region。如果全连接边采用的是 blocking 数据交换,则会引入环形依赖,如图 5 所示。在这种情况下所有与之相连的 region 都必须合并以避免调度死锁,如图 6 所示。鉴于只要有全连接边就直接生成一整个执行 region,在这种情况下不需要用 Tarjan 算法,整体计算复杂度只需要 O(N) 即可。
如果在逻辑 region 内,所有节点间都只有点对点的分发模式,那么 Flink 依旧直接用 Tarjan 算法来检测环形依赖,鉴于点对点的分发模式其边数为 O(N),算法的时间复杂度也只有 O(N)。
在优化后,将逻辑 region 转换成执行 region 的整体计算复杂度从 O(N2) 降为 O(N)。经测试,对于上文提到的 word count 作业,当两个节点间的连边为全连接边且数据交换方式为 blocking 时,构建 region 的总时间降低了 99%,从 8,257ms 降至 120ms。