分享

探索Spark源码---Client提交的Driver程序的调度

regan 发表于 2015-12-15 20:15:27 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 8367
本帖最后由 regan 于 2015-12-17 16:39 编辑

Driver被加载后,何时成为可在web UI中看到的application程序呢?
当使用./spark-submit脚本提交Driver程序的时候,会调用Client对象,在Client中启动ClientActor并将提交的Driver程序的jar、 url 、memery、 core等信息提交给Master,在Master的receiveWithLogging方发中将会收到RequestSubmitDriver这个Case中将会匹配到请求提交Driver的请求,收到该请求之后,Master将会把Driver加入到等待列表中,并掉用schedule()方法启动调度。schedule方法如下:
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
}
代码中我们看到,在waitingDrivers数组中Driver的调度不是FIFO调度的,而是依次遍历等待列表中的Driver,只要满足worker节点的内存和核数目大于等于driver提交时候的配置的数目,就会首先加载这个driver同时将这个driver从等待列表中移除。不满足调度条件的Driver将会留在waitingDriver列表中,等待下一次schdule()的触发,直到集群中满足提交Driver为止。
launchDriver方法中,将会通过Akka框架向Worker发送消息:
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.actor ! LaunchDriver(driver.id, driver.desc)
  driver.state = DriverState.RUNNING
}
在worker节点中,将会构造DriverRunner,调用start方法将会启动一个线程运行这个Driver
Worker中代码如下:
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    akkaUrl,
    securityMgr)
  drivers(driverId) = driver
  driver.start()

  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}
在DriverRunner中将会调用launchDriver方法,该方法中将会使用ProcessBuilder构建一个Process进程运行提交的Driver的main方法。
代码如下:
private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    val stdout = new File(baseDir, "stdout")
    CommandUtils.redirectStream(process.getInputStream, stdout)

    val stderr = new File(baseDir, "stderr")
    val header = "Launch Command: %s\n%s\n\n".format(
      builder.command.mkString("\"", "\" \"", "\""), "=" * 40)
    Files.append(header, stderr, UTF_8)
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
至此Driver程序完成调度,并在一个worker节点的一个线程中运行起来! so happy!

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条