分享

hadoop代码笔记 Mapreduce shuffle过程之Map输出过程((2)

hyj 发表于 2014-11-28 21:16:15 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 16705

问题导读


1.sortAndSpill方法的作用是什么?
2.MapOutputBuffer实现了IndexedSortable接口,这个接口的作用是什么?







接上文

10.MapOutputBuffer 的spillSingleRecord方法。

如果在collect方法中处理缓存失败,则直接把这条记录些到spill文件中。对应单条记录即使设置了combiner也不用。如果记录非常大,内存缓冲区不能容下这么大的数据量,抛MapBufferTooSmallException异常,直接写文件不用写缓存。

  1. private void spillSingleRecord(final K key, final V value,
  2.                                    int partition) throws IOException {
  3.       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  4.       FSDataOutputStream out = null;
  5.       try {
  6.         // 创建spill文件
  7.         final SpillRecord spillRec = new SpillRecord(partitions);
  8.         final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
  9.             numSpills, size);
  10.         out = rfs.create(filename);
  11.         IndexRecord rec = new IndexRecord();
  12.         for (int i = 0; i < partitions; ++i) {
  13.           IFile.Writer<K, V> writer = null;
  14.           try {
  15.             long segmentStart = out.getPos();
  16.              writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
  17.                                             spilledRecordsCounter);
  18.             if (i == partition) {
  19.               final long recordStart = out.getPos();
  20.               writer.append(key, value);
  21.               mapOutputByteCounter.increment(out.getPos() - recordStart);
  22.             }
  23.             writer.close();
  24.             // 把偏移记录在index中
  25.             rec.startOffset = segmentStart;
  26.             rec.rawLength = writer.getRawLength();
  27.             rec.partLength = writer.getCompressedLength();
  28.             spillRec.putIndex(rec, i);
  29.             writer = null;
  30.           } catch (IOException e) {
  31.             if (null != writer) writer.close();
  32.             throw e;
  33.           }
  34.         }
  35.          //如果index满了,则把index也写到index文件中。没满则把该条index记录加入到indexCacheList中,并更新totalIndexCacheMemory。
  36.         if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
  37.           // create spill index file
  38.           Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
  39.               getTaskID(), numSpills,
  40.               partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
  41.           spillRec.writeToFile(indexFilename, job);
  42.         } else {
  43.           indexCacheList.add(spillRec);
  44.           totalIndexCacheMemory +=
  45.             spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  46.         }
  47.         ++numSpills;
  48.       } finally {
  49.         if (out != null) out.close();
  50.       }
  51.     }
复制代码






11.MapOutputBuffer的startSpill。唤醒等待spillReady的线程。
  1. private synchronized void startSpill() {
  2.       LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
  3.                "; bufvoid = " + bufvoid);
  4.       LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
  5.                "; length = " + kvoffsets.length);
  6.       kvend = kvindex;
  7.       bufend = bufmark;
  8.       spillReady.signal();
  9.     }
复制代码




12.SpillThread的run方法。
该Thread会检查内存中的输出缓存区,在满足一定条件的时候将缓冲区中的内容spill到硬盘上。这是一个标准的生产者-消费者模型,MapTask的collect方法是生产者,spillThread是消费者,它们之间同步是通过spillLock(ReentrantLock)和spillLock上的两个条件变量(spillDone和spillReady)完成的。当kvstart == kvend条件成立时,表示没有要spill的记录。

  1. public void run() {
  2.       //临界区
  3.         spillLock.lock();
  4.         spillThreadRunning = true;
  5.         try {
  6.           while (true) {
  7.             spillDone.signal();
  8. 当kvstart == kvend条件成立时,表示没有要spill的记录
  9.             while (kvstart == kvend) {
  10.               spillReady.await();
  11.             }
  12.             try {
  13.               spillLock.unlock();
  14.               //执行操作
  15.               sortAndSpill();
  16.             } catch (Exception e) {
  17.               sortSpillException = e;
  18.             } catch (Throwable t) {
  19.               sortSpillException = t;
  20.               String logMsg = "Task " + getTaskID() + " failed : "
  21.                               + StringUtils.stringifyException(t);
  22.               reportFatalError(getTaskID(), t, logMsg);
  23.             } finally {
  24.               spillLock.lock();
  25.               if (bufend < bufindex && bufindex < bufstart) {
  26.                 bufvoid = kvbuffer.length;
  27.               }
  28.               kvstart = kvend;
  29.               bufstart = bufend;
  30.             }
  31.           }
  32.         } catch (InterruptedException e) {
  33.           Thread.currentThread().interrupt();
  34.         } finally {
  35.           spillLock.unlock();
  36.           spillThreadRunning = false;
  37.         }
  38.       }
  39.     }
