分享

Spark源码系列(三)作业运行过程

本帖最后由 sunshine_junge 于 2014-7-7 21:49 编辑


问题导读:
1.如何进行作业划分?
2.TaskScheduler如何提交Task?













作业执行
上一章讲了RDD的转换,但是没讲作业的运行,它和Driver Program的关系是啥,和RDD的关系是啥?

官方给的例子里面,一执行collect方法就能出结果,那我们就从collect开始看吧,进入RDD,找到collect方法。

  1.   def collect(): Array[T] = {
  2.     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  3.     Array.concat(results: _*)
  4.   }
复制代码



它进行了两个操作:

1、调用SparkContext的runJob方法,把自身的引用传入去,再传了一个匿名函数(把Iterator转换成Array数组)
2、把result结果合并成一个Array,注意results是一个Array[Array[T]]类型,所以第二句的那个写法才会那么奇怪。这个操作是很重的一个操作,如果结果很大的话,这个操作是会报OOM的,因为它是把结果保存在Driver程序的内存当中的result数组里面。

我们点进去runJob这个方法吧。

  1. val callSite = getCallSite
  2.     val cleanedFunc = clean(func)
  3.     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get)
  4.     rdd.doCheckpoint()
复制代码



追踪下去,我们会发现经过多个不同的runJob同名函数调用之后,执行job作业靠的是dagScheduler,最后把结果通过resultHandler保存返回。


DAGScheduler如何划分作业
好的,我们继续看DAGScheduler的runJob方法,提交作业,然后等待结果,成功什么都不做,失败抛出错误,我们接着看submitJob方法。


  1.     val jobId = nextJobId.getAndIncrement()
  2.     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  3.     // 记录作业成功与失败的数据结构,一个作业的Task数量是和分片的数量一致的,Task成功之后调用resultHandler保存结果。
  4.     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  5.     eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
复制代码



走到这里,感觉有点儿绕了,为什么到了这里,还不直接运行呢,还要给eventProcessActor发送一个JobSubmitted请求呢,new一个线程和这个区别有多大?

不管了,搜索一下eventProcessActor吧,结果发现它是一个DAGSchedulerEventProcessActor,它的定义也在DAGScheduler这个类里面。它的receive方法里面定义了12种事件的处理方法,这里我们只需要看JobSubmitted的就行,它也是调用了自身的handleJobSubmitted方法。但是这里很奇怪,没办法打断点调试,但是它的结果倒是能返回的,因此我们得用另外一种方式,打开test工程,找到scheduler目录下的DAGSchedulerSuite这个类,我们自己写一个test方法,首先我们要在import那里加上import org.apache.spark.SparkContext._  ,然后加上这一段测试代码。

  1. test("run shuffle") {
  2.     val rdd1 = sc.parallelize(1 to 100, 4)
  3.     val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
  4.     val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i))
  5.     val rdd4 = rdd3.reduceByKey(_ + _)
  6.     submit(rdd4, Array(0,1,2,3))
  7.     complete(taskSets(0), Seq(
  8.       (Success, makeMapStatus("hostA", 1)),
  9.       (Success, makeMapStatus("hostB", 1))))
  10.     complete(taskSets(1), Seq((Success, 42)))
  11.     complete(taskSets(2), Seq(
  12.       (Success, makeMapStatus("hostA", 2)),
  13.       (Success, makeMapStatus("hostB", 2))))
  14.     complete(taskSets(3), Seq((Success, 68)))
  15.   }
复制代码



这个例子的重点还是shuffle那块,另外也包括了map的多个转换,大家可以按照这个例子去测试下。

我们接着看handleJobSubmitted吧。


  1. var finalStage: Stage = null
  2.     try {
  3.       finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
  4.     } catch {
  5.       // 错误处理,告诉监听器作业失败,返回....
  6.     }
  7.     if (finalStage != null) {
  8.       val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
  9.       clearCacheLocs()
  10.       if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
  11.         // 很短、没有父stage的本地操作,比如 first() or take() 的操作本地执行.
  12.         listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
  13.         runLocally(job)
  14.       } else {
  15.         // collect等操作走的是这个过程,更新相关的关系映射,用监听器监听,然后提交作业
  16.         jobIdToActiveJob(jobId) = job
  17.         activeJobs += job
  18.         resultStageToJob(finalStage) = job
  19.         listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
  20.         // 提交stage
  21.         submitStage(finalStage)
  22.       }
  23.     }
  24.     // 提交stage
  25.     submitWaitingStages()
