分享

Spark 开发指南(版本spark1.1.1)

howtodown 2014-12-9 19:58:16 发表于 入门帮助 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 10 88821
本帖最后由 howtodown 于 2014-12-9 20:00 编辑


问题导读

1.Spark中RDD是什么?
2.Spark支持哪两种类型的共享变量?
3.如何将一些Spark的类和隐式转换导入到程序中?
4.Spark Scala API读取数据,除了支持文本文件,还支持什么格式?
5.RDD有哪种操作?
6.spark有哪些通用转换和动作?
7.spark中如果RDD的任一分区丢失了,spark是如何处理的?







简介
总的来说,每一个Spark的应用,都是由一个驱动程序(driver program)构成,它运行用户的main函数,在一个集群上执行各种各样的并行操作。Spark提出的最主要抽象概念是弹性分布式数据集 (resilient distributed dataset,RDD),它是元素的集合,划分到集群的各个节点上,可以被并行操作。RDDs的创建可以从HDFS(或者任意其他支持Hadoop文件系统) 上的一个文件开始,或者通过转换驱动程序(driver program)中已存在的Scala集合而来。用户也可以让Spark保留一个RDD在内存中,使其能在并行操作中被有效的重复使用。最后,RDD能自动从节点故障中恢复。
Spark的第二个抽象概念是共享变量(shared variables),可以在并行操作中使用。在默认情况下,Spark通过不同节点上的一系列任务来运行一个函数,它将每一个函数中用到的变量的拷贝传递到每一个任务中。有时候,一个变量需要在任务之间,或任务与驱动程序之间被共享。Spark 支持两种类型的共享变量:广播变量 (broadcast variables),可以在内存的所有的结点上缓存变量;累加器(accumulators):只能用于做加法的变量,例如计数或求和。
本指南将展示这些特性,并给出一些例子。读者最好比较熟悉Scala,尤其是闭包的语法。请留意,你也可以通过spark-shell脚本,来交互式地运行Spark。我们建议你在接下来的步骤中这样做。


接入Spark
Spark 1.1.1 需要搭配使用 Scala 2.10. 如果你用Scala 来编写应用,你需要使用相同版本的Scala,更新的大版本很可能不兼容。
要写一个Spark 应用,你需要给它加上Spark的依赖。如果你使用SBT或者Maven,Spark可以通过Maven中心库来获得:
  1. groupId = org.apache.spark
  2. artifactId = spark-core_2.10
  3. version = 1.1.1
复制代码


另外,如果你想访问一个HDFS集群,你需要根据你的HDFS版本,添加一个hadoop-client的依赖, HDFS的版本可以在third party distributions找到:
  1. groupId = org.apache.hadoop
  2. artifactId = hadoop-client
  3. version = <your-hdfs-version>
复制代码


最后,你需要将一些Spark的类和隐式转换导入到你的程序中。通过如下语句:
  1. import org.apache.spark.SparkContext
  2. import org.apache.spark.SparkContext._
  3. import org.apache.spark.SparkConf
复制代码


Spark 1.1.1 可以运行在Java 6及以上版本。 如果你使用Java 8, Spark支持Lambda表达式来代替实现function匿名类,否则你还是需要使用org.apache.spark.api.java.function 包下的function类.
你需要引入Spark类以及隐式转换:
  1. import org.apache.spark.api.java.JavaSparkContext
  2. import org.apache.spark.api.java.JavaRDD
  3. import org.apache.spark.SparkConf
复制代码


初始化Spark
Spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉Spark如何访问一个集群。你需要创建一个包含你应用信息的SparkConf对象,把它传给JavaSparkContext 。
  1. SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
  2. JavaSparkContext sc = new JavaSparkContext(conf);
复制代码


appName是在集群UI中显示的你应用的名字,master是一个Spark, Mesos or YARN cluster URL,或者local模式运行的特殊字符串“local”。 实践中,当程序运行在集群中时,不需要在程序中硬编码master,而是使用spark-submit启动应用. 然而对于本地测试和单元测试,你需要将”local”传给Spark。


