breaking 发表于 2016-1-7 12:37:22

Spark算子:RDD键值转换操作(1)–partitionBy、mapValues、flatMapValues


问题导读:

1.spark中的partitionBy怎么理解?
2.spark中的mapValues怎么理解?
3.spark中的flatMapValues怎么理解?



static/image/hrline/2.gif



partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。

scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD at makeRDD at :21

scala> rdd1.partitions.size
res20: Int = 2

//查看rdd1中每个分区的元素
scala> rdd1.mapPartitionsWithIndex{
   |         (partIdx,iter) => {
   |         var part_map = scala.collection.mutable.Map]()
   |             while(iter.hasNext){
   |               var part_name = "part_" + partIdx;
   |               var elem = iter.next()
   |               if(part_map.contains(part_name)) {
   |               var elems = part_map(part_name)
   |               elems ::= elem
   |               part_map(part_name) = elems
   |               } else {
   |               part_map(part_name) = List[(Int,String)]{elem}
   |               }
   |             }
   |             part_map.iterator
   |            
   |         }
   |       }.collect
res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))
//(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中

//使用partitionBy重分区
scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD at partitionBy at :23

scala> rdd2.partitions.size
res23: Int = 2

//查看rdd2中每个分区的元素
scala> rdd2.mapPartitionsWithIndex{
   |         (partIdx,iter) => {
   |         var part_map = scala.collection.mutable.Map]()
   |             while(iter.hasNext){
   |               var part_name = "part_" + partIdx;
   |               var elem = iter.next()
   |               if(part_map.contains(part_name)) {
   |               var elems = part_map(part_name)
   |               elems ::= elem
   |               part_map(part_name) = elems
   |               } else {
   |               part_map(part_name) = List[(Int,String)]{elem}
   |               }
   |             }
   |             part_map.iterator
   |         }
   |       }.collect
res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))
//(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中

    mapValues

def mapValues(f: (V) => U): RDD[(K, U)]

同基本转换操作中的map,只不过mapValues是针对中的V值进行map操作

scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD at makeRDD at :21

scala> rdd1.mapValues(x => x + "_").collect
res26: Array[(Int, String)] = Array((1,A_), (2,B_), (3,C_), (4,D_))

flatMapValues


def flatMapValues(f: (V) => TraversableOnce): RDD[(K, U)]

同基本转换操作中的flatMap,只不过flatMapValues是针对中的V值进行flatMap操作。

scala> rdd1.flatMapValues(x => x + "_").collect
res36: Array[(Int, Char)] = Array((1,A), (1,_), (2,B), (2,_), (3,C), (3,_), (4,D), (4,_))


原文链接:http://lxw1234.com/archives/2015/07/356.htm





页: [1]
查看完整版本: Spark算子:RDD键值转换操作(1)–partitionBy、mapValues、flatMapValues