分享

探索Spark源码---在Spark中Job是并行执行的还是串行执行的?

regan 发表于 2015-12-9 00:02:45 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 30554
本帖最后由 regan 于 2015-12-17 16:42 编辑

我们知道,再spark中每一个提交的job,在划分好stage,由taskSetManager管理taskSet的生命周期,发送给满足计算的worker节点上进行计算,在每一个worker节点上,task在ThreadPool中并行执行。但是,如果在driver端同时提交了多个Job,那么这些提交的job是并行计算的还是串行计算的呢。最近对这个感到比较迷惑,在我的sparkSteaming项目中,我简单的试了一下,有一个print和saveAsTextFile操作,这两个操作都是action操作,因此都会触发job的提交,代码如下:
imsiUrntiNeidTimeKVGroupedRecordRDD.print(1)

imsiUrntiNeidTimeKVGroupedRecordRDD.cache()

imsiUrntiNeidTimeKVGroupedRecordRDD.saveAsTextFiles(ProjectConf.resultPath+"_"+timeHs.value+"_")

在我没有配置job调度策略FIFO的情况下,由WEB UI得到的结果如下:
Job Id
Description
Submitted
Duration
Stages: Succeeded/Total
Tasks (for all stages): Succeeded/Total
21
2015/12/08 11:59:17
12 s
2/2
3/3
20
2015/12/08 11:59:07
9 s
1/1 (1 skipped)
2/2 (2 skipped)
19
2015/12/08 11:58:56
11 s
2/2
3/3
18
2015/12/08 11:58:47
9 s
1/1 (1 skipped)
2/2 (2 skipped)
17
2015/12/08 11:58:36
11 s
2/2
3/3
16
2015/12/08 11:58:24
12 s
1/1 (1 skipped)
2/2 (2 skipped)
15
2015/12/08 11:58:12
12 s
2/2
3/3
14
2015/12/08 11:58:04
8 s
1/1 (1 skipped)
2/2 (2 skipped)
13
2015/12/08 11:57:53
10 s
2/2
3/3
12
2015/12/08 11:57:49
4 s
1/1 (1 skipped)
2/2 (1 skipped)
11
2015/12/08 11:57:43
5 s
2/2
2/2
10
2015/12/08 11:57:35
9 s
1/1 (1 skipped)
2/2 (2 skipped)
9
2015/12/08 11:57:24
11 s
2/2
3/3
8
2015/12/08 11:57:15
8 s
1/1 (1 skipped)
2/2 (2 skipped)
7
2015/12/08 11:57:04
11 s
2/2
3/3
我猜想,由不同的Application提交到Spark集群之间的JOB是否都是按照提交串行来执行的,即集群中每次只能运行一个JOB,待JOB运行完成后,再调度下一个JOB运行。
为了解答这个问题,最好的办法不是猜想,而是由表象跟入源代码:
在每一个action操作中,都会调用SparkContext的runjob方法提交JOB,代码如下:
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
  dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
    listener, properties)
跟入代码会发现,调用了dagScheduler的handdleJObSubmitted方法,在该方法中生成了job,主要代码如下:
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
  job.jobId, callSite.shortForm, partitions.length, allowLocal))
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val shouldRunLocally =
  localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
val jobSubmissionTime = clock.getTimeMillis()
if (shouldRunLocally) {
  // Compute very short actions like first() or take() with no parent stages locally.
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
  runLocally(job)
} else {
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.resultOfJob = Some(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
在该代码中,我们看到新生成的job加入到了activeJobs中,其中jobid是一个递增的数字,activeJobs是一个hashMap,还有一句关键的代码是向事件总线中发送了SparkListenerJobStart事件,如下:
listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
我们进入到listenerBus的post方法:
def post(event: E) {
  if (stopped.get) {
    // Drop further events to make `listenerThread` exit ASAP
    logError(s"$name has already stopped! Dropping event $event")
    return
}
  val eventAdded = eventQueue.offer(event)
  if (eventAdded) {
    eventLock.release()
  } else {
    onDropEvent(event)
  }
}
在上述代码中,我们看有eventQueue.offer(event),事情逐渐清晰起来,我们看看eventQueue的定义:
private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)
其实eventQueue是一个LinkedBlockingQueue,是一个阻塞队列.通过offer向队列中加入事件,通过poll方法从队列头部取出事件。因此从这里来看,我们得知,不管是多少个job的执行,都会向eventQueue中发送事件消息,当系统从队列中取出消息执行的时候,也是从队列头部取出,队列是FIFO类型的,所以JOB的执行是顺序执行的

心血来潮,有误大家一起探讨!共同学习进步

已有(4)人评论

跳转到指定楼层
若无梦何远方 发表于 2019-8-29 20:57:10
在Spark中Job是顺序执行FIFO的方式 , 谢谢楼主分享
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条