分享

探索Spark源码action触发作业提交(DAGSchedular,TaskSchedular,TaskSetManager)

regan 发表于 2015-12-28 17:26:20 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 12761
本帖最后由 regan 于 2016-9-17 12:17 编辑

      从调度方面讲,DAGSchedular主要管理提交job的Stage的划分,那么stage划分好之后该如何处理呢?我们知道每一个stage中有一个任务的集合(TaskSet),那么这些任务在被DAGSchedular划分好之后,他们该何去何从呢?它们的生命周期又由谁来负责呢?这就是今天探讨的主题,《DAGSchedular、TaskSchedular、TaskSetManager之间的爱恨纠葛》。
      在上一讲只中,我们讲到了Driver程序如何与Spark集群建立联系,我们知道Driver被【随机】launch到集群中某一个满足运行条件的worker节点上的一个线程中运行,在运行中将会调用Driver程序的main函数(提交jar包main函数,执行用户编写的函数逻辑)。当程序执行遇到action操作的时候,将会触发job提交,从源代码中我们可以看到,首先是调用SparkContext的runJob方法,经过几次重载调用之后,最终调用了DAGSchedular的runJob方法,在该方法中调用submitJob提交了作业。流程如下图:
1.png

每一次提交stage,都会在DAGSchedular中的submitMissingTasks方法中,根据stage的类型生成Task。如果stage时ShuffleMapStage则生成ShuffleMapTask,如果stage时ResultStage则对应生成ResultTask。当生成好tasks之后,将会调用taskSchedular的SubmitTask方法进行提交:
taskScheduler.submitTasks(
  new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))

taskScheduler在启动SparkContext的时候,就根据配置实例化了。在默认的Standalone模式下,taskSchedular的实现类是TaskSchedularImpl,在这个类中的submitTasks方法中。从源码中我们可以看到,在sparkContext实例化的时候DAGScheduler顶层调度器,TaskSchedular和schedularBakend也被实例化了,如下面代码:
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
因此在taskScheduler提交任务的时候,所有的调度器和schedulerBackend都已经准备完毕,虽然调度器准备好了,但是执行的资源(如内存、核等)何时准备呢?
在下一章《任务提交资源准备》中讲解

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条