复制代码





13..MapOutputBuffer的sortAndSpill() 方法

SpillThread线程的run方法中调用sortAndSpill把缓存中的输出写到格式为+ '/spill' + spillNumber + '.out'的spill文件中。索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中 创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。

  1. private void sortAndSpill() throws IOException, ClassNotFoundException,
  2.                                        InterruptedException {
  3.       //approximate the length of the output file to be the length of the
  4.       //buffer + header lengths for the partitions
  5.       long size = (bufend >= bufstart
  6.           ? bufend - bufstart
  7.           : (bufvoid - bufend) + bufstart) +
  8.                   partitions * APPROX_HEADER_LENGTH;
  9.       FSDataOutputStream out = null;
  10.       try {
  11.         // 创建溢出文件
  12.         final SpillRecord spillRec = new SpillRecord(partitions);
  13.         final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
  14.             numSpills, size);
  15.         out = rfs.create(filename);
  16.         final int endPosition = (kvend > kvstart)
  17.           ? kvend
  18.           : kvoffsets.length + kvend;
  19. //使用sorter进行排序, 在内存中进行,参照MapOutputBuffer的compare方法实现的这里的排序也是对序列化的字节做的排序。排序是在kvoffsets上面进行,参照MapOutputBuffer的swap方法实现。
  20.         sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  21.         int spindex = kvstart;
  22.         IndexRecord rec = new IndexRecord();
  23.         InMemValBytes value = new InMemValBytes();
  24.         for (int i = 0; i < partitions; ++i) {
  25.           IFile.Writer<K, V> writer = null;
  26.           try {
  27.             long segmentStart = out.getPos();
  28.             writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
  29.                                       spilledRecordsCounter);
  30.             if (combinerRunner == null) {
  31.               // 如果没有combinner则直接写键值
  32.               DataInputBuffer key = new DataInputBuffer();
  33.               while (spindex < endPosition &&
  34.                   kvindices[kvoffsets[spindex % kvoffsets.length]
  35.                             + PARTITION] == i) {
  36.                 final int kvoff = kvoffsets[spindex % kvoffsets.length];
  37.                 getVBytesForOffset(kvoff, value);
  38.                 key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
  39.                           (kvindices[kvoff + VALSTART] -
  40.                            kvindices[kvoff + KEYSTART]));
  41.                 //键值写到溢出文件
  42.                 writer.append(key, value);
  43.                 ++spindex;
  44.               }
  45.             } else {
  46.               int spstart = spindex;
  47.               while (spindex < endPosition &&
  48.                   kvindices[kvoffsets[spindex % kvoffsets.length]
  49.                             + PARTITION] == i) {
  50.                 ++spindex;
  51.               }
  52.               //如果设置了combiner,则调用了combine方法后的结果写到IFile中,writer还是先前的writer。减少溢写到磁盘的数据量。
  53.               if (spstart != spindex) {
  54.                 combineCollector.setWriter(writer);
  55.                 RawKeyValueIterator kvIter =
  56.                   new MRResultIterator(spstart, spindex);
  57.                 combinerRunner.combine(kvIter, combineCollector);
  58.               }
  59.             }
  60.             // close the writer
  61.             writer.close();
  62.             // record offsets
  63.             rec.startOffset = segmentStart;
  64.             rec.rawLength = writer.getRawLength();
  65.             rec.partLength = writer.getCompressedLength();
  66.             spillRec.putIndex(rec, i);
  67.             writer = null;
  68.           } finally {
  69.             if (null != writer) writer.close();
  70.           }
  71.         }
  72.         if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
  73.           // 写溢出索引文件,格式如+ "/spill" + spillNumber +  ".out.index"
  74.           Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
  75.               getTaskID(), numSpills,
  76.               partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
  77.           spillRec.writeToFile(indexFilename, job);
  78.         } else {
  79.           indexCacheList.add(spillRec);
  80.           totalIndexCacheMemory +=
  81.             spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  82.         }
  83.         LOG.info("Finished spill " + numSpills);
  84.         ++numSpills;
  85.       } finally {
  86.         if (out != null) out.close();
  87.       }
  88.     }
复制代码





14 MapOutputBuffer的compare方法和swap方法