复制代码



从上面这个方法来看,我们应该重点关注newStage方法、submitStage方法和submitWaitingStages方法。

我们先看newStage,它得到的结果叫做finalStage,挺奇怪的哈,为啥?先看吧

  1.     val id = nextStageId.getAndIncrement()
  2.     val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
  3.     stageIdToStage(id) = stage
  4.     updateJobIdStageIdMaps(jobId, stage)
  5.     stageToInfos(stage) = StageInfo.fromStage(stage)
  6.     stage
复制代码



可以看出来Stage也没有太多的东西可言,它就是把rdd给传了进去,tasks的数量,shuffleDep是空,parentStage。

那它的parentStage是啥呢?


  1. private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
  2.     val parents = new HashSet[Stage]
  3.     val visited = new HashSet[RDD[_]]
  4.     def visit(r: RDD[_]) {
  5.       if (!visited(r)) {
  6.         visited += r
  7.         // 在visit函数里面,只有存在ShuffleDependency的,parent才通过getShuffleMapStage计算出来
  8.         for (dep <- r.dependencies) {
  9.           dep match {
  10.             case shufDep: ShuffleDependency[_,_] =>
  11.               parents += getShuffleMapStage(shufDep, jobId)
  12.             case _ =>
  13.               visit(dep.rdd)
  14.           }
  15.         }
  16.       }
  17.     }
  18.     visit(rdd)
  19.     parents.toList
  20.   }
复制代码



它是通过不停的遍历它之前的rdd,如果碰到有依赖是ShuffleDependency类型的,就通过getShuffleMapStage方法计算出来它的Stage来。

那我们就开始看submitStage方法吧。


  1. private def submitStage(stage: Stage) {
  2.         //...
  3.         val missing = getMissingParentStages(stage).sortBy(_.id)
  4.         logDebug("missing: " + missing)
  5.         if (missing == Nil) {
  6.           // 没有父stage,执行这stage的tasks
  7.           submitMissingTasks(stage, jobId.get)
  8.           runningStages += stage
  9.         } else {
  10.          // 提交父stage的task,这里是个递归,真正的提交在上面的注释的地方
  11.           for (parent <- missing) {
  12.             submitStage(parent)
  13.           }
  14.           // 暂时不能提交的stage,先添加到等待队列
  15.           waitingStages += stage
  16.         }
  17.       }
  18.   }
复制代码



这个提交stage的过程是一个递归的过程,它是先要把父stage先提交,然后把自己添加到等待队列中,直到没有父stage之后,就提交该stage中的任务。等待队列在最后的submitWaitingStages方法中提交。

这里我引用一下上一章当中我所画的那个图来表示这个过程哈。


从getParentStages方法可以看出来,RDD当中存在ShuffleDependency的Stage才会有父Stage, 也就是图中的虚线的位置!

所以我们只需要记住凡是涉及到shuffle的作业都会至少有两个Stage,即shuffle前和shuffle后。


TaskScheduler提交Task
那我们接着看submitMissingTasks方法,下面是主体代码。


  1. private def submitMissingTasks(stage: Stage, jobId: Int) {
  2.     val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
  3.     myPending.clear()
  4.     var tasks = ArrayBuffer[Task[_]]()
  5.     if (stage.isShuffleMap) {
  6.       // 这是shuffle stage的情况
  7.       for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
  8.         val locs = getPreferredLocs(stage.rdd, p)
  9.         tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
  10.       }
  11.     } else {
  12.       // 这是final stage的情况
  13.       val job = resultStageToJob(stage)
  14.       for (id <- 0 until job.numPartitions if !job.finished(id)) {
  15.         val partition = job.partitions(id)
  16.         val locs = getPreferredLocs(stage.rdd, partition)
  17.         tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
  18.       }
  19.     }
  20.     if (tasks.size > 0) {
  21.       myPending ++= tasks
  22.       taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
  23.       stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
  24.     } else {
  25.       runningStages -= stage
  26.     }
  27.   }
  28. 复制代码
  29. Task也是有两类的,一种是ShuffleMapTask,一种是ResultTask,我们需要注意这两种Task的runTask方法。最后Task是通过taskScheduler.submitTasks来提交的。
  30. 我们找到TaskSchedulerImpl里面看这个方法。
  31. 复制代码
  32.   override def submitTasks(taskSet: TaskSet) {
  33.     val tasks = taskSet.tasksthis.synchronized {
  34.       val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
  35.       activeTaskSets(taskSet.id) = manager
  36.       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  37.       hasReceivedTask = true
  38.     }
  39.     backend.reviveOffers()
  40.   }
