Flink系统学习32-3:Table API详解
本帖最后由 pig2 于 2019-3-5 18:25 编辑问题导读
1.OrderBy, Offset & Fetch作用是什么?
2.Insert可以干什么事情?
3.如何在表上定义窗口聚合?
上一篇:Flink系统学习32-2:Table API详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26780
OrderBy, Offset & Fetch
算子描述
Order By
Batch与SQL ORDER BY子句类似。 返回跨所有并行分区全局排序的记录。val in = ds.toTable(tableEnv, 'a, 'b, 'c)
val result = in.orderBy('a.asc)Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");
Offset & Fetch
Batch类似于SQL OFFSET和FETCH子句。 偏移(Offset)和Fetch限制从排序结果返回的记录数。 Offset和Fetch在技术上是Order By运算符的一部分,因此必须在它之前。val in = ds.toTable(tableEnv, 'a, 'b, 'c)
// returns the first 5 records from the sorted result
val result1: Table = in.orderBy('a.asc).fetch(5)
// skips the first 3 records and returns all following records from the sorted result
val result2: Table = in.orderBy('a.asc).offset(3)
// skips the first 10 records and returns the next 5 records from the sorted result
val result3: Table = in.orderBy('a.asc).offset(10).fetch(5)Table in = tableEnv.fromDataSet(ds, "a, b, c");
// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5);
// skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);
// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
Insert
算子描述
Insert Into
BatchStreaming类似于SQL查询中的INSERT INTO子句。 执行插入已注册的输出表。输出表必须在TableEnvironment中注册(请参阅注册TableSink)。 此外,已注册表的模式必须与查询的模式匹配。val orders: Table = tableEnv.scan("Orders")
orders.insertInto("OutOrders")Table orders = tableEnv.scan("Orders");
orders.insertInto("OutOrders");
Group Windows
组窗口根据时间或行计数(row-count )间隔将行组聚合为有限组,并按组聚合函数。 对于批处理表,窗口是按时间间隔对记录进行分组的便捷快捷方式。
Windows是使用window(w:Window)子句定义的,需要使用as子句指定的别名。 为了按窗口对表进行分组,必须在groupBy(...)子句中引用窗口别名,就像常规分组属性一样。 以下示例显示如何在表上定义窗口聚合。
val table = input
.window( as 'w)// define window with alias w
.groupBy('w) // group the table by window w
.select('b.sum)// aggregate
Table table = input
.window(.as("w"))// define window with alias w
.groupBy("w")// group the table by window w
.select("b.sum");// aggregate
在流式传输环境中,如果窗口聚合除了窗口之外还在一个或多个属性上进行分组,则它们只能并行计算,即,groupBy(...)子句引用窗口别名和至少一个附加属性。 仅引用窗口别名的groupBy(...)子句(例如上面的示例中)只能由单个非并行任务来调用。 以下示例显示如何使用其他分组属性定义窗口聚合。
val table = input
.window( as 'w) // define window with alias w
.groupBy('w, 'a)// group the table by attribute a and window w
.select('a, 'b.sum)// aggregate
Table table = input
.window(.as("w"))// define window with alias w
.groupBy("w, a")// group the table by attribute a and window w
.select("a, b.sum");// aggregate
窗口属性(如时间窗口的开始,结束或行时间戳)可以在select语句中添加为窗口别名的属性,分别为w.start,w.end和w.rowtime。 窗口开始和行时间戳是包含的下窗口和上窗口边界。 相反,窗口结束时间戳是独占的上窗口边界。 例如,从下午2点开始的30分钟的翻滚窗口将以14:00:00.000作为开始时间戳,14:29:59.999作为行时间戳,并且14:30:00.000作为结束时间戳。
val table = input
.window( as 'w)// define window with alias w
.groupBy('w, 'a)// group the table by attribute a and window w
.select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps
Table table = input
.window(.as("w"))// define window with alias w
.groupBy("w, a")// group the table by attribute a and window w
.select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
Window参数定义行如何映射到窗口。 Window不是用户可以实现的接口。 相反,Table API提供了一组具有特定语义的预定义Window类,这些类被转换为基础DataStream或DataSet操作。
最新经典文章,欢迎关注公众号
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
感谢分享
页:
[1]