本帖最后由 hyj 于 2015-4-22 22:00 编辑
下面是一些相关代码，希望对楼主有所帮助。
API 操作:
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.MasterNotRunningException;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.ZooKeeperConnectionException;
- import org.apache.hadoop.hbase.client.Get;
- import org.apache.hadoop.hbase.client.HBaseAdmin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.HTablePool;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.ResultScanner;
- import org.apache.hadoop.hbase.client.Scan;
- import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
- import org.apache.hadoop.hbase.filter.Filter;
- import org.apache.hadoop.hbase.filter.FilterList;
- import org.apache.hadoop.hbase.filter.PrefixFilter;
- import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import com.kktest.hbase.HashChoreWoker;
- import com.kktest.hbase.HashRowKeyGenerator;
- import com.kktest.hbase.RowKeyGenerator;
- import com.kktest.hbase.BitUtils;
-
- /**
- * hbase 客户端
- *
- * @author kuang hj
- *
- */
- @SuppressWarnings("all")
- public class HBaseClient {
-
- private static Logger logger = LoggerFactory.getLogger(HBaseClient.class);
- private static Configuration config;
- static {
- config = HBaseConfiguration.create();
- config.set("hbase.zookeeper.quorum",
- "192.168.1.100:2181,192.168.1.101:2181,192.168.1.103:2181");
- }
-
- /**
- * 根据随机散列(hash)创建分区表
- *
- * @throws Exception
- * hash_split_table
- */
- public static void testHashAndCreateTable(String tableNameTmp,
- String columnFamily) throws Exception {<p> // 取随机散列 10 代表 10个分区
- HashChoreWoker worker = new HashChoreWoker(1000000, 10);
- byte[][] splitKeys = worker.calcSplitKeys();
-
- HBaseAdmin admin = new HBaseAdmin(config);
- TableName tableName = TableName.valueOf(tableNameTmp);
-
- if (admin.tableExists(tableName)) {
- try {
- admin.disableTable(tableName);
- } catch (Exception e) {
- }
- admin.deleteTable(tableName);
- }
-
- HTableDescriptor tableDesc = new HTableDescriptor(tableName);
- HColumnDescriptor columnDesc = new HColumnDescriptor(
- Bytes.toBytes(columnFamily));
- columnDesc.setMaxVersions(1);
- tableDesc.addFamily(columnDesc);
-
- admin.createTable(tableDesc, splitKeys);
-
- admin.close();
- }
-
- /**
- * @Title: queryData
- * @Description: 从HBase查询出数据
- * @author kuang hj
- * @param tableName
- * 表名
- * @param rowkey
- * rowkey
- * @return 返回用户信息的list
- * @throws Exception
- */
- @SuppressWarnings("all")
- public static ArrayList<String> queryData(String tableName, String rowkey)
- throws Exception {
- ArrayList<String> list = new ArrayList<String>();
- logger.info("开始时间");
- HTable table = new HTable(config, tableName);
-
- Get get = new Get(rowkey.getBytes()); // 根据主键查询
- Result r = table.get(get);
- logger.info("结束时间");
- KeyValue[] kv = r.raw();
- for (int i = 0; i < kv.length; i++) {
- // 循环每一列
- String key = kv[i].getKeyString();
-
- String value = kv[i].getValueArray().toString();
-
- // 将查询到的结果写入List中
- list.add(key + ":"+ value);
-
- }// end of 遍历每一列
-
- return list;
- }
-
- /**
- * 增加表数据
- *
- * @param tableName
- * @param rowkey
- */
- public static void insertData(String tableName, String rowkey) {
- HTable table = null;
- try {
- table = new HTable(config, tableName);
- // 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值
- for (int i = 1; i < 100; i++) {
- byte[] result = getNumRowkey(rowkey,i);
- Put put = new Put(result);
- // 本行数据的第一列
- put.add(rowkey.getBytes(), "name".getBytes(),
- ("aaa" + i).getBytes());
- // 本行数据的第三列
- put.add(rowkey.getBytes(), "age".getBytes(),
- ("bbb" + i).getBytes());
- // 本行数据的第三列
- put.add(rowkey.getBytes(), "address".getBytes(),
- ("ccc" + i).getBytes());
-
- table.put(put);
- }
-
- } catch (Exception e1) {
- e1.printStackTrace();
- }
- }
-
- private static byte[] getNewRowkey(String rowkey) {
- byte[] result = null;
-
- RowKeyGenerator rkGen = new HashRowKeyGenerator();
- byte[] splitKeys = rkGen.nextId();
-
- byte[] rowkeytmp = rowkey.getBytes();
-
- result = new byte[splitKeys.length + rowkeytmp.length];
- System.arraycopy(splitKeys, 0, result, 0, splitKeys.length);
- System.arraycopy(rowkeytmp, 0, result, splitKeys.length,
- rowkeytmp.length);
-
- return result;
- }
-
- public static void main(String[] args) {
- RowKeyGenerator rkGen = new HashRowKeyGenerator();
- byte[] splitKeys = rkGen.nextId();
- System.out.println(splitKeys);
- }
-
- private static byte[] getNumRowkey(String rowkey, int i) {
- byte[] result = null;
-
- RowKeyGenerator rkGen = new HashRowKeyGenerator();
- byte[] splitKeys = rkGen.nextId();
-
- byte[] rowkeytmp = rowkey.getBytes();
-
- byte[] intVal = BitUtils.getByteByInt(i);
- result = new byte[splitKeys.length + rowkeytmp.length + intVal.length];
- System.arraycopy(splitKeys, 0, result, 0, splitKeys.length);
- System.arraycopy(rowkeytmp, 0, result, splitKeys.length,
- rowkeytmp.length);
- System.arraycopy(intVal, 0, result, splitKeys.length+rowkeytmp.length,
- intVal.length);
-
- return result;
- }
-
-
-
- /**
- * 删除表
- *
- * @param tableName
- */
- public static void dropTable(String tableName) {
- try {
- HBaseAdmin admin = new HBaseAdmin(config);
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
- } catch (MasterNotRunningException e) {
- e.printStackTrace();
- } catch (ZooKeeperConnectionException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 查询所有
- *
- * @param tableName
- */
- public static void QueryAll(String tableName) {
- HTable table = null;
- try {
- table = new HTable(config, tableName);
- ResultScanner rs = table.getScanner(new Scan());
- for (Result r : rs) {
- System.out.println("获得到rowkey:" + new String(r.getRow()));
- for (KeyValue keyValue : r.raw()) {
- System.out.println("列:" + new String(keyValue.getFamily())
- + "====值:" + new String(keyValue.getValue()));
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 查询所有
- *
- * @param tableName
- */
- public static void QueryByCondition1(String tableName) {
-
- HTable table = null;
- try {
- table = new HTable(config, tableName);
- Get scan = new Get("abcdef".getBytes());// 根据rowkey查询
- Result r = table.get(scan);
- System.out.println("获得到rowkey:" + new String(r.getRow()));
- for (KeyValue keyValue : r.raw()) {
- System.out.println("列:" + new String(keyValue.getFamily())
- + "====值:" + new String(keyValue.getValue()));
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 根据rowkwy前坠查询
- * @param tableName
- * @param rowkey
- */
- public static void queryByRowKey(String tableName,String rowkey)
- {
- try {
- HTable table = new HTable(config, tableName);
- Scan scan = new Scan();
- scan.setFilter(new PrefixFilter(rowkey.getBytes()));
- ResultScanner rs = table.getScanner(scan);
- KeyValue[] kvs = null;
- for (Result tmp : rs)
- {
- kvs = tmp.raw();
- for (KeyValue kv : kvs)
- {
- System.out.print(kv.getRow()+" ");
- System.out.print(kv.getFamily()+" :");
- System.out.print(kv.getQualifier()+" ");
- System.out.print(kv.getTimestamp()+" ");
- System.out.println(kv.getValue());
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
- /**
- * 查询所有
- *
- * @param tableName
- */
- public static void QueryByCondition2(String tableName) {
-
- try {
- HTable table = new HTable(config, tableName);
- // 当列column1的值为aaa时进行查询
- Filter filter = new SingleColumnValueFilter(
- Bytes.toBytes("column1"), null, CompareOp.EQUAL,
- Bytes.toBytes("aaa"));
- Scan s = new Scan();
- s.setFilter(filter);
- ResultScanner rs = table.getScanner(s);
- for (Result r : rs) {
- System.out.println("获得到rowkey:" + new String(r.getRow()));
- for (KeyValue keyValue : r.raw()) {
- System.out.println("列:" + new String(keyValue.getFamily())
- + "====值:" + new String(keyValue.getValue()));
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- /**
- * 查询所有
- *
- * @param tableName
- */
- public static void QueryByCondition3(String tableName) {
-
- try {
-
- HTable table = new HTable(config, tableName);
-
- List<Filter> filters = new ArrayList<Filter>();
-
- Filter filter1 = new SingleColumnValueFilter(
- Bytes.toBytes("column1"), null, CompareOp.EQUAL,
- Bytes.toBytes("aaa"));
- filters.add(filter1);
-
- Filter filter2 = new SingleColumnValueFilter(
- Bytes.toBytes("column2"), null, CompareOp.EQUAL,
- Bytes.toBytes("bbb"));
- filters.add(filter2);
-
- Filter filter3 = new SingleColumnValueFilter(
- Bytes.toBytes("column3"), null, CompareOp.EQUAL,
- Bytes.toBytes("ccc"));
- filters.add(filter3);
-
- FilterList filterList1 = new FilterList(filters);
-
- Scan scan = new Scan();
- scan.setFilter(filterList1);
- ResultScanner rs = table.getScanner(scan);
- for (Result r : rs) {
- System.out.println("获得到rowkey:" + new String(r.getRow()));
- for (KeyValue keyValue : r.raw()) {
- System.out.println("列:" + new String(keyValue.getFamily())
- + "====值:" + new String(keyValue.getValue()));
- }
- }
- rs.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- }
复制代码
- HashChoreWoker:
-
- import java.util.Iterator;
- import java.util.TreeSet;
-
- import org.apache.hadoop.hbase.util.Bytes;
-
- /**
- *
- * @author kuang hj
- *
- */
- public class HashChoreWoker{
- // 随机取机数目
- private int baseRecord;
- // rowkey生成器
- private RowKeyGenerator rkGen;
- // 取样时,由取样数目及region数相除所得的数量.
- private int splitKeysBase;
- // splitkeys个数
- private int splitKeysNumber;
- // 由抽样计算出来的splitkeys结果
- private byte[][] splitKeys;
-
- public HashChoreWoker(int baseRecord, int prepareRegions) {
- this.baseRecord = baseRecord;
- // 实例化rowkey生成器
- rkGen = new HashRowKeyGenerator();
- splitKeysNumber = prepareRegions - 1;
- splitKeysBase = baseRecord / prepareRegions;
- }
-
- public byte[][] calcSplitKeys() {
- splitKeys = new byte[splitKeysNumber][];
- // 使用treeset保存抽样数据,已排序过
- TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- for (int i = 0; i < baseRecord; i++) {
- rows.add(rkGen.nextId());
- }
- int pointer = 0;
- Iterator<byte[]> rowKeyIter = rows.iterator();
- int index = 0;
- while (rowKeyIter.hasNext()) {
- byte[] tempRow = rowKeyIter.next();
- rowKeyIter.remove();
- if ((pointer != 0) && (pointer % splitKeysBase == 0)) {
- if (index < splitKeysNumber) {
- splitKeys[index] = tempRow;
- index++;
- }
- }
- pointer++;
- }
- rows.clear();
- rows = null;
- return splitKeys;
- }
- }
复制代码
- HashRowKeyGenerator:
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.hbase.util.MD5Hash;
-
- import com.kktest.hbase.BitUtils;
- /**
- *
- *
- **/
- public class HashRowKeyGenerator implements RowKeyGenerator {
- private static long currentId = 1;
- private static long currentTime = System.currentTimeMillis();
- //private static Random random = new Random();
-
- public byte[] nextId()
- {
- try {
- currentTime = getRowKeyResult(Long.MAX_VALUE - currentTime);
- byte[] lowT = Bytes.copy(Bytes.toBytes(currentTime), 4, 4);
- byte[] lowU = Bytes.copy(Bytes.toBytes(currentId), 4, 4);
- byte[] result = Bytes.add(MD5Hash.getMD5AsHex(Bytes.add(lowT, lowU))
- .substring(0, 8).getBytes(), Bytes.toBytes(currentId));
- return result;
- } finally {
- currentId++;
- }
- }
-
- /**
- * getRowKeyResult
- * @param tmpData
- * @return
- */
- public static long getRowKeyResult(long tmpData)
- {
- String str = String.valueOf(tmpData);
- StringBuffer sb = new StringBuffer();
- char[] charStr = str.toCharArray();
- for (int i = charStr.length -1 ; i > 0; i--)
- {
- sb.append(charStr[i]);
- }
-
- return Long.parseLong(sb.toString());
- }
- }
复制代码
|