Questions this post addresses:
1. How do you import data directly with HTable?
2. How do you import from an HDFS file into HBase with a class extending Mapper?
3. How do you read an HBase table and write into a column of an HBase table?
4. How do you combine MapReduce with HTable?
Versions: Hadoop 1.2.1; HBase 0.94.16
The ways of writing data into HBase (see "HBase: The Definitive Guide") can be roughly grouped into the approaches below:
1. Import directly with HTable. The code is as follows:
- package hbase.curd;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Random;
-
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.util.Bytes;
-
- public class PutExample {
-
- /**
- * @param args
- * @throws IOException
- */
- private HTable table = HTableUtil.getHTable("testtable");
- public static void main(String[] args) throws IOException {
- // TODO Auto-generated method stub
- PutExample pe = new PutExample();
- pe.putRows();
-
- }
-
- public void putRows(){
- List<Put> puts = new ArrayList<Put>();
- for(int i=0;i<10;i++){
- Put put = new Put(Bytes.toBytes("row_"+i));
- Random random = new Random();
-
- if(random.nextBoolean()){
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("colfam1_qual1_value_"+i));
- }
- if(random.nextBoolean()){
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("colfam1_qual2_value_"+i));
- }
- if(random.nextBoolean()){
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual3"), Bytes.toBytes("colfam1_qual3_value_"+i));
- }
- if(random.nextBoolean()){
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual4"), Bytes.toBytes("colfam1_qual4_value_"+i));
- }
- if(random.nextBoolean()){
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual5"), Bytes.toBytes("colfam1_qual5_value_"+i));
- }
- puts.add(put);
- }
- try{
- table.put(puts);
- table.close();
- }catch(Exception e){
- e.printStackTrace();
- return ;
- }
- System.out.println("done put rows");
- }
-
- }
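Besides handing a whole List<Put> to HTable as above, the 0.94 client can also buffer individual puts on the client side. Below is a minimal sketch (not part of the original post; the class name BufferedPutExample and the 2 MB buffer size are only illustrative) that reuses HTableUtil and the testtable/colfam1 schema from the example above:

package hbase.curd;

import java.io.IOException;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedPutExample {
    public static void main(String[] args) throws IOException {
        HTable table = HTableUtil.getHTable("testtable");
        table.setAutoFlush(false);                 // buffer puts on the client instead of one RPC per Put
        table.setWriteBufferSize(2 * 1024 * 1024); // flush to the server roughly every 2 MB
        for (int i = 0; i < 10000; i++) {
            Put put = new Put(Bytes.toBytes("row_" + i));
            put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
                    Bytes.toBytes("value_" + i));
            table.put(put);                        // goes into the write buffer, not yet to the server
        }
        table.flushCommits();                      // push whatever is still buffered
        table.close();                             // close() flushes as well
    }
}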
The HTableUtil helper class is as follows:
- package hbase.curd;
-
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.util.Bytes;
-
- public class HTableUtil {
- private static HTable table;
- private static Configuration conf;
-
- static{
- conf =HBaseConfiguration.create();
- conf.set("mapred.job.tracker", "hbase:9001");
- conf.set("fs.default.name", "hbase:9000");
- conf.set("hbase.zookeeper.quorum", "hbase");
- try {
- table = new HTable(conf,"testtable");
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- public static Configuration getConf(){
- return conf;
- }
- public static HTable getHTable(String tablename){
- if(table==null){
- try {
- table= new HTable(conf,tablename);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- return table;
- }
-
- public static byte[] gB(String name){
- return Bytes.toBytes(name);
- }
- }
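One thing HTableUtil assumes is that the target table already exists. A small sketch for creating it (the class name CreateTestTable is made up; you could equally run create 'testtable','colfam1','fam' in the HBase shell), covering both the colfam1 family used by PutExample and the fam family used by the MR jobs below:

package hbase.curd;

import java.io.IOException;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTestTable {
    public static void main(String[] args) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(HTableUtil.getConf());
        if (!admin.tableExists("testtable")) {
            HTableDescriptor desc = new HTableDescriptor("testtable");
            desc.addFamily(new HColumnDescriptor("colfam1")); // family used by PutExample
            desc.addFamily(new HColumnDescriptor("fam"));     // family used by the MR jobs below
            admin.createTable(desc);
        }
        admin.close();
    }
}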
The approach above does not use MapReduce; the approaches that follow all do.
2.1 Import from an HDFS file into HBase with a mapper that extends Mapper. The code is as follows:
- package hbase.mr;
-
- import java.io.IOException;
-
- import hbase.curd.HTableUtil;
-
- import org.apache.commons.cli.CommandLine;
- import org.apache.commons.cli.CommandLineParser;
- import org.apache.commons.cli.HelpFormatter;
- import org.apache.commons.cli.Option;
- import org.apache.commons.cli.Options;
- import org.apache.commons.cli.PosixParser;
- import org.apache.commons.codec.digest.DigestUtils;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
-
- public class ImportFromFile {
-
- /**
- * Import data from a file into HBase
- * @param args
- */
- public static final String NAME="ImportFromFile";
- public enum Counters{LINES}
-
- static class ImportMapper extends Mapper<LongWritable,Text,
- ImmutableBytesWritable,Writable>{
- private byte[] family =null;
- private byte[] qualifier = null;
- @Override
- protected void setup(Context cxt){
- String column = cxt.getConfiguration().get("conf.column");
- byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
- family = colkey[0];
- if(colkey.length>1){
- qualifier = colkey[1];
- }
- }
- @Override
- public void map(LongWritable offset,Text line,Context cxt){
- try{
- String lineString= line.toString();
- byte[] rowkey= DigestUtils.md5(lineString);
- Put put = new Put(rowkey);
- put.add(family,qualifier,Bytes.toBytes(lineString));
- cxt.write(new ImmutableBytesWritable(rowkey), put);
- cxt.getCounter(Counters.LINES).increment(1);
- }catch(Exception e){
- e.printStackTrace();
- }
- }
- }
- private static CommandLine parseArgs(String[] args){
- Options options = new Options();
- Option o = new Option("t" ,"table",true,"table to import into (must exist)");
- o.setArgName("table-name");
- o.setRequired(true);
- options.addOption(o);
-
- o= new Option("c","column",true,"column to store row data into");
- o.setArgName("family:qualifier");
- o.setRequired(true);
- options.addOption(o);
-
- o = new Option("i", "input", true,
- "the directory or file to read from");
- o.setArgName("path-in-HDFS");
- o.setRequired(true);
- options.addOption(o);
- options.addOption("d", "debug", false, "switch on DEBUG log level");
- CommandLineParser parser = new PosixParser();
- CommandLine cmd = null;
- try {
- cmd = parser.parse(options, args);
- } catch (Exception e) {
- System.err.println("ERROR: " + e.getMessage() + "\n");
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(NAME + " ", options, true);
- System.exit(-1);
- }
- return cmd;
- }
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-
- Configuration conf = HTableUtil.getConf();
- String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs();
- CommandLine cmd = parseArgs(otherArgs);
- String table = cmd.getOptionValue("t");
- String input = cmd.getOptionValue("i");
- String column = cmd.getOptionValue("c");
- conf.set("conf.column", column);
- Job job = new Job(conf, "Import from file " + input + " into table " + table);
- job.setJarByClass(ImportFromFile.class);
- job.setMapperClass(ImportMapper.class);
- job.setOutputFormatClass(TableOutputFormat.class);
- job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Writable.class);
- job.setNumReduceTasks(0);
- FileInputFormat.addInputPath(job, new Path(input));
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
-
- private static String[] initialArg(){
- String []args = new String[6];
- args[0]="-c";
- args[1]="fam:data";
- args[2]="-i";
- args[3]="/user/hadoop/input/picdata";
- args[4]="-t";
- args[5]="testtable";
- return args;
- }
- }
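Note that main() feeds the hard-coded initialArg() array to GenericOptionsParser, so the table, column and input path are fixed at compile time here. Since ImportMapper uses the MD5 of each input line as the row key, one way to spot-check the import is to recompute that key for a known line and read it back. A hedged sketch (the class name VerifyImportedLine and the sample line are made up; it assumes the fam:data column and testtable from initialArg()):

package hbase.mr;

import java.io.IOException;

import hbase.curd.HTableUtil;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyImportedLine {
    public static void main(String[] args) throws IOException {
        String line = "a line that exists in /user/hadoop/input/picdata"; // replace with a real input line
        byte[] rowkey = DigestUtils.md5(line);  // same row-key rule as ImportMapper
        HTable table = new HTable(HTableUtil.getConf(), "testtable");
        Result result = table.get(new Get(rowkey));
        byte[] value = result.getValue(Bytes.toBytes("fam"), Bytes.toBytes("data"));
        System.out.println(value == null ? "row not found" : Bytes.toString(value));
        table.close();
    }
}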
2.2 Read an HBase table and write a column into an HBase table. The code is as follows:
- package hbase.mr;
-
- import hadoop.util.HadoopUtils;
-
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.Scan;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
- import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
- import org.apache.hadoop.hbase.mapreduce.TableMapper;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapreduce.Job;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class ParseDriver {
-
- /**
- * Copy data from an HBase table into the same column of another table (or the same table)
- * @param args
- */
- enum Counters{
- VALID, ROWS, COLS, ERROR
- }
- private static Logger log = LoggerFactory.getLogger(ParseDriver.class);
- static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
- private byte[] columnFamily =null ;
- private byte[] columnQualifier =null;
- @Override
- protected void setup(Context cxt){
- columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
- columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
- }
- @Override
- public void map(ImmutableBytesWritable row,Result columns,Context cxt){
- cxt.getCounter(Counters.ROWS).increment(1);
- String value =null;
- try{
- Put put = new Put(row.get());
- for(KeyValue kv : columns.list()){
- cxt.getCounter(Counters.COLS).increment(1);
- value= Bytes.toStringBinary(kv.getValue());
- if(equals(columnQualifier,kv.getQualifier())){ // filter on the column qualifier
- put.add(columnFamily,columnQualifier,kv.getValue());
- cxt.write(row, put);
- cxt.getCounter(Counters.VALID).increment(1);
- }
- }
- }catch(Exception e){
- log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
- ",Value:"+value);
- cxt.getCounter(Counters.ERROR).increment(1);
- }
- }
- private boolean equals(byte[] a,byte[] b){
- String aStr= Bytes.toString(a);
- String bStr= Bytes.toString(b);
- if(aStr.equals(bStr)){
- return true;
- }
- return false;
- }
- }
-
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- byte[] columnFamily = Bytes.toBytes("fam");
- byte[] columnQualifier = Bytes.toBytes("data");
- Scan scan = new Scan ();
- scan.addColumn(columnFamily, columnQualifier);
- HadoopUtils.initialConf("hbase");
- Configuration conf = HadoopUtils.getConf();
- conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
- conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
-
- String input ="testtable" ;//
- String output="testtable1"; //
-
- Job job = new Job(conf,"Parse data in "+input+",write to"+output);
- job.setJarByClass(ParseDriver.class);
- TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
- ImmutableBytesWritable.class, Put.class,job);
- TableMapReduceUtil.initTableReducerJob(output, IdentityTableReducer.class, job);
-
- System.exit(job.waitForCompletion(true)?0:1);
-
- }
-
- }
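For TableMapper jobs like this one, scanner caching is worth setting on the Scan so each map task fetches more than one row per RPC. A hedged sketch (the helper class ScanTuning and the caching value of 500 are only illustrative, not from the original post):

package hbase.mr;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanTuning {
    // Build the Scan passed to initTableMapperJob, with client-side scanner caching enabled.
    public static Scan buildScan(String family, String qualifier) {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        scan.setCaching(500);        // rows fetched per RPC from the region server
        scan.setCacheBlocks(false);  // avoid filling the block cache during a full-table MR scan
        return scan;
    }
}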
The HadoopUtils class is as follows:
- package hadoop.util;
-
- import java.io.IOException;
- import java.net.URI;
- import java.util.ArrayList;
- import java.util.List;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.util.LineReader;
-
- public class HadoopUtils {
-
- private static Configuration conf;
- public static void initialConf(){
- conf = new Configuration();
- conf.set("mapred.job.tracker", "hbase:9001");
- conf.set("fs.default.name", "hbase:9000");
- conf.set("hbase.zookeeper.quorum", "hbase");
- }
- public static void initialConf(String host){
- conf = new Configuration();
- conf.set("mapred.job.tracker", host+":9001");
- conf.set("fs.default.name", host+":9000");
- conf.set("hbase.zookeeper.quorum", host);
- }
- public static Configuration getConf(){
- if(conf==null){
- initialConf();
- }
- return conf;
- }
-
- public static List<String> readFromHDFS(String fileName) throws IOException {
- Configuration conf = getConf();
- FileSystem fs = FileSystem.get(URI.create(fileName), conf);
- FSDataInputStream hdfsInStream = fs.open(new Path(fileName));
- // read line by line (the newer API)
- LineReader inLine = new LineReader(hdfsInStream, conf);
- Text txtLine = new Text();
-
- int iResult = inLine.readLine(txtLine); // read the first line
- List<String> list = new ArrayList<String>();
- while (iResult > 0 ) {
- list.add(txtLine.toString());
- iResult = inLine.readLine(txtLine);
- }
-
- hdfsInStream.close();
- fs.close();
- return list;
- }
- }
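readFromHDFS is not used by the jobs in this post, but as a quick usage example (the demo class name is made up; the path is the same sample path used by ImportFromFile):

package hadoop.util;

import java.io.IOException;
import java.util.List;

public class HadoopUtilsDemo {
    public static void main(String[] args) throws IOException {
        HadoopUtils.initialConf("hbase");  // same host name as the MR drivers use
        List<String> lines = HadoopUtils.readFromHDFS("/user/hadoop/input/picdata");
        System.out.println("read " + lines.size() + " lines");
    }
}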
2.3 Combining MapReduce with HTable. The code is as follows:
- package hbase.mr;
-
- import hadoop.util.HadoopUtils;
- import hbase.mr.AnalyzeDriver.Counters;
-
- import java.io.IOException;
- import java.util.Date;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.Scan;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
- import org.apache.hadoop.hbase.mapreduce.TableMapper;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class ParseSinglePutDriver {
-
- /**
- * Write with HTable directly inside the mapper
- * Copy the qualifier column of infoTable into the qualifier1 column
- * One Put at a time
- * @param args
- */
- private static Logger log = LoggerFactory.getLogger(ParseMapper.class);
- static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
- private HTable infoTable =null ;
- private byte[] columnFamily =null ;
- private byte[] columnQualifier =null;
- private byte[] columnQualifier1 =null;
- @Override
- protected void setup(Context cxt){
- log.info("ParseSinglePutDriver setup,current time: "+new Date());
- try {
- infoTable = new HTable(cxt.getConfiguration(),
- cxt.getConfiguration().get("conf.infotable"));
- infoTable.setAutoFlush(false);
- } catch (IOException e) {
- log.error("Initial infoTable error:\n"+e.getMessage());
- }
- columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
- columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
- columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1"));
- }
- @Override
- protected void cleanup(Context cxt){
- try {
- infoTable.flushCommits();
- log.info("ParseSinglePutDriver cleanup ,current time :"+new Date());
- } catch (IOException e) {
- log.error("infoTable flush commits error:\n"+e.getMessage());
- }
- }
- @Override
- public void map(ImmutableBytesWritable row,Result columns,Context cxt){
- cxt.getCounter(Counters.ROWS).increment(1);
- String value =null ;
- try{
- Put put = new Put(row.get());
- for(KeyValue kv : columns.list()){
- cxt.getCounter(Counters.COLS).increment(1);
- value= Bytes.toStringBinary(kv.getValue());
- if(equals(columnQualifier,kv.getQualifier())){ // filter on the column qualifier
- put.add(columnFamily,columnQualifier1,kv.getValue());
- infoTable.put(put);
- }
- }
- }catch(Exception e){
- log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
- ",Value:"+value);
- cxt.getCounter(Counters.ERROR).increment(1);
- }
- }
- private boolean equals(byte[] a,byte[] b){
- String aStr= Bytes.toString(a);
- String bStr= Bytes.toString(b);
- if(aStr.equals(bStr)){
- return true;
- }
- return false;
- }
- }
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- String input ="testtable";
- byte[] columnFamily = Bytes.toBytes("fam");
- byte[] columnQualifier = Bytes.toBytes("data");
- byte[] columnQualifier1 = Bytes.toBytes("data1");
- Scan scan = new Scan ();
- scan.addColumn(columnFamily, columnQualifier);
- HadoopUtils.initialConf("hbase");
- Configuration conf = HadoopUtils.getConf();
- conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
- conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
- conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1));
- conf.set("conf.infotable", input);
-
- Job job = new Job(conf,"Parse data in "+input+",into tables");
- job.setJarByClass(ParseSinglePutDriver.class);
- TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
- ImmutableBytesWritable.class, Put.class,job);
- job.setOutputFormatClass(NullOutputFormat.class);
- job.setNumReduceTasks(0);
- System.exit(job.waitForCompletion(true)?0:1);
- }
-
- }
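One small caveat with this HTable-in-mapper pattern (it applies to the list variant in 2.4 below as well): cleanup() flushes the table but never closes it. A hedged tweak to the cleanup() above that also releases the connection:

@Override
protected void cleanup(Context cxt) {
    try {
        infoTable.flushCommits();
        infoTable.close();  // close() flushes too and releases the resources held by this mapper
        log.info("ParseSinglePutDriver cleanup ,current time :" + new Date());
    } catch (IOException e) {
        log.error("infoTable flush commits error:\n" + e.getMessage());
    }
}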
2.4 The HTable used in 2.3 can in fact also take a List<Put>; the approach below collects puts into a list first, which is more efficient.
- package hbase.mr;
-
- import hadoop.util.HadoopUtils;
- import hbase.mr.AnalyzeDriver.Counters;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.Scan;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
- import org.apache.hadoop.hbase.mapreduce.TableMapper;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class ParseListPutDriver {
-
- /**
- * Write with HTable directly inside the mapper
- * Collect Puts into a List<Put> to compare efficiency
- * Copy the qualifier column of infoTable into the qualifier1 column
- * @param args
- */
- private static Logger log = LoggerFactory.getLogger(ParseMapper.class);
- static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
- private HTable infoTable =null ;
- private byte[] columnFamily =null ;
- private byte[] columnQualifier =null;
- private byte[] columnQualifier1 =null;
- private List<Put> list = new ArrayList<Put>();
- @Override
- protected void setup(Context cxt){
- log.info("ParseListPutDriver setup,current time: "+new Date());
- try {
- infoTable = new HTable(cxt.getConfiguration(),
- cxt.getConfiguration().get("conf.infotable"));
- infoTable.setAutoFlush(false);
- } catch (IOException e) {
- log.error("Initial infoTable error:\n"+e.getMessage());
- }
- columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
- columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
- columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1"));
- }
- @Override
- protected void cleanup(Context cxt){
- try {
- infoTable.put(list);
- infoTable.flushCommits();
- log.info("ParseListPutDriver cleanup ,current time :"+new Date());
- } catch (IOException e) {
- log.error("infoTable flush commits error:\n"+e.getMessage());
- }
- }
- @Override
- public void map(ImmutableBytesWritable row,Result columns,Context cxt){
- cxt.getCounter(Counters.ROWS).increment(1);
- String value =null ;
- try{
- Put put = new Put(row.get());
- for(KeyValue kv : columns.list()){
- cxt.getCounter(Counters.COLS).increment(1);
- value= Bytes.toStringBinary(kv.getValue());
- if(equals(columnQualifier,kv.getQualifier())){ // filter on the column qualifier
- put.add(columnFamily,columnQualifier1,kv.getValue());
- list.add(put);
- }
- }
- }catch(Exception e){
- log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
- ",Value:"+value);
- cxt.getCounter(Counters.ERROR).increment(1);
- }
- }
- private boolean equals(byte[] a,byte[] b){
- String aStr= Bytes.toString(a);
- String bStr= Bytes.toString(b);
- if(aStr.equals(bStr)){
- return true;
- }
- return false;
- }
- }
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- String input ="testtable";
- byte[] columnFamily = Bytes.toBytes("fam");
- byte[] columnQualifier = Bytes.toBytes("data");
- byte[] columnQualifier1 = Bytes.toBytes("data2");
- Scan scan = new Scan ();
- scan.addColumn(columnFamily, columnQualifier);
- HadoopUtils.initialConf("hbase");
- Configuration conf = HadoopUtils.getConf();
- conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
- conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
- conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1));
- conf.set("conf.infotable", input);
-
- Job job = new Job(conf,"Parse data in "+input+",into tables");
- job.setJarByClass(ParseListPutDriver.class);
- TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
- ImmutableBytesWritable.class, Put.class,job);
- job.setOutputFormatClass(NullOutputFormat.class);
- job.setNumReduceTasks(0);
- System.exit(job.waitForCompletion(true)?0:1);
- }
-
- }
There are 26,632 data records; the timing comparison between the two approaches can be seen in the screenshot below:
Because HBase is involved, some extra jars have to be added under hadoop_home/lib; the full set is shown in the screenshot below (hbase1.0.jar is the MR program compiled and packaged from the code above):
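If copying jars into hadoop_home/lib is inconvenient, HBase 0.94's TableMapReduceUtil can also ship its dependencies with the job through the distributed cache. A hedged sketch of the extra call, added in any of the driver main() methods above after initTableMapperJob:

// Ship the HBase client jar and its zookeeper/guava/protobuf dependencies
// with the job instead of copying them into hadoop_home/lib.
TableMapReduceUtil.addDependencyJars(job);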
Blog: http://blog.csdn.net/fansy1990