分享

彻底明白Flink系统学习14:【Flink1.7】DataSet 编程之Transformations详解

pig2 2018-12-24 18:57:49 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 12942
问题导读
1.DataSet与 Datastream Transformations 都有哪些API?
2.DataSet MapPartition的作用是什么?
3.ReduceGroup与Reduce有什么区别?
4.Join Hints是什么?
5.Flink是否所有外部联接类型都支持所有执行策略?
6.CoGroup与join的区别是什么?
7.Cross使用需要注意什么?
8.Range-Partition可以解决什么问题?

上一篇:
彻底明白Flink系统学习13:【Flink1.7】流连接器介绍及如何添加连接器
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26512

相关篇:
彻底明白Flink系统学习9:【Flink1.7编程】数据流Transformations介绍2窗口及相关操作
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26469

尽管许多人都很欣赏大多数行业中流数据处理的潜在价值,但是有许多使用情况,人们认为没有必要以流方式处理数据。 在所有这些情况下,批处理都是可行的方法。 到目前为止,Hadoop一直是数据处理的默认选择。 但是,Flink还支持DataSet API的批处理数据处理。

对于Flink,批处理是流处理的特例。 在本文中,我们将查看有关DataSet API的详细信息。

Transformations将数据集从一种形式转换为另一种形式。 输入可以是一个或多个数据集,输出也可以是零,或者一个或多个数据流。也就是说Transformations将一个或多个DataSet转换为新的DataSet。 程序可以将多个Transformations组合到复杂的程序集中。

Map
取每个元素转换为另外的元素
[mw_shl_code=scala,true]data.map { x => x.toInt }
[/mw_shl_code]

FlatMap
取每个元素转换为0, 1, 或则更多元素
[mw_shl_code=scala,true]data.flatMap { str => str.split(" ") }
[/mw_shl_code]

MapPartition
单个函数调用并行分区,该函数将分区作为“迭代器”,可以产生任意数量的结果。每个分区中的元素数量取决于并行度和以前的operations。
MapPartition和map是类似的,不过MapPartition相对来说效率高一些。
map有多少个元素,map就会被调用多少次
MapPartition对于数据有多少分区,就会被调用多少次。
[mw_shl_code=scala,true]data.mapPartition { in => in map { (_, 1) } }
[/mw_shl_code]

Filter
计算每个元素的布尔函数,并保留函数返回true的元素。
重要信息:系统假定该函数不会修改元素。 违反此假设可能会导致错误的结果。

[mw_shl_code=scala,true]data.filter { _ > 1000 }
[/mw_shl_code]

Reduce
通过将两个元素重复组合成一个元素,将一组元素组合成一个元素。 Reduce可以应用于完整数据集或分组数据集。
[mw_shl_code=scala,true]data.reduce { _ + _ }
[/mw_shl_code]

ReduceGroup
将一组元素组合成一个或多个元素。 ReduceGroup可以应用于完整数据集或分组数据集。
[mw_shl_code=scala,true]data.reduceGroup { elements => elements.sum }
[/mw_shl_code]

Aggregate
将一组值聚合为单个值。 聚合函数可以被认为是内置的reduce函数。 聚合可以应用于完整数据集或分组数据集。
[mw_shl_code=scala,true]val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)[/mw_shl_code]

可以使用简写语法进行最小,最大和求和聚合。
[mw_shl_code=scala,true]val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)[/mw_shl_code]

Distinct
返回数据集的不同元素。 它相对于元素的所有字段或字段子集从输入DataSet中删除重复条目。
[mw_shl_code=scala,true]data.distinct()[/mw_shl_code]

Join
通过创建在其key上相等的所有元素对来连接两个数据集。 可以使用JoinFunction将元素对转换为单个元素,或使用FlatJoinFunction将元素对转换为任意多个(包括无)元素。
补充:如何定义key:以DataSet分组来说:
[mw_shl_code=scala,true]DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);[/mw_shl_code]
我们看到groupBy中可以自定义key

