分享

[Spark源码剖析] DAGScheduler划分stage

yr123 发表于 2015-8-6 16:59:36 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 14649
问题导读
1、了解stage的相关知识点。
2、stage的产生和提交的步骤有哪些?
3、DAGScheduler如何划分stage?
4、shuffleToMapStage的key的含义是什么?key和value又是怎么确定的呢?
5、本文源码环环相扣,要一步一步地解读。





本文基于Spark 1.3.1
先上一些stage相关的知识点:
  • DAGScheduler将Job分解成具有前后依赖关系的多个stage
  • DAGScheduler是根据ShuffleDependency划分stage的
  • stage分为ShuffleMapStage和ResultStage
  • 一个stage包含多个tasks,task的个数即该stage的finalRDD的partition数
  • 一个stage中的task完全相同,ShuffleMapStage包含的都是ShuffleMapTask;ResultStage包含的都是ResultTask
stage的产生和提交的大致步骤如下:
RDD.action
        SparkContext.runJob
        DAGScheduler.runJob
        DAGScheduler.submitJob
        DAGScheduler.eventProcessLoop.post( JobSubmitted(...) )
        DAGScheduler.handleJobSubmitted
        DAGScheduler.submitStage
可以看到,在DAGScheduler内部通过post一个JobSubmitted事件来触发Job的提交
DAGScheduler.eventProcessLoop.post( JobSubmitted(...) )
DAGScheduler.handleJobSubmitted
既然这两个方法都是DAGScheduler内部的实现,为什么不直接调用函数而要这样“多此一举”呢?我猜想这是为了保证整个系统事件模型的完整性。

DAGScheduler.handleJobSubmitted部分源码及如下:
private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      //< 创建finalStage可能会抛出一个异常, 比如, jobs是基于一个HadoopRDD的但这个HadoopRDD已被删除
      finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
    } catch {
      case e: Exception =>
        return
    }

    //< 此处省略n行代码
  }
handleJobSubmitted首先通过调用newResultStage来创建了一个ResultStage,即finalStage。
这里先给出一个结论:在finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)调用完之后,stage的划分其实已经完成,并已将所有的ShuffleMapStage保存在成员shuffleToMapStage: HashMap[Int, ShuffleMapStage]中。
shuffleToMapStage的key的含义是什么?key和value又是怎么确定的呢?往下看~

跟进到DAGScheduler.newResultStage
  private def newResultStage(
      rdd: RDD[_],
      numTasks: Int,
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)

    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
DAGScheduler.newResultStage首先调用val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId),这个调用看起来像是要先确定好该ResultStage依赖的所有父stages。

接下来,调用val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)来创建DAG图中唯一的一个ResultStage,new ResultStage值得注意的有两点 :
1. ResultStage构造函数参数numTasks的值等于newResultStage调用时传入的finalRDD的partitions.size,那么我们就知道了“对于ResultStage,task数目就是它finalRDD的partition个数”。
2. new ResultStage时会使用val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)获得的该stage依赖的父stages来构造ResultStage,说明了ResultStage对象内部保存了它依赖的stages,可以在需要的时候使用。

创建ResultStage的调用比较简单,得到ResultStage对象后存入成员stageIdToStage中。接下来,看看getParentStagesAndId都干了点啥
private def getParentStagesAndId(rdd: RDD[_], jobId: Int): (List[Stage], Int) = {
    val parentStages = getParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    (parentStages, id)
  }

跟到getParentStages里
  //< getParentStages函数以参数rdd为起点,通过一层一层遍历依赖来划分stage,划分stage的依据是ShuffleDependency,即宽依赖。
  //< 这个函数的实现方式非常巧妙
  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    //< 通过vist一级一级vist得到的父stage
    val parents = new HashSet[Stage]
    //< 已经visted的rdd
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r

        for (dep <- r.dependencies) {
          dep match {
            //< 若为宽依赖,调用getShuffleMapStage
            case shufDep: ShuffleDependency[_, _, _] => parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              //< 若为窄依赖,将该依赖中的rdd加入到待vist列表,以保证能一级一级往上vist,直至遍历整个DAG图
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }
case shufDep: ShuffleDependency[_, _, _] => parents += getShuffleMapStage(shufDep, jobId)若为宽依赖,则调用getShuffleMapStage并将返回的stage加入到ResultStage依赖的父stage中,那么getShuffleMapStage又是干嘛的?

private def getShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      jobId: Int): ShuffleMapStage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage
      case None =>
        // We are going to register ancestor shuffle dependencies
        registerShuffleDependencies(shuffleDep, jobId)

        //< 然后创建新的ShuffleMapStage
        val stage = newOrUsedShuffleStage(shuffleDep, jobId)
        shuffleToMapStage(shuffleDep.shuffleId) = stage

        stage
    }
  }
