分享

通过BulkLoad向Hbase插入数据

Hbase导入数据的方式很多中,这里介绍通过BulkLoad:
1.BulkLoad是什么?

2.需要将那三个包放到$HADOOP_HOME/lib下?
3.hadoop如何配置?

4.如何通过BulkLoad上传数据

已有(9)人评论

跳转到指定楼层
sstutu 发表于 2014-3-19 20:33:49
补充一下:使用MapReduce往Hbase插入数据:
  1. import java.io.IOException;  
  2.   
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.hbase.HBaseConfiguration;  
  6. import org.apache.hadoop.hbase.client.Put;  
  7. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
  8. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
  9. import org.apache.hadoop.hbase.mapreduce.TableReducer;  
  10. import org.apache.hadoop.io.LongWritable;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.mapreduce.Job;  
  13. import org.apache.hadoop.mapreduce.Mapper;  
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  15.   
  16. import com.hbase.log.RecordParser;  
  17.   
  18. public class HbaseInsertData {  
  19.       
  20.     public static class HbaseMapper   
  21.         extends Mapper<LongWritable, Text, Text, Text>{  
  22.          
  23.         RecordParser parser = new RecordParser();  
  24.         @Override  
  25.         protected void map(LongWritable key, Text value, Context context)  
  26.                 throws IOException, InterruptedException {  
  27.             parser.parse(value);  
  28.             String phone = parser.getPhone();  
  29.             int bloodPressure = parser.getBloodPressure();  
  30.             if(bloodPressure > 150) {  
  31.                 context.write(new Text(phone), new Text(bloodPressure + ""));  
  32.             }  
  33.         }  
  34.     }  
  35.       
  36.     public static class HbaseReducer  
  37.         extends TableReducer<Text, Text, ImmutableBytesWritable> {  
  38.   
  39.         @Override  
  40.         protected void reduce(Text key, Iterable<Text> values,  
  41.                 Context context)  
  42.                 throws IOException, InterruptedException {  
  43.             String value = values.iterator().next().toString();  
  44.             Put putRow = new Put(key.getBytes());  
  45.             putRow.add("f1".getBytes(), "qualifier".getBytes(), value.getBytes());  
  46.               
  47.             context.write(new ImmutableBytesWritable(key.getBytes()), putRow);  
  48.         }  
  49.     }  
  50.       
  51.     public static void main(String[] args) throws Exception{  
  52.         Configuration conf = HBaseConfiguration.create();  
  53.         conf.set("hbase.zookeeper.quorum.", "localhost");  //千万别忘记配置  
  54.   
  55.         Job job = new Job(conf, "count");  
  56.          
  57.         job.setJarByClass(HbaseInsertData.class);  
  58.         job.setMapperClass(HbaseMapper.class);  
  59.          
  60.         job.setMapOutputKeyClass(Text.class);  
  61.         job.setMapOutputValueClass(Text.class);  
  62.          
  63.         Path in = new Path("hdfs://localhost:9000/input");  
  64.         FileInputFormat.addInputPath(job, in);  
  65.          
  66.         TableMapReduceUtil.initTableReducerJob("tab1", HbaseReducer.class, job);  
  67.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  68.     }  
  69. }  
复制代码
解析的classRecordParser
  1. import org.apache.hadoop.io.Text;  
  2.   
  3. public class RecordParser {  
  4.       
  5.     private String id;  
  6.     private String phone;  
  7.     private int bloodPressure;  
  8.       
  9.     public void parse(String record) {  
  10.         String[] logs = record.split(",");  
  11.         id = logs[1];  
  12.         phone = logs[3];  
  13.         bloodPressure = Integer.parseInt(logs[4]);  
  14.     }  
  15.       
  16.     public void parse(Text record) {  
  17.         this.parse(record.toString());  
  18.     }  
  19.   
  20.     public String getId() {  
  21.         return id;  
  22.     }  
  23.   
  24.     public String getPhone() {  
  25.         return phone;  
  26.     }  
  27.   
  28.     public int getBloodPressure() {  
  29.         return bloodPressure;  
  30.     }  
  31. }  
复制代码
回复

使用道具 举报

howtodown 发表于 2014-3-19 20:29:30
在使用bulkload进行手动Hbase插入的过程:

我们使用的是hadoop-1.0.3,相应的Hbase版本为hbase-0.94.3
(其他的版本可能遇到的问题会不一样,这里大家参考就好)

1.将3个包:guava-11.0.2.jar、zookeeper-3.4.3.jar、protobuf-java-2.4.0a.jar放到$HADOOP_HOME/lib下

2.Hadoop配置:
去掉hadoop-env.sh里HADOOP_CLASSPATH前的注释,并在其前一行添加HBASE_HOME,如下,
export HBASE_HOME = XXX
export HADOOP_CLASSPATH=$HBASE_HOME/hbase-X.X.X.jar:$HBASE_HOME/hbase-X.X.X-test.jar:$HBASE_HOME/conf:${HBASE_HOME}/lib/zookeeper-X.X.X.jar:${HBASE_HOME}/lib/guava-11.0.2.jar

3.将$HBASE_HOME/conf下的hbase-site.xml拷到$HADOOP_HOME/conf目录下,将$HBASE_HOME/下的hbase-X.X.X.jar拷到$HADOOP_HOME/lib目录下,并启动/重启hadoop和hbase

4.将文档上传到hdfs上(我们这里将Hfile文件上传到hdfs的/hongjian目录下)
hadoop fs -put ../zm_GraduateStudent-S-PO /hongjian

5.在HBase新建将要导入的表(只需要表名和列族名)
create 'zm_GraduateStudent-S-PO','d','f'(这里我们新建表"zm_GraduateStudent-S-PO",rowkey是默认的,有两个列族分别为"d"和"f")
后来发现,其实创建表名,bulkload的过程也会自动在hbase中自动创建相应的表。

6.上传数据
$HADOOP_HOME/bin/hadoop jar $HBASE_HOME/hbase-0.94.3.jar  completebulkload hdfs://master:54310/hongjian/zm_GraduateStudent-S-PO zm_GraduateStudent-S-PO


整个过程遇到很多问题,具体的解决办法,列举如下:

1、如果报错Caused by: java.lang.IllegalStateException: The value of the hbase.metrics.showTableName conf option has not been specified in SchemaMetrics的问题的话,这是hbase-0.94.3的bug,需要更新。
1.jpg


https://issues.apache.org/jira/browse/HBASE-4802这里下载patch文件,
在src的父目录下,执行patch -p0 < *3*
*3*指的是patch的目录地址
然后用maven进行重新编译:mvn package -Dmaven.test.skip.exec=true
如果你没有安装maven的话,在http://maven.apache.org/download.cgi下载一个maven,然后执行
mvn package -Dmaven.test.skip.exec=true
此过程需要联网,下载相应的依赖。

2、bulkload这个命令只能执行一遍,执行之后,hdfs里面的数据会被移动插入到hbase里面。其实bulkload的过程就是一个移动数据的过程。
如果重复bulkload,会报错如下:
2.jpg
3.jpg


$HADOOP_HOME/bin/hadoop jar $HBASE_HOME/hbase-0.94.3.jar  completebulkload hdfs://master:54310/hongjian/*1* *2*
*1*指的是你的Hfile文件目录
*2*指的是你在Hbase中需要插入的表名

回复

使用道具 举报

sstutu 发表于 2014-3-19 20:30:15
1.需要将那三个包放到$HADOOP_HOME/lib下?
将3个包:guava-11.0.2.jar、zookeeper-3.4.3.jar、protobuf-java-2.4.0a.jar放到$HADOOP_HOME/lib下

2.hadoop如何配置?
去掉hadoop-env.sh里HADOOP_CLASSPATH前的注释,并在其前一行添加HBASE_HOME,如下,
export HBASE_HOME = XXX
export HADOOP_CLASSPATH=$HBASE_HOME/hbase-X.X.X.jar:$HBASE_HOME/hbase-X.X.X-test.jar:$HBASE_HOME/conf:${HBASE_HOME}/lib/zookeeper-X.X.X.jar:${HBASE_HOME}/lib/guava-11.0.2.jar

3.如何通过BulkLoad上传数据
$HADOOP_HOME/bin/hadoop jar $HBASE_HOME/hbase-0.94.3.jar  completebulkload hdfs://master:54310/hongjian/zm_GraduateStudent-S-PO zm_GraduateStudent-S-PO

回复

使用道具 举报

desehawk 发表于 2014-3-19 20:36:55
本帖最后由 pig2 于 2014-3-19 20:41 编辑

bulk-load的作用是用mapreduce的方式将hdfs上的文件装载到hbase中,对于海量数据装载入hbase非常有用,参考http://hbase.apache.org/docs/r0.89.20100621/bulk-loads.html

hbase提供了现成的程序将hdfs上的文件导入hbase,即bulk-load方式。
它包括两个步骤(也可以一次完成):
1 将文件包装成hfile,
hadoop jar /path/to/hbase.jar importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>
  1.         hadoop dfs -cat test/1
  2.         1       2
  3.         3       4
  4.         5       6
  5.         7       8
复制代码
执行
  1.         hadoop jar ~/hbase/hbase-0.90.2.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,f1 t8 test
复制代码
将会启动mapreduce程序在hdfs上生成t8这张表,它的rowkey分别为1 3 5 7,对应的value为2 4 6 8
注意,源文件默认以"\t"为分割符,如果需要换成其它分割符,在执行时加上-Dimporttsv.separator=",",则变成了以","分割

2 在上一步中,如果设置了输出目录,如
  1.    hadoop jar ~/hbase/hbase-0.90.2.jar importtsv -Dimporttsv.bulk.output=tmp -Dimporttsv.columns=HBASE_ROW_KEY,f1 t8 test
复制代码
那么t8表还暂时不会生成,只是将hfile输出到tmp文件夹下,我们可以查看tmp:
  1.         hadoop dfs -du tmp
  2.         Found 3 items
  3.         0           hdfs://namenode:9000/user/test/tmp/_SUCCESS
  4.         65254       hdfs://namenode:9000/user/test/tmp/_logs
  5.         462         hdfs://namenode:9000/user/test/tmp/f1
复制代码
然后执行hadoop jar hbase-VERSION.jar completebulkload /user/todd/myoutput mytable将这个输出目录中的hfile转移到对应的region中,这一步因为只是mv,所以相当快。如:
hadoop jar ~/hbase/hbase-0.90.2.jar completebulkload tmp t8
然后
  1. hadoop dfs -du /hbase/t8/c408963c084d328490cc2f809ade9428
  2.         Found 4 items
  3.         124         hdfs://namenode:9000/hbase/t8/c408963c084d328490cc2f809ade9428/.oldlogs
  4.         692         hdfs://namenode:9000/hbase/t8/c408963c084d328490cc2f809ade9428/.regioninfo
  5.         0           hdfs://namenode:9000/hbase/t8/c408963c084d328490cc2f809ade9428/.tmp
  6.         462         hdfs://namenode:9000/hbase/t8/c408963c084d328490cc2f809ade9428/f1
复制代码
此时己经生成了表t8
注意,如果数据特别大,而表中原来就有region,那么会执行切分工作,查找数据对应的region并装载

程序使用中注意:

1 因为是执行hadoop程序,不会自动查找hbase的config路径,也就找不到hbase的环境变量。因此需要将hbase-site.xml加入到hadoop-conf变量中
2 还需要将hbase/lib中的jar包放入classpath中
3 执行以上的步骤2时需要将zookeeper的配置写入core-site.xml中,因为在那一步时甚至不会读取hbase-site.xml,否则会连不上zookeeper


回复

使用道具 举报

471505881qq 发表于 2014-4-1 11:35:26

谢谢楼主的分享!
回复

使用道具 举报

471505881qq 发表于 2014-4-1 15:33:04

要留着备用。
回复

使用道具 举报

stark_summer 发表于 2015-2-13 18:49:27
回复

使用道具 举报

YLV 发表于 2015-3-11 14:54:03
先留着,谢谢
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条