package builder.index;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Map-only MapReduce job that builds secondary-index tables for an HBase table.
 * For each configured qualifier Q, an index table named "&lt;table&gt;_&lt;Q&gt;" is
 * (re)created, and every non-null cell value of Q becomes a row key in that
 * index table, with the original row key stored under column "rowkey".
 *
 * Usage: hbase builder.index.IndexBuilder3 &lt;table&gt; &lt;columnFamily&gt; &lt;qualifier&gt;...
 */
public class IndexBuilder3 extends Configured{
    /**
     * Mapper: for every indexed column present in the scanned row, emit a Put
     * targeted at that column's index table (MultiTableOutputFormat routes by
     * the ImmutableBytesWritable table-name key).
     */
    public static class MapperIndex extends TableMapper<ImmutableBytesWritable,Put>{
        private String tableName;
        private String columnFamily;
        private String[] qualifiers;
        // Indexed qualifier -> name of its index table.
        // NOTE: byte[] keys have identity equals/hashCode; this is safe here
        // only because the map is read exclusively via keySet() iteration.
        private Map<byte[],ImmutableBytesWritable> indexs =
                new HashMap<byte[],ImmutableBytesWritable>();

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                           Context context) throws IOException, InterruptedException {
            // Walk every column an index must be built for.
            Set<byte[]> keys = indexs.keySet();
            for (byte[] k : keys) {
                // Index table this column maps to.
                ImmutableBytesWritable tname = indexs.get(k);
                // This column's cell value in the current row.
                byte[] val = value.getValue(Bytes.toBytes(columnFamily), k);
                if (val != null) {
                    // The column value becomes the index table's row key ...
                    Put put = new Put(val);
                    // ... and the original row key is stored as the cell value.
                    // addColumn replaces the deprecated Put.add.
                    put.addColumn(Bytes.toBytes(columnFamily),
                            Bytes.toBytes("rowkey"),
                            key.get());
                    context.write(tname, put);
                }
            }
        }

        /**
         * Reads job configuration ("tableName", "columnFamily", "qualifiers")
         * and precomputes the qualifier -> index-table-name mapping.
         */
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            Configuration conf = context.getConfiguration();
            tableName = conf.get("tableName");
            columnFamily = conf.get("columnFamily");
            qualifiers = conf.getStrings("qualifiers");
            for (String q : qualifiers) {
                // Index table name convention: <table>_<qualifier>.
                indexs.put(Bytes.toBytes(q),
                        new ImmutableBytesWritable(Bytes.toBytes(tableName + "_" + q)));
            }
        }
    }

    /**
     * (Re)creates one index table per qualifier, named "&lt;tableName&gt;_&lt;qualifier&gt;".
     * An already-existing index table is dropped first so the build starts clean.
     * Per-table failures are logged and do not abort the remaining tables.
     *
     * @param tableName    name of the source table
     * @param columnFamily column family used in both source and index tables
     * @param qualifiers   columns to index; null/blank entries are skipped
     * @param conf         HBase configuration used to open the connection
     * @throws IOException if the connection itself cannot be opened/closed
     */
    public static void createTable(String tableName,
                                   String columnFamily,
                                   String[] qualifiers,
                                   Configuration conf) throws IOException {
        // try-with-resources: Connection/Admin were previously leaked on
        // exception paths.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            for (String q : qualifiers) {
                // FIX: was `q.trim() != ""` — a reference comparison that is
                // always true; isEmpty() checks the content.
                if (q != null && !q.trim().isEmpty()) {
                    try {
                        // Index table name: <table>_<qualifier>.
                        TableName tname = TableName.valueOf(tableName + "_" + q);
                        if (admin.tableExists(tname)) {
                            admin.disableTable(tname);
                            admin.deleteTable(tname);
                        }
                        // FIX: creation used to be nested inside the
                        // tableExists branch, so brand-new index tables were
                        // never created. Always create after the drop.
                        HTableDescriptor tableDesc = new HTableDescriptor(tname);
                        tableDesc.addFamily(new HColumnDescriptor(columnFamily));
                        tableDesc.setDurability(Durability.SYNC_WAL);
                        admin.createTable(tableDesc);
                        System.out.println("表:"+tname.toString()+"创建成功!");
                    } catch (IOException e) {
                        // Best effort: report and continue with the next table.
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    /**
     * Entry point: parses arguments, (re)creates the index tables, then runs a
     * map-only scan job writing to them via MultiTableOutputFormat.
     */
    public static void main(String args[]) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum; adjust for your own cluster.
        conf.set("hbase.zookeeper.quorum", "192.168.201.2");
        String[] theArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Need at least: table name, column family, and one column to index.
        if (theArgs.length < 3) {
            System.err.println(
                    "Usage: IndexBuilder3 <tableName> <columnFamily> <qualifier> [<qualifier>...]");
            System.exit(-1);
        }
        conf.set("tableName", theArgs[0]);
        conf.set("columnFamily", theArgs[1]);
        // Remaining arguments are the columns to index.
        String[] qualifiers = new String[theArgs.length - 2];
        for (int i = 2; i < theArgs.length; i++) {
            qualifiers[i - 2] = theArgs[i];
        }
        conf.setStrings("qualifiers", qualifiers);
        // Create (or recreate) the index tables before the job starts.
        createTable(theArgs[0], theArgs[1], qualifiers, conf);

        Job job = Job.getInstance(conf, "Builder_Index_for_" + theArgs[0]);
        job.setJarByClass(IndexBuilder3.class);
        // Map-only job: no reduce phase needed.
        job.setNumReduceTasks(0);
        // Input is an HBase table scan.
        job.setInputFormatClass(TableInputFormat.class);
        // Output goes to multiple index tables, routed by the mapper's key.
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        Scan scan = new Scan();
        // Rows fetched per RPC round-trip.
        scan.setCaching(500);
        // Full-table scan: do not pollute the region server block cache.
        scan.setCacheBlocks(false);
        TableMapReduceUtil.initTableMapperJob(theArgs[0],
                scan,
                MapperIndex.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);
        TableMapReduceUtil.addDependencyJars(job);
        // FIX: the job result was ignored and the process always exited 0;
        // propagate success/failure through the exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
// Package this program as a jar and place it in HBase's lib directory.
// Run: hbase builder.index.IndexBuilder3 <table> <columnFamily> <column>...