总体来讲,关联维表有三个基础的方式:实时数据库查找关联(Per-Record Reference Data Lookup)、预加载维表关联(Pre-Loading of Reference Data)和维表变更日志关联(Reference Data Change Stream),而根据实现上的优化可以衍生出多种关联方式,且这些优化还可以灵活组合产生不同效果(不过为了简单性这里不讨论同时应用多种优化的实现方式)。对于不同的关联方式,我们可以从以下 7 个关键指标来衡量(每个指标的得分将以 1-5 五档来表示):
如果基于 Processing Time 做关联,我们可以利用 keyby 将两个数据流中关联字段值相同的数据划分到 KeyedCoProcessFunction 的同一个分区,然后用 ValueState 或者 MapState 将维表数据保存下来。在普通数据流的一条记录进到函数时,到 State 中查找有无符合条件的 join 对象,若有则关联输出结果,若无则根据 join 的类型决定是直接丢弃还是与空值关联。这里要注意的是,State 的大小要尽量控制好。首先是只保存每个 key 最新的维度数据值,其次是要给 State 设置好 TTL,让 Flink 可以自动清理。
基于 Processing Time 的维表变更日志关联优点是不需要直接请求数据库,不会对数据库造成压力;缺点是比较复杂,相当于使用 changelog 在 Flink 应用端重新构建一个维表,会占用一定的 CPU 和比较多的内存和磁盘资源。值得注意的是,我们可以利用 Flink 提供的 RocksDB StateBackend,将大部分的维表数据存在磁盘而不是内存中,所以并不会占用很高的内存。不过基于 Processing Time 的这种关联对两个数据流的延迟要求比较高,否则如果其中一个数据流出现 lag 时,关联得到的结果可能并不是我们想要的,比如可能会关联到未来时间点的维表数据。
基于 Processing Time 的维表变更日志关联比较适用于不便直接访问数据的场景(比如维表数据库是业务线上数据库,出于安全和负载的原因不能直接访问),或者对维表的变更实时性要求比较高的场景(但因为数据准确性的关系,一般用下文的 Event Time 关联会更好)。
Event Time 维表变更日志关联
基于 Event Time 的维表关联实际上和基于 Processing Time 的十分相似,不同之处在于我们将维表 changelog 的多个时间版本都记录下来,然后每当一条记录进来,我们会找到对应时间版本的维表数据来和它关联,而不是总用最新版本,因此延迟数据的关联准确性大大提高。不过因为目前 State 并没有提供 Event Time 的 TTL,因此我们需要自己设计和实现 State 的清理策略,比如直接设置一个 Event Time Timer(但要注意 Timer 不能太多导致性能问题),再比如对于单个 key 只保存最近的 10 个版本,当有更新版本的维表数据到达时,要清理掉最老版本的数据。
基于 Event Time 的维表变更日志关联相对基于 Processing Time 的方式来说是一个改进,虽然多个维表版本导致空间资源要求更大,但确保准确性对于大多数场景来说都是十分重要的。相比 Processing Time 对两个数据的延迟都有要求,Event Time 要求 build 数据流的延迟低,否则可能一条数据到达时关联不到对应维表数据或者关联了一个过时版本的维表数据,
基于 Event Time 的维表变更日志关联比较适合于维表变更比较多且对变更实时性要求较高的场景 同时也适合于不便直接访问数据库的场景。
Temporal Table Join
Temporal Table Join 是 Flink SQL/Table API 的原生支持,它对两个数据流的输入都进行了缓存,因此比起上述的基于 Event Time 的维表变更日志关联,它可以容忍任意数据流的延迟,数据准确性更好。Temporal Table Join 在 SQL/Table API 使用时是十分简单的,但如果想在 DataStream API 中使用,则需要自己实现对应的逻辑。
总体思路是使用一个 CoProcessFunction,将 build 数据流以时间版本为 key 保存在 MapState 中(与基于 Event Time 的维表变更日志关联相同),再将 probe 数据流和输出结果也用 State 缓存起来(同样以 Event Time 为 key),一直等到 Watermark 提升到它们对应的 Event Time,才把结果输出和将两个数据流的输入清理掉。
这个 Watermark 触发很自然地是用 Event Time Timer 来实现,但要注意不要为每条数据都设置一遍 Timer,因为一旦 Watermark 提升会触发很多个 Timer 导致性能急剧下降。比较好的实践是为每个 key 只注册一个 Timer。实现上可以记录当前未处理的最早一个 Event Time,并用来注册 Timer。当前 Watermark。每当 Watermark 触发 Timer 时,我们检查处理掉未处理的最早 Event Time 到当前 Event Time 的所有数据,并将未处理的最早 Event Time 更新为当前时间。