import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Thin convenience wrapper around the HBase client API for namespace, table and row
 * operations.
 *
 * <p>One {@link Connection} and one {@link Admin} are opened in the constructor and reused by
 * every method; call {@link #closed()} when finished. Failures are reported by printing the
 * stack trace rather than by throwing, so callers get no exception for I/O errors. If the
 * constructor fails to connect, the connection and admin handles stay {@code null} and later
 * calls will fail on them.
 */
public class HbaseApi {
    private final Configuration conf;
    private final Admin admin;
    private final Connection connection;

    /**
     * Creates the client configuration and opens the shared connection and admin handle.
     *
     * <p>Only {@code hbase.zookeeper.quorum} is set here;
     * {@code hbase.zookeeper.property.clientPort} is left at whatever the loaded
     * configuration provides.
     *
     * @param host value for {@code hbase.zookeeper.quorum}
     */
    public HbaseApi(String host) {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", host);
        Connection openedConnection = null;
        Admin openedAdmin = null;
        try {
            openedConnection = ConnectionFactory.createConnection(conf);
            openedAdmin = openedConnection.getAdmin();
        } catch (Exception e) {
            e.printStackTrace();
        }
        connection = openedConnection;
        admin = openedAdmin;
    }

    /**
     * Creates a namespace. An {@link IOException} is printed, not rethrown.
     *
     * @param spaceName name of the namespace to create
     */
    public void createNameSpace(String spaceName) {
        try {
            admin.createNamespace(NamespaceDescriptor.create(spaceName).build());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Checks whether a table exists.
     *
     * @param tableName table name
     * @return {@code true} if the table exists; {@code false} if it does not or if the check
     *         itself failed (the failure is printed)
     */
    public boolean isTableExists(String tableName) {
        try {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Creates a table with the given column families, unless it already exists.
     *
     * @param tableName     table name, optionally prefixed with {@code namespace:}
     * @param columsFamilys column family names to add to the table
     */
    public void createTable(String tableName, String[] columsFamilys) {
        try {
            TableName tname = TableName.valueOf(tableName);
            if (admin.tableExists(tname)) {
                System.out.println("表:"+tableName+"已经存在!");
                return;
            }
            HTableDescriptor tableDesc = new HTableDescriptor(tname);
            for (String colum : columsFamilys) {
                // Per-family options (setTimeToLive, setInMemory, setMaxVersions,
                // setMinVersions, ...) could be configured on the HColumnDescriptor here;
                // the defaults are used.
                tableDesc.addFamily(new HColumnDescriptor(colum));
            }
            // WAL durability choices:
            //   ASYNC_WAL   - write the WAL asynchronously on mutation
            //   SYNC_WAL    - write the WAL synchronously on mutation
            //   FSYNC_WAL   - write the WAL synchronously and force it to disk
            //   SKIP_WAL    - do not write the WAL
            //   USE_DEFAULT - use the HBase global default level, i.e. SYNC_WAL
            tableDesc.setDurability(Durability.SYNC_WAL);
            // Region-level tuning such as setMaxFileSize / setMemStoreFlushSize could be
            // applied to tableDesc before creation; the defaults are used.
            admin.createTable(tableDesc);
            System.out.println("表:"+tableName+"创建成功!");
        } catch (NamespaceNotFoundException e) {
            System.out.println("指定的命名空间:"+tableName.split(":")[0]+" 不存在");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Disables (if still enabled) and deletes a table. When the table does not exist only a
     * message is printed; the JVM is not terminated.
     *
     * @param tableName table name
     */
    public void dropTable(String tableName) {
        TableName tname = TableName.valueOf(tableName);
        try {
            if (admin.tableExists(tname)) {
                // A table must be disabled before deletion; skip the call when it is already
                // disabled so the delete is not aborted by the disable failing.
                if (admin.isTableEnabled(tname)) {
                    admin.disableTable(tname);
                }
                admin.deleteTable(tname);
                System.out.println("删除表 "+tableName+"成功!");
            } else {
                System.out.println("删除的表 "+tableName+"不存在!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes a single cell.
     *
     * @param tableName   table name
     * @param rowKey      row key
     * @param columFamily column family
     * @param colum       column qualifier
     * @param value       cell value
     */
    public void addRow(String tableName, String rowKey, String columFamily,
            String colum, String value) {
        // try-with-resources releases the lightweight Table handle after the write.
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            // Arguments: family, qualifier, value.
            put.addColumn(Bytes.toBytes(columFamily), Bytes.toBytes(colum), Bytes.toBytes(value));
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes one row.
     *
     * @param tableName table name
     * @param row       row key
     */
    public void delRow(String tableName, String row) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.delete(new Delete(Bytes.toBytes(row)));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes several rows in one batch call.
     *
     * @param tableName table name
     * @param rows      row keys
     */
    public void delMultiRows(String tableName, String[] rows) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            List<Delete> delList = new ArrayList<Delete>(rows.length);
            for (String row : rows) {
                delList.add(new Delete(Bytes.toBytes(row)));
            }
            table.delete(delList);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Fetches one row and prints each of its cells to standard output. Nothing is printed
     * when the row has no cells.
     *
     * @param tableName table name
     * @param row       row key
     */
    public void getRow(String tableName, String row) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            printResult(table.get(new Get(Bytes.toBytes(row))));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans the whole table and prints every cell to standard output.
     *
     * @param tableName table name
     */
    public void getAllRows(String tableName) {
        // Both the table handle and the scanner are closed when the block exits.
        try (Table table = connection.getTable(TableName.valueOf(tableName));
                ResultScanner results = table.getScanner(new Scan())) {
            for (Result result : results) {
                printResult(result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the admin handle and then the connection. Each close is attempted independently,
     * and handles that were never opened are skipped.
     */
    public void closed() {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Prints every cell of a result, one line per cell; an empty result prints nothing. */
    private static void printResult(Result result) {
        // Guard first: the cell array of an empty Result may be null.
        if (result == null || result.isEmpty()) {
            return;
        }
        for (Cell cell : result.rawCells()) {
            // Bytes.toString decodes as UTF-8, matching the Bytes.toBytes encoding used on write.
            System.out.print("行键:" + Bytes.toString(CellUtil.cloneRow(cell)) + " ");
            System.out.print("时间戳:" + cell.getTimestamp() + " ");
            System.out.print("列族名:" + Bytes.toString(CellUtil.cloneFamily(cell)) + " ");
            System.out.print("列名:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + " ");
            System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
}