本帖最后由 regan 于 2016-9-17 12:17 编辑
从调度方面讲,DAGSchedular主要管理提交job的Stage的划分,那么stage划分好之后该如何处理呢?我们知道每一个stage中有一个任务的集合(TaskSet),那么这些任务在被DAGSchedular划分好之后,他们该何去何从呢?它们的生命周期又由谁来负责呢?这就是今天探讨的主题,《DAGSchedular、TaskSchedular、TaskSetManager之间的爱恨纠葛》。
在上一讲只中,我们讲到了Driver程序如何与Spark集群建立联系,我们知道Driver被【随机】launch到集群中某一个满足运行条件的worker节点上的一个线程中运行,在运行中将会调用Driver程序的main函数(提交jar包main函数,执行用户编写的函数逻辑)。当程序执行遇到action操作的时候,将会触发job提交,从源代码中我们可以看到,首先是调用SparkContext的runJob方法,经过几次重载调用之后,最终调用了DAGSchedular的runJob方法,在该方法中调用submitJob提交了作业。流程如下图:
每一次提交stage,都会在DAGSchedular中的submitMissingTasks方法中,根据stage的类型生成Task。如果stage时ShuffleMapStage则生成ShuffleMapTask,如果stage时ResultStage则对应生成ResultTask。当生成好tasks之后,将会调用taskSchedular的SubmitTask方法进行提交:
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) |
taskScheduler在启动SparkContext的时候,就根据配置实例化了。在默认的Standalone模式下,taskSchedular的实现类是TaskSchedularImpl,在这个类中的submitTasks方法中。从源码中我们可以看到,在sparkContext实例化的时候DAGScheduler顶层调度器,TaskSchedular和schedularBakend也被实例化了,如下面代码:
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this) | 因此在taskScheduler提交任务的时候,所有的调度器和schedulerBackend都已经准备完毕,虽然调度器准备好了,但是执行的资源(如内存、核等)何时准备呢?
在下一章《任务提交资源准备》中讲解
|
|