Reading guide
1. What does the sortAndSpill method do?
2. MapOutputBuffer implements the IndexedSortable interface. What is this interface for?
Continued from the previous article.
10. MapOutputBuffer's spillSingleRecord method
If the collect method fails to buffer a record, that record is written directly to a spill file. For such a single record, the combiner is not applied even if one is configured. When a record is so large that the in-memory buffer cannot hold it, a MapBufferTooSmallException is thrown and the record goes straight to disk, bypassing the buffer entirely.
    private void spillSingleRecord(final K key, final V value,
                                   int partition) throws IOException {
      long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
        // create the spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
            numSpills, size);
        out = rfs.create(filename);

        IndexRecord rec = new IndexRecord();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
                                           spilledRecordsCounter);

            if (i == partition) {
              final long recordStart = out.getPos();
              writer.append(key, value);
              mapOutputByteCounter.increment(out.getPos() - recordStart);
            }
            writer.close();

            // record the offsets in the index
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);

            writer = null;
          } catch (IOException e) {
            if (null != writer) writer.close();
            throw e;
          }
        }
        // If the index cache is full, write this spill's index to an index
        // file; otherwise add it to indexCacheList and update
        // totalIndexCacheMemory.
        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // create spill index file
          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
              getTaskID(), numSpills,
              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        ++numSpills;
      } finally {
        if (out != null) out.close();
      }
    }
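The index-cache branch at the end reappears verbatim in sortAndSpill below: each spill's index (one record of three longs per partition, hence the 24-byte MAP_OUTPUT_INDEX_RECORD_LENGTH) stays in memory until the cache budget runs out, after which whole spill indices go to disk. A stand-alone sketch of just that accounting; the 1 MB limit here is an assumed value, not necessarily the configured default:

    import java.util.ArrayList;
    import java.util.List;

    public class IndexCacheDemo {
        // one index record = startOffset + rawLength + partLength = 3 longs = 24 bytes
        static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
        static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024; // assumed budget

        private final List<long[]> indexCacheList = new ArrayList<>();
        private long totalIndexCacheMemory = 0;

        /** Returns true when the caller must write spill{n}.out.index to disk. */
        boolean cacheOrSpillIndex(long[] spillIndex) { // 3 longs per partition
            if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
                return true; // cache full: this spill's index file goes to disk
            }
            indexCacheList.add(spillIndex);
            totalIndexCacheMemory +=
                (spillIndex.length / 3) * MAP_OUTPUT_INDEX_RECORD_LENGTH;
            return false;    // cached: no index file for this spill
        }
    }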
11. MapOutputBuffer's startSpill method
It marks the end of the buffered data (kvend, bufend) and wakes up the thread waiting on spillReady.

    private synchronized void startSpill() {
      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
               "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
               "; length = " + kvoffsets.length);
      kvend = kvindex;
      bufend = bufmark;
      spillReady.signal();
    }
12. SpillThread's run method
This thread watches the in-memory output buffer and, once the trigger condition is met, spills the buffer's contents to disk. It is a textbook producer-consumer setup: MapTask's collect method is the producer, SpillThread is the consumer, and they synchronize through spillLock (a ReentrantLock) and two condition variables on it (spillDone and spillReady). When kvstart == kvend holds, there are no records to spill.
    public void run() {
      // critical section
      spillLock.lock();
      spillThreadRunning = true;
      try {
        while (true) {
          spillDone.signal();
          // when kvstart == kvend there are no records to spill
          while (kvstart == kvend) {
            spillReady.await();
          }
          try {
            spillLock.unlock();
            // do the actual work
            sortAndSpill();
          } catch (Exception e) {
            sortSpillException = e;
          } catch (Throwable t) {
            sortSpillException = t;
            String logMsg = "Task " + getTaskID() + " failed : "
                            + StringUtils.stringifyException(t);
            reportFatalError(getTaskID(), t, logMsg);
          } finally {
            spillLock.lock();
            if (bufend < bufindex && bufindex < bufstart) {
              bufvoid = kvbuffer.length;
            }
            kvstart = kvend;
            bufstart = bufend;
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        spillLock.unlock();
        spillThreadRunning = false;
      }
    }
  }
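The collect/spill handshake is plain java.util.concurrent machinery: one ReentrantLock and two Conditions on it. Below is a stripped-down, self-contained sketch of that pattern; the field names are borrowed from the source, but the class itself is illustrative only:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class SpillHandshake {
        private final ReentrantLock spillLock = new ReentrantLock();
        private final Condition spillDone  = spillLock.newCondition();
        private final Condition spillReady = spillLock.newCondition();
        // kvindex advances as collect() buffers records (omitted here)
        private int kvstart = 0, kvend = 0, kvindex = 0;

        /** Producer side (collect/startSpill): freeze the range, wake the spiller. */
        void startSpill() {
            spillLock.lock();
            try {
                kvend = kvindex;        // records in [kvstart, kvend) are spillable
                spillReady.signal();
            } finally {
                spillLock.unlock();
            }
        }

        /** Consumer side (SpillThread.run), reduced to one loop iteration. */
        void spillLoopBody() throws InterruptedException {
            spillLock.lock();
            try {
                spillDone.signal();          // tell waiters the last spill finished
                while (kvstart == kvend) {   // nothing to spill yet
                    spillReady.await();
                }
                spillLock.unlock();          // spill outside the lock
                try {
                    // sortAndSpill() would run here
                } finally {
                    spillLock.lock();
                    kvstart = kvend;         // the spilled range is free again
                }
            } finally {
                spillLock.unlock();
            }
        }
    }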
13. MapOutputBuffer's sortAndSpill method
SpillThread's run method calls sortAndSpill to write the buffered output to a spill file named like '/spill' + spillNumber + '.out'; the index built from kvindices is kept in spill{n}.out.index (or in the in-memory index cache), while the data lives in spill{n}.out. The method creates a SpillRecord, the output file, and an IndexRecord; it then sorts kvoffsets, so that walking kvoffsets sequentially afterwards visits the records in partition order. It loops over the partitions in that sorted order: without a combiner, each record is written out directly; with one, the combiner is invoked first and its output is written instead. At the end of each loop iteration the IndexRecord is stored into the SpillRecord.
    private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      long size = (bufend >= bufstart
                   ? bufend - bufstart
                   : (bufvoid - bufend) + bufstart) +
                  partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
        // create the spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
            numSpills, size);
        out = rfs.create(filename);

        final int endPosition = (kvend > kvstart)
          ? kvend
          : kvoffsets.length + kvend;
        // Sort in memory. As MapOutputBuffer's compare method shows, the
        // comparison works on the serialized bytes; as its swap method shows,
        // the sort rearranges kvoffsets rather than the data itself.
        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
        int spindex = kvstart;
        IndexRecord rec = new IndexRecord();
        InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
            if (combinerRunner == null) {
              // no combiner: write the key/value pairs directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < endPosition &&
                     kvindices[kvoffsets[spindex % kvoffsets.length]
                               + PARTITION] == i) {
                final int kvoff = kvoffsets[spindex % kvoffsets.length];
                getVBytesForOffset(kvoff, value);
                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                          (kvindices[kvoff + VALSTART] -
                           kvindices[kvoff + KEYSTART]));
                // write the pair to the spill file
                writer.append(key, value);
                ++spindex;
              }
            } else {
              int spstart = spindex;
              while (spindex < endPosition &&
                     kvindices[kvoffsets[spindex % kvoffsets.length]
                               + PARTITION] == i) {
                ++spindex;
              }
              // With a combiner, the combined result is written to the IFile
              // through the same writer, reducing the amount of data spilled
              // to disk.
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }

            // close the writer
            writer.close();

            // record offsets
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);

            writer = null;
          } finally {
            if (null != writer) writer.close();
          }
        }

        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // write the spill index file, named like "/spill" + spillNumber + ".out.index"
          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
              getTaskID(), numSpills,
              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
      } finally {
        if (out != null) out.close();
      }
    }
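Note that kvoffsets is used as a circular buffer: endPosition may exceed the physical array length, and every access goes through spindex % kvoffsets.length. A tiny runnable illustration of that wrap-around scan, with made-up data:

    public class CircularScan {
        public static void main(String[] args) {
            int[] kvoffsets = {10, 20, 30, 40, 50};
            int kvstart = 3;             // records start near the end of the array...
            int kvend = 2;               // ...and wrap around to the front
            // same trick as sortAndSpill: a logical end past the physical array end
            int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend;
            for (int spindex = kvstart; spindex < endPosition; ++spindex) {
                int kvoff = kvoffsets[spindex % kvoffsets.length]; // indices 3,4,0,1
                System.out.println("logical " + spindex + " -> offset " + kvoff);
            }
        }
    }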
14. MapOutputBuffer's compare and swap methods
MapOutputBuffer implements the IndexedSortable interface. As the name suggests, this sort does not move the data itself but the indices pointing to it. What actually gets ordered are the kvindices entries, by rearranging their positions recorded in kvoffsets. The figure illustrates the effect of the sort before the disk write. For each record, kvindices holds its (reduce) partition and the start positions of its key and value in the buffer, so the record can be located in the buffer through kvindices. kvoffsets is what gets sorted by partition when the buffer fills; the sorted result is then written to local disk, with the index (kvindices) kept in spill{n}.out.index and the data in spill{n}.out. As MapOutputBuffer's compare method shows, records are ordered first by partition, then by key.
Sorting kvindices through kvoffsets:

    public int compare(int i, int j) {
      final int ii = kvoffsets[i % kvoffsets.length];
      final int ij = kvoffsets[j % kvoffsets.length];
      // sort on partition first
      if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
      }
      // then sort on key
      return comparator.compare(kvbuffer,
          kvindices[ii + KEYSTART],
          kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
          kvbuffer,
          kvindices[ij + KEYSTART],
          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
    }

    public void swap(int i, int j) {
      i %= kvoffsets.length;
      j %= kvoffsets.length;
      // swapping entries of kvoffsets is what realizes the sort
      int tmp = kvoffsets[i];
      kvoffsets[i] = kvoffsets[j];
      kvoffsets[j] = tmp;
    }
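The same indirect-sort idea in miniature: order an index array by (partition, key) without ever moving the records. This is a plain-Java sketch of the concept, not the IndexedSorter Hadoop actually plugs in:

    import java.util.Arrays;

    public class IndirectSortDemo {
        public static void main(String[] args) {
            final int[] partition = {1, 0, 1, 0};
            final String[] key    = {"b", "z", "a", "x"};

            // sort indices, not data: the analogue of swapping kvoffsets entries
            Integer[] idx = {0, 1, 2, 3};
            Arrays.sort(idx, (i, j) -> partition[i] != partition[j]
                    ? partition[i] - partition[j]   // partition first
                    : key[i].compareTo(key[j]));    // then key

            // idx is now [3, 1, 2, 0]: partition 0 ("x","z"), then partition 1 ("a","b")
            System.out.println(Arrays.toString(idx));
        }
    }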
15. MapOutputBuffer's flush method
Once the Mapper's results have all been collected, the buffer needs some final cleanup: flush is called to merge the spill{n} files into the final output. It first waits for any in-flight spill to finish, then checks whether the buffer still holds data; if so, it calls sortAndSpill for one last spill, and finally shuts down the spill thread.
    public synchronized void flush() throws IOException, ClassNotFoundException,
                                            InterruptedException {
      LOG.info("Starting flush of map output");
      spillLock.lock();
      try {
        while (kvstart != kvend) {
          reporter.progress();
          spillDone.await();
        }
        if (sortSpillException != null) {
          throw (IOException)new IOException("Spill failed"
              ).initCause(sortSpillException);
        }
        if (kvend != kvindex) {
          kvend = kvindex;
          bufend = bufmark;
          sortAndSpill();
        }
      } catch (InterruptedException e) {
        throw (IOException)new IOException(
            "Buffer interrupted while waiting for the writer"
            ).initCause(e);
      } finally {
        spillLock.unlock();
      }
      assert !spillLock.isHeldByCurrentThread();

      try {
        spillThread.interrupt();
        spillThread.join();
      } catch (InterruptedException e) {
        throw (IOException)new IOException("Spill failed"
            ).initCause(e);
      }
      // release sort buffer before the merge
      kvbuffer = null;
      mergeParts();
    }
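The interrupt-plus-join sequence at the end of flush is a general worker-shutdown pattern: the interrupt breaks the spill thread out of spillReady.await(), and join waits for run() to really return. In isolation (a hypothetical helper, not the Hadoop class):

    import java.io.IOException;

    public class WorkerShutdown {
        /** The interrupt-then-join shutdown used by flush(), on its own. */
        static void stop(Thread worker) throws IOException {
            try {
                worker.interrupt();   // break the worker out of Condition.await()
                worker.join();        // wait until run() has actually finished
            } catch (InterruptedException e) {
                throw (IOException) new IOException("Spill failed").initCause(e);
            }
        }
    }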
16. MapTask.MapOutputBuffer's mergeParts method
Merging combines the multiple spill files into one file, reading each partition's data back from the individual spill files. Because the same key may appear in several spill files, a configured Combiner is applied again during the merge. MapReduce has each map produce exactly one output file, accompanied by an index file that records the offset of each reducer's data.
    private void mergeParts() throws IOException, InterruptedException,
                                     ClassNotFoundException {
      // get the approximate size of the final output/index files
      long finalOutFileSize = 0;
      long finalIndexFileSize = 0;
      final Path[] filename = new Path[numSpills];
      final TaskAttemptID mapId = getTaskID();

      for(int i = 0; i < numSpills; i++) {
        filename[i] = mapOutputFile.getSpillFile(mapId, i);
        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
      }
      if (numSpills == 1) { // a single spill file: just rename it as the final output
        rfs.rename(filename[0],
                   new Path(filename[0].getParent(), "file.out"));
        if (indexCacheList.size() == 0) {
          rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
                     new Path(filename[0].getParent(), "file.out.index"));
        } else {
          indexCacheList.get(0).writeToFile(
              new Path(filename[0].getParent(), "file.out.index"), job);
        }
        return;
      }

      // read in paged indices
      for (int i = indexCacheList.size(); i < numSpills; ++i) {
        Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
        indexCacheList.add(new SpillRecord(indexFileName, job));
      }

      //make correction in the length to include the sequence file header
      //lengths for each partition
      finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
      Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
          finalOutFileSize);
      Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
          mapId, finalIndexFileSize);

      //The output stream for the final single output file
      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

      if (numSpills == 0) {
        // no spill files at all: create a dummy file with empty segments
        IndexRecord rec = new IndexRecord();
        SpillRecord sr = new SpillRecord(partitions);
        try {
          for (int i = 0; i < partitions; i++) {
            long segmentStart = finalOut.getPos();
            Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
            writer.close();
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            sr.putIndex(rec, i);
          }
          sr.writeToFile(finalIndexFile, job);
        } finally {
          finalOut.close();
        }
        return;
      }
      {
        IndexRecord rec = new IndexRecord();
        final SpillRecord spillRec = new SpillRecord(partitions);
        for (int parts = 0; parts < partitions; parts++) {
          // for each partition, build one segment per spill file, then merge them
          List<Segment<K,V>> segmentList =
            new ArrayList<Segment<K, V>>(numSpills);
          for(int i = 0; i < numSpills; i++) {
            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

            Segment<K,V> s =
              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
                               indexRecord.partLength, codec, true);
            segmentList.add(i, s);

            if (LOG.isDebugEnabled()) {
              LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                        "Spill =" + i + "(" + indexRecord.startOffset + "," +
                        indexRecord.rawLength + ", " + indexRecord.partLength + ")");
            }
          }

          //merge
          @SuppressWarnings("unchecked")
          RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                         keyClass, valClass, codec,
                         segmentList, job.getInt("io.sort.factor", 100),
                         new Path(mapId.toString()),
                         job.getOutputKeyComparator(), reporter,
                         null, spilledRecordsCounter);

          //write merged output to disk
          // run the merge and append this partition's result to file.out
          long segmentStart = finalOut.getPos();
          Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                               spilledRecordsCounter);
          if (combinerRunner == null || numSpills < minSpillsForCombine) {
            Merger.writeFile(kvIter, writer, reporter, job);
          } else {
            combineCollector.setWriter(writer);
            combinerRunner.combine(kvIter, combineCollector);
          }

          //close
          writer.close();

          // record offsets
          // this per-partition index is written to the final file.out.index below
          rec.startOffset = segmentStart;
          rec.rawLength = writer.getRawLength();
          rec.partLength = writer.getCompressedLength();
          spillRec.putIndex(rec, parts);
        }
        spillRec.writeToFile(finalIndexFile, job);
        finalOut.close();
        for(int i = 0; i < numSpills; i++) {
          rfs.delete(filename[i], true);
        }
      }
    }
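Conceptually, Merger.merge performs a k-way merge over the per-spill segments of one partition. A minimal stand-alone sketch using a PriorityQueue; the real Merger additionally handles on-disk segments, compression codecs, and the io.sort.factor merge width:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMergeDemo {
        public static void main(String[] args) {
            // one sorted "segment" per spill file, all for the same partition
            List<int[]> segments = Arrays.asList(
                new int[]{1, 4, 9}, new int[]{2, 3, 8}, new int[]{5, 6, 7});

            // heap entries: {value, segment index, position inside the segment}
            PriorityQueue<int[]> heap =
                new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
            for (int s = 0; s < segments.size(); s++) {
                heap.add(new int[]{segments.get(s)[0], s, 0});
            }

            List<Integer> merged = new ArrayList<>();
            while (!heap.isEmpty()) {
                int[] top = heap.poll();
                merged.add(top[0]);
                int[] seg = segments.get(top[1]);
                if (top[2] + 1 < seg.length) {   // advance within that segment
                    heap.add(new int[]{seg[top[2] + 1], top[1], top[2] + 1});
                }
            }
            System.out.println(merged);          // [1, 2, 3, ..., 9]
        }
    }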
Structure of the index and spill files before and after the merge (figure)
The merge ultimately produces one file.out and one file.out.index. As the preceding analysis shows, all partitions live in that single output file, ordered by partition: the mapper output is segmented by partition. One partition corresponds to one reducer, so each reducer only has to fetch its own segment.
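This is exactly what the index file buys: to serve reducer r, the map side only needs that reducer's (startOffset, partLength) entry and a seek into file.out. A hedged sketch of such a lookup; readPartition is a hypothetical helper, not the actual shuffle server:

    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class SegmentReader {
        /**
         * Read one reducer's segment out of a merged map output file, given
         * that reducer's index record (startOffset, partLength).
         */
        static byte[] readPartition(String fileOut, long startOffset, long partLength)
                throws IOException {
            try (RandomAccessFile raf = new RandomAccessFile(fileOut, "r")) {
                raf.seek(startOffset);              // jump straight to the segment
                byte[] buf = new byte[(int) partLength];
                raf.readFully(buf);
                return buf;
            }
        }
    }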