Spark aggregate介绍
问题导读1、如何了解Spark的功能?
2、什么是Spark aggregate?
static/image/hrline/4.gif
Spark的官方文档其实说得并不是明了,很多内容如果不去研究源码,或者不去实验查看过程,你压根就不知道它的真正功能是啥,比如今天要说的aggregate。
aggregate的官方描述是:
Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral "zero value." The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2. The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U.
(本文译:聚合每个分区的元素,然后所有的分区结果,使用给定的组合功能和一个中立的“零值。“功能op(t1,t2)允许修改t1和它作为结果返回值以避免对象分配;然而,它不应该修改t2。第一个函数可以返回不同的结果类型(seqOp),U,比这个抽样的类型。因此,我们需要一个操作合并T成一个和一个操作合并两个)
函数原型:aggregate(self, zeroValue, seqOp, combOp)
现在先用一段代码测试它的输出:
def test_aggregate(sc):
def seq(c, v):
print 'In seq c ' + str(c)
print 'In seq v ' + str(v)
return c+v, c+0.1
def comb(c1, c2):
print 'In comb c1 ' + str(c1)
print 'In comb c2 ' + str(c2)
return c1-c2, c1+c2
data = sc.parallelize([(1.0, np.arange(10)), (2.0, np.arange(10)), (2.0, np.arange(10)), (1.0, np.arange(10))],2)
gradientSum, lossSum = data.aggregate((np.ones(10),0), seq, comb)
print gradientSum, lossSum
if __name__ == '__main__':
sc = SparkContext(appName='Test')
test_aggregate(sc)
# 第一个partition的输出
In seq c (array([ 1.,1.,1.,1.,1.,1.,1.,1.,1.,1.]), 0)
In seq v (1.0, array())
In seq c (array(), 0.1)
In seq v (2.0, array())
In comb c1 (array(), 0.2)
In comb c2 (array([ 1.,1.,1.,1.,1.,1.,1.,1.,1.,1.]), 0)
# 第二个partition的输出
In seq c (array([ 1.,1.,1.,1.,1.,1.,1.,1.,1.,1.]), 0)
In seq v (2.0, array())
In seq c (array(), 0.1)
In seq v (1.0, array())
In comb c1 (array(), 0.2)
In comb c2 (array([ 1.,1.,1.,1.,1.,1.,1.,1.,1.,1.]), 0)
# combine的输出
In comb c1 (array([ 1.,1.,1.,1.,1.,1.,1.,1.,1.,1.]), 0)
In comb c2 (array(), 0.2)
In comb c1 (array(), 0.2)
In comb c2 (array(), 0.2)
# 最后结果输出
0.4
从上面可以看出,aggregate的作用大概是对每个partition应用seqOp操作和combOp操作,然后对这个由各个partitions的结果构成的结果空间再做一次combOp。zeroValue是初值,seqOp输入U和T,U的初值就是zeroValue,T是RDD的element,seqOp对每个element操作的结果作为下一个element的U。combOp的输入是U1,U2,其中U1的初值是zeroValue,U2是各个partitions的输出。
让我不解的是,为什么在每个partition计算完seqOp之后,都要做一次combOp(U1, U2)操作,且U1是seqOp最终的计算结果,U2则是zeroValue?
谢谢楼主的分享 很优秀的楼主
页:
[1]