howtodown 发表于 2016-9-5 18:50:39

Spark之Stage详解【专题】

问题导读



1.什么是spark stage?
2.stage如何划分?
3.Spark 多个Stage执行是串行执行的么?

static/image/hrline/4.gif



什么是spark stage

什么是spark stage,这里没有明确的概念,我们可以从下面的内容中来提取它的含义。

1.在DAGScheduler中,会将每个job划分成多个stage,每个stage会创建一批task并且计算task的最佳位置,一个task对应一个partition。DAGScheduler的stage划分算法如下:它会从触发action操作的那个RDD开始往前倒推,首先会为最后一个RDD创建一个stage,然后往前倒推的时候,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创建一个新的stage,那个RDD就是新的stage的最后一个RDD。然后依次类推,继续往前倒推,根据窄依赖或者宽依赖进行stage的划分,直到所有的RDD全部遍历完成为止。
创建完task之后,stage会将一批task用TaskSet来封装,提交给TaskScheduler进行分配,最后发送到Executor执行。


2.一个Job可以有一个或者多个Stage,Stage划分的依据就是宽依赖,产生宽依赖的算子:reduceByKey、groupByKey等等

3.Spark在运行时会把Stage包装成任务提交,有父Stage的Spark会先提交父Stage。

4.Spark Application在遇到action算子时,SparkContext会生成Job,并将构成DAG图将给DAG Scheduler解析成Stage。

上面都是说了相关Stage与job相关的内容,这里说下个人理解。

这有点符合美国人的思维,美国人会把一个工作细分为每个步骤。
所以spark job,所以老美就细分为每个stage。如果接触过hive,其实他也是有stage的。

如何划分stage

对于stage有了了解,接着该如何划分stage。
下面例子可参考

1.从hdfs中读取文件后,创建 RDD 对象
2.DAGScheduler模块介入运算,计算RDD之间的依赖关系。RDD之间的依赖关系就形成了DAG
3.每一个JOB被分为多个Stage,划分Stage的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个Stage,避免多个Stage之间的消息传递开销。以下面一个按 A-Z 首字母分类,查找相同首字母下不同姓名总个数的例子来看一下 RDD 是如何运行起来的。
注:mapValues(function) 原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。步骤 1 :创建 RDD 上面的例子除去最后一个 collect 是个动作,不会创建 RDD 之外,前面四个转换都会创建出新的 RDD 。因此第一步就是创建好所有 RDD( 内部的五项信息 ) 。
步骤 2 :创建执行计划 Spark 会尽可能地管道化,并基于是否要重新组织数据来划分阶段 (stage) ,例如本例中的 groupBy() 转换就会将整个执行计划划分成两阶段执行。最终会产生一个 DAG(directed acyclic graph ,有向无环图 ) 作为逻辑执行计划。步骤 3 :调度任务将各阶段划分成不同的任务(task),每个任务都是数据和计算的合体。在进行下一阶段前,当前阶段的所有任务都要执行完成。因为下一阶段的第一个转换一定是重新组织数据的,所以必须等当前阶段所有结果数据都计算出来了才能继续。 +
个人总结:划分stage的依据是数据是否需要进行重组。
下面从源码角度来分析:

源码Stage划分





      前面提到,对于JobSubmitted事件,我们通过调用DAGScheduler的handleJobSubmitted()方法来处理。那么我们先来看下代码:


