分享

探索Spark源码---Spark中的Client,Master和Slave

regan 发表于 2015-12-10 09:04:12 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 3 15802
本帖最后由 regan 于 2015-12-10 13:58 编辑

在Spark中,主要由Client节点,Master节点和Slave节节点组成。它们之间的关系如下图:
图片a.png

在我们启动集群的时候,会启动Master和Worker进程,可以通过jps命令查看。Master和Worker本身是一个Actor,无论是通过start-all.sh启动集群,还是通过start-master.sh和start-slave.sh分别启动Master和worker节点来启动集群,都会创建并启动Master和Worker这两个ActorSystem和Actor。在启动Worker的时候,在actor的生命周期回调函数preStart中,将worker注册到master中,接受master的统一领导。下面是Worker的代码:
private[deploy] object Worker extends Logging {
  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir)
    actorSystem.awaitTermination()
  }
在Worker这个单例对象中,我们看到在main方法例调用了startSystemAndActor方法,这个方法的具体作用是启动ActorSystem和WorkerActor,让我们进入到这个方法看个究竟:
  val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
    val actorName = "Worker"
    val securityMgr = new SecurityManager(conf)
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
      conf = conf, securityManager = securityMgr)
    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
    actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
      masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
    (actorSystem, boundPort)
在代码中我们看到,通过AkkaUtils.createActorSystem方法,创建了一个名称为“sparkWorker”+worker编号,这样一个ActorSystem。并通过actorSystem的actorOf方法,实例化出一个Worker。该方法返回一个二元tuple。在实例化Worker的时候,我们知道Actor的生命周期中有几个回调函数,其中preStart这个回调函数最适合用来做一个启动之前的准备工作。在Worker的preStart回调函数中,我们将worker注册到master上,以便接受master的管理。preStart代码如下:
createWorkDir()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    registerWithMaster()
    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
我们看到在preStart生命周期回调函数中,进行了很多初始化工作,如:创建WorkerWebUI,注册master,创建并启动度量系统等。registerWithMaster()方法就是将自己注册到master上。对应上图中的①Register Worker。现在我们进入这个方法:
registrationRetryTimer match {
      case None =>
        registered = false
        tryRegisterAllMasters()
        connectionAttemptCount = 0
        registrationRetryTimer = Some {  context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
            INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
        }
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
我们看到在这个方法中,会通过一个registrationRetryTimer不断的去将自己注册到master上,在上面代码中,有tryRegisterAllMasters()方法被调用,我们跟进这个方法瞧个究竟:
private def tryRegisterAllMasters() {
    for (masterAkkaUrl <- masterAkkaUrls) {
      logInfo("Connecting to master " + masterAkkaUrl + "...")
      val actor = context.actorSelection(masterAkkaUrl)
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    }
  }
我们看到在这个方法中,会向所有master(其中只有一个master处于active状态)发送RegisterWorker消息。至此,worker完成向master的注册。
我们再来看上图中的②Register Application。我们通过SparkContex编程接口编写的程序,在提交到集群中的时候会以一个Application的形式注册到master中。那我们来看看看application是如何注册的?在通过spark-submit提交程序的时候,会调用Client这个单例对象(位于:org.apache.spark.Clent),我们看看这个Client对象:
for (m <- driverArgs.masters) {
      Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
    }
    actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
在上面的Client的关键代码中,我们看到在这里面启动了actorSysem,并通过actorSystem启动了ClientActor这个Actor,跟前面一样,我们直接进入到这个Actor的preStart方法:
val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
        // This assumes only one Master is active at a time
        for (masterActor <- masterActors) {
          masterActor ! RequestSubmitDriver(driverDescription)
       }
在preStart的方法中,我么我们看到构建了一个driverDescription的类,该类类似于Java Bean对象,可序列化便于网络传输。该对象中包括的driver程序依赖的JAR包,driver的内存,可用核数等信息。在for循环中向所有的masterActor注册该Driver。至此Register Application到此完成。
   我们再来看看③LaunchExcutor,当我们的Master,Worker都正常启动,并且worker都成功注册到master,当application注册到master的时候,我们来看看master端收到注册请求时是如何处理的:
logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()
当master收当application的注册请求,完成注册之后,会将该application加入到waitingDrivers数组中,等待调度。上面代码中,我们看到调用了schedule()函数,这个函数是进行调度用的。我们进入到schedule函数:
private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) { return }
    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
      for (driver <- waitingDrivers) {
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
        }
      }
    }
    startExecutorsOnWorkers()
  }
