A Spark standalone cluster is mainly made up of Client nodes, a Master node and Slave (Worker) nodes. Their relationship is shown in the figure below:
When we start the cluster, the Master and Worker processes are started; you can check them with the jps command. Master and Worker are themselves Actors. Whether the cluster is brought up with start-all.sh, or by starting the Master and Worker nodes separately with start-master.sh and start-slave.sh, the Master and Worker ActorSystems and Actors are created and started. When a Worker starts, it registers itself with the Master inside the actor lifecycle callback preStart, and from then on it is managed by the Master. Here is the Worker code:

private[deploy] object Worker extends Logging {
  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir)
    actorSystem.awaitTermination()
  }
  // ... (startSystemAndActor and other members omitted)
}
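For readers less familiar with Akka, the sketch below (hypothetical demo code, not Spark's) shows the same moving parts in miniature: an ActorSystem is created, an actor is instantiated with actorOf, and the actor uses its preStart lifecycle hook to register itself with a master actor. The class names, message type and master path are made up for illustration:

import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical registration message; Spark's real one is RegisterWorker.
case class Register(workerName: String)

class SimpleWorker(masterPath: String) extends Actor {
  // preStart runs before the actor handles its first message,
  // which is why Spark's Worker registers with the Master here.
  override def preStart(): Unit = {
    val master = context.actorSelection(masterPath)
    master ! Register(self.path.name)
  }

  def receive = {
    case msg => println(s"received: $msg")
  }
}

object SimpleWorkerApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sparkWorkerDemo")               // cf. "sparkWorker" + workerNumber
    system.actorOf(
      Props(classOf[SimpleWorker], "akka.tcp://demoMaster@127.0.0.1:7077/user/Master"),
      name = "Worker")
    system.awaitTermination()                                 // same call Worker.main makes
  }
}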
In the Worker singleton object, main calls startSystemAndActor, whose job is to start the ActorSystem and the Worker actor. Let's step into that method:

val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val actorName = "Worker"
val securityMgr = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
  conf = conf, securityManager = securityMgr)
val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
  masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
(actorSystem, boundPort)
As the code shows, AkkaUtils.createActorSystem creates an ActorSystem named "sparkWorker" plus the worker number, and actorSystem.actorOf then instantiates the Worker actor; the method returns the two-element tuple (actorSystem, boundPort). An Actor's lifecycle has several callbacks, and preStart is the most suitable place for setup work that must happen before the actor starts handling messages. In Worker's preStart, the worker registers itself with the master so that it can be managed by it. The preStart code:

createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
registerWithMaster()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
preStart performs a lot of initialization: creating the WorkerWebUI, registering with the master, creating and starting the metrics system, and so on. registerWithMaster() is what registers this worker with the master; it corresponds to ① Register Worker in the figure above. Let's look inside it:

registrationRetryTimer match {
  case None =>
    registered = false
    tryRegisterAllMasters()
    connectionAttemptCount = 0
    registrationRetryTimer = Some {
      context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
        INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
    }
  case Some(_) =>
    logInfo("Not spawning another attempt to register with the master, since there is an" +
      " attempt scheduled already.")
}
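The retry mechanism here is a standard Akka pattern: a scheduler task periodically sends a message back to the actor itself until registration succeeds. A minimal, hypothetical sketch of that pattern outside Spark (TryRegister and RetryingActor are made-up names) could look like this:

import scala.concurrent.duration._
import akka.actor.{Actor, Cancellable}

// Hypothetical message; Spark's real one is ReregisterWithMaster.
case object TryRegister

class RetryingActor extends Actor {
  import context.dispatcher   // ExecutionContext required by the scheduler

  private var registered = false
  private var retryTimer: Option[Cancellable] = None

  override def preStart(): Unit = {
    // Re-send TryRegister to ourselves every 5 seconds until registration succeeds.
    retryTimer = Some(context.system.scheduler.schedule(5.seconds, 5.seconds, self, TryRegister))
  }

  def receive = {
    case TryRegister if !registered =>
      println("attempting to register with the master ...")
    case TryRegister =>
      retryTimer.foreach(_.cancel())   // already registered: stop retrying
  }
}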
In registerWithMaster, the registrationRetryTimer keeps trying to register this worker with the master. The code above calls tryRegisterAllMasters(); let's follow it and see what it does:

private def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}
This method sends a RegisterWorker message to every master (only one of which is active at any given time). With that, the worker has registered itself with the master. Now let's look at ② Register Application in the figure. A program written against the SparkContext API is registered with the master as an Application when it is submitted to the cluster. How does that registration happen? When a program is submitted with spark-submit, the Client singleton object (org.apache.spark.deploy.Client) is invoked. The key code in Client:

for (m <- driverArgs.masters) {
  Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
}
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
In this key piece of Client code, an actorSystem is started and the ClientActor is created on it. As before, we go straight to this actor's preStart method:

val driverDescription = new DriverDescription(
  driverArgs.jarUrl,
  driverArgs.memory,
  driverArgs.cores,
  driverArgs.supervise,
  command)
// This assumes only one Master is active at a time
for (masterActor <- masterActors) {
  masterActor ! RequestSubmitDriver(driverDescription)
}
In preStart a DriverDescription is built. It is similar to a Java bean: a serializable value object that can be shipped over the network, carrying the jar the driver program depends on, the driver's memory, the number of cores it needs, and so on. The for loop then submits this driver to every masterActor, which completes Register Application. Next, ③ LaunchExecutor. With the Master and Workers up and the workers successfully registered, let's see how the master handles the registration request when an application arrives:

logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
schedule()
When the master receives the registration request and finishes registering it, it adds the newly created driver to the waitingDrivers collection, where it waits to be scheduled. The code above ends with a call to schedule(), which does the scheduling. Let's step into schedule():

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
}
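Stripped of Spark's bookkeeping, the heart of this driver scheduling is a first-fit pass over a randomly shuffled worker list. The following standalone sketch uses hypothetical WorkerRes and DriverReq stand-ins for Spark's WorkerInfo and DriverInfo, just to show that core logic:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical, simplified stand-ins for Spark's WorkerInfo / DriverInfo.
case class WorkerRes(id: String, var memFree: Int, var coresFree: Int)
case class DriverReq(id: String, mem: Int, cores: Int)

object DriverScheduling {
  // First-fit assignment over a shuffled worker list, mirroring the loop in Master.schedule().
  def schedule(workers: Seq[WorkerRes], waiting: ArrayBuffer[DriverReq]): Unit = {
    for (worker <- Random.shuffle(workers)) {
      // Iterate over a copy so we can remove launched drivers from `waiting`.
      for (driver <- waiting.toList) {
        if (worker.memFree >= driver.mem && worker.coresFree >= driver.cores) {
          println(s"launching ${driver.id} on ${worker.id}")
          worker.memFree -= driver.mem
          worker.coresFree -= driver.cores
          waiting -= driver
        }
      }
    }
  }
}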
In schedule we can clearly see that Random.shuffle picks workers in random order, and each driver in the waiting list is then compared against each worker; if the worker has enough free memory and cores, launchDriver is called to start the driver on that worker node and the driver is removed from the waiting list. The launchDriver code:

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.actor ! LaunchDriver(driver.id, driver.desc)
  driver.state = DriverState.RUNNING
}
launchDriver sends a LaunchDriver message with worker.actor ! LaunchDriver(...), telling the worker to load the driver. In the earlier schedule code there was also a call to startExecutorsOnWorkers(). What does it do? Inside it there is one key call, allocateWorkerResourceToExecutors(app, app.coresLeft, worker), which in turn calls launchExecutor. Let's look at launchExecutor:

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
This method sends a LaunchExecutor message to worker.actor, and the worker starts an executor as soon as it receives it. Here is how the worker handles the LaunchExecutor message in its receive method (only the key code is shown):

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  appDirectories(appId) = appLocalDirs
  val manager = new ExecutorRunner(appId, execId,
    appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
    cores_, memory_, self, workerId, host, webUi.boundPort, publicAddress,
    sparkHome, executorDir, akkaUrl, conf, appLocalDirs, ExecutorState.LOADING)
  executors(appId + "/" + execId) = manager
  manager.start()
  coresUsed += cores_
  memoryUsed += memory_
  master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
After matching the LaunchExecutor case, the worker instantiates an ExecutorRunner and finally sends an ExecutorStateChanged message back to the master. That completes the analysis of ③ LaunchExecutor. Next we analyze ④ LaunchTask. The TaskScheduler dispatches tasks to the worker nodes, and each worker then launches the tasks it receives. When the driver program executes an action, SparkContext.runJob is called, which ultimately calls DAGScheduler.runJob; the event produced by that action is placed on the event queue of the eventProcessLoop object. eventProcessLoop extends EventLoop, which runs a thread that keeps looping over the event queue:

override def run(): Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } catch {
        case NonFatal(e) => {
          try {
            onError(e)
          } catch {
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
      }
    }
  } catch {
    case ie: InterruptedException => // exit even if eventQueue is not empty
    case NonFatal(e) => logError("Unexpected error in " + name, e)
  }
}
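The mechanism itself is a very common pattern: producers post events to a blocking queue and a single daemon thread drains it. Here is a minimal, self-contained sketch of that pattern (not Spark's actual EventLoop class, just the same idea, with made-up names):

import java.util.concurrent.LinkedBlockingDeque
import scala.util.control.NonFatal

// A minimal event loop in the spirit of Spark's EventLoop: a daemon thread
// takes events off a blocking queue and hands them to onReceive.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take()   // blocks until an event is available
          try onReceive(event) catch {
            case NonFatal(e) => onError(e)
          }
        }
      } catch {
        case _: InterruptedException => // stop() was called; exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit = e.printStackTrace()
}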
eventQueue is a blocking queue; take() blocks until it can return the event at the head of the queue, and onReceive(event) then matches on the event type. DAGSchedulerEventProcessLoop overrides onReceive; part of the code:

override def onReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
      listener, properties)
  // ... (other event types omitted)
}
As the code above shows, if the event matches JobSubmitted, dagScheduler.handleJobSubmitted handles it. Let's look at handleJobSubmitted:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    if (shouldRunLocally) {
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      runLocally(job)
    } else {
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      submitStage(finalStage)
    }
  }
  submitWaitingStages()
}
The main job of handleJobSubmitted is to build the finalStage and create an ActiveJob from it. In the code above, newResultStage is what creates the new ResultStage:

private def newResultStage(
    rdd: RDD[_],
    numTasks: Int,
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
submitStage then submits the stages that were produced:

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}
getMissingParentStages walks the DAG to find all the parent stages this stage depends on:

private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      if (getCacheLocs(rdd).contains(Nil)) {
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
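Note the comment about the manually maintained stack: the traversal is a depth-first search written iteratively, so that a very long RDD lineage cannot blow the JVM call stack. The same trick in a generic form (a hypothetical helper, not Spark code):

import scala.collection.mutable

// Generic iterative DFS with an explicit stack, the same technique
// getMissingParentStages uses to avoid deep recursion on long lineages.
def traverse[A](root: A)(children: A => Seq[A])(visitFn: A => Unit): Unit = {
  val visited = mutable.HashSet[A]()
  val stack = mutable.Stack[A](root)
  while (stack.nonEmpty) {
    val node = stack.pop()
    if (!visited(node)) {
      visited += node
      visitFn(node)
      children(node).foreach(c => stack.push(c))
    }
  }
}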
So how are stage boundaries determined? The key criterion is whether a ShuffleDependency exists; if it does, a new stage is created. And whether a ShuffleDependency exists depends on the RDD transformation itself. The following RDDs declare a ShuffleDependency:
- ShuffledRDD
- CoGroupedRDD
- SubtractedRDD
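As a concrete illustration, consider the hypothetical driver snippet below. reduceByKey produces a ShuffledRDD, and its ShuffleDependency on the mapped RDD forces the DAGScheduler to cut a new stage at that point, while map is a narrow dependency and stays in the same stage as its parent:

import org.apache.spark.{SparkConf, SparkContext}

object StageBoundaryDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("StageBoundaryDemo").setMaster("local[2]"))

    val words  = sc.parallelize(Seq("a", "b", "a", "c"))
    val pairs  = words.map(w => (w, 1))       // narrow dependency: same stage as parallelize
    val counts = pairs.reduceByKey(_ + _)     // ShuffledRDD => ShuffleDependency => new stage

    // collect() is the action that triggers DAGScheduler.handleJobSubmitted;
    // the job is split into a ShuffleMapStage and a final ResultStage.
    counts.collect().foreach(println)
    sc.stop()
  }
}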