fc013 发表于 2016-4-24 16:37:20

spark源码分析之Executor启动与任务提交篇


问题导读:
1.什么是Spark-submit ?2.Executor启动流程是什么?3.Executor怎样进行任务调度?


http://www.aboutyun.com/static/image/hrline/4.gif


Spark-submit
提交一个任务到集群通过的是Spark-submit
通过启动脚本的方式启动它的主类,这里以WordCount为例子
`spark-submit –class cn.itcast.spark.WordCount“

[*]bin/spark-clas -> org.apache.spark.deploy.SparkSubmit 调用这个类的main方法
[*]doRunMain方法中传进来一个自定义spark应用程序的main方法
class cn.itcast.spark.WordCount
[*]通过反射拿到类的实例的引用mainClass = Utils.classForName(childMainClass)
[*]在通过反射调用class cn.itcast.spark.WordCount的main方法
我们来看SparkSubmit的main方法
def main(args: Array): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      printStream.println(appArgs)
    }
    //匹配任务类型
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
}
这里的类型是submit,调用submit方法
private def submit(args: SparkSubmitArguments): Unit = {
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
   。。。。。。
      try {
          proxyUser.doAs(new PrivilegedExceptionAction() {
            override def run(): Unit = {
            //childMainClass这个你自己定义的App的main所在的全类名
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }
          })
      } catch {
            。。。。。。      
      }
      }   
      。。。。。。。
      //掉用上面的doRunMain
      doRunMain()
    }submit里调用了doRunMain(),然后调用了runMain,来看runMainprivate def runMain(
    。。。。。。
    try {
      //通过反射
      mainClass = Class.forName(childMainClass, true, loader)
    } catch {
      。。。。。。
    }
    //反射拿到面方法实例
    val mainMethod = mainClass.getMethod("main", new Array(0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }
    。。。。。。
    try {
      //调用App的main方法
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
      throw findCause(t)
    }
}

最主要的流程就在这里了,上面的代码注释很清楚,通过反射调用我们写的类的main方法,大体的流程到此
SparkSubmit时序图

Executor启动流程
SparkSubmit通过反射调用了我们程序的main方法后,就开始执行我们的代码 ,一个Spark程序中需要创建SparkContext对象,我们就从这个对象开始
SparkContext的构造方法代码很长,主要关注的地方如下
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
。。。。。。
private def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    //通过SparkEnv来创建createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
}
//在这里调用了createSparkEnv,返回一个SparkEnv对象,这个对象里面有很多重要属性,最重要的ActorSystem
private val env = createSparkEnv(conf, isLocal, listenerBus)
SparkEnv.set(env)

//创建taskScheduler
// Create and start the scheduler
private var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)
//创建DAGScheduler
dagScheduler = new DAGScheduler(this)
//启动TaksScheduler
taskScheduler.start()
    。。。。。
}
Spark的构造方法主要干三件事,创建了一个SparkEnv,taskScheduler,dagScheduler,我们先来看createTaskScheduler里干了什么
//通过给定的URL创建TaskScheduler
private def createTaskScheduler(
   .....
    //匹配URL选择不同的方式
    master match {
      。。。。。。
      //这个是Spark的Standalone模式
      case SPARK_REGEX(sparkUrl) =>
      //首先创建TaskScheduler
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      //很重要
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      //初始化了一个调度器,默认是FIFO
      scheduler.initialize(backend)
      (backend, scheduler)
      。。。。。
    }      
}
通过master的url来匹配到Standalone模式:然后初始化了SparkDeploySchedulerBackend和TaskSchedulerImpl,这两个对象很重要,是启动任务调度的核心,然后调用了scheduler.initialize(backend)进行初始化
启动TaksScheduler初始化完成,回到我们的SparkContext构造方法后面继续调用了 taskScheduler.start() 启动TaksScheduler
来看start方法
override def start() {
    //调用backend的实现的start方法
    backend.start()
    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
            SPECULATION_INTERVAL milliseconds) {
      Utils.tryOrExit { checkSpeculatableTasks() }
      }
    }
}
这里的backend是SparkDeploySchedulerBackend调用了它的start
override def start() {
    //CoarseGrainedSchedulerBackend的start方法,在这个方法里面创建了一个DriverActor
    super.start()
    // The endpoint for executors to talk to us
    //下面是为了启动java子进程做准备,准备一下参数
    val driverUrl = AkkaUtils.address(
      AkkaUtils.protocol(actorSystem),
      SparkEnv.driverActorSystemName,
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
      Nil
      }
    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    //用command拼接参数,最终会启动org.apache.spark.executor.CoarseGrainedExecutorBackend子进程
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    //用ApplicationDescription封装了一些重要的参数
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec)
    //在这里面创建ClientActor
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    //启动ClientActor
    client.start()
    waitForRegistration()
}
这里是拼装了启动Executor的一些参数,类名+参数 封装成ApplicationDescription。最后传给并创建AppClient并调用它的start方法
AppClient创建时序图

AppClient的start方法
接来下关注start方法
def start() {
    // Just launch an actor; it will call back into the listener.
    actor = actorSystem.actorOf(Props(new ClientActor))
}在start方法里创建了与Master通信的ClientActor,然后会调用它的preStart方法向Master注册,接下来看它的preStartoverride def preStart() {
      context.system.eventStream.subscribe(self, classOf)
      try {
      //ClientActor向Master注册
      registerWithMaster()
      } catch {
      case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          context.stop(self)
      }
    }
最后会调用该方法向所有Master注册
def tryRegisterAllMasters() {
      for (masterAkkaUrl <- masterAkkaUrls) {
      logInfo("Connecting to master " + masterAkkaUrl + "...")
      //t通过actorSelection拿到了Master的引用
      val actor = context.actorSelection(masterAkkaUrl)
      //向Master发送异步的注册App的消息
      actor ! RegisterApplication(appDescription)
      }
    }
ClientActor发送来的注册App的消息,ApplicationDescription,他包含了需求的资源,要求启动的Executor类名和一些参数

Master的Receiver
case RegisterApplication(description) => {
      if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
      } else {
      logInfo("Registering app " + description.name)
      //创建Appsender:ClientActor
      val app = createApplication(description, sender)
      //注册App
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      //持久化App
      persistenceEngine.addApplication(app)
      //向ClientActor反馈信息,告诉他app注册成功了
      sender ! RegisteredApplication(app.id, masterUrl)
      //TODO 调度任务
      schedule()
      }
    }
registerApplication(app)
def registerApplication(app: ApplicationInfo): Unit = {
    val appAddress = app.driver.path.address
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }
    //把App放到集合里面
    applicationMetricsSystem.registerSource(app.appSource)
    apps += app
    idToApp(app.id) = app
    actorToApp(app.driver) = app
    addressToApp(appAddress) = app
    waitingApps += app
}
Master将接受的信息保存到集合并序列化后发送一个RegisteredApplication消息通知反馈给ClientActor,接着执行schedule()方法,该方法中会遍历workers集合,并执行launchExecutor
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    //记录该worker上使用了多少资源
    worker.addExecutor(exec)
    //Master向Worker发送启动Executor的消息
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    //Master向ClientActor发送消息,告诉ClientActor executor已经启动了
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
这里Master向Worker发送启动Executor的消息 worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
application.desc里包含了Executor类的启动信息
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
         。。。。。
          appDirectories(appId) = appLocalDirs
          //创建一个ExecutorRunner,这个很重要,保存了Executor的执行配置和参数
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            akkaUrl,
            conf,
            appLocalDirs, ExecutorState.LOADING)
          executors(appId + "/" + execId) = manager
          //TODO 开始启动ExecutorRunner
          manager.start()
            。。。。。。
          }
      }
      }
Worker的Receiver接受到了启动Executor的消息,appDesc对象保存了Command命令、Executor的实现类和参数manager.start()里会创建一个线程
def start() {
    //启动一个线程
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      //用一个子线程来帮助Worker启动Executor子进程
      override def run() { fetchAndRunExecutor() }
    }
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = new Thread() {
      override def run() {
      killProcess(Some("Worker shutting down"))
      }
    }
    Runtime.getRuntime.addShutdownHook(shutdownHook)
}
在线程中调用了fetchAndRunExecutor()方法,我们来看该方法
def fetchAndRunExecutor() {
    try {
      // Launch the process
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
      sparkHome.getAbsolutePath, substituteVariables)
      //构建命令
      val command = builder.command()
      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
      builder.directory(executorDir)
      builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
      // Add webUI log urls
      val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
      //启动子进程
      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
      command.mkString("\"", "\" \"", "\""), "=" * 40)
      // Redirect its stdout and stderr to files
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
      // or with nonzero exit code
      //开始执行,等待结束信号
      val exitCode = process.waitFor()
      。。。。
    }
}
这里面进行了类名和参数的拼装,具体拼装过程不用关心,最终builder.start()会以SystemRuntime的方式启动一个子进程,这个是进程的类名是CoarseGrainedExecutorBackend
到此Executor进程就启动起来了
Executor创建时序图


Executor任务调度对象启动
Executor进程后,就首先要执行main方法,main的代码如下
//Executor进程启动的入口
def main(args: Array) {
    。。。。
    //拼装参数
    while (!argv.isEmpty) {
      argv match {
      case ("--driver-url") :: value :: tail =>
          driverUrl = value
          argv = tail
      case ("--executor-id") :: value :: tail =>
          executorId = value
          argv = tail
      case ("--hostname") :: value :: tail =>
          hostname = value
          argv = tail
      case ("--cores") :: value :: tail =>
          cores = value.toInt
          argv = tail
      case ("--app-id") :: value :: tail =>
          appId = value
          argv = tail
      case ("--worker-url") :: value :: tail =>
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          workerUrl = Some(value)
          argv = tail
      case ("--user-class-path") :: value :: tail =>
          userClassPath += new URL(value)
          argv = tail
      case Nil =>
      case tail =>
          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
          printUsageAndExit()
      }
    }
    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
      printUsageAndExit()
    }
    //开始执行Executor
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
执行了run方法
private def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option,
      userClassPath: Seq)   
      。。。。。
      //通过actorSystem创建CoarseGrainedExecutorBackend -> Actor
      //CoarseGrainedExecutorBackend -> DriverActor通信
      env.actorSystem.actorOf(
      Props(classOf,
          driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
      name = "Executor")
            。。。。。。
      }
      env.actorSystem.awaitTermination()
    }
}
run方法中创建了CoarseGrainedExecutorBackend的Actor对象用于准备和DriverActor通信,接着会继续调用preStart生命周期方法
override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    //Executor跟DriverActor建立连接
    driver = context.actorSelection(driverUrl)
    //Executor向DriverActor发送消息
    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
    context.system.eventStream.subscribe(self, classOf)
}
Executor向DriverActor发送注册的消息
driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
DriverActor的receiver收到消息后
def receiveWithLogging = {
      //Executor发送给DriverActor的注册消息
      case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
      Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
      if (executorDataMap.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
      } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          //DriverActor向Executor发送注册成功的消息
          sender ! RegisteredExecutor
          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val (host, _) = Utils.parseHostPort(hostPort)
         //将Executor的信息封装起来
          val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            //往集合添加Executor的信息对象
            executorDataMap.put(executorId, data)
            if (numPendingExecutors > 0) {
            numPendingExecutors -=
            logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          //将来用来执行真正的业务逻辑
          makeOffers()
      }

DriverActor的receiver里将Executor信息封装到Map中保存起来,并发送反馈消息 sender ! RegisteredExecutor 给CoarseGrainedExecutorBackend

override def receiveWithLogging = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      val (hostname, _) = Utils.parseHostPort(hostPort)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

CoarseGrainedExecutorBackend收到消息后创建一个Executor对象用于准备任务的执行




zmer 发表于 2016-12-29 14:56:39

挺清晰的
页: [1]
查看完整版本: spark源码分析之Executor启动与任务提交篇