分享

TableInputFormat分片及分片数据读取源码级分析

52Pig 2014-10-14 22:56:31 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 12171
本帖最后由 howtodown 于 2014-10-14 23:32 编辑
阅读导读:
1.TableInputFormat类是做什么用的?
2.getSplits()方法中table的第一个region的startKey不是EMPTY_BYTE_ARRAY的话会有什么结果?
3.TableRecordReaderImpl的initialize方法都做了哪些事?
4.Pair结构由两部分组成?




我们在MapReduce中TextInputFormat分片和读取分片数据源码级分析 这篇中以TextInputFormat为例讲解了InputFormat的分片过程以及RecordReader读取分片数据的过程。接下来咱们分析TableInputFormat的分片信息和数据读取过程。
 TableInputFormat这是专门处理基于HBase的MapReduce的输入数据的格式类。我们可以看看继承结构:
(1)public class TableInputFormat extends TableInputFormatBase implements Configurable;

(2)public abstract class TableInputFormatBase extends InputFormat<ImmutableBytesWritable, Result>。其中InputFormat是输入格式的基类。
 TableInputFormat类主要是构造HTable对象和Scan对象,主要在方法setConf(Configuration configuration)构造,代码如下:

  1. /**
  2.    * Sets the configuration. This is used to set the details for the table to
  3.    * be scanned.
  4.    *
  5.    * @param configuration  The configuration to set.
  6.    * @see org.apache.hadoop.conf.Configurable#setConf(
  7.    *   org.apache.hadoop.conf.Configuration)
  8.    */
  9.   @Override
  10.   public void setConf(Configuration configuration) {
  11.     this.conf = configuration;
  12.     String tableName = conf.get(INPUT_TABLE);
  13.     try {
  14.       setHTable(new HTable(new Configuration(conf), tableName));
  15.     } catch (Exception e) {
  16.       LOG.error(StringUtils.stringifyException(e));
  17.     }
  18.     Scan scan = null;
  19.     if (conf.get(SCAN) != null) {
  20.       try {
  21.         scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
  22.       } catch (IOException e) {
  23.         LOG.error("An error occurred.", e);
  24.       }
  25.     } else {
  26.       try {
  27.         scan = new Scan();
  28.         if (conf.get(SCAN_ROW_START) != null) {
  29.           scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
  30.         }
  31.         if (conf.get(SCAN_ROW_STOP) != null) {
  32.           scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
  33.         }
  34.         if (conf.get(SCAN_COLUMNS) != null) {
  35.           addColumns(scan, conf.get(SCAN_COLUMNS));
  36.         }
  37.         if (conf.get(SCAN_COLUMN_FAMILY) != null) {
  38.           scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
  39.         }
  40.         if (conf.get(SCAN_TIMESTAMP) != null) {
  41.           scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
  42.         }
  43.         if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
  44.           scan.setTimeRange(
  45.               Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
  46.               Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
  47.         }
  48.         if (conf.get(SCAN_MAXVERSIONS) != null) {
  49.           scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
  50.         }
  51.         if (conf.get(SCAN_CACHEDROWS) != null) {
  52.           scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
  53.         }
  54.         // false by default, full table scans generate too much BC churn
  55.         scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
  56.       } catch (Exception e) {
  57.           LOG.error(StringUtils.stringifyException(e));
  58.       }
  59.     }
  60.     setScan(scan);
  61.   }
复制代码

首先会通过配置信息获取HBase表名,然后构造一个HTable对象;如果用户自己的作业中有配置Scan的话,就会解码Scan字符串转换为Scan对象;如果用户没配置Scan就会创建一个默认的Scan,进行一些基本配置。
 关于TableInputFormatBase,我们重点还是讲两个方法:RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split, TaskAttemptContext context)方法和List<InputSplit> getSplits(JobContext context)方法,前者是读取分片信息所指的数据供TableMapper处理,后者是构造HBase表的分片信息。这里的分片信息是TableSplit extends InputSplit implements Writable, Comparable,这个TableSpli维护4个字段,HBase表名:byte [] tableName、scan起始rowkey:byte [] startRow、scan结束rowkey:byte [] endRow、以及该region所在节点:String regionLocation。
 一、先看getSplits方法吧

这里一个split一般对应一个完整region,除非用户设定的开始和结束rowkey不是region的边界,代码如下:
  1. /**
  2.    * Calculates the splits that will serve as input for the map tasks. The
  3.    * number of splits matches the number of regions in a table.
  4.    *
  5.    * @param context  The current job context.
  6.    * @return The list of input splits.
  7.    * @throws IOException When creating the list of splits fails.
  8.    * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
  9.    *   org.apache.hadoop.mapreduce.JobContext)
  10.    */
  11.   @Override
  12.   public List<InputSplit> getSplits(JobContext context) throws IOException {
  13.     if (table == null) {
  14.         throw new IOException("No table was provided.");
  15.     }
  16.     // Get the name server address and the default value is null.
  17.     this.nameServer =
  18.       context.getConfiguration().get("hbase.nameserver.address", null);
  19.    
  20.     Pair<byte[][], byte[][]> keys = table.getStartEndKeys();//获取所有Region的开始rowkey和结束rowkey
  21.     if (keys == null || keys.getFirst() == null ||
  22.         keys.getFirst().length == 0) {    //
  23.         //table的第一个region的startKey必须是EMPTY_BYTE_ARRAY,否则输出FIRST_REGION_STARTKEY_NOT_EMPTY信息
  24.       HRegionLocation regLoc = table.getRegionLocation(
  25.           HConstants.EMPTY_BYTE_ARRAY, false);
  26.       if (null == regLoc) {    //一个region也没有
  27.         throw new IOException("Expecting at least one region.");
  28.       }
  29.       List<InputSplit> splits = new ArrayList<InputSplit>(1);
  30.       //构造一个TableSplit,起始rowkey和结束rowkey都是EMPTY_BYTE_ARRAY
  31.       InputSplit split = new TableSplit(table.getTableName(),
  32.           HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
  33.               .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0]);
  34.       splits.add(split);
  35.       return splits;        //返回分片信息
  36.     }
  37.     List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
  38.     for (int i = 0; i < keys.getFirst().length; i++) {    //有多个region
  39.       if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {    //这个方法一直返回true
  40.         continue;
  41.       }
  42.       HServerAddress regionServerAddress =
  43.         table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
  44.       InetAddress regionAddress =
  45.         regionServerAddress.getInetSocketAddress().getAddress();//获取region所在地址
  46.       String regionLocation;
  47.       try {
  48.         regionLocation = reverseDNS(regionAddress);//将地址转换为字符串的主机名
  49.       } catch (NamingException e) {
  50.         LOG.error("Cannot resolve the host name for " + regionAddress +
  51.             " because of " + e);
  52.         regionLocation = regionServerAddress.getHostname();
  53.       }
  54.             byte[] startRow = scan.getStartRow();    //获取scan的开始rowkey
  55.             byte[] stopRow = scan.getStopRow();    //获取scan的结束rowkey
  56.             // determine if the given start an stop key fall into the region
  57.             //比较用户设定的rowkey范围在那些region之中
  58.       if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
  59.                      Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
  60.           (stopRow.length == 0 ||
  61.            Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
  62.         byte[] splitStart = startRow.length == 0 ||
  63.           Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
  64.             keys.getFirst()[i] : startRow;
  65.         byte[] splitStop = (stopRow.length == 0 ||
  66.           Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
  67.           keys.getSecond()[i].length > 0 ?
  68.             keys.getSecond()[i] : stopRow;
  69.         InputSplit split = new TableSplit(table.getTableName(),
  70.           splitStart, splitStop, regionLocation);    //构造TableSplit
  71.         splits.add(split);
  72.         if (LOG.isDebugEnabled()) {
  73.           LOG.debug("getSplits: split -> " + i + " -> " + split);
  74.         }
  75.       }
  76.     }
  77.     return splits;
  78.   }
复制代码


Pair这个结构由两部分组成:start和end,都是byte[][],这是一个二维byte数组,其中一维是region的顺序,二维是start或者end的字符序列。通过getFirst方法就可以获取所有region的start rowkey,通过getSecond可以获得所有的end rowkey。

 这里还有一个要注意的就是每个HBase表的第一个region是没有start rowkey,最后一个region是没有end  rowkey,这里的“没有”是指的是table.getStartEndKeys()这个方法获取的结果,另外大伙也可以从WEB UI中查看指定的HBase表的region信息也可以看到第一个region的start和最后一个region的end并没有显示。还有就是如果这个HBase表只有一个region的话,getFirst方法返回是没有数据的;getSecond也没数据。

 (1)、getSplits方法中的第一个if语句段是上面说的只有一个region的情况。每个HBase表的第一个region的start rowkey都是EMPTY_BYTE_ARRAY,这是一个长度为0的byte数组。table.getRegionLocation方法会找指定的rowkey所在的region所在信息HRegionLocation。如果只有一个region的话,就先找EMPTY_BYTE_ARRAY所在region信息,如果没有这样的信息就是出现了错误;如果如果有这样的信息的话,就构建一个长度为1的InputSplit列表splits,构造一个TableSplit:设定HBase表名,scan起始和结束rowkey都是EMPTY_BYTE_ARRAY,再加上region所在节点,把这个TableSplit加入splits,返回这
个splits。
    (2)、接下来就是HBase表有多个region的情况,构建长度为keys.getFirst().length的InputSplit列表,然后遍历keys.getFirst()获取每个region的位置信息并将其转换成String类型(reverseDNS方法从reverseDNSCacheMap中获取,reverseDNSCacheMap是存储IPAddress => HostName的映射);然后获取用户设定的起始和结束rowkey,并和当前的region的起始和比较结束,如果有rowkey包含这个region就会将这region当做一个InputSplit放入列表中,最后待遍历完之后返回split列表。判断当前region是否应该加入InputSplit列表的条件就是循环中的最后一个if语句段,
条件是:((startRow.length == 0 || keys.getSecond().length == 0 ||Bytes.compareTo (startRow, keys.getSecond()) < 0) && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()) > 0)),
分解这个条件为两部分:
A、(startRow.length == 0 || keys.getSecond().length == 0 ||Bytes.compareTo (startRow, keys.getSecond()) < 0)这个是要确定用户设定的start rowkey是否小于当前region的结束rowkey;

B、(stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()) > 0)这个是要确定用户设定的end rowkey是否大于当前region的开始rowkey,这俩条件必须同时满足才可以,一旦满足就要确定这个TableSplit的开始rowkey和结束rowkey了:

A、startRow.length == 0 || Bytes.compareTo(keys.getFirst(), startRow) >= 0如果为true的话,说明用户设定的起始rowkey可能是从表的开头开始或者是当前region的起始rowkey大于用户设定的则应该将当前region的起始rowkey作为TableSplit的起始rowkey,如果表达式为false的话将用户设定的起始rowkey作为TableSplit的起始rowkey;

B、(stopRow.length == 0 || Bytes.compareTo(keys.getSecond(), stopRow) <= 0) && keys.getSecond().length > 0如果为true的话,说明首先要确定是否设定的结束rowkey或者当前region的结束rowkey小于用户设定的结束rowkey,且要保证当前region不是最后一个(keys.getSecond().length > 0),这样的的TableSplit的结束rowkey就是当前region的结束rowkey,如果为false则将用户设定的结束rowkey为TableSplit的结束rowkey,为什么要不是最后一个region呢?因为最后一个region的end rowkey的长度始终为0,比较之下会将最后一个region的end rowkey设置给TableSplit,显然这是不对,能到这里说明这个region应该被分配给一个TableSplit,如果是最后一个region的话,那么这个TableSplit的结束rowkey应该是用户设定的而非这个region自己的。获得这个tableSplit的开始和结束rowkey之后就可以封装这个TableSplit了,并放入InputSplit列表。最终待所有的region遍历结束之后返回这个InputSplit列表。这样getSplits方法就结束了。
 二、createRecordReader方法
这个方法代码如下:

  1. <span style="font-style: normal;">/**
  2.    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
  3.    * the default.
  4.    *
  5.    * @param split  The split to work with.
  6.    * @param context  The current context.
  7.    * @return The newly created record reader.
  8.    * @throws IOException When creating the reader fails.
  9.    * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
  10.    *   org.apache.hadoop.mapreduce.InputSplit,
  11.    *   org.apache.hadoop.mapreduce.TaskAttemptContext)
  12.    */
  13.   @Override
  14.   public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
  15.       InputSplit split, TaskAttemptContext context)
  16.   throws IOException {
  17.     if (table == null) {
  18.       throw new IOException("Cannot create a record reader because of a" +
  19.           " previous error. Please look at the previous logs lines from" +
  20.           " the task's full log for more details.");
  21.     }
  22.     TableSplit tSplit = (TableSplit) split;
  23.     TableRecordReader trr = this.tableRecordReader;
  24.     // if no table record reader was provided use default
  25.     if (trr == null) {
  26.       trr = new TableRecordReader();
  27.     }
  28.     Scan sc = new Scan(this.scan);
  29.     sc.setStartRow(tSplit.getStartRow());
  30.     sc.setStopRow(tSplit.getEndRow());
  31.     trr.setScan(sc);
  32.     trr.setHTable(table);
  33.     try {
  34.       trr.initialize(tSplit, context);
  35.     } catch (InterruptedException e) {
  36.       throw new InterruptedIOException(e.getMessage());
  37.     }
  38.     return trr;
  39.   }</span>
复制代码

这个方法主要是获取TableSplit,然后构造一个Scan,设定开始和结束rowkey;设定HTablePool;将Scan和HTable传递给一个TableRecordReader对象,然后调用initialize(tSplit, context)初始化,最后返回这个TableRecordReader。可以看出TableRecordReader这个是读取key/value的。TableRecordReader中实际操作数据的是TableRecordReaderImpl,TableRecordReader的nextKeyValue()、getCurrentValue()、initialize、getCurrentKey()方法会调用TableRecordReaderImpl的相应方法。
 TableRecordReaderImpl的initialize方法主要是重新创建一个新的Scan,并将createRecordReader传过来的赋值给这个新的currentScan,并获取对应的ResultScanner。
 TableRecordReaderImpl的nextKeyValue()会先创建一个key = new ImmutableBytesWritable()和value = new Result(),这就是我们继承TableMapper中map方法中的参数类型,然后每次调用该方法通过执行value = this.scanner.next()方法来获取HBase中的一行数据赋值给value,这里表明如果scanner.next()运行无异常的话key中是没有数据的(出现异常之后会存储value对应行的rowkey),只有value有数据。执行了这个方法就可以通过getCurrentValue()、getCurrentKey()方法来获取value和key了。











已有(1)人评论

跳转到指定楼层
wubaozhou 发表于 2015-1-1 22:39:42
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条