修改下面函数试试:
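MapOutputBuffer 的 sortAndSpill() 方法:SpillThread 线程的 run 方法中调用 sortAndSpill,把缓存中的输出写到文件名格式为 '/spill' + spillNumber + '.out' 的 spill 文件中。数据保存在 spill{spill号}.out 中;各分区的索引信息(IndexRecord,记录该分区数据段的起始偏移 startOffset、原始长度 rawLength 和压缩后长度 partLength)汇总在 SpillRecord 中——当 totalIndexCacheMemory 达到 INDEX_CACHE_MEMORY_LIMIT 时写入索引文件 spill{spill号}.out.index,否则缓存在内存的 indexCacheList 中(注意:kvindices 是内存中记录每条记录分区号及键/值起始位置的元数据数组,并不会写入索引文件)。具体流程:先创建 SpillRecord、输出文件和 IndexRecord;然后在 kvoffsets 上做排序,排完序后顺序访问 kvoffsets,也就是按 partition 顺序访问记录。按 partition 循环处理排完序的数组:如果没有 combiner,则直接输出记录;否则调用 combinerRunner.combine,先做 combine 然后输出。每次循环的最后把该分区的 IndexRecord 记录到 SpillRecord 中。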
- private void sortAndSpill() throws IOException, ClassNotFoundException,
- InterruptedException {
- //approximate the length of the output file to be the length of the
- //buffer + header lengths for the partitions
- long size = (bufend >= bufstart
- ? bufend - bufstart
- : (bufvoid - bufend) + bufstart) +
- partitions * APPROX_HEADER_LENGTH;
- FSDataOutputStream out = null;
- try {
- // 创建溢出文件
- final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
- out = rfs.create(filename);
-
- final int endPosition = (kvend > kvstart)
- ? kvend
- : kvoffsets.length + kvend;
- //使用sorter进行排序, 在内存中进行,参照MapOutputBuffer的compare方法实现的这里的排序也是对序列化的字节做的排序。排序是在kvoffsets上面进行,参照MapOutputBuffer的swap方法实现。
- sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
- int spindex = kvstart;
- IndexRecord rec = new IndexRecord();
- InMemValBytes value = new InMemValBytes();
- for (int i = 0; i < partitions; ++i) {
- IFile.Writer<k, v=""> writer = null;
- try {
- long segmentStart = out.getPos();
- writer = new Writer<k, v="">(job, out, keyClass, valClass, codec,
- spilledRecordsCounter);
- if (combinerRunner == null) {
- // 如果没有combinner则直接写键值
- DataInputBuffer key = new DataInputBuffer();
- while (spindex < endPosition &&
- kvindices[kvoffsets[spindex % kvoffsets.length]
- + PARTITION] == i) {
- final int kvoff = kvoffsets[spindex % kvoffsets.length];
- getVBytesForOffset(kvoff, value);
- key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
- (kvindices[kvoff + VALSTART] -
- kvindices[kvoff + KEYSTART]));
- //键值写到溢出文件
- writer.append(key, value);
- ++spindex;
- }
- } else {
- int spstart = spindex;
- while (spindex < endPosition &&
- kvindices[kvoffsets[spindex % kvoffsets.length]
- + PARTITION] == i) {
- ++spindex;
- }
- //如果设置了combiner,则调用了combine方法后的结果写到IFile中,writer还是先前的writer。减少溢写到磁盘的数据量。
- if (spstart != spindex) {
- combineCollector.setWriter(writer);
- RawKeyValueIterator kvIter =
- new MRResultIterator(spstart, spindex);
- combinerRunner.combine(kvIter, combineCollector);
- }
- }
-
- // close the writer
- writer.close();
-
- // record offsets
- rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
- spillRec.putIndex(rec, i);
-
- writer = null;
- } finally {
- if (null != writer) writer.close();
- }
- }
-
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
- // 写溢出索引文件,格式如+ '/spill' + spillNumber + '.out.index'
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
- spillRec.writeToFile(indexFilename, job);
- } else {
- indexCacheList.add(spillRec);
- totalIndexCacheMemory +=
- spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- }
- LOG.info('Finished spill ' + numSpills);
- ++numSpills;
- } finally {
- if (out != null) out.close();
- }
- }</k,></k,>
复制代码
|