分享

Hadoop MapReduce中如何处理跨行Block和inputSplit

pig2 发表于 2014-5-12 21:31:33 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 9 24888
问题导读:
1.Hadoop对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?
2.在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit?
3.LineRecordReader的nextKeyValue方法的作用是什么?





Hadoop的初学者经常会疑惑这样两个问题:
1.Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?
2.在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,如果被分成两个InputSplit,这样一个InputSplit里面就有一行不完整的数据,那么处理这个InputSplit的Mapper会不会得出不正确的结果?
      对于上面的两个问题,首先要明确两个概念:Block和InputSplit
      1. block是hdfs存储文件的单位(默认是64M);
      2. InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。

因此,以行记录形式的文本,还真可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。

  1.     public List<InputSplit> getSplits(JobContext job) throws IOException {  
  2.       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  3.       long maxSize = getMaxSplitSize(job);  
  4.       
  5.       // generate splits  
  6.       List<InputSplit> splits = new ArrayList<InputSplit>();  
  7.       List<FileStatus> files = listStatus(job);        
  8.       for (FileStatus file: files) {  
  9.         Path path = file.getPath();  
  10.         long length = file.getLen();  
  11.         if (length != 0) {  
  12.           FileSystem fs = path.getFileSystem(job.getConfiguration());  
  13.           BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);  
  14.           if (isSplitable(job, path)) {  
  15.             long blockSize = file.getBlockSize();  
  16.             long splitSize = computeSplitSize(blockSize, minSize, maxSize);  
  17.       
  18.             long bytesRemaining = length;  
  19.             while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
  20.               int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
  21.               splits.add(makeSplit(path, length-bytesRemaining, splitSize,  
  22.                                        blkLocations[blkIndex].getHosts()));  
  23.               bytesRemaining -= splitSize;  
  24.             }  
  25.       
  26.             if (bytesRemaining != 0) {  
  27.               splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,  
  28.                          blkLocations[blkLocations.length-1].getHosts()));  
  29.             }  
  30.           } else { // not splitable  
  31.             splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));  
  32.           }  
  33.         } else {   
  34.           //Create empty hosts array for zero length files  
  35.           splits.add(makeSplit(path, 0, length, new String[0]));  
  36.         }  
  37.       }  
  38.       // Save the number of input files for metrics/loadgen  
  39.       job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());  
  40.       LOG.debug("Total # of splits: " + splits.size());  
  41.       return splits;  
  42.     }  
复制代码


从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize
对文件进行切分,splitSize = computeSplitSize(blockSize, minSize, maxSize);blockSize,minSize,maxSize都可以配置,默认splitSize 就等于blockSize的默认值(64m)。
        FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,其可能被切分到不同的InputSplit。但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成,在Hadoop里,记录行形式的文本,通常采用默认的TextInputFormat,TextInputFormat关联的是LineRecordReader,下面我们来看看LineRecordReader的的nextKeyValue方法里读取文件的代码:

  1.     while (getFilePosition() <= end) {  
  2.       newSize = in.readLine(value, maxLineLength,  
  3.           Math.max(maxBytesToConsume(pos), maxLineLength));  
  4.       if (newSize == 0) {  
  5.         break;  
  6.       }  
复制代码
其读取文件是通过LineReader(in就是一个LineReader实例)的readLine方法完成的:

  1.     public int readLine(Text str, int maxLineLength,  
  2.                         int maxBytesToConsume) throws IOException {  
  3.       if (this.recordDelimiterBytes != null) {  
  4.         return readCustomLine(str, maxLineLength, maxBytesToConsume);  
  5.       } else {  
  6.         return readDefaultLine(str, maxLineLength, maxBytesToConsume);  
  7.       }  
  8.     }  
  9.       
  10.     /**
  11.      * Read a line terminated by one of CR, LF, or CRLF.
  12.      */  
  13.     private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)  
  14.     throws IOException {  
  15.       str.clear();  
  16.       int txtLength = 0; //tracks str.getLength(), as an optimization  
  17.       int newlineLength = 0; //length of terminating newline  
  18.       boolean prevCharCR = false; //true of prev char was CR  
  19.       long bytesConsumed = 0;  
  20.       do {  
  21.         int startPosn = bufferPosn; //starting from where we left off the last time  
  22.         if (bufferPosn >= bufferLength) {  
  23.           startPosn = bufferPosn = 0;  
  24.           if (prevCharCR)  
  25.             ++bytesConsumed; //account for CR from previous read  
  26.           bufferLength = in.read(buffer);  
  27.           if (bufferLength <= 0)  
  28.             break; // EOF  
  29.         }  
  30.         for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline  
  31.           if (buffer[bufferPosn] == LF) {  
  32.             newlineLength = (prevCharCR) ? 2 : 1;  
  33.             ++bufferPosn; // at next invocation proceed from following byte  
  34.             break;  
  35.           }  
  36.           if (prevCharCR) { //CR + notLF, we are at notLF  
  37.             newlineLength = 1;  
  38.             break;  
  39.           }  
  40.           prevCharCR = (buffer[bufferPosn] == CR);  
  41.         }  
  42.         int readLength = bufferPosn - startPosn;  
  43.         if (prevCharCR && newlineLength == 0)  
  44.           --readLength; //CR at the end of the buffer  
  45.         bytesConsumed += readLength;  
  46.         int appendLength = readLength - newlineLength;  
  47.         if (appendLength > maxLineLength - txtLength) {  
  48.           appendLength = maxLineLength - txtLength;  
  49.         }  
  50.         if (appendLength > 0) {  
  51.           str.append(buffer, startPosn, appendLength);  
  52.           txtLength += appendLength;  
  53.         }  
  54.       } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);   <span style="color: #ff0000;">//①</span>  
  55.       
  56.       if (bytesConsumed > (long)Integer.MAX_VALUE)  
  57.         throw new IOException("Too many bytes before newline: " + bytesConsumed);      
  58.       return (int)bytesConsumed;  
  59.     }  