复制代码



Task也是有两类的,一种是ShuffleMapTask,一种是ResultTask,我们需要注意这两种Task的runTask方法。最后Task是通过taskScheduler.submitTasks来提交的。

我们找到TaskSchedulerImpl里面看这个方法。


  1.   override def submitTasks(taskSet: TaskSet) {
  2.     val tasks = taskSet.tasksthis.synchronized {
  3.       val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
  4.       activeTaskSets(taskSet.id) = manager
  5.       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  6.       hasReceivedTask = true
  7.     }
  8.     backend.reviveOffers()
  9.   }
复制代码



调度器有两种模式,FIFO和FAIR,默认是FIFO, 可以通过spark.scheduler.mode来设置,schedulableBuilder也有相应的两种FIFOSchedulableBuilder和FairSchedulableBuilder。

那backend是啥?据说是为了给TaskSchedulerImpl提供插件式的调度服务的。

它是怎么实例化出来的,这里我们需要追溯回到SparkContext的createTaskScheduler方法,下面我直接把常用的3中类型的TaskScheduler给列出来了。

mode            Scheduler                          Backend
cluster          TaskSchedulerImpl             SparkDeploySchedulerBackend
yarn-cluster  YarnClusterScheduler          CoarseGrainedSchedulerBackend
yarn-client    YarnClientClusterScheduler  YarnClientSchedulerBackend

好,我们回到之前的代码上,schedulableBuilder.addTaskSetManager比较简单,把作业集添加到调度器的队列当中。

我们接着看backend的reviveOffers,里面只有一句话driverActor ! ReviveOffers。真是头晕,搞那么多Actor,只是为了接收消息。。。

照旧吧,找到它的receive方法,找到ReviveOffers这个case,发现它调用了makeOffers方法,我们继续追杀!

  1. def makeOffers() {
  2.     launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
  3. }
复制代码



从executorHost中随机抽出一些来给调度器,然后调度器返回TaskDescription,executorHost怎么来的,待会儿再说,我们接着看resourceOffers方法。

  1. def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  2.     SparkEnv.set(sc.env)
  3.     // 遍历worker提供的资源,更新executor相关的映射
  4.     for (o <- offers) {
  5.       executorIdToHost(o.executorId) = o.host
  6.       if (!executorsByHost.contains(o.host)) {
  7.         executorsByHost(o.host) = new HashSet[String]()
  8.         executorAdded(o.executorId, o.host)
  9.       }
  10.     }
  11.     // 从worker当中随机选出一些来,防止任务都堆在一个机器上
  12.     val shuffledOffers = Random.shuffle(offers)
  13.     // worker的task列表
  14.     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  15.     val availableCpus = shuffledOffers.map(o => o.cores).toArray
  16.     val sortedTaskSets = rootPool.getSortedTaskSetQueue
  17.     // 随机遍历抽出来的worker,通过TaskSetManager的resourceOffer,把本地性最高的Task分给Worker
  18.     var launchedTask = false
  19.     for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
  20.       do {
  21.         launchedTask = false
  22.         for (i <- 0 until shuffledOffers.size) {
  23.           val execId = shuffledOffers(i).executorId
  24.           val host = shuffledOffers(i).host
  25.           if (availableCpus(i) >= CPUS_PER_TASK) {
  26.             // 把本地性最高的Task分给Worker
  27.             for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
  28.               tasks(i) += task
  29.               val tid = task.taskId
  30.               taskIdToTaskSetId(tid) = taskSet.taskSet.id
  31.               taskIdToExecutorId(tid) = execId
  32.               activeExecutorIds += execId
  33.               executorsByHost(host) += execId
  34.               availableCpus(i) -= CPUS_PER_TASK
  35.               assert (availableCpus(i) >= 0)
  36.               launchedTask = true
  37.             }
  38.           }
  39.         }
  40.       } while (launchedTask)
  41.     }
  42.     if (tasks.size > 0) {
  43.       hasLaunchedTask = true
  44.     }
  45.     return tasks
  46.   }
