本帖最后由 regan 于 2015-12-17 16:42 编辑
我们知道,再spark中每一个提交的job,在划分好stage,由taskSetManager管理taskSet的生命周期,发送给满足计算的worker节点上进行计算,在每一个worker节点上,task在ThreadPool中并行执行。但是,如果在driver端同时提交了多个Job,那么这些提交的job是并行计算的还是串行计算的呢。最近对这个感到比较迷惑,在我的sparkSteaming项目中,我简单的试了一下,有一个print和saveAsTextFile操作,这两个操作都是action操作,因此都会触发job的提交,代码如下:
imsiUrntiNeidTimeKVGroupedRecordRDD.print(1)
imsiUrntiNeidTimeKVGroupedRecordRDD.cache()
imsiUrntiNeidTimeKVGroupedRecordRDD.saveAsTextFiles(ProjectConf.resultPath+"_"+timeHs.value+"_") |
在我没有配置job调度策略FIFO的情况下,由WEB UI得到的结果如下:
| | | | | Tasks (for all stages): Succeeded/Total | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | |
| 我猜想,由不同的Application提交到Spark集群之间的JOB是否都是按照提交串行来执行的,即集群中每次只能运行一个JOB,待JOB运行完成后,再调度下一个JOB运行。
为了解答这个问题,最好的办法不是猜想,而是由表象跟入源代码:
在每一个action操作中,都会调用SparkContext的runjob方法提交JOB,代码如下:
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties) | 跟入代码会发现,调用了dagScheduler的handdleJObSubmitted方法,在该方法中生成了job,主要代码如下:
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
job.jobId, callSite.shortForm, partitions.length, allowLocal))
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val shouldRunLocally =
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
val jobSubmissionTime = clock.getTimeMillis()
if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
runLocally(job)
} else {
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.resultOfJob = Some(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
} | 在该代码中,我们看到新生成的job加入到了activeJobs中,其中jobid是一个递增的数字,activeJobs是一个hashMap,还有一句关键的代码是向事件总线中发送了SparkListenerJobStart事件,如下:
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties)) | 我们进入到listenerBus的post方法:
def post(event: E) {
if (stopped.get) {
// Drop further events to make `listenerThread` exit ASAP
logError(s"$name has already stopped! Dropping event $event")
return
}
val eventAdded = eventQueue.offer(event)
if (eventAdded) {
eventLock.release()
} else {
onDropEvent(event)
}
} | 在上述代码中,我们看有eventQueue.offer(event),事情逐渐清晰起来,我们看看eventQueue的定义:
private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY) | 其实eventQueue是一个LinkedBlockingQueue,是一个阻塞队列.通过offer向队列中加入事件,通过poll方法从队列头部取出事件。因此从这里来看,我们得知,不管是多少个job的执行,都会向eventQueue中发送事件消息,当系统从队列中取出消息执行的时候,也是从队列头部取出,队列是FIFO类型的,所以JOB的执行是顺序执行的
心血来潮,有误大家一起探讨!共同学习进步
|
|