分享

Apache Spark源码走读之3-- Task运行期之函数调用关系分析

xioaxu790 2014-12-18 20:24:29 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 22779
本帖最后由 pig2 于 2015-1-6 14:09 编辑
问题导读
1、使用什么指令运行ocal-cluster模式?
2、TaskRunner中执行的task其业务逻辑是如何被调用到的?
3、如何理解堆栈输出?






概要
本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回。

准备
spark已经安装完毕
spark运行在local mode或local-cluster mode

local-cluster mode
local-cluster模式也称为伪分布式,可以使用如下指令运行
  1. MASTER=local[1,2,1024] bin/spark-shell
复制代码

[1,2,1024] 分别表示,executor number, core number和内存大小,其中内存大小不应小于默认的512M

Driver Programme的初始化过程分析
初始化过程的涉及的主要源文件
  • SparkContext.scala       整个初始化过程的入口
  • SparkEnv.scala          创建BlockManager, MapOutputTrackerMaster, ConnectionManager, CacheManager
  • DAGScheduler.scala       任务提交的入口,即将Job划分成各个stage的关键
  • TaskSchedulerImpl.scala 决定每个stage可以运行几个task,每个task分别在哪个executor上运行
  • SchedulerBackend
最简单的单机运行模式的话,看LocalBackend.scala
如果是集群模式,看源文件SparkDeploySchedulerBackend




初始化过程步骤详解
步骤1: 根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv, SparkEnv中主要包含以下关键性组件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager
  1. private[spark] val env = SparkEnv.create(
  2.     conf,
  3.     "",
  4.     conf.get("spark.driver.host"),
  5.     conf.get("spark.driver.port").toInt,
  6.     isDriver = true,
  7.     isLocal = isLocal)
  8.   SparkEnv.set(env)
复制代码


步骤2:创建TaskScheduler,根据Spark的运行模式来选择相应的SchedulerBackend,同时启动taskscheduler,这一步至为关键
  1.   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
  2.   taskScheduler.start()
复制代码


TaskScheduler.start目的是启动相应的SchedulerBackend,并启动定时器进行检测
  1. override def start() {
  2.     backend.start()
  3.     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
  4.       logInfo("Starting speculative execution thread")
  5.       import sc.env.actorSystem.dispatcher
  6.       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
  7.             SPECULATION_INTERVAL milliseconds) {
  8.         checkSpeculatableTasks()
  9.       }
  10.     }
  11.   }
复制代码


步骤3:以上一步中创建的TaskScheduler实例为入参创建DAGScheduler并启动运行
  1. @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
  2.   dagScheduler.start()
复制代码


步骤4:启动WEB UI
  1. ui.start()
复制代码



RDD的转换过程
还是以最简单的wordcount为例说明rdd的转换过程
  1. sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
复制代码

上述一行简短的代码其实发生了很复杂的RDD转换,下面仔细解释每一步的转换过程和转换结果

步骤1:val rawFile = sc.textFile("README.md")
textFile先是生成hadoopRDD,然后再通过map操作生成MappedRDD,如果在spark-shell中执行上述语句,得到的结果可以证明所做的分析
  1. scala> sc.textFile("README.md")
  2. 14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
  3. 14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750
  4. 14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB)
  5. 14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took  277 ms
  6. 14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took  281 ms
  7. res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13
复制代码


步骤2: val splittedText = rawFile.flatMap(line => line.split(" "))
flatMap将原来的MappedRDD转换成为FlatMappedRDD
  1. def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =                                                                                                  new FlatMappedRDD(this, sc.clean(f))
复制代码


步骤3:val wordCount = splittedText.map(word => (word, 1))
利用word生成相应的键值对,上一步的FlatMappedRDD被转换成为MappedRDD

步骤4:val reduceJob = wordCount.reduceByKey(_ + _),这一步最复杂
步骤2,3中使用到的operation全部定义在RDD.scala中,而这里使用到的reduceByKey却在RDD.scala中见不到踪迹。reduceByKey的定义出现在源文件PairRDDFunctions.scala

