本帖最后由 pig2 于 2014-1-16 17:10 编辑
Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类。
Mapper的结果,可能送到可能的Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。
Mapper最终处理的结果对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那,哪个key到哪个Reducer的分配过程,是由Partitioner规定的,它只有一个方法,输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。
Reducer是所有用户定制Reducer类的基类,和Mapper类似,它也有setup,reduce,cleanup和run方法,其中setup和cleanup含义和Mapper相同,reduce是真正合并Mapper结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括Reducer的上下文。系统中定义了两个非常简单的Reducer,IntSumReducer和LongSumReducer,分别用于对整形/长整型的value求和。
Reduce的结果,通过Reducer.Context的方法collect输出到文件中,和输入类似,Hadoop引入了OutputFormat。OutputFormat依赖两个辅助接口:RecordWriter和OutputCommitter,来处理输出。RecordWriter提供了write方法,用于输出<key, value>和close方法,用于关闭对应的输出。OutputCommitter提供了一系列方法,用户通过实现这些方法,可以定制OutputFormat生存期某些阶段需要的特殊操作。我们在TaskInputOutputContext中讨论过这些方法(明显,TaskInputOutputContext是OutputFormat和Reducer间的桥梁)。
OutputFormat和RecordWriter分别对应着InputFormat和RecordReader,系统提供了空输出NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件FileOutputFormat的SequenceFileOutputFormat和TextOutputFormat输出。
基于文件的输出FileOutputFormat利用了一些配置项配合工作,包括mapred.output.compress:是否压缩;mapred.output.compression.codec:压缩方法;mapred.output.dir:输出路径;mapred.work.output.dir:输出工作路径。FileOutputFormat还依赖于FileOutputCommitter,通过FileOutputCommitter提供一些和Job,Task相关的临时文件管理功能。如FileOutputCommitter的setupJob,会在输出路径下创建一个名为_temporary的临时目录,cleanupJob则会删除这个目录。
SequenceFileOutputFormat输出和TextOutputFormat输出分别对应输入的SequenceFileInputFormat和TextInputFormat,我们就不再详细分析啦。
---------------------------------------------------------------------------------------------------------------------------------------------------
Mapper的输出,在发送到Reducer前是存放在本地文件系统的,IFile提供了对Mapper输出的管理。我们已经知道,Mapper的输出是<Key,Value>对,IFile以记录<key-len, value-len, key,value>的形式存放了这些数据。为了保存键值对的边界,很自然IFile需要保存key-len和value-len。
和IFile相关的类图如下:
其中,文件流形式的输入和输出是由IFIleInputStream和IFIleOutputStream抽象。以记录形式的读/写操作由IFile.Reader/IFile.Writer提供,IFile.InMemoryReader用于读取存在于内存中的IFile文件格式数据。
我们以输出为例,来分析这部分的实现。首先是下图的和序列化反序列化相关的Serialization/Deserializer,这部分的code是在包org.apache.hadoop.io.serializer。序列化由Serializer抽象,通过Serializer的实现,用户可以利用serialize方法把对象序列化到通过open方法打开的输出流里。Deserializer提供的是相反的过程,对应的方法是deserialize。hadoop.io.serializer中还实现了配合工作的Serialization和对应的工厂SerializationFactory。两个具体的实现是WritableSerialization和JavaSerialization,分别对应了Writeble的序列化反序列化和Java本身带的序列化反序列化。
有了Serializer/Deserializer,我们来分析IFile.Writer。Writer的构造函数是:
public Writer(Configuration conf, FSDataOutputStream out,
Class<K> keyClass, Class<V> valueClass,
CompressionCodec codec, Counters.Counter writesCounter)
conf,配置参数,out是Writer的输出,keyClass 和valueClass 是输出的Kay,Value的class属性,codec是对输出进行压缩的方法,参数writesCounter用于对输出字节数进行统计的Counters.Counter。通过这些参数,我们可以构造我们使用的支持压缩功能的输出流(类成员out,类成员rawOut保存了构造函数传入的out),相关的计数器,还有就是Kay,Value的Serializer方法。
Writer最主要的方法是append方法(居然不是write方法,呵呵),有两种形式:
public void append(K key, V value) throws IOException {
public void append(DataInputBuffer key, DataInputBuffer value)
append(K key, V value)的主要过程是检查参数,然后将key和value序列化到DataOutputBuffer中,并获取序列化后的长度,最后把长度(2个)和DataOutputBuffer中的结果写到输出,并复位DataOutputBuffer和计数。append(DataInputBuffer key, DataInputBuffer value)处理过程也比较类似,就不再分析了。
close方法中需要注意的是,我们需要标记文件尾,或者是流结束。目前是通过写2个值为EOF_MARKER的长度来做标记。
IFileOutputStream是用于配合Writer的输出流,它会在IFiles的最后添加校验数据。当Writer调用IFileOutputStream的write操作时,IFileOutputStream计算并保持校验和,流被close的时候,校验结果会写到对应文件的文件尾。实际上存放在磁盘上的文件是一系列的<key-len, value-len, key, value>记录和校验结果。
Reader的相关过程,我们就不再分析了。
下一篇Hadoop源代码分析(*IDs类和*Context类)及包hadoop.mapred中的MapReduce接口(5)
上一篇
Hadoop二次开发必懂:源码分析Task的内部类和辅助类(3)
---------------------------------------------------------------------------------------------------------------------------------------------------
|
|