分享

关于take(num)的一些疑问

邓立辉 发表于 2015-12-26 10:44:42 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 7 11586
take(num)会取出n条元素。
因为take是action所以会驱动前边依赖的stage和transfrom运行,我的疑问是他会驱动前边的rdd里所有的元素都运算么?还是只驱动num个元素来运算?
比如rddA:1,2,3,4,5,6
rddB是由rddA做map转化过来的(每个元素都+1):2,3,4,5,6,7
如果我调rddB.take(2);那么rddA转成rddB时3,4,5,6会被计算么?如果计算了岂不是白白浪费性能。


还有一个问题是怎么将rdd截断转从一个小的rdd。比如rddA中有6个元素,但是我只要3个元素来处理,其他的元素忽略掉。

已有(7)人评论

跳转到指定楼层
goldtimes 发表于 2015-12-26 13:14:42
take(n)        返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素
谁调用take,就取谁的前N个元素。

如果想将rdd转换,也就是楼主说的:
将rdd截断转从一个小的rdd

可以使用
filter(func)

Transformation

更多查看:
Spark RDD详解
http://www.aboutyun.com/thread-7214-1-1.html



推荐:

spark RDD Transformations和Actions区别是什么?
http://www.aboutyun.com/thread-14203-1-1.html
RDD操作详解1——Transformation和Actions概况
http://www.aboutyun.com/thread-14331-1-1.html


回复

使用道具 举报

邓立辉 发表于 2015-12-28 11:32:58
goldtimes 发表于 2015-12-26 13:14
take(n)        返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行, ...

我想过用filter,但是没想出来应该怎么取前n个的方法,
回复

使用道具 举报

regan 发表于 2015-12-28 11:36:27

探索Spark源码---action触发作业提交(DAGSchedular,TaskSchedular,TaskSetManager)

本帖最后由 regan 于 2015-12-28 11:37 编辑

你提到的这个问题,哥哥就带你看一下源代码吧:take是一个action操作,在take中会调用sparkContext的runjob方法,take如下:
def take(num: Int): Array[T] = withScope {
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.size == 0) {
          numPartsToTry = partsScanned * 4
        } else {
          // the left side of max is >=1 whenever partsScanned >= 2
          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }

      val left = num - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += numPartsToTry
    }

    buf.toArray
  }
}
我给你大概讲一下这段代码吧,首先代码中会得到当前RDD的partitions集合的大小,然后遍历这些partitions,知道取回的结果满足你take()方法中传入的值,从上面代码中的while循环中,我们可以清楚的看到,当buf的size等于take传入的参数的时候,while循环退出,在循环中,可能会多次调用sc.runJob提交作业,直到返回的结果填满以take参数作为大小的arrayBuffer。在这个过程中,如果你传入的参数较小,那么可能遍历一个partion就满足了你的限制了,因此只需提交一次job,计算一个partion。如果你take参数很大,一个partion不能满足,那么会继续执行while循环,以另一个partion数据提交job,这样Job提交可能有多次,直到满足take限制。这是你提到的第一个问题。


第二个问题:
        可以做transaction操作,如filter,map等操作,过滤掉一些数据满足
回复

使用道具 举报

regan 发表于 2015-12-28 12:01:54
用zipwithIndex,然后filter    index为前n的值既可以取出前n条值
回复

使用道具 举报

goldtimes 发表于 2015-12-28 12:06:59
邓立辉 发表于 2015-12-28 11:32
我想过用filter,但是没想出来应该怎么取前n个的方法,

filter(func)可以形成新的rdd,新的rdd的基础上在take
回复

使用道具 举报

邓立辉 发表于 2015-12-28 15:44:57
regan 发表于 2015-12-28 12:01
用zipwithIndex,然后filter    index为前n的值既可以取出前n条值

哇靠,非常感谢,给哥哥跪了,看来学scala,看源码才是王道
回复

使用道具 举报

regan 发表于 2015-12-28 16:39:37
邓立辉 发表于 2015-12-28 15:44
哇靠,非常感谢,给哥哥跪了,看来学scala,看源码才是王道

对头,什么事情不能光停别人说,自己去看看就知道了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条