分享

Flink系统学习32-1:Table API详解

pig2 2019-2-25 11:24:57 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 5592
问题导读

1.流处理和批处理是否使用同一套table API?
2.是否所有算子都可以用于流处理和批处理?
3.本文讲了哪些聚合算子?


上一篇:
Flink系统学习31-2:Table API和SQL之读取外部数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26739


Table API是用于流和批处理的统一关系API。表API查询可以在批量或流式输入上运行而无需修改。 Table API是SQL语言的超级集合,专门用于Apache Flink。 Table API是Scala和Java的语言集成API。 Table API查询不是像SQL中常见的那样将查询指定为String值,而是在Java或Scala中以嵌入语言的样式定义,并支持自动完成和语法验证等IDE支持。

Table API与Flink的SQL集成共享其API的许多概念和部分。查看Common Concepts&API以了解如何注册表或创建Table对象。 Streaming Concepts讨论了流特定的概念,例如动态表和时间属性。

以下示例假设一个名为Orders的已注册表,其中包含属性(a,b,c,rowtime)。 rowtime字段是流式传输中的逻辑时间属性或批处理中的常规时间戳字段。

概述和示例
Table API可用于Scala和Java。 Scala Table API利用Scala表达式,Java Table API基于字符串,这些字符串被解析并转换为等效表达式。

以下示例显示了Scala和Java Table API之间的差异。 表程序在批处理环境中执行。 它按字段a扫描Orders表,分组,并计算每组的结果行数。 表程序的结果转换为Row类型的DataSet并打印。

由于Scala比较简洁,这里只做了Scala方面的内容,如果想看Java方面的,可参考Flink官网中文文档

Scala:
通过导入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._来启用Scala Table API。

以下示例显示了如何构造Scala Table API程序。 使用Scala符号引用表属性,该符号以撇号字符(')开头。

[mw_shl_code=scala,true]import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

// environment configuration
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// register Orders table in table environment
// ...

// specify table program
val orders = tEnv.scan("Orders") // schema (a, b, c, rowtime)

val result = orders
               .groupBy('a)
               .select('a, 'b.count as 'cnt)
               .toDataSet[Row] // conversion to DataSet
               .print()[/mw_shl_code]
下一个示例显示了一个更复杂的Table API程序。 程序再次扫描Orders表。 它过滤空值,规范化String类型的字段a,并计算每小时和产品的平均计费金额b。
[mw_shl_code=scala,true]// environment configuration
// ...

// specify table program
val orders: Table = tEnv.scan("Orders") // schema (a, b, c, rowtime)

val result: Table = orders
        .filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
        .select('a.lowerCase() as 'a, 'b, 'rowtime)
        .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
        .groupBy('hourlyWindow, 'a)
        .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)[/mw_shl_code]
由于Table API是批量和流数据的统一API,因此两个示例程序都可以在批处理和流式输入上执行,而无需对表程序本身进行任何修改。 在这两种情况下,程序产生相同的结果,因为流记录不会延迟(有关详细信息,请参阅流式概念)。

算子

Table API支持以下算子操作。 请注意,并非所有操作都可用于批处理和流式处理; 他们被相应地标记。

Scan,Projection和过滤

算子        
描述
Scan
Batch Streaming
与SQL查询中的FROM子句类似。 执行已注册表的扫描。
val orders: Table = tableEnv.scan("Orders")
Select
BatchStreaming
与SQL SELECT语句类似。 执行选择操作。
val orders: Table = tableEnv.scan("Orders")
val result = orders.select('a, 'c as 'd)
可以使用星号(*)作为通配符,选择表格中的所有列。
val orders: Table = tableEnv.scan("Orders")
val result = orders.select('*)
As
BatchStreaming
重命名字段。
val orders: Table = tableEnv.scan("Orders").as('x, 'y, 'z, 't)
Where / Filter
BatchStreaming
与SQL WHERE子句类似。 过滤掉未通过过滤谓词的行。
val orders: Table = tableEnv.scan("Orders")
val result = orders.filter('a % 2 === 0)
或则
val orders: Table = tableEnv.scan("Orders")
val result = orders.where('b === "red")

聚合
算子 描述
GroupBy聚合
Batch Streaming Result Updating
与SQL GROUP BY子句类似。 使用以下运行的聚合算子对分组键上的行进行分组,以按组聚合行。
val orders: Table = tableEnv.scan("Orders")
val result = orders.groupBy('a).select('a, 'b.sum as 'd)
注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于聚合类型和不同分组键的数量。 请提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信
GroupBy Window 聚合
Batch Streaming
组和聚合组窗口上的表以及可能的一个或多个分组键。
val orders: Table = tableEnv.scan("Orders")
val result: Table = orders
    .window(Tumble over 5.minutes on 'rowtime as 'w) // define window
    .groupBy('a, 'w) // group by key and window
    .select('a, w.start, 'w.end, 'w.rowtime, 'b.sum as 'd) // access window properties and aggregate
Over Window 聚合
Batch Streaming
与SQL OVER子句类似。基于前一行和后一行的窗口(范围)计算每行的窗口聚合。有关更多详细信息,请参阅over windows部分【后面将会有解释】。
Table orders = tableEnv.scan("Orders");
Table result = orders
    // define window
    .window(Over  
      .partitionBy("a")
      .orderBy("rowtime")
      .preceding("UNBOUNDED_RANGE")
      .following("CURRENT_RANGE")
      .as("w"))
    .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate
注意:必须在同一窗口中定义所有聚合,即相同的分区,排序和范围。目前,仅支持具有PRREDING(UNBOUNDED和有界)到CURRENT ROW范围的窗口。尚不支持使用FOLLOWING的范围。必须在单个时间属性上指定ORDER BY 。
Distinct集合
Batch Streaming Result Updating
类似于SQL DISTINCT聚合子句,例如COUNT(DISTINCT a)。 不同聚合声明聚合函数(内置或用户定义)仅应用于不同的输入值。 Distinct可以应用于GroupBy聚合,GroupBy窗口聚合和Over Window Aggregation。
Table orders = tableEnv.scan("Orders");
// Distinct aggregation on group by
Table groupByDistinctResult = orders
    .groupBy("a")
    .select("a, b.sum.distinct as d");
// Distinct aggregation on time window group by
Table groupByWindowDistinctResult = orders
    .window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w")
    .select("a, b.sum.distinct as d");
// Distinct aggregation on over window
Table result = orders
    .window(Over
        .partitionBy("a")
        .orderBy("rowtime")
        .preceding("UNBOUNDED_RANGE")
        .as("w"))
    .select("a, b.avg.distinct over w, b.max over w, b.min over w");
用户定义的聚合函数也可以与DISTINCT修饰符一起使用。 要仅为不同的值计算聚合结果,只需将distinct修饰符添加到聚合函数即可。
Table orders = tEnv.scan("Orders");

// Use distinct aggregation for user-defined aggregate functions
tEnv.registerFunction("myUdagg", new MyUdagg());
orders.groupBy("users").select("users, myUdagg.distinct(points) as myDistinctResult");

注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同字段的数量。 请提供具有有效保留间隔的查询配置,以防止状态过大。 请参阅查询配置了解详细信
Distinct
Batch Streaming Result Updating
与SQL DISTINCT子句类似。 返回具有不同值组合的记录。
Table orders = tableEnv.scan("Orders");
Table result = orders.distinct();


最新经典文章,欢迎关注公众号


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条