分享

Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的ma...

PeersLee 发表于 2016-7-20 12:54:35 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 10989
问题导读:
1.如何进行第一次优化?
2.如何进行第二次优化?
3.如何进行第三次优化?
4.如何进行源码分析?




解决方案:

前言
首先确保已经搭建好Hadoop集群环境,可以参考《Linux下Hadoop集群环境的搭建》一文的内容。我在测试mapreduce任务时,发现相比于使用Job.setNumReduceTasks(int)控制reduce任务数量而言,控制map任务数量一直是一个困扰我的问题。好在经过很多摸索与实验,终于梳理出来,希望对在工作中进行Hadoop进行性能调优的新人们有个借鉴。本文只针对FileInputFormat的任务划分进行分析,其它类型的InputFormat的划分方式又各有不同。虽然如此,都可以按照本文类似的方法进行分析和总结。

为了简便起见,本文以Hadoop2.6.0自带的word count例子为例,进行展开。

wordcount
我们首先准备好wordcount所需的数据,一共有两份文件,都位于hdfs的/wordcount/input目录下:

2016-07-20_123943.png

这两个文件的内容分别为:

[mw_shl_code=applescript,true]On the top of the Crumpretty Tree  
The Quangle Wangle sat,  
But his face you could not see,  
On account of his Beaver Hat.[/mw_shl_code]

[mw_shl_code=applescript,true]But his face you could not see,  
On account of his Beaver Hat.[/mw_shl_code]

有关如何操作hdfs并准备好数据的细节,本文不作赘述。
现在我们不作任何性能优化(不增加任何配置参数),然后执行hadoop-mapreduce-examples子项目(有关此项目介绍,可以阅读《Hadoop2.6.0子项目hadoop-mapreduce-examples的简单介绍》一文)中自带的wordcount例子:


[mw_shl_code=shell,true]hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /wordcount/input /wordcount/output/result1[/mw_shl_code]

当然也可以使用朴素的方式运行wordcount例子:

[mw_shl_code=shell,true]hadoop org.apache.hadoop.examples.WordCount -D mapreduce.input.fileinputformat.split.maxsize=1 /wordcount/input /wordcount/output/result1[/mw_shl_code]

最后执行的结果在hdfs的/wordcount/output/result1目录下:

2016-07-20_124141.png

执行结果可以查看/wordcount/output/result1/part-r-00000的内容:

2016-07-20_124215.png

第一次优化
wordcount例子,查看运行结果不是本文的目的。在执行wordcount例子时,在任务运行信息中可以看到创建的map及reduce任务的数量:


2016-07-20_124251.png

可以看到FileInputFormat的输入文件有2个,JobSubmitter任务划分的数量是2,最后产生的map任务数量也是2,看到这我们可以猜想由于我们提供了两个输入文件,所以会有2个map任务。我们此处姑且不论这种猜测正确与否,现在我们打算改变map任务的数量。通过查看文档,很多人知道使用mapreduce.job.maps参数可以快速修改map任务的数量,事实果真如此?让我们先来实验一番,输入以下命令:

[mw_shl_code=shell,true]hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.job.maps=1 /wordcount/input /wordcount/output/result2[/mw_shl_code]

执行以上命令后,观察输出的信息,与之前未添加mapreduce.job.maps参数的输出信息几乎没有变化。难道Hadoop的实现人员开了一个玩笑,亦或者这是一个bug?我们先给这个问题在我们的大脑中设置一个检查点,最后再来看看究竟是怎么回事。

第二次优化
用mapreduce.job.maps调整map任务数量没有见效,我们翻翻文档,发现还有mapreduce.input.fileinputformat.split.minsize参数,它可以控制map任务输入划分的最小字节数。这个参数和mapreduce.input.fileinputformat.split.maxsize通常配合使用,后者控制map任务输入划分的最大字节数。我们目前只调整mapreduce.input.fileinputformat.split.minsize的大小,划分最小的尺寸变小是否预示着任务划分数量变多?来看看会发生什么?输入以下命令:


[mw_shl_code=shell,true]hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.minsize=1 /wordcount/input /wordcount/output/result3[/mw_shl_code]

执行以上命令后,观察输出信息,依然未发生改变。好吧,弟弟不给力,我们用它的兄弟参数mapreduce.input.fileinputformat.split.maxsize来控制。如果我们将mapreduce.input.fileinputformat.split.maxsize改得很小,会怎么样?输入以下命令:

[mw_shl_code=shell,true]hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=1 /wordcount/input /wordcount/output/result4[/mw_shl_code]

这是的信息有了改变,我们似乎取得了想要的结果:

2016-07-20_124451.png

2016-07-20_124517.png

呵呵,任务划分成了177个,想想也是,我们把最大的划分字节数仅仅设置为1字节。接着往下看确实执行了177个map任务:

2016-07-20_124605.png

我们还可以通过Web UI观察map任务所分配的Container。首先查看Slave1节点上分配的Container情况:

2016-07-20_124640.png