// 处理Job提交的函数
private def handleJobSubmitted(jobId: Int,
      finalRDD: RDD,
      func: (TaskContext, Iterator) => _,
      partitions: Array,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
   
    // 利用最后一个RDD(finalRDD),创建最后的stage对象:finalStage
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      // 根据最后一个RDD获取最后的stage
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
    }

    // 创建一个ActiveJob对象
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
   
    // 清除RDD分区位置缓存
    // private val cacheLocs = new HashMap]]
    clearCacheLocs()
   
    // 调用logInfo()方法记录日志信息
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
   
    // 将jobId-->ActiveJob的对应关系添加到HashMap类型的数据结构jobIdToActiveJob中去
    jobIdToActiveJob(jobId) = job
   
    // 将ActiveJob添加到HashSet类型的数据结构activeJobs中去
    activeJobs += job
   
    finalStage.setActiveJob(job)
   
    //2 获取stageIds列表
    // jobIdToStageIds存储的是jobId--stageIds的对应关系
    // stageIds为HashSet类型的
    // jobIdToStageIds在上面newResultStage过程中已被处理
    val stageIds = jobIdToStageIds(jobId).toArray
    // stageIdToStage存储的是stageId-->Stage的对应关系
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
   
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
   
    // 提交最后一个stage
    submitStage(finalStage)

    // 提交其他正在等待的stage
    submitWaitingStages()
}

    这个handleJobSubmitted()方法一共做了这么几件事:
      第一,调用newResultStage()方法,生成Stage,包括最后一个Stage:ResultStage和前面的Parent Stage:ShuffleMapStage;
      第二,创建一个ActiveJob对象job;
      第三,清除RDD分区位置缓存;
      第四,调用logInfo()方法记录日志信息;
      第五,维护各种数据对应关系涉及到的数据结构:
      (1)将jobId-->ActiveJob的对应关系添加到HashMap类型的数据结构jobIdToActiveJob中去;
      (2)将ActiveJob添加到HashSet类型的数据结构activeJobs中去;
      第六,提交Stage;
      下面,除了提交Stage留在第三阶段外,我们挨个分析第二阶段的每一步。
      首先是调用newResultStage()方法,生成Stage,包括最后一个Stage:ResultStage和前面的Parent Stage:ShuffleMapStage。代码如下:



/**
   * Create a ResultStage associated with the provided jobId.
   * 用提供的jobId创建一个ResultStage
   */
private def newResultStage(
      rdd: RDD,
      func: (TaskContext, Iterator) => _,
      partitions: Array,
      jobId: Int,
      callSite: CallSite): ResultStage = {
   
    // 根据fianl RDD获取parent stage及id,这个id为ResultStage的stageId
    val (parentStages: List, id: Int) = getParentStagesAndId(rdd, jobId)
   
    // 创建一个ResultStage,即为整个Job的finalStage
    // 参数:id为stage的id,rdd为stage中最后一个rdd,func为在分区上执行的函数操作,
    // partitions为rdd中可以执行操作的分区,parentStages为该stage的父stages,jobId为该stage
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
   
    // 将stage加入到stageIdToStage中
    stageIdToStage(id) = stage
   
    // 更新数据结构jobIdToStageIds
    updateJobIdStageIdMaps(jobId, stage)
   
    // 返回stage
    stage
}

首先,根据fianl RDD获取parent stages及id,这个id为ResultStage的stageId;
      其次,创建一个ResultStage,即为整个Job的finalStage;
      然后,将stage加入到数据结构stageIdToStage中;
      接着,更新数据结构jobIdToStageIds;
      最后,返回这个ResultStage。
      我们一步步来看。首先调用getParentStagesAndId()方法,根据fianl RDD获取parent stages及id,这个id为ResultStage的stageId。代码如下:


/**
   * Helper function to eliminate some code re-use when creating new stages.
   */
private def getParentStagesAndId(rdd: RDD, firstJobId: Int): (List, Int) = {
    // 获取parent stages
    val parentStages = getParentStages(rdd, firstJobId)
   
    // 获取下一个stageId,为AtomicInteger类型,getAndIncrement()能保证原子操作
    val id = nextStageId.getAndIncrement()
   
    // 返回parentStages和id
    (parentStages, id)
}

      这个id即为下一个stageId,通过AtomicInteger类型的getAndIncrement()获得,能够保证原子性。继续分析getParentStages()方法,通过它来获取final RDD的parent stage。代码如下:


/**
   * Get or create the list of parent stages for a given RDD.The new Stages will be created with
   * the provided firstJobId.
   */
