本帖最后由 pig2 于 2019-2-28 11:51 编辑
问题导读
1.Join算子有哪些?
2.内联和外联有什么区别?
3.时间窗口连接有什么条件?
4.集合算子有哪些?
上一篇
Flink系统学习32-1:Table API详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26756
Joins
算子 | 描述 | Inner Join
BatchStreaming | 与SQL JOIN子句类似。连接两张表。 两个表必须具有不同的字段名称,并且必须通过连接运算符或使用where或filter运算符定义至少一个相等连接谓词。 [mw_shl_code=java,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
val result = left.join(right).where('a === 'd).select('a, 'b, 'e)[/mw_shl_code] [mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
val result = left.join(right).where('a === 'd).select('a, 'b, 'e)[/mw_shl_code]
注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。 提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅 查询配置了解详细信
| Outer Join
Batch StreamingResult Updating | 与SQL LEFT / RIGHT / FULL OUTER JOIN子句类似。 join两张表。 两个表必须具有不同的字段名称,并且必须至少定义一个相等连接谓词。 [mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");[/mw_shl_code] [mw_shl_code=scala,true]val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)[/mw_shl_code] 注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。 请提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信
| Time-windowed Join
BatchStreaming | 注意:时间窗口连接是可以以流方式处理的常规子集连接。 时间窗口连接需要至少一个等连接和一个限制双方时间的连接条件。 这样的条件可以由两个适当的范围(<,<=,> =,>)或单个等式来定义,该单个等式比较两个输入表的相同类型的时间属性(即,处理时间或事件时间)。 例如,以下是有效的窗口连接条件: - ltime === rtime
- ltime >= rtime && ltime < rtime + 10.minutes
[mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
Table result = left.join(right)
.where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
.select("a, b, e, ltime");[/mw_shl_code] [mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime)
val result = left.join(right)
.where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)
.select('a, 'b, 'e, 'ltime)[/mw_shl_code]
| Inner Join with Table Function
BatchStreaming | 使用表函数的结果与表连接。 左(外)表的每一行与表函数的相应调用产生的所有行连接。 如果其表函数调用返回空结果,则删除左(外)表的一行。 [mw_shl_code=java,true]// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);
// join
Table orders = tableEnv.scan("Orders");
Table result = orders
.join(new Table(tableEnv, "split(c)").as("s", "t", "v"))
.select("a, b, s, t, v");[/mw_shl_code] [mw_shl_code=scala,true]// instantiate User-Defined Table Function
val split: TableFunction[_] = new MySplitUDTF()
// join
val result: Table = table
.join(split('c) as ('s, 't, 'v))
.select('a, 'b, 's, 't, 'v)[/mw_shl_code]
| Left Outer Join with Table Function
BatchStreaming | 使用表函数的结果连接表。 左(外)表的每一行与表函数的相应调用产生的所有行连接。 如果表函数调用返回空结果,则保留相应的外部行,并使用空值填充结果。 注意:目前,表函数的左外连接只能为空或为true。 [mw_shl_code=java,true]// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);
// join
Table orders = tableEnv.scan("Orders");
Table result = orders
.leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v"))
.select("a, b, s, t, v");[/mw_shl_code] [mw_shl_code=scala,true]// instantiate User-Defined Table Function
val split: TableFunction[_] = new MySplitUDTF()
// join
val result: Table = table
.leftOuterJoin(split('c) as ('s, 't, 'v))
.select('a, 'b, 's, 't, 'v)[/mw_shl_code]
| Join with Temporal Table(时态表连接)
Streaming
| 时态表是跟踪随时间变化的表。
时态表函数提供对特定时间点的时态表的状态的访问。 使用时态表函数连接表的语法与使用表函数的内部连接相同。
目前仅支持具有时态表的内部联接。 [mw_shl_code=java,true]Table ratesHistory = tableEnv.scan("RatesHistory");
// register temporal table function with a time attribute and primary key
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
"r_proctime",
"r_currency");
tableEnv.registerFunction("rates", rates);
// join with "Orders" based on the time attribute and key
Table orders = tableEnv.scan("Orders");
Table result = orders
.join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")[/mw_shl_code] [mw_shl_code=scala,true]val ratesHistory = tableEnv.scan("RatesHistory")
// register temporal table function with a time attribute and primary key
val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency)
// join with "Orders" based on the time attribute and key
val orders = tableEnv.scan("Orders")
val result = orders
.join(rates('o_rowtime), 'r_currency === 'o_currency)[/mw_shl_code] |
集合算子
算子 | 描述 | Union
Batch | 与SQL UNION子句类似。 联合两个表删除了重复记录,两个表必须具有相同的字段类型。 [mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.union(right)[/mw_shl_code] [mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.union(right);[/mw_shl_code]
| UnionAll
BatchStreaming | 类似于SQL UNION ALL子句。 联合两张表。 两个表必须具有相同的字段类型。 [mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.unionAll(right)[/mw_shl_code] [mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.unionAll(right);[/mw_shl_code]
| Intersect
Batch | 类似于SQL INTERSECT子句。 Intersect返回两个表中存在的记录。 如果一个或两个表中的记录不止一次出现,则只返回一次,即结果表没有重复记录。 两个表必须具有相同的字段类型。 [mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
val result = left.intersect(right)[/mw_shl_code] [mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersect(right);[/mw_shl_code]
| IntersectAll
Batch | 类似于SQL INTERSECT ALL子句。 IntersectAll返回两个表中存在的记录。 如果两个表中的记录不止一次出现,则返回的次数与两个表中存在的次数相同,即结果表可能具有重复记录。 两个表必须具有相同的字段类型。 [mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
val result = left.intersectAll(right)[/mw_shl_code] [mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersectAll(right);[/mw_shl_code]
| Minus
Batch | 与SQL EXCEPT子句类似。 Minus返回左表中右表中不存在的记录。 左表中的重复记录只返回一次,即删除重复项。 两个表必须具有相同的字段类型。 [mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.minus(right)[/mw_shl_code] [mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minus(right);[/mw_shl_code]
| MinusAll
Batch | 类似于SQL EXCEPT ALL子句。 MinusAll返回右表中不存在的记录。 在左表中出现n次并在右表中出现m次的记录返回(n-m)次,即,删除右表中出现的重复数。 两个表必须具有相同的字段类型。 [mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.minusAll(right)[/mw_shl_code] [mw_shl_code=java,true]Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minusAll(right);[/mw_shl_code] | In
BatchStreaming | 与SQL IN子句类似。 如果表达式存在于给定的表子查询中,则返回true。 子查询表必须包含一列。 此列必须与表达式具有相同的数据类型。 [mw_shl_code=scala,true]val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a)
val result = left.select('a, 'b, 'c).where('a.in(right))[/mw_shl_code]
[mw_shl_code=java,true]Table left = ds1.toTable(tableEnv, "a, b, c");
Table right = ds2.toTable(tableEnv, "a");
// using implicit registration
Table result = left.select("a, b, c").where("a.in(" + right + ")");
// using explicit registration
tableEnv.registerTable("RightTable", right);
Table result = left.select("a, b, c").where("a.in(RightTable)");[/mw_shl_code] 注意:对于流式查询,操作将在连接和组操作中重写。 计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。 请提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信
|
最新经典文章,欢迎关注公众号
|
|