Spark系列-共享变量
问题导读1、Spark支持哪些共享变量?
2、如何操作累加器?
static/image/hrline/4.gif
Spark的第二个抽象,是并行计算中使用的共享变量。一般来说,当一个函数被传递给Spark操作(例如map和reduce),通常是在集群结点上运行,在函数中使用到的所有变量,都做分别拷贝,供函数操作,而不会互相影响。这些变量会被拷贝到每一台机器,而在远程机器上,在对变量的所有更新,都不会被传播回Driver程序,因此,这些变量都不是共享的。然而有时候,我们需要在任务中能够被共享的变量,或者在任务与驱动程序之间共享。Spark支持两种类型的共享变量:
广播变量: 可以在内存的所有结点中被访问,用于缓存变量(只读)
累加器:只能用来做加法的变量,例如计数和求和
广播变量
广播变量允许程序员保留一个只读的变量,缓存在每一台机器上,而非每个任务保存一份拷贝。他们可以使用,例如,给每个结点一个大的输入数据集,以一种高效的方式。Spark也会尝试,使用一种高效的广播算法,来减少沟通的损耗。
广播变量是从变量v创建的,通过调用SparkContext.broadcast(v)方法。这个广播变量是一个v的分装器,它的只可以通过调用value方法获得。如下的解释器模块展示了如何应用:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value
res0: Array = Array(1, 2, 3)
在广播变量被创建后,它能在集群运行的任何函数上,被取代v值进行调用,从而v值不需要被再次传递到这些结点上。另外,对象v不能在被广播后修改,是只读的,从而保证所有结点的变量,收到的都是一模一样的。
累加器
累加器是只能通过组合操作“加”起来的变量,可以高效的被并行支持。他们可以用来实现计数器(如同MapReduce中)和求和。Spark原生就支持Int和Double类型的计数器,程序员可以添加新的类型。
一个计数器,可以通过调用SparkContext.accumulator(v)方法来创建。运行在集群上的任务,可以使用+=来加值。然而,它们不能读取计数器的值。当Driver程序需要读取值的时候,它可以使用.value方法。
如下的解释器,展示了如何利用累加器,将一个数组里面的所有元素相加
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
…
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
页:
[1]