使用shell
使用Spark shell时, 一个特殊的交互式的SparkContext已经为你创建, 叫做sc变量. 你自己的SparkContext不会工作. 你可以使用—master参数指定context连接的master。你可以通过—jar参数增加外部jar. 例如运行bin/spark-shell在四个core上:
  1. $ ./bin/spark-shell --master local[4]
复制代码


也可以增加code.jar:

  1. $ ./bin/spark-shell --master local[4] --jars code.jar
复制代码


运行spark-shell --help查看更多的参数。


弹性分布式数据集RDD
Spark围绕的概念是弹性分布式数据集(RDD),这是一个有容错机制并可以被并行操作的元素集合。目前有两种方式创建RDD:并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行计算。 或者引用一个外部存储系统的数据集,比如共享文件系统,HDFS, HBase 或者hadoop支持的任意存储系统即可。


并行集合(Parallelized Collections)
并行集合是通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。例如,下面的解释器输出,演示了如何从一个数组(1到5)创建一个并行集合:

  1. val data = Array(1, 2, 3, 4, 5)
  2. val distData = sc.parallelize(data)
复制代码


一旦分布式数据集(distData)被创建好,它们将可以被并行操作。例如,我们可以调用distData.reduce((a, b) => a + b) 来将数组的元素相加。我们会在后续的分布式数据集运算中进一步描述。
并行集合的一个重要参数是slices,表示数据集切分的份数。Spark将会在集群上为每一份数据起一个任务。典型地,你可以在集群的每个CPU上分布2-4个slices. 一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。然而,你也可以通过传递给parallelize的第二个参数来进行手动设置。(例如:sc.parallelize(data, 10)).