复制代码



resourceOffers主要做了3件事:

1、从Workers里面随机抽出一些来执行任务。
2、通过TaskSetManager找出和Worker在一起的Task,最后编译打包成TaskDescription返回。
3、将Worker-->Array[TaskDescription]的映射关系返回。

我们继续看TaskSetManager的resourceOffer,看看它是怎么找到和host再起的Task,并且包装成TaskDescription。

通过查看代码,我发现之前我解释的和它具体实现的差别比较大,它所谓的本地性是根据当前的等待时间来确定的任务本地性的级别。

它的本地性主要是包括四类:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。


  1. private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  2.     while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
  3.         currentLocalityIndex < myLocalityLevels.length - 1)
  4.     {
  5.       // 成立条件是当前时间-上次发布任务的时间 > 当前本地性级别的,条件成立就跳到下一个级别
  6.       lastLaunchTime += localityWaits(currentLocalityIndex)
  7.       currentLocalityIndex += 1
  8.     }
  9.     myLocalityLevels(currentLocalityIndex)
  10.   }
复制代码



等待时间是可以通过参数去设置的,具体的自己查下面的代码。


  1.   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
  2.     val defaultWait = conf.get("spark.locality.wait", "3000")
  3.     level match {
  4.       case TaskLocality.PROCESS_LOCAL =>
  5.         conf.get("spark.locality.wait.process", defaultWait).toLong
  6.       case TaskLocality.NODE_LOCAL =>
  7.         conf.get("spark.locality.wait.node", defaultWait).toLong
  8.       case TaskLocality.RACK_LOCAL =>
  9.         conf.get("spark.locality.wait.rack", defaultWait).toLong
  10.       case TaskLocality.ANY =>
  11.         0L
  12.     }
  13.   }
复制代码



下面继续看TaskSetManager的resourceOffer的方法,通过findTask来从Task集合里面找到相应的Task。


  1. findTask(execId, host, allowedLocality) match {
  2.         case Some((index, taskLocality)) => {
  3.              val task = tasks(index)
  4.              val serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
  5.             val timeTaken = clock.getTime() - startTime
  6.             addRunningTask(taskId)
  7.             val taskName = "task %s:%d".format(taskSet.id, index)
  8.             sched.dagScheduler.taskStarted(task, info)
  9.             return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
  10.         }
复制代码



它的findTask方法如下:


  1. private def findTask(execId: String, host: String, locality: TaskLocality.Value)
  2.     : Option[(Int, TaskLocality.Value)] =
  3.   {
  4.    // 同一个Executor,通过execId来查找相应的等待的task
  5.     for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
  6.       return Some((index, TaskLocality.PROCESS_LOCAL))
  7.     }
  8.    // 通过主机名找到相应的Task,不过比之前的多了一步判断
  9.     if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
  10.       for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
  11.         return Some((index, TaskLocality.NODE_LOCAL))
  12.       }
  13.     }
  14.   // 通过Rack的名称查找Task
  15.     if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
  16.       for {
  17.         rack <- sched.getRackForHost(host)
  18.         index <- findTaskFromList(execId, getPendingTasksForRack(rack))
  19.       } {
  20.         return Some((index, TaskLocality.RACK_LOCAL))
  21.       }
  22.     }
  23.    // 查找那些preferredLocations为空的,不指定在哪里执行的Task来执行
  24.     for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
  25.       return Some((index, TaskLocality.PROCESS_LOCAL))
  26.     }
  27.   // 查找那些preferredLocations为空的,不指定在哪里执行的Task来执行
  28.     if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
  29.       for (index <- findTaskFromList(execId, allPendingTasks)) {
  30.         return Some((index, TaskLocality.ANY))
  31.       }
  32.     }
  33.     // 最后没办法了,拖的时间太长了,只能启动推测执行了
  34.     findSpeculativeTask(execId, host, locality)
  35.   }