MapOutputBuffer实现了IndexedSortable接口,从接口命名上就可以猜想到,这个排序不是移动数据,而是移动数据的索引。在这里要排序的其实是kvindices对象,通过移动其记录在kvoffets上的索引来实现。 如图,表示了写磁盘前Sort的效果。kvindices保持了记录所属的(Reduce)分区,key在缓冲区开始的位置和value在缓冲区开始的位置,通过kvindices,我们可以在缓冲区中找到对应的记录。kvoffets用于在缓冲区满的时候对kvindices的partition进行排序,排完序的结果将输出到输出到本地磁盘上,其中索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中。通过观察MapOutputBuffer的compare知道,先是在partition上排序,然后是在key上排序。

1.jpg

kvindices在kvoffets上排序
  1. public int compare(int i, int j) {
  2.       final int ii = kvoffsets[i % kvoffsets.length];
  3.       final int ij = kvoffsets[j % kvoffsets.length];
  4.       // 先在partition上排序
  5.       if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
  6.         return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
  7.       }
  8.       // 然后在可以上排序
  9.       return comparator.compare(kvbuffer,
  10.           kvindices[ii + KEYSTART],
  11.           kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
  12.           kvbuffer,
  13.           kvindices[ij + KEYSTART],
  14.           kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
  15.     }
  16.      public void swap(int i, int j) {
  17.       i %= kvoffsets.length;
  18.       j %= kvoffsets.length;
  19.       //通过交互在kvoffsets上的索引达到排序效果
  20.       int tmp = kvoffsets[i];
  21.       kvoffsets[i] = kvoffsets[j];
  22.       kvoffsets[j] = tmp;
  23.     }
复制代码

15. MapOutputBuffer的flush() 方法
Mapper的结果都已经collect了,需要对缓冲区做一些最后的清理,调用flush方法,合并spill{n}文件产生最后的输出。先等待可能的spill过程完成,然后判断缓冲区是否为空,如果不是,则调用sortAndSpill,做最后的spill,然后结束spill线程.

  1. public synchronized void flush() throws IOException, ClassNotFoundException,
  2.                                             InterruptedException {
  3.       LOG.info("Starting flush of map output");
  4.       spillLock.lock();
  5.       try {
  6.         while (kvstart != kvend) {
  7.           reporter.progress();
  8.           spillDone.await();
  9.         }
  10.         if (sortSpillException != null) {
  11.           throw (IOException)new IOException("Spill failed"
  12.               ).initCause(sortSpillException);
  13.         }
  14.         if (kvend != kvindex) {
  15.           kvend = kvindex;
  16.           bufend = bufmark;
  17.           sortAndSpill();
  18.         }
  19.       } catch (InterruptedException e) {
  20.         throw (IOException)new IOException(
  21.             "Buffer interrupted while waiting for the writer"
  22.             ).initCause(e);
  23.       } finally {
  24.         spillLock.unlock();
  25.       }
  26.       assert !spillLock.isHeldByCurrentThread();
  27.       try {
  28.         spillThread.interrupt();
  29.         spillThread.join();
  30.       } catch (InterruptedException e) {
  31.         throw (IOException)new IOException("Spill failed"
  32.             ).initCause(e);
  33.       }
  34.       // release sort buffer before the merge
  35.       kvbuffer = null;
  36.       mergeParts();
  37.     }
复制代码