外部数据集(External Datasets)
Spark可以从Hadoop支持的文件系统创建数据集, 包括本地文件,HDFS,Cassandra,HBase,amazon S3等。Spark可以支持TextFile,SequenceFiles以及其它任何Hadoop输入格式。
Text file的RDDs可以通过SparkContext’s textFile的方式创建,该方法接受一个文件的URI地址(或者机器上的一个本地路径,或者一个hdfs://, sdn://,kfs://,其它URI). 下面是一个调用例子:
  1. scala> val distFile = sc.textFile("data.txt")
  2. distFile: RDD[String] = MappedRDD@1d4cee08
复制代码


一旦创建完成,distFile可以被进行数据集操作。例如,我们可以通过使用如下的map和reduce操作:distFile.map(s => s.length).reduce((a, b) => a + b)将所有数据行的长度相加。


读取文件时的一些注意点:
  • 如果使用本地文件系统,必须确保每个节点都能自己节点的此路径下访问相同的文件。 可以将文件复制到所有的worker上或者使用网络共享文件系统。
  • Spark所有的文件输入方法,包括textFile,支持文件夹,压缩文件和通配符。 比如你可以使用textFile(“/my/directory”), textFile(“/my/directory/.txt”)和 textFile(“/my/directory/.gz”)。
  • textFile方法也可以通过输入一个可选的第二参数,来控制文件的分片数目。默认情况下,Spark为每一块文件创建一个分片(HDFS默认的块大小为64MB),但是你也可以通过传入一个更大的值,来指定一个更高的片值。注意,你不能指定一个比块数更小的片值。

除了文本文件,Spark Scala API 也支持其它数据格式:
  • SparkContext.wholeTextFiles允许你读取文件夹下所有的文件,比如多个小的文本文件, 返回文件名/内容对。
  • 对于SequenceFiles,可以使用SparkContext的sequenceFile[K, V]方法创建,其中K和V是文件中的key和values的类型。像IntWritable和Text一样,它们必须是Hadoop的Writable interface的子类。另外,对于几种通用Writable类型,Spark允许你指定原生类型来替代。例如:sequencFile[Int, String]将会自动读取IntWritable和Texts。
  • 对于其他类型的Hadoop输入格式,你可以使用SparkContext.hadoopRDD方法,它可以接收任意类型的JobConf和输入格式类,键类型和值类型。按照像Hadoop作业一样的方法,来设置输入源就可以了。你也可以使用SparkContext.newHadoopRDD, 它基于新的MapReduce API(org.apache.hadoop.mapreduce).
  • RDD.saveAsObjectFile and SparkContext.objectFile支持保存RDD为一个简单格式, 包含序列化的Java对象. 尽管这不是一个高效的格式,比如Avro, 但是它提供了一个容易的方式来保存RDD。


RDD 的操作
RDD支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;而动作(actions)在数据集上运行计算后,返回一个值给驱动程序。 例如,map就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果。另一方面,reduce是一种动作,通过一些函数将所有的元素叠加起来,并将最终结果返回给Driver程序。(不过还有一个并行的reduceByKey,能返回一个分布式数据集)
Spark中的所有转换都是惰性的,也就是说,他们并不会直接计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这个设计让Spark更加有效率的运行。例如,我们可以实现:通过map创建的一个新数据集,并在reduce中使用,最终只返回reduce的结果给driver,而不是整个大的新数据集。

默认情况下,每一个转换过的RDD都会在你在它之上执行一个动作时被重新计算。不过,你也可以使用persist(或者cache)方法,持久化一个RDD在内存中。在这种情况下,Spark将会在集群中,保存相关元素,下次你查询这个RDD时,它将能更快速访问。在磁盘上持久化数据集,或在集群间复制数据集也是支持的。


基础操作
下面的代码演示了RDD的基本操作:
  1. val lines = sc.textFile("data.txt")
  2. val lineLengths = lines.map(s => s.length)
  3. val totalLength = lineLengths.reduce((a, b) => a + b)
复制代码


第一行从一个外部文件创建了一个基本的RDD对象。这个数据集并没有加载到内存中,行只不过是一个指向文件的指针. 代码第二行定义行长度作为mao的结果, 行长度由于惰性设计并没有立即计算。最终 当我们运行reduce,这是一个action。 这时Spark将计算分解成运行在各个节点的任务。 每个节点运行它的map部分以及一个本地的reduction, 并仅将它的结果返回给驱动程序。
如果你想再使用行长度,我们可以在reduce之前增加:
  1. lineLengths.persist()
复制代码


它可以在lineLengths第一次计算之前被保存在内存中。


将function对象传给Spark
Spark API非常依赖在集群中运行的驱动程序中传递function, 对于Scala来说有两种方式实现:
  • 匿名函数语法(Anonymous function syntax), 可以用作简短的代码。
  • 全局单例对象的静态方法(Static methods in a global singleton object). 例如,你可以定义MyFunctions对象,传递MyFunctions.func1, 如下所示:
  1. object MyFunctions {
  2.   def func1(s: String): String = { ... }
  3. }
  4. myRdd.map(MyFunctions.func1)
复制代码


对于Java来说,function代表实现包org.apache.spark.api.java.function的接口类。
也是有两种方法:
  • 实现Function 接口,匿名类或者非匿名类。
  • 使用Java 8的lambda表达式
  1. JavaRDD<String> lines = sc.textFile("data.txt");
  2. JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  3.   public Integer call(String s) { return s.length(); }
  4. });
  5. int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  6.   public Integer call(Integer a, Integer b) { return a + b; }
  7. });
复制代码


  1. class GetLength implements Function<String, Integer> {
  2.   public Integer call(String s) { return s.length(); }
  3. }
  4. class Sum implements Function2<Integer, Integer, Integer> {
  5.   public Integer call(Integer a, Integer b) { return a + b; }
  6. }
  7. JavaRDD<String> lines = sc.textFile("data.txt");
  8. JavaRDD<Integer> lineLengths = lines.map(new GetLength());
  9. int totalLength = lineLengths.reduce(new Sum());
复制代码


使用键值对
大部分的Spark操作可以包含任意类似的对象,而一些特殊的操作只能操作键值对的RDD。 最有代表性的是“shuffle”操作, 比如根据键分组或者聚合元素。
在Scala中,这些操作可以使用包含Tuple2 元素的RDD(Scala内建的tuple类型,只需(a, b)就可创建此类型的对象), 比需要import org.apache.spark.SparkContext._ 允许Spark隐式转换. 可以在PairRDDFunctions上应用键值对操作。
举例来说,下面的代码使用reduceByKey操作来计算行在文件中出现了多少次:
  1. val lines = sc.textFile("data.txt")
  2. val pairs = lines.map(s => (s, 1))
  3. val counts = pairs.reduceByKey((a, b) => a + b)
