// FilteredRDD[20] at filter at <console>:16 (48 partitions)
// MappedRDD[19] at map at <console>:14 (48 partitions)
// ParallelCollectionRDD[18] at parallelize at <console>:12 (48 partitions)
(3). flatMap(func)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
Test:
val kv=sc.parallelize(List(List(1,2),List(3,4),List(3,6,8)))
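The original test stops at defining kv; a minimal sketch of the actual flatMap call would be:
kv.flatMap(x => x).collect
// Array(1, 2, 3, 4, 3, 6, 8)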
(4). mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitions is invoked once per partition: the entire content of a partition is passed to func as a sequential stream (an iterator), and func must return another iterator. The resulting iterators are automatically combined into a new RDD. Note that the tuples (3,4) and (6,7) are missing from the following result because of the partitioning we chose.
Test:
val nums = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
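  // Sketch of a possible body for myfunc (not part of the original text): emit a
  // (previous, current) pair for each pair of consecutive elements, so pairs are
  // only ever formed inside a single partition.
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res = (pre, cur) :: res
    pre = cur
  }
  res.iterator
}
nums.mapPartitions(myfunc).collect
// (3,4) and (6,7) are absent from the result, because pairs never cross partition boundaries.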
(5). mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
Its signature is:
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
The func passed to mapPartitionsWithIndex takes two arguments: the first is the partition index, the second is an iterator over that partition's data. The output is an iterator over the transformed elements. The test below prints the partition index together with the data in that partition.
Test:
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc(index: Int, iter: Iterator[Int]): Iterator[String] = {
  iter.toList.map(x => index + "-" + x).iterator
}
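For completeness, a sketch of the driver-side call; the grouping of 1 to 10 into three partitions shown below is Spark's default slicing, so the output is indicative:
x.mapPartitionsWithIndex(myfunc).collect
// e.g. Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10)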
(6). sample(withReplacement, fraction, seed)
Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
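No test accompanies sample in the original; a small sketch (the fraction and seed values are arbitrary):
val data = sc.parallelize(1 to 100)
data.sample(false, 0.1, 42).collect
// roughly 10 elements, sampled without replacement; the fixed seed makes the sample repeatable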
(10). groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
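A minimal groupByKey sketch (the pair RDD below is illustrative, not from the original text):
val pairs = sc.parallelize(List(("A", 1), ("B", 2), ("A", 3)))
pairs.groupByKey().collect
// A -> [1, 3], B -> [2]   (the printed collection type varies by Spark version)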
Transformations such as groupByKey and reduceByKey involve a shuffle, which brings up two related concepts: wide dependencies and narrow dependencies.
(11). reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
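A reduceByKey sketch on the same kind of illustrative pair RDD:
val pairs = sc.parallelize(List(("A", 1), ("B", 2), ("A", 3)))
pairs.reduceByKey(_ + _).collect
// Array((A,4), (B,2))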
(12).sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
Test:
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
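The sortByKey call itself is missing from the original test; a sketch:
kv1.sortByKey().collect
// keys in ascending order, e.g. Array((A,1), (A,4), (B,2), (B,5), (C,3))
kv1.sortByKey(false).collect   // descending by key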
(13). join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
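A join sketch using kv1 from the test above and a second, hypothetical pair RDD kv2:
val kv2 = sc.parallelize(List(("A", 10), ("B", 20), ("D", 30)))
kv1.join(kv2).collect
// only keys present on both sides appear, e.g. Array((A,(1,10)), (A,(4,10)), (B,(2,20)), (B,(5,20)))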
(14).cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
Test:
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))
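The cogroup call itself is not in the original test; applying it to kv1 and kv3 defined above:
kv1.cogroup(kv3).collect
// e.g. (A,([1, 4],[10])), (B,([2, 5],[20])), (C,([3],[])), (D,([],[30]))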
(15). cartesian(otherDataset)
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements), i.e. the Cartesian product.
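A small cartesian sketch on illustrative data:
val a = sc.parallelize(List(1, 2))
val b = sc.parallelize(List("x", "y"))
a.cartesian(b).collect
// e.g. Array((1,x), (1,y), (2,x), (2,y))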
(16). pipe(command, [envVars])
Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
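A pipe sketch using cat, so the child process simply echoes its stdin; a single partition keeps the output order deterministic:
val lines = sc.parallelize(List("hello", "spark", "pipe"), 1)
lines.pipe("cat").collect
// Array(hello, spark, pipe): each element is written to cat's stdin, each stdout line becomes an element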
(17).coalesce(numPartitions)
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
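A coalesce sketch: shrink a heavily filtered RDD down to a few partitions (the numbers are illustrative):
val big = sc.parallelize(1 to 10000, 100)
val small = big.filter(_ % 100 == 0).coalesce(4)
small.partitions.size   // 4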
(18).repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
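A repartition sketch (illustrative sizes):
val rdd = sc.parallelize(1 to 100, 2)
rdd.repartition(8).partitions.size   // 8, produced by a full shuffle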
— Actions —
(19). reduce(func)
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
(20). collect()
Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
(21). count()
Return the number of elements in the dataset.
(22). first()
Return the first element of the dataset (similar to take(1)).
(23). take(n)
Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
(24). countByKey()
Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
(25). foreach(func)
Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
Test:
val num=sc.parallelize(1 to 10)
num.reduce (_ + _)
res1: Int = 55
num.take(5)
res2: Array[Int] = Array(1, 2, 3, 4, 5)
num.first
res3: Int = 1
num.count
res4: Long = 10
num.take(5).foreach(println)
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5),("A",7),("B",7)))
val kv1_count=kv1.countByKey()
kv1_count: scala.collection.Map[String,Long] = Map(A -> 3, C -> 1, B -> 3)
(26). takeSample(withReplacement, num, seed)
Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed.
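A takeSample sketch (the num and seed values are arbitrary):
val num = sc.parallelize(1 to 100)
num.takeSample(false, 5, 7)
// an Array of 5 elements sampled without replacement; the fixed seed makes it repeatable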
(27). takeOrdered(n, [ordering])
Return the first n elements of the RDD using either their natural order or a custom comparator.
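A takeOrdered sketch using both the natural order and an explicit Ordering:
val words = sc.parallelize(List("dog", "cat", "ape", "bee"))
words.takeOrdered(2)
// Array(ape, bee)
words.takeOrdered(2)(Ordering[String].reverse)
// Array(dog, cat)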
(28). saveAsTextFile(path)
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
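A saveAsTextFile sketch (the output path is hypothetical):
val nums = sc.parallelize(1 to 100)
nums.saveAsTextFile("hdfs:///tmp/nums-txt")   // writes one part-NNNNN file per partition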
(29). saveAsSequenceFile(path)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
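A saveAsSequenceFile sketch (hypothetical path; the Int values are implicitly converted to Writable):
val kv = sc.parallelize(List(("A", 1), ("B", 2)))
kv.saveAsSequenceFile("hdfs:///tmp/kv-seq")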
(30). saveAsObjectFile(path)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
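A save-and-reload sketch (the path is hypothetical):
val nums = sc.parallelize(1 to 10)
nums.saveAsObjectFile("hdfs:///tmp/nums-obj")
val restored = sc.objectFile[Int]("hdfs:///tmp/nums-obj")
restored.count   // 10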
Of course, there are far more transformations and actions than the ones listed here. For the rest, see the API documentation: RDD API
The storage levels predefined in the StorageLevel companion object (org.apache.spark.storage.StorageLevel) are:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon
}
// The constructor parameters of the StorageLevel class are:
class StorageLevel private(
    private var useDisk_ : Boolean,
    private var useMemory_ : Boolean,
    private var useOffHeap_ : Boolean,
    private var deserialized_ : Boolean,
    private var replication_ : Int = 1)
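A persist sketch showing how one of these levels is chosen (the RDD itself is illustrative):
import org.apache.spark.storage.StorageLevel

val cached = sc.parallelize(1 to 1000000)
cached.persist(StorageLevel.MEMORY_AND_DISK_SER)   // keep serialized in memory, spill to disk if it does not fit
cached.count   // the first action materializes and caches the data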