彻底明白Flink系统学习21:【Flink1.7】Table API 和SQL API介绍1:注册
本帖最后由 pig2 于 2019-1-11 11:43 编辑问题导读
1.Flink sql程序结构包含哪些内容?
2.TableEnvironment的作用是什么?
3.如何创建TableEnvironment?
4.如何注册表?
彻底明白Flink系统学习20:【Flink1.7】Table API 和SQL概述
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26598
Table API和SQL集成在一个联合API中。 此API的核心概念是一个表,它用作查询的输入和输出。 本文讲解了具有Table API和SQL查询的程序的常见结构,如何注册表,如何查询表以及如何发出表。
表API和SQL程序的结构
批处理和流式所有Table API和SQL程序都遵循相同的模式。 以下代码示例显示了Table API和SQL程序的常见结构。
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...) // or
tableEnv.registerExternalCatalog("extCat", ...)
// register an output Table
tableEnv.registerTableSink("outputTable", ...);
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult= tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")
// execute
env.execute()
注意:表API和SQL查询可以轻松集成并嵌入到DataStream或DataSet程序中。 下面我们会讲解DataStream和DataSet API的集成部分,以及如何将DataStream和DataSet转换为Tables,反之亦然。
创建一个TableEnvironment
TableEnvironment是Table API和SQL集成的核心概念。 它负责:
[*]在内部目录中注册表
[*]注册外部目录
[*]执行SQL查询
[*]注册用户定义的(标量,表或聚合)函数
[*]将DataStream或DataSet转换为表
[*]可引用ExecutionEnvironment或StreamExecutionEnvironment
表始终绑定到特定的TableEnvironment。 不可能整合不同TableEnvironments 的表在相同的查询中等,如join或则union
创建StreamExecutionEnvironment 或则一个ExecutionEnvironmentTableConfig,然后调用静态方法 TableEnvironment.getTableEnvironment()创建TableEnvironment。另外有一个可选的TableConfig,可用于配置TableEnvironment或自定义查询优化和转换过程。// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
在目录中注册表
TableEnvironment维护按名称注册的表的目录。有两种类型的表,输入表和输出表。 输入表可以在表API和SQL查询中引用,并提供输入数据。 输出表可用于将Table API或SQL查询的结果发送到外部系统。
可以从各种来源注册输入表:
[*]现有的Table对象,通常是Table API或SQL查询的结果。
[*]TableSource,用于访问外部数据,例如文件,数据库或消息传递系统。
[*]一个DataStream 或则DataSet也可以成为表的数据源。
可以使用TableSink注册输出表。
注册表
Table在TableEnvironment中注册如下:
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("X").select(...)
// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable)
注意:注册表的处理方式与关系数据库系统中已知的VIEW类似,即定义表的查询未优化,但在另一个查询引用注册表时将内联。 如果多个查询引用相同的注册表,则将为每个引用查询内联并多次执行,即,不会共享注册表的结果。
注册TableSource
TableSource提供对外部数据的访问,外部数据存储在存储系统中,例如数据库(MySQL,HBase,...),具有特定编码的文件(CSV,Apache ,...)或消息传递 系统(Apache Kafka,RabbitMQ,...)。
Flink旨在为常见的数据格式和存储系统提供TableSource。 关于Table Sources and Sinks信息,TableSource列表以及如何构建自定义TableSource,后面文章会有说明。
TableSource在TableEnvironment中注册如下:
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
注册TableSink
注册的TableSink可用于将Table API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [ Parquet,Avro,ORC],...)。
Flink旨在为常见的数据格式和存储系统提供TableSink。 有关可用接收器的详细信息以及有关如何实现自定义TableSink的说明,后面文章会有说明。。
TableSink在TableEnvironment中注册如下:
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array = Array("a", "b", "c")
val fieldTypes: Array] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
注册外部目录
外部目录可以提供有关外部数据库和表的信息,例如其名称,架构,统计信息以及有关如何访问存储在外部数据库,表或文件中的数据的信息。
可以通过实现ExternalCatalog接口创建外部目录,并在TableEnvironment中注册,如下所示:
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog
// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)
在TableEnvironment中注册后,可以通过指定其完整路径(例如catalog.database.table)从Table API或SQL查询中访问ExternalCatalog中定义的所有表。
目前,Flink提供InMemoryExternalCatalog用于演示和测试目的。 但是,ExternalCatalog接口也可用于将目录(如HCatalog或Metastore)连接到Table API。
未完待续
最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
感谢分享 学习了,感谢分享!
页:
[1]