复制代码


我们也可以使用counts.sortByKey(),例如按照字幕顺序排序然后使用counts.collect()继续将它们作为驱动程序的一个数组对象。
注意: 当使用定制对象作为键时,必须保证equals() 和hashCode() 方法一致.


转换(transformation)
下面的列表列出了一些通用的转换。 请参考 RDD API doc (Scala, Java, Python) 和 pair RDD functions doc (Scala, Java) 了解细节.
转换
含义
map(func)
返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成
filter(func)
返回一个新数据集,由经过func函数计算后返回值为true的输入元素组成
flatMap(func)
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素)
mapPartitions(func)
类似于map,但独立地在RDD的每一个分块上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithSplit(func)
类似于mapPartitions, 但func带有一个整数参数表示分块的索引值。因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]
sample(withReplacement,fraction, seed)
根据fraction指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset)
返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成
distinct([numTasks]))
返回一个包含源数据集中所有不重复元素的新数据集
groupByKey([numTasks])
在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集
注意:默认情况下,只有8个并行任务来做操作,但是你可以传入一个可选的numTasks参数来改变它
reduceByKey(func, [numTasks])
在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。类似groupByKey,reduce任务个数是可以通过第二个可选参数来配置的
aggregateByKey(zeroValue)(seqOp,combOp, [numTasks])
根据提供的函数进行聚合。When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like ingroupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks])
在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由ascending布尔参数决定
join(otherDataset, [numTasks])
在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集
cogroup(otherDataset, [numTasks])
在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, Seq[V], Seq[W])元组的数据集。这个操作也可以称之为groupwith
cartesian(otherDataset)
笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U)对数据集(两两的元素对)
pipe(command,[envVars])
Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions)
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

