分享

Hbase几种数据入库(load)方式比较

pig2 2014-8-2 12:05:21 发表于 总结型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 8 27944
本帖最后由 pig2 于 2014-8-2 12:05 编辑
问题导读
1.你认为hbase入库有几种方式?
2.能否想到其中一种的实现方式?
3.根据想的内容,对比下面,说出自己的想法?




1. 预先生成HFile入库
这个地址有详细的说明http://www.aboutyun.com/thread-8603-1-1.html

2. 通过MapReduce入库
/* MapReduce 读取hdfs上的文件,以HTable.put(put)的方式在map中完成数据写入,无reduce过程*/

import java.io.IOException;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class HBaseImport extends Configured implements Tool{

static final Log LOG = LogFactory.getLog(HBaseImport.class);

public static final String JOBNAME = "MRImport ";

public static class Map extends Mapper<LongWritable , Text, NullWritable, NullWritable>{

Configuration configuration = null;

HTable xTable = null;

private boolean wal = true;

static long count = 0;

@Override

protected void cleanup(Context context) throws IOException,

InterruptedException {

// TODO Auto-generated method stub

super.cleanup(context);

xTable.flushCommits();

xTable.close();

}

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String all[] = value.toString().split("/t");

If(all.length==2){

put = new Put(Bytes.toBytes(all[0]))); put.add(Bytes.toBytes("xxx"),Bytes.toBytes("20110313"),Bytes.toBytes(all[1]));

}

if (!wal) {

put.setWriteToWAL(false);

}

xTable.put(put);

if ((++count % 100)==0) {

context.setStatus(count +" DOCUMENTS done!");

context.progress();

System.out.println(count +" DOCUMENTS done!");

}

}

@Override

protected void setup(Context context) throws IOException,

InterruptedException {

// TODO Auto-generated method stub

super.setup(context);

configuration = context.getConfiguration();

xTable = new HTable(configuration,"testKang");

xTable.setAutoFlush(false);

xTable.setWriteBufferSize(12*1024*1024);

wal = true;

}

}

@Override

public int run(String[] args) throws Exception {

String input = args[0];

Configuration conf = HBaseConfiguration.create(getConf());

conf.set("hbase.master", "m0:60000");

Job job = new Job(conf,JOBNAME);

job.setJarByClass(HBaseImport.class);

job.setMapperClass(Map.class);

job.setNumReduceTasks(0);

job.setInputFormatClass(TextInputFormat.class);

TextInputFormat.setInputPaths(job, input);

job.setOutputFormatClass(NullOutputFormat.class);

return job.waitForCompletion(true)?0:1;

}

public static void main(String[] args) throws IOException {

Configuration conf = new Configuration();

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

int res = 1;

try {

res = ToolRunner.run(conf, new HBaseImport (), otherArgs);

} catch (Exception e) {

e.printStackTrace();

}

System.exit(res);

}

}

3. 通过Java程序入库
/* Java多线程读取本地磁盘上的文件,以HTable.put(put)的方式完成数据写入*/

import java.io.BufferedReader;

import java.io.File;

import java.io.FileReader;

import java.io.IOException;

import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

