一文讲解Flink1.9 SQL空闲状态保留时间实现原理
问题导读:1、Flink SQL 空闲状态保留时间如何设置?
2、空闲状态参数如何配置?
3、如何理解空闲状态保留时间实现原理?
4、registerProcessingCleanupTimer 方法如何使用?
static/image/hrline/line7.png
前言
最近在做 Flink SQL 方面的研究,我们有这样一个场景,就是按照天来实时统计截止到当前时刻的某些指标值。Flink SQL 中会使用状态来存储统计后的结果值,但是有一个问题就是,其实统计的指标值也只有当天才会用到,后续其实很少会用到这些数据。由于统计的粒度非常的细,所以这里 Flink SQL 任务中的状态就会非常大,导致 HDFS 上面的存储占用过大。Flink SQL 中支持状态空闲时间的设置,如果某个 Key 的状态在一定 时间没有被更新, Flink 会自动清理该状态。本文结合 Flink 1.9 SQL 中的代码,尝试研究该原理的实现流程。
1. Flink SQL 空闲状态保留时间和参数配置
Flink SQL 空闲状态保留时间是针对 SQL 中聚合 Key 而言的,空闲的时间也就是 Key 没有更新的时间。如果在 Flink SQL 任务中设置了空闲状态的保留时间,那么当状态空闲超过一定的时间后,状态就会被清理。下面是官方文档对于 Flink SQL 空闲状态保留时间的定义:
[*]The Idle State Retention Time parameters define for how long the state of a key is retained without being updated before it is removed.
设置 Flink SQL 空闲状态保留时间有两个参数,状态空闲最小保留时间和状态空闲最大保留时间,很多人会问,为什么会设置两个时间参数呢,设置一个参数不就好了吗,先来看看这两个参数的定义:
[*]The minimum idle state retention time defines how long the state of an inactive key is at least kept before it is removed.(最小空闲状态时间定义了一个 Key 的状态至少空闲的时间)
[*]The maximum idle state retention time defines how long the state of an inactive key is at most kept before it is removed.(最大空闲状态时间定义了一个 Key 的状态至多空闲的时间)
用户在设置状态的空闲时间时,最小的状态空闲时间和最大的状态空闲时间之间的间隔必须大于 5 分钟。下面是设置 Flink SQL 任务空闲状态的保留时间的代码:
EnvironmentSettings streamSettings = EnvironmentSettings.newInstance().inStreamingMode()
.useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.enableCheckpointing(taskConfig.getCheckpointInterval() * 1000L);
streamEnv.getConfig().setLatencyTrackingInterval(LATENCY_TRACK_INTERVAL);
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
/** 设置状态的最小空闲时间和最大的空闲时间*/ streamTableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(idleStateMinTime),Time.minutes(idleStateMaxTime));
你可能会有一个问题,直接使用一个时间参数,但状态到达这个时间就删除不就行了,为什么还需要定义两个时间参数呢,下面来结合源码进行分析。
2. Flink SQL 空闲状态保留时间实现原理分析
简单的讲,Flink SQL 空闲状态保留的时间底层是基于 KeyedProcessFunction 函数来进行实现的,然后为每个 Key,结合空闲状态时间的最小值和最大值注册 Timer ,然后到时间就进行状态清理。具体逻辑从 KeyedProcessFunctionWithCleanupState 这个类开始看起:
首先,这个类有一个是否能够清理空闲状态的标志,当空闲状态最小保留时间大于 1 时,这个标志就为 True。
同时,针对每一个 Key ,都有一个 ValueState,记录着这个 Key 的最新的 Timer 触发的时间。当然,这个值会随着这个 Key 的记录,后续可能会进行时间更新。
下面来看一下具体的 Timer 注册逻辑,空闲状态的清理的 Timer 是调用其方法 registerProcessingCleanupTimer 来进行注册,而方面中又调用了 CleanupState 中的 registerProcessingCleanupTimer 方法:
每次当某个 Key 有消息记录处理时,先从状态中取出该 Key 最新的 Timer 的触发时间,如果为空,表示这调消息是这个 Key 的第一条记录,那么会使用当前的时间 + 最大空闲状态保留的时间作为 Timer 未来的触发时间。
如果当前时间 + 状态最小的空闲状态保留的时间 > 上一次注册 Timer 的触发清理的时间,那么也重新注册 Timer,Timer的时间也为当前的时间 + 最大空闲状态保留的时间,同时,删除上一次注册的清理的 Timer。未来每来一条这个 Key 的消息记录时,便会执行上面的逻辑。如果没有满足上面的逻辑,就不做任何处理。
最终,当 Timer 触发时,会调用 State 的 clear()方法,进行状态清理。
有个点需要注意,如果某个 Key 的状态被清理掉,如果后续再来这个 Key 的消息记录时,会被当做该 Key 的第一条记录来进行处理,聚合值也是重新开始计算。所以,请确保设置合理的空闲状态保留时间。
3. 总结
Flink SQL 虽然没有 DataStream API 那样为每个算子单独来设置状态的保留时间,不过在 Flink SQL 我们可以设置空闲状态的保留时间,具体的时间业务方根据实际情况而定。Flink 1.9 SQL 中也有很多的任务优化的参数配置,感兴趣的同学,可以研究一下。
作者:Lake说科技
来源:https://zhuanlan.zhihu.com/p/98788196
最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
大家一起来探讨,共同进步。{:3_41:}
1、Flink SQL 空闲状态保留时间如何设置?
setIdleStateRetentionTime(Time.minutes(idleStateMinTime),Time.minutes(idleStateMaxTime))
2、空闲状态参数如何配置?
Class KeyedProcessFunctionWithCleanupState<K,IN,OUT> 调用其方法 registerProcessingCleanupTimer
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.html
3、如何理解空闲状态保留时间实现原理?
空闲状态保留时间实现原理分析这一块没看太懂 -- 希望大家补充
4、registerProcessingCleanupTimer 方法如何使用?
-------------------------------
1.判断key为null 或者 this.Current + MinTime > 上一次的Timer,则为key 注册一个Timer (currenttime + MaxTime) 并删除上一次的Timer;
注册方法使用调用:registerProcessingCleanupTimer
删除方法调用:cleanupState
-------------------------------
对比来说:就像Streaming中的keyedProcessFunction中的 openprocessElement onTimer 等方法
open 初始化state
processElement 首先判断state是否为空,如果是数据是第一次或者key为第一次的话,都会进逻辑体,然后为key更新一些属性值,然后围绕这个状态进行后续处理
onTimer 触发事件点
-------------------------------
一文讲解Flink1.9 SQL空闲状态保留时间实现原理
页:
[1]