Last edited by regan on 2015-12-17 16:39
Once a Driver has been loaded, when does it become an application that shows up in the web UI?
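When a Driver program is submitted with the ./spark-submit script in standalone cluster deploy mode, the Client object is invoked. Client starts a ClientActor, which sends the Driver's description (jar URL, memory, cores and so on) to the Master. The Master's receiveWithLogging handler matches this submission request in its RequestSubmitDriver case. On receiving it, the Master adds the Driver to its waiting list and calls schedule() to start scheduling. The schedule method looks like this: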
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
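}
From the code we can see that drivers in waitingDrivers are not scheduled in strict FIFO order. For each alive worker (visited in random order), the Master walks the waiting list and launches every driver whose requested memory and cores fit within that worker's free memory and free cores, removing it from the waiting list. A driver further back in the queue can therefore start before one ahead of it that does not fit. Drivers that cannot be placed stay in waitingDrivers until the next call to schedule(), and keep waiting until some worker in the cluster has enough free resources. (Removing elements from waitingDrivers while iterating over it is fragile; later Spark releases iterate over a copy, waitingDrivers.toList.)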
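To make the non-FIFO behaviour concrete, here is a minimal simulation of the driver-placement loop. SimWorker, SimDriver, DriverSchedulingSketch and DriverSchedulingDemo are hypothetical stand-ins, not Spark classes; resources are plain Ints (memory in MB), and the sketch iterates over a snapshot of the queue instead of the live buffer.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical, simplified stand-ins for Spark's WorkerInfo and DriverInfo (memory in MB).
final class SimWorker(val id: String, var memoryFree: Int, var coresFree: Int)
final case class SimDriver(id: String, mem: Int, cores: Int)

object DriverSchedulingSketch {
  /** Mimics the driver-placement loop of Master.schedule(); returns (driverId, workerId) pairs. */
  def schedule(workers: Seq[SimWorker], waiting: ArrayBuffer[SimDriver],
               rng: Random = new Random(42)): List[(String, String)] = {
    val launched = ArrayBuffer.empty[(String, String)]
    // Shuffle workers so drivers are spread across the cluster.
    for (worker <- rng.shuffle(workers)) {
      // Walk a snapshot of the queue so removing from `waiting` cannot skip entries.
      for (driver <- waiting.toList) {
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          // Reserve the driver's resources on this worker (Spark does this in worker.addDriver).
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          launched += ((driver.id, worker.id))
          waiting -= driver
        }
      }
    }
    launched.toList
  }
}

object DriverSchedulingDemo {
  def main(args: Array[String]): Unit = {
    val worker = new SimWorker("w1", memoryFree = 4096, coresFree = 4)
    val waiting = ArrayBuffer(
      SimDriver("big", mem = 8192, cores = 1), // head of the queue, never fits this worker
      SimDriver("small", mem = 1024, cores = 1),
      SimDriver("medium", mem = 2048, cores = 2))
    println(DriverSchedulingSketch.schedule(Seq(worker), waiting)) // List((small,w1), (medium,w1))
    println(waiting.map(_.id)) // the drivers still waiting (only "big")
  }
}
```

With a single 4096 MB / 4-core worker, the 8192 MB driver at the head of the queue stays in the waiting list while the two smaller drivers behind it are launched, which is exactly the "not strictly FIFO" behaviour described above.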
In launchDriver, the Master records the assignment and sends a LaunchDriver message to the Worker through Akka:
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.actor ! LaunchDriver(driver.id, driver.desc)
  driver.state = DriverState.RUNNING
}
On the Worker node, a DriverRunner is constructed, and calling its start method spawns a thread that launches and manages this Driver.
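The hand-off above is asynchronous: the Master fires the LaunchDriver message with `!` and marks the driver RUNNING without waiting for a reply, while the Worker handles its messages one at a time. The sketch below imitates that pattern with plain JDK threads instead of Akka. ToyWorker, LaunchDriverMsg, StopWorker and ToyWorkerDemo are hypothetical names, and the runner thread only prints a line instead of starting a driver process.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical messages standing in for Spark's LaunchDriver message; not Spark classes.
sealed trait WorkerMsg
final case class LaunchDriverMsg(driverId: String, mem: Int, cores: Int) extends WorkerMsg
case object StopWorker extends WorkerMsg

/** Toy worker: one thread drains a mailbox, so messages are handled one at a time, actor-style. */
final class ToyWorker {
  private val mailbox = new LinkedBlockingQueue[WorkerMsg]()
  private val runners = ArrayBuffer.empty[Thread] // touched only by the mailbox thread
  var coresUsed = 0  // written only by the mailbox thread
  var memoryUsed = 0 // written only by the mailbox thread

  private def newThread(name: String)(body: => Unit): Thread =
    new Thread(new Runnable { def run(): Unit = body }, name)

  private val loop = newThread("toy-worker-mailbox") {
    var running = true
    while (running) {
      mailbox.take() match {
        case LaunchDriverMsg(id, mem, cores) =>
          // Like DriverRunner.start(): the driver gets its own thread, then resources are booked.
          val runner = newThread(s"DriverRunner for $id") { println(s"running $id") }
          runners += runner
          runner.start()
          coresUsed += cores
          memoryUsed += mem
        case StopWorker =>
          running = false
      }
    }
  }
  loop.start()

  /** Fire-and-forget send, analogous to `worker.actor ! LaunchDriver(...)`. */
  def !(msg: WorkerMsg): Unit = mailbox.put(msg)

  /** Stops the mailbox thread and waits for every runner thread. */
  def shutdown(): Unit = {
    this ! StopWorker
    loop.join() // also makes the mailbox thread's writes visible to the caller
    runners.foreach(_.join())
  }
}

object ToyWorkerDemo {
  def main(args: Array[String]): Unit = {
    val worker = new ToyWorker
    worker ! LaunchDriverMsg("driver-1", mem = 1024, cores = 1) // returns immediately
    worker ! LaunchDriverMsg("driver-2", mem = 2048, cores = 2)
    worker.shutdown()
    println(s"coresUsed=${worker.coresUsed}, memoryUsed=${worker.memoryUsed}")
  }
}
```

A single mailbox thread is used so that the resource counters need no locking, which mirrors why an actor can update coresUsed and memoryUsed directly in its message handler.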
The Worker-side code is:
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    akkaUrl,
    securityMgr)
  drivers(driverId) = driver
  driver.start()
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}
DriverRunner then calls its own launchDriver method, which uses a ProcessBuilder to start a separate process that runs the submitted Driver's main method. The code is as follows:
private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    val stdout = new File(baseDir, "stdout")
    CommandUtils.redirectStream(process.getInputStream, stdout)
    val stderr = new File(baseDir, "stderr")
    val header = "Launch Command: %s\n%s\n\n".format(
      builder.command.mkString("\"", "\" \"", "\""), "=" * 40)
    Files.append(header, stderr, UTF_8)
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
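}
At this point the Driver has been scheduled and is running on a worker node, in its own JVM process launched and watched by a DriverRunner thread. So happy!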
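For readers who want to try the launch step outside Spark, here is a minimal sketch of roughly what DriverRunner.launchDriver and runCommandWithRetry do together: start the command as a child process in a working directory, write the launch command as a header into the stderr file, and, when supervise is set, restart the process with exponential back-off after a non-zero exit. SupervisedLaunchSketch, maxAttempts and initialWaitMs are hypothetical; the attempt cap exists only so the demo terminates, and the sketch swaps Spark's CommandUtils.redirectStream copier threads for the JDK's ProcessBuilder.Redirect.appendTo. The demo assumes a POSIX `sh` is on the PATH.

```scala
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files

object SupervisedLaunchSketch {
  /**
   * Runs `command` in `baseDir` with stdout/stderr appended to files there. If `supervise`
   * is true, a non-zero exit triggers a restart, up to `maxAttempts` attempts in total.
   * Returns the exit code of the last attempt. Hypothetical helper, not Spark's API.
   */
  def launch(command: Seq[String], baseDir: File, supervise: Boolean,
             maxAttempts: Int = 3, initialWaitMs: Long = 100L): Int = {
    val builder = new ProcessBuilder(command: _*)
    builder.directory(baseDir)
    val stdout = new File(baseDir, "stdout")
    val stderr = new File(baseDir, "stderr")
    // Like Spark, put the launch command at the top of the stderr file.
    val header = "Launch Command: %s\n%s\n\n".format(
      command.mkString("\"", "\" \"", "\""), "=" * 40)
    Files.write(stderr.toPath, header.getBytes(UTF_8))
    builder.redirectOutput(ProcessBuilder.Redirect.appendTo(stdout))
    builder.redirectError(ProcessBuilder.Redirect.appendTo(stderr))

    var waitMs = initialWaitMs
    var attempt = 0
    var exitCode = -1
    var keepTrying = true
    while (keepTrying) {
      attempt += 1
      exitCode = builder.start().waitFor()
      keepTrying = supervise && exitCode != 0 && attempt < maxAttempts
      if (keepTrying) {
        Thread.sleep(waitMs)
        waitMs *= 2 // exponential back-off between restarts
      }
    }
    exitCode
  }
}

object SupervisedLaunchDemo {
  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("driver-sketch").toFile
    // This command fails every time, so it is restarted until maxAttempts is reached.
    val code = SupervisedLaunchSketch.launch(Seq("sh", "-c", "echo attempt; exit 3"), dir, supervise = true)
    val out = new String(Files.readAllBytes(new File(dir, "stdout").toPath), UTF_8)
    println(s"exit code $code, stdout:\n$out")
  }
}
```

Appending to the stdout file, rather than truncating it, keeps the output of every restart, which is what you want when debugging a supervised driver that keeps crashing.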