分享

spark源码【 TaskScheduler】与任务提交原理浅析2


问题导读


1.submitMissingTasks本文是如何分析其流程的?
2.TaskSchedulerImpl中的submitTasks包含哪些流程?
3.resourceOffers本文认为主要做了哪3件事?







引言
上一节《spark源码【 TaskScheduler】与任务提交原理浅析1》介绍了TaskScheduler的创建过程,在这一节中,我将承接《Stage生成和Stage源码浅析》中的submitMissingTasks函数继续介绍task的创建和分发工作。
DAGScheduler中的submitMissingTasks函数
如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tasks。
submitMissingTasks负责创建新的Task。
Spark将由Executor执行的Task分为ShuffleMapTask和ResultTask两种。
每个Stage生成Task的时候根据Stage中的isShuffleMap标记确定是否为ShuffleMapStage,如果标记为真,则这个Stage输出的结果会经过Shuffle阶段作为下一个Stage的输入,创建ShuffleMapTask;否则是ResultStage,这样会创建ResultTask,Stage的结果会输出到Spark空间;最后,Task是通过taskScheduler.submitTasks来提交的。
计算流程
submitMissingTasks的计算流程如下:

首先得到RDD中需要计算的partition,对于Shuffle类型的stage,需要判断stage中是否缓存了该结果;对于Result类型的Final Stage,则判断计算Job中该partition是否已经计算完成。序列化task的binary。Executor可以通过广播变量得到它。每个task运行的时候首先会反序列化。这样在不同的executor上运行的task是隔离的,不会相互影响。为每个需要计算的partition生成一个task:对于Shuffle类型依赖的Stage,生成ShuffleMapTask类型的task;对于Result类型的Stage,生成一个ResultTask类型的task。确保Task是可以被序列化的。因为不同的cluster有不同的taskScheduler,在这里判断可以简化逻辑;保证TaskSet的task都是可以序列化的。通过TaskScheduler提交TaskSet。

部分代码
下面是submitMissingTasks判断是否为ShuffleMapStage的部分代码,其中部分参数说明在注释中:
[mw_shl_code=bash,true]val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
    partitionsToCompute.map { id =>
      val locs = getPreferredLocs(stage.rdd, id)
      val part = stage.rdd.partitions(id)
      //stage.id:Stage的序号
      //taskBinary:这个在下面具体介绍
      //part:RDD对应的partition
//locs:最适合的执行位置
      new ShuffleMapTask(stage.id, taskBinary, part, locs)
    }
  } else {
    val job = stage.resultOfJob.get
    partitionsToCompute.map { id =>
      val p: Int = job.partitions(id)
      val part = stage.rdd.partitions(p)
      val locs = getPreferredLocs(stage.rdd, p)
      //p:partition索引,表示从哪个partition读取数据
      //id:输出的分区索引,表示reduceID
      new ResultTask(stage.id, taskBinary, part, locs, id)
    }
  }[/mw_shl_code]

关于taskBinary参数:这是RDD和ShuffleDependency的广播变量(broadcase version),作为序列化之后的结果。
这里将RDD和其依赖关系进行序列化,在executor运行task之前再进行反序列化。这种方式对不同的task之间提供了较好的隔离。
下面是submitMissingTasks进行任务提交的部分代码:
[mw_shl_code=bash,true]if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingTasks ++= tasks
  logDebug("New pending tasks: " + stage.pendingTasks)
  taskScheduler.submitTasks(
    new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)
  logDebug("Stage " + stage + " is actually done; %b %d %d".format(
    stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
}[/mw_shl_code]

TaskSchedulerImpl中的submitTasks
submitTasks的流程如下:

  • 任务(tasks)会被包装成TaskSetManager(由于TaskSetManager不是线程安全的,所以源码中需要进行同步)
  • TaskSetManager实例通过schedulableBuilder(分为FIFOSchedulableBuilder和FairSchedulableBuilder两种)投入调度池中等待调度
  • 任务提交同时启动定时器,如果任务还未被执行,定时器会持续发出警告直到任务被执行
  • 调用backend的reviveOffers函数,向backend的driverActor实例发送ReviveOffers消息,driveerActor收到ReviveOffers消息后,调用makeOffers处理函数
[mw_shl_code=bash,true]override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}[/mw_shl_code]


TaskSetManager调度
每个Stage一经确认,生成相应的TaskSet(即为一组tasks),其对应一个TaskSetManager通过Stage回溯到最源头缺失的Stage提交到调度池pool中,在调度池中,这些TaskSetMananger又会根据Job ID排序,先提交的Job的TaskSetManager优先调度,然后一个Job内的TaskSetManager ID小的先调度,并且如果有未执行完的父母Stage的TaskSetManager,则不会提交到调度池中。
reviveOffers函数代码
下面是CoarseGrainedSchedulerBackend的reviveOffers函数:
[mw_shl_code=bash,true]override def reviveOffers() {
  driverActor ! ReviveOffers
}[/mw_shl_code]

