本帖最后由 howtodown 于 2014-3-4 00:06 编辑
可以带着下面问题来阅读:
1. Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?
2. 在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,InputSplit的Mapper会不会得出不正确的结果?
对于上面的两个问题,首先要明确两个概念:Block和InputSplit:
1. Block是HDFS存储文件的单位(默认是64M);
2. InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。
因此以行记录形式的文本,可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。
下面以hadoop-0.22.0源码进行分析
org.apache.hadoop.mapred.FileInputFormat:
- public InputSplit[] getSplits(JobConf job, int numSplits)
- throws IOException {
- FileStatus[] files = listStatus(job);
-
- // Save the number of input files for metrics/loadgen
- job.setLong(NUM_INPUT_FILES, files.length);
- long totalSize = 0; // compute total size
- for (FileStatus file: files) { // check we have valid files
- if (file.isDirectory()) {
- throw new IOException("Not a file: "+ file.getPath());
- }
- totalSize += file.getLen();
- }
-
- long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
- long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
- FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
-
- // generate splits
- ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
- NetworkTopology clusterMap = new NetworkTopology();
- for (FileStatus file: files) {
- Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job);
- long length = file.getLen();
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- if ((length != 0) && isSplitable(fs, path)) {
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
- long bytesRemaining = length;
- while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
- String[] splitHosts = getSplitHosts(blkLocations,
- length-bytesRemaining, splitSize, clusterMap);
- splits.add(makeSplit(path, length-bytesRemaining, splitSize,
- splitHosts));
- bytesRemaining -= splitSize;
- }
-
- if (bytesRemaining != 0) {
- splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
- blkLocations[blkLocations.length-1].getHosts()));
- }
- } else if (length != 0) {
- String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
- splits.add(makeSplit(path, 0, length, splitHosts));
- } else {
- //Create empty hosts array for zero length files
- splits.add(makeSplit(path, 0, length, new String[0]));
- }
- }
- LOG.debug("Total # of splits: " + splits.size());
- return splits.toArray(new FileSplit[splits.size()]);
- }
复制代码
从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分,splitSize = computeSplitSize(goalSize, minSize, blockSize);goalSize,minSize,blockSize都可以配置,默认splitSize 就等于blockSize的默认值(64m)。
FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,可能被切分到不同的InputSplit。 但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成 。
org.apache.hadoop.mapred.TextInputFormat:- public RecordReader<LongWritable, Text> getRecordReader(
- InputSplit genericSplit, JobConf job,
- Reporter reporter)
- throws IOException {
-
- reporter.setStatus(genericSplit.toString());
- return new LineRecordReader(job, (FileSplit) genericSplit);
- }
复制代码
org.apache.hadoop.mapred.LineRecordReader :- /** Read a line. */
- public synchronized boolean next(LongWritable key, Text value)
- throws IOException {
-
- // We always read one extra line, which lies outside the upper
- // split limit i.e. (end - 1)
- while (getFilePosition() <= end) {
- key.set(pos);
-
- int newSize = in.readLine(value, maxLineLength,
- Math.max(maxBytesToConsume(pos), maxLineLength));
- if (newSize == 0) {
- return false;
- }
- pos += newSize;
- if (newSize < maxLineLength) {
- return true;
- }
-
- // line too long. try again
- LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
- }
-
- return false;
- }
复制代码
对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取 。
如果一行记录L跨越了A,B两个InputSplit,读A的时候已经读取了跨越A,B的这条记录L,那么对B这个InputSplit读取的时候,如何做到不读取L这条记录在B中的部分呢?
org.apache.hadoop.mapred.LineRecordReader:- // If this is not the first split, we always throw away first record
- // because we always (except the last split) read one extra line in
- // next() method.
- if (start != 0) {
- start += in.readLine(new Text(), 0, maxBytesToConsume(start));
- }
复制代码
如果不是first split,则会丢弃第一个record,避免了重复读取的问题。
|