问题导读
1.如何创建不同数据类型的datastream?
2.数据类型映射分为哪两种?
3.基于位置的映射和基于名称的映射有什么区别?
上一篇
彻底明白Flink系统学习23:【Flink1.7】Table API 和SQL API介绍3【重点】
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26619
数据类型映射表Schema
Flink的DataStream和DataSet API支持各种类型。 复合类型(如Tuples(内置Scala和Flink Java元组),POJO,Scala case classes和Flink的Row类型)允许嵌套数据结构具有可在Table表达式中访问的多个字段。 其他类型被视为原子类型。 在下文中,我们将描述Table API如何将这些类型转换为内部行表示,并演示将DataStream转换为表的示例。
数据类型到表模式的映射有两种:基于字段位置或基于字段名称。
基于位置的映射
基于位置的映射可用于在保持字段顺序的同时为字段提供有意义的名称。 此映射可用于具有已定义字段顺序的原子类型和复合数据类型。 复合数据类型(如元组,行和 case classes)具有此类字段顺序。 但是,POJO字段必须基于字段名字映射。
定义基于位置的映射时,指定的名称不能存在于输入数据类型中,否则API将认为映射基于字段名称进行。 如果未指定字段名称,则使用复合类型的默认字段名称和字段顺序,或者使用f0作为原子类型。
[mw_shl_code=scala,true]//获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, Int)] = ...
// 转换DataStream为Table带有默认字段名称 "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)
// 转换 DataStream为带有字段名字为 "myLong" 和 "myInt"的Table
val table: Table = tableEnv.fromDataStream(stream, 'myLong 'myInt)[/mw_shl_code]基于名称的映射可用于任何数据类型,包括POJO。 它是定义表模式映射的最灵活方式。 映射中的所有字段都按名称引用,并且可以重命名为别名。 字段可以重新排序和投影。
如果未指定字段名称,则使用复合类型的默认字段名称和字段顺序,或者原子类型使用f0。
[mw_shl_code=scala,true]//获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, Int)] = ...
// 转换DataStream为带有默认字段名字 "_1" 和 "_2"的Table
val table: Table = tableEnv.fromDataStream(stream)
// 转换DataStream为仅带有字段"_2"表
val table: Table = tableEnv.fromDataStream(stream, '_2)
// 转换DataStream为交换字段的Table
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)
// 转换DataStream为交换字段的Table并且字段名字"myInt" 和"myLong"
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myInt, '_1 as 'myLong)[/mw_shl_code]
原子类型
Flink将原语(Integer,Double,String)或泛型类型(无法分析和分解的类型)视为原子类型。 原子类型的DataStream或DataSet将转换为具有单个属性的表。 从原子类型推断属性的类型,并且可以指定属性的名称。
[mw_shl_code=scala,true]//获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[Long] = ...
// 转换DataStream 为带有默认字段名字为 "f0"的表
val table: Table = tableEnv.fromDataStream(stream)
// 转换DataStream 带有字段名字为 "myLong"的表
val table: Table = tableEnv.fromDataStream(stream, 'myLong)[/mw_shl_code]
元组(Scala和Java)和Case Classes(仅限Scala)Flink支持Scala的内置元组,并为Java提供自己的元组类。 两种元组的DataStream和DataSet都可以转换为表。 可以通过为所有字段提供名称(基于位置的映射)来重命名字段。 如果未指定字段名称,则使用默认字段名称。 如果引用了原始字段名称(flink元组的f0,f1,...和Scala元组的_1,_2,...),则API假定映射是基于名称而不是基于位置的。 基于名称的映射允许使用别名(as)重新排序字段和投影。
[mw_shl_code=scala,true]// 获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// 转换DataStream为带有默认字段名 '_1, '_2的Table
val table: Table = tableEnv.fromDataStream(stream)
// 转换DataStream 为带有字段名的"myLong", "myString" (基于位置)Table
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
// 转换DataStream 为带有重新排序的字段"_2", "_1" (基于名字)表
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)
// 转换DataStream 为带有projected field "_2" (基于名字)Table
val table: Table = tableEnv.fromDataStream(stream, '_2)
// 转换DataStream 带有排序和别名字段的"myString", "myLong" (name-based)Table
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)
// 定义 case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// 转换DataStream 为带有默认字段名称 'name, 'age的Table
val table = tableEnv.fromDataStream(streamCC)
// 转换DataStream 为带有字段名字的 'myName, 'myAge (position-based)Table
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
// 转换DataStream 为有序和别名字段为 "myAge", "myName" (name-based)Table
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)[/mw_shl_code]
POJO (Java and Scala)
Flink支持POJO作为复合类型。 这里记录了决定POJO的规则。
将POJO DataStream或DataSet转换为表而不指定字段名称时,将使用原始POJO字段的名称。 名称映射需要原始名称,不能通过位置来完成。 可以使用别名(使用as关键字)重新命名字段,重新排序和投影。
补充:如果满足以下要求,则Flink将Java和Scala类视为POJO数据类型: class需要为public。 必须有一个没有参数的公共构造函数(默认构造函数)。 所有字段都是公共的,或者必须通过getter和setter函数访问。 对于名为foo的字段,getter和setter方法必须命名为getFoo()和setFoo()。 Flink必须支持字段的类型。 目前,Flink使用Avro序列化任意对象(例如Date)。
Flink分析POJO类型的结构,即它了解POJO的字段。 因此,POJO类型比起一般类型更容易使用。 此外,Flink可以比一般类型更有效地处理POJO。 以下示例显示了一个包含两个公共字段的简单POJO。 [mw_shl_code=scala,true]class WordWithCount(var word: String, var count: Int) {
def this() {
this(null, -1)
}
}
val input = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"[/mw_shl_code] [mw_shl_code=java,true]public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"[/mw_shl_code]
Row
Row数据类型支持具有空值的字段和任意数量的字段。 可以通过RowTypeInfo指定字段名称,或则将Row DataStream或DataSet转换为表格的时候指定。 row类型支持按位置和名称映射字段。 可以通过为所有字段提供名称来重命名字段(基于位置的映射)或为投影/排序/重命名(基于名称的映射)单独选择字段来重命名字段。
[mw_shl_code=scala,true]//获取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// // Row的DataStream,在`RowTypeInfo`中指定了两个字段“name”和“age”
val stream: DataStream[Row] = ...
//将DataStream转换为表,默认字段名称为“name”,“age”
val table: Table = tableEnv.fromDataStream(stream)
//将DataStream转换为表,重命名的字段名称为“myName”,“myAge”(基于位置)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
// //将DataStream转换为表,使用重命名的字段“myName”,“myAge”(基于名称)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
//使用投影字段“name”将DataStream转换为表格(基于名称)
val table: Table = tableEnv.fromDataStream(stream, 'name)
//使用投影和重命名的字段“myName”(基于名称)将DataStream转换为表
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)[/mw_shl_code]
查询优化
Apache Flink利用Apache Calcite优化和转换查询。 当前执行的优化包括投影和过滤器下推,子查询以及其他类型的查询重写。 Flink尚未优化join的顺序,但是以查询中定义的顺序执行它们(FROM子句中的表的顺序和/或WHERE子句中的连接谓词的顺序)。
通过提供CalciteConfig对象,可以调整在不同阶段应用的优化规则集。 这可以通过调用CalciteConfig.createBuilder())构建器创建,并通过调用tableEnv.getConfig.setCalciteConfig(calciteConfig)提供给TableEnvironment。
解析表
Table API提供了一种机制来解析计算表的逻辑和优化查询计划。 这是通过TableEnvironment.explain(table)方法完成的。 它返回一个描述三个计划的String:
- 关系查询的抽象语法树,即未优化的逻辑查询计划,
- 优化的逻辑查询计划
- 物理执行计划
以下代码显示了一个示例和相应的输出:
[mw_shl_code=scala,true]val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
.where('word.like("F%"))
.unionAll(table2)
val explanation: String = tEnv.explain(table)
println(explanation)[/mw_shl_code]
[mw_shl_code=scala,true]== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, 'F%')])
LogicalTableScan(table=[[_DataStreamTable_0]])
LogicalTableScan(table=[[_DataStreamTable_1]])
== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
DataStreamScan(table=[[_DataStreamTable_0]])
DataStreamScan(table=[[_DataStreamTable_1]])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, 'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE[/mw_shl_code]
|
|