在foreach中,一般就是拿到一条数据处理一条,如果后面处理可能丢失。
详解如下:
################################
foreach中,传入一个function,这个函数的传入参数就是每个partition中,每次的foreach得到的一个rdd的kv实例,也就是具体的内容,
这种处理你并不知道这个iterator的foreach什么时候结果,只能是foreach的过程中,你得到一条数据,就处理一条数据.
由下面的红色部分可以看出,foreach操作是直接调用了partition中数据的foreach操作:
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
示例说明:
val list = new ArrayBuffer()
Rdd.foreach(record => {
list += record
If (list.size >= 10000) {
list.flush
}
})
上面这段示例代码中,如果会存在一个问题,迭代的最后,list的结果可能还没有达到10000条,这个时候,
你在内部的处理的flush部分就不会执行,也就是迭代的最后如果没有达到10000的数据就会丢失.
所以在foreach中,一般就是拿到一条数据进行下处理Rdd.foreach(record => {record._1 == a return})
|