分享

Flink系统学习32-3:Table API详解

pig2 2019-3-4 18:55:14 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 4433
本帖最后由 pig2 于 2019-3-5 18:25 编辑
问题导读
1.OrderBy, Offset & Fetch作用是什么?
2.Insert可以干什么事情?
3.如何在表上定义窗口聚合?

上一篇:Flink系统学习32-2:Table API详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26780



OrderBy, Offset & Fetch

算子
描述
Order By
Batch
与SQL ORDER BY子句类似。 返回跨所有并行分区全局排序的记录。
[mw_shl_code=scala,true]val in = ds.toTable(tableEnv, 'a, 'b, 'c)
val result = in.orderBy('a.asc)[/mw_shl_code]
[mw_shl_code=java,true]Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");[/mw_shl_code]
[mw_shl_code=java,true]Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");[/mw_shl_code]
Offset & Fetch
Batch
类似于SQL OFFSET和FETCH子句。 偏移(Offset)和Fetch限制从排序结果返回的记录数。 Offset和Fetch在技术上是Order By运算符的一部分,因此必须在它之前。
[mw_shl_code=scala,true]val in = ds.toTable(tableEnv, 'a, 'b, 'c)

// returns the first 5 records from the sorted result
val result1: Table = in.orderBy('a.asc).fetch(5)

// skips the first 3 records and returns all following records from the sorted result
val result2: Table = in.orderBy('a.asc).offset(3)

// skips the first 10 records and returns the next 5 records from the sorted result
val result3: Table = in.orderBy('a.asc).offset(10).fetch(5)[/mw_shl_code]
[mw_shl_code=java,true]Table in = tableEnv.fromDataSet(ds, "a, b, c");

// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5);

// skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);

// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);[/mw_shl_code]

Insert
算子
描述
Insert Into
BatchStreaming
类似于SQL查询中的INSERT INTO子句。 执行插入已注册的输出表。
输出表必须在TableEnvironment中注册(请参阅注册TableSink)。 此外,已注册表的模式必须与查询的模式匹配。
[mw_shl_code=scala,true]val orders: Table = tableEnv.scan("Orders")
orders.insertInto("OutOrders")[/mw_shl_code]
[mw_shl_code=java,true]Table orders = tableEnv.scan("Orders");
orders.insertInto("OutOrders");[/mw_shl_code]

Group Windows
组窗口根据时间或行计数(row-count )间隔将行组聚合为有限组,并按组聚合函数。 对于批处理表,窗口是按时间间隔对记录进行分组的便捷快捷方式。

Windows是使用window(w:Window)子句定义的,需要使用as子句指定的别名。 为了按窗口对表进行分组,必须在groupBy(...)子句中引用窗口别名,就像常规分组属性一样。 以下示例显示如何在表上定义窗口聚合。

[mw_shl_code=scala,true]val table = input
  .window([w: Window] as 'w)  // define window with alias w
  .groupBy('w)   // group the table by window w
  .select('b.sum)  // aggregate[/mw_shl_code]

[mw_shl_code=java,true]Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w")  // group the table by window w
  .select("b.sum");  // aggregate[/mw_shl_code]

在流式传输环境中,如果窗口聚合除了窗口之外还在一个或多个属性上进行分组,则它们只能并行计算,即,groupBy(...)子句引用窗口别名和至少一个附加属性。 仅引用窗口别名的groupBy(...)子句(例如上面的示例中)只能由单个非并行任务来调用。 以下示例显示如何使用其他分组属性定义窗口聚合。
[mw_shl_code=scala,true]val table = input
  .window([w: Window] as 'w) // define window with alias w
  .groupBy('w, 'a)  // group the table by attribute a and window w
  .select('a, 'b.sum)  // aggregate[/mw_shl_code]
[mw_shl_code=java,true]Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w
  .select("a, b.sum");  // aggregate[/mw_shl_code]
窗口属性(如时间窗口的开始,结束或行时间戳)可以在select语句中添加为窗口别名的属性,分别为w.start,w.end和w.rowtime。 窗口开始和行时间戳是包含的下窗口和上窗口边界。 相反,窗口结束时间戳是独占的上窗口边界。 例如,从下午2点开始的30分钟的翻滚窗口将以14:00:00.000作为开始时间戳,14:29:59.999作为行时间戳,并且14:30:00.000作为结束时间戳。
[mw_shl_code=scala,true]val table = input
  .window([w: Window] as 'w)  // define window with alias w
  .groupBy('w, 'a)  // group the table by attribute a and window w
  .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps[/mw_shl_code]
[mw_shl_code=java,true]Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w
  .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps[/mw_shl_code]
Window参数定义行如何映射到窗口。 Window不是用户可以实现的接口。 相反,Table API提供了一组具有特定语义的预定义Window类,这些类被转换为基础DataStream或DataSet操作。




最新经典文章,欢迎关注公众号




已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条