本帖最后由 xioaxu790 于 2014-9-12 18:17 编辑
问题导读
1、如何理解Spark应用程序的构造?
2、如何接入和初始化Spark?
3、如何自己添加新的支持类型累加器?
Spark开发指南
简介
总的来说,每一个Spark应用程序,都是由一个驱动程序组成,它运行用户的main函数,并且在一个集群上执行各种各样的并行操作。Spark提供的主要的抽象(概念)是一个弹性分布式数据集,它是一个元素集合,划分到集群的不同节点上,可以被并行操作。RDDs的创建可以从Hadoop文件系统(或者任何支持Hadoop的文件系统)上的一个文件开始,或者通过转换这个驱动程序中已存在的Scala集合而来。用户也可以使Spark持久化一个 RDD到内存中,使其能在并行操作中被有效的重用。最后,RDDs能自动从节点故障中恢复。
Spark中的第二个抽象(概念)是共享变量,他可以在并行操作中使用。默认情况下,Spark通过不同节点上的一系列任务来并行运行一个函数。他将每一个函数中用的到变量的拷贝传递到每一个任务中。有时候,一个变量需要在不同的任务之间,或者任务和驱动程序之间共享。Spark支持两种类型的共享变量:广播变量,可以再所有节点的内存中缓存一个值,累加器,一个只能做加法的变量,例如计数器和求和。
本指南通过每一种Spark支持的语言来展示Spark的每个特性。It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.
接入Spark
Java
Spark1.0.2工作在Java6或者java6以后之上。如果你在使用Java8,Spark支持lamdba表达式来简化函数编写,否则,你可以使用org.apache.spark.api.java.function 包下的类。
用Java编写Spark应用,你需要添加Spark的依赖,Spark可以通过Maven Central使用:
groupId=org.apache.spark
artifactId=spark-core_2.10
version=1.0.2 复制代码
另外,如果你想访问一个HDFS集群,你需要根据你的HDFS版本添加一个hadoop-client依赖。一些常用的HDFS版本标签显示在页面。
groupId=org.apache.hadoop
artifactId=hadoop-client
version= 复制代码
最后,你需要在你的程序中导入一些Spark类,通过添加如下几行:
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.SparkConf 复制代码
初始化Spark
Java
Spark程序需要做的第一件事就是创建一个JavaSparkContext对象 ,它将告诉Spark怎样访问一个集群。创建一个SparkContext,你首先必须创建SparkConf对象,它包含关于你的应用程序的信息。
SparkConf conf=new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc=new JavaSparkContext(conf); 复制代码
appName参数是你的应用程序的名字,将会在集群的UI上显示。master是Spark、Mesos、或者YARN 集群URL,或者一个专用的字符串”local“使其在本地模式下运行。在实践中,当运行在一个集群上,你将不会想要把master硬编码到程序中,而是通过使用spark-submit运行程序并且接受master。但是,在本地测试或者单元测试中,你可以传递”local“在进程内运行Spark。
弹性分布式数据集
Spark反复围绕的一个概念是弹性分布式数据集。它是一个有容错机制的元素集合,并且可以被并行操作。有两种创建RDDs的方法。并行化你的驱动程序中已存在的集合,或者引用一个外部存储系统的数据集,例如一个共享文件系统,HDFS、HBase、或者任何可以提供一个Hadoop InputFormat的数据源。
并行集合
并行集合通过调用JavaSparkContext的parallelize方法,在你的驱动程序中已存在的Collection上创建。集合的元素将会拷贝组成一个可以被并行操作的分布式数据集。例如,下面是如何创建一个包含数字1到5的并行集合:
List data=Arrays.asList(1,2,3,4,5);
JavaRDD distData=sc.parallelize(data); 复制代码
一旦创建,分布式数据集(distData)就可以并行操作。例如,我们可以调用 distData.reduce((a,b)->a+b)来将列表中的元素相加。我们稍后将会在分布式数据集的操作中描述。
注意:在这个指南中,我们经常使用简洁的Java8 lamdba语法来定义java functions,但是在老的Java版本中,你可以实现org.apache.spark.api.java.function包中的接口。我们将会在下面详细描述passing functions to Spark。
并行集合的另一个重要的参数是数据集被切分成切片(slices)的数量。Spark将会为集群中的每一个slice运行一个task。通常情况下,你要为集群中的每个CPU 2-4个slice。通常,Spark会尝试根据你的集群自动设置slice的数量。然而,你可以手动的设置它,把它作为第二个参数传递给 parallelize(例如:sc.parallelize(data,10)).
外部数据集
Spark可以通过任何Hadoop支持的存储源创建分布式数据集。包括你的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等等。Spark支持text files(文本文件),SequenceFiles(序列化文件),和任何其他的Hadoop InputFormat(输入格式)。
Text file 可以通过使用SparkContext的textFile方式创建。这个方法接受一个文件的URI(或者机器上的一个本地路径,或者hdfs://,s3n:// 等URI)并且把这个文件读成一个行的集合。下面是一个调用的例子:
JavaRDD distFile=sc.textFile(“data.txt”); 复制代码
一旦创建,distFile可以被进行数据集操作。例如:我们可以通过使用map和reduce将所有数据行的长度相加.例如:distFile.map(s->s.length()).reduce((a,b)->(a+b)).
Spark读文件时的一些注意事项:
如果使用本地文件系统上的路径,
Spark的所有基于文件的输入方法,包括textFile,支持运行目录,压缩文件盒通配符。例如,你可以食用textFile(“/my/directory/“),textFile(“/my/directory/.txt”),和textFile(“/my/directory/.gz”)
textFile方法也可以接受一个可选的第二参数来控制这个文件的slice数目。默认情况下,Spark为每一个文件创建一个 slice(HDFS中block默认为64MB)。但是你可以通过传递一个较大的值来指定一个跟高的slice值。注意你的slice数不能小于 block数。
除了文本文件,Spark的Java API 也支持集中其他数据格式。
JavaSparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
对于序列化文件(SequenceFiles),使用SparkContext的sequenceFile[K,V],K和V是文件中key和value的类型。他们必须是Hadoop的Writeable接口的子类,像IntWriteable和Text。
对于其他的Hadoop输入格式,你可以使用JavaSparkContext.hadoopRDD方法。它可以接受任意(类型)的 JobConf和输入格式类,key类和value类。按照像Hadoop Job一样,来设置输入源就可以了。你也可以为InputFormats使用JavaSparkContext.newHadoopRDD,基于”new“MapReduce API(org.apache.hadoop.mapreduce).
JavaRDD.saveAsObjectFile 和JavaContext.objectFile支持以一种由Java对象序列化组成的简单的格式保存RDD。虽然这不是有效地专门的格式向Avro,但是它提供了一个简单的方式存储RDD。
RDD操作
RDDs支持两种类型的操作:转换(transformations),它从一个现有的数据集创建一个新的数据集。动作(actions),它在数据集上运行计算后,返回一个值给驱动程序。例如:map就是一个转换,它将数据集的每一个元素传递给一个函数,并且返回一个新的RDD表示结果。另一方面,reduce是一个动作,他通过一些行数将一些RDD的所有元素聚合起来,并把最终的结果返回给驱动程序(不过还有一个并行的 reduceByKey,它返回一个分布式数据集)。
Spark中的所有转换都是惰性的,也就是说,他们不会立即计算出结果。相反,他们只是记住应用到这些基础数据集(例如file)上的转换。只有当发生一个需要返回一个结果给驱动程序的动作时,这些转换才真正执行。这样的设计使得Spark运行更加高效——例如,我们可以实现,通过map创建一个数据集,并在reduce中使用,最终只返回reduce的结果给驱动程序,而不是整个大的新数据集。
默认情况下,每一个转换过的RDD都会在你在它上面运行一个action时重新计算。然而,你也可以使用persist方法(或者cache)持久化一个RDD到内存中。在这种情况下,Spark将会在集群中,保存相关元素,下次你访问这个RDD时,它将能够更快速访问,。在磁盘上持久化数据集,或者在集群间复制数据集也是支持的。
基本操作
为了说明RDD基础,考虑下面的简单的程序:
JavaDDD lines=sc.textFile(“data.txtt”);
JavaRDD lineLengths=lines.map(s->s.length());
int totalLength=lineLengths.reduce((a,b)->a+b); 复制代码
第一行通过一个外部文件定义了一个基本的RDD。这个数据集未被加载到内存,也未在上面执行动作。lines仅仅是这个文件的一个指针。第二行定义了lineLengths作为map转换的结果。此外,lineLengths因为惰性没有立即计算。最后,我们运行reduce,他是一个 action。这时候,Spark将这个计算拆分成不同的task,并使其运行在独立的机器上,并且每台机器运行它自己的map部分和本地的 reducation,仅仅返回他的结果给驱动程序。
如果我们想在以后重复使用lineLengths,我们可以添加:
复制代码
在reduce之前,这将导致lineLengths在第一次被计算之后被保存在内存中。
传递Functions到Spark
Spark的API,在很大程度上依赖于传递函数使其驱动程序在集群上运行。在Java中,函数有实现了org.apache.spark.api.java.function包中接口的类表示。有两种创建这样的函数的方式:
在你自己的类中实现Function接口,可以是匿名内部类,后者命名类,并且你要传递他的一个实例到Spark
在Java8中,使用lamdba表达式来简洁的定义一种实现
为了简洁起见,本指南中的大多数使用lamdba语法,它易于使用,所有的APIs in long-form,例如,我们可以编写上面的代码如下:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
}); 复制代码
或者,如果编写内联函数显得很笨拙:
class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum()); 复制代码
Note that anonymous inner classes in Java can also access variables in the enclosing scope as long as they are marked final. Spark will ship copies of these variables to each worker node as it does for other languages
Wroking with Key-Value Pairs使用键/值对工作
虽然大多数Spark操作工作在包含各种类型的对象的RDDs之上,一些特殊的操作仅仅能够使用包含key-value对的RDDs。最常见的操作之一是分布式”shuffle“操作,例如通过key分组或者聚合元素。
在Java中,key-value对使用scala标准包下的scala Tuple2类表示。你可以简单的调用new Tuple2(a,b)去创建一个tuuple,并且通过tuple.1()和tuple.2()访问它的字段。
key-value对的RDDs通过JavaPairRDD表示。你可以通过JavaRDDs构建JavaPairRDDs,使用指定的map操作版本,像mapToPair和flatMapToPair。JavaPair将不仅拥有标准RDD函数,并且有特殊的key-value函数。
例如,下面的代码在key-value对上使用reduceByKey操作来计算在一个文件中每行文本出现的次数和。
JavaRDD lines=sc.textFile(“data.txt”);
JavaPairRDD pairs=lines.mapToPair(s->new Tuple2(s,1))
JavaPairRDD counts=pairs.reduceByKey((a,b)->a+b); 复制代码
我们也可以使用counts.sortByKey(),例如,按照字母顺序排序这个键值对。并且最后调用counts.collect()作为一个对象数组返回给驱动程序。
注意:当使用自定义的对象作为key-value对操作的key时,你必须确保自定义equals()方法伴随着一个匹配的hashCode()方法。有关详情,参考 Object.hashCode() 文档大纲中列出的规定。
转换
下面的表格列出了Spark支持的常见的转换。更多信息可以参考RDD API 文档和pair RDD 函数文档。
动作
下面的表格列出了Spark支持的常见的动作。更多信息可以参考RDD API 文档和pair RDD 函数文档。
RDD持久化
Spark最重要的一个功能是在不同的操作间,持久化(或者缓存)一个数据集到内存中。当你持久化一个RDD时,每一个节点都把它计算的分片结果保存在内存中,并且在对此数据集(或者衍生出的数据集)进行其他动作时重用。这将使后续的动作变得更快(通过快109倍以上)。缓存是(Spark)迭代算法和快速交互使用的关键工具。
你可以使用persist()和cache()方法来标记一个将要持久化的RDD。第一次他被一个动作进行计算,他将会保留在这个节点的内存中。Spark的缓存有容错性-如果RDD的任何一个分区丢失了,他会通过使用最初创建的它转换操作,自动重新计算。
此外,每一个持久化RDD可以使用不同的存储级别存储。允许你,例如,持久化数据集到磁盘,持久化数据集到内存作为序列化的Java对象(节省空间),跨节点复制,或者 store it off-heap in Tachyon。这些级别通过传递一个StorageLevel对象(Scala,Java,Python)到persist()来设置。cache()方法是使用默认存储级别的快捷方法,即StorageLevel.MEMORY_ONLY(存储反序列化对象到内存),完整的存储级别设置为:
Spark也会在shuffle操作(例如,reduceByKey)中自动的持久化一些中间数据。甚至当用户未调用persist方法。这样做是为了阻止在进行shuffle操作时由于一个节点故障而重新计算整个输入。我们依然推荐用户在作为结果的RDD上调用persist如果想打算重用它。
存储级别的选择
移除数据
Spark自动监视每一个节点上的缓存使用,并且使用LRU方式删除老的数据分区。如果你想手工的删除yige RDD而不是等他自动从缓存中清除,使用RDD.unpersist()方法。
共享变量
通常,当传递给Spark操作(例如map或者reduce)的函数在远程集群节点上运行时,它实际上操作的是这个函数使用到的所有变量的独立拷贝。这些变量被拷贝到每一台机器,并且在远程机器上的对这些变量的所有更新都不会传回给驱动程序。通常看来,在不同的任务之间读写变量是低效的。然而,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量和累加器。
广播变量
广播变量允许程序员保存一个只读的变量缓存在每一台机器上,而不是每个任务都保存一份拷贝。它们可以这样被使用,例如:以一种高效的方式给每一个节点一个大的输入数据集。Spark会尝试使用一种高效的广播算法来分配广播变量,以减小通信的代价。
广播变量通过调用SparkContext.broadcast(v)方法从变量v创建。广播变量是一个v的包装器。它的值可以通过调用value方法访问。下面的代码展示了这些:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3] 复制代码
在广播变量被创建后,它应该在集群上任何函数中代替v被使用,使v不再传递到这些节点上。此外,对象v在被广播后不能被修改,这样可以保证所有节点获得的广播变量的值是相同的(例如,这个变量在之后被传递到一个新的节点)。
累加器
累加器是一种只能通过关联操作进行”加“操作的变量。因此可以高效的支持并行计算。它们可以用于实现计数器(*例如在MapReduce中)和求和。Spark原生支持数字类型的累加器。开发者也可以自己添加新的支持类型。
一个累加器可以通过调用SparkContext.accumulator(v)方法从一个初始值v中创建。运行在集群上的任务,可以通过使用add方法或者+=操作(在Scala和Python中)来给它加值。然而,他们不能读取这个值。只有驱动程序可以使用value的方法来读取累加器的值。
如下的代码,展示了如何利用累加器,将一个数组里面的所有元素相加:
Accumulator<Integer> accum = sc.accumulator(0);
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10 复制代码
虽然这段代码使用了内置支持Integer类型的累加器。但是开发者也可以通过实现AccumulatorParam创建自己的类型 。AccumulatorParam接口有两个方法:zero为你的数据类型提供一个"zero value”,addInPlace将两个值相加。例如,假设我们有一个向量类来表示数学向量,我们可以这样写:
class VectorAccumulatorParam implements AccumulatorParam<Vector> {
public Vector zero(Vector initialValue) {
return Vector.zeros(initialValue.size());
}
public Vector addInPlace(Vector v1, Vector v2) {
v1.addInPlace(v2); return v1;
}
}
// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam()); 复制代码
在Java中,Spark也支持更通用的Accumulable接口来累加数据,他们的计算结果类型和相加的元素的类型不一样(例如,收集同样的元素构建一个list)。