下面情况下,元组字段用作键。
“0”是第一个元组的连接字段
“1”是第二个元组的连接字段。
[mw_shl_code=scala,true]// In this case tuple fields are used as keys. "0" is the join field on the first tuple
// "1" is the join field on the second tuple.
val result = input1.join(input2).where(0).equalTo(1)[/mw_shl_code]

可以通过Join Hints来指定运行时执行join的方式。这些提示描述了连接是通过分区还是广播,以及它是使用基于排序的(sort-based)还是基于散列(hash-based)的算法。如果没有指定链接方式,系统将尝试对输入大小进行评估,并根据这些评估选择最佳策略。
注意,连接transformation 仅适用于等连接。 其他连接类型需要使用OuterJoin或CoGroup表示。

补充:
上面我们看到Join Hints翻译为中文是链接提示的意思。那么它的具体是什么含义,我们慢慢来看:
Flink运行时可以以各种方式执行连接。 在不同情况下,每种可能的方式都优于其他方式。 系统会尝试自动选择合理的方式,但允许手动选择策略,强制执行外连接的特定方式。对于那么特定的方式有哪些?我们这里举下面例子:
[mw_shl_code=scala,true]val input1: DataSet[SomeType] = // [...]
val input2: DataSet[AnotherType] = // [...]
// hint that the second DataSet is very small
val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key")
val result2 = input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")[/mw_shl_code]
我们可以看到JoinHint.BROADCAST_HASH_FIRST,这里或许我们对JoinHint有了认识,我们接着往下看。

以下hints 可用:
OPTIMIZER_CHOOSES:相当于不提供任何提示,将选择留给系统。
BROADCAST_HASH_FIRST:广播第一个输入并从中构建一个哈希表,由第二个输入探测。如果第一个输入非常小,那么这是一个很好的策略
BROADCAST_HASH_SECOND:广播第二个输入并从中构建一个哈希表,由第一个输入探测。如果第二个输入非常小,这是一个很好的策略。
REPARTITION_HASH_FIRST:系统对每个输入进行分区(shuffle)(除非输入已经分区)并从第一个输入构建哈希表。如果第一个输入小于第二个输入,则此策略很好,尽管两个输入仍然很大。
REPARTITION_HASH_SECOND:系统对每个输入进行分区(shuffle)(除非输入已经分区)并从第二个输入构建哈希表。如果第二个输入小于第一个输入,则此策略很好,尽管两个输入仍然很大。
REPARTITION_SORT_MERGE:系统对每个输入进行分区(shuffle)(除非输入已经被分区)并对每个输入进行排序(除非它已经排序)。输入通过已排序输入的流合并来连接。如果已经对一个或两个输入进行了排序,则此策略很好。
注意:并非所有外部联接类型都支持所有执行策略。
  • LeftOuterJoin supports:
    • OPTIMIZER_CHOOSES
    • BROADCAST_HASH_SECOND
    • REPARTITION_HASH_SECOND
    • REPARTITION_SORT_MERGE
  • RightOuterJoin supports:
    • OPTIMIZER_CHOOSES
    • BROADCAST_HASH_FIRST
    • REPARTITION_HASH_FIRST
    • REPARTITION_SORT_MERGE
  • FullOuterJoin supports:
    • OPTIMIZER_CHOOSES
    • REPARTITION_SORT_MERGE

OuterJoin
对两个数据集执行左、右或完全外部连接。外部联接类似于常规(内部)联接,在键上创建相等的所有元素对。此外,如果在另一侧没有找到匹配的键,则保留“外部”的记录(左、右,或者在满的情况下两者都保留)。将匹配元素对(或一个元素和另一个输入的“空”值)赋给JoinFunction以将元素对转换为单个元素,或给FlatJoinFunction以将元素对转换为任意多个(包括无)元素。
[mw_shl_code=scala,true]val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
   (left, right) =>
     val a = if (left == null) "none" else left._1
     (a, right)
  }[/mw_shl_code]

CoGroup:

reduce操作的二维变体。 将一个或多个字段上的每个输入分组,然后join组。每对组调用transformation 函数。其实,这个跟join其实是类似的,除了输出匹配的元素对以外,未能匹配的元素也会输出,而join不同,输出则不同。

