Introduction to Spark aggregate
2. What is Spark aggregate?
Aggregate the elements of each partition, and then the results for all the partitions, using the given combine functions and a neutral "zero value." The functions op(t1, t2) are allowed to modify t1 and return it as their result value to avoid object allocation; however, they should not modify t2. The first function (seqOp) can return a result type, U, that differs from the element type of this RDD, T. Thus, we need one operation for merging a T into a U (seqOp) and one operation for merging two U's (combOp).
Function signature: aggregate(self, zeroValue, seqOp, combOp)
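The following minimal pure-Python sketch makes the two phases described above concrete. It is a local model, not Spark's implementation. The name `local_aggregate` is hypothetical, partitions are plain lists, and each partition, as well as the final merge, gets its own deep copy of `zeroValue`. That copying keeps the permitted in-place modification of t1 safe.

```python
import copy
from functools import reduce


def local_aggregate(partitions, zeroValue, seqOp, combOp):
    """Hypothetical local model of RDD.aggregate; partitions is a list of lists."""
    # Phase 1: within each partition, merge elements (type T) into an
    # accumulator (type U) with seqOp, starting from a fresh copy of zeroValue.
    partial = []
    for part in partitions:
        acc = copy.deepcopy(zeroValue)
        for element in part:
            acc = seqOp(acc, element)
        partial.append(acc)
    # Phase 2: merge the per-partition accumulators (U with U) with combOp,
    # again starting from a fresh copy of zeroValue.
    return reduce(combOp, partial, copy.deepcopy(zeroValue))


def seq_op(acc, x):
    # Merge one int (T) into a [sum, count] list (U); modifies only acc (t1).
    acc[0] += x
    acc[1] += 1
    return acc


def comb_op(a, b):
    # Merge two [sum, count] lists; modifies only a (t1), never b (t2).
    a[0] += b[0]
    a[1] += b[1]
    return a


zero = [0, 0]
total, count = local_aggregate([[1, 2, 3], [4, 5]], zero, seq_op, comb_op)
print(total, count, total / count)  # 15 5 3.0
print(zero)  # [0, 0]: the caller's zero value is left untouched
```

Here the accumulator type U ([sum, count]) differs from the element type T (int). A plain reduce cannot express that, but aggregate handles it by design.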
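import numpy as np
from pyspark import SparkContext

def test_aggregate(sc):
    def seq(c, v):
        # c: accumulator (gradient, loss); v: one RDD element (label, features)
        print('In seq c ' + str(c))
        print('In seq v ' + str(v))
        return c[0] + v[1], c[1] + 0.1
    def comb(c1, c2):
        # c1, c2: two accumulators of the form (gradient, loss)
        print('In comb c1 ' + str(c1))
        print('In comb c2 ' + str(c2))
        return c1[0] - c2[0], c1[1] + c2[1]
    data = sc.parallelize([(1.0, np.arange(10)), (2.0, np.arange(10)),
                           (2.0, np.arange(10)), (1.0, np.arange(10))], 2)
    gradientSum, lossSum = data.aggregate((np.ones(10), 0), seq, comb)
    print(gradientSum, lossSum)

if __name__ == '__main__':
    sc = SparkContext(appName='Test')
    test_aggregate(sc)
    sc.stop()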
# Output of the first partition
In seq c (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
In seq v (1.0, array([...]))
In seq c (array([...]), 0.1)
In seq v (2.0, array([...]))
In comb c1 (array([...]), 0.2)
In comb c2 (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
# Output of the second partition
In seq c (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
In seq v (2.0, array([...]))
In seq c (array([...]), 0.1)
In seq v (1.0, array([...]))
In comb c1 (array([...]), 0.2)
In comb c2 (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
# Output of the final combine step
In comb c1 (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
In comb c2 (array([...]), 0.2)
In comb c1 (array([...]), 0.2)
In comb c2 (array([...]), 0.2)
# Final result
What puzzles me is this: after seqOp finishes on each partition, why is there an extra combOp(U1, U2) call, where U1 is the partition's final seqOp result and U2 is the zeroValue?
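A likely explanation, stated as an assumption to check against the pyspark/rdd.py of the Spark version that produced this log, is that older PySpark releases implemented aggregate as `self.mapPartitions(func).fold(zeroValue, combOp)`. In that design, func runs seqOp over a partition, and fold in turn runs `acc = op(obj, acc)` inside every partition, starting from zeroValue. Each partition then holds a single element, its seqOp result U1, so fold calls combOp(U1, zeroValue). The driver finishes with reduce(combOp, vals, zeroValue). The pure-Python sketch below replays that structure and records the order of calls. It needs no Spark, and `model_aggregate`, `seq`, `comb` and `calls` are hypothetical names.

```python
from functools import reduce

calls = []  # (function name, first argument, second argument), in call order


def seq(c, v):
    calls.append(("seq", c, v))
    return c + v  # toy seqOp on integers


def comb(c1, c2):
    calls.append(("comb", c1, c2))
    return c1 + c2  # toy combOp on integers


def model_aggregate(partitions, zeroValue, seqOp, combOp):
    """Hypothetical model of aggregate built as mapPartitions(...).fold(zeroValue, combOp)."""
    per_partition = []
    for part in partitions:
        # aggregate's own per-partition step: fold elements in with seqOp.
        u = zeroValue
        for obj in part:
            u = seqOp(u, obj)
        # fold's per-partition step, run in the same task: the partition now
        # holds the single element u, and the assumed fold computes
        # acc = op(obj, acc) starting from zeroValue, i.e. combOp(u, zeroValue).
        acc = zeroValue
        for obj in [u]:
            acc = combOp(obj, acc)
        per_partition.append(acc)
    # fold's driver-side step: reduce across partitions, starting from zeroValue.
    return reduce(combOp, per_partition, zeroValue)


result = model_aggregate([[1, 2], [2, 1]], 0, seq, comb)
for name, a, b in calls:
    print(name, a, b)
print("result", result)
```

The recorded sequence has the same shape as the log above. Each partition shows two seq calls followed by comb(U1, zeroValue), and the driver then runs comb(zeroValue, ...) and one last comb. With a neutral zero value (0 here), the extra calls do not change the result. With a non-neutral one, such as np.ones(10) under the subtracting comb in test_aggregate, they do. If your version's rdd.py instead collects the per-partition seqOp results and reduces them directly with combOp, the per-partition comb(U1, zeroValue) call should not appear.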