该函数首先判断成员shuffleToMapStage中是否包含了参数shuffleDep.shuffleId为key的stage,在划分stage的过程中,在以ResultStage的finalRDD为起点向前遍历RDD依赖的过程中,每次碰到的宽依赖都是不同的,它们的id自然也是不同,所以调用到这个函数的时候,都会进入到 case None的分支中。

该分支中首先调用了registerShuffleDependencies(shuffleDep, jobId),该函数比较关键,为了方便阅读,我将该函数的实现及其调用的函数的源码及注释都贴在一起,如下:
private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) {
    val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
    while (parentsWithNoMapStage.nonEmpty) {
      //< 出栈的其实是shuffleDep的前一个宽依赖,且shuffleToMapStage不包含以该出栈宽依赖id为key的元素
      val currentShufDep = parentsWithNoMapStage.pop()
      //< 创建新的ShuffleMapStage
      val stage = newOrUsedShuffleStage(currentShufDep, jobId)
      //< 将新创建的ShuffleMapStage加入到shuffleId -> ShuffleMapStage映射关系中
      shuffleToMapStage(currentShufDep.shuffleId) = stage
    }
  }

  //< 返回还没注册到shuffleToMapStage的宽依赖栈。以参数RDD为起点,向前遍历整个RDD依赖图,将碰到的宽依赖依次压到栈中;
  // 对于宽依赖的id来说,在DAG图从前往后(即我们看起来的从左到右)的过程中递增的
  //< 而该函数从后向前依次将宽依赖对象压入栈中,那么压栈时宽依赖id是递减的,registerShuffleDependencies调用中出栈时的宽依赖id是递增的,即stage得id时递增的
  private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    //< rdd依赖关系中的所有宽依赖,rdd为起点,从后往前推,越早碰到的宽依赖越早压入栈,所以入栈的shuffleDep.id是递减的
    val parents = new Stack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]

    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
                //< 碰到宽依赖,将该宽依赖压入栈partents
                parents.push(shufDep)
              }

              waitingForVisit.push(shufDep.rdd)
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }

    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents
  }
如以上代码及说明所示,getAncestorShuffleDependencies返回宽依赖栈。
registerShuffleDependencies依次取出栈中宽依赖,并以该宽依赖和jobId创建新的ShuffleStageval stage = newOrUsedShuffleStage(currentShufDep, jobId),并将该新创建的ShuffleMapStage注册到shuffleToMapStage: HashMap[Int, ShuffleMapStage],其中shuffleToMapStage的key为宽依赖的shuffleId,value为对应宽依赖。

那么,新建ShuffleMapStage的函数newOrUsedShuffleStage又做了点什么呢?继续看它的实现,如下:
//< 为给定的RDD创建一个ShuffleMapStage。该stage会和给定的jobId关联起来
  private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.size
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
    //< 若该shuffleDep.shulleId对应的, stage已经在MapOutputTracker中存在,那么可用的输出的数量及位置将从MapOutputTracker恢复
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      for (i <- 0 until locs.size) {
        stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
      }
      stage.numAvailableOutputs = locs.count(_ != null)
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      //< 否则使用shuffleDep.shuffleId, rdd.partitions.size在mapOutputTracker中注册,这会在shuffle阶段reducer从shuffleMap端fetch数据起作用
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
    }
    stage
  }
从val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)可以看出,对于ShuffleMapStage来说,task的个数也是其finalRDD的partition的个数。

以上,完成了stage的划分并将ShuffleMapStage注册到了shuffleToMapStage中,这个成员会在后面的提交stage时用到。提交stage及task的调度会单独再写两篇blog说明。
本文若有纰漏或不妥之处,欢迎指正。

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条