分享

hadoop代码笔记 Mapreduce shuffle过程之Map输出过程((1)

本帖最后由 hyj 于 2014-11-28 21:17 编辑
问题导读
1.map结果是放在磁盘上,还是直接传输给reduce?
2.负责map输出的为哪个函数?






一、概要描述

shuffle是MapReduce的一个核心过程,因此没有在前面的MapReduce作业提交的过程中描述,而是单独拿出来比较详细的描述。 根据官方的流程图示如下:
1.png
本篇文章中只是想尝试从代码分析来说明在map端是如何将map的输出保存下来等待reduce来取。 在执行每个map task时,无论map方法中执行什么逻辑,最终都是要把输出写到磁盘上。如果没有reduce阶段,则直接输出到hdfs上,如果有有reduce作业,则每个map方法的输出在写磁盘前线在内存中缓存。每个map task都有一个环状的内存缓冲区,存储着map的输出结果,默认100m,在每次当缓冲区快满的时候由一个独立的线程将缓冲区的数据以一个溢出文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有溢出文件做合并,被合并成已分区且已排序的输出文件。然后等待reduce task来拉数据。
二、 流程描述
  • 在child进程调用到runNewMapper时,会设置output为NewOutputCollector,来负责map的输出。
  • 在map方法的最后,不管经过什么逻辑的map处理,最终一般都要调用到TaskInputOutputContext的write方法,进而调用到设置的output即NewOutputCollector的write方法
  • NewOutputCollector其实只是对MapOutputBuffer的一个封装,其write方法调用的是MapOutputBuffer的collect方法。
  • MapOutputBuffer的collect方法中把key和value序列化后存储在一个环形缓存中,如果缓存满了则会调用startspill方法设置信号量,使得一个独立的线程SpillThread可以对缓存中的数据进行处理。
  • SpillThread线程的run方法中调用sortAndSpill方法对缓存中的数据进行排序后写溢出文件。
  • 当map输出完成后,会调用output的close方法。
  • 在close方法中调用flush方法,对剩余的缓存进行处理,最后调用mergeParts方法,将前面过程的多个溢出文件合并为一个。
2.jpg
Mapreduce shuffle过程之Map输出过程代码流程



三、代码详细
1 MapTask的runNewMapper方法 注意到有这样一段代码。

即当job中只有map没有reduce的时候,这个rg.apache.hadoop.mapreduce.RecordWriter类型的对象 output是一Outputformat中定义的writer,即直接写到输出中。如果是有Reduce,则output是一个NewOutputCollector类型输出。
  1. if (job.getNumReduceTasks() == 0) {
  2.         output = outputFormat.getRecordWriter(taskContext);
  3.       } else {
  4.         output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  5.       }
  6.       mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),input, output, committer,                                                 reporter, split);
  7.       input.initialize(split, mapperContext);
  8.       mapper.run(mapperContext);
复制代码

和其他的RecordWriter一样,NewOutputCollector也继承自RecordWriter抽象类。除了一个close方法释放资源外,该抽象类定义的最主要的方法就一个void write(K key, V value)。即写入key,value。

2. Mapper的run方法,对每个输出执行map方法。
  1. public void run(Context context) throws IOException, InterruptedException {
  2.     setup(context);
  3.     while (context.nextKeyValue()) {
  4.       map(context.getCurrentKey(), context.getCurrentValue(), context);
  5.     }
  6.     cleanup(context);
  7. }
复制代码




3. Mapper的map方法,默认是直接把key和value写入

  1. protected void map(KEYIN key, VALUEIN value,
  2. Context context) throws IOException, InterruptedException {
  3. context.write((KEYOUT) key, (VALUEOUT) value);
  4. }
复制代码


一般使用中会做很多我们需要的操作,如著名的wordcount中,把一行单词切分后,数一(value都设为one = new IntWritable(1)),但最终都是要把结果写入。即调用context.write(key,value)

  1. public void map(Object key, Text value, Context context
  2.                     ) throws IOException, InterruptedException {
  3.       StringTokenizer itr = new StringTokenizer(value.toString());
  4.       while (itr.hasMoreTokens()) {
  5.         word.set(itr.nextToken());
  6.         context.write(word, one);
  7.       }
  8.     }
复制代码





4.TaskInputOutputContext的write方法。

