本帖最后由 fc013 于 2016-8-14 13:50 编辑
问题导读: 1.RDD支持哪些操作? 2.什么是Shuffle? 3.怎样用Scala写spark程序?
RDD 操作介绍 RDD 有两种操作方式的概念:
- 转化(transformations),指的是从一个数据集生产另一个数据集的处理
- 动作(actions),指的是在数据集上计算而后得到一个值的处理
常用的转化Transformation | Meaning | map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. | filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. | flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). | mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T. | mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator) => Iterator when running on an RDD of type T. | sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. | union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. | intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. | distinct([numTasks])) | Return a new dataset that contains the distinct elements of the source dataset. | groupByKey([numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks. | reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. | sortByKey([ascending], [numTasks]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. | join(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. | cogroup(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also called groupWith. | cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). | pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings. | coalesce(numPartitions) | Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. | repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. | repartitionAndSortWithinPartitions(partitioner) | Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery. | 常用的动作[td] Action | Meaning | reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. | collect() | Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. | count() | Return the number of elements in the dataset. | first() | Return the first element of the dataset (similar to take(1)). | take(n) | Return an array with the first n elements of the dataset. | takeSample(withReplacement, num, [seed]) | Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. | takeOrdered(n, [ordering]) | Return the first n elements of the RDD using either their natural order or a custom comparator. | saveAsTextFile(path) | Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. | saveAsSequenceFile(path) (Java and Scala) | Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). | saveAsObjectFile(path) (Java and Scala) | Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile(). | countByKey() | Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. | foreach(func) | Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details. |
RDD 是惰性的 为了高效计算,RDD 的transformations被设计为惰性(lazy),transformations不会立即执行,只会记录如何操作,直到action时,transformations才会被执行。
例如:
[mw_shl_code=scala,true]val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)[/mw_shl_code]
当执行map时,spark并没有生(s,1)这些格式的数据,直到执行reduceByKey时,才开始生成对应的数据。
默认情况下,每次执行action都会重新计算一次,除非使用persist 或 cache 持久化了RDD。
RDD操作方法支持匿名函数或静态方法,无法在分布式环境传递实例方法。其也支持自定义对象,但自定义对象在作为RDD的key使用时必须确保自定义 equals() 方法和 hashCode() 方法是匹配的。
RDD 持久化RDD能通过persist()或者cache()方法持久化。
此外,我们可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许我们持久化集合到磁盘上、将集合作为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到 Tachyon中。我们可以通过传递一个StorageLevel对象给persist()方法设置这些存储级别。cache()方法使用了默认的存储级别—StorageLevel.MEMORY_ONLY。完整的存储级别在之前 RDD概念部分已经有列出。
Spark自动的监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据。如果你想手动的删除RDD,可以使用RDD.unpersist()方法。
Shuffle 简单介绍Shuffle是MapReduce框架中的一个特定的阶段,介于Map阶段和Reduce 阶段之间。当Map的输出结果在被Reduce使用之前,输出结果先按key哈希,然后分发到每一个Reducer上,这个过程就是shuffle。由于shuffle涉及到了磁盘I/O和网络I/O,以及序列化,因此shuffle性能的高低直接影响到了整个程序的运行效率。
下面两幅图中Map阶段和Reduce 阶段之间的复杂处理就是shuffle,也形象地说明了为什么shuffle的性能会影响整个阶段。
很多时候一些性能问题都是shuffle这里出现的,比如因为任务执行的数据集过大而导致shuffle为每一个任务所创建哈希表变非常大,以至于无法加载到内存中,出现OutOfMemory 的错误。
两个很详细说明shuffle在spark和haddopp的区别的文章,要详细了解shuffle请仔细阅读:
共享变量一般来说,spark操作函数(如map或者reduce)中的变量都不支持分布式,每个节点上都是独立的副本。只有以下两种变量才是spark支持的共享变量:
广播变量广播变量允许在每台节点机器上缓存只读的变量,而不是每个task持有一份拷贝副本。Spark也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本。
使用方式如下:
[mw_shl_code=scala,true]scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)[/mw_shl_code]
广播变量在创建之后,可以在集群的任意函数中使用,只是在广播之后,对应的变量是不能被修改的,因为修改的值不会被广播出去。
累加器累加器是解决RDD并行操作实现count计数、sum求和等情况涉及“加”操作的变量。Spark已原生支持数字类型的累加器,自定义类型必须自己再实现。 使用方式:
[mw_shl_code=scala,true]scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10[/mw_shl_code]
- zero ,提供类似数学中0这个值一般作用的值得方法
- addInPlace 如何计算两个值得和
例子:
[mw_shl_code=scala,true]object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)[/mw_shl_code]
如果使用Scala写spark,还可以用 Accumulable接口实现。也可以SparkContext.accumulableCollection累加scala中的基本集合类型。
RDD Demo 我们在这里用一个简单的Demo简述下如何实现一个自定义的RDD。
RDD 的自定义涉及主要代码结构[mw_shl_code=scala,true]abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
* rdd的子类如何从一个partition取值
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
* 如何生成RDD的partition,该方法之后调用一次
*/
protected def getPartitions: Array[Partition]
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
* 如何得到RDD的依赖
*/
protected def getDependencies: Seq[Dependency[_]] = deps
/**
* Optionally overridden by subclasses to specify placement preferences.
* 子类在有特殊放置选择时可选择性实现
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}[/mw_shl_code]
除了上述可重载的方法外,RDD代码中还有很多map等各类方法,RDD子类都是可以使用的,所以这里我们将不做介绍。
一般来说,自定义一个RDD最核心涉及的两个需要重载是 compute 和 getPartitions。前者是从RDD中取值的方法,后者是RDD如何存放数据的方法。
接下来我们实现一个持有string集合的RDD,并且为了简化实现,demo不会和spark中textfil利用HadoopRDD进行数据存储,demo将直接简单的用Partition的属性持有string值:
我们自定义的RDD:
[mw_shl_code=scala,true] /**
* RDD数据实际是放在Partition中,我们这里就简单建一个Partition,用属性Content持有数据
*/
class ContentPartition(idx: Int, val content: String) extends Partition {
override def index: Int = idx
}
/**
* 我们的RDD继承于RDD[String]。Nil表示空集合,参数含义是空依赖,因为我们的目的是实现数据源的RDD功能,之前是没有依赖的
*/
class ContentRDD(sc : SparkContext,strs: Array[String]) extends RDD[String](sc,Nil){
/**
* 由于是数据源RDD,partition只能我们实现,这里我们就简单的按string集合的数量实现对应数量的partition,partition重要的id标志我们就简单用index表示
*/
override def getPartitions : Array[Partition] = {
val array = new Array[Partition](strs.size)
for (i <- 0 until strs.size) {
array(i) = new ContentPartition(i, strs(i))
}
array
}
/**
* 拿取string,我们就简单的拿Array[String]生成一个迭代器
*/
override def compute(split: Partition, context: TaskContext): Iterator[String] = {
val splits = split.asInstanceOf[ContentPartition]
Array[String](splits.content).toIterator
}
}[/mw_shl_code]
好的,我们已经建好了自己的RDD,接下来我们就利用这个RDD做一个字符数量的统计:
[mw_shl_code=scala,true]object CharDemo {
def main(argment: Array[String]): Unit = {
countChar(Array[String]("tttf","aad")) //传入我们的数据集合
}
def countChar( strs :Array[String]) :Unit = {
val conf = new SparkConf().setAppName("test").setMaster("local") //我们直接在本地调试看这个结果
val sc = new SparkContext(conf) //建立spark操作上下文
new ContentRDD(sc,strs) //建立我们的RDD
.flatMap { s => s.map { c => (c, 1) } } // 平展开成每个(char,1)的格式方便统计
.reduceByKey((c1,c2) => c1 + c2) // 统计所有的char
.collect() // 收集所有结果
.foreach(println) // 展示结果
sc.stop() //停止spark应用
}
}[/mw_shl_code]
由这个例子可以看出RDD的自定义和操作都是比较方便的,这也是spark现在比hadoop map reduce 火的一个原因。
|