细心的你一定会问reduceByKey不是MappedRDD的属性和方法啊,怎么能被MappedRDD调用呢?其实这背后发生了一个隐式的转换,该转换将MappedRDD转换成为PairRDDFunctions
  1. implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
  2.     new PairRDDFunctions(rdd)
复制代码


这种隐式的转换是scala的一个语法特征,如果想知道的更多,请用关键字"scala implicit method"进行查询,会有不少的文章对此进行详尽的介绍。

接下来再看一看reduceByKey的定义
  1.   def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
  2.     reduceByKey(defaultPartitioner(self), func)
  3.   }
  4.   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  5.     combineByKey[V]((v: V) => v, func, func, partitioner)
  6.   }
  7.   def combineByKey[C](createCombiner: V => C,
  8.       mergeValue: (C, V) => C,
  9.       mergeCombiners: (C, C) => C,
  10.       partitioner: Partitioner,
  11.       mapSideCombine: Boolean = true,
  12.       serializerClass: String = null): RDD[(K, C)] = {
  13.     if (getKeyClass().isArray) {
  14.       if (mapSideCombine) {
  15.         throw new SparkException("Cannot use map-side combining with array keys.")
  16.       }
  17.       if (partitioner.isInstanceOf[HashPartitioner]) {
  18.         throw new SparkException("Default partitioner cannot partition array keys.")
  19.       }
  20.     }
  21.     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
  22.     if (self.partitioner == Some(partitioner)) {
  23.       self.mapPartitionsWithContext((context, iter) => {
  24.         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  25.       }, preservesPartitioning = true)
  26.     } else if (mapSideCombine) {
  27.       val combined = self.mapPartitionsWithContext((context, iter) => {
  28.         aggregator.combineValuesByKey(iter, context)
  29.       }, preservesPartitioning = true)
  30.       val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
  31.         .setSerializer(serializerClass)
  32.       partitioned.mapPartitionsWithContext((context, iter) => {
  33.         new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
  34.       }, preservesPartitioning = true)
  35.     } else {
  36.       // Don't apply map-side combiner.
  37.       val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
  38.       values.mapPartitionsWithContext((context, iter) => {
  39.         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  40.       }, preservesPartitioning = true)
  41.     }
  42.   }
复制代码



reduceByKey最终会调用combineByKey, 在这个函数中PairedRDDFunctions会被转换成为ShuffleRDD,当调用mapPartitionsWithContext之后,shuffleRDD被转换成为MapPartitionsRDD

Log输出能证明我们的分析
  1. res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13
复制代码



RDD转换小结
小结一下整个RDD转换过程
  1. HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD
复制代码

整个转换过程好长啊,这一切的转换都发生在任务提交之前。


运行过程分析
数据集操作分类
在对任务运行过程中的函数调用关系进行分析之前,我们也来探讨一个偏理论的东西,作用于RDD之上的Transformantion为什么会是这个样子?

对这个问题的解答和数学搭上关系了,从理论抽象的角度来说,任务处理都可归结为“input->processing->output"。input和output对应于数据集dataset.

在此基础上作一下简单的分类

  • one-one 一个dataset在转换之后还是一个dataset,而且dataset的size不变,如map
  • one-one 一个dataset在转换之后还是一个dataset,但size发生更改,这种更改有两种可能:扩大或缩小,如flatMap是size增大的操作,而subtract是size变小的操作
  • many-one 多个dataset合并为一个dataset,如combine, join
  • one-many 一个dataset分裂为多个dataset, 如groupBy



Task运行期的函数调用
task的提交过程参考本系列中的第二篇文章。本节主要讲解当task在运行期间是如何一步步调用到作用于RDD上的各个operation
  • TaskRunner.run
  • Task.run
  • Task.runTask (Task是一个基类,有两个子类,分别为ShuffleMapTask和ResultTask)
  • RDD.iterator
  • RDD.computeOrReadCheckpoint
  • RDD.compute

