分享

如何保留spill的结果

gwgyk 发表于 2014-11-27 23:07:22 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 9731
Map Task每次spill(将缓冲区中的内容写入到磁盘)过程中都会生成一个spill*.out文件(*代表次序),
但是Map Task最后会把这些spill文件都合并了。
我想把这些文件留下进行分析,那我修改源码的时候该修改哪些部分呢?
另外spill合并后的文件file.out也想保留下,该修改哪些地方呢?

已有(6)人评论

跳转到指定楼层
jixianqiuxue 发表于 2014-11-27 23:43:39


目前有两个思路

1.跟踪map看是否实时产生日志
2.JobHistory记录job的执行日志
所以整体定位应该是在这两个区域,楼主可以从这两面下工夫。
JobHistory.JobInfo.logStarted()记录job的执行日志
回复

使用道具 举报

gwgyk 发表于 2014-11-28 09:45:09
jixianqiuxue 发表于 2014-11-27 23:43
目前有两个思路

1.跟踪map看是否实时产生日志

你说的这些是不是和日志相关的啊?
我不是想要日志,是想把运行结果:spill.out和file.out这些文件保留下
回复

使用道具 举报

bioger_hit 发表于 2014-11-28 12:55:23
gwgyk 发表于 2014-11-28 09:45
你说的这些是不是和日志相关的啊?
我不是想要日志,是想把运行结果:spill.out和file.out这些文件保留 ...
楼主既然知道了这些文件是怎么产生的,那么这些文件的代码,相信应该难不倒楼主
回复

使用道具 举报

desehawk 发表于 2014-11-28 13:43:53
楼主,是通过代码分析得出的,还是通过什么方式。
拿出来,共同讨论下。

回复

使用道具 举报

hyj 发表于 2014-11-28 19:47:34

修改下面函数试试:

MapOutputBuffer的sortAndSpill() 方法 SpillThread线程的run方法中调用sortAndSpill把缓存中的输出写到格式为+ '/spill' + spillNumber + '.out'的spill文件中。索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中 创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。

  1. private void sortAndSpill() throws IOException, ClassNotFoundException,
  2. InterruptedException {
  3. //approximate the length of the output file to be the length of the
  4. //buffer + header lengths for the partitions
  5. long size = (bufend >= bufstart
  6. ? bufend - bufstart
  7. : (bufvoid - bufend) + bufstart) +
  8. partitions * APPROX_HEADER_LENGTH;
  9. FSDataOutputStream out = null;
  10. try {
  11. // 创建溢出文件
  12. final SpillRecord spillRec = new SpillRecord(partitions);
  13. final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
  14. numSpills, size);
  15. out = rfs.create(filename);
  16. final int endPosition = (kvend > kvstart)
  17. ? kvend
  18. : kvoffsets.length + kvend;
  19. //使用sorter进行排序, 在内存中进行,参照MapOutputBuffer的compare方法实现的这里的排序也是对序列化的字节做的排序。排序是在kvoffsets上面进行,参照MapOutputBuffer的swap方法实现。
  20. sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  21. int spindex = kvstart;
  22. IndexRecord rec = new IndexRecord();
  23. InMemValBytes value = new InMemValBytes();
  24. for (int i = 0; i < partitions; ++i) {
  25. IFile.Writer<k, v=""> writer = null;
  26. try {
  27. long segmentStart = out.getPos();
  28. writer = new Writer<k, v="">(job, out, keyClass, valClass, codec,
  29. spilledRecordsCounter);
  30. if (combinerRunner == null) {
  31. // 如果没有combinner则直接写键值
  32. DataInputBuffer key = new DataInputBuffer();
  33. while (spindex < endPosition &&
  34. kvindices[kvoffsets[spindex % kvoffsets.length]
  35. + PARTITION] == i) {
  36. final int kvoff = kvoffsets[spindex % kvoffsets.length];
  37. getVBytesForOffset(kvoff, value);
  38. key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
  39. (kvindices[kvoff + VALSTART] -
  40. kvindices[kvoff + KEYSTART]));
  41. //键值写到溢出文件
  42. writer.append(key, value);
  43. ++spindex;
  44. }
  45. } else {
  46. int spstart = spindex;
  47. while (spindex < endPosition &&
  48. kvindices[kvoffsets[spindex % kvoffsets.length]
  49. + PARTITION] == i) {
  50. ++spindex;
  51. }
  52. //如果设置了combiner,则调用了combine方法后的结果写到IFile中,writer还是先前的writer。减少溢写到磁盘的数据量。
  53. if (spstart != spindex) {
  54. combineCollector.setWriter(writer);
  55. RawKeyValueIterator kvIter =
  56. new MRResultIterator(spstart, spindex);
  57. combinerRunner.combine(kvIter, combineCollector);
  58. }
  59. }
  60. // close the writer
  61. writer.close();
  62. // record offsets
  63. rec.startOffset = segmentStart;
  64. rec.rawLength = writer.getRawLength();
  65. rec.partLength = writer.getCompressedLength();
  66. spillRec.putIndex(rec, i);
  67. writer = null;
  68. } finally {
  69. if (null != writer) writer.close();
  70. }
  71. }
  72. if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
  73. // 写溢出索引文件,格式如+ '/spill' + spillNumber +  '.out.index'
  74. Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
  75. getTaskID(), numSpills,
  76. partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
  77. spillRec.writeToFile(indexFilename, job);
  78. } else {
  79. indexCacheList.add(spillRec);
  80. totalIndexCacheMemory +=
  81. spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  82. }
  83. LOG.info('Finished spill ' + numSpills);
  84. ++numSpills;
  85. } finally {
  86. if (out != null) out.close();
  87. }
  88. }</k,></k,>
复制代码


回复

使用道具 举报

gwgyk 发表于 2014-11-28 20:32:03
hyj 发表于 2014-11-28 19:47
修改下面函数试试:

MapOutputBuffer的sortAndSpill() 方法 SpillThread线程的run方法中调用sortAndSpi ...

谢谢啦,我也找到了,注释掉这么几个地方就行:
TaskTracker.java
  3184       
  2668

Task.java
  1036

JvmManager.java
  185

MapTask.java
  1729

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条