Building Secondary Indexes for HBase with MapReduce

2016-03-21 15:06 | Category: Big Data | Tag: HBase secondary index

The program below is a map-only MapReduce job that scans an HBase table and, for every column to be indexed, writes into an index table named <table>_<qualifier>: the column value becomes the index table's row key, and the original row key is stored under a "rowkey" qualifier, so a later query by column value needs only one extra lookup.

package builder.index;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

public class IndexBuilder3 extends Configured {

    public static class MapperIndex extends TableMapper<ImmutableBytesWritable, Put> {
        private String tableName;
        private String columnFamily;
        private String[] qualifiers;
        // Indexed qualifier -> name of its index table
        private Map<byte[], ImmutableBytesWritable> indexs =
                new HashMap<byte[], ImmutableBytesWritable>();

        @SuppressWarnings("deprecation")
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {
            // Iterate over every qualifier that needs an index
            Set<byte[]> keys = indexs.keySet();
            for (byte[] k : keys) {
                // Name of the index table for this qualifier
                ImmutableBytesWritable tname = indexs.get(k);
                // Value of this qualifier in the current row
                byte[] val = value.getValue(Bytes.toBytes(columnFamily), k);
                if (val != null) {
                    // The column value becomes the row key of the index table
                    Put put = new Put(val);
                    // The original row key is stored under the "rowkey" qualifier
                    put.add(Bytes.toBytes(columnFamily),
                            Bytes.toBytes("rowkey"),
                            key.get());
                    context.write(tname, put);
                }
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            Configuration conf = context.getConfiguration();
            tableName = conf.get("tableName");
            columnFamily = conf.get("columnFamily");
            qualifiers = conf.getStrings("qualifiers");
            // Index table name is <tableName>_<qualifier>
            for (String q : qualifiers) {
                indexs.put(Bytes.toBytes(q),
                        new ImmutableBytesWritable(Bytes.toBytes(tableName + "_" + q)));
            }
        }
    }

    public static void createTable(String tableName,
            String columnFamily,
            String[] qualifiers,
            Configuration conf) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        for (String q : qualifiers) {
            if (q != null && !q.trim().isEmpty()) {
                try {
                    // Index table name is <tableName>_<qualifier>
                    TableName tname = TableName.valueOf(tableName + "_" + q);
                    // Drop an existing index table before recreating it
                    if (admin.tableExists(tname)) {
                        admin.disableTable(tname);
                        admin.deleteTable(tname);
                    }
                    HTableDescriptor tableDesc = new HTableDescriptor(tname);
                    tableDesc.addFamily(new HColumnDescriptor(columnFamily));
                    tableDesc.setDurability(Durability.SYNC_WAL);
                    admin.createTable(tableDesc);
                    System.out.println("Table " + tname.toString() + " created successfully!");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        admin.close();
        connection.close();
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum; adjust to match your own cluster
        conf.set("hbase.zookeeper.quorum", "192.168.201.2");
        String[] theArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // At least a table name, a column family and one qualifier are required;
        // any number of additional qualifiers may follow.
        if (theArgs.length < 3) {
            System.err.println("Usage: IndexBuilder3 <table> <columnFamily> <qualifier> [qualifier ...]");
            System.exit(-1);
        }
        conf.set("tableName", theArgs[0]);
        conf.set("columnFamily", theArgs[1]);
        String[] qualifiers = new String[theArgs.length - 2];
        // Collect the qualifiers to be indexed
        for (int i = 2; i < theArgs.length; i++) {
            qualifiers[i - 2] = theArgs[i];
        }
        conf.setStrings("qualifiers", qualifiers);
        // Create (or recreate) the index tables
        createTable(theArgs[0], theArgs[1], qualifiers, conf);

        Job job = Job.getInstance(conf, "Builder_Index_for_" + theArgs[0]);
        job.setJarByClass(IndexBuilder3.class);
        // Map-only job, no reduce phase needed
        job.setNumReduceTasks(0);
        // Read input from an HBase table
        job.setInputFormatClass(TableInputFormat.class);
        // Write output to multiple index tables
        job.setOutputFormatClass(MultiTableOutputFormat.class);

        Scan scan = new Scan();
        // Number of rows fetched per RPC
        scan.setCaching(500);
        // Do not pollute the block cache during the full scan
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(theArgs[0],
                scan,
                MapperIndex.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);
        TableMapReduceUtil.addDependencyJars(job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


Package the program as a jar and place it in HBase's lib directory, then run:
hbase builder.index.IndexBuilder3 <table> <column family> <qualifier> ...
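
For example, assuming a hypothetical source table student with column family info and an indexed column name, the call would be hbase builder.index.IndexBuilder3 student info name, and the job would produce an index table named student_name. The snippet below is a minimal sketch, not part of the original post, of how such an index table could then be queried with the plain HBase client API; the table names, the column family and the looked-up value are illustrative assumptions.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class IndexLookupExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.201.2");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             // "student_name" is the hypothetical index table built by
             //   hbase builder.index.IndexBuilder3 student info name
             Table indexTable = connection.getTable(TableName.valueOf("student_name"));
             Table dataTable = connection.getTable(TableName.valueOf("student"))) {

            // Step 1: the indexed column value ("zhangsan", illustrative) is the
            // row key of the index table; read back the original row key.
            Result indexRow = indexTable.get(new Get(Bytes.toBytes("zhangsan")));
            byte[] originalRowKey = indexRow.getValue(Bytes.toBytes("info"),
                    Bytes.toBytes("rowkey"));
            if (originalRowKey == null) {
                System.out.println("no index entry for this value");
                return;
            }

            // Step 2: fetch the full row from the data table with the recovered row key.
            Result dataRow = dataTable.get(new Get(originalRowKey));
            System.out.println("original row: " + dataRow);
        }
    }
}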