调用的是contex中的RecordWriter的write方法。即调用的是NewOutputCollector的write方法。

  1. public void write(KEYOUT key, VALUEOUT value
  2.                      ) throws IOException, InterruptedException {
  3.      output.write(key, value);
  4. }
复制代码





5.NewOutputCollector的write方法。
  1. public void write(K key, V value) throws IOException, InterruptedException {
  2.        collector.collect(key, value,
  3.                          partitioner.getPartition(key, value, partitions));
  4. }
复制代码





从方法名上不难看出提供写数据的是MapOutputCollector类型的 collector对象从NewOutputCollector的构造函数中看到collector的初始化。


  1. collector = new MapOutputBuffer<k,v>(umbilical, job, reporter);
复制代码




6.MapOutputBuffer的构造函数,在了解MapOutputBuffer的collect方法前,先了解下期构造函数,看做了哪些初始化。

  1. public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
  2.                            TaskReporter reporter
  3.                            ) throws IOException, ClassNotFoundException {
  4.       this.job = job;
  5.       this.reporter = reporter;
  6.       localFs = FileSystem.getLocal(job);
  7.      //1)设定map的分区数,即作业 配置中的的reduce数
  8.       partitions = job.getNumReduceTasks();
  9.       rfs = ((LocalFileSystem)localFs).getRaw();
  10.       indexCacheList = new ArrayList<SpillRecord>();
  11.       //2)重要的参数
  12.       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
  13.       final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
  14.       final int sortmb = job.getInt("io.sort.mb", 100);
  15.       if (spillper > (float)1.0 || spillper < (float)0.0) {
  16.         throw new IOException("Invalid "io.sort.spill.percent": " + spillper);
  17.       }
  18.       if (recper > (float)1.0 || recper < (float)0.01) {
  19.         throw new IOException("Invalid "io.sort.record.percent": " + recper);
  20.       }
  21.       if ((sortmb & 0x7FF) != sortmb) {
  22.         throw new IOException("Invalid "io.sort.mb": " + sortmb);
  23.       }
  24.       //3)sorter,使用其对map的输出在partition内进行内排序。
  25.       sorter = ReflectionUtils.newInstance(
  26.             job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
  27.       LOG.info("io.sort.mb = " + sortmb);
  28.       // buffers and accounting
  29.      //把单位是M的sortmb设定左移20,还原单位为个
  30.       int maxMemUsage = sortmb << 20;
  31.       int recordCapacity = (int)(maxMemUsage * recper);
  32.       recordCapacity -= recordCapacity % RECSIZE;
  33.       //输出缓存
  34.       kvbuffer = new byte[maxMemUsage - recordCapacity];
  35.       bufvoid = kvbuffer.length;
  36.       recordCapacity /= RECSIZE;
  37.       kvoffsets = new int[recordCapacity];
  38.       kvindices = new int[recordCapacity * ACCTSIZE];
  39.       softBufferLimit = (int)(kvbuffer.length * spillper);
  40.       softRecordLimit = (int)(kvoffsets.length * spillper);
  41.       LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
  42.       LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
  43.       // k/v serialization
  44.       comparator = job.getOutputKeyComparator();
  45.       keyClass = (Class<K>)job.getMapOutputKeyClass();
  46.       valClass = (Class<V>)job.getMapOutputValueClass();
  47.       serializationFactory = new SerializationFactory(job);
  48.       keySerializer = serializationFactory.getSerializer(keyClass);
  49.       keySerializer.open(bb);
  50.       valSerializer = serializationFactory.getSerializer(valClass);
  51.       valSerializer.open(bb);
  52.       // counters
  53.       mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
  54.       mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
  55.       Counters.Counter combineInputCounter =
  56.         reporter.getCounter(COMBINE_INPUT_RECORDS);
  57.       combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
  58.       // 4)compression
  59.       if (job.getCompressMapOutput()) {
  60.         Class<? extends CompressionCodec> codecClass =
  61.           job.getMapOutputCompressorClass(DefaultCodec.class);
  62.         codec = ReflectionUtils.newInstance(codecClass, job);
  63.       }
  64.       // 5)combiner是一个NewCombinerRunner类型,调用Job的reducer来对map的输出在map端进行combine。
  65.       combinerRunner = CombinerRunner.create(job, getTaskID(),
  66.                                              combineInputCounter,
  67.                                              reporter, null);
  68.       if (combinerRunner != null) {
  69.         combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
  70.       } else {
  71.         combineCollector = null;
  72.       }
  73.       //6)启动一个SpillThread线程来
  74.       minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
  75.       spillThread.setDaemon(true);
  76.       spillThread.setName("SpillThread");
  77.       spillLock.lock();
  78.       try {
  79.         spillThread.start();
  80.         while (!spillThreadRunning) {
  81.           spillDone.await();
  82.         }
  83.       } catch (InterruptedException e) {
  84.         throw (IOException)new IOException("Spill thread failed to initialize"
  85.             ).initCause(sortSpillException);
  86.       } finally {
  87.         spillLock.unlock();
  88.       }
  89.       if (sortSpillException != null) {
  90.         throw (IOException)new IOException("Spill thread failed to initialize"
  91.             ).initCause(sortSpillException);
  92.       }
  93. }
复制代码




7.MapOutputBuffer的collect方法。

参数partition是partitioner根据key计算得到的当前key value属于的partition索引。写key和value写入缓存,当缓存满足spill条件时,通过调用startSpill方法设置变量并通过spillReady.signal(),通知spillThread;并等待spill结束(通过spillDone.await()等待)缓冲区的作用是批量收集map结果,减少磁盘IO的影响。key/value对以及Partition的结果都会被写入缓冲区。写入之前,key与value值都会被序列化成字节数组。kvindices保持了记录所属的分区,key在缓冲区开始的位置和value在缓冲区开始的位置,通过kvindices,可以在缓冲区中找到对应的记录。

输出缓冲区中,和kvstart,kvend和kvindex对应的是bufstart,bufend和bufmark。这部分还涉及到变量bufvoid,用于表明实际使用的缓冲区结尾和变量bufmark,用于标记记录的结尾。需要bufmark,是因为key或value的输出是变长的。



3.jpg

Key Value序列化后缓存


  1. public synchronized void collect(K key, V value, int partition
  2.                                      ) throws IOException {
  3.       reporter.progress();
  4.       if (key.getClass() != keyClass) {
  5.         throw new IOException("Type mismatch in key from map: expected "
  6.                               + keyClass.getName() + ", recieved "
  7.                               + key.getClass().getName());
  8.       }
  9.       if (value.getClass() != valClass) {
  10.         throw new IOException("Type mismatch in value from map: expected "
  11.                               + valClass.getName() + ", recieved "
  12.                               + value.getClass().getName());
  13.       }
  14. //对kvoffsets的长度取模,暗示我们这是一个环形缓存。
  15. final int kvnext = (kvindex + 1) % kvoffsets.length;
  16.      //进入临界区
  17. spillLock.lock();
  18.       try {
  19.         boolean kvfull;
  20.         do {
  21.           if (sortSpillException != null) {
  22.             throw (IOException)new IOException("Spill failed"
  23.                 ).initCause(sortSpillException);
  24.           }
  25.           // sufficient acct space
  26.           kvfull = kvnext == kvstart;
  27.           final boolean kvsoftlimit = ((kvnext > kvend)
  28.               ? kvnext - kvend > softRecordLimit
  29.               : kvend - kvnext <= kvoffsets.length - softRecordLimit);
  30.           if (kvstart == kvend && kvsoftlimit) {
  31.             LOG.info("Spilling map output: record full = " + kvsoftlimit);
  32. //其实是设置变量并通过spillReady.signal(),通知spillThread;并等待spill结束
  33.             startSpill();
  34.           }
  35.           if (kvfull) {
  36.             try {
  37.               while (kvstart != kvend) {
  38. //kvstart不等于kvend,表示系统正在spill,等待spillDone信号
  39.                 reporter.progress();
  40.                 spillDone.await();
  41.               }
  42.             } catch (InterruptedException e) {
  43.               throw (IOException)new IOException(
  44.                   "Collector interrupted while waiting for the writer"
  45.                   ).initCause(e);
  46.             }
  47.           }
  48.         } while (kvfull);
  49.       } finally {
  50.         spillLock.unlock();
  51.       }
  52.       try {
  53. //先对key串行化,然后对value做串行化,临时变量keystart,valstart和valend分别记录了key结果的开始位置,value结果的开始位置和value结果的结束位置。串行化过程中,往缓冲区写是最终调用了Buffer.write方法
  54.         // serialize key bytes into buffer
  55.         int keystart = bufindex;
  56.         keySerializer.serialize(key);
  57.         if (bufindex < keystart) {
  58.           //如果key串行化后出现bufindex < keystart,那么会调用BlockingBuffer的reset方法。原因是在spill的过程中需要对<key,value>排序,这种情况下,传递给RawComparator的必须是连续的二进制缓冲区,通过BlockingBuffer.reset方法   会把bufvoid设置为bufmark,缓冲区开始部分往后挪,然后将原来位于bufmark到bufvoid出的结果,拷到缓冲区开始处,这样的话,key串行化的结果就连续存放在缓冲区的最开始处。      
  59.         bb.reset();
  60.           keystart = 0;
  61.         }
  62.         // serialize value bytes into buffer
  63.         final int valstart = bufindex;
  64.         valSerializer.serialize(value);
  65.         int valend = bb.markRecord();
  66.         if (partition < 0 || partition >= partitions) {
  67.           throw new IOException("Illegal partition for " + key + " (" +
  68.               partition + ")");
  69.         }
  70.         mapOutputRecordCounter.increment(1);
  71.         mapOutputByteCounter.increment(valend >= keystart
  72.             ? valend - keystart
  73.             : (bufvoid - keystart) + valend);
  74.         // update accounting info
  75.         int ind = kvindex * ACCTSIZE;
  76.         kvoffsets[kvindex] = ind;
  77.         kvindices[ind + PARTITION] = partition;
  78.         kvindices[ind + KEYSTART] = keystart;
  79.         kvindices[ind + VALSTART] = valstart;
  80.         kvindex = kvnext;
  81.       } catch (MapBufferTooSmallException e) {
  82.         LOG.info("Record too large for in-memory buffer: " + e.getMessage());
  83. //如果value的串行化结果太大,不能一次放入缓冲区
  84.         spillSingleRecord(key, value, partition);
  85.         mapOutputRecordCounter.increment(1);
  86.         return;
  87.       }
  88. }
复制代码






8.MapOutputBuffer.BlockingBuffer的reset()方法.

如果key串行化后出现bufindex < keystart,那么会调用BlockingBuffer的reset方法。原因是在spill的过程中需要对排序,这种情况下,传递给RawComparator的必须是连续的二进制缓冲区,通过BlockingBuffer.reset方法当发现key的串行化结果出现不连续的情况时,会把bufvoid设置为bufmark,缓冲区开始部分往后挪,然后将原来位于bufmark到bufvoid出的结果,拷到缓冲区开始处,这样的话,key串行化的结果就连续存放在缓冲区的最开始处。

4.jpg

BlockingBuffer.reset方法
bufstart前面的缓冲区如果不能放下整个key串行化的结果,,处理的方式是将bufindex置0,然后调用BlockingBuffer内部的out的write方法直接输出,这实际调用了Buffer.write方法,会启动spill过程,最终会成功写入key串行化的结果。

  1. protected synchronized void reset() throws IOException {
  2.         int headbytelen = bufvoid - bufmark;
  3.         bufvoid = bufmark;
  4.         //当发现key的串行化结果出现不连续的情况时,会把bufvoid设置为bufmark,缓冲区开始部分往后挪,然后将原来位于bufmark到bufvoid出的结果,拷到缓冲区开始处,这样的话,key串行化的结果就连续存放在缓冲区的最开始处。
  5.         if (bufindex + headbytelen < bufstart) {
  6.           System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
  7.           System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
  8.           bufindex += headbytelen;
  9.         } else {
  10.          //bufstart前面的缓冲区如果不能够放下整个key串行化的结果,处理的方式是将bufindex置0,然后调用BlockingBuffer内部的out的write方法直接输出
  11.           byte[] keytmp = new byte[bufindex];
  12.           System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
  13.           bufindex = 0;
  14.           out.write(kvbuffer, bufmark, headbytelen);
  15.           out.write(keytmp);
  16.         }
  17.       }
  18.     }