复制代码



从这个方面可以看得出来,Spark对运行时间还是很注重的,等待的时间越长,它就可能越饥不择食,从PROCESS_LOCAL一直让步到ANY,最后的最后,推测执行都用到了。

找到任务之后,它就调用dagScheduler.taskStarted方法,通知dagScheduler任务开始了,taskStarted方法就不详细讲了,它触发dagScheduler的BeginEvent事件,里面只做了2件事:

1、检查Task序列化的大小,超过100K就警告。
2、提交等待的Stage。

好,我们继续回到发布Task上面来,中间过程讲完了,我们应该是要回到CoarseGrainedSchedulerBackend的launchTasks方法了。

  1. def makeOffers() {
  2.     launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
  3. }
复制代码



它的方法体是:

  1.     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  2.       for (task <- tasks.flatten) {
  3.         freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
  4.         executorActor(task.executorId) ! LaunchTask(task)
  5.       }
  6.     }
复制代码



通过executorId找到相应的executorActor,然后发送LaunchTask过去,一个Task占用一个Cpu。


注册Application
那这个executorActor是怎么来的呢?找呗,最后发现它是在receive方法里面接受到RegisterExecutor消息的时候注册的。通过搜索,我们找到CoarseGrainedExecutorBackend这个类,在它的preStart方法里面赫然找到了driver ! RegisterExecutor(executorId, hostPort, cores)  带的这三个参数都是在初始化的时候传入的,那是谁实例化的它呢,再逆向搜索找到SparkDeploySchedulerBackend!之前的backend一直都是它,我们看reviveOffers是在它的父类CoarseGrainedSchedulerBackend里面。

关系清楚了,在这个backend的start方法里面启动了一个AppClient,AppClient的其中一个参数ApplicationDescription就是封装的运行CoarseGrainedExecutorBackend的命令。AppClient内部启动了一个ClientActor,这个ClientActor启动之后,会尝试向Master发送一个指令actor ! RegisterApplication(appDescription) 注册一个Application。

别废话了,Ctrl +Shift + N吧,定位到Master吧。


  1.     case RegisterApplication(description) => {
  2.         val app = createApplication(description, sender)
  3.         registerApplication(app)
  4.         persistenceEngine.addApplication(app)
  5.         sender ! RegisteredApplication(app.id, masterUrl)
  6.         schedule()
  7.     }
复制代码



它做了5件事:

1、createApplication为这个app构建一个描述App数据结构的ApplicationInfo。
2、注册该Application,更新相应的映射关系,添加到等待队列里面。
3、用persistenceEngine持久化Application信息,默认是不保存的,另外还有两种方式,保存在文件或者Zookeeper当中。
4、通过发送方注册成功。
5、开始作业调度。

关于调度的问题,在第一章《spark-submit提交作业过程》已经介绍过了,建议回去再看看,搞清楚Application和Executor之间的关系。

Application一旦获得资源,Master会发送launchExecutor指令给Worker去启动Executor。

进到Worker里面搜索LaunchExecutor。


  1.  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host,
  2.             appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
  3.  executors(appId + "/" + execId) = manager
  4.   manager.start()
  5.    coresUsed += cores_
  6.    memoryUsed += memory_
  7.    masterLock.synchronized {
  8.       master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
  9.    }
复制代码



原来ExecutorRunner还不是传说中的Executor,它内部是执行了appDesc内部的那个命令,启动了CoarseGrainedExecutorBackend,它才是我们的真命天子Executor。

启动之后ExecutorRunner报告ExecutorStateChanged事件给Master。

Master干了两件事:

1、转发给Driver,这个Driver是之前注册Application的那个AppClient
2、如果是Executor运行结束,从相应的映射关系里面删除


发布Task

上面又花了那么多时间讲Task的运行环境ExecutorRunner是怎么注册,那我们还是回到我们的主题,Task的发布。

发布任务是发送LaunchTask指令给CoarseGrainedExecutorBackend,接受到指令之后,让它内部的executor来发布这个任务。

这里我们看一下Executor的launchTask。


  1. def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  2.     val tr = new TaskRunner(context, taskId, serializedTask)
  3.     runningTasks.put(taskId, tr)
  4.     threadPool.execute(tr)
  5.   }