复制代码

我们分析下readDefaultLine方法,do-while循环体主要是读取文件,然后遍历读取的内容,找到默认的换行符就终止循环。前面说,对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取。这就体现在上述代码的While循环的终止条件上:
        while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
        newlineLength==0则以为一次do-while循环中读取的内容中没有遇到换行符,因maxBytesToConsume的默认值为Integer.MAX_VALUE,所以如果读取的内容没有遇到换行符,则会一直读取下去,知道读取的内容超过maxBytesToConsume。这样的出来方式,解决了一行记录跨InputSplit的读取问题,同样也会造成下面两个疑问:
        1.既然在LineReader读取方法里面没有对考虑InputSplit的end进行处理,难道读取一个InputSplit的时候,会这样无限的读取下去么?
        2.如果一行记录L跨越了A,B两个InputSplit,读A的时候已经读取了跨越A,B的这条记录L,那么对B这个InputSplit读取的时候,如果做到不读取L这条记录在B中的部分呢?
        为了解决这两个问题,Hadoop通过下面的代码来做到:LineRecordReader的nextKeyValue方法。

  1.     public boolean nextKeyValue() throws IOException {  
  2.       if (key == null) {  
  3.         key = new LongWritable();  
  4.       }  
  5.       key.set(pos);  
  6.       if (value == null) {  
  7.         value = new Text();  
  8.       }  
  9.       int newSize = 0;  
  10.       // We always read one extra line, which lies outside the upper  
  11.       // split limit i.e. (end - 1)  
  12.       while (getFilePosition() <= end) {        <span style="color: #ff0000;"> //②</span>  
  13.         newSize = in.readLine(value, maxLineLength,  
  14.             Math.max(maxBytesToConsume(pos), maxLineLength));  
  15.         if (newSize == 0) {  
  16.           break;  
  17.         }  
  18.         pos += newSize;  
  19.         inputByteCounter.increment(newSize);  
  20.         if (newSize < maxLineLength) {  
  21.           break;  
  22.         }  
  23.       
  24.         // line too long. try again  
  25.         LOG.info("Skipped line of size " + newSize + " at pos " +   
  26.                  (pos - newSize));  
  27.       }  
  28.       if (newSize == 0) {  
  29.         key = null;  
  30.         value = null;  
  31.         return false;  
  32.       } else {  
  33.         return true;  
  34.       }  
  35.     }  
复制代码

过代码②处得While条件,就保证了InputSplit读取边界的问题,如果存在跨InputSplit的记录,也只好跨InputSplit读取一次。
     再来看LineRecordReader的initialize方法:

  1.     // If this is not the first split, we always throw away first record  
  2.     // because we always (except the last split) read one extra line in  
  3.     // next() method.  
  4.     if (start != 0) {  
  5.       start += in.readLine(new Text(), 0, maxBytesToConsume(start));  
  6.     }  
  7.     this.pos = start;  
复制代码
如果不是第一InputSplit,则在读取的时候,LineRecordReader会自动忽略掉第一个换行符之前的所有内容,这样就不存在重读读取的问题。

个人理解
blocksplit.png


已有(9)人评论

跳转到指定楼层
跃阳紫 发表于 2014-7-29 16:27:41
这确实是初学者困惑的地方,另请问Hadoop数据分块的依据是什么,是根据业务关系还是就只是根据block的默认大小64MB进行分块的,这样就可能导致一条超过一个BLock的大小记录,而被分到两个block中?
回复

使用道具 举报

wordwan@163.com 发表于 2014-8-19 10:00:07
非常好,如果楼主拿个实际项目说说就更好了
回复

使用道具 举报

maizhu 发表于 2014-10-3 20:19:41
感谢,学习了
回复

使用道具 举报

tang 发表于 2015-4-3 20:05:17
回复

使用道具 举报

水电费12 发表于 2015-12-9 21:38:33
不错!谢谢楼主
回复

使用道具 举报

shanquan2006 发表于 2016-1-26 15:04:47
新手学习,至少明白Hadoop有处理这种情况的机制了
回复

使用道具 举报

bingyuac 发表于 2016-5-1 17:03:09
很详细,受益良多
回复

使用道具 举报

yanglei 发表于 2016-5-16 15:28:04
谢谢楼主,知道具体的hadoop策略了。
回复

使用道具 举报

heelo 发表于 2017-11-9 10:02:16
感谢楼主分享
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条