分享

Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取

本帖最后由 pig2 于 2015-1-6 14:17 编辑

问题导读

1.Shuffle数据的写入和读取过程是怎样的?
2.HashShuffleReader中的read函数的具体实现是怎样的?














概要
ShuffleMapTask的计算结果保存在哪,随后Stage中的task又是如何知道从哪里去读取的呢,这个过程一直让我困惑不已。

用比较通俗一点的说法来解释一下Shuffle数据的写入和读取过程
  • 每一个task负责处理一个特定的data partition
  • task在初始化的时候就已经明确处理结果可能会产生多少个不同的data partition
  • 利用partitioner函数,task将处理结果存入到不同的partition,这些数据存放在当前task执行的机器上
  • 假设当前是stage 2有两个task, stage 2可能输出4个不同的data partition, task 0和task 1各自运行于不同的机器上,task 0中的部分处理结果会存入到data partition 0,task 1的部分处理结果也可能存入到data partition 0.
  • 由于stage 2产生了4个不同的data partition, 后续stage 1中的task个数就为4. task 0 就负责读取data partition 0的数据,对于(stage1, task0)来说,所要读取的data partition 0的内容由task 0和task 1中的partition 0共同组成。
  • 现在问题的关键转换成为(stage_1, task_0)如何知道(stage_2, task_x)有没有相应的输出是属于data partition 0的呢?这个问题的解决就是MapStatus
  • 每一个ShuffleMapTask在执行结束,都会上报一个MapStatus,在MapStatus中会反应出朝哪些data partition写入了数据,写入了数据则size为非零值,否则为零值
  • (stage_1,task_0)会去获取stage_2中所有task的MapStatus,以判定(stage_2, task_x)产生的数据中有自己需要读入的内容
  • 假设(stage_1,task_0)知道(stage_2, task_0)生成了data partition 0中的数据,于是去(stage_2, task_0)运行时的机器去获取具体的数据,如果恰巧这个时候远端机器已经挂掉了,获取失败,怎么办?
  • 上报异常,由DAGScheduler重新调度(stage_2,task_0),重新生成所需要的数据。
  • Spark不像Hadoop中的MapReduce有一个明显的combine阶段,在spark中combine过程有两次调用,一是Shuffle数据写入过程,另一个是Shuffle数据读取过程。
如果能够明白上述的过程,并对应到相应的代码,那就无须看下述的详细解释了。
好了,让我们开始代码跟踪吧。

数据写入过程