复制代码



TaskRunner是这里的重头戏啊!看它的run方法吧。


  1. override def run() {
  2.       // 准备工作若干...那天我们放学回家经过一片玉米地,以上省略一百字
  3.       try {
  4.         // 反序列化Task
  5.         SparkEnv.set(env)
  6.         Accumulators.clear()
  7.         val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  8.         updateDependencies(taskFiles, taskJars)
  9.         task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
  10.        // 命令为尝试运行,和hadoop的mapreduce作业是一致的
  11.         attemptedTask = Some(task)
  12.         logDebug("Task " + taskId + "'s epoch is " + task.epoch)
  13.         env.mapOutputTracker.updateEpoch(task.epoch)
  14.         // 运行Task, 具体可以去看之前让大家关注的ResultTask和ShuffleMapTask
  15.         taskStart = System.currentTimeMillis()
  16.         val value = task.run(taskId.toInt)
  17.         val taskFinish = System.currentTimeMillis()
  18.      // 对结果进行序列化
  19.         val resultSer = SparkEnv.get.serializer.newInstance()
  20.         val beforeSerialization = System.currentTimeMillis()
  21.         val valueBytes = resultSer.serialize(value)
  22.         val afterSerialization = System.currentTimeMillis()
  23.      // 更新任务的相关监控信息,会反映到监控页面上的
  24.         for (m <- task.metrics) {
  25.           m.hostname = Utils.localHostName()
  26.           m.executorDeserializeTime = taskStart - startTime
  27.           m.executorRunTime = taskFinish - taskStart
  28.           m.jvmGCTime = gcTime - startGCTime
  29.           m.resultSerializationTime = afterSerialization - beforeSerialization
  30.         }
  31.         val accumUpdates = Accumulators.values
  32.      // 对结果进行再包装,包装完再进行序列化
  33.         val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
  34.         val serializedDirectResult = ser.serialize(directResult)
  35.         // 如果中间结果的大小超过了spark.akka.frameSize(默认是10M)的大小,就要提升序列化级别了,超过内存的部分要保存到硬盘的
  36.         val serializedResult = {
  37.           if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
  38.             val blockId = TaskResultBlockId(taskId)
  39.             env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
  40.             ser.serialize(new IndirectTaskResult[Any](blockId))
  41.           } else {
  42.             serializedDirectResult
  43.           }
  44.         }
  45.      // 返回结果
  46.         execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  47.       } catch {
  48.         // 这部分是错误处理,被我省略掉了,主要内容是通关相关负责人处理后事
  49.       } finally {
  50.         // 清理为ResultTask注册的shuffle内存,最后把task从正在运行的列表当中删除
  51.         val shuffleMemoryMap = env.shuffleMemoryMap
  52.         shuffleMemoryMap.synchronized {
  53.           shuffleMemoryMap.remove(Thread.currentThread().getId)
  54.         }
  55.         runningTasks.remove(taskId)
  56.       }
  57.     }
  58.   }
复制代码



以上代码被我这些了,但是建议大家看看注释吧。

最后结果是通过statusUpdate返回的。

  1.   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  2.     driver ! StatusUpdate(executorId, taskId, state, data)
  3.   }
复制代码



这回这个Driver又不是刚才那个AppClient,而是它的家长SparkDeploySchedulerBackend,是在SparkDeploySchedulerBackend的父类CoarseGrainedSchedulerBackend接受了这个StatusUpdate消息。

这关系真他娘够乱的。。

继续,Task里面走的是TaskSchedulerImpl这个方法。

  1. scheduler.statusUpdate(taskId, state, data.value)
复制代码



到这里,一个Task就运行结束了,后面就不再扩展了,作业运行这块是Spark的核心,再扩展基本就能写出来一本书了,限于文章篇幅,这里就不再深究了。

以上的过程应该是和下面的图一致的。



看完这篇文章,估计大家会云里雾里的,在下一章《作业生命周期》会把刚才描述的整个过程重新梳理出来,便于大家记忆,敬请期待!






上一篇:Spark源码系列(二)RDD详解
下一篇:Spark源码系列(四)图解作业生命周期




作者:岑玉海




欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条