分享

彻底明白Flink系统学习28:【Flink1.7】时态表详解

pig2 2019-1-24 16:55:19 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 8379
问题导读
1.什么是Flink时态表?
2.时态表有什么特点?
3.如何创建时态表?
4.时态表产生的原因是什么?



上一篇彻底明白Flink系统学习27:【Flink1.7】连续查询连接(时态表等)
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26650

时态表表示改变的历史记录表上的(参数化)视图的概念,该表返回特定时间点的表的内容。

Flink可以跟踪应用于追加表的更改,在查询中的特定时间点,允许访问表的内容。

时态表产生的原因

我们假设有下表RatesHistory。
[mw_shl_code=sql,true]SELECT * FROM RatesHistory;

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108[/mw_shl_code]
RatesHistory代表一个增长的日元(汇率为1)仅追加(append-only)货币汇率表。 例如,欧元兑日元从09:00到10:45的汇率为114.从10:45到11:15,汇率为116。

鉴于我们希望在10:58时输出所有当前的汇率,我们需要以下SQL查询来计算结果表:
[mw_shl_code=sql,true]SELECT *
FROM RatesHistory AS r
WHERE r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = r.currency
  AND r2.rowtime <= TIME '10:58');[/mw_shl_code]
相关子查询确定相应货币的最大时间小于或等于所需时间。 外部查询列出具有最大时间戳的汇率。

下表显示了这种计算的结果。 在我们的示例中,将考虑在10:45更新欧元,但是,在时间10:58,表格的版本中不会考虑在11点15分对欧元的更新以及新的值。【评注,这里的版本由于不同的时间可能会改变,因此随着时间版本会改变】
[mw_shl_code=sql,true]rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Yen           1
10:45   Euro        116[/mw_shl_code]
Temporal Tables的概念旨在简化此类查询,加速执行并减少Flink的状态使用。 Temporal Table是仅附加表的参数化视图,它将仅附加( append-only)表的行解释为表的更改日志,并在特定时间点提供该表的版本。 将仅附加表解释为更改日志(changelog)需要指定主键属性和时间戳属性。 主键确定覆盖哪些行,时间戳确定行有效的时间。
在上面的示例中,currency是RatesHistory表的主键,rowtime是timestamp属性。

在Flink中,时态表由时态表函数表示。

时态表函数
为了访问时态表中的数据,必须传递一个时间属性,该属性确定将返回的表的版本。 Flink使用表函数的SQL语法来提供表达它的方法。

定义后,Temporal Table Function采用单个时间参数timeAttribute并返回一组rows。 返回内容包含与给定时间属性相关的所有现有主键的最新行版本。

假设我们基于RatesHistory表定义了一个时态表函数Rates(timeAttribute),我们可以通过以下方式查询这样的函数:
[mw_shl_code=sql,true]SELECT * FROM Rates('10:15');

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1

SELECT * FROM Rates('11:00');

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
10:45   Euro        116
09:00   Yen           1[/mw_shl_code]

Rates(timeAttribute)的每个查询都返回给定timeAttribute的Rates的状态。

注意:目前,Flink不支持使用常量时间属性参数直接查询时态表函数。 目前,时态表函数只能用于连接。

关于时态表的链接,可参考上一篇
彻底明白Flink系统学习27:【Flink1.7】连续查询连接(时态表等)
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26650

定义时态表函数
以下代码段说明了如何从仅追加加表创建时态表函数。
[mw_shl_code=scala,true]// 获取table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// 提供费率历史记录表的静态数据集。
val ratesHistoryData = new mutable.MutableList[(String, Long)]
ratesHistoryData.+=(("US Dollar", 102L))
ratesHistoryData.+=(("Euro", 114L))
ratesHistoryData.+=(("Yen", 1L))
ratesHistoryData.+=(("Euro", 116L))
ratesHistoryData.+=(("Euro", 119L))

//使用上面的数据集创建并注册示例表。
// 在实际设置中,用自己的表替换它。
val ratesHistory = env
  .fromCollection(ratesHistoryData)
  .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)

tEnv.registerTable("RatesHistory", ratesHistory)

//创建并注册TemporalTableFunction。
//将“r_proctime”定义为time属性,将“r_currency”定义为主键。
val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // <==== (1)
tEnv.registerFunction("Rates", rates)                                          // <==== (2)[/mw_shl_code]

[mw_shl_code=java,true]import org.apache.flink.table.functions.TemporalTableFunction;
(...)

// Get the stream and table environments.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// Provide a static data set of the rates history table.
List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
ratesHistoryData.add(Tuple2.of("Euro", 114L));
ratesHistoryData.add(Tuple2.of("Yen", 1L));
ratesHistoryData.add(Tuple2.of("Euro", 116L));
ratesHistoryData.add(Tuple2.of("Euro", 119L));

// Create and register an example table using above data set.
// In the real setup, you should replace this with your own table.
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");

tEnv.registerTable("RatesHistory", ratesHistory);

// Create and register a temporal table function.
// Define "r_proctime" as the time attribute and "r_currency" as the primary key.
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
tEnv.registerFunction("Rates", rates);         [/mw_shl_code]
第(1)行创建了一个rates时态表函数,它允许我们使用表API中的函数rates。

第(2)行在我们的表环境中以名称Rates注册此函数,这允许我们在SQL中使用Rates函数。


最新经典文章,欢迎关注公众号




本帖被以下淘专辑推荐:

已有(2)人评论

跳转到指定楼层
jiangzi 发表于 2019-1-30 00:11:05
【Flink1.7】时态表详解~~~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条