pig2 发表于 2019-1-23 18:53:52

彻底明白Flink系统学习27:【Flink1.7】连续查询连接(时态表等)

问题导读

1.Flink有哪些连接?
2.常规连接、窗口连接、时态表连接有什么区别?
3.时态表函数如何使用?

上一篇彻底明白Flink系统学习26:【Flink1.7】Table API 和SQL API之时间属性
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26646


连接是批处理数据处理中常见且易于理解的操作,用于连接两个关系的行。 但是,动态表上的连接语义不太明显甚至令人困惑。

因此,有几种方法可以使用Table API或SQL实际执行连接。

常规连接
常规joins是最通用的join类型,其中任何新记录或对join输入任一侧的更改都是可见的并且正在影响整个联接(join)结果。 例如,如果左侧有新记录,则它将与右侧的所有先前和未来记录一起加入。
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
这些语义允许任何类型的更新(插入,更新,删除)输入表。
但是,此操作具有重要意义:它需要将连接输入的两侧永久保持在Flink的状态。 因此,如果一个或两个输入表不断增长,资源使用也将无限增长。

时间窗口连接
时间窗口连接由join 定义,该join 检查输入记录的时间属性是否在某些时间限制内,即时间窗口。
SELECT *
FROM
Orders o,
Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
与常规连接操作相比,此类连接仅支持具有时间属性的仅追加表(append-only tables)。 由于时间属性是quasi-monontic增加,Flink可以从其状态中删除旧值而不影响结果的正确性。

时态表连接
具有时态表的连接将仅追加( append-only)表(左输入/探测侧)与时态表(右输入/构建侧)连接,即,随时间变化并跟踪其变化的表。 有关时态表,后面会进一步介绍。
以下示例显示了一个仅追加(append-only)表的订单,这些订单应与不断变化的货币汇率表RatesHistory连接。

Orders是仅追加( append-only)表,表示给定金额和给定货币的付款。 例如,在10:15,订单金额为2欧元。
SELECT * FROM Orders;

rowtime amount currency
======= ====== =========
10:15      2 Euro
10:30      1 US Dollar
10:32       50 Yen
10:52      3 Euro
11:04      5 US Dollar
RatesHistory代表一个不断变化的仅追加货币汇率表,相对于日元(汇率为1)。 例如,欧元兑日元从09:00到10:45的汇率为114.从10:45到11:15,汇率为116。
SELECT * FROM RatesHistory;

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro      114
09:00   Yen         1
10:45   Euro      116
11:15   Euro      119
11:49   Pounds      108
鉴于我们想要计算所有订单的金额转换为通用货币(日元)。

例如,我们希望使用转换率给定的rowtime转换如下订单 (114)

rowtime amount currency
======= ====== =========
10:15      2 Euro
如果不使用时态表,就需要编写如下查询:
SELECT
SUM(o.amount * r.rate) AS amount
FROM Orders AS o,
RatesHistory AS r
WHERE r.currency = o.currency
AND r.rowtime = (
SELECT MAX(rowtime)
FROM RatesHistory AS r2
WHERE r2.currency = o.currency
AND r2.rowtime <= o.rowtime);借助时态表函数Rates,可以使用下面sql查询
SELECT
o.amount * r.rate AS amount
FROM
Orders AS o,
LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency来自探测器(追加)侧的每个记录将在探测器(追加)侧记录的相关时间属性时与构建侧表(的版本)连接。 为了支持构建端表上先前值的更新(覆盖),该表必须定义主键。

在我们的示例中,Orders表中的每条记录将与Rates表join,在时间o.rowtime 。 货币(currency)字段已被定义为之前的Rates的主键,并用于连接我们示例中的两个表。 如果查询使用处理时间概念,则在执行操作时,新添加的订单将始终与最新版本的Rates连接。

与常规连接相比,这意味着如果构建端(时态表)有新记录,则不会影响以前的连接结果。 这再次允许Flink限制元素的数量,保持state。

与时间窗口连接相比,时态表连接不定义时间窗口(时间窗口内的数据将会被join)。 探针端的记录始终与time属性指定的构建端版本连接。 因此,构建方面的记录可能是任意旧的。 随着时间的推移,将从状态中删除先前和不再需要的记录版本(对于给定的主键)。

用法
定义时态表函数后,我们可以使用它。 时态表函数的使用方式与使用普通表函数的方式相同。

以下代码段解决了我们从Orders表转换货币问题:
SELECT
SUM(o_amount * r_rate) AS amount
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency

Table result = orders
    .join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")
    .select("(o_amount * r_rate).sum as amount");

val result = orders
    .join(rates('o_proctime), 'r_currency === 'o_currency)
    .select(('o_amount * 'r_rate).sum as 'amount)
注意:尚未为时态连接实现查询配置中定义的状态保留。 这意味着计算查询结果所需的状态可能会无限增长,具体取决于历史记录表的不同主键的数量。

处理时间时态连接
使用处理时间属性,不可能将过去的时间属性作为参数传递给时态表函数。 根据定义,它始终是当前时间戳。 因此,处理时间时间表函数的调用将始终返回基础表的最新已知版本,并且基础历史表中的任何更新也将立即覆盖当前值。

只有构建端记录的最新版本(相对于已定义的主键)保留在该状态中。 构建端的更新将不会影响先前发出的连接结果。

可以将处理时间时间连接视为一个简单的HashMap <K,V>,它存储来自构建方的所有记录。 当构建端的新记录具有与先前记录相同的key时,旧值只是被覆盖。 始终根据HashMap的最新/当前状态评估探测器端的每条记录。

事件时间时态连接
利用事件时间属性(即rowtime属性),可以将过去的时间属性传递给时间表函数。这允许在共同的时间点连接两个表。

与处理时间时间连接相比,时态表不仅保持状态中的构建侧记录的最新版本(相对于定义的主键),而且存储自上一个水印以来的所有版本(由时间标识)。

例如,根据时态表的概念,在时间12:30:00将附加到探测器侧表的事件时间时间戳为12:30:00的传入行与构建侧表的版本连接在一起。 。因此,传入行仅与时间戳小于或等于12:30:00的行连接,并根据主键应用更新,直到此时为止。

通过事件时间的定义,水印允许连接操作及时向前移动并丢弃不再需要的构建表的版本,因为不期望具有较低或相等时间戳的传入行。

最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg



美丽天空 发表于 2019-1-25 00:21:41

感谢分享
页: [1]
查看完整版本: 彻底明白Flink系统学习27:【Flink1.7】连续查询连接(时态表等)