分享

Hadoop-2.4.1如何确定Mapper数量--源码分析

pig2 2014-11-12 14:55:11 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 22590
问题导读
1.在输入格式为默认的TextInputFormat的情况,如何确定mapper的数量?
2.如何确定mapper的数量有哪些影响因素?







MapReduce框架的优势是可以在集群中并行运行mapper和reducer任务,那如何确定mapper和reducer的数量呢,或者说如何以编程的方式控制作业启动的mapper和reducer数量呢?hadoop中建议

reducer的数量为(0.95~1.75 ) * 节点数量 * 每个节点上最大的容器数,
并可使用方法Job.setNumReduceTasks(int),mapper的数量由输入文件的大小确定,且没有相应的setNumMapTasks方法,但可以通过Configuration.set(JobContext.NUM_MAPS, int)设置,其中JobContext.NUM_MAPS的值为mapreduce.job.maps,而在Hadoop的官方网站上对该参数的描述为与MapReduce框架和作业配置巧妙地交互,并且设置起来更加复杂。从这样一句含糊不清的话无法得知究竟如何确定mapper的数量,显然只能求助于源代码了。
      在Hadoop中MapReduce作业通过JobSubmitter类的submitJobInternal(Jobjob, Cluster cluster)方法向系统提交作业(该方法不仅设置mapper数量,还执行了一些其它操作如检查输出格式等,感兴趣的可以参考源代码),在该方法中与设置mapper有关的代码如下:
  1. int maps = writeSplits(job, submitJobDir);
  2. conf.setInt(MRJobConfig.NUM_MAPS, maps);
  3. LOG.info("number of splits:" + maps);
复制代码


      方法writeSplits返回mapper的数量,该方法的源代码如下:
  1. private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir)
  2. throws IOException,InterruptedException, ClassNotFoundException {
  3.     JobConf jConf = (JobConf)job.getConfiguration();
  4.     int maps;
  5.     if (jConf.getUseNewMapper()) {
  6.       maps = writeNewSplits(job, jobSubmitDir);
  7.     } else {
  8.       maps = writeOldSplits(jConf, jobSubmitDir);
  9.     }
  10.     return maps;
  11.   }
复制代码


      在该方法中,根据是否使用了新版本的JobContext而使用不同的方法计算mapper数量,实际情况是jConf.getUseNewMapper()将返回true,因此将执行writeNewSplits(job,jobSubmitDir)语句,该方法的源代码如下:

  1. Configuration conf = job.getConfiguration();
  2. InputFormat input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  3. List[I] splits = input.getSplits(job);
  4. T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
  5. // sort the splits into order based on size, so that the biggest
  6. // go first
  7. Arrays.sort(array, new SplitComparator());
  8. JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
  9. return array.length;
复制代码


      通过上面的代码可以得知,实际的mapper数量为输入分片的数量而分片的数量又由使用的输入格式决定,默认为TextInputFormat,该类为FileInputFormat的子类。

确定分片数量的任务交由FileInputFormat的getSplits(job)完成,在此补充一下FileInputFormat继承自抽象类InputFormat,该类定义了MapReduce作业的输入规范,其中的抽象方法List[I] getSplits(JobContext context)定义了如何将输入分割为InputSplit,不同的输入有不同的分隔逻辑,而分隔得到的每个InputSplit交由不同的mapper处理,因此该方法的返回值确定了mapper的数量。

下面将分为两部分学习该方法是如何在FileInputFormat中实现的,为了将注意力集中在最重要的部分,对日志输出等信息将不做介绍,完整的实现可以参考源代码。

      首先是第一部分,该部分代码计算了最大InputSplit和最小InputSplit的值,如下:

  1. long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  2. long maxSize = getMaxSplitSize(job);
复制代码


      其中的getMinSplitSize和getMaxSplitSize方法分别用于获取最小InputSplit和最大InputSplit的值,对应的配置参数分别为mapreduce.input.fileinputformat.split.minsize,默认值为1L和mapreduce.input.fileinputformat.split.maxsize,默认值为Long.MAX_VALUE,十六进制数值为 0x7fffffffffffffffL,对应的十进制为9223372036854775807,getFormatMinSplitSize方法返回该输入格式下InputSplit的下限。以上数字的单位都是byte。由此得出minSize的大小为1L,maxSize的大小为Long.MAX_VALUE。

      其次是生成InputSplit的第二部分。在该部分将生成包含InputSplit的List,而List的大小为InputSplit的数量,进而确定了mapper的数量。其中重要的代码为:
  1. if (isSplitable(job, path)) {
  2.           long blockSize = file.getBlockSize();
  3.           long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  4.           long bytesRemaining = length;
  5.           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  6.             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  7.             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  8.                                      blkLocations[blkIndex].getHosts()));
  9.             bytesRemaining -= splitSize;
  10.           }
  11.           if (bytesRemaining != 0) {
  12.             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  13.             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
  14.                        blkLocations[blkIndex].getHosts()));
  15.           }
  16. }
复制代码


      blockSize的值为参数dfs.blocksize的值,默认为128M。方法computeSplitSize(blockSize, minSize, maxSize)根据blockSize,minSize,maxSize确定InputSplit的大小,源代码如下:
  1. Math.max(minSize, Math.min(maxSize, blockSize))
复制代码


      从该代码并结合第一部分的分析可以得知,InputSplit的大小取决于dfs.blocksiz、mapreduce.input.fileinputformat.split.minsize、mapreduce.input.fileinputformat.split.maxsize和所使用的输入格式。在输入格式为TextInputFormat的情况下,且不修改InputSplit的最大值和最小值的情况,InputSplit的最终值为dfs.blocksize的值。

变量SPLIT_SLOP的值为1.1,决定了当剩余文件大小多大时停止按照变量splitSize分割文件。根据代码可知,当剩余文件小于等于1.1倍splitSize时,将把剩余的文件做为一个InputSplit,即最后一个InputSplit的大小最大为1.1倍splitSize。


总结     

本文分析了在输入格式为默认的TextInputFormat的情况,如何确定mapper的数量。在不修改源代码的情况下(修改输入格式的InputSplit下限),程序员可以通过设置dfs.blocksiz、mapreduce.input.fileinputformat.split.minsize、mapreduce.input.fileinputformat.split.maxsize参数的值设置InputSplit的大小来影响InputSplit的数量,进而决定mapper的数量。当输入为其它格式时,处理逻辑又不相同了,比如当输入格式为DBInputFormat时,会根据输入表的行数(记录数)决定mapper的数量,更多细节可以参考源代码。





没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条