或许当看到RDD.compute函数定义时,还是觉着f没有被调用,以MappedRDD的compute定义为例
  1.   override def compute(split: Partition, context: TaskContext) =                                                                                                      
  2.     firstParent[T].iterator(split, context).map(f)
复制代码


注意,这里最容易产生错觉的地方就是map函数,这里的map不是RDD中的map,而是scala中定义的iterator的成员函数map, 请自行参考http://www.scala-lang.org/api/2. ... collection.Iterator

堆栈输出
  1. 80         at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111)
  2. 81         at org.apache.spark.rdd.HadoopRDD$anon$1.(HadoopRDD.scala:154)
  3. 82         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
  4. 83         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
  5. 84         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
  6. 85         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
  7. 86         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
  8. 87         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
  9. 88         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
  10. 89         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
  11. 90         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
  12. 91         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
  13. 92         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
  14. 93         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
  15. 94         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
  16. 95         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
  17. 96         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
  18. 97         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
  19. 98         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
  20. 99         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
  21. 100         at org.apache.spark.scheduler.Task.run(Task.scala:53)
  22. 101         at org.apache.spark.executor.Executor$TaskRunner$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
复制代码



ResultTask
compute的计算过程对于ShuffleMapTask比较复杂,绕的圈圈比较多,对于ResultTask就直接许多。
  1. override def runTask(context: TaskContext): U = {
  2.     metrics = Some(context.taskMetrics)
  3.     try {
  4.       func(context, rdd.iterator(split, context))
  5.     } finally {
  6.       context.executeOnCompleteCallbacks()
  7.     }
  8.   }
复制代码


计算结果的传递
上面的分析知道,wordcount这个job在最终提交之后,被DAGScheduler分为两个stage,第一个Stage是shuffleMapTask,第二个Stage是ResultTask.

那么ShuffleMapTask的计算结果是如何被ResultTask取得的呢?这个过程简述如下

  • ShffuleMapTask将计算的状态(注意不是具体的数据)包装为MapStatus返回给DAGScheduler
  • DAGScheduler将MapStatus保存到MapOutputTrackerMaster中
  • ResultTask在执行到ShuffleRDD时会调用BlockStoreShuffleFetcher的fetch方法去获取数据
  •          第一件事就是咨询MapOutputTrackerMaster所要取的数据的location
  •          根据返回的结果调用BlockManager.getMultiple获取真正的数据

BlockStoreShuffleFetcher的fetch函数伪码
  1.     val blockManager = SparkEnv.get.blockManager
  2.     val startTime = System.currentTimeMillis
  3.     val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
  4.     logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
  5.       shuffleId, reduceId, System.currentTimeMillis - startTime))
  6.     val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
  7.     val itr = blockFetcherItr.flatMap(unpackBlock)
复制代码

注意上述代码中的getServerStatuses及getMultiple,一个是询问数据的位置,一个是去获取真正的数据。
有关Shuffle的详细解释,请参考”详细探究Spark的shuffle实现一文" http://jerryshao.me/architecture ... tail-investigation/

相关内容


Apache Spark源码走读之1 -- Spark论文阅读笔记

Apache Spark源码走读之2 -- Job的提交与运行


Apache Spark源码走读之4 -- DStream实时流数据处理

Apache Spark源码走读之5-- DStream处理的容错性分析

Apache Spark源码走读之6-- 存储子系统分析

Apache Spark源码走读之7 -- Standalone部署方式分析

Apache Spark源码走读之8 -- Spark on Yarn

Apache Spark源码走读之9 -- Spark源码编译

Apache Spark源码走读之10 -- 在YARN上运行SparkPi

Apache Spark源码走读之11 -- sql的解析与执行

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

Apache Spark源码走读之13 -- hiveql on spark实现详解

Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

Apache Spark源码走读之16 -- spark repl实现详解

Apache Spark源码走读之17 -- 如何进行代码跟读

Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放

Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现




本文档转载自:http://www.cnblogs.com/hseagle/p/3673132.html
作者:徽沪一郎




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条