在schedule方法中我们可以很清楚的看到使用Random.shuffle方法随机选择了worker,然后将等待列表中的driver和worker中的信息一一比对,如果内存和剩余处理核满足执行条件,则调用launchDriver方法,在worker节点上加载driver。launchDriver代码如下:
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    worker.actor ! LaunchDriver(driver.id, driver.desc)
    driver.state = DriverState.RUNNING
  }
在launchDriver方法中,我们看到调用worker.actor!LaunchDriver,向worker.actor发送消息,加载Driver。在前面的代码中,我们还可以看到一个方法startExecutorsOnWorkers(),这个方法有什么作用呢?我们在这个方法中看到一个关键的方法调用:allocateWorkerResourceToExecutors(app, app.coresLeft, worker),在调用的这个方法中,我们看到调用了launchExcutor方法,我们进入这个方法看个究竟:
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
在这个方法中,向worker.actor发送了LaunchExecutor消息,worker.actor在收到该消息之后立即加载executor。我们再看看worker在receive方法中匹配到LaunchExecutor消息后进行的处理(只列出了关键代码):
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
       appDirectories(appId) = appLocalDirs
       val manager = new ExecutorRunner(appId,execId,appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),cores_,
       memory_,self, workerId, host,webUi.boundPort,
       publicAddress,sparkHome,executorDir,akkaUrl,conf,
       appLocalDirs, ExecutorState.LOADING)
       executors(appId + "/" + execId) = manager
       manager.start()
       coresUsed += cores_
       memoryUsed += memory_
       master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
在匹配到launchExecutor这个case之后,程序中实例化出来一个ExecutorRunner,最后向master发送ExecutorStateChanged消息。至此③LaunchExcutor分析完毕。
   下面我们将分析④LaunchTask。在TaskSchedular将任务分发到worker节点,worker节点将会launchTask,加载任务。当我们driver端的程序加载执行到action方法的时候,将会调用sparkContext的runJob方法,最终会调用DAGSchedular的runJob方法。最后该action事件提交的结果将会放到eventProcessEvent对象的事件队列里面。eventProcessEvent继承自EventLoop,在EventLoop中有一个线程,在线程中不断去loop事件队列:
override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
eventQueue是一个阻塞队列,通过take阻塞方法返回队列首部的事件对象,然后调用onReceive(even)匹配事件类型,在DAGSchedulerEventProcessLoop中重写了onReceive方法,部分代码如下:
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)
我们从上面代码看到,如果匹配到的事件类型是JobSubmitted,则调用dagScheduler.handleJobSubmitted方法进行处理。我们看看handleJobSubmitted方法代码:
private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      val jobSubmissionTime = clock.getTimeMillis()
      if (shouldRunLocally) {
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
        runLocally(job)
      } else {
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
    }
    submitWaitingStages()
  }
handleJobSubmitted的最主要工作是生成finalStage,并依据finalStage来产生ActiveJob.上诉代码中newStage用于创建一个新的ResultStage。
private def newResultStage(
      rdd: RDD[_],
      numTasks: Int,
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
submitStage方法将会提交所生成的stage:
private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }
getMissingParentStages通过对DAG图的遍历,来找出所有依赖的父stage:
private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        if (getCacheLocs(rdd).contains(Nil)) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }
Stage划分是如何确定的喃?其判断的重要依据是是否存ShuffleDependency ,如果有则创建一个新的stage。怎样确定是否存在ShuffleDependency呢?这取决于RDD转换的本身。以下RDD会返回ShuffleDependency:
l   ShuffledRDD
l   CoGroupedRDD
l   SubtractedRDD


已有(3)人评论

跳转到指定楼层
zhujun182104906 发表于 2015-12-17 10:28:50
还需要多学学scala啊
回复

使用道具 举报

邓立辉 发表于 2015-12-26 10:54:11
一直用java写,感觉很别扭。查资料的时候网上基本上都是scala的
回复

使用道具 举报

kinglee_zs 发表于 2018-4-28 19:48:19
新手上路,如何能发帖啊。。。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条