分享

hadoop2.2编程:自定义hadoop map/reduce输入文件切割InputFormat

xioaxu790 2014-5-28 13:10:23 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 11234
本帖最后由 xioaxu790 于 2014-5-28 13:20 编辑
问题导读:
1、hadoop对原始输入文件的处理机制是什么 ?


2、如何自定义一个InputFormat ?



前言
hadoop会对原始输入文件进行文件切割,然后把每个split传入mapper程序中进行处理,FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类进行实现的。

        那么,FileInputFormat是怎样将他们划分成splits的呢?FileInputFormat只划分比HDFS block大的文件,所以如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。

       hadoop默认的InputFormat是TextInputFormat,重写了FileInputFormat中的createRecordReader和isSplitable方法。该类使用的reader是LineRecordReader,即以回车键(CR = 13)或换行符(LF = 10)为行分隔符。

      但大多数情况下,回车键或换行符作为输入文件的行分隔符并不能满足我们的需求,通常用户很有可能会输入回车键、换行符,所以通常我们会定义不可见字符(即用户无法输入的字符)为行分隔符,这种情况下,就需要新写一个InputFormat。

      又或者,一条记录的分隔符不是字符,而是字符串,这种情况相对麻烦;还有一种情况,输入文件的主键key已经是排好序的了,需要hadoop做的只是把相同的key作为一个数据块进行逻辑处理,这种情况更麻烦,相当于免去了mapper的过程,直接进去reduce,那么InputFormat的逻辑就相对较为复杂了,但并不是不能实现。

    1、改变一条记录的分隔符,不用默认的回车或换行符作为记录分隔符,甚至可以采用字符串作为记录分隔符。
     1)自定义一个InputFormat,继承FileInputFormat,重写createRecordReader方法,如果不需要分片或者需要改变分片的方式,则重写isSplitable方法,具体代码如下:

  1. public class FileInputFormatB extends FileInputFormat<LongWritable, Text> {
  2.    @Override
  3.    public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) {
  4.         return new SearchRecordReader("\b");
  5.     }
  6.     @Override
  7.     protected boolean isSplitable(FileSystem fs, Path filename) {
  8.          // 输入文件不分片
  9.         return false;
  10.      }
  11. }
复制代码

   2)关键在于定义一个新的SearchRecordReader继承RecordReader,支持自定义的行分隔符,即一条记录的分隔符。标红的地方为与hadoop默认的LineRecordReader不同的地方。
  1. public class IsearchRecordReader extends RecordReader<LongWritable, Text> {
  2. private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);
  3. private CompressionCodecFactory compressionCodecs = null;
  4. private long start;
  5. private long pos;
  6. private long end;
  7. private LineReader in;
  8. private int maxLineLength;
复制代码