复制代码





9. MapOutputBuffer.Buffer的write方法。


在key和value序列列化的时候,被调用写到缓存中。如果spill线程正在把缓存的数据写溢出文件,则阻塞。
  1. public synchronized void write(byte b[], int off, int len)
  2.           throws IOException {
  3.         boolean buffull = false;
  4.         boolean wrap = false;
  5.         spillLock.lock();
  6.         try {
  7.           do {//循环,直到有足够的空间可以写数据
  8.             if (sortSpillException != null) {
  9.               throw (IOException)new IOException("Spill failed"
  10.                   ).initCause(sortSpillException);
  11.             }
  12.             // sufficient buffer space?
  13.             if (bufstart <= bufend && bufend <= bufindex) {
  14.               buffull = bufindex + len > bufvoid;
  15.               wrap = (bufvoid - bufindex) + bufstart > len;
  16.             } else {
  17.               // bufindex <= bufstart <= bufend
  18.               // bufend <= bufindex <= bufstart
  19.               wrap = false;
  20.               buffull = bufindex + len > bufstart;
  21.             }
  22.             if (kvstart == kvend) {
  23.               // spill thread not running
  24.               if (kvend != kvindex) {
  25.                 //如果数组中有记录(kvend != kvindex),那么,根据需要(目前输出空间不足或记录数达到spill条件)启动spill过程
  26.                 final boolean bufsoftlimit = (bufindex > bufend)
  27.                   ? bufindex - bufend > softBufferLimit
  28.                   : bufend - bufindex < bufvoid - softBufferLimit;
  29.                 if (bufsoftlimit || (buffull && !wrap)) {
  30.                   LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
  31.                   startSpill();
  32.                 }
  33.               } else if (buffull && !wrap) {
  34.                // 如果空间不够(buffull && !wrap),但是缓存中没有记录,表明这个记录非常大,内存缓冲区不能容下这么大的数据量,抛MapBufferTooSmallException异常,直接写文件不用写缓存
  35.                 final int size = ((bufend <= bufindex)
  36.                   ? bufindex - bufend
  37.                   : (bufvoid - bufend) + bufindex) + len;
  38.                 bufstart = bufend = bufindex = bufmark = 0;
  39.                 kvstart = kvend = kvindex = 0;
  40.                 bufvoid = kvbuffer.length;
  41.                 throw new MapBufferTooSmallException(size + " bytes");
  42.               }
  43.             }
  44.             if (buffull && !wrap) {
  45.               try {
  46.               //如果空间不足但是spill在运行,等待spillDone
  47.                 while (kvstart != kvend) {
  48.                   reporter.progress();
  49.                   spillDone.await();
  50.                 }
  51.               } catch (InterruptedException e) {
  52.                   throw (IOException)new IOException(
  53.                       "Buffer interrupted while waiting for the writer"
  54.                       ).initCause(e);
  55.               }
  56.             }
  57.           } while (buffull && !wrap);
  58.         } finally {
  59.           spillLock.unlock();
  60.         }
  61.         //真正把数据写缓存的地方!如果buffull,则写数据会不连续,则写满剩余缓冲区,然后设置bufindex=0,并从bufindex处接着写。否则,就是从bufindex处开始写。
  62.         if (buffull) {
  63.           //缓存剩余长度
  64.           final int gaplen = bufvoid - bufindex;
  65.           //把剩余的写满
  66.           System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
  67.           //剩下长度
  68.           len -= gaplen;
  69.           //剩下偏移
  70.           off += gaplen;
  71.          //写指针移到开头
  72.           bufindex = 0;
  73.         }
  74.         从指定的开头写
  75.         System.arraycopy(b, off, kvbuffer, bufindex, len);
  76.         bufindex += len;
  77.       }
  78.     }
复制代码






5.jpg


buffull和wrap条件说明
如图,对bufful和wrap条件进行说明: 在上面两种情况下,即情况1和情况2,
  1. buffull = bufindex + len > bufvoid;
  2. wrap = (bufvoid - bufindex) + bufstart > len;
复制代码


buffull条件判断为从下次写指针的位置bufindex到缓存结束bufvoid的空间是否有足够的空间容纳写的内容,wrap是图中白颜色部分的空间(前后空白合在一起)是否比输入大,如果是,wrap为true; 情况3和情况4中,
  1. wrap = false;
  2. buffull = bufindex + len > bufstart;
复制代码


buffull判断bufindex到bufstart的空间是否满足条件,而wrap肯定是false。 条件(buffull && !wrap)满足时,目前的空间不够一次写。


下一篇:hadoop代码笔记 Mapreduce shuffle过程之Map输出过程((2)

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条