Reading guide:
1. How do you import a huge volume of data into HBase?
2. What scenarios is Bulk Loading suited to?
3. What is the HFileOutputFormat class for?
4. How does Bulk Loading work?
Loading a huge amount of data into HBase through the normal write path in one go is not only slow, it also puts heavy pressure on the Regions. A much more efficient and convenient approach is "Bulk Loading", i.e. the HFileOutputFormat class that HBase provides.
The idea rests on the fact that HBase ultimately stores its data on HDFS in a specific file format (HFiles). Bulk Loading generates files directly in that format and then moves them into the right location, which completes the import of a huge dataset in one step. Done with MapReduce, it is fast and convenient, does not consume Region resources, and adds no extra load to the RegionServers.
Below is a sample MapReduce program. The data source is an HBase table, and the output path for the generated files is user-defined:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class HFileOutput {
  // Placeholder table names, column families and output path; replace with your own.
  private static final String inputTable = "inputTable";
  private static final String outputTable = "outputTable";
  private static final byte[] INPUT_FAMILY = Bytes.toBytes("cf");
  private static final byte[] OUTPUT_FAMILY = Bytes.toBytes("cf");
  private static final String outputPath = "/tmp/hfile_output";

  // Job configuration
  public static Job configureJob(Configuration conf) throws IOException {
    Job job = new Job(conf, "countUnite1");
    job.setJarByClass(HFileOutput.class);
    //job.setNumReduceTasks(2);
    //job.setOutputKeyClass(ImmutableBytesWritable.class);
    //job.setOutputValueClass(KeyValue.class);
    //job.setOutputFormatClass(HFileOutputFormat.class);

    Scan scan = new Scan();
    scan.setCaching(10);
    scan.addFamily(INPUT_FAMILY);
    TableMapReduceUtil.initTableMapperJob(inputTable, scan,
        HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job);
    // If no reducer is set here, configureIncrementalLoad will automatically pick
    // KeyValueSortReducer.class or PutSortReducer.class based on the map output value class.
    job.setReducerClass(HFileOutputReducer.class);
    //job.setOutputFormatClass(HFileOutputFormat.class);
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, outputTable));
    HFileOutputFormat.setOutputPath(job, new Path(outputPath));
    //FileOutputFormat.setOutputPath(job, new Path(outputPath)); // equivalent to the line above
    return job;
  }

  public static class HFileOutputMapper extends
      TableMapper<ImmutableBytesWritable, LongWritable> {
    public void map(ImmutableBytesWritable key, Result values,
        Context context) throws IOException, InterruptedException {
      // Mapper logic goes here; as an example, emit the row key with a count of 1.
      context.write(new ImmutableBytesWritable(values.getRow()), new LongWritable(1));
    }
  }

  public static class HFileOutputReducer extends
      Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
    public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
        Context context) throws IOException, InterruptedException {
      // Reducer logic goes here; as an example, sum the counts into a single KeyValue.
      long count = 0;
      for (LongWritable v : values) {
        count += v.get();
      }
      KeyValue kv = new KeyValue(key.get(), OUTPUT_FAMILY, Bytes.toBytes("count"),
          Bytes.toBytes(count));
      context.write(key, kv);
    }
  }
}
Note that whatever the final output stage is (the map or the reduce phase), the output key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>. Otherwise you will get an error like this:
java.lang.IllegalArgumentException: Can't read partitions file
...
Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable
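If the job is map-only, the mapper itself must emit one of those pairs. Below is a minimal sketch of a mapper that emits <ImmutableBytesWritable, Put>, in which case configureIncrementalLoad installs PutSortReducer automatically; the class name, family and qualifier here are made up purely for illustration:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

// Emits <ImmutableBytesWritable, Put>, so configureIncrementalLoad will set
// PutSortReducer as the reducer and no custom reducer class is needed.
public class PutOutputMapper extends TableMapper<ImmutableBytesWritable, Put> {
  // Hypothetical target family and qualifier, for illustration only.
  private static final byte[] FAMILY = Bytes.toBytes("cf");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");

  @Override
  public void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
    Put put = new Put(value.getRow());
    // Copy the value of the first cell of the source row, purely as an example.
    put.add(FAMILY, QUALIFIER, value.value());
    context.write(key, put);
  }
}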
As for the commented-out lines in the configuration above, it really does not matter whether you write them or not: as the source code shows (it is posted at the end), configureIncrementalLoad already sets all of the fixed configuration, and only the parts that vary need to be set by hand. In particular, setNumReduceTasks is configured automatically to match the number of regions in the target table.
The code to load the generated files into HBase is:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class TestLoadIncrementalHFileToHBase {
  // args[0] = target table name, args[1] = directory containing the generated HFiles
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    byte[] TABLE = Bytes.toBytes(args[0]);
    HTable table = new HTable(conf, TABLE);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path(args[1]), table);
  }
}
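Putting the two pieces together, a minimal driver could first run the HFile-generating job and then call LoadIncrementalHFiles on the same output directory. This is only a sketch, reusing the hypothetical HFileOutput.configureJob, placeholder table name and output path from the example above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;

public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Step 1: run the MapReduce job that writes the HFiles (see HFileOutput above).
    Job job = HFileOutput.configureJob(conf);
    if (!job.waitForCompletion(true)) {
      System.exit(1);
    }
    // Step 2: move the generated HFiles into the target table's regions.
    // "outputTable" and "/tmp/hfile_output" are the placeholders used above.
    HTable table = new HTable(conf, "outputTable");
    new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hfile_output"), table);
    table.close();
  }
}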
Alternatively, HBase ships a packaged bulk-load tool that can be run from the command line:
hadoop jar hbase-VERSION.jar completebulkload /myoutput mytable
Finally, when you run your program you may hit an error like this:
FAILED Error: java.lang.ClassNotFoundException: com.google.common.util.concurrent.ThreadFactoryBuilder
It just means a jar is missing from the classpath: add the guava jar that ships with HBase, guava-r09.jar (typically found under HBASE_HOME/lib), and the error goes away.
Finally, here is the source code of the HFileOutputFormat class for reference:
/**
 * Writes HFiles. Passed KeyValues must arrive in order.
 * Currently, can only write files to a single column family at a
 * time. Multiple column families requires coordinating keys cross family.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created hfiles. Calling write(null,null) will forceably roll
 * all HFiles being written.
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
  static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
  TimeRangeTracker trt = new TimeRangeTracker();

  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
      throws IOException, InterruptedException {
    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong("hbase.hregion.max.filesize",
        HConstants.DEFAULT_MAX_FILE_SIZE);
    final int blocksize = conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize",
        HFile.DEFAULT_BLOCKSIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompression = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());

    // create a map from column family to the compression algorithm
    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);

    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte [], WriterLength> writers =
          new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
      private boolean rollRequested = false;

      public void write(ImmutableBytesWritable row, KeyValue kv)
          throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }

        byte [] rowKey = kv.getRow();
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);

        // If this is a new column family, verify that the directory exists
        if (wl == null) {
          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
        }

        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }

        // create a new HLog writer, if necessary
        if (wl == null || wl.writer == null) {
          wl = getNewWriter(family, conf);
        }

        // we now have the proper HLog writer. full steam ahead
        kv.updateLatestStamp(this.now);
        trt.includeTimestamp(kv);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transition.
        this.previousRow = rowKey;
      }

      private void rollWriters() throws IOException {
        for (WriterLength wl : this.writers.values()) {
          if (wl.writer != null) {
            LOG.info("Writer=" + wl.writer.getPath() +
                ((wl.written == 0)? "": ", wrote=" + wl.written));
            close(wl.writer);
          }
          wl.writer = null;
          wl.written = 0;
        }
        this.rollRequested = false;
      }

      /* Create a new HFile.Writer.
       * @param family
       * @return A WriterLength, containing a new HFile.Writer.
       * @throws IOException
       */
      private WriterLength getNewWriter(byte[] family, Configuration conf)
          throws IOException {
        WriterLength wl = new WriterLength();
        Path familydir = new Path(outputdir, Bytes.toString(family));
        String compression = compressionMap.get(family);
        compression = compression == null ? defaultCompression : compression;
        wl.writer =
            HFile.getWriterFactory(conf).createWriter(fs,
                StoreFile.getUniqueFile(fs, familydir), blocksize,
                compression, KeyValue.KEY_COMPARATOR);
        this.writers.put(family, wl);
        return wl;
      }

      private void close(final HFile.Writer w) throws IOException {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(StoreFile.TIMERANGE_KEY,
              WritableUtils.toByteArray(trt));
          w.close();
        }
      }

      public void close(TaskAttemptContext c)
          throws IOException, InterruptedException {
        for (WriterLength wl: this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    HFile.Writer writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
      throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
        new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }

  /**
   * Write out a SequenceFile that can be read by TotalOrderPartitioner
   * that contains the split points in startKeys.
   * @param partitionsPath output path for SequenceFile
   * @param startKeys the region start keys
   */
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted =
        new TreeSet<ImmutableBytesWritable>(startKeys);

    ImmutableBytesWritable first = sorted.first();
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(first);

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, HTable table)
      throws IOException {
    Configuration conf = job.getConfiguration();
    Class<? extends Partitioner> topClass;
    try {
      topClass = getTotalOrderPartitionerClass();
    } catch (ClassNotFoundException e) {
      throw new IOException("Failed getting TotalOrderPartitioner", e);
    }
    job.setPartitionerClass(topClass);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    LOG.info("Looking up current regions for table " + table);
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    Path partitionsPath = new Path(job.getWorkingDirectory(),
        "partitions_" + System.currentTimeMillis());
    LOG.info("Writing partition information to " + partitionsPath);

    FileSystem fs = partitionsPath.getFileSystem(conf);
    writePartitions(conf, partitionsPath, startKeys);
    partitionsPath.makeQualified(fs);

    URI cacheUri;
    try {
      // Below we make explicit reference to the bundled TOP. Its cheating.
      // We are assume the define in the hbase bundled TOP is as it is in
      // hadoop (whether 0.20 or 0.22, etc.)
      cacheUri = new URI(partitionsPath.toString() + "#" +
          org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH);
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    DistributedCache.addCacheFile(cacheUri, conf);
    DistributedCache.createSymlink(conf);

    // Set compression algorithms based on column families
    configureCompression(table, conf);

    TableMapReduceUtil.addDependencyJars(job);
    LOG.info("Incremental table output configured.");
  }

  /**
   * If > hadoop 0.20, then we want to use the hadoop TotalOrderPartitioner.
   * If 0.20, then we want to use the TOP that we have under hadoopbackport.
   * This method is about hbase being able to run on different versions of
   * hadoop. In 0.20.x hadoops, we have to use the TOP that is bundled with
   * hbase. Otherwise, we use the one in Hadoop.
   * @return Instance of the TotalOrderPartitioner class
   * @throws ClassNotFoundException If can't find a TotalOrderPartitioner.
   */
  private static Class<? extends Partitioner> getTotalOrderPartitionerClass()
      throws ClassNotFoundException {
    Class<? extends Partitioner> clazz = null;
    try {
      clazz = (Class<? extends Partitioner>) Class.forName("org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner");
    } catch (ClassNotFoundException e) {
      clazz =
          (Class<? extends Partitioner>) Class.forName("org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner");
    }
    return clazz;
  }

  /**
   * Run inside the task to deserialize column family to compression algorithm
   * map from the
   * configuration.
   *
   * Package-private for unit tests only.
   *
   * @return a map from column family to the name of the configured compression
   * algorithm
   */
  static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
    for (String familyConf : compressionConf.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }

      try {
        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return compressionMap;
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * Package-private for unit tests only.
   *
   * @throws IOException
   * on failure to read column family descriptors
   */
  static void configureCompression(HTable table, Configuration conf) throws IOException {
    StringBuilder compressionConfigValue = new StringBuilder();
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        compressionConfigValue.append('&');
      }
      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
      compressionConfigValue.append('=');
      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
    }
    // Get rid of the last ampersand
    conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
  }
}