分享

彻底明白Flink系统学习23:【Flink1.7】Table API 和SQL API介绍3【重点】

pig2 2019-1-14 20:32:00 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 8195
问题导读

1.DataStream和DataSet转换为Table有几种方式?
2.如何将DataStream和DataSet注册为表?
3.如何转换DataStream 或则DataSet为表?

上一篇:
彻底明白Flink系统学习22:【Flink1.7】Table API 和SQL API介绍2
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26614

DataStream 和DataSet API转换为Table
Table API 和SQL API这是第三篇,第三篇是重点,这里讲解了DataStream和DataSet与Table的相互转换



表API和SQL查询可以集成并嵌入到DataStream和DataSet程序中。 例如,可以查询外部表(例如来自RDBMS),进行一些预处理,例如过滤,Projection,聚合或加入元数据,然后使用DataStream或DataSet API进一步处理数据。 (以及任何基于这些API构建的库,例如CEP或Gelly)。 相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。

可以通过将DataStream或DataSet转换为表来实现此交互,反之亦然。 在本节中,我们将介绍如何完成这些转换。

Scala的隐式转换
Scala Table API具有DataSet,DataStream和Table类的隐式转换。 除了org.apache.flink.api.scala._之外,还可以通过导入包org.apache.flink.table.api.scala._来为Scala DataStream API启用这些转换。

注册DataStream或则DataSet为Table
DataStream或DataSet可以在TableEnvironment中注册为表。结果表的schema取决于注册的DataStream或DataSet的数据类型。更多信息
[mw_shl_code=scala,true]// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// 注册DataStream 为表 "myTable" 带字段"f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// 注册 DataStream为表"myTable2"带有字段"myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)[/mw_shl_code]
注意:DataStream表的名称不得与^ datastream table_u9]+模式匹配,DataSet表的名称不得与^ dataset table_9]+模式匹配。这些模式仅供内部使用。

转换DataStream 或则DataSet为表
它不在TableEnvironment中注册DataStream或DataSet,也可以直接转换为Table。 如果要在Table API查询中使用Table,这很方便。
[mw_shl_code=scala,true]// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// 转换DataStream到Table带有默认字段 '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)

//转换DataStream 到Table带有字段 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)[/mw_shl_code]

转换表为DataStream或则DataSet
表可以转换为DataStream或DataSet。 通过这种方式,可以在Table API或SQL查询的结果的基础上运行DataStream或DataSet程序。将表转换为DataStream或DataSet时,需要指定生成的DataStream或DataSet的数据类型,即要转换表行的数据类型。 通常最方便的转换类型是Row。 以下列表概述了不同选项的功能:
  • Row:字段按位置映射,任意数量的字段,支持空值,无类型安全访问。
  • POJO:字段按名称映射(POJO字段必须命名为表字段),任意数量的字段,支持空值,类型安全访问。
  • Case Class:字段按位置映射,不支持空值,类型安全访问。
  • Tuple:字段按位置映射,限制为22(Scala)或25(Java)字段,不支持空值,类型安全访问。
  • Atomic Type:表必须具有单个字段,不支持空值,类型安全访问。


1.转换Table 为DataStream
作为流式查询的结果的表被动态更新,即,当新记录到达查询的输入流时,它正在改变。

将表转换为DataStream有两种模式:
附加模式:只有在动态表仅通过INSERT修改时才能使用此模式,即它仅附加并且以前提交的结果永远不会更新。
缩进模式:始终可以使用此模式。 它使用布尔标志对INSERT和DELETE更改进行编程。

[mw_shl_code=scala,true]// 获取 TableEnvironment.
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 带有两个字段的表 (String name, Integer age)
val table: Table = ...

// 转换为Table 为追加Row的DataStream
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// 转换为Table 为追加Tuple2[String, Int]的DataStream
val dsTuple: DataStream[(String, Int)] dsTuple =
  tableEnv.toAppendStream[(String, Int)](table)

// 转换Table为Row的retract DataStream .
// type X的retract stream 是DataStream[(Boolean, X)].
// boolean 字段 大表改变的类型.
//   True为INSERT, false为DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)[/mw_shl_code]
关于动态表,后面会详细介绍,可参考

2.转换表为DataSet
表转换为DataSet,如下所示:
[mw_shl_code=scala,true]// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 带两个字段的Table (String name, Integer age)
val table: Table = ...

// 转换表为Row的DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// 转换表为Tuple2[String, Int]的DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)[/mw_shl_code]


最新经典文章,欢迎关注公众号




本帖被以下淘专辑推荐:

已有(4)人评论

跳转到指定楼层
jiangzi 发表于 2019-1-15 14:06:57
可以通过将DataStream或DataSet转换为表来实现此交互~
回复

使用道具 举报

pig2 发表于 2019-1-15 20:03:11
jiangzi 发表于 2019-1-15 14:06
可以通过将DataStream或DataSet转换为表来实现此交互~

DataStream或DataSet都可以转换为Table,但是DataStream和DataSet不能彼此转换
回复

使用道具 举报

若无梦何远方 发表于 2019-9-7 09:18:08
jiangzi 发表于 2019-1-15 14:06
可以通过将DataStream或DataSet转换为表来实现此交互~

Table转换为DataStream或DataSet


转换为DataSet
-- 直接使用 tableEnv.toDataSet 方法就可以将Table转换为DataSet转换的时候,需要指定泛型,可以是一个样例类,也可以是指定为 Row 类型


转换为DataStream
-- 使用 tableEnv.toAppendStream ,将表直接附加在流上使用 tableEnv.toRetractStream ,返回一个元组(Boolean, DataStream),Boolean表示数据是否被成功获取转换的时候,需要指定泛型,
-- 可以是一个样例类,也可以是指定为 Row 类型

回复

使用道具 举报

YTP520YTP 发表于 2019-12-19 17:22:09
6666666666666
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条