16.MapTask.MapOutputBuffer的mergeParts()方法.
从不同溢写文件中读取出来的,然后再把这些值加起来。因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果配置设置过Combiner,也会使用Combiner来合并相同的key。?mapreduce让每个map只输出一个文件,并且为这个文件提供一个索引文件,以记录每个reduce对应数据的偏移量。

  1. private void mergeParts() throws IOException, InterruptedException,
  2.                                      ClassNotFoundException {
  3.       // get the approximate size of the final output/index files
  4.       long finalOutFileSize = 0;
  5.       long finalIndexFileSize = 0;
  6.       final Path[] filename = new Path[numSpills];
  7.       final TaskAttemptID mapId = getTaskID();
  8.       for(int i = 0; i < numSpills; i++) {
  9.         filename[i] = mapOutputFile.getSpillFile(mapId, i);
  10.         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
  11.       }
  12.       if (numSpills == 1) { //如果只有一个spill文件,则重命名为输出的最终文件
  13.         rfs.rename(filename[0],
  14.             new Path(filename[0].getParent(), "file.out"));
  15.         if (indexCacheList.size() == 0) {
  16.           rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
  17.               new Path(filename[0].getParent(),"file.out.index"));
  18.         } else {
  19.           indexCacheList.get(0).writeToFile(
  20.                 new Path(filename[0].getParent(),"file.out.index"), job);
  21.         }
  22.         return;
  23.       }
  24.       // read in paged indices
  25.       for (int i = indexCacheList.size(); i < numSpills; ++i) {
  26.         Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
  27.         indexCacheList.add(new SpillRecord(indexFileName, job));
  28.       }
  29.       //make correction in the length to include the sequence file header
  30.       //lengths for each partition
  31.       finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
  32.       finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  33.       Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
  34.                              finalOutFileSize);
  35.       Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
  36.                             mapId, finalIndexFileSize);
  37.       //The output stream for the final single output file
  38.       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
  39.       if (numSpills == 0) {
  40.         //如果没有spill文件,则创建一个 dummy files
  41.         IndexRecord rec = new IndexRecord();
  42.         SpillRecord sr = new SpillRecord(partitions);
  43.         try {
  44.           for (int i = 0; i < partitions; i++) {
  45.             long segmentStart = finalOut.getPos();
  46.             Writer<K, V> writer =
  47.               new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
  48.             writer.close();
  49.             rec.startOffset = segmentStart;
  50.             rec.rawLength = writer.getRawLength();
  51.             rec.partLength = writer.getCompressedLength();
  52.             sr.putIndex(rec, i);
  53.           }
  54.           sr.writeToFile(finalIndexFile, job);
  55.         } finally {
  56.           finalOut.close();
  57.         }
  58.         return;
  59.       }
  60.       {
  61.         IndexRecord rec = new IndexRecord();
  62.         final SpillRecord spillRec = new SpillRecord(partitions);
  63.         for (int parts = 0; parts < partitions; parts++) {
  64.           //在循环内对每个分区分别创建segment然后做merge
  65.           List<Segment<K,V>> segmentList =
  66.             new ArrayList<Segment<K, V>>(numSpills);
  67.           for(int i = 0; i < numSpills; i++) {
  68.             IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
  69.             Segment<K,V> s =
  70.               new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
  71.                                indexRecord.partLength, codec, true);
  72.             segmentList.add(i, s);
  73.             if (LOG.isDebugEnabled()) {
  74.               LOG.debug("MapId=" + mapId + " Reducer=" + parts +
  75.                   "Spill =" + i + "(" + indexRecord.startOffset + "," +
  76.                   indexRecord.rawLength + ", " + indexRecord.partLength + ")");
  77.             }
  78.           }
  79.           //merge
  80.           @SuppressWarnings("unchecked")
  81.           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
  82.                          keyClass, valClass, codec,
  83.                          segmentList, job.getInt("io.sort.factor", 100),
  84.                          new Path(mapId.toString()),
  85.                          job.getOutputKeyComparator(), reporter,
  86.                          null, spilledRecordsCounter);
  87.           //write merged output to disk
  88.          //执行merge,并且把merge结果写到"/file.out"的最终输出中去。
  89.           long segmentStart = finalOut.getPos();
  90.           Writer<K, V> writer =
  91.               new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
  92.                                spilledRecordsCounter);
  93.           if (combinerRunner == null || numSpills < minSpillsForCombine) {
  94.             Merger.writeFile(kvIter, writer, reporter, job);
  95.           } else {
  96.             combineCollector.setWriter(writer);
  97.             combinerRunner.combine(kvIter, combineCollector);
  98.           }
  99.           //close
  100.           writer.close();
  101.           // record offsets
  102.           //把index写到最终的"/file.out.index"文件中。
  103.           rec.startOffset = segmentStart;
  104.           rec.rawLength = writer.getRawLength();
  105.           rec.partLength = writer.getCompressedLength();
  106.           spillRec.putIndex(rec, parts);
  107.         }
  108.         spillRec.writeToFile(finalIndexFile, job);
  109.         finalOut.close();
  110.         for(int i = 0; i < numSpills; i++) {
  111.           rfs.delete(filename[i],true);
  112.         }
  113.       }
  114.     }
复制代码





合并前后index文件和spill文件的结构图

2.jpg



merge最终生成一个spill.out和spill.out.index文件
从前面的分析指导,多个partition的都在一个输出文件中,但是按照partition排序的。即把maper输出按照partition分段了。一个partition对应一个reducer,因此一个reducer只要获取一段即可。



已有(1)人评论

跳转到指定楼层
chinaboy2005 发表于 2014-11-28 22:01:19
好东西,收藏中
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条