彻底明白Flink系统学习30:查询配置
问题导读1.有界批量输入还是无界流输入,表API和SQL查询都有什么特点?
2.状态的本质是什么?
3.连续查询的准确性和资源消耗如何调整?
4.配置空闲状态保留时间有那两个参数?
上一篇文章:彻底明白Flink系统学习29-4:【Flink1.7】流概念之模式检测
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26695
版本:Flink1.7
无论表达式输入是有界批量输入还是无界流输入,表API和SQL查询都具有相同的语义。在许多情况下,对流输入的连续查询能够计算与离线计算结果相同的准确结果。然而,这在一般情况下是不可能的,因为连续查询必须限制它们维护的状态的大小,以避免耗尽存储并且能够在很长一段时间内处理无界流数据。因此,连续查询可能只能提供近似结果,具体取决于输入数据的特征和查询本身。(这里需要说明的是,状态的本质其实还是存储,所以对于状态的维护,需要不断的清理)
Flink的Table API和SQL接口提供参数来调整连续查询的准确性和资源消耗。参数通过QueryConfig对象指定。 QueryConfig可以从TableEnvironment获得,并在转换Table时传回,即,当它转换为DataStream或通过TableSink发出时。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// obtain query configuration from TableEnvironment
val qConfig: StreamQueryConfig = tableEnv.queryConfig
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
// define query
val result: Table = ???
// create TableSink
val sink: TableSink = ???
// register TableSink
tableEnv.registerTableSink(
"outputTable", // table name
Array(...), // field names
Array](...), // field types
sink) // table sink
// emit result Table via a TableSink
result.insertInto("outputTable", qConfig)
// convert result Table into a DataStream
val stream: DataStream = result.toAppendStream(qConfig)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
// define query
Table result = ...
// create TableSink
TableSink<Row> sink = ...
// register TableSink
tableEnv.registerTableSink(
"outputTable", // table name
new String[]{...}, // field names
new TypeInformation[]{...},// field types
sink); // table sink
// emit result Table via a TableSink
result.insertInto("outputTable", qConfig);
// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
在下文中,我们将描述QueryConfig的参数以及它们如何影响查询的准确性和资源消耗。
空闲状态保留时间
许多查询聚合或连接一个或多个key 属性上的记录。 在流上执行此类查询时,连续查询需要收集记录或维护每个key的部分结果。 如果输入流的key域正在变化,即,active key值随时间变化,则随着越来越多的不同key,连续查询累积越来越多的状态。 但是,经常在一段时间后key变为非活动状态,并且它们的相应状态变得陈旧且无用。
例如,以下查询计算每个会话(session)的单击次数。
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
sessionId属性用作分组key,连续查询维护其(看到的)每个sessionId的计数。 sessionId属性随着时间的推移而变化(进化),并且sessionId值仅在会话(session)结束之前有效,即,在有限的时间段内。 但是,连续查询无法知道sessionId的此属性,并期望每个sessionId值都可以在任何时间点发生。 它维护每个(观察到的)sessionId值的计数。 因此,随着(观察到)越来越多的sessionId值,查询的总状态大小不断增长。
空闲状态保留时间参数(Idle State Retention Time)定义了在删除key之前保留key状态多长时间而不进行更新。 对于前面的示例查询,只要在配置的时间段内没有更新sessionId,就会删除它的计数。
通过删除key的状态,连续查询完全忘记它之前已经看过这个key。 如果处理了具有其状态已被删除的key的记录,则该记录将被视为具有相应key的第一个记录。 对于上面的示例,这意味着sessionId的计数将再次从0开始。
配置空闲状态保留时间有两个参数:
[*]minimum idle state retention time定义了非活动key的状态在被删除之前至少保持多长时间。
[*]maximum idle state retention time定义了非活动key的状态在被删除之前最多保留多长时间。
参数规定如下:
val qConfig: StreamQueryConfig = ???
// set idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
StreamQueryConfig qConfig = ...
// set idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
清理状态需要额外的簿记(bookkeeping),这对于minTime和maxTime的较大差异而言变得更实用(便宜)。 minTime和maxTime之间的差异必须至少为5分钟。
最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
感谢分享
页:
[1]