数据写入动作最原始的触发点是ShuffleMapTask.runTask函数,看一看源码先。

  1. override def runTask(context: TaskContext): MapStatus = {
  2.     metrics = Some(context.taskMetrics)
  3.     var writer: ShuffleWriter[Any, Any] = null
  4.     try {
  5.       val manager = SparkEnv.get.shuffleManager
  6.       writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  7.       writer.write(rdd.iterator(split, context).asInstanceOf[Iterator[_
  8.         if (writer != null) {
  9.           writer.stop(success = false)
  10.         }
  11.         throw e
  12.     } finally {
  13.       context.executeOnCompleteCallbacks()
  14.     }
  15.   }
复制代码


managerGetWriter返回的是HashShuffleWriter,所以调用过程是ShuffleMapTask.runTask->HashShuffleWriter.write->BlockObjectWriter.write. 注意dep.mapSideCombine这一分支判断。ReduceByKey(_ + _)中的(_ + _)在此处被执行一次,另一次执行是在read过程。

  1. override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
  2.     val iter = if (dep.aggregator.isDefined) {
  3.       if (dep.mapSideCombine) {
  4.         dep.aggregator.get.combineValuesByKey(records, context)
  5.       } else {
  6.         records
  7.       }
  8.     } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
  9.       throw new IllegalStateException("Aggregator is empty for map-side combine")
  10.     } else {
  11.       records
  12.     }
  13.     for (elem <- iter) {
  14.       val bucketId = dep.partitioner.getPartition(elem._1)
  15.       shuffle.writers(bucketId).write(elem)
  16.     }
复制代码


HashShuffleWriter.write中主要处理两件事
  • 判断是否需要进行聚合,比如<hello,1>和<hello,1>都要写入的话,那么先生成<hello,2>然后再进行后续的写入工作
  • 利用Partitioner函数来决定<k,val>写入到哪一个文件中

Partitioner是在什么时候注入的,RDD抽象类中,Partitioner为空?以reduceByKey为例,HashPartitioner会在后面combineByKey的代码创建ShuffledRDD的时候作为ShuffledRDD的构造函数传入。

  1. def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
  2.     reduceByKey(new HashPartitioner(numPartitions), func)
  3.   }
复制代码


Stage在创建的时候通过构造函数入参明确需要从多少Partition读取数据,生成的Partition会有多少。看一看Stage的构造函数,读取的分区数目由RDD.partitions.size决定,输出的partitions由shuffleDep决定。

  1. private[spark] class Stage(
  2.     val id: Int,
  3.     val rdd: RDD[_],
  4.     val numTasks: Int,
  5.     val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage
  6.     val parents: List[Stage],
  7.     val jobId: Int,
  8.     val callSite: CallSite)
  9. extends Logging {
  10.   val isShuffleMap = shuffleDep.isDefined
  11.   val numPartitions = rdd.partitions.size
  12.   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
  13.   var numAvailableOutputs = 0
  14.   private var nextAttemptId = 0
复制代码
回到数据写入的问题上来,结果写入时的一个主要问题就是已经知道shuffle_id, map_id和要写入的elem,如何找到对应的写入文件。每一个临时文件由三元组(shuffle_id,map_id,reduce_id)来决定,当前已经知道了两个,还剩下一下reduce_id待确定。

reduce_id是使用partitioner计算出来的结果,输入的是elem的键值。也就是dep.partitioner.getPartition(elem._1)。 根据计算出来的bucketid找到对应的writer,然后真正写入。

在HashShuffleWriter.write中使用到的shuffle由ShuffleBlockManager的forMapTask函数生成,注意forMapTask中产生writers的代码逻辑。

每个writer分配一下文件, 文件名由三元组(shuffle_id,map_id,reduce_id)组成,如果知道了这个三元组就可以找到对应的文件。

如果consolidation没有打开,那么在一个task中,有多少个输出的partition就会有多少个中间文件。

  1. val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
  2.         fileGroup = getUnusedFileGroup()
  3.         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
  4.           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
  5.           blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
  6.         }
  7.       } else {
  8.         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
  9.           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
  10.           val blockFile = blockManager.diskBlockManager.getFile(blockId)
  11.           // Because of previous failures, the shuffle file may already exist on this machine.
  12.           // If so, remove it.
  13.           if (blockFile.exists) {
  14.             if (blockFile.delete()) {
  15.               logInfo(s"Removed existing shuffle file $blockFile")
  16.             } else {
  17.               logWarning(s"Failed to remove existing shuffle file $blockFile")
  18.             }
  19.           }
  20.           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
  21.         }
  22.       }
复制代码


getFile负责将三元组(shuffle_id,map_id,reduce_id)映射到文件名

  1. def getFile(filename: String): File = {
  2. // Figure out which local directory it hashes to, and which subdirectory in that
  3.     val hash = Utils.nonNegativeHash(filename)
  4.     val dirId = hash % localDirs.length
  5.     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  6.     // Create the subdirectory if it doesn't already exist
  7.     var subDir = subDirs(dirId)(subDirId)
  8.     if (subDir == null) {
  9.       subDir = subDirs(dirId).synchronized {
  10.         val old = subDirs(dirId)(subDirId)
  11.         if (old != null) {
  12.           old
  13.         } else {
  14.           val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
  15.           newDir.mkdir()
  16.           subDirs(dirId)(subDirId) = newDir
  17.           newDir
  18.         }
  19.       }
  20.     }
  21.     new File(subDir, filename)
  22.   }
  23.   def getFile(blockId: BlockId): File = getFile(blockId.name)
复制代码


产生的文件在哪呢,如果没有更改默认的配置,生成的目录结构类似于下

  1. /tmp/spark-local-20140723092540-7f24
  2. /tmp/spark-local-20140723092540-7f24/0d
  3. /tmp/spark-local-20140723092540-7f24/0d/shuffle_0_0_1
  4. /tmp/spark-local-20140723092540-7f24/0d/shuffle_0_1_0
  5. /tmp/spark-local-20140723092540-7f24/0c
  6. /tmp/spark-local-20140723092540-7f24/0c/shuffle_0_0_0
  7. /tmp/spark-local-20140723092540-7f24/0e
  8. /tmp/spark-local-20140723092540-7f24/0e/shuffle_0_1_1
复制代码

当所有的数据写入文件并提交以后,还需要生成MapStatus汇报给driver application. MapStatus在哪生成的呢?commitWritesAndBuildStatus就干这活。

调用关系HashShuffleWriter.stop->commitWritesAndBuildStatus

  1. private def commitWritesAndBuildStatus(): MapStatus = {
  2.     // Commit the writes. Get the size of each bucket block (total block size).
  3.     var totalBytes = 0L
  4.     var totalTime = 0L
  5.     val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
  6.       writer.commit()
  7.       writer.close()
  8.       val size = writer.fileSegment().length
  9.       totalBytes += size
  10.       totalTime += writer.timeWriting()
  11.       MapOutputTracker.compressSize(size)
  12.     }
  13.     // Update shuffle metrics.
  14.     val shuffleMetrics = new ShuffleWriteMetrics
  15.     shuffleMetrics.shuffleBytesWritten = totalBytes
  16.     shuffleMetrics.shuffleWriteTime = totalTime
  17.     metrics.shuffleWriteMetrics = Some(shuffleMetrics)
  18.     new MapStatus(blockManager.blockManagerId, compressedSizes)
  19.   }
复制代码


compressedSize是一个非常让人疑惑的地方,原因慢慢道来,先看一下MapStatus的构造函数

  1. class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
复制代码

compressedSize是一个byte数组,每一个byte反应了该partiton中的数据大小。如Array(0)=128就表示在data partition 0中有128byte数据。
问题的问题是一个byte只能表示255,如果超过255怎么办呢?

当当当,数学闪亮登场了,注意到compressSize没,通过转换将2^8变换为1.1^256。一下子由255byte延伸到近35G.
看一看这神奇的compressSize函数吧,只是聊聊几行代码而已。

  1. def compressSize(size: Long): Byte = {
  2.     if (size == 0) {
  3.       0
  4.     } else if (size <= 1L) {
  5.       1
  6.     } else {
  7.       math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
  8.     }
  9.   }
复制代码


ShuffleMapTask运行结束时,会将MapStatus结果封装在StatusUpdate消息中汇报给SchedulerBackend, 由DAGScheduler在handleTaskCompletion函数中将MapStatus加入到相应的Stage。这一过程略过,不再详述。
MapOutputTrackerMaster会保存所有最新的MapStatus.
只画张图来表示存储之后的示意。

数据读取过程
ShuffledRDD.compute函数是读取过程的触发点。

  1. override def compute(split: Partition, context: TaskContext): Iterator[P] = {
  2.     val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  3.     SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
  4.       .read()
  5.       .asInstanceOf[Iterator[P]]
  6.   }
复制代码

shuffleManager.getReader返回的是HashShuffleReader,所以看一看HashShuffleReader中的read函数的具体实现。

read函数处理逻辑中需要注意到一点即combine过程有可能会被再次执行。注意dep.aggregator.isDefined这一分支判断。ReduceByKey(_ + _)中的(_ + _)在此处被执行。

  1. override def read(): Iterator[Product2[K, C]] = {
  2.     val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context,
  3.       Serializer.getSerializer(dep.serializer))
  4.     if (dep.aggregator.isDefined) {
  5.       if (dep.mapSideCombine) {
  6.         new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
  7.       } else {
  8.         new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
  9.       }
  10.     } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
  11.       throw new IllegalStateException("Aggregator is empty for map-side combine")
  12.     } else {
  13.       iter
  14.     }
  15.   }
复制代码

一路辗转,终于来到了读取过程中非常关键的所在BlockStoreShuffleFetcher。

BlockStoreShuffleFetcher需要回答如下问题
  • 所要获取的mapid的mapstatus的内容是什么
  • 根据获得的mapstatus去相应的blockmanager获取具体的数据
  1. val blockManager = SparkEnv.get.blockManager
复制代码


一个ShuffleMapTask会生成一个MapStatus,MapStatus中含有当前ShuffleMapTask产生的数据落到各个Partition中的大小。如果大小为0,则表示该分区没有数据产生。MapStatus中另一个重要的成员变量就是BlockManagerId,该变量表示目标数据在哪个BlockManager当中。

MapoutputTrackerMaster拥有最新的MapStatus信息,为了执行效率,MapoutputTrackerWorker会定期更新数据到本地,所以MapoutputTracker先从本地查找,如果找不到再从MapoutputTrackerMaster上同步最新数据。

索引即是reduceId,如果array(0) == 0,就表示上一个ShuffleMapTask中生成的数据中没有任意的内容可以作为reduceId为0的ResultTask的输入。如果不能理解,返回仔细看一下MapStatus的结构图。

BlockManager.getMultiple用于读取BlockManager中的数据,根据配置确定生成tNettyBlockFetcherIterator还是BasicBlockFetcherIterator。


如果所要获取的文件落在本地,则调用getLocal读取,否则发送请求到远端blockmanager。看一下BlockFetcherIterator的initialize函数

  1. override def initialize() {
  2.       // Split local and remote blocks.
  3.       val remoteRequests = splitLocalRemoteBlocks()
  4.       // Add the remote requests into our queue in a random order
  5.       fetchRequests ++= Utils.randomize(remoteRequests)
  6.       // Send out initial requests for blocks, up to our maxBytesInFlight
  7.       while (!fetchRequests.isEmpty &&
  8.         (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
  9.         sendRequest(fetchRequests.dequeue())
  10.       }
  11.       val numFetches = remoteRequests.size - fetchRequests.size
  12.       logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
  13.       // Get Local Blocks
  14.       startTime = System.currentTimeMillis
  15.       getLocalBlocks()
  16.       logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
  17. }
复制代码

至此,数据读取的正常流程讲述完毕。

数据读取异常
如果数据读取中碰到异常怎么办?比如,
  • 已知(stage_2,task_0)产生的parition_0的数据在机器m1, 当前任务在m2执行,于是从m2向m1发起远程获取请求,如果m2中拥有目标数据的JVM进程异常退出,则相应的目标数据无法获取。
如果无法获取目标数据,就会上报FetchFailedException.

  1. def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
  2.       val blockId = blockPair._1
  3.       val blockOption = blockPair._2
  4.       blockOption match {
  5.         case Some(block) => {
  6.           block.asInstanceOf[Iterator[T]]
  7.         }
  8.         case None => {
  9.           blockId match {
  10.             case ShuffleBlockId(shufId, mapId, _) =>
  11.               val address = statuses(mapId.toInt)._1
  12.               throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
  13.             case _ =>
  14.               throw new SparkException(
  15.                 "Failed to get block " + blockId + ", which is not a shuffle block")
  16.           }
  17.         }
  18.       }
  19.     }
复制代码

FetchFailedExecption会被包装在StatutsUpdate上报给SchedulerBackend,然后一路处



相关内容


Apache Spark源码走读之1 -- Spark论文阅读笔记

Apache Spark源码走读之2 -- Job的提交与运行

Apache Spark源码走读之3-- Task运行期之函数调用关系分析

Apache Spark源码走读之4 -- DStream实时流数据处理

Apache Spark源码走读之5-- DStream处理的容错性分析

Apache Spark源码走读之6-- 存储子系统分析

Apache Spark源码走读之7 -- Standalone部署方式分析

Apache Spark源码走读之8 -- Spark on Yarn

Apache Spark源码走读之9 -- Spark源码编译

Apache Spark源码走读之10 -- 在YARN上运行SparkPi

Apache Spark源码走读之11 -- sql的解析与执行

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

Apache Spark源码走读之13 -- hiveql on spark实现详解

Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

Apache Spark源码走读之16 -- spark repl实现详解

Apache Spark源码走读之17 -- 如何进行代码跟读

Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放


Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现

已有(1)人评论

跳转到指定楼层
355815741 发表于 2015-1-4 10:03:41
学习了,谢谢分享~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条