分享

Flink系统学习32-2:Table API详解

pig2 2019-2-28 11:51:02 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 4994
本帖最后由 pig2 于 2019-2-28 11:51 编辑
问题导读

1.Join算子有哪些?
2.内联和外联有什么区别?
3.时间窗口连接有什么条件?
4.集合算子有哪些?


上一篇
Flink系统学习32-1:Table API详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26756


Joins
算子
描述
Inner Join
BatchStreaming
与SQL JOIN子句类似。连接两张表。 两个表必须具有不同的字段名称,并且必须通过连接运算符或使用where或filter运算符定义至少一个相等连接谓词。
[mw_shl_code=java,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
val result = left.join(right).where('a === 'd).select('a, 'b, 'e)[/mw_shl_code]
[mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
val result = left.join(right).where('a === 'd).select('a, 'b, 'e)[/mw_shl_code]

注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。 提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信

Outer Join
Batch StreamingResult Updating
与SQL LEFT / RIGHT / FULL OUTER JOIN子句类似。 join两张表。 两个表必须具有不同的字段名称,并且必须至少定义一个相等连接谓词。
[mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");

Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");[/mw_shl_code]
[mw_shl_code=scala,true]val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)

val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)[/mw_shl_code]
注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。 请提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信

Time-windowed Join
BatchStreaming
注意:时间窗口连接是可以以流方式处理的常规子集连接。
时间窗口连接需要至少一个等连接和一个限制双方时间的连接条件。 这样的条件可以由两个适当的范围(<,<=,> =,>)或单个等式来定义,该单个等式比较两个输入表的相同类型的时间属性(即,处理时间或事件时间)。
例如,以下是有效的窗口连接条件:
  • ltime === rtime
  • ltime >= rtime && ltime < rtime + 10.minutes
[mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");

Table result = left.join(right)
  .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
  .select("a, b, e, ltime");[/mw_shl_code]
[mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime)

val result = left.join(right)
  .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)
  .select('a, 'b, 'e, 'ltime)[/mw_shl_code]

Inner Join with Table Function
BatchStreaming
使用表函数的结果与表连接。 左(外)表的每一行与表函数的相应调用产生的所有行连接。 如果其表函数调用返回空结果,则删除左(外)表的一行。
[mw_shl_code=java,true]// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);

// join
Table orders = tableEnv.scan("Orders");
Table result = orders
    .join(new Table(tableEnv, "split(c)").as("s", "t", "v"))
    .select("a, b, s, t, v");[/mw_shl_code]
[mw_shl_code=scala,true]// instantiate User-Defined Table Function
val split: TableFunction[_] = new MySplitUDTF()

// join
val result: Table = table
    .join(split('c) as ('s, 't, 'v))
    .select('a, 'b, 's, 't, 'v)[/mw_shl_code]

Left Outer Join with Table Function
BatchStreaming
使用表函数的结果连接表。 左(外)表的每一行与表函数的相应调用产生的所有行连接。 如果表函数调用返回空结果,则保留相应的外部行,并使用空值填充结果。
注意:目前,表函数的左外连接只能为空或为true。
[mw_shl_code=java,true]// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);

// join
Table orders = tableEnv.scan("Orders");
Table result = orders
    .leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v"))
    .select("a, b, s, t, v");[/mw_shl_code]
[mw_shl_code=scala,true]// instantiate User-Defined Table Function
val split: TableFunction[_] = new MySplitUDTF()

// join
val result: Table = table
    .leftOuterJoin(split('c) as ('s, 't, 'v))
    .select('a, 'b, 's, 't, 'v)[/mw_shl_code]

Join with Temporal Table(时态表连接)
Streaming
时态表是跟踪随时间变化的表。

时态表函数提供对特定时间点的时态表的状态的访问。 使用时态表函数连接表的语法与使用表函数的内部连接相同。

目前仅支持具有时态表的内部联接。
[mw_shl_code=java,true]Table ratesHistory = tableEnv.scan("RatesHistory");

// register temporal table function with a time attribute and primary key
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
    "r_proctime",
    "r_currency");
tableEnv.registerFunction("rates", rates);

// join with "Orders" based on the time attribute and key
Table orders = tableEnv.scan("Orders");
Table result = orders
    .join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")[/mw_shl_code]
[mw_shl_code=scala,true]val ratesHistory = tableEnv.scan("RatesHistory")

// register temporal table function with a time attribute and primary key
val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency)

// join with "Orders" based on the time attribute and key
val orders = tableEnv.scan("Orders")
val result = orders
    .join(rates('o_rowtime), 'r_currency === 'o_currency)[/mw_shl_code]
更多可查看时态表

集合算子
算子
描述
Union
Batch
与SQL UNION子句类似。 联合两个表删除了重复记录,两个表必须具有相同的字段类型。
[mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.union(right)[/mw_shl_code]
[mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.union(right);[/mw_shl_code]

UnionAll
BatchStreaming
类似于SQL UNION ALL子句。 联合两张表。 两个表必须具有相同的字段类型。
[mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.unionAll(right)[/mw_shl_code]
[mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.unionAll(right);[/mw_shl_code]

Intersect
Batch
类似于SQL INTERSECT子句。 Intersect返回两个表中存在的记录。 如果一个或两个表中的记录不止一次出现,则只返回一次,即结果表没有重复记录。 两个表必须具有相同的字段类型。
[mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
val result = left.intersect(right)[/mw_shl_code]
[mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersect(right);[/mw_shl_code]

IntersectAll
Batch
类似于SQL INTERSECT ALL子句。 IntersectAll返回两个表中存在的记录。 如果两个表中的记录不止一次出现,则返回的次数与两个表中存在的次数相同,即结果表可能具有重复记录。 两个表必须具有相同的字段类型。
[mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
val result = left.intersectAll(right)[/mw_shl_code]
[mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersectAll(right);[/mw_shl_code]

Minus
Batch
与SQL EXCEPT子句类似。 Minus返回左表中右表中不存在的记录。 左表中的重复记录只返回一次,即删除重复项。 两个表必须具有相同的字段类型。
[mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.minus(right)[/mw_shl_code]
[mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minus(right);[/mw_shl_code]

MinusAll
Batch
类似于SQL EXCEPT ALL子句。 MinusAll返回右表中不存在的记录。 在左表中出现n次并在右表中出现m次的记录返回(n-m)次,即,删除右表中出现的重复数。 两个表必须具有相同的字段类型。
[mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.minusAll(right)[/mw_shl_code]
[mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minusAll(right);[/mw_shl_code]
In
BatchStreaming
与SQL IN子句类似。 如果表达式存在于给定的表子查询中,则返回true。 子查询表必须包含一列。 此列必须与表达式具有相同的数据类型。
[mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a)
val result = left.select('a, 'b, 'c).where('a.in(right))[/mw_shl_code]

[mw_shl_code=java,true]Table left = ds1.toTable(tableEnv, "a, b, c");
Table right = ds2.toTable(tableEnv, "a");

// using implicit registration
Table result = left.select("a, b, c").where("a.in(" + right + ")");

// using explicit registration
tableEnv.registerTable("RightTable", right);
Table result = left.select("a, b, c").where("a.in(RightTable)");[/mw_shl_code]
注意:对于流式查询,操作将在连接和组操作中重写。 计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。 请提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信


最新经典文章,欢迎关注公众号




已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条