Once a Spark cluster is up, you can submit Applications to it. If several Applications are submitted to the same cluster, how does the cluster schedule them? As before, we will approach the question from the source code and lift the veil on Spark's multi-application scheduling.
Step 1: Submit the application with spark-submit
Let's first look at the spark-submit script:
# Only define a usage function if an upstream script hasn't done so.
if ! type -t usage >/dev/null 2>&1; then
  usage() {
    if [ -n "$1" ]; then
      echo "$1"
    fi
    "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit --help
    exit "$2"
  }
  export -f usage
fi

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

In the spark-submit script we can see that it simply execs org.apache.spark.deploy.SparkSubmit. Following that thread into SparkSubmit, the main function is the entry point for submitting an application:
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
We can see that main matches on appArgs.action. Since we are submitting, the program takes the case SparkSubmitAction.SUBMIT => submit(appArgs) branch. Stepping into submit(appArgs), we find:
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

As the method name suggests, this line prepares the submission environment. The method returns a 4-tuple and performs a large amount of parsing and checking of the run mode and configuration. Let's look inside it:
if (args.isStandaloneCluster) {
  if (args.useRest) {
    childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
    childArgs += (args.primaryResource, args.mainClass)
  } else {
    // In legacy standalone cluster mode, use Client as a wrapper around the user class
    childMainClass = "org.apache.spark.deploy.Client"
    if (args.supervise) { childArgs += "--supervise" }
    Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
    Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
    childArgs += "launch"
    childArgs += (args.master, args.primaryResource, args.mainClass)
  }
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }
}

In the code above, childMainClass is chosen according to the deploy mode. For a standalone cluster submission that does not go through the REST gateway (the legacy path), childMainClass is set to org.apache.spark.deploy.Client, which acts as a wrapper around the user class.
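For concreteness, here is a toy, self-contained replay of the childArgs assembly on that legacy path. The master URL, jar location and main class are made-up placeholders; the real values come from SparkSubmitArguments.

import scala.collection.mutable.ArrayBuffer

// Toy replay of the childArgs assembly above (legacy standalone cluster mode).
// The master URL, jar location and main class are hypothetical placeholders.
object ChildArgsSketch {
  def main(args: Array[String]): Unit = {
    val childArgs = ArrayBuffer[String]()
    val supervise    = true
    val driverMemory = Option("2g")
    val driverCores  = Option("2")

    if (supervise) { childArgs += "--supervise" }
    driverMemory.foreach { m => childArgs ++= Seq("--memory", m) }
    driverCores.foreach { c => childArgs ++= Seq("--cores", c) }
    childArgs += "launch"
    childArgs ++= Seq("spark://master:7077", "hdfs:///jobs/app.jar", "com.example.MyApp")

    // Prints the argument vector that will later reach the wrapper class:
    // --supervise --memory 2g --cores 2 launch spark://master:7077 hdfs:///jobs/app.jar com.example.MyApp
    println(childArgs.mkString(" "))
  }
}

These are the arguments that, after the reflective call described next, end up in Client's main method.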
Back in the runMain method, the chosen class is loaded and its main method is invoked via reflection:

mainClass = Class.forName(childMainClass, true, loader)
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
mainMethod.invoke(null, childArgs.toArray)

This executes the main method of org.apache.spark.deploy.Client, and the parameters set through spark-submit are passed along via childArgs.toArray.
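To make the reflection step concrete, here is a minimal, self-contained sketch of the same Class.forName / getMethod / invoke pattern. DemoApp is a hypothetical stand-in for the class named by childMainClass; it is not part of Spark.

// Minimal sketch of launching a main class by reflection, the same pattern runMain uses.
// DemoApp is a hypothetical stand-in for childMainClass (e.g. org.apache.spark.deploy.Client).
object DemoApp {
  def main(args: Array[String]): Unit =
    println("DemoApp launched with: " + args.mkString(" "))
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    val loader = Thread.currentThread().getContextClassLoader
    // Equivalent of Class.forName(childMainClass, true, loader)
    val mainClass  = Class.forName("DemoApp", true, loader)
    // Equivalent of mainClass.getMethod("main", new Array[String](0).getClass)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // The Scala compiler emits a static forwarder for object DemoApp's main,
    // so null can be passed as the receiver, just like mainMethod.invoke(null, childArgs.toArray).
    val childArgs: Array[String] = Array("launch", "spark://master:7077")
    mainMethod.invoke(null, childArgs.asInstanceOf[AnyRef])
  }
}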
Let's step into the main method of the Client object:
def main(args: Array[String]) {
  if (!sys.props.contains("SPARK_SUBMIT")) {
    println("WARNING: This client is deprecated and will be removed in a future version of Spark")
    println("Use ./bin/spark-submit with \"--master spark://host:port\"")
  }
  val conf = new SparkConf()
  val driverArgs = new ClientArguments(args)
  if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
    conf.set("spark.akka.logLifecycleEvents", "true")
  }
  conf.set("spark.rpc.askTimeout", "10")
  conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
  Logger.getRootLogger.setLevel(driverArgs.logLevel)
  val (actorSystem, _) = AkkaUtils.createActorSystem(
    "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
  // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
  for (m <- driverArgs.masters) {
    Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
  }
  actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
  actorSystem.awaitTermination()
}

Interestingly, what gets created here is an actor: a ClientActor is instantiated through the Akka actor system. Inside ClientActor, a registration request is sent to every master actor (a minimal sketch of this message exchange follows the Master snippet further below):
// This assumes only one Master is active at a time
for (masterActor <- masterActors) {
  masterActor ! RequestSubmitDriver(driverDescription)
}

Here, the run parameters supplied through spark-submit are wrapped into a driverDescription object and passed to the master. The driverDescription is built as follows:
val driverDescription = new DriverDescription(
  driverArgs.jarUrl,
  driverArgs.memory,
  driverArgs.cores,
  driverArgs.supervise,
  command)

On the Master side, the driver registration request from the Client arrives through the actor's receive method. The relevant code in Master is:
case RequestSubmitDriver(description) => {
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    sender ! SubmitDriverResponse(false, None, msg)
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()
    // TODO: It might be good to instead have the submission client poll the master to determine
    // the current status of the driver. For now it's simply "fire and forget".
    sender ! SubmitDriverResponse(true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}")
  }
}

Once the registration arrives, the Master creates a driver from the description, persists it, and adds it both to the waitingDrivers queue and to the drivers HashSet, then replies to the client with a SubmitDriverResponse.
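To isolate the message exchange between the client and the Master, here is a minimal Akka (classic actors) sketch. The types ToyMaster, ToyClient, SubmitDriver and DriverResponse are hypothetical stand-ins for ClientActor, Master, RequestSubmitDriver and SubmitDriverResponse; it only mirrors the tell-and-reply pattern, not the real Master logic.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Hypothetical stand-ins for RequestSubmitDriver / SubmitDriverResponse.
case class SubmitDriver(description: String)
case class DriverResponse(success: Boolean, message: String)

// Plays the role of the Master: accepts a submission and replies to the sender.
class ToyMaster extends Actor {
  def receive = {
    case SubmitDriver(desc) =>
      sender() ! DriverResponse(success = true, s"Driver successfully submitted: $desc")
  }
}

// Plays the role of the ClientActor: tells every known master about the driver, then waits for replies.
class ToyClient(masters: Seq[ActorRef]) extends Actor {
  override def preStart(): Unit =
    masters.foreach(_ ! SubmitDriver("demo driver description"))
  def receive = {
    case DriverResponse(ok, msg) => println(s"client received (success=$ok): $msg")
  }
}

object SubmissionSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")
    val master = system.actorOf(Props[ToyMaster](), "master")
    system.actorOf(Props(classOf[ToyClient], Seq(master)), "client")
  }
}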
Notice that the Master also calls schedule() right after registering the driver; in fact every driver registration triggers this scheduling method. What exactly does it do? Let's take a look:
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
}

In the schedule method, the workers are first shuffled into a random order. The method then walks the waitingDrivers queue and, whenever a worker has enough free memory and cores to satisfy a driver's requirements, calls launchDriver to start the driver on that worker and removes it from waitingDrivers. Finally, startExecutorsOnWorkers is called to launch executors on the worker nodes.
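To see the FIFO behavior in isolation, here is a minimal, standalone sketch of the same loop. Worker and Driver are simplified stand-ins for Spark's internal worker/driver bookkeeping, with made-up resource numbers.

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Simplified stand-ins, just enough to replay the scheduling loop in Master.schedule().
case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
case class Driver(id: String, mem: Int, cores: Int)

object FifoDriverScheduling {
  def main(args: Array[String]): Unit = {
    val workers = Seq(Worker("worker-1", 4096, 4), Worker("worker-2", 8192, 8))
    // Drivers are queued in submission order; that order is what makes the policy FIFO.
    val waitingDrivers = ArrayBuffer(Driver("driver-1", 2048, 2), Driver("driver-2", 6144, 4))

    for (worker <- Random.shuffle(workers)) {        // randomization balances drivers across workers
      for (driver <- waitingDrivers.toList) {        // iterate over a snapshot so removal is safe
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          println(s"launching ${driver.id} on ${worker.id}")  // stands in for launchDriver(worker, driver)
          worker.memoryFree -= driver.mem
          worker.coresFree  -= driver.cores
          waitingDrivers -= driver
        }
      }
    }
    println("still waiting: " + waitingDrivers.map(_.id).mkString(", "))
  }
}

The sketch only reproduces the driver-placement part; the real schedule() additionally skips workers that are not ALIVE and then goes on to startExecutorsOnWorkers.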
From the analysis above, launching drivers follows a simple FIFO scheduling policy: waiting drivers are served in the order they were queued. At this point, the Driver/Client side has completed its registration with the Master, and driver launching (launchDriver) is scheduled in this simple FIFO fashion.