确实说明最多有15个Container分配给当前作业执行map任务。由于在YARN中yarn.nodemanager.resource.cpu-vcores参数的默认值是8,所以Slave1和Slave2两台机器上的虚拟cpu总数是16,由于ResourceManager会为mapreduce任务分配一个Container给ApplicationMaster(即MrAppMaster),所以整个集群只剩余了15个Container用于ApplicationMaster向NodeManager申请和运行map任务。

第三次优化
阅读文档我们知道dfs.blocksize可以控制块的大小,看看这个参数能否发挥作用。为便于测试,我们首先需要修改hdfs-site.xml中dfs.blocksize的大小为10m(最小就只能这么小,Hadoop限制了参数单位至少是10m)。


[mw_shl_code=shell,true]<property>  
  <name>dfs.blocksize</name>  
  <value>10m</value>  
</property>[/mw_shl_code]

然后,将此配置复制到集群的所有NameNode和DataNode上。为了使此配置在不重启的情况下生效,在NameNode节点上执行以下命令:

[mw_shl_code=shell,true]hadoop dfsadmin -refreshNodes  
yarn rmadmin -refreshNodes[/mw_shl_code]

我们使用以下命令查看下系统内的文件所占用的blocksize大小:、

[mw_shl_code=shell,true]hadoop dfs -stat "%b %n %o %r %y" /wordcount/input/quangle*[/mw_shl_code]

输出结果如下:

2016-07-20_124840.png

可以看到虽然quangle.txt和quangle2.txt的字节数分别是121字节和56字节,但是在hdfs中这两个文件的blockSize已经是10m了。现在我们试试以下命令:

[mw_shl_code=shell,true]hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /wordcount/input /wordcount/output/result5[/mw_shl_code]

观察输出信息,发现没有任何效果。

源码分析


经过以上3次不同实验,发现只有mapreduce.input.fileinputformat.split.maxsize参数确实影响了map任务的数量。现在我们通过源码分析,来一探究竟吧。

首先我们看看WordCount例子的源码,其中和任务划分有关的代码如下:


[mw_shl_code=java,true]for (int i = 0; i < otherArgs.length - 1; ++i) {  
  FileInputFormat.addInputPath(job, new Path(otherArgs));  
}  
FileOutputFormat.setOutputPath(job,  
  new Path(otherArgs[otherArgs.length - 1]));  
System.exit(job.waitForCompletion(true) ? 0 : 1);[/mw_shl_code]

我们看到使用的InputFormat是FileOutputFormat,任务执行调用了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的代码如下:

[mw_shl_code=java,true]public boolean waitForCompletion(boolean verbose  
                                 ) throws IOException, InterruptedException,  
                                          ClassNotFoundException {  
  if (state == JobState.DEFINE) {  
    submit();  
  }  
  // 省略本文不关心的代码  
  return isSuccessful();  
}[/mw_shl_code]

这里的submit方法的实现如下:

[mw_shl_code=java,true]public void submit()   
         throws IOException, InterruptedException, ClassNotFoundException {  
    // 省略本文不关心的代码</span>  
    final JobSubmitter submitter =   
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());  
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {  
      public JobStatus run() throws IOException, InterruptedException,   
      ClassNotFoundException {  
        return submitter.submitJobInternal(Job.this, cluster);  
      }  
    });  
    state = JobState.RUNNING;  
    LOG.info("The url to track the job: " + getTrackingURL());  
   }[/mw_shl_code]

submit方法首先创建了JobSubmitter实例,然后异步调用了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有关划分任务的代码如下:

[mw_shl_code=java,true]// Create the splits for the job  
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));  
int maps = writeSplits(job, submitJobDir);  
conf.setInt(MRJobConfig.NUM_MAPS, maps);  
LOG.info("number of splits:" + maps);[/mw_shl_code]

writeSplits方法的实现如下:

[mw_shl_code=java,true]private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,  
    Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  JobConf jConf = (JobConf)job.getConfiguration();  
  int maps;  
  if (jConf.getUseNewMapper()) {  
    maps = writeNewSplits(job, jobSubmitDir);  
  } else {  
    maps = writeOldSplits(jConf, jobSubmitDir);  
  }  
  return maps;  
}[/mw_shl_code]

由于WordCount使用的是新的mapreduce API,所以最终会调用writeNewSplits方法。writeNewSplits的实现如下:

[mw_shl_code=java,true]private <T extends InputSplit>  
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  Configuration conf = job.getConfiguration();  
  InputFormat<?, ?> input =  
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);  
  
  List<InputSplit> splits = input.getSplits(job);  
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);  
  
  // sort the splits into order based on size, so that the biggest  
  // go first  
  Arrays.sort(array, new SplitComparator());  
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,   
      jobSubmitDir.getFileSystem(conf), array);  
  return array.length;  
}[/mw_shl_code]

writeNewSplits方法中,划分任务数量最关键的代码即为InputFormat的getSplits方法(提示:大家可以直接通过此处的调用,查看不同InputFormat的划分任务实现)。根据前面的分析我们知道此时的InputFormat即为FileOutputFormat,其getSplits方法的实现如下:

[mw_shl_code=java,true]public List<InputSplit> getSplits(JobContext job) throws IOException {  
  Stopwatch sw = new Stopwatch().start();  
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  long maxSize = getMaxSplitSize(job);  
  
  // generate splits  
  List<InputSplit> splits = new ArrayList<InputSplit>();  
  List<FileStatus> files = listStatus(job);  
  for (FileStatus file: files) {  
    Path path = file.getPath();  
    long length = file.getLen();  
    if (length != 0) {  
      BlockLocation[] blkLocations;  
      if (file instanceof LocatedFileStatus) {  
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();  
      } else {  
        FileSystem fs = path.getFileSystem(job.getConfiguration());  
        blkLocations = fs.getFileBlockLocations(file, 0, length);  
      }  
      if (isSplitable(job, path)) {  
        long blockSize = file.getBlockSize();  
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);  
  
        long bytesRemaining = length;  
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,  
                      blkLocations[blkIndex].getHosts(),  
                      blkLocations[blkIndex].getCachedHosts()));  
          bytesRemaining -= splitSize;  
        }  
  
        if (bytesRemaining != 0) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,  
                     blkLocations[blkIndex].getHosts(),  
                     blkLocations[blkIndex].getCachedHosts()));  
        }  
      } else { // not splitable  
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),  
                    blkLocations[0].getCachedHosts()));  
      }  
    } else {   
      //Create empty hosts array for zero length files  
      splits.add(makeSplit(path, 0, length, new String[0]));  
    }  
  }  
  // Save the number of input files for metrics/loadgen  
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());  
  sw.stop();  
  if (LOG.isDebugEnabled()) {  
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()  
        + ", TimeTaken: " + sw.elapsedMillis());  
  }  
  return splits;  
}[/mw_shl_code]

getFormatMinSplitSize方法固定返回1,getMinSplitSize方法实际就是mapreduce.input.fileinputformat.split.minsize参数的值(默认为1),那么变量minSize的大小为mapreduce.input.fileinputformat.split.minsize与1之间的最大值。
getMaxSplitSize方法实际是mapreduce.input.fileinputformat.split.maxsize参数的值,那么maxSize即为mapreduce.input.fileinputformat.split.maxsize参数的值。
由于我的试验中有两个输入源文件,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小为2。

在遍历files列表的过程中,会获取每个文件的blockSize,最终调用computeSplitSize方法计算每个输入文件应当划分的任务数。computeSplitSize方法的实现如下:



[mw_shl_code=java,true]protected long computeSplitSize(long blockSize, long minSize,  
                                long maxSize) {  
  return Math.max(minSize, Math.min(maxSize, blockSize));  
}[/mw_shl_code]

因此我们知道每个输入文件被划分的公式如下:
map任务要划分的大小(splitSize )=(maxSize与blockSize之间的最小值)与minSize之间的最大值

bytesRemaining 是单个输入源文件未划分的字节数
根据getSplits方法,我们知道map任务划分的数量=输入源文件数目 * (bytesRemaining / splitSize个划分任务+bytesRemaining不能被splitSize 整除的剩余大小单独划分一个任务 )

总结

根据源码分析得到的计算方法和之前的优化结果,我们最后总结一下:

对于第一次优化,由于FileOutputFormat压根没有采用mapreduce.job.maps参数指定的值,所以它当然不会有任何作用。

对于第二次优化,minSize几乎由mapreduce.input.fileinputformat.split.minsize控制;mapreduce.input.fileinputformat.split.maxsize默认的大小是Long.MAX_VALUE,所以blockSize即为maxSize与blockSize之间的最小值;blockSize的默认大小是128m,所以blockSize与值为1的mapreduce.input.fileinputformat.split.minsize之间的最大值为blockSize,即map任务要划分的大小的大小与blockSize相同。

对于第三次优化,虽然我们将blockSize设置为10m(最小也只能这么小了,hdfs对于block大小的最低限制),根据以上公式maxSize与blockSize之间的最小值必然是blockSize,而blockSize与minSize之间的最大值也必然是blockSize。说明blockSize实际上已经发挥了作用,它决定了splitSize的大小就是blockSize。由于blockSize大于bytesRemaining,所以并没有对map任务数量产生影响。

针对以上分析,我们用更加容易理解的方式列出这些配置参数的关系:

当mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize > dfs.blockSize的情况下,此时的splitSize 将由mapreduce.input.fileinputformat.split.minsize参数决定。
当mapreduce.input.fileinputformat.split.maxsize > dfs.blockSize > mapreduce.input.fileinputformat.split.minsize的情况下,此时的splitSize 将由dfs.blockSize配置决定。(第二次优化符合此种情况)
当dfs.blockSize > mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize的情况下,此时的splitSize 将由mapreduce.input.fileinputformat.split.maxsize参数决定。

已有(1)人评论

跳转到指定楼层
zhzhang 发表于 2016-7-21 08:20:49
多谢楼主分享!
弱弱的问一句,官网中有划分公式啊:splitSize=max{minSize,min{maxSize,blockSize}}
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条