本帖最后由 pig2 于 2019-1-13 19:20 编辑
问题导读
1.Table API有哪两种语言?
2.你认为Table API包含哪些内容?
3.Flink的SQL集成是基于什么?
4.如何提交Table到TableSink?
彻底明白Flink系统学习21:【Flink1.7】Table API 和SQL API介绍1:注册
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26600
表查询
表 API
Table API是Scala和Java的语言集成查询API。
API 基于Table类【代表表(流或批处理)】,提供应用关系操作的方法。这些方法返回一个新的Table对象,该对象表示在输入表上应用关系运算的结果。 一些关系操作由多个方法调用组成,例如table.groupBy(...)。select(),其中groupBy(...)指定表的分组,并选择(...)分组上的投影 表。
Table API文档描述了流和批处理表支持的所有Table API操作。【表 API跟我们的关系数据库是差不多的,无非是查询、过滤、分组、join等】
以下示例显示了一个简单的Table API聚合查询:
[mw_shl_code=scala,true]
// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注册Orders table
// scan 注册Orders table
val orders = tableEnv.scan("Orders")
//计算来自法国的所有客户的收入
val revenue = orders
.filter('cCountry === "FRANCE")
.groupBy('cID, 'cName)
.select('cID, 'cName, 'revenue.sum AS 'revSum)
// emit 或则转换表
// 执行查询[/mw_shl_code]
注意:Scala Table API使用Scala符号,它以单个tick(')开头,以引用Table的属性。 Table API使用 Scala implicits。 确保导入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._以便使用Scala隐式转换。
SQL
Flink的SQL集成基于Apache Calcite,它实现了SQL标准。 SQL查询被指定为常规字符串。
SQL文档描述了Flink对流和批处理表的SQL支持。
以下示例显示如何指定查询并将结果作为表返回。
[mw_shl_code=scala,true]// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注册Orders 表
// //计算来自法国的所有客户的收入
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// emit 或则转换 Table
// 执行query[/mw_shl_code]
以下示例说明如何指定将其结果插入注册表的更新查询。
[mw_shl_code=scala,true]// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注册"Orders" table
// 注册"RevenueFrance" 输出table
//计算来自法国的所有客户的收入和emit 到"RevenueFrance"
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// 执行查询[/mw_shl_code]
表API和SQL结合
表API和SQL查询可以一起使用,因为它们都返回Table对象:
- 可以在SQL查询返回的Table对象上定义Table API查询。
- 通过在TableEnvironment中注册(结果)表,并在SQL查询的FROM子句中引用它,可以在Table API查询的结果上定义SQL查询。
提交Table
通过将表写入提交到TableSink。 TableSink是支持各种文件格式(例如CSV,Apache Parquet,Apache Avro),存储系统(例如,JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息传递系统(例如,Apache Kafka,RabbitMQ的)。
批处理表只能写入BatchTableSink,而流表需要AppendStreamTableSink,RetractStreamTableSink或UpsertStreamTableSink。
Table.insertInto(String tableName)方法将Table提交到注册的TableSink。 该方法通过名称从目录中查找TableSink,并验证Table的schema是否与TableSink的schema 相同。
以下示例显示了如何提交表:
[mw_shl_code=scala,true]Scala
// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 创建TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
// 注册带有指定schema的TableSink
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
//使用Table API运算符和/或SQL查询 计算结果表
val result: Table = ...
// 提交结果到TableSink(已注册)
result.insertInto("CsvSinkTable")
//执行程序[/mw_shl_code]
转换并执行查询
表API和SQL查询将转换为DataStream或DataSet程序,具体取决于它们的输入是流式还是批量输入。查询在内部表示为逻辑查询计划,并分为两个阶段:
- 优化逻辑计划,
- 转换为DataStream或DataSet程序。
在以下情况下转换表API或SQL查询:
- 当调用Table.insertInto()的时候,将Table 提交到TableSink
- 当调用TableEnvironment.sqlUpdate()的时候,指定SQL 更新查询
- 一个Table转换为 DataStream 或则DataSet
转换后,Table API或SQL查询将像常规DataStream或DataSet程序一样处理,并在调用StreamExecutionEnvironment.execute()或ExecutionEnvironment.execute()时执行。
|
|