分享

MapReduce生成HFile文件,再使用BulkLoad导入HBase中(完全分布式运行)

desehawk 2014-12-17 16:11:13 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 7 40926

问题导读

1.hbase的BulkLoad有哪些优点?
2.hbase的BulkLoad这种方式有哪些限制?







声明: 若要转载, 请标明出处.
前提: 在对于大量的数据导入到HBase中, 如果一条一条进行插入, 则太耗时了, 所以可以先采用MapReduce生成HFile文件, 然后使用BulkLoad导入HBase中.
引用:
一、这种方式有很多的优点:
1. 如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。
2. 它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。


二、这种方式也有很大的限制:
1. 仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。
2. HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群.

本文代码采用Eclipse编辑器(Linux环境下)

一. 网上的大部分代码都是或多或少有问题, 比如他们或者不是运行在集群上,或者运行时有问题, 后面会对产生哪些问题进行说明, 先不说这么多了,先上代码吧.
二. 源代码(注: 作者亲测运行在集群上成功,集群基于Ubuntu12.04, Hadoop-1.2.1与HBase-0.98,使用自带的ZooKeeper)

1.  MapReduce生产HFile文件
     首先, 需要导入的数据的表格(BigClientEnergyInfo表)有四个列族, 每个列族下面有一些列, 这些信息都使用常量配置类CONSTANT_HADOOP与CONSTANT_HBASE进行说明,如下:
  1. package cn.hey.loaddata2hbase;  
  2.   
  3. /**
  4. *  
  5. * @author HeYong
  6. * @version 1
  7. * @time 2014-05-09
  8. *
  9. */  
  10. public class CONSTANT_HADOOP {  
  11.   
  12.     //大客户表BigClientEnergyInfo的HFile生成Job名字  
  13.     public static final String BigClientEnergyInfo_JobName = "BigClientEnergyInfo_HFileGenerator_Job";  
  14.     //大客户表BigClientEnergyInfo的输入原始文本信息的HDFS路径  
  15.     public static final String BigClientEnergyInfo_inDir = "hdfs://node1:49000/user/hadoop/input/BigClientEnergyInfo/";  
  16.     //大客户表BigClientEnergyInfo的HFile文件的输出HDFS路径  
  17.     public static final String BigClientEnergyInfo_HFile_outDir = "hdfs://node1:49000/user/hadoop/output/BigClientEnergyInfo/";  
  18.       
  19.     //说明: 因为在创建HBase表的时候,默认只有一个Region,只有等到这个Region的大小超过一定的阈值之后,才会进行split  
  20.     //所以为了利用完全分布式加快生成HFile和导入HBase中以及数据负载均衡,所以需要在创建表的时候预先进行分区,  
  21.     //而进行分区时要利用startKey与endKey进行rowKey区间划分(因为导入HBase中,需要rowKey整体有序),所以在导入之前,自己先写一个MapReduce的Job求最小与最大的rowKey  
  22.     //即startKey与endKey  
  23.     //获取最大rowKey与最小rowKey的Job名字  
  24.     public static final String GetMaxAndMinRowKey_JobName = "GetMaxAndMinRowKey_Job";  
  25.     //大客户表BigClientEnergyInfo的输入原始文本信息的HDFS路径  
  26.     public static final String GetMaxAndMinRowKey_inDir = "hdfs://node1:49000/user/hadoop/input/BigClientEnergyInfo/";  
  27.     //最大rowKey与最小rowKey的输出HDFS路径  
  28.     public static final String GetMaxAndMinRowKey_outDir = "hdfs://node1:49000/user/hadoop/output/GetMaxAndMinRowKey/";  
  29. }  
复制代码


  1. package cn.hey.loaddata2hbase;  
  2.   
  3. import java.util.LinkedList;  
  4. import java.util.List;  
  5.   
  6. import org.apache.hadoop.hbase.client.HTable;  
  7.   
  8. /**
  9. *  
  10. * @author HeYong
  11. * @version 1
  12. * @time 2014-05-09
  13. *
  14. */  
  15.   
  16. public class CONSTANT_HBASE {  
  17.   
  18.     public static final long timeStamp = System.currentTimeMillis();  
  19.   
  20.     //表集合  
  21.     public static List<HTable> htables = new LinkedList<HTable>();  
  22.     public static final String[] TableNames = {"BigClientEnergyInfo"};  
  23.       
  24.     /**
  25.      * 大客户表信息
  26.      */  
  27.     //列族信息  
  28.     public static final String[] TB0_FamilyNames = {"DateTime","MeterEnergy","ObjInfo","ClientInfo"};  
  29.     //第1个列族中的列  
  30.     public static final String[] TB0_FN0ColNames ={"DATETIME"};   
  31.     //第2个列族中的列  
  32.     public static final String[] TB0_FN1ColNames ={"DT","OBJ_ID","E0","E1","E2","E3","E4","E5"};   
  33.     //第3个列族中的列  
  34.     public static final String[] TB0_FN2ColNames ={"STAT_TYPE","CITY_NO","OBJ_ID","OBJ_NAME","LAYER","LAYER_ID","OBJ_TYPE","TYPE_VALUE",  
  35.         "TYPE_VALUE_GROUP","SORT","SYS_ID","STATION_NO","FLAG"};   
  36.     //第4个列族中的列  
  37.     public static final String[] TB0_FN3ColNames ={"CITY_NO","CONSUMERID","CONSUMERNAME","CUSTOMERTYPE","USERSTATUS","USERADDR","ZONEID","INDUSTRYTYPE",  
  38.         "LINKMAN","LINKPHONE","USETYPE","LINEID"};   
  39.     //列族信息集合  
  40.     public static final String[][] TB0_FNColNames={TB0_FN0ColNames,TB0_FN1ColNames,TB0_FN2ColNames,TB0_FN3ColNames};  
  41.     //每个列族的列索引  
  42.     public static final int[] FNColIndex={1,2,10,23};  
  43.       
  44. }  
复制代码





      接着, 使用创建一个生成四个列族的HFile的MapRed Job,每个列族一个Job, 源代码如下(类BigClientEnergyInfoHFileGenerator):
其中有三点需要特别注意:

      (1)


  1. //特别注意: 一定要设置,不然会报cannot read partitioner file错误  
  2. conf.set("fs.default.name","node1:49000");  
复制代码





     (2)
  1. //特别注意: 一定要设置,不然不会运行在集群上  
  2. conf.set("mapred.job.tracker","node1:49001");  
复制代码







     (3)


  1. //特别注意: 对相关Class文件以及依赖的jar包(如HBase的jar,)进行打包,这是运行在集群上必须要做的一步,不然集群找不到相关的Mapper等类文件  
  2.      File jarpath;  
  3. try {  
  4.      jarpath = JarTools.makeJar("bin");  
  5.      conf.set("mapred.jar", jarpath.toString());  
  6. } catch (Exception e) {  
  7.      logger.error("进行jar打包出错!");  
  8.      e.printStackTrace();  
  9.      return;  
  10. }  
复制代码





     特别注意:  因为我这里是对工程下的bin目录里面的内容进行打包,所以需要把依赖的jar包先放入bin文件夹中, 再Bulid Path->Add to Build Path, 不然会出现在运行时, 依赖的包中的类找不到, 如HBase包中的ImmutableBytesWritable类等.  当然你也可以放在别的目录下,然后进行打包, 反正需要将相关Class文件与依赖的jar包进行打包. 这里自己写了一个JarTools类进行对指定文件夹下面的内容进行打包


  1. package cn.hey.loaddata2hbase;  
  2.   
  3. import java.io.File;  
  4. import java.io.IOException;  
  5. import java.net.URI;  
  6. import java.util.ArrayList;  
  7. import java.util.Iterator;  
  8. import java.util.List;  
  9.   
  10. import org.apache.hadoop.conf.Configuration;  
  11. import org.apache.hadoop.fs.FileSystem;  
  12. import org.apache.hadoop.fs.Path;  
  13. import org.apache.hadoop.hbase.KeyValue;  
  14. import org.apache.hadoop.hbase.client.HTable;  
  15. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
  16. import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;  
  17. import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;  
  18. import org.apache.hadoop.hbase.util.Bytes;  
  19. import org.apache.hadoop.io.LongWritable;  
  20. import org.apache.hadoop.io.Text;  
  21. import org.apache.hadoop.mapreduce.Job;  
  22. import org.apache.hadoop.mapreduce.Mapper;  
  23. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  24. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  25. import org.apache.log4j.LogManager;  
  26. import org.apache.log4j.Logger;  
  27.   
  28. import cn.hey.file.FileOperation;  
  29. import cn.hey.hbase.HbaseOperation;  
  30. import cn.hey.utils.JarTools;  
  31.   
  32. /**
  33. *  
  34. * @author HeYong
  35. * @version 1
  36. * @time 2014-05-09
  37. *
  38. */  
  39.   
  40. public class BigClientEnergyInfoHFileGenerator {  
  41.   
  42.     public static Logger logger = LogManager.getLogger(BigClientEnergyInfoHFileGenerator.class);  
  43.     /**
  44.      *  
  45.      * @param args 第一个元素表示第几个表,第二个元素表示该表的列族个数
  46.      * @throws IOException
  47.      * @throws InterruptedException
  48.      * @throws ClassNotFoundException
  49.      * @throws Exception
  50.      */  
  51.     public static void main(String[] args) throws  IOException, InterruptedException, ClassNotFoundException, Exception{  
  52.         if(args.length<2){  
  53.             logger.error("参数个数不对!");  
  54.             return;  
  55.         }  
  56.         int tableIndex = Integer.parseInt(args[0]);  
  57.         int familyNum = Integer.parseInt(args[1]);  
  58.         int index = 0;  
  59.         long beginTime=0,endTime=0;  
  60.         while(index<familyNum){  
  61.             beginTime = System.currentTimeMillis();  
  62.             GeneratorJob(tableIndex,index);  
  63.             endTime = System.currentTimeMillis();  
  64.             FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime1.txt",(((endTime-beginTime)/(1.0*60*1000)))+"\n");  
  65.             ++index;  
  66.         }  
  67.         FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime1.txt","-----------------------------");  
  68.     }  
  69.       
  70.     public static class HFileGenerateMapper extends  
  71.             Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {  
  72.         private static int familyIndex = 0;  
  73.         private static Configuration conf = null;  
  74.         @Override  
  75.         protected void setup(Context context) throws IOException,  
  76.                 InterruptedException {  
  77.             conf = context.getConfiguration();  
  78.             familyIndex = conf.getInt("familyIndex",0);  
  79.         }  
  80.         @Override  
  81.         protected void map(LongWritable key, Text value, Context context)  
  82.                 throws IOException, InterruptedException {  
  83.             ImmutableBytesWritable rowkey = new ImmutableBytesWritable(  
  84.                     value.toString().split(",")[0].getBytes());  
  85.             List<KeyValue> list = null;  
  86.             list = createKeyValue(value.toString());  
  87.             Iterator<KeyValue> it = list.iterator();  
  88.             while (it.hasNext()) {  
  89.                 KeyValue kv = new KeyValue();  
  90.                 kv = it.next();  
  91.                 if (kv != null) {  
  92.                     context.write(rowkey, kv);  
  93.                 }  
  94.             }  
  95.   
  96.         }  
  97.         private List<KeyValue> createKeyValue(String str) {  
  98.             List<KeyValue> list = new ArrayList<KeyValue>(CONSTANT_HBASE.TB0_FNColNames[familyIndex].length);  
  99.             String[] values = str.toString().split(",");  
  100.             String[] qualifiersName = CONSTANT_HBASE.TB0_FNColNames[familyIndex];  
  101.             for (int i = 0; i < qualifiersName.length; i++) {  
  102.                 String rowkey = values[0];  
  103.                 String family = CONSTANT_HBASE.TB0_FamilyNames[familyIndex];  
  104.                 String qualifier = qualifiersName[i];  
  105.                 String value_str = values[i+CONSTANT_HBASE.FNColIndex[familyIndex]];  
  106.   
  107.                 KeyValue kv = new KeyValue(Bytes.toBytes(rowkey),  
  108.                         Bytes.toBytes(family), Bytes.toBytes(qualifier),  
  109.                         CONSTANT_HBASE.timeStamp, Bytes.toBytes(value_str));  
  110.                 list.add(kv);  
  111.             }  
  112.             return list;  
  113.         }  
  114.     }  
  115.   
  116.     //测试Mapper,用来进行测试的, 后面没有用到  
  117.     public static class HFileMapper extends Mapper<LongWritable, Text,ImmutableBytesWritable,KeyValue> {   
  118.      
  119.         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {   
  120.             String[] values = value.toString().split(" ", -1);   
  121.               
  122.               
  123.             byte[] rkey = Bytes.toBytes(values[0]);     //rowkey  
  124.             byte[] family = Bytes.toBytes("info");      //列族  
  125.             byte[] column = Bytes.toBytes("name");      //列  
  126.             byte[] val = Bytes.toBytes(values[1]);      //值  
  127.             //Put tmpPut=new  Put(subject);  
  128.             ImmutableBytesWritable rowKey = new ImmutableBytesWritable(rkey);   
  129.             
  130.             KeyValue kvProtocol = new KeyValue(rkey , family, column, val);                                                                                    
  131.             context.write(rowKey,  kvProtocol );      
  132.         }   
  133.    
  134.     }   
  135.    /**
  136.    *  
  137.    * @param tableIndex 表示第几个表(从0开始),具体参见CONSTANT_HBASE类
  138.    * @param familyIndex 表示该表的第几个列族(从0开始),具体参见CONSTANT_HBASE类
  139.    * @throws IOException
  140.    */  
  141.     public static void GeneratorJob(int tableIndex,int familyIndex) throws IOException{   
  142.          Configuration conf = HbaseOperation.HBASE_CONFIG;   
  143.          //特别注意: 一定要设置,不然会爆cannot read partitioner file错误  
  144.          conf.set("fs.default.name","node1:49000");  
  145.          //特别注意: 一定要设置,不然不会运行在集群上  
  146.          conf.set("mapred.job.tracker","node1:49001");  
  147.          //特别注意: 对相关Class以及依赖的jar包(如HBase的jar)进行打包,这是运行在集群上必须要做的一步,不然集群找不到相关的Mapper等类文件  
  148.          File jarpath;  
  149.          try {  
  150.              jarpath = JarTools.makeJar("bin");  
  151.              conf.set("mapred.jar", jarpath.toString());  
  152.          } catch (Exception e) {  
  153.              logger.error("进行jar打包出错!");  
  154.              e.printStackTrace();  
  155.              return;  
  156.          }  
  157.            
  158.          //设置job  
  159.          Job job = new Job(conf, CONSTANT_HADOOP.BigClientEnergyInfo_JobName);   
  160.          job.setJarByClass(BigClientEnergyInfoHFileGenerator.class);   
  161.      
  162.          //设置Map任务输出Key-Value类型,一定要为该类型,Value可以改为HBase的Put类型  
  163.          job.setOutputKeyClass(ImmutableBytesWritable.class);   
  164.          job.setOutputValueClass(KeyValue.class);   
  165.      
  166.          //设置Mapper与Reducer类  
  167.          job.setMapperClass(HFileGenerateMapper.class);   
  168.          job.setReducerClass(KeyValueSortReducer.class);   
  169.          // 不需要设置,系统会根据相关信息调用 HFileOutputFormat  
  170.          // job.setOutputFormatClass(HFileOutputFormat.class);  
  171.          // 不需要设置, 系统会根据表的Region数创建多少Reducer  
  172.          // job.setNumReduceTasks(4);   
  173.          // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);   
  174.   
  175.           HTable table = new HTable(conf, CONSTANT_HBASE.TableNames[tableIndex]);   
  176.           HFileOutputFormat.configureIncrementalLoad(job, table);   
  177.      
  178.           //设置数据输入输出目录  
  179.           String str_inPath = CONSTANT_HADOOP.BigClientEnergyInfo_inDir;  
  180.           String str_outPath = CONSTANT_HADOOP.BigClientEnergyInfo_HFile_outDir+CONSTANT_HBASE.TB0_FamilyNames[familyIndex];  
  181.             
  182.           //创建HDFS对象  
  183.           FileSystem fs = FileSystem.get(URI.create(str_inPath),conf);  
  184.           // 如果输出路径存在就先删掉,因为不允许输出路径事先存在  
  185.           Path outPath = new Path(str_outPath);  
  186.           if (fs.exists(outPath))  
  187.               fs.delete(outPath, true);  
  188.           FileInputFormat.addInputPath(job, new Path(str_inPath));   
  189.           FileOutputFormat.setOutputPath(job, new Path(str_outPath));   
  190.          
  191.           try {  
  192.                 job.waitForCompletion(true);  
  193.           } catch (InterruptedException e) {  
  194.               logger.info(CONSTANT_HADOOP.BigClientEnergyInfo_JobName+" 任务运行出错!");  
  195.                 e.printStackTrace();  
  196.           } catch (ClassNotFoundException e) {  
  197.                 logger.info(CONSTANT_HADOOP.BigClientEnergyInfo_JobName+" 任务运行出错!");  
  198.                 e.printStackTrace();  
  199.           }  
  200.     }   
  201.   
  202. }  
复制代码







生成HFile程序说明:
①. 最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
②. 最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。
③. MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件。好像最新的版本可以多个列族.
④. MR例子中HFileOutputFormat.configureIncrementalLoad(job, table);自动对job进行配置。TotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。
⑤. MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了。
       然后, 使用BulkLoad工具将HFile文件导入HBase中,  源代码如下(类BigClientEnergyInfoHFileLoader):

  1. package cn.hey.loaddata2hbase;  
  2.   
  3. import java.io.File;  
  4.   
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;  
  7. import org.apache.log4j.LogManager;  
  8. import org.apache.log4j.Logger;  
  9.   
  10. import cn.hey.file.FileOperation;  
  11. import cn.hey.hbase.HbaseOperation;  
  12.   
  13. /**
  14. *  
  15. * @author HeYong
  16. * @version 1
  17. * @time 2014-05-09
  18. *
  19. */  
  20.   
  21. public class BigClientEnergyInfoHFileLoader {  
  22.   
  23.     public static Logger logger = LogManager.getLogger(HFileLoader.class);  
  24.     public static void main(String[] args) throws Exception {  
  25.       
  26.          
  27.         if(args.length<2){  
  28.             logger.error("参数个数不对!");  
  29.             return;  
  30.         }  
  31.         LoadIncrementalHFiles loader = new LoadIncrementalHFiles(  
  32.                 HbaseOperation.HBASE_CONFIG);  
  33.         
  34.         int tableIndex = Integer.parseInt(args[0]);  
  35.         int familyNum = Integer.parseInt(args[1]);  
  36.         int i = 0;  
  37.         long beginTime=0,endTime=0;  
  38.         while(i<familyNum){  
  39.             beginTime = System.currentTimeMillis();  
  40.             String str_outPath = CONSTANT_HADOOP.str_outPath+CONSTANT_HBASE.TB0_FamilyNames[i];  
  41.             loader.doBulkLoad(new Path(str_outPath),CONSTANT_HBASE.htables.get(tableIndex));  
  42.             endTime = System.currentTimeMillis();  
  43.                 //将用时相关写入文件  
  44.                 FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime2.txt",(((endTime-beginTime)/(1.0*60*1000)))+"\n");  
  45.             ++i;  
  46.         }  
  47.         FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime2.txt","------------------------");  
  48.     }  
  49.       
  50.   
  51. }  
复制代码



       最后,使用一个Driver类, 先创建HTable,然后调用上面的两个类,源代码如下(类BigClientEnergyInfoLoadDriver):
       说明: 因为在创建HBase表的时候,默认只有一个Region,只有等到这个Region的大小超过一定的阈值之后,才会进行split, 所以为了利用完全分布式加快生成HFile和导入HBase中以及数据负载均衡,所以需要在创建表的时候预先创建分区,可以查阅相关资料(关于HBase调优的资料), 而进行分区时要利用startKey与endKey进行rowKey区间划分(因为导入HBase中,需要rowKey整体有序),所以在导入之前,自己先写一个MapReduce的Job求最小与最大的rowKey, 即startKey与endKey.

  1. package cn.hey.loaddata2hbase;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.ArrayList;  
  5. import java.util.List;  
  6.   
  7. import org.apache.hadoop.hbase.client.HTable;  
  8. import org.apache.log4j.LogManager;  
  9. import org.apache.log4j.Logger;  
  10.   
  11. import cn.hey.hbase.HbaseOperation;  
  12. import cn.hey.hdfs.HDFSOperation;  
  13.   
  14. /**
  15. *  
  16. * @author HeYong
  17. * @version 1
  18. * @time 2014-05-09
  19. *
  20. */  
  21.   
  22. public class BigClientEnergyInfoLoadDriver {  
  23.   
  24.     protected static Logger logger = LogManager.getLogger(BigClientEnergyInfoLoadDriver.class);  
  25.     /**
  26.      * @param args
  27.      * @throws ClassNotFoundException  
  28.      * @throws InterruptedException  
  29.      * @throws IOException  
  30.      */  
  31.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {  
  32.         //首先删除在CONSTANT_HBASE类中的第0个表,即BigClientEnergyInfo表  
  33.         dropHTable(0);  
  34.         /**
  35.          * 说明: 因为在创建HBase表的时候,默认只有一个Region,只有等到这个Region的大小超过一定的阈值之后,才会进行split,
  36.          * 所以为了利用完全分布式加快生成HFile和导入HBase中以及数据负载均衡,所以需要在创建表的时候预先创建分区,可以查阅相关资料(关于HBase调优的资料),
  37.          * 而进行分区时要利用startKey与endKey进行rowKey区间划分(因为导入HBase中,需要rowKey整体有序),所以在导入之前,自己先写一个MapReduce的Job求最小与最大的rowKey,
  38.          * 即startKey与endKey.
  39.          *  
  40.          */  
  41.         //调用GetMaxAndMinRowKeyDriver.获取startKey与endKey  
  42.         GetMaxAndMinRowKeyDriver.main(null);  
  43.         //读取startKey与endKey,readHDFSFile方法即读取指定HDFS文件中的内容,每一行作为一个字符串  
  44.         List<String> strList = HDFSOperation.readHDFSFile(CONSTANT_HADOOP.GetMaxAndMinRowKey_outDir+"part-r-00000");  
  45.         if(strList==null||strList.size()<2){  
  46.             logger.info("startKey与endKey读取失败!");  
  47.             return;  
  48.         }  
  49.         String startKey = strList.get(0);  
  50.         String endKey = strList.get(1);  
  51.         if(startKey==null||"".equals(startKey)||endKey==null||"".equals(endKey)){  
  52.             logger.info("startKey或endKey为空!");  
  53.             return;  
  54.         }  
  55.         args = new String[2];  
  56.         //第0个表,表的索引,即表BigClientEnergyInfo  
  57.         args[0]="0";  
  58.         //该表所拥有的列族的数目  
  59.         args[1]= ""+CONSTANT_HBASE.TB0_FamilyNames.length;  
  60.         //创建第0个表,即大客户表  
  61.         boolean flag = false;  
  62.         try {  
  63.             //创建表时预先创建的Region个数  
  64.             int numPreRegions = 7;  
  65.             flag = createHTable(0,startKey,endKey,numPreRegions);  
  66.         } catch (IOException e1) {  
  67.             e1.printStackTrace();  
  68.         }  
  69.         if(flag){  
  70.             //产生该表的HFile文件  
  71.             try {  
  72.                 BigClientEnergyInfoHFileGenerator.main(args);  
  73.             } catch (IOException e) {  
  74.                 e.printStackTrace();  
  75.             } catch (InterruptedException e) {  
  76.                 e.printStackTrace();  
  77.             } catch (ClassNotFoundException e) {  
  78.                 e.printStackTrace();  
  79.             } catch (Exception e) {  
  80.                 e.printStackTrace();  
  81.             }  
  82.               
  83.             //将HFile导入HBase中  
  84.             try {  
  85.                 HFileLoader.main(args);  
  86.             } catch (Exception e) {  
  87.                 e.printStackTrace();  
  88.             }  
  89.         }  
  90.     }  
  91.     /**
  92.      *  
  93.      * @param index 第几个表
  94.      * @param startKey 创建预先分区的startKey
  95.      * @param endKey 创建预先分区的endKey
  96.      * @param numRegions 创建预先分区个数
  97.      * @return 是否创建成功
  98.      * @throws IOException
  99.      */  
  100.     public static boolean createHTable(int index,String startKey,String endKey,int numRegions) throws IOException{  
  101.         if(index<0||index>=CONSTANT_HBASE.TableNames.length){  
  102.             logger.error("表下标越界!");  
  103.             return false;  
  104.         }  
  105.         if(startKey==null||"".equals(startKey)){  
  106.             logger.error("startKey不能为空!");  
  107.             return false;  
  108.         }  
  109.         if(endKey==null||"".equals(endKey)){  
  110.             logger.error("endKey不能为空!");  
  111.             return false;  
  112.         }  
  113.         if(numRegions<0){  
  114.             logger.error("分区个数<0!");  
  115.             return false;  
  116.         }  
  117.         List<String> list = new ArrayList<String>();  
  118.         String tableName = CONSTANT_HBASE.TableNames[index];  
  119.         for(String familyName:CONSTANT_HBASE.TB0_FamilyNames){  
  120.             list.add(familyName);  
  121.         }  
  122.         if(HbaseOperation.createTable(tableName, list,startKey,endKey,numRegions)){  
  123.             logger.info("创建HTable :"+tableName+"成功");  
  124.         }  
  125.         HTable table = new HTable(HbaseOperation.HBASE_CONFIG,tableName);  
  126.         CONSTANT_HBASE.htables.add(table);  
  127.         return true;  
  128.     }  
  129.     public static void dropHTable(int index){  
  130.         String tableName = CONSTANT_HBASE.TableNames[index];  
  131.         HbaseOperation.dropTable(tableName);  
  132.     }  
  133.       
  134. }  
复制代码


  1. 注: HbaseOperation.createTable方法, 即创建表, HbaseOperation.dropTable方法,即删除表, 源代码如下:  
  2. /**
  3.      * 创建表
  4.      *  
  5.      * @param tableName
  6.      * @param family 列族集名称
  7.      * @param String startKey,String endKey,int numRegions 预先分区相关信息
  8.      */  
  9.     public static boolean createTable(String tableName,List<String> family,String startKey,String endKey,int numRegions) {  
  10.         try {  
  11.             hBaseAdmin = new HBaseAdmin(HBASE_CONFIG);  
  12.             //如果表已存在,则返回  
  13.             if (hBaseAdmin.tableExists(tableName)) {  
  14.   
  15.                 //hBaseAdmin.disableTable(tableName);  
  16.                 //hBaseAdmin.deleteTable(tableName);  
  17.                 logger.info("表: "+tableName+"已经存在!");  
  18.                 return false;  
  19.             }  
  20.             HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);  
  21.             for(String name:family){  
  22.                 tableDescriptor.addFamily(new HColumnDescriptor(name));  
  23.             }  
  24.             hBaseAdmin.createTable(tableDescriptor,Bytes.toBytes(startKey),Bytes.toBytes(endKey),numRegions);  
  25.         } catch (MasterNotRunningException e) {  
  26.             e.printStackTrace();  
  27.         } catch (ZooKeeperConnectionException e) {  
  28.             e.printStackTrace();  
  29.         } catch (IOException e) {  
  30.             e.printStackTrace();  
  31.         }  
  32.         return true;  
  33.     }  
  34. /**
  35.      * 删除一张表
  36.      *  
  37.      * @param tableName 表名
  38.      */  
  39.     public static void dropTable(String tableName) {  
  40.         if(tableName==null||"".equals(tableName)){  
  41.             logger.error("表名不能为空!");  
  42.             return;  
  43.         }  
  44.         try {  
  45.             hBaseAdmin = new HBaseAdmin(HBASE_CONFIG);  
  46.             hBaseAdmin.disableTable(tableName);  
  47.             hBaseAdmin.deleteTable(tableName);  
  48.         } catch (MasterNotRunningException e) {  
  49.             e.printStackTrace();  
  50.         } catch (ZooKeeperConnectionException e) {  
  51.             e.printStackTrace();  
  52.         } catch (IOException e) {  
  53.             e.printStackTrace();  
  54.         }  
  55.   
  56.     }
  57. 特别注意: 对HBase进行操作时, 在获取HBase conf时, 即public static Configuration HBASE_CONFIG = HBaseConfiguration.create();的时候, 一定要进行如下设置:  
  58. static {              
  59.                  //设置HMaster  
  60.                   HBASE_CONFIG.set("hbase.zookeeper.master","node1:60000");  
  61.          //设置Zookeeper集群  
  62.           HBASE_CONFIG.set("hbase.zookeeper.quorum", "node2,node3,node4,node5,node6,node7,node8");   
  63.      }不然会出现RegionServer的Zookeeper连接不上HMaster, 千万要注意.
复制代码

到这里就基本大功告成了. 可以通过node1:50030查看job的运行情况, 通过node1:60010查看HBase的相关情况




已有(7)人评论

跳转到指定楼层
271592448 发表于 2014-12-18 09:37:17
1、”仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况“这个地方理解的不对。在表中已经有数据的情况,对rowkey进行排序后,是可以继续导入的。
2、建表的时候,可以做预分区。hbase表按照字典序的,可以对rowkey的某一个值进行取模,做均匀分布,类似phoenix里面的加盐。
回复

使用道具 举报

redhat1986 发表于 2014-12-22 16:20:21
"而且不占用region资源"  hbase的数据不是都存储在region里面的吗?这样存储会不会出现数据倾斜?
回复

使用道具 举报

ainubis 发表于 2015-3-28 18:52:38
谢谢楼主分享。
回复

使用道具 举报

雷夫23 发表于 2016-7-4 18:59:15
谢谢楼主的分享
回复

使用道具 举报

ohano_javaee 发表于 2018-3-23 10:40:19
我有个问题,我并不知道hbase表在hdfs中的存放位置,怎么破?
回复

使用道具 举报

yoyolachel 发表于 2018-7-18 16:30:14
这个厉害,绝对干货!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条