
Generating HFiles with MapReduce and Bulk Loading Them into HBase, with Source Code Analysis

pig2, posted 2014-8-2 11:27:59 in Code Analysis
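Guiding questions:
1. How should a huge volume of data be imported into HBase?
2. What scenarios is Bulk Loading suited for?
3. What does the HFileOutputFormat class do?
4. How does Bulk Loading work?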






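Loading a huge volume of data into HBase in one go through the normal write path (every write goes through the RegionServers' write-ahead log and MemStore) is slow and ties up a lot of region resources. A more efficient and convenient approach is “Bulk Loading”, built on the HFileOutputFormat class that HBase provides.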

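Bulk Loading relies on the fact that HBase stores its data in HDFS in a specific file format (HFile). Instead of writing through the regions, a job generates files in that format directly and then moves them into the right place, which loads a huge amount of data quickly. Combined with MapReduce it is efficient and convenient, and it neither occupies region resources nor adds write load to the cluster.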
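Writing HFiles directly only works if the cells are in the order HBase itself uses: row keys are compared as unsigned bytes, lexicographically (the order of Bytes.BYTES_COMPARATOR, which also appears in the source at the end of this post). The pure-Java sketch below, assuming Java 9+ for Arrays.compareUnsigned, only illustrates that order; RowKeyOrder is a hypothetical name. It shows that a naive signed-byte sort would misplace keys containing bytes 0x80 to 0xFF.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch (Java 9+): the row-key order HBase uses (unsigned, lexicographic bytes,
// like Bytes.BYTES_COMPARATOR) compared with a naive signed-byte order.
final class RowKeyOrder {
    // Each byte is treated as 0..255; a shorter key that is a prefix sorts first.
    static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;

    // Each byte is treated as -128..127, so 0x80..0xFF sort before 0x00: wrong for HBase.
    static final Comparator<byte[]> SIGNED = Arrays::compare;

    static List<byte[]> sorted(List<byte[]> keys, Comparator<byte[]> order) {
        List<byte[]> copy = new ArrayList<>(keys);
        copy.sort(order);
        return copy;
    }

    public static void main(String[] args) {
        byte[] low = {0x10};
        byte[] high = {(byte) 0x90}; // 144 as unsigned, -112 as signed
        List<byte[]> keys = List.of(high, low);
        System.out.println(Arrays.toString(sorted(keys, UNSIGNED).get(0))); // prints [16]
        System.out.println(Arrays.toString(sorted(keys, SIGNED).get(0)));   // prints [-112]
    }
}
```

With the signed comparison the key 0x90 jumps ahead of 0x10; an HFile writer fed keys in that order would reject them as out of sequence.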

Below is a sample MapReduce program. It reads from an HBase table, and the output path for the generated files is up to you:

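```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class HFileOutput {
    // Placeholder family/qualifier names (not from the original post): use your own schema
    static final byte[] INPUT_FAMILY = Bytes.toBytes("in");
    static final byte[] OUTPUT_FAMILY = Bytes.toBytes("out");
    static final byte[] OUTPUT_QUALIFIER = Bytes.toBytes("count");

    // Job configuration; the HFiles are written under outputDir
    public static Job configureJob(Configuration conf, String inputTable,
            String outputTable, Path outputDir) throws IOException {
        Job job = new Job(conf, "countUnite1");
        job.setJarByClass(HFileOutput.class);
        //job.setNumReduceTasks(2);
        //job.setOutputKeyClass(ImmutableBytesWritable.class);
        //job.setOutputValueClass(KeyValue.class);
        //job.setOutputFormatClass(HFileOutputFormat.class);
        Scan scan = new Scan();
        scan.setCaching(10);
        scan.setCacheBlocks(false); // a full MR scan should not churn the block cache
        scan.addFamily(INPUT_FAMILY);
        TableMapReduceUtil.initTableMapperJob(inputTable, scan,
                HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job);
        // Without this line, configureIncrementalLoad picks KeyValueSortReducer or PutSortReducer,
        // but only if the map output value class is KeyValue or Put (here it is LongWritable)
        job.setReducerClass(HFileOutputReducer.class);
        //job.setOutputFormatClass(HFileOutputFormat.class);
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, outputTable));
        HFileOutputFormat.setOutputPath(job, outputDir);
        //FileOutputFormat.setOutputPath(job, outputDir); // equivalent to the line above
        return job;
    }

    public static class HFileOutputMapper extends
            TableMapper<ImmutableBytesWritable, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1L);

        @Override
        public void map(ImmutableBytesWritable key, Result values,
                Context context) throws IOException, InterruptedException {
            // Mapper logic goes here; this placeholder emits each row key with a count of 1
            context.write(key, ONE);
        }
    }

    public static class HFileOutputReducer extends
            Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // Reducer logic goes here; this placeholder sums the counts of each row key
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            // One cell per row, so the KeyValues reach HFileOutputFormat already sorted
            KeyValue kv = new KeyValue(key.copyBytes(), OUTPUT_FAMILY, OUTPUT_QUALIFIER,
                    Bytes.toBytes(count));
            context.write(key, kv);
        }
    }
}
```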

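Note that whichever of the map or the reduce produces the final output, its key/value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put> (a map that emits Put is handled by PutSortReducer, which configureIncrementalLoad installs and which converts the Puts into KeyValues). In particular, the map output key must be ImmutableBytesWritable; otherwise TotalOrderPartitioner cannot read the partitions file and you get an error like this: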
```
java.lang.IllegalArgumentException: Can't read partitions file
...
Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable
```
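Beyond the types, the order of the KeyValues matters too: the Javadoc of HFileOutputFormat (in the source at the end of this post) says "Passed KeyValues must arrive in order". The shuffle sorts by row key, but the cells of a single row reach the reducer unordered, so when the map output value is KeyValue, configureIncrementalLoad installs KeyValueSortReducer, which collects each row's KeyValues into a TreeSet ordered by KeyValue.COMPARATOR before writing them. A custom reducer that emits several cells per row has to do the same. The sketch below models that step in plain Java 9+; Cell and RowCellSorter are hypothetical, simplified stand-ins (no key type, no HBase classes).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Sketch of KeyValueSortReducer's job for one row. Cell is a hypothetical, simplified
// stand-in for KeyValue: it ignores the key type that the real comparator also checks.
final class RowCellSorter {
    static final class Cell {
        final byte[] family;
        final byte[] qualifier;
        final long timestamp;

        Cell(String family, String qualifier, long timestamp) {
            this.family = family.getBytes(StandardCharsets.UTF_8);
            this.qualifier = qualifier.getBytes(StandardCharsets.UTF_8);
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return new String(family, StandardCharsets.UTF_8) + ":"
                    + new String(qualifier, StandardCharsets.UTF_8) + "@" + timestamp;
        }
    }

    // Family, then qualifier (unsigned bytes), then timestamp newest-first.
    static final Comparator<Cell> ORDER = (x, y) -> {
        int c = Arrays.compareUnsigned(x.family, y.family);
        if (c != 0) return c;
        c = Arrays.compareUnsigned(x.qualifier, y.qualifier);
        if (c != 0) return c;
        return Long.compare(y.timestamp, x.timestamp);
    };

    // Collect one row's cells into a sorted set, then emit them in order.
    // In this sketch, cells with an identical key collapse into one (TreeSet semantics).
    static List<Cell> sortRow(Iterable<Cell> cellsOfOneRow) {
        TreeSet<Cell> sorted = new TreeSet<>(ORDER);
        for (Cell cell : cellsOfOneRow) {
            sorted.add(cell);
        }
        return new ArrayList<>(sorted);
    }

    public static void main(String[] args) {
        List<Cell> row = List.of(new Cell("cf", "b", 1L), new Cell("cf", "a", 1L),
                new Cell("cf", "a", 5L));
        System.out.println(sortRow(row)); // prints [cf:a@5, cf:a@1, cf:b@1]
    }
}
```

Columns come out in ascending order and, within one column, the newest version first, which is the order the HFile writer expects inside a row.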


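Whether or not you keep the commented-out lines in the configuration above makes no difference. As the source code (posted at the end) shows, configureIncrementalLoad already applies all the fixed settings: the partitioner, the output key/value classes, the output format, and the sort reducer when the map output value class is KeyValue or Put. Only the job-specific settings have to be configured by hand. The number of reduce tasks is also set automatically to the current number of regions in the target table, so setNumReduceTasks is not needed.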
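The reducer setup that configureIncrementalLoad derives from the table can be modeled in a few lines. With N regions, writePartitions keeps the N - 1 non-empty start keys as split points, the job gets N reduce tasks, and the total order partitioner sends each row key to the reducer whose region would contain it. The sketch below is a plain Java 9+ model under those assumptions (a binary search over the split points), not Hadoop's TotalOrderPartitioner code; the class name and the three-region table are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch (Java 9+, not Hadoop's TotalOrderPartitioner code): turning region start keys
// into reducer partitions the way configureIncrementalLoad/writePartitions set them up.
final class RegionPartitionSketch {
    // Like writePartitions(): sort the start keys and drop the empty first one.
    // With N regions this leaves N - 1 split points, i.e. N reducers.
    static List<byte[]> splitPoints(List<byte[]> regionStartKeys) {
        if (regionStartKeys.isEmpty()) {
            throw new IllegalArgumentException("No regions passed");
        }
        List<byte[]> sorted = new ArrayList<>(regionStartKeys);
        sorted.sort(Arrays::compareUnsigned);
        if (sorted.get(0).length != 0) {
            throw new IllegalArgumentException("First region should have an empty start key");
        }
        return new ArrayList<>(sorted.subList(1, sorted.size()));
    }

    // Reducer index for a row key = number of split points <= rowKey,
    // found by binary search for the first split point greater than rowKey.
    static int partition(List<byte[]> splits, byte[] rowKey) {
        int lo = 0;
        int hi = splits.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (Arrays.compareUnsigned(splits.get(mid), rowKey) <= 0) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    static byte[] b(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical table with three regions: ["", "g"), ["g", "p"), ["p", end)
        List<byte[]> splits = splitPoints(List.of(b("p"), new byte[0], b("g")));
        System.out.println("reducers = " + (splits.size() + 1)); // prints reducers = 3
        for (String row : new String[] {"a", "g", "h", "z"}) {
            System.out.println(row + " -> reducer " + partition(splits, b(row)));
        }
    }
}
```

For the hypothetical regions above, a goes to reducer 0, g and h to reducer 1, and z to reducer 2. Because each reducer's output stays inside one region, each generated HFile can later be handed to a single region as-is.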

The code that loads the generated files into HBase:

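```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class TestLoadIncrementalHFileToHBase {
    // args[0] = target table name, args[1] = directory holding the generated HFiles
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, args[0]);
        try {
            new LoadIncrementalHFiles(conf).doBulkLoad(new Path(args[1]), table);
        } finally {
            table.close();
        }
    }
}
```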
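doBulkLoad does more than move files. For each HFile, LoadIncrementalHFiles reads its first and last row keys, finds the region that should hold the first key, and if the last key lies beyond that region's end (for example because the region split after the MapReduce job ran) it splits the HFile at the region boundary and retries with the two halves. The following Java 9+ sketch models only that fit check; BulkLoadFitCheck, regionFor and needsSplit are hypothetical names, not the LoadIncrementalHFiles API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Model (Java 9+) of the per-HFile check LoadIncrementalHFiles performs; all names are hypothetical.
// startKeys: sorted region start keys of the target table, the first one empty.
final class BulkLoadFitCheck {
    // Index of the region holding key: the last start key <= key.
    static int regionFor(List<byte[]> startKeys, byte[] key) {
        int lo = 0;
        int hi = startKeys.size() - 1;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1;
            if (Arrays.compareUnsigned(startKeys.get(mid), key) <= 0) {
                lo = mid;
            } else {
                hi = mid - 1;
            }
        }
        return lo;
    }

    // True when an HFile covering rows [firstRow, lastRow] runs past the end of the
    // region that holds firstRow, i.e. the file would have to be split before loading.
    static boolean needsSplit(List<byte[]> startKeys, byte[] firstRow, byte[] lastRow) {
        int idx = regionFor(startKeys, firstRow);
        if (idx == startKeys.size() - 1) {
            return false; // the last region has no end key
        }
        byte[] regionEnd = startKeys.get(idx + 1); // a region ends where the next one starts
        return Arrays.compareUnsigned(lastRow, regionEnd) >= 0;
    }

    static byte[] b(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> startKeys = List.of(new byte[0], b("g"), b("p")); // hypothetical regions
        System.out.println(needsSplit(startKeys, b("a"), b("f"))); // prints false
        System.out.println(needsSplit(startKeys, b("a"), b("h"))); // prints true
    }
}
```

With the hypothetical regions starting at "", "g" and "p", a file covering rows a to f fits the first region, while one covering a to h crosses the "g" boundary and would be split before loading.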

HBase also ships a ready-made bulk-load tool (completebulkload) in its jar. Usage:
```shell
hadoop jar hbase-VERSION.jar completebulkload /myoutput mytable
```

Finally, when you run your program you may hit this error:
```
FAILED Error: java.lang.ClassNotFoundException: com.google.common.util.concurrent.ThreadFactoryBuilder
```

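The fix is to add the Guava jar that ships with HBase to the classpath (for example via HADOOP_CLASSPATH), e.g. HBASE_HOME/lib/guava-r09.jar (the Guava version depends on your HBase release). Once it is added, the program runs.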

Below is the source code of the HFileOutputFormat class for reference:
```java
/**
 * Writes HFiles. Passed KeyValues must arrive in order.
 * Currently, can only write files to a single column family at a
 * time.  Multiple column families requires coordinating keys cross family.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created hfiles. Calling write(null,null) will forceably roll
 * all HFiles being written.
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
  static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
  TimeRangeTracker trt = new TimeRangeTracker();
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
  throws IOException, InterruptedException {
    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong("hbase.hregion.max.filesize",
        HConstants.DEFAULT_MAX_FILE_SIZE);
    final int blocksize = conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize",
        HFile.DEFAULT_BLOCKSIZE);
    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String defaultCompression = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    // create a map from column family to the compression algorithm
    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte [], WriterLength> writers =
        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
      private boolean rollRequested = false;
      public void write(ImmutableBytesWritable row, KeyValue kv)
      throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }
        byte [] rowKey = kv.getRow();
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);
        // If this is a new column family, verify that the directory exists
        if (wl == null) {
          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
        }
        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }
        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }
        // create a new HLog writer, if necessary
        if (wl == null || wl.writer == null) {
          wl = getNewWriter(family, conf);
        }
        // we now have the proper HLog writer. full steam ahead
        kv.updateLatestStamp(this.now);
        trt.includeTimestamp(kv);
        wl.writer.append(kv);
        wl.written += length;
        // Copy the row so we know when a row transition.
        this.previousRow = rowKey;
      }
      private void rollWriters() throws IOException {
        for (WriterLength wl : this.writers.values()) {
          if (wl.writer != null) {
            LOG.info("Writer=" + wl.writer.getPath() +
                ((wl.written == 0)? "": ", wrote=" + wl.written));
            close(wl.writer);
          }
          wl.writer = null;
          wl.written = 0;
        }
        this.rollRequested = false;
      }
      /* Create a new HFile.Writer.
       * @param family
       * @return A WriterLength, containing a new HFile.Writer.
       * @throws IOException
       */
      private WriterLength getNewWriter(byte[] family, Configuration conf)
          throws IOException {
        WriterLength wl = new WriterLength();
        Path familydir = new Path(outputdir, Bytes.toString(family));
        String compression = compressionMap.get(family);
        compression = compression == null ? defaultCompression : compression;
        wl.writer =
          HFile.getWriterFactory(conf).createWriter(fs,
          StoreFile.getUniqueFile(fs, familydir), blocksize,
          compression, KeyValue.KEY_COMPARATOR);
        this.writers.put(family, wl);
        return wl;
      }
      private void close(final HFile.Writer w) throws IOException {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(StoreFile.TIMERANGE_KEY,
              WritableUtils.toByteArray(trt));
          w.close();
        }
      }
      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (WriterLength wl: this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }
  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    HFile.Writer writer = null;
  }
  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
  throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
      new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }
  /**
   * Write out a SequenceFile that can be read by TotalOrderPartitioner
   * that contains the split points in startKeys.
   * @param partitionsPath output path for SequenceFile
   * @param startKeys the region start keys
   */
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }
    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted =
      new TreeSet<ImmutableBytesWritable>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(first);
    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);
    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   *         PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, HTable table)
  throws IOException {
    Configuration conf = job.getConfiguration();
    Class<? extends Partitioner> topClass;
    try {
      topClass = getTotalOrderPartitionerClass();
    } catch (ClassNotFoundException e) {
      throw new IOException("Failed getting TotalOrderPartitioner", e);
    }
    job.setPartitionerClass(topClass);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }
    LOG.info("Looking up current regions for table " + table);
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());
    Path partitionsPath = new Path(job.getWorkingDirectory(),
        "partitions_" + System.currentTimeMillis());
    LOG.info("Writing partition information to " + partitionsPath);
    FileSystem fs = partitionsPath.getFileSystem(conf);
    writePartitions(conf, partitionsPath, startKeys);
    partitionsPath.makeQualified(fs);
    URI cacheUri;
    try {
      // Below we make explicit reference to the bundled TOP.  Its cheating.
      // We are assume the define in the hbase bundled TOP is as it is in
      // hadoop (whether 0.20 or 0.22, etc.)
      cacheUri = new URI(partitionsPath.toString() + "#" +
        org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH);
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    DistributedCache.addCacheFile(cacheUri, conf);
    DistributedCache.createSymlink(conf);
    // Set compression algorithms based on column families
    configureCompression(table, conf);
    TableMapReduceUtil.addDependencyJars(job);
    LOG.info("Incremental table output configured.");
  }
  /**
   * If > hadoop 0.20, then we want to use the hadoop TotalOrderPartitioner.
   * If 0.20, then we want to use the TOP that we have under hadoopbackport.
   * This method is about hbase being able to run on different versions of
   * hadoop.  In 0.20.x hadoops, we have to use the TOP that is bundled with
   * hbase.  Otherwise, we use the one in Hadoop.
   * @return Instance of the TotalOrderPartitioner class
   * @throws ClassNotFoundException If can't find a TotalOrderPartitioner.
   */
  private static Class<? extends Partitioner> getTotalOrderPartitionerClass()
  throws ClassNotFoundException {
    Class<? extends Partitioner> clazz = null;
    try {
      clazz = (Class<? extends Partitioner>) Class.forName("org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner");
    } catch (ClassNotFoundException e) {
      clazz =
        (Class<? extends Partitioner>) Class.forName("org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner");
    }
    return clazz;
  }
  /**
   * Run inside the task to deserialize column family to compression algorithm
   * map from the
   * configuration.
   *
   * Package-private for unit tests only.
   *
   * @return a map from column family to the name of the configured compression
   *         algorithm
   */
  static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
    for (String familyConf : compressionConf.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return compressionMap;
  }
  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * Package-private for unit tests only.
   *
   * @throws IOException
   *           on failure to read column family descriptors
   */
  static void configureCompression(HTable table, Configuration conf) throws IOException {
    StringBuilder compressionConfigValue = new StringBuilder();
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if(tableDescriptor == null){
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        compressionConfigValue.append('&');
      }
      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
      compressionConfigValue.append('=');
      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
    }
    // Get rid of the last ampersand
    conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
  }
}
```
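The least obvious part of the RecordWriter above is the roll rule. When the next KeyValue would bring a family's current file to hbase.hregion.max.filesize, rollRequested is set, but the writers are only rolled when the row key changes, so one row never spans two HFiles. The Java sketch below replays that rule for a single column family over (row, size) pairs and returns the row that opens each file; RollSketch and its parameters are hypothetical stand-ins for the KeyValue stream.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the RecordWriter roll rule for a single column family. rows[i] stands in for the
// row key of the i-th KeyValue and sizes[i] for kv.getLength(); names are hypothetical.
final class RollSketch {
    // Returns the row that starts each HFile the writer would produce.
    static List<String> fileStartRows(String[] rows, long[] sizes, long maxSize) {
        if (rows.length != sizes.length) {
            throw new IllegalArgumentException("rows and sizes must have the same length");
        }
        List<String> starts = new ArrayList<>();
        boolean familySeen = false;   // plays the role of wl != null
        boolean writerOpen = false;   // plays the role of wl.writer != null
        boolean rollRequested = false;
        long written = 0;
        String previousRow = "";
        for (int i = 0; i < rows.length; i++) {
            // The current file would reach maxSize: ask for a roll...
            if (familySeen && written + sizes[i] >= maxSize) {
                rollRequested = true;
            }
            // ...but only roll once the row changes, so a row never spans two files.
            if (rollRequested && !rows[i].equals(previousRow)) {
                writerOpen = false; // rollWriters(): close the file and reset the counter
                written = 0;
                rollRequested = false;
            }
            if (!writerOpen) { // getNewWriter()
                starts.add(rows[i]);
                writerOpen = true;
                familySeen = true;
            }
            written += sizes[i];
            previousRow = rows[i];
        }
        return starts;
    }

    public static void main(String[] args) {
        // Row "a" alone exceeds maxSize but stays in one file; "b" starts the next file.
        System.out.println(fileStartRows(new String[] {"a", "a", "a", "b"},
                new long[] {60, 60, 60, 10}, 100)); // prints [a, b]
    }
}
```

In main, row a grows past maxSize (180 against a limit of 100) but stays in one file; the roll happens only when b arrives. This is why HFiles produced this way can exceed hbase.hregion.max.filesize when a single row is very large.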