[mw_shl_code=scala,true]data1.coGroup(data2).where(0).equalTo(1)
[/mw_shl_code]
关于CoGroup等,推荐文章
Flink中cogroup, join和coflatmap比较
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26532


Cross
构建两个输入的笛卡尔积(交叉乘积),创建所有元素对。 可选择使用CrossFunction将元素对转换为单个元素
[mw_shl_code=scala,true]val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)[/mw_shl_code]
注意:Cross可能是一个计算密集型操作,甚至可以挑战大型集群! 建议使用crossWithTiny()和crossWithHuge()来提示系统DataSet大小。

Union
生成两个数据集的并集。
[mw_shl_code=scala,true]data.union(data2)
[/mw_shl_code]

Rebalance
均匀地重新平衡数据集的并行分区以消除数据偏差。 只有类似Map的转换可以跟着rebalance操作使用。
[mw_shl_code=scala,true]val data1: DataSet[Int] = // [...]
val result: DataSet[(Int, String)] = data1.rebalance().map(...)[/mw_shl_code]

Hash-Partition        
根据给定的key对一个数据集进行hash分区。Key可以是position keys, expression keys, 或者key selector functions
[mw_shl_code=scala,true]val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByHash(0).mapPartition { ... }[/mw_shl_code]

Range-Partition        
根据给定的key对一个数据集进行范围分区。Key可以是position keys, expression keys, 或者key selector functions
[mw_shl_code=scala,true]val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }[/mw_shl_code]

补充:
Hash-Partition与Range-Partition的区别?
HashPartitioner分区的原理,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。
RangePartitioner分区器的主要作用就是将一定范围内的数映射到某一个分区内,分界算法对应的函数是rangeBounds。RangePartitioner分区的优势,可以解决HashPartitioner中可能出现的数据不均匀现象,每个分区的数据,都比另外的分区的数据大或则小。


Custom Partitioning        

使用自定义Partitioner函数分配基于key的指定分区的记录,也就是说根据我们的自定义分区,来对数据进行划分。这个有点像我们的Hbase预分区,而这里我们只是指定了或则自定义了分区器。
注意:此方法仅适用于单个字段。
[mw_shl_code=scala,true]val in: DataSet[(Int, String)] = // [...]
val result = in
  .partitionCustom(partitioner, key).mapPartition { ... }[/mw_shl_code]

Sort Partition        
以指定的顺序在指定的字段上对本地数据集的所有分区排序。 Fields 可以指定为 tuple positions 或则字段expressions.。通过sortPartition()调用来完成对多个字段的排序。
[mw_shl_code=scala,true]val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }[/mw_shl_code]

First-n

返回数据集的前n个(任意)元素。 First-n可以应用于常规数据集,分组数据集或分组排序数据集。 可以将分组keys 指定为 key-selector函数,uple positions 或则class fields。
[mw_shl_code=scala,true]val in: DataSet[(Int, String)] = // [...]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)[/mw_shl_code]

以下transformations 可用于元组的数据集:

MinBy / MaxBy        
从一组元组中选择一个元组,其元组的一个或多个字段的值最小(最大)。 用于比较的字段必须是有效的key字段,即可比较的字段。 如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。 MinBy(MaxBy)可以应用于完整数据集或分组数据集。
[mw_shl_code=scala,true]val in: DataSet[(Int, Double, String)] = // [...]
// a data set with a single tuple with minimum values for the Int and String fields.
val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
// a data set with one tuple for each group with the minimum value for the Double field.
val out2: DataSet[(Int, Double, String)] = in.groupBy(2)
                                             .minBy(1)[/mw_shl_code]

Project
从元组中选择字段的子集【Java】
[mw_shl_code=java,true]DataSet<Tuple3<Integer, Double, String>> in = // [...]
DataSet<Tuple2<String, Integer>> out = in.project(2,0);[/mw_shl_code]


关注最新经典文章,欢迎关注公众号



已有(2)人评论

跳转到指定楼层
huangrong 发表于 2019-5-11 10:52:14
打开。。。如果提示no implicits found for parametere evidence.....,得先导入import org.apache.flink.api.scala._
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条