driveerActor收到ReviveOffers消息后,调用makeOffers处理函数。
DriverActor的makeOffers函数
makeOffers函数的处理逻辑是:

  • 找到空闲的Executor,分发的策略是随机分发的,即尽可能将任务平摊到各个Executor
  • 如果有空闲的Executor,就将任务列表中的部分任务利用launchTasks发送给指定的Executor

SchedulerBackend(这里实际是CoarseGrainedSchedulerBackend)负责将新创建的Task分发给Executor,从launchTasks代码中可以看出,在发送LauchTasks指令之前需要将TaskDescription序列化。
[mw_shl_code=bash,true]// Make fake resource offers on all executors
def makeOffers() {
  launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}[/mw_shl_code]

TaskSchedulerImpl中的resourceOffers函数
任务是随机分发给各个Executor的,资源分配的工作由resourceOffers函数处理。
正如上面submitTasks函数提到的,在TaskSchedulerImpl中,这一组Task被交给一个新的TaskSetManager实例进行管理,所有的TaskSetManager经由SchedulableBuilder根据特定的调度策略进行排序,在TaskSchedulerImpl的resourceOffers函数中,当前被选择的TaskSetManager的ResourceOffer函数被调用并返回包含了序列化任务数据的TaskDescription,最后这些TaskDescription再由SchedulerBackend派发到ExecutorBackend去执行。
resourceOffers主要做了3件事:

  • 从Workers里面随机抽出一些来执行任务。
  • 通过TaskSetManager找出和Worker在一起的Task,最后编译打包成TaskDescription返回。
  • 将Worker—>Array[TaskDescription]的映射关系返回。
[mw_shl_code=bash,true] /**
  * Called by cluster manager to offer resources on slaves. We respond by asking our active task
  * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
  * that tasks are balanced across the cluster.
  */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
   // Mark each slave as alive and remember its hostname
   // Also track if new executor is added
   var newExecAvail = false
   // 遍历worker提供的资源,更新executor相关的映射
   for (o <- offers) {
     executorIdToHost(o.executorId) = o.host
     activeExecutorIds += o.executorId
     if (!executorsByHost.contains(o.host)) {
       executorsByHost(o.host) = new HashSet[String]()
       executorAdded(o.executorId, o.host)
       newExecAvail = true
     }
     for (rack <- getRackForHost(o.host)) {
       hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
     }
   }
   // 从worker当中随机选出一些来,防止任务都堆在一个机器上
   // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
   val shuffledOffers = Random.shuffle(offers)
   // Build a list of tasks to assign to each worker.
   // worker的task列表
   val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
   val availableCpus = shuffledOffers.map(o => o.cores).toArray
   // getSortedTask函数对taskset进行排序
   val sortedTaskSets = rootPool.getSortedTaskSetQueue
   for (taskSet <- sortedTaskSets) {
     logDebug("parentName: %s, name: %s, runningTasks: %s".format(
       taskSet.parent.name, taskSet.name, taskSet.runningTasks))
     if (newExecAvail) {
       taskSet.executorAdded()
     }
   }
   // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
   // of locality levels so that it gets a chance to launch local tasks on all of them.
   // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
   // 随机遍历抽出来的worker,通过TaskSetManager的resourceOffer,把本地性最高的Task分给Worker
   // 本地性是根据当前的等待时间来确定的任务本地性的级别。
// 它的本地性主要是包括四类:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。
//1. 首先依次遍历 sortedTaskSets, 并对于每个 Taskset, 遍历 TaskLocality
//2. 越 local 越优先, 找不到(launchedTask 为 false)才会到下个 locality 级别
//3. (封装在resourceOfferSingleTaskSet函数)在多次遍历offer list,
//因为一次taskSet.resourceOffer只会占用一个core,
//而不是一次用光所有的 core, 这样有助于一个 taskset 中的 task 比较均匀的分布在workers上
//4. 只有在该taskset, 该locality下, 对所有worker offer都找不到合适的task时,
//才跳到下个 locality 级别
   var launchedTask = false
   for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
     do {
       launchedTask = resourceOfferSingleTaskSet(
           taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
     } while (launchedTask)
   }
   if (tasks.size > 0) {
     hasLaunchedTask = true
   }
   return tasks
}[/mw_shl_code]


TaskDescription代码:
[mw_shl_code=bash,true]private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {
  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
  private val buffer = new SerializableBuffer(_serializedTask)
  def serializedTask: ByteBuffer = buffer.value
  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}[/mw_shl_code]

DriverActor的launchTasks函数
launchTasks函数流程:

  • launchTasks函数将resourceOffers函数返回的TaskDescription信息进行序列化
  • 向executorActor发送封装了serializedTask的LaunchTask消息
由于受到Akka Frame Size尺寸的限制,如果发送数据过大,会被截断。
[mw_shl_code=bash,true]// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
    }
  }
}[/mw_shl_code]


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条