private def getParentStages(rdd: RDD, firstJobId: Int): List = {
    // 用HashSet存储parents stage
    val parents = new HashSet
   
    // 用HashSet存储已经被访问过的RDD
    val visited = new HashSet]
   
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    // 存储需要被处理的RDD。Stack中得RDD都需要被处理
    val waitingForVisit = new Stack]
   
    // 定义一个visit函数,根据传入的RDD,如果之前没有处理过,标记为已处理,循环此RDD的依赖关系dependencies
    // 如果是ShuffleDependency,获取其parents;如果不是,则说明为同一stage,并压入Stack:waitingForVisit顶部
    def visit(r: RDD) {
      if (!visited(r)) {// visited中没有的话
      // 将RDD r加入到visited,表示已经处理过了
      visited += r
      
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      // 循环Rdd r的依赖关系
      for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency =>
            // 如果是ShuffleDependency,获取其parents,添加到parents中去
            parents += getShuffleMapStage(shufDep, firstJobId)
            case _ =>
            // 否则,属于同一个stage,压入Stack顶部,后续再递归处理
            waitingForVisit.push(dep.rdd)
          }
      }
      }
    }
   
    // 将rdd压入Stack顶部
    waitingForVisit.push(rdd)
   
    // 循环waitingForVisit,弹出每个rdd
    while (waitingForVisit.nonEmpty) {
      // 调用visit()方法,处理每个rdd
      visit(waitingForVisit.pop())
    }
   
    // 返回得到的parents列表
    parents.toList
}

getParentStages()方法在其内部定义了如下数据结构:
      parents:用HashSet存储parents stages,即finalRDD的所有parent stages,也就是ShuffleMapStage;
      visited:用HashSet存储已经被访问过的RDD,在RDD被处理前先存入该HashSet,保证存储在里面的RDD将不会被重复处理;
      waitingForVisit:存储需要被处理的RDD。Stack中得RDD都需要被处理。
      getParentStages()方法在其内部还定义了一个visit()方法,传入一个RDD,如果之前没有处理过,标记为已处理,并循环此RDD的依赖关系dependencies,如果是ShuffleDependency,调用getShuffleMapStage()方法获取其parent stage;如果不是,则说明为同一stage,并压入Stack:waitingForVisit顶部,等待后续通过visit()方法处理。所以,getParentStages()方法从finalRDD开始,逐渐往上查找,如果是窄依赖,证明在同一个Stage中,继续往上查找,如果是宽依赖,通过getShuffleMapStage()方法获取其parent stage,就能得到整个Job中所有的parent stages,也就是ShuffleMapStage。
      接下来,我们看下getShuffleMapStage()方法的实现。代码如下:



/**
   * Get or create a shuffle map stage for the given shuffle dependency's map side.
   * 针对给定的shuffle dependency的map端,获取或者创建一个ShuffleMapStage
   */
private def getShuffleMapStage(
      shuffleDep: ShuffleDependency,
      firstJobId: Int): ShuffleMapStage = {
   
    // 从数据结构shuffleToMapStage中根据shuffleId获取,如果有直接返回,否则
    // 获取ShuffleDependency中的rdd,调用getAncestorShuffleDependencies()方法,
    // 循环每个parent,调用newOrUsedShuffleStage()方法,创建一个新的ShuffleMapStage,
    // 并加入到数据结构shuffleToMapStage中去
    //
    // 它的定义为:private val shuffleToMapStage = new HashMap
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage // 有则直接返回
      case None => // 没有
      // We are going to register ancestor shuffle dependencies
      // 调用getAncestorShuffleDependencies()方法,传入ShuffleDependency中的rdd
      
      // 发现还没有在shuffleToMapStage中注册的祖先shuffle dependencies
      getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // 并循环返回的parents,调用newOrUsedShuffleStage()方法,创建一个新的ShuffleMapStage,
          // 并加入到数据结构shuffleToMapStage中去
          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
      }
      
      // Then register current shuffleDep
      // 最后注册当前shuffleDep,并加入到数据结构shuffleToMapStage中,返回stage
      val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      stage
    }
}

通过代码我们可以发现,它和getParentStages()方法的代码风格非常相似。在其内部也定义了三个数据结构:
      parents:存放parents的栈,即Stack,用于存放入参RDD的在shuffleToMapStage中未注册过的祖先shuffle dependencies;
      visited:存放已经处理过的RDD的哈希表,即HashSet;
      waitingForVisit:存放等待被处理的RDD的栈,即Stack;
      定义了一个visit()方法,入参为RDD,针对传入的RDD,如果之前没有处理过则标记为已处理,并循环RDD的所有依赖,如果是如果是ShuffleDependency,并且其依赖的shuffleId在shuffleToMapStage中没有,添加到parents中,否则直接跳过,最后无论为何种Dependency,都将该dependence的rdd压入waitingForVisit栈顶部,等待后续处理。
      接下来,我们再看下newOrUsedShuffleStage()方法,其代码如下:


/**
   * Create a shuffle map Stage for the given RDD.The stage will also be associated with the
   * provided firstJobId.If a stage for the shuffleId existed previously so that the shuffleId is
   * present in the MapOutputTracker, then the number and location of available outputs are
   * recovered from the MapOutputTracker
   *
   * 为给定的RDD创建一个ShuffleStage
   */
private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency,
      firstJobId: Int): ShuffleMapStage = {
   
    // 从shuffleDep中获取RDD
    val rdd = shuffleDep.rdd
   
    // 获取RDD的分区个数,即未来的task数目
    val numTasks = rdd.partitions.length
   
    // 构造一个ShuffleMapStage实例
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
   
   
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // 如果mapOutputTracker中存在
   
      // 根据shuffleId从mapOutputTracker中获取序列化的多个MapOutputStatus对象
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      
      // 反序列化
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      
      // 循环
      (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
          // locs(i) will be null if missing
          // 将
          stage.addOutputLoc(i, locs(i))
      }
      }
    } else {
      // 如果mapOutputTracker中不存在,注册一个
   
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      // 注册的内容为
      // 1、根据shuffleDep获取的shuffleId;
      // 2、rdd中分区的个数
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
}

这个方法的主要完成了以下两件事:
      1、构造一个ShuffleMapStage实例stage;
      2、判断是否在mapOutputTracker中存在:
         (1)如果不存在,调用mapOutputTracker的registerShuffle()方法注册一个,注册的内容为根据shuffleDep获取的shuffleId和rdd中分区的个数;
         (2)如果存在,根据shuffleId从mapOutputTracker中获取序列化的多个MapOutputStatus对象,反序列化后循环,逐个添加到stage中。
      紧接着,看下newShuffleMapStage()方法,其代码如下:



/**
   * Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
   * newOrUsedShuffleStage.The stage will be associated with the provided firstJobId.
   * Production of shuffle map stages should always use newOrUsedShuffleStage, not
   * newShuffleMapStage directly.
   */
private def newShuffleMapStage(
      rdd: RDD,
      numTasks: Int,
      shuffleDep: ShuffleDependency,
      firstJobId: Int,
      callSite: CallSite): ShuffleMapStage = {
   
    // 获得parentStages和下一个stageId
    val (parentStages: List, id: Int) = getParentStagesAndId(rdd, firstJobId)
   
    // 创建一个ShuffleMapStage
    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
      firstJobId, callSite, shuffleDep)

    // 将stage加入到数据结构stageIdToStage
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(firstJobId, stage)
    stage
}

可以发现,这个方法也调用了getParentStagesAndId()方法,这样,就形成了一个递归,按照RDD的依赖关系,由后往前,逐渐生成Stage。代码剩余的部分就是创建一个ShuffleMapStage,并将stage加入到数据结构stageIdToStage,以及调用updateJobIdStageIdMaps()方法更新相关数据结构。这个updateJobIdStageIdMaps()方法留待下面分析。
      下面,简单看下mapOutputTracker注册的代码。


// 注册shuffle
def registerShuffle(shuffleId: Int, numMaps: Int) {
    // 将shuffleId、numMaps大小和MapStatus类型的Array数组的映射关系,放入mapStatuses中
    // mapStatuses为TimeStampedHashMap]类型的数据结构
    if (mapStatuses.put(shuffleId, new Array(numMaps)).isDefined) {
      throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
    }
}

很简单,将shuffleId、numMaps大小和MapStatus类型的Array数组的映射关系,放入mapStatuses中,mapStatuses为TimeStampedHashMap]类型的数据结构。
       经历了这多又长又大篇幅的叙述,现在返回newResultStage()方法,在通过getParentStagesAndId()方法获取parent stages及其result stage的id后,紧接着创建一个ResultStage,并将stage加入到stageIdToStage中,最后在调用updateJobIdStageIdMaps()更新数据结构jobIdToStageIds后,返回stage。
      下面,简单看下updateJobIdStageIdMaps()方法。代码如下:



/**
   * Registers the given jobId among the jobs that need the given stage and
   * all of that stage's ancestors.
   */
private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
    // 定义一个函数updateJobIdStageIdMapsList()
    def updateJobIdStageIdMapsList(stages: List) {
      
      if (stages.nonEmpty) {
      
      // 获取列表头元素
      val s = stages.head
      
      // 将jobId添加到Stage的jobIds中
      s.jobIds += jobId
      
      // 更新jobIdToStageIds,将jobId与stageIds的对应关系添加进去
      jobIdToStageIds.getOrElseUpdate(jobId, new HashSet()) += s.id
      
      val parents: List = getParentStages(s.rdd, jobId)
      
      val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }
      updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
      }
    }
    // 调用函数updateJobIdStageIdMapsList()
    updateJobIdStageIdMapsList(List(stage))
}

    这个方法的实现比较简单,在其内部定义了一个函数updateJobIdStageIdMapsList(),首选传入result stage,将jobId添加到stage的jobIds中,更新jobIdToStageIds,将jobId与stageIds的对应关系添加进去,然后根据给定stage的RDD获取其parent stages,过滤出不包含此JobId的parents stages,再递归调用updateJobIdStageIdMapsList()方法,直到全部stage都处理完。
      至此,Stage划分大体流程已分析完毕


spark stage的划分,我们可能感觉了解的不够,更多还需我们多找资料,多用心。下面我们继续,spark Stage是什么,是如何划分的,那么spark stage是串行执行吗?


Spark 多个Stage执行是串行执行的么?


看如下的代码:



这里的话,我们构建了两个输入(input1,input2),input2带有一个reduceByKey,所以会产生一次Shuffle,接着进行Join,会产生第二次Shuffle(值得注意的是,join 不一定产生新的Stage,我通过强制变更join后的分区数让其发生Shuffle ,然后进行Stage的切分)。

所以这里一共有两次Shuffle,产生了四个Stage。 下图是Spark UI上呈现的。那这四个Stage的执行顺序是什么呢?




再次看Spark UI上的截图:




我们仔细分析下我们看到现象:

首先我们看到 Stage0,Stage 1 是同时提交的。

Stage0 只有两条记录,并且设置了两个Partition,所以一次性就能执行完,也就是3s就完成了。

Stage1 有四个分区,六条记录,记录数最多的分区是两条,也就是需要执行10秒,如果完全能并行执行,也就是最多10s。但是这里消耗了13秒,为什么呢?点击这个13秒进去看看:





我们看到有两个task 延迟了3秒后才并行执行的。 根据上面的代码,我们只有四颗核供Spark使用,Stage0 里的两个任务因为正在运行,所以Stage1 只能运行两个任务,等Stage0 运行完成后,Stage1剩下的两个任务才接着运行。

之后Stage2 是在Stage1 执行完成之后才开始执行,而Stage3是在Stage2 执行完成才开始执行。

现在我们可以得出结论了:

Stage 可以并行执行的
存在依赖的Stage 必须在依赖的Stage执行完成后才能执行下一个Stage
Stage的并行度取决于资源数
我么也可以从源码的角度解释这个现象:





我们看到如果一个Stage有多个依赖,会深度便利,直到到了根节点,如果有多个根节点,都会通过submitMissingTasks 提交上去运行。当然Spark只是尝试提交你的Tasks,能不能完全并行运行取决于你的资源数了。

这里再贡献一张画了很久的示意图,体现了partition,shuffle,stage,RDD,transformation,action,source 等多个概念。





更多:
Spark之任务调度
http://www.aboutyun.com/forum.php?mod=viewthread&tid=8548


Spark内核介绍:Spark在运行时会把Stage包装成任务提交
http://www.aboutyun.com/home.php?mod=space&uid=19&do=blog&id=3094


Spark任务中Stage划分算法及Task任务本地性算法原理
http://www.aboutyun.com/forum.php?mod=viewthread&tid=19729


quanxiaofei 发表于 2016-9-7 15:24:09

辛苦楼主.每天都默默奉献

xuliang123789 发表于 2016-9-7 09:13:11

谢谢楼主,学习一下,赞~~

xianzhi558 发表于 2016-9-12 17:11:18

深度理解spark

QWE123W 发表于 2020-8-19 09:44:00

能够看懂这篇文章的才算懂SPARK
页: [1]
查看完整版本: Spark之Stage详解【专题】