3)重写SearchRecordReader需要的LineReader,可作为SearchRecordReader内部类。特别需要注意的地方就是,读取文件的方式是按指定大小的buffer来读,必定就会遇到一条完整的记录被切成两半,甚至如果分隔符大于1个字符时分隔符也会被切成两半的情况,这种情况一定要加以拼接处理。
  1. public class LineReader {
  2. //回车键(hadoop默认)
  3. //private static final byte CR = 13;
  4. //换行符(hadoop默认)
  5. //private static final byte LF = 10;
  6. //按buffer进行文件读取
  7. private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;
  8. private int bufferSize = DEFAULT_BUFFER_SIZE;
  9. private InputStream in;
  10. private byte[] buffer;
  11. private int bufferLength = 0;
  12. private int bufferPosn = 0;
  13. LineReader(InputStream in, int bufferSize) {
  14. this.bufferLength = 0;
  15. this.bufferPosn = 0;
  16. this.in = in;
  17. this.bufferSize = bufferSize;
  18. this.buffer = new byte[this.bufferSize];
  19. }
  20. public LineReader(InputStream in, Configuration conf) throws IOException {
  21. this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
  22. }
  23. public void close() throws IOException {
  24. in.close();
  25. }
  26. public int readLine(Text str, int maxLineLength) throws IOException {
  27. return readLine(str, maxLineLength, Integer.MAX_VALUE);
  28. }
  29. public int readLine(Text str) throws IOException {
  30. return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
  31. }
  32. //以下是需要改写的部分_start,核心代码
  33. public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
  34. str.clear();
  35. Text record = new Text();
  36. int txtLength = 0;
  37. long bytesConsumed = 0L;
  38. boolean newline = false;
  39. int sepPosn = 0;
  40. do {
  41.      //已经读到buffer的末尾了,读下一个buffer
  42.      if (this.bufferPosn >= this.bufferLength) {
  43.       bufferPosn = 0;
  44.       bufferLength = in.read(buffer);
  45.       
  46.       //读到文件末尾了,则跳出,进行下一个文件的读取
  47.       if (bufferLength <= 0) {
  48.       break;
  49.       }
  50.          }
  51.      
  52.     int startPosn = this.bufferPosn;
  53.     for (; bufferPosn < bufferLength; bufferPosn ++) {
  54.      //处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)
  55.      if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){
  56.        sepPosn = 0;
  57.       }
  58.       
  59.      //遇到行分隔符的第一个字符
  60.       if (buffer[bufferPosn] == separator[sepPosn])
复制代码
接上面的代码
  1. bufferPosn ++;
  2. int i = 0;
  3. //判断接下来的字符是否也是行分隔符中的字符
  4. for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){
  5. //buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半
  6. if(bufferPosn + i >= bufferLength){
  7. bufferPosn += i - 1;
  8. break;
  9. }
  10. //一旦其中有一个字符不相同,就判定为不是分隔符
  11. if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){
  12. sepPosn = 0;
  13. break;
  14. }
  15. }
  16. //的确遇到了行分隔符
  17. if(sepPosn == sepLength){
  18. bufferPosn += i;
  19. newline = true;
  20. sepPosn = 0;
  21. break;
  22. }
  23. }
  24. }
  25. int readLength = this.bufferPosn - startPosn;
  26. bytesConsumed += readLength;
  27. //行分隔符不放入块中
  28. //int appendLength = readLength - newlineLength;
  29. if (readLength > maxLineLength - txtLength) {
  30. readLength = maxLineLength - txtLength;
  31. }
  32. if (readLength > 0) {
  33. record.append(this.buffer, startPosn, readLength);
  34. txtLength += readLength;
  35. //去掉记录的分隔符
  36. if(newline){
  37. str.set(record.getBytes(), 0, record.getLength() - sepLength);
  38. }
  39.    }
  40.     } while (!newline && (bytesConsumed < maxBytesToConsume));
  41.     if (bytesConsumed > (long)Integer.MAX_VALUE) {
  42.      throw new IOException("Too many bytes before newline: " + bytesConsumed);
  43.   }
  44.    
  45.    return (int) bytesConsumed;
  46.    }
  47. //以下是需要改写的部分_end
  48. //以下是hadoop-core中LineReader的源码_start
  49. public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
  50.     str.clear();
  51.      int txtLength = 0;
  52.   int newlineLength = 0;
  53.      boolean prevCharCR = false;
  54.     long bytesConsumed = 0L;
  55.      do {
  56.        int startPosn = this.bufferPosn;
  57.       if (this.bufferPosn >= this.bufferLength) {
  58.         startPosn = this.bufferPosn = 0;
  59.         if (prevCharCR)  bytesConsumed ++;
  60.          this.bufferLength = this.in.read(this.buffer);
  61.          if (this.bufferLength <= 0)  break;
  62.       }
  63.       for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) {
  64.          if (this.buffer[this.bufferPosn] == LF) {
  65.           newlineLength = (prevCharCR) ? 2 : 1;
  66.            this.bufferPosn ++;
  67.           break;
  68.         }
  69.         if (prevCharCR) {
  70.           newlineLength = 1;
  71.           break;
  72.         }
  73.        prevCharCR = this.buffer[this.bufferPosn] == CR;
复制代码



2、已经按主键key排好序了,并保证相同主键key一定是在一起的,假设每条记录的第一个字段为主键,那么如果沿用上面的LineReader,需要在核心方法readLine中对前后两条记录的id进行equals判断,如果不同才进行split,如果相同继续下一条记录的判断。代码就不再贴了,但需要注意的地方,依旧是前后两个buffer进行交接的时候,非常有可能一条记录被切成了两半,一半在前一个buffer中,一半在后一个buffer中。

     这种方式的好处在于少去了reduce操作,会大大地提高效率,其实mapper的过程相当的快,费时的通常是reduce。





已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条