Flink系统学习32-2:Table API详解
本帖最后由 pig2 于 2019-2-28 11:51 编辑问题导读
1.Join算子有哪些?
2.内联和外联有什么区别?
3.时间窗口连接有什么条件?
4.集合算子有哪些?
上一篇
Flink系统学习32-1:Table API详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26756
Joins
算子描述
Inner Join
BatchStreaming与SQL JOIN子句类似。连接两张表。 两个表必须具有不同的字段名称,并且必须通过连接运算符或使用where或filter运算符定义至少一个相等连接谓词。val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
val result = left.join(right).where('a === 'd).select('a, 'b, 'e)val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
val result = left.join(right).where('a === 'd).select('a, 'b, 'e)
注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。 提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信
Outer Join
Batch StreamingResult Updating与SQL LEFT / RIGHT / FULL OUTER JOIN子句类似。 join两张表。 两个表必须具有不同的字段名称,并且必须至少定义一个相等连接谓词。Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。 请提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信
Time-windowed Join
BatchStreaming注意:时间窗口连接是可以以流方式处理的常规子集连接。时间窗口连接需要至少一个等连接和一个限制双方时间的连接条件。 这样的条件可以由两个适当的范围(<,<=,> =,>)或单个等式来定义,该单个等式比较两个输入表的相同类型的时间属性(即,处理时间或事件时间)。例如,以下是有效的窗口连接条件:
[*]ltime === rtime
[*]ltime >= rtime && ltime < rtime + 10.minutes
Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
Table result = left.join(right)
.where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
.select("a, b, e, ltime");val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime)
val result = left.join(right)
.where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)
.select('a, 'b, 'e, 'ltime)
Inner Join with Table Function
BatchStreaming使用表函数的结果与表连接。 左(外)表的每一行与表函数的相应调用产生的所有行连接。 如果其表函数调用返回空结果,则删除左(外)表的一行。// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);
// join
Table orders = tableEnv.scan("Orders");
Table result = orders
.join(new Table(tableEnv, "split(c)").as("s", "t", "v"))
.select("a, b, s, t, v");// instantiate User-Defined Table Function
val split: TableFunction = new MySplitUDTF()
// join
val result: Table = table
.join(split('c) as ('s, 't, 'v))
.select('a, 'b, 's, 't, 'v)
Left Outer Join with Table Function
BatchStreaming使用表函数的结果连接表。 左(外)表的每一行与表函数的相应调用产生的所有行连接。 如果表函数调用返回空结果,则保留相应的外部行,并使用空值填充结果。注意:目前,表函数的左外连接只能为空或为true。// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);
// join
Table orders = tableEnv.scan("Orders");
Table result = orders
.leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v"))
.select("a, b, s, t, v");// instantiate User-Defined Table Function
val split: TableFunction = new MySplitUDTF()
// join
val result: Table = table
.leftOuterJoin(split('c) as ('s, 't, 'v))
.select('a, 'b, 's, 't, 'v)
Join with Temporal Table(时态表连接)
Streaming
时态表是跟踪随时间变化的表。
时态表函数提供对特定时间点的时态表的状态的访问。 使用时态表函数连接表的语法与使用表函数的内部连接相同。
目前仅支持具有时态表的内部联接。Table ratesHistory = tableEnv.scan("RatesHistory");
// register temporal table function with a time attribute and primary key
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
"r_proctime",
"r_currency");
tableEnv.registerFunction("rates", rates);
// join with "Orders" based on the time attribute and key
Table orders = tableEnv.scan("Orders");
Table result = orders
.join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")val ratesHistory = tableEnv.scan("RatesHistory")
// register temporal table function with a time attribute and primary key
val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency)
// join with "Orders" based on the time attribute and key
val orders = tableEnv.scan("Orders")
val result = orders
.join(rates('o_rowtime), 'r_currency === 'o_currency)更多可查看时态表
集合算子
算子描述
Union
Batch与SQL UNION子句类似。 联合两个表删除了重复记录,两个表必须具有相同的字段类型。val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.union(right)Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.union(right);
UnionAll
BatchStreaming类似于SQL UNION ALL子句。 联合两张表。 两个表必须具有相同的字段类型。val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.unionAll(right)Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.unionAll(right);
Intersect
Batch类似于SQL INTERSECT子句。 Intersect返回两个表中存在的记录。 如果一个或两个表中的记录不止一次出现,则只返回一次,即结果表没有重复记录。 两个表必须具有相同的字段类型。val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
val result = left.intersect(right)Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersect(right);
IntersectAll
Batch类似于SQL INTERSECT ALL子句。 IntersectAll返回两个表中存在的记录。 如果两个表中的记录不止一次出现,则返回的次数与两个表中存在的次数相同,即结果表可能具有重复记录。 两个表必须具有相同的字段类型。val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
val result = left.intersectAll(right)Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersectAll(right);
Minus
Batch与SQL EXCEPT子句类似。 Minus返回左表中右表中不存在的记录。 左表中的重复记录只返回一次,即删除重复项。 两个表必须具有相同的字段类型。val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.minus(right)Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minus(right);
MinusAll
Batch类似于SQL EXCEPT ALL子句。 MinusAll返回右表中不存在的记录。 在左表中出现n次并在右表中出现m次的记录返回(n-m)次,即,删除右表中出现的重复数。 两个表必须具有相同的字段类型。val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.minusAll(right)Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minusAll(right);
In
BatchStreaming与SQL IN子句类似。 如果表达式存在于给定的表子查询中,则返回true。 子查询表必须包含一列。 此列必须与表达式具有相同的数据类型。val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a)
val result = left.select('a, 'b, 'c).where('a.in(right))
Table left = ds1.toTable(tableEnv, "a, b, c");
Table right = ds2.toTable(tableEnv, "a");
// using implicit registration
Table result = left.select("a, b, c").where("a.in(" + right + ")");
// using explicit registration
tableEnv.registerTable("RightTable", right);
Table result = left.select("a, b, c").where("a.in(RightTable)");注意:对于流式查询,操作将在连接和组操作中重写。 计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。 请提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信
最新经典文章,欢迎关注公众号
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
感谢分享
页:
[1]