动作(actions)
下面的列表列出了一些通用的action操作. 请参考 RDD API doc (Scala, Java, Python) 和 pair RDD functions doc (Scala, Java) 了解细节.
动作
含义
reduce(func)
通过函数func(接受两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的被并行执行。
collect()
在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作并返回一个足够小的数据子集后再使用会比较有用。
count()
返回数据集的元素的个数。
first()
返回数据集的第一个元素(类似于take(1))
take(n)
返回一个由数据集的前n个元素组成的数组。注意,这个操作目前并非并行执行,而是由驱动程序计算所有的元素
takeSample(withReplacement,num, seed)
返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,Seed用于指定的随机数生成器种子
takeOrdered(n,[ordering])
Return the firstn elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path)
将数据集的元素,以textfile的形式,保存到本地文件系统,HDFS或者任何其它hadoop支持的文件系统。对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本行
saveAsSequenceFile(path)
将数据集的元素,以Hadoop sequencefile的格式,保存到指定的目录下,本地系统,HDFS或者任何其它hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者隐式的可以转换为Writable的RDD。(Spark包括了基本类型的转换,例如Int,Double,String,等等)
saveAsObjectFile(path)
(Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().
countByKey()
对(K,V)类型的RDD有效,返回一个(K,Int)对的Map,表示每一个key对应的元素个数
foreach(func)
在数据集的每一个元素上,运行函数func进行更新。这通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase


RDD持久化
Spark最重要的一个功能,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(Actions)变得更加迅速(通常快10倍)。缓存是用Spark构建迭代算法的关键。

你可以用persist()或cache()方法来标记一个要被持久化的RDD,然后首次被一个动作(Action)触发时计算,它将会被保留在计算结点的内存中并重用。Cache有容错机制,如果RDD的任一分区丢失了,通过使用原先创建它的转换操作,它将会被自动重算(不需要全部重算,只计算丢失的部分)。
此外,每一个RDD都可以用不同的保存级别进行保存,从而允许你持久化数据集在硬盘,或者在内存作为序列化的Java对象(节省空间),甚至于跨结点复制。这些等级选择,是通过将一个org.apache.spark.storage.StorageLevel对象传递给persist()方法进行确定。cache()方法是使用默认存储级别的快捷方法,也就是StorageLevel.MEMORY_ONLY(将反序列化的对象存入内存)。

完整的可选存储级别如下:
存储级别
意义
MEMORY_ONLY
将RDD作为反序列化的的对象存储JVM中。如果RDD不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算。这是是默认的级别
MEMORY_AND_DISK
将RDD作为反序列化的的对象存储在JVM中。如果RDD不能被与内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取
MEMORY_ONLY_SER
将RDD作为序列化的的对象进行存储(每一分区占用一个字节数组)。通常来说,这比将对象反序列化的空间利用率更高,尤其当使用fast serializer,但在读取时会比较占用CPU
MEMORY_AND_DISK_SER
与MEMORY_ONLY_SER相似,但是把超出内存的分区将存储在硬盘上而不是在每次需要的时候重新计算
DISK_ONLY
只将RDD分区存储在硬盘上
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
与上述的存储级别一样,但是将每一个分区都复制到两个集群结点上
OFF_HEAP (experimental)
Store RDD in serialized format inTachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory.
注意: 如果使用Python, 存储对象总是使用Pickle库进行序列化,所以它不关心你用何种serialized级别.
Spark也会在shuffle操作中存储一些中间对象(比如 reduceByKey), 甚至用户都没有调用persist. 这样做的目的是避免在一个节点失败后重新计算完整的输入。我们总是推荐用户持久化它们的RDD, 如果需要重用它们.


存储级别的选择
Spark的不同存储级别,旨在满足内存使用和CPU效率权衡上的不同需求。我们建议通过以下的步骤来进行选择:
  • 如果你的RDDs可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDDs的操作尽可能的快。
  • 如果不行,试着使用MEMORY_ONLY_SER并且选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。
  • 尽可能不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快。
  • 如果你想有快速故障恢复能力,使用复制存储级别(例如:用Spark来响应web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续的运行任务,而不需要等待丢失的分区被重新计算。
  • 如果你想要定义你自己的存储级别(比如复制因子为3而不是2),可以使用StorageLevel 单例对象的apply()方法。
在大内存或者多应用的环境中, 实验性的OFF_HEAP模式有几个好处:
  • 允许多个执行者共享Tachyon中的共享内存
  • 显著的减少垃圾回收的消耗
  • 单个executor崩溃时缓存数据不会丢失

移除数据
Spark 自动监控每个节点的缓存使用,依据LRU方式丢弃老的数据分区. 如果你想手工移除而不是等待cache移除机制,使用RDD.unpersist() 方法.

共享变量
一般来说,当一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所有变量的独立拷贝。这些变量会被拷贝到每一台机器,在远程机器上对变量的所有更新都不会被传播回驱动程序。通常看来,在任务之间中,读写共享变量显然不够高效。然而,Spark还是为两种常见的使用模式,提供了两种有限的共享变量:广播变量和累加器。


广播变量 Broadcast Variables
广播变量允许程序员保留一个只读的变量,缓存在每一台机器上,而非每个任务保存一份拷贝。他们可以这样被使用,例如,以一种高效的方式给每个结点一个大的输入数据集。Spark会尝试使用一种高效的广播算法来传播广播变量,从而减少通信的代价。
广播变量是通过调用SparkContext.broadcast(v)方法从变量v创建的。广播变量是一个v的封装器,它的值可以通过调用value方法获得。如下模块展示了这个:
  1. scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
  2. broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
  3. scala> broadcastVar.value
  4. res0: Array[Int] = Array(1, 2, 3)
复制代码


在广播变量被创建后,它应该在集群运行的任何函数中,代替v值被调用,从而v值不需要被再次传递到这些结点上。另外,对象v不能在广播后修改,这样可以保证所有结点的收到的都是一模一样的广播值。


累加器 Accumulators
累加器是一种只能通过关联操作进行“加”操作的变量,因此可以高效被并行支持。它们可以用来实现计数器(如MapReduce中)和求和器。Spark原生就支持Int和Double类型的累加器,开发者可以自己添加新的支持类型。
一个累加器可以通过调用SparkContext.accumulator(v)方法从一个初始值v中创建。运行在集群上的任务,可以通过使用+=来给它加值。然而,他们不能读取这个值。只有驱动程序可以使用value的方法来读取累加器的值。
如下的解释器模块,展示了如何利用累加器,将一个数组里面的所有元素相加:
  1. scala> val accum = sc.accumulator(0, "My Accumulator")
  2. accum: spark.Accumulator[Int] = 0
  3. scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
  4. ...
  5. 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
  6. scala> accum.value
  7. res2: Int = 10
复制代码


代码使用了内建的Int的累加器, 程序员也可以自己创建累加器接口AccumulatorParam的子类。. AccumulatorParam接口有两个方法, zero代表提供一个零值,addInPlace代表将两个值相加。 例如假定我们有一个Vector类,代表数学里的vector, 我们可以实现代码如下:
  1. object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  2.   def zero(initialValue: Vector): Vector = {
  3.     Vector.zeros(initialValue.size)
  4.   }
  5.   def addInPlace(v1: Vector, v2: Vector): Vector = {
  6.     v1 += v2
  7.   }
  8. }
  9. // Then, create an Accumulator of this type:
  10. val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
复制代码


如果使用Scala, Spark也支持更通用的Accumulable接口, 可以累加不同类型的元素, SparkContext.accumulableCollection方法累加通用的collection类型.
发布到集群中
应用提交指南 描述了如何提交应用到一个Spark集群中。简而言之,一旦你打包好你的应用(JAR for Java/Scala,或者一堆.py / .zip files for Python), bin/spark-submit脚本允许你提交它们到任意的cluster manager.


单元测试
Spark很好的支持流行的单元测试框架。 简单的使用master URL为local创建一个SparkContext,执行你的操作然后调用 SparkContext.stop() 停止. 确保在finally块中停止context或者在单元测试框架的tearDown停止context, 因为Spark 不支持在同一个程序的同时拥有两个context.


从1.0以前的版本升级
Spark 1.0 为1.x系列版本冻结了API的改动, 没有API被标记为 “experimental” 或者 “developer API” 以在未来版本中支持。 对于Scala 用户唯一的改变是分组操作,如 groupByKey, cogroup 和 join, 返回结果类型需哦那个 (Key, Seq[Value]) 改为 (Key, Iterable[Value]).
升级指南请参照 Streaming, MLlib and GraphX.


更多资料
你可以在官方站点查看官方的例子 example. 除此之外,Spark在发布包的examples的文件夹中包含了几个例子(Scala, Java, Python). 运行Java 和 Scala例子时你可以传递类名给Spark bin/run-example脚本, 例如:
  1. ./bin/run-example SparkPi
复制代码


对于Python例子,使用spark-submit:
  1. ./bin/spark-submit examples/src/main/python/pi.py
复制代码


想了解优化的方法, 配置configuration调优tuning 指南提供了最佳实践. 特别重要的是你的数据要以有效的格式存储在内存中. 想了解发布信息, cluster mode overview 描述了分布式操作和集群管理器支持的组件.
最后全部的API文档请访问 Scala, JavaPython.

0.8.1版本由taobao技术部团队的月禾mm初审,以及微博上的Spark达人@crazyjvm复审。 0.8.1译文链接


http://colobu.com/2014/12/08/spark-programming-guide/


已有(11)人评论

跳转到指定楼层
liusiping 发表于 2014-12-10 00:38:08
回复

使用道具 举报

韩克拉玛寒 发表于 2014-12-10 09:03:38
回复

使用道具 举报

redhat1986 发表于 2014-12-10 09:41:11
非常感谢分享!
回复

使用道具 举报

hb1984 发表于 2014-12-10 14:59:38
谢谢楼主分享。            
回复

使用道具 举报

liusiping 发表于 2014-12-12 13:01:39
回复

使用道具 举报

EASONLIU 发表于 2014-12-30 20:57:04
当入门用,挺全的
回复

使用道具 举报

小小布衣 发表于 2015-1-8 16:03:10
运行官方提供的WordCount例子,出现这个错误, 会是什么原因导致的
首先scala的版本都是2.10.4 和开发环境是一致的
在spark-shell上可以成功,在spark-submit上就会出现如下错误

15/01/08 15:19:58 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@n5.china.com:20545]
15/01/08 15:19:58 INFO Utils: Successfully started service 'sparkDriver' on port 20545.
15/01/08 15:19:58 INFO SparkEnv: Registering MapOutputTracker
15/01/08 15:19:58 INFO SparkEnv: Registering BlockManagerMaster
15/01/08 15:19:58 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150108151958-cadb
15/01/08 15:19:58 INFO Utils: Successfully started service 'Connection manager for block manager' on port 63094.
15/01/08 15:19:58 INFO ConnectionManager: Bound socket to port 63094 with id = ConnectionManagerId(n5.china.com,63094)
15/01/08 15:19:58 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/01/08 15:19:58 INFO BlockManagerMaster: Trying to register BlockManager
15/01/08 15:19:58 INFO BlockManagerMasterActor: Registering block manager n5.china.com:63094 with 265.4 MB RAM
15/01/08 15:19:58 INFO BlockManagerMaster: Registered BlockManager
15/01/08 15:19:58 INFO HttpFileServer: HTTP File server directory is /tmp/spark-3b796d21-db73-4aad-84bc-23cd0ce3fab7
15/01/08 15:19:58 INFO HttpServer: Starting HTTP Server
15/01/08 15:19:58 INFO Utils: Successfully started service 'HTTP file server' on port 38860.
15/01/08 15:19:58 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/01/08 15:19:58 INFO SparkUI: Started SparkUI at http://n5.china.com:4040
15/01/08 15:19:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/08 15:19:59 INFO SparkContext: Added JAR file:/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/jars/spark-examples-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar at http://10.1.0.182:38860/jars/spa ... p2.5.0-cdh5.2.0.jar with timestamp 1420701599601
15/01/08 15:19:59 INFO AppClient$ClientActor: Connecting to master spark://10.1.0.141:7077...
15/01/08 15:19:59 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/01/08 15:19:59 INFO SparkContext: Starting job: reduce at SparkPi.scala:35
15/01/08 15:19:59 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:35) with 1000 output partitions (allowLocal=false)
15/01/08 15:19:59 INFO DAGScheduler: Final stage: Stage 0(reduce at SparkPi.scala:35)
15/01/08 15:19:59 INFO DAGScheduler: Parents of final stage: List()
15/01/08 15:19:59 INFO DAGScheduler: Missing parents: List()
15/01/08 15:19:59 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at SparkPi.scala:31), which has no missing parents
15/01/08 15:20:00 INFO MemoryStore: ensureFreeSpace(1728) called with curMem=0, maxMem=278302556
15/01/08 15:20:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1728.0 B, free 265.4 MB)
15/01/08 15:20:00 INFO MemoryStore: ensureFreeSpace(1125) called with curMem=1728, maxMem=278302556
15/01/08 15:20:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1125.0 B, free 265.4 MB)
15/01/08 15:20:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on n5.china.com:63094 (size: 1125.0 B, free: 265.4 MB)
15/01/08 15:20:00 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/01/08 15:20:00 INFO DAGScheduler: Submitting 1000 missing tasks from Stage 0 (MappedRDD[1] at map at SparkPi.scala:31)
15/01/08 15:20:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000 tasks
15/01/08 15:20:15 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/01/08 15:20:19 INFO AppClient$ClientActor: Connecting to master spark://10.1.0.141:7077...
15/01/08 15:20:30 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/01/08 15:20:39 INFO AppClient$ClientActor: Connecting to master spark://10.1.0.141:7077...
15/01/08 15:20:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/01/08 15:20:59 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
15/01/08 15:20:59 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/01/08 15:20:59 INFO TaskSchedulerImpl: Cancelling stage 0
15/01/08 15:20:59 INFO DAGScheduler: Failed to run reduce at SparkPi.scala:35
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[root@n5 bin]#

点评

可能运行时内存不足,导致失败  发表于 2015-1-8 21:23
回复

使用道具 举报

sxyqhyt 发表于 2015-5-6 09:32:24
学习了        
回复

使用道具 举报

IT_雪夜归人 发表于 2016-2-1 17:18:03
非常详细,非常感谢
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条