本帖最后由 pig2 于 2014-1-16 17:03 编辑
辅助类1:
MapTask的辅助类主要针对Mapper的输入和输出。首先我们来看MapTask中用的的Mapper输入,在类图中,这部分位于右上角。
MapTask.TrackedRecordReader是一个Wrapper,在原有输入RecordReader的基础上,添加了收集上报统计数据的功能。
MapTask.SkippingRecordReader也是一个Wrapper,它在MapTask.TrackedRecordReader的基础上,添加了忽略部分输入的功能。在分析MapTask.SkippingRecordReader之前,我们先看一下类SortedRanges和它相关的类。
类SortedRanges.Ranges表示了一个范围,以开始位置和范围长度(这样的话就可以表示长度为0的范围)来表示一个范围,并提供了一系列的范围操作方法。注意,方法getEndIndex得到的右端点并不包含在范围内(应理解为开区间)。SortedRanges包含了一系列不重叠的范围,为了保证包含的范围不重叠,在add方法和remove方法上需要做一些处理,保证不重叠的约束。SkipRangeIterator是访问SortedRanges包含的Ranges的迭代器。
MapTask.SkippingRecordReader的实现很简单,因为要忽略的输入都保持在SortedRanges.Ranges,只需要在next方法中,判断目前范围时候落在SortedRanges.Ranges中,如果是,忽略,并将忽略的记录写文件(可配置)
NewTrackingRecordReader和NewOutputCollector被新API使用,我们不分析。
MapTask的输出辅助类都继承自MapOutputCollector,它只是在OutputCollector的基础上添加了close和flush方法。
DirectMapOutputCollector用在Reducer的数目为0,就是不需要Reduce阶段的时候。它是直接通过
out = job.getOutputFormat().getRecordWriter(fs,job, finalName, reporter);
得到对应的RecordWriter,collect直接到RecordWriter上。
如果Mapper后续有reduce任务,系统会使用MapOutputBuffer做为输出,这是个比较复杂的类,有1k行左右的代码。
我们知道,Mapper是通过OutputCollector将Map的结果输出,输出的量很大,Hadoop的机制是通过一个circle buffer 收集Mapper的输出, 到了io.sort.mb * percent量的时候,就spill到disk,如下图。图中出现了两个数组和一个缓冲区,kvindices保持了记录所属的(Reduce)分区,key在缓冲区开始的位置和value在缓冲区开始的位置,通过kvindices,我们可以在缓冲区中找到对应的记录。kvoffets用于在缓冲区满的时候对kvindices的partition进行排序,排完序的结果将输出到输出到本地磁盘上,其中索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中。
当Mapper任务结束后,有可能会出现多个spill文件,这些文件会做一个归并排序,形成Mapper的一个输出(spill.out和spill.out.index),如下图:
这个输出是按partition排序的,这样的话,Mapper的输出被分段,Reducer要获取的就是spill.out中的一段。(注意,内存和硬盘上的索引结构不一样)
---------------------------------------------------------------------------------------------------------------------------------------------------
辅助类2:有了上面Mapper输出的内存存储结构和硬盘存储结构讨论,我们来仔细分析MapOutputBuffer的流程。
首先是成员变量。最先初始化的是作业配置job和统计功能reporter。通过配置,MapOutputBuffer可以获取本地文件系统(localFs和rfs),Reducer的数目和Partitioner。
SpillRecord是文件spill.out{spill号}.index在内存中的对应抽象(内存数据和文件数据就差最后的校验和),该文件保持了一系列的IndexRecord,如下图:
IndexRecord有3个字段,分别是startOffset:记录偏移量,rawLength:初始长度,partLength:实际长度(可能有压缩)。SpillRecord保持了一系列的IndexRecord,并提供方法用于添加记录(没有删除记录的操作,因为不需要),获取记录,写文件,读文件(通过构造函数)。
接下来是一些和输出缓存区kvbuffer,缓存区记录索引kvindices和缓存区记录索引排序工作数组kvoffsets相关的处理,下面的图有助于说明这段代码。
这部分依赖于3个配置参数,io.sort.spill.percent是kvbuffer,kvindices和kvoffsets的总大小(以M为单位,缺省是100,就是100M,这一部分是MapOutputBuffer中占用存储最多的)。io.sort.record.percent是kvindices和kvoffsets占用的空间比例(缺省是0.05)。前面的分析我们已经知道kvindices和kvoffsets,如果记录数是N的话,它占用的空间是4N*4bytes,根据这个关系和io.sort.record.percent的值,我们可以计算出kvindices和kvoffsets最多能有多少个记录,并分配相应的空间。参数io.sort.spill.percent指示当输出缓冲区或kvindices和kvoffsets记录数量到达对应的占用率的时候,会启动spill,将内存缓冲区的记录存放到硬盘上,softBufferLimit和softRecordLimit为对应的字节数。
值对<key, value>输出到缓冲区是通过Serializer串行化的,这部分的初始化跟在上面输出缓存后面。接下来是一些计数器和可能的数据压缩处理器的初始化,可能的Combiner和combiner工作的一些配置。
最后是启动spillThread,该Thread会检查内存中的输出缓存区,在满足一定条件的时候将缓冲区中的内容spill到硬盘上。这是一个标准的生产者-消费者模型,MapTask的collect方法是生产者,spillThread是消费者,它们之间同步是通过spillLock(ReentrantLock)和spillLock上的两个条件变量(spillDone和spillReady)完成的。
先看生产者,MapOutputBuffer.collect的主要流程是:
- 报告进度和参数检测(<K, V>符合Mapper的输出约定);
- spillLock.lock(),进入临界区;
- 如果达到spill条件,设置变量并通过spillReady.signal(),通知spillThread;并等待spill结束(通过spillDone.await()等待);
- spillLock.unlock();
- 输出key,value并更新kvindices和kvoffsets(注意,方法collect是synchronized,key和value各自输出,它们也会占用连续的输出缓冲区);
kvstart,kvend和kvindex三个变量在判断是否需要spill和spill是否结束的过程中很重要,kvstart是有效记录开始的下标,kvindex是下一个可做记录的位置,kvend的作用比较特殊,它在一般情况下kvstart==kvend,但开始spill的时候它会被赋值为kvindex的值,spill结束时,它的值会被赋给kvstart,这时候kvstart==kvend。这就是说,如果kvstart不等于kvend,系统正在spill,否则,kvstart==kvend,系统处于普通工作状态。其实在代码中,我们可以看到很多kvstart==kvend的判断。
下面我们分情况,讨论kvstart,kvend和kvindex的配合。初始化的时候,它们都被赋值0。
下图给出了一个没有spill的记录添加过程:
注意kvindex和kvnext的关系,取模实现了循环缓冲区
如果在添加记录的过程中,出现spill(多种条件),那么,主要的过程如下:
首先还是计算kvnext,主要,这个时候kvend==kvstart(图中没有画出来)。如果spill条件满足,那么,kvindex的值会赋给kvend(这是kvend不等于kvstart),从kvstart和kvend的大小关系,我们可以知道记录位于数组的那一部分(左边是kvstart<kvend的情况,右边是另外的情况)。Spill结束的时候,kvend值会被赋给kvstart, kvend==kvstart又重新满足,同时,我们可以发现kvindex在这个过程中没有变化,新的记录还是写在kvindex指向的位置,然后,kvindex=kvnect,kvindex移到下一个可用位置。
大家体会一下上面的过程,特别是kvstart,kvend和kvindex的配合,其实,<key,value>对输出使用的缓冲区,也有类似的过程。
Collect在处理<key,value>输出时,会处理一个MapBufferTooSmallException,这是value的串行化结果太大,不能一次放入缓冲区的指示,这种情况下我们需要调用spillSingleRecord,特殊处理。
---------------------------------------------------------------------------------------------------------------------------------------------------
辅助类3:接下来讨论的是key,value的输出,这部分比较复杂,不过有了前面kvstart,kvend和kvindex配合的分析,有利于我们理解这部分的代码。输出缓冲区中,和kvstart,kvend和kvindex对应的是bufstart,bufend和bufmark。这部分还涉及到变量bufvoid,用于表明实际使用的缓冲区结尾(见后面BlockingBuffer.reset分析),和变量bufmark,用于标记记录的结尾。这部分代码需要bufmark,是因为key或value的输出是变长的,(前面元信息记录大小是常量,就不需要这样的变量)。最好的情况是缓冲区没有翻转和value串行化结果很小,如下图:
先对key串行化,然后对value做串行化,临时变量keystart,valstart和valend分别记录了key结果的开始位置,value结果的开始位置和value结果的结束位置。
串行化过程中,往缓冲区写是最终调用了Buffer.write方法,我们后面再分析。
如果key串行化后出现bufindex < keystart,那么会调用BlockingBuffer的reset方法。原因是在spill的过程中需要对<key,value>排序,这种情况下,传递给RawComparator的必须是连续的二进制缓冲区,通过BlockingBuffer.reset方法,解决这个问题。下图解释了如何解决这个问题:
当发现key的串行化结果出现不连续的情况时,我们会把bufvoid设置为bufmark,见缓冲区开始部分往后挪,然后将原来位于bufmark到bufvoid出的结果,拷到缓冲区开始处,这样的话,key串行化的结果就连续存放在缓冲区的最开始处。
上面的调整有一个条件,就是bufstart前面的缓冲区能够放下整个key串行化的结果,如果不能,处理的方式是将bufindex置0,然后调用BlockingBuffer内部的out的write方法直接输出,这实际调用了Buffer.write方法,会启动spill过程,最终我们会成功写入key串行化的结果。
下面我们看write方法。key,value串行化过程中,往缓冲区写数据是最终调用了Buffer.write方法,又是一个复杂的方法。
- do-while循环,直到我们有足够的空间可以写数据(包括缓冲区和kvindices和kvoffsets)
- 首先我们计算缓冲区连续写是否写满标志buffull和缓冲区非连续情况下有足够写空间标志wrap(这个实在拗口),见下面的讨论;条件(buffull && !wrap)用于判断目前有没有足够的写空间;
- 在spill没启动的情况下(kvstart == kvend),分两种情况,如果数组中有记录(kvend != kvindex),那么,根据需要(目前输出空间不足或记录数达到spill条件)启动spill过程;否则,如果空间还是不够(buffull && !wrap),表明这个记录非常大,以至于我们的内存缓冲区不能容下这么大的数据量,抛MapBufferTooSmallException异常;
- 如果空间不足同时spill在运行,等待spillDone;
- 写数据,注意,如果buffull,则写数据会不连续,则写满剩余缓冲区,然后设置bufindex=0,并从bufindex处接着写。否则,就是从bufindex处开始写。
下图给出了缓冲区连续写是否写满标志buffull和缓冲区非连续情况下有足够写空间标志wrap计算的几种可能:
情况1和情况2中,buffull判断为从bufindex到bufvoid是否有足够的空间容纳写的内容,wrap是图中白颜色部分的空间是否比输入大,如果是,wrap为true;情况3和情况4中,buffull判断bufindex到bufstart的空间是否满足条件,而wrap肯定是false。明显,条件(buffull && !wrap)满足时,目前的空间不够一次写。
接下来我们来看spillSingleRecord,只是用于写放不进内存缓冲区的<key,value>对。过程很流水,首先是创建SpillRecord记录,输出文件和IndexRecord记录,然后循环,构造SpillRecord并在恰当的时候输出记录(如下图),最后输出spill{n}.index文件。
前面我们提过spillThread,在这个系统中它是消费者,这个消费者相当简单,需要spill时调用函数sortAndSpill,进行spill。sortAndSpill和spillSingleRecord类似,函数的开始也是创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。
按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。
sortAndSpill最后是输出spill{n}.index文件。
combineAndSpill比价简单,我们就不分析了。
BlockingBuffer中最后要分析的方法是flush方法。调用flush方法,意味着Mapper的结果都已经collect了,需要对缓冲区做一些最后的清理,并合并spill{n}文件产生最后的输出。
缓冲区处理部分很简单,先等待可能的spill过程完成,然后判断缓冲区是否为空,如果不是,则调用sortAndSpill,做最后的spill,然后结束spill线程。
flush合并spill{n}文件是通过mergeParts方法。如果Mapper最后只有一个spill{n}文件,简单修改该文件的文件名就可以。如果Mapper没有任何输出,那么我们需要创建哑输出(dummy files)。如果spill{n}文件多于1个,那么按partition循环处理所有文件,将处于处理partition的记录输出。处理partition的过程中可能还会再次调用combineAndSpill,最记录再做一次combination,其中还涉及到工具类Merger,我们就不再深入研究了。
下一篇
Hadoop二次开发必懂:源码分析Task的内部类和辅助类(3)
上一篇
Hadoop二次开发必懂:源码分析MapReduce概论及MapTask(1)
|