xioaxu790 发表于 2014-7-31 19:29:03

Spark Core源码分析: Spark任务执行模型

本帖最后由 xioaxu790 于 2014-7-31 19:31 编辑

问题导读
1、如何提交TaskSet给TaskScheduler执行?
2、BlockManager如何通过BlockId获得数据?
3、源码的阅读该如何入口?

static/image/hrline/4.gif


DAGScheduler
面向stage的调度层,为job生成以stage组成的DAG,提交TaskSet给TaskScheduler执行。
每一个Stage内,都是独立的tasks,他们共同执行同一个compute function,享有相同的shuffledependencies。DAG在切分stage的时候是依照出现shuffle为界限的。
private
class DAGScheduler(
    taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv)
extends Logging {

// Actor模式收取发来的DAGSchedulerEvent,并进行processEvent
private var eventProcessActor: ActorRef = _

private val nextJobId = new AtomicInteger(0)
private def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)

// 一系列信息维护,很清晰
private val jobIdToStageIds = new HashMap]
private val stageIdToJobIds = new HashMap]
private val stageIdToStage = new HashMap
private val shuffleToMapStage = new HashMap
private val jobIdToActiveJob = new HashMap
private val resultStageToJob = new HashMap
private val stageToInfos = new HashMap

// 不同状态stages的维护,很清晰
// Stages we need to run whose parents aren't done
private val waitingStages = new HashSet

// Stages we are running right now
private val runningStages = new HashSet

// Stages that must be resubmitted due to fetch failures
private val failedStages = new HashSet

// Missing tasks from each stage
private val pendingTasks = new HashMap]]

private val activeJobs = new HashSet

// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap]]

在start()方法中会初始化Actor,然后接收DAGSchedulerEvent处理。Scheduler会在SparkContext里start起来。

Event处理
源码的阅读入口:可以根据processEvent(event:DAGSchedulerEvent)方法展开。
处理的事件包括这么一些:



Submit Job
JobSubmitted事件:



提交任务的事件传入参数如下
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)

处理过程可以拆成三步看,每一步里面涉及的具体逻辑在下面补充展开
finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
本次newStage()操作可以对应新的result stage或者shuffle stage。返回Stage类(里面记录一些信息)。Stage类会传入Option]参数,内部有一个isShuffleMap变量,以标识该Stage是shuffle or result。
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)

ActiveJob类也是记录一些信息的类,可以当作是一个VO类
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
// Compute very short actions like first() or take()   
// with no parent stages locally.
listenerBus.post(SparkListenerJobStart(
job.jobId, Array(), properties))
runLocally(job)
} else {
jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(
SparkListenerJobStart(
job.jobId, jobIdToStageIds(jobId).toArray, properties))
submitStage(finalStage)
}

首先判断stage没有父亲依赖,且partition为1的话,就执行本地任务。否则,submitStage。

submitStage的逻辑为,首先寻找本次stage的parents。如果没有missing的parent stage,那么就submitMissingTask,即提交本次stage的tasks了。如果有,会对parent stage进行递归submitStage,而且getMissingParentStages得到的结果集是按id降序排的,也就是说递归submitStage的时候会按parent stage的id顺序进行。

submitMissingTask处理的是stage的parent已经available的stage。主要逻辑如下:
第一步:通过stage.isShuffleMap来决定生成ShuffleMapTask还是ResultTask,生成的ShuffleMapTask数目和partition数目相等。
第二步:把生成的tasks组建成一个TaskSet,提交给TaskScheduler的submitTasks方法。

TaskScheduler
DAGScheduler以stage为单位,提tasks给TaskScheduer,实现类为TaskSchedulerImpl。

TaskSchedulerImpl几个内部部件:
SchedulerBackend
SchedulableBuilder
DAGScheduler
TaskSetManager
TaskResultGetter
Tasks信息(taskIdToTaskSetId,taskIdToExecutorId,activeExecutorIds)
别的信息(SchedulerMode)

TaskScheduler做接收task、接收分到的资源和executor、维护信息、与backend打交道、把任务分配好等事情。

start(),stop()的时候,backend的start(),stop()

submitTasks(TaskSet)逻辑:
为这批Task生成新的TaskSetManager,把TaskSetManager加到SchedulerBuilder里,然后向backend进行一次reviveOffer()操作。

SchedulerBuilder
SchedulableBuilder有FIFO和Fair两种实现, addTaskSetManager会把TaskSetManager加到pool里。FIFO的话只有一个pool。Fair有多个pool,Pool也分FIFO和Fair两种模式。
SchedulableBuilder的rootPool里面可以新增pool或者TaskSetManager,两者都是Scheduable的继承类,所以SchedulableBuilder用于维护rootPool这棵Scheduable 树结构。Pool是树上的非叶子节点,而TaskSetManager就是叶子节点。
在TaskScheduler初始化的时候会buildDafaultPool。




TaskSetManager
TaskSetManager负责这批Tasks的启动,失败重试,感知本地化等事情。每次reourseOffer方法会寻找合适(符合条件execId, host, locality)的Task并启动它。

reourseOffer方法,
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
寻找符合execId, host和locality的task,找到的话就启动这个Task。启动的时候,把task加到runningTask的HashSet里,然后调DAGScheduler的taskStarted方法,taskStarted方法向eventProcessorActor发出BeginEvent的DAGSchedulerEvent。

TaskResultGetter
维护一个线程池,用来反序列化和从远端获取task结果。
def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer)
把序列化的数据反序列化解析出来之后,有两种情况:直接可读的result和间接task result。

前者是DirectTaskResult类:
class DirectTaskResult(var valueBytes: ByteBuffer, var accumUpdates: Map, var metrics: TaskMetrics)
后者是IndirectTaskResult类:
case class IndirectTaskResult(blockId: BlockId) extends TaskResult with Serializable
在反解析出IndirectTaskResult后,可以得到BlockId这个类,他的实现有这么些:



在TaskResultGetter里,会通过blockManager的getRemoteBytes(BlockId)方法来获得序列化的task result,对这个task result进行反解析后得到DirectTaskResult类,从而获得反序列化后的真正结果数据。
这是大致的一个过程,具体还有一些细节在之中,比如会向scheduler发送不同的event、blockManager会调用BlockManagerMaster把该Block remove掉。

BlockId类有这么些关键变量:
private sealed abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */
def name: String

// convenience methods
def asRDDId = if (isRDD) Some(asInstanceOf) else None
def isRDD = isInstanceOf
def isShuffle = isInstanceOf
def isBroadcast = isInstanceOf

下面看BlockManager如何通过BlockId获得数据:
调用的是BlockManager的内部方法
private def doGetRemote(blockId: BlockId, asValues: Boolean): Option = {
require(blockId != null, "BlockId is null")
// 通过BlockManagerMaster获得这个blockId的locations
    val locations = Random.shuffle(master.getLocations(blockId))
    for (loc <- locations) {
      logDebug("Getting remote block " + blockId + " from " + loc)
      // 使用BlockManagerWorker来获得block的数据
      val data = BlockManagerWorker.syncGetBlock(
      GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
      if (data != null) {
      if (asValues) {
          // 取到就返回
          return Some(dataDeserialize(blockId, data))
      } else {
          return Some(data)
      }
      }
      logDebug("The value of block " + blockId + " is null")
    }
    logDebug("Block " + blockId + " not found")
    None
}
思路是通过BlockManagerMaster来获得block的位置信息,得到的集合打乱后,遍历位置信息,通过BlockManagerWorker去获得数据,只要得到了,就反序列化之后返回。

在TaskResultGetter处理的时候,成功和失败分别向Scheduler调用handleSuccessfulTask和handleFailedTask方法。
handleSuccessfulTask在DAGScheduler里,会发出CompletionEvent事件,这一步结尾工作会有很多细节处理,这里先不阅读了。
handleFailedTask的话,只要TaskSetManager不是zombie,task没有被kill,那么会继续调用backend.reviveOffers()来re-run。

页: [1]
查看完整版本: Spark Core源码分析: Spark任务执行模型