The official description of aggregate is:
Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral "zero value." The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2. The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U.
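(In plain terms: aggregate first folds the elements of each partition, starting from a neutral "zero value", and then merges the results of all the partitions. The function op(t1, t2) may modify t1 and return it as its result to avoid allocating new objects, but it must not modify t2. The first function, seqOp, may return a result type U that differs from the RDD's element type T. That is why two operations are needed: seqOp merges a T into a U, and combOp merges two U values.)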
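To make the T/U distinction and the "may modify t1, not t2" rule concrete, here is a small pure-Python sketch (no Spark needed). The element type, the [sum, count] list accumulator and the names seq_op/comb_op are hypothetical choices for illustration: elements are ints (T), while the accumulator is a two-item list (U), and both functions mutate and return only their first argument.

```python
from functools import reduce
from typing import List

# Hypothetical example: RDD elements are ints (type T); the accumulator
# is a two-item list [sum, count] (type U), so U differs from T.

def seq_op(acc: List[int], x: int) -> List[int]:
    """Merge one element T into an accumulator U.

    Modifies and returns its first argument, which the docstring allows,
    to avoid allocating a new list for every element.
    """
    acc[0] += x
    acc[1] += 1
    return acc

def comb_op(a: List[int], b: List[int]) -> List[int]:
    """Merge two accumulators U; only the first argument is modified."""
    a[0] += b[0]
    a[1] += b[1]
    return a

# Fold one "partition" by hand, starting from a fresh zero value.
partition = [3, 4, 5]
acc = reduce(seq_op, partition, [0, 0])
print(acc)  # [12, 3]
```

Because seq_op mutates its accumulator, each partition must start from its own copy of the zero value; reusing one shared [0, 0] list across partitions would mix their results.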
Function signature: aggregate(self, zeroValue, seqOp, combOp)
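The roles of the three parameters can be modeled on a single machine. The sketch below is a simplified model, not Spark's code: local_aggregate is a hypothetical helper, and a list of lists stands in for the RDD's partitions. It assumes, as PySpark's implementation does, that every partition is folded from its own copy of zeroValue and that the final combOp merge also starts from zeroValue. This is why a zero value that is not truly neutral shows up once per partition plus once more in the final merge.

```python
import copy
from functools import reduce

def local_aggregate(partitions, zero_value, seq_op, comb_op):
    """Hypothetical single-machine model of RDD.aggregate.

    partitions: a list of lists, each inner list standing in for one partition.
    """
    # Step 1: fold every partition separately with seq_op, each starting
    # from its own copy of zero_value.
    per_partition = [
        reduce(seq_op, part, copy.deepcopy(zero_value)) for part in partitions
    ]
    # Step 2: merge the per-partition results with comb_op, starting
    # this merge from zero_value as well.
    return reduce(comb_op, per_partition, copy.deepcopy(zero_value))

# Mean of 1..4 split over two partitions, accumulating (sum, count) pairs.
parts = [[1, 2], [3, 4]]
total, count = local_aggregate(
    parts,
    (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),  # seqOp: merge a T into a U
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # combOp: merge two U
)
print(total / count)  # 2.5
```

With a non-neutral zero such as 10 and plain addition for both functions, local_aggregate([[1], [2]], 10, ...) returns 10 + (10 + 1) + (10 + 2) = 33, not 3.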
Let's start by testing its output with a short piece of code:
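import numpy as np

def test_aggregate(sc):
    # seqOp: fold one RDD element v = (key, array) into the partition accumulator c
    def seq(c, v):
        print('In seq c ' + str(c))
        print('In seq v ' + str(v))
        return c[0] + v[1], c[1] + 0.1
    # combOp: merge two per-partition accumulators c1 and c2
    def comb(c1, c2):
        print('In comb c1 ' + str(c1))
        print('In comb c2 ' + str(c2))
        return c1[0] - c2[0], c1[1] + c2[1]
    # four (key, array) pairs spread over 2 partitions
    data = sc.parallelize([(1.0, np.arange(10)), (2.0, np.arange(10)),
                           (2.0, np.arange(10)), (1.0, np.arange(10))], 2)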