public class InsertContactJava {

public static long startTime;

public static long rowkey = 0; //起始rowkey

public static final int lineCount = 100000; //每次提交时录入的行数

public static String tableName = "usercontact_kang"; //录入目的表名

public static int countLie = 8; //表的列数

public static void main(String[] args) throws IOException {

startTime = System.currentTimeMillis() / 1000;

System.out.println("start time = " + startTime);

Thread t1 = new Thread() {

@Override

public void run() {

try {

insert_one("/run/jar/123");

//loadByLieWithVector("/run/jar/123");

//loadByLieWithArrayList("/run/jar/123");

} catch (IOException e) {

e.printStackTrace();

}

}

};

t1.start();

}

public static void insert_one(String path) throws IOException {

Configuration conf = HBaseConfiguration.create();

HTable table = new HTable(conf, tableName);

File f = new File(path);

ArrayList<Put> list = new ArrayList<Put>();

BufferedReader br = new BufferedReader(new FileReader(f));

String tmp = br.readLine();

int count = 0;

while (tmp != null) {

if (list.size() > 10000) {

table.put(list);

table.flushCommits();

list.clear();

} else {

String arr_value[] = tmp.toString().split("/t", 10);

String first[] = arr_value[0].split("~", 5);

String second[] = arr_value[1].split("~", 5);

String rowname = getIncreasRowKey();

String firstaccount = first[0];

String firstprotocolid = first[1];

String firstdomain = first[2];

String inserttime = Utils.getToday("yyyyMMdd");

String secondaccount = second[0];

String secondprotocolid = second[1];

String seconddomain = second[2];

String timescount = Integer.valueOf(arr_value[2]).toString();

Put p = new Put(rowname.getBytes());

p.add(("ucvalue").getBytes(), "FIRSTACCOUNT".getBytes(),

firstaccount.getBytes());

p.add(("ucvalue").getBytes(), "FIRSTDOMAIN".getBytes(),

firstdomain.getBytes());

p.add(("ucvalue").getBytes(), "FIRSTPROTOCOLID".getBytes(),

firstprotocolid.getBytes());

p.add(("ucvalue").getBytes(), "INSERTTIME".getBytes(),

inserttime.getBytes());

p.add(("ucvalue").getBytes(), "SECONDACCOUNT".getBytes(),

secondaccount.getBytes());

p.add(("ucvalue").getBytes(), "SECONDDOMAIN".getBytes(),

seconddomain.getBytes());

p.add(("ucvalue").getBytes(), "SECONDPROTOCOLID".getBytes(),

secondprotocolid.getBytes());

p.add(("ucvalue").getBytes(), "TIMESCOUNT".getBytes(),

timescount.getBytes());

list.add(p);

}

tmp = br.readLine();

count++;

}

if (list.size() > 0) {

table.put(list);

table.flushCommits();

}

table.close();

System.out.println("total = " + count);

long endTime = System.currentTimeMillis() / 1000;

long costTime = endTime - startTime;

System.out.println("end time = " + endTime);

System.out.println(path + ": cost time = " + costTime);

}

4. 入库方式比较
&#216; 生成HFile方式:

生成HFile的过程比较慢,生成HFile后写入hbase非常快,基本上就是hdfs上的mv过程.对于生成HFile方式入库的时候有一个改进的方案,就是先对数据排序,然后生成HFile。

HFile方式在所有的加载方案里面是最快的,不过有个前提——数据是第一次导入,表是空的。如果表中已经有了数据。HFile再导入到hbase的表中会触发split操作,最慢的时候这种操作会耗时1小时。


&#216; MapReduce方式:

开始会很快,但是由于mr和hbase竞争资源,到一个特定的时间点会变很慢

&#216; Java程序方式:

多客户端,多线程同时入库,目前看来是最好的方式,client和regionserver分开,硬盘读写分开,瓶颈只在网络和内存上。咨询了一些牛人,大多推荐这种方式,并且一定要多客户端,多线程。关于入库效率的调优,在我另一篇博客中有说明。


相关帖子推荐:
HBase 5种写入数据方式




已有(8)人评论

跳转到指定楼层
随风而逝 发表于 2016-1-21 13:07:28
您好,我遇到一个问题是我要入库二十亿的消息,因为hbase的写压力比较大,用bulkload经常导入失败,所以我就用客户端来做,但是我预设了100个分区后,在客户端设置不自动提交,然后buffersize是12M,10个线程跑入库,结果报错说是region分裂导致region下线,数据丢失了一万多,您碰到过这种问题吗?最后的region分裂成168个了
回复

使用道具 举报

mr.andy 发表于 2014-8-3 02:49:00
学习中,谢谢楼主
回复

使用道具 举报

刚果 发表于 2015-9-11 16:30:04
回复

使用道具 举报

llp 发表于 2015-11-6 14:41:23
为什么要先对数据排序,然后生成HFile   请问这样有什么好处?
回复

使用道具 举报

马一博 发表于 2016-7-22 10:52:43
回复赚积分。。。
回复

使用道具 举报

如果爱betty 发表于 2018-11-21 16:03:18
为什么要先对数据排序,然后生成HFile,同样有这个疑问?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条