彻底明白Flink系统学习23:【Flink1.7】Table API 和SQL API介绍3【重点】
问题导读1.DataStream和DataSet转换为Table有几种方式?
2.如何将DataStream和DataSet注册为表?
3.如何转换DataStream 或则DataSet为表?
上一篇:
彻底明白Flink系统学习22:【Flink1.7】Table API 和SQL API介绍2
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26614
DataStream 和DataSet API转换为Table
Table API 和SQL API这是第三篇,第三篇是重点,这里讲解了DataStream和DataSet与Table的相互转换
http://www.aboutyun.com/data/attachment/forum/201901/09/185951srxsso9xsys3dxwl.png
表API和SQL查询可以集成并嵌入到DataStream和DataSet程序中。 例如,可以查询外部表(例如来自RDBMS),进行一些预处理,例如过滤,Projection,聚合或加入元数据,然后使用DataStream或DataSet API进一步处理数据。 (以及任何基于这些API构建的库,例如CEP或Gelly)。 相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。
可以通过将DataStream或DataSet转换为表来实现此交互,反之亦然。 在本节中,我们将介绍如何完成这些转换。
Scala的隐式转换
Scala Table API具有DataSet,DataStream和Table类的隐式转换。 除了org.apache.flink.api.scala._之外,还可以通过导入包org.apache.flink.table.api.scala._来为Scala DataStream API启用这些转换。
注册DataStream或则DataSet为Table
DataStream或DataSet可以在TableEnvironment中注册为表。结果表的schema取决于注册的DataStream或DataSet的数据类型。更多信息
// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// 注册DataStream 为表 "myTable" 带字段"f0", "f1"
tableEnv.registerDataStream("myTable", stream)
// 注册 DataStream为表"myTable2"带有字段"myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
注意:DataStream表的名称不得与^ datastream table_u9]+模式匹配,DataSet表的名称不得与^ dataset table_9]+模式匹配。这些模式仅供内部使用。
转换DataStream 或则DataSet为表
它不在TableEnvironment中注册DataStream或DataSet,也可以直接转换为Table。 如果要在Table API查询中使用Table,这很方便。
// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// 转换DataStream到Table带有默认字段 '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
//转换DataStream 到Table带有字段 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
转换表为DataStream或则DataSet
表可以转换为DataStream或DataSet。 通过这种方式,可以在Table API或SQL查询的结果的基础上运行DataStream或DataSet程序。将表转换为DataStream或DataSet时,需要指定生成的DataStream或DataSet的数据类型,即要转换表行的数据类型。 通常最方便的转换类型是Row。 以下列表概述了不同选项的功能:
[*]Row:字段按位置映射,任意数量的字段,支持空值,无类型安全访问。
[*]POJO:字段按名称映射(POJO字段必须命名为表字段),任意数量的字段,支持空值,类型安全访问。
[*]Case Class:字段按位置映射,不支持空值,类型安全访问。
[*]Tuple:字段按位置映射,限制为22(Scala)或25(Java)字段,不支持空值,类型安全访问。
[*]Atomic Type:表必须具有单个字段,不支持空值,类型安全访问。
1.转换Table 为DataStream
作为流式查询的结果的表被动态更新,即,当新记录到达查询的输入流时,它正在改变。
将表转换为DataStream有两种模式:
附加模式:只有在动态表仅通过INSERT修改时才能使用此模式,即它仅附加并且以前提交的结果永远不会更新。
缩进模式:始终可以使用此模式。 它使用布尔标志对INSERT和DELETE更改进行编程。
// 获取 TableEnvironment.
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 带有两个字段的表 (String name, Integer age)
val table: Table = ...
// 转换为Table 为追加Row的DataStream
val dsRow: DataStream = tableEnv.toAppendStream(table)
// 转换为Table 为追加Tuple2的DataStream
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// 转换Table为Row的retract DataStream .
// type X的retract stream 是DataStream[(Boolean, X)].
// boolean 字段 大表改变的类型.
// True为INSERT, false为DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream(table)
关于动态表,后面会详细介绍,可参考
2.转换表为DataSet
表转换为DataSet,如下所示:
// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 带两个字段的Table (String name, Integer age)
val table: Table = ...
// 转换表为Row的DataSet
val dsRow: DataSet = tableEnv.toDataSet(table)
// 转换表为Tuple2的DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
可以通过将DataStream或DataSet转换为表来实现此交互~ jiangzi 发表于 2019-1-15 14:06
可以通过将DataStream或DataSet转换为表来实现此交互~
DataStream或DataSet都可以转换为Table,但是DataStream和DataSet不能彼此转换
jiangzi 发表于 2019-1-15 14:06
可以通过将DataStream或DataSet转换为表来实现此交互~
Table转换为DataStream或DataSet
转换为DataSet
-- 直接使用 tableEnv.toDataSet 方法就可以将Table转换为DataSet转换的时候,需要指定泛型,可以是一个样例类,也可以是指定为 Row 类型
转换为DataStream
-- 使用 tableEnv.toAppendStream ,将表直接附加在流上使用 tableEnv.toRetractStream ,返回一个元组(Boolean, DataStream),Boolean表示数据是否被成功获取转换的时候,需要指定泛型,
-- 可以是一个样例类,也可以是指定为 Row 类型
6666666666666
页:
[1]