问题导读
1.流处理和批处理是否使用同一套table API?
2.是否所有算子都可以用于流处理和批处理?
3.本文讲了哪些聚合算子?
上一篇:
Flink系统学习31-2:Table API和SQL之读取外部数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26739
Table API是用于流和批处理的统一关系API。表API查询可以在批量或流式输入上运行而无需修改。 Table API是SQL语言的超级集合,专门用于Apache Flink。 Table API是Scala和Java的语言集成API。 Table API查询不是像SQL中常见的那样将查询指定为String值,而是在Java或Scala中以嵌入语言的样式定义,并支持自动完成和语法验证等IDE支持。
Table API与Flink的SQL集成共享其API的许多概念和部分。查看Common Concepts&API以了解如何注册表或创建Table对象。 Streaming Concepts讨论了流特定的概念,例如动态表和时间属性。
以下示例假设一个名为Orders的已注册表,其中包含属性(a,b,c,rowtime)。 rowtime字段是流式传输中的逻辑时间属性或批处理中的常规时间戳字段。
概述和示例
Table API可用于Scala和Java。 Scala Table API利用Scala表达式,Java Table API基于字符串,这些字符串被解析并转换为等效表达式。
以下示例显示了Scala和Java Table API之间的差异。 表程序在批处理环境中执行。 它按字段a扫描Orders表,分组,并计算每组的结果行数。 表程序的结果转换为Row类型的DataSet并打印。
由于Scala比较简洁,这里只做了Scala方面的内容,如果想看Java方面的,可参考Flink官网中文文档
Scala:
通过导入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._来启用Scala Table API。
以下示例显示了如何构造Scala Table API程序。 使用Scala符号引用表属性,该符号以撇号字符(')开头。
[mw_shl_code=scala,true]import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
// environment configuration
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// register Orders table in table environment
// ...
// specify table program
val orders = tEnv.scan("Orders") // schema (a, b, c, rowtime)
val result = orders
.groupBy('a)
.select('a, 'b.count as 'cnt)
.toDataSet[Row] // conversion to DataSet
.print()[/mw_shl_code]
下一个示例显示了一个更复杂的Table API程序。 程序再次扫描Orders表。 它过滤空值,规范化String类型的字段a,并计算每小时和产品的平均计费金额b。
[mw_shl_code=scala,true]// environment configuration
// ...
// specify table program
val orders: Table = tEnv.scan("Orders") // schema (a, b, c, rowtime)
val result: Table = orders
.filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
.select('a.lowerCase() as 'a, 'b, 'rowtime)
.window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
.groupBy('hourlyWindow, 'a)
.select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)[/mw_shl_code]
由于Table API是批量和流数据的统一API,因此两个示例程序都可以在批处理和流式输入上执行,而无需对表程序本身进行任何修改。 在这两种情况下,程序产生相同的结果,因为流记录不会延迟(有关详细信息,请参阅流式概念)。
算子
Table API支持以下算子操作。 请注意,并非所有操作都可用于批处理和流式处理; 他们被相应地标记。
Scan,Projection和过滤
算子 | 描述 | Scan
Batch Streaming | 与SQL查询中的FROM子句类似。 执行已注册表的扫描。 val orders: Table = tableEnv.scan("Orders") | Select
BatchStreaming | 与SQL SELECT语句类似。 执行选择操作。 val orders: Table = tableEnv.scan("Orders") val result = orders.select('a, 'c as 'd) 可以使用星号(*)作为通配符,选择表格中的所有列。 val orders: Table = tableEnv.scan("Orders") val result = orders.select('*) | As
BatchStreaming | 重命名字段。 val orders: Table = tableEnv.scan("Orders").as('x, 'y, 'z, 't) | Where / Filter
BatchStreaming | 与SQL WHERE子句类似。 过滤掉未通过过滤谓词的行。 val orders: Table = tableEnv.scan("Orders") val result = orders.filter('a % 2 === 0) 或则 val orders: Table = tableEnv.scan("Orders") val result = orders.where('b === "red") |
聚合
算子 | 描述 | GroupBy聚合
Batch Streaming Result Updating | 与SQL GROUP BY子句类似。 使用以下运行的聚合算子对分组键上的行进行分组,以按组聚合行。
val orders: Table = tableEnv.scan("Orders")
val result = orders.groupBy('a).select('a, 'b.sum as 'd)
注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于聚合类型和不同分组键的数量。 请提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信 | GroupBy Window 聚合
Batch Streaming | 组和聚合组窗口上的表以及可能的一个或多个分组键。
val orders: Table = tableEnv.scan("Orders")
val result: Table = orders
.window(Tumble over 5.minutes on 'rowtime as 'w) // define window
.groupBy('a, 'w) // group by key and window
.select('a, w.start, 'w.end, 'w.rowtime, 'b.sum as 'd) // access window properties and aggregate | Over Window 聚合
Batch Streaming | 与SQL OVER子句类似。基于前一行和后一行的窗口(范围)计算每行的窗口聚合。有关更多详细信息,请参阅over windows部分【后面将会有解释】。
Table orders = tableEnv.scan("Orders");
Table result = orders
// define window
.window(Over
.partitionBy("a")
.orderBy("rowtime")
.preceding("UNBOUNDED_RANGE")
.following("CURRENT_RANGE")
.as("w"))
.select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate
注意:必须在同一窗口中定义所有聚合,即相同的分区,排序和范围。目前,仅支持具有PRREDING(UNBOUNDED和有界)到CURRENT ROW范围的窗口。尚不支持使用FOLLOWING的范围。必须在单个时间属性上指定ORDER BY 。 | Distinct集合
Batch Streaming Result Updating | 类似于SQL DISTINCT聚合子句,例如COUNT(DISTINCT a)。 不同聚合声明聚合函数(内置或用户定义)仅应用于不同的输入值。 Distinct可以应用于GroupBy聚合,GroupBy窗口聚合和Over Window Aggregation。
Table orders = tableEnv.scan("Orders");
// Distinct aggregation on group by
Table groupByDistinctResult = orders
.groupBy("a")
.select("a, b.sum.distinct as d");
// Distinct aggregation on time window group by
Table groupByWindowDistinctResult = orders
.window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w")
.select("a, b.sum.distinct as d");
// Distinct aggregation on over window
Table result = orders
.window(Over
.partitionBy("a")
.orderBy("rowtime")
.preceding("UNBOUNDED_RANGE")
.as("w"))
.select("a, b.avg.distinct over w, b.max over w, b.min over w");
用户定义的聚合函数也可以与DISTINCT修饰符一起使用。 要仅为不同的值计算聚合结果,只需将distinct修饰符添加到聚合函数即可。
Table orders = tEnv.scan("Orders");
// Use distinct aggregation for user-defined aggregate functions
tEnv.registerFunction("myUdagg", new MyUdagg());
orders.groupBy("users").select("users, myUdagg.distinct(points) as myDistinctResult");
注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同字段的数量。 请提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信 | Distinct Batch Streaming Result Updating | 与SQL DISTINCT子句类似。 返回具有不同值组合的记录。 Table orders = tableEnv.scan("Orders");
Table result = orders.distinct();
|
最新经典文章,欢迎关注公众号
|
|