package hbase.coprocessor;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Region observer that maintains a secondary index: every Put containing a
 * value for column {@code c1:age} is mirrored into the index table
 * {@code java_api_ind}, keyed by that value and pointing back to the
 * original row key. The index table must be created beforehand.
 */
public class TestCoprocessor extends BaseRegionObserver {

    /** Column family of the indexed column in the observed table. */
    private static final byte[] FAMILY = Bytes.toBytes("c1");
    /** Qualifier of the indexed column in the observed table. */
    private static final byte[] QUALIFIER = Bytes.toBytes("age");
    /** Name of the secondary-index table (must already exist). */
    private static final TableName INDEX_TABLE = TableName.valueOf("java_api_ind");

    /**
     * Pre-hook, analogous to a BEFORE trigger: invoked before each Put on the
     * observed table. For every {@code c1:age} cell in the Put, writes an
     * index row whose row key is the cell value and whose {@code c1:age}
     * value is the original row key.
     *
     * @param e          the coprocessor environment context
     * @param put        the mutation being applied to the observed table
     * @param edit       the WAL edit associated with this mutation
     * @param durability the requested durability for this mutation
     * @throws IOException if connecting to or writing the index table fails
     */
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e,
            Put put, WALEdit edit, Durability durability) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.201.2");
        // try-with-resources: the original leaked the Connection (never closed)
        // and leaked the Table whenever table.put() threw.
        // NOTE(review): opening a new Connection per Put is expensive; consider
        // creating it once in start() and closing it in stop().
        try (Connection connection = ConnectionFactory.createConnection(conf);
                Table table = connection.getTable(INDEX_TABLE)) {
            List<Cell> cells = put.get(FAMILY, QUALIFIER);
            for (Cell cell : cells) {
                // Use the raw value bytes directly; the original round-tripped
                // them through new String(...) with the platform charset, which
                // can corrupt non-ASCII values.
                byte[] value = CellUtil.cloneValue(cell);
                // Index row key = column value; index cell value = source row key.
                Put indexPut = new Put(value);
                Cell indexKv = new KeyValue(value, FAMILY, QUALIFIER,
                        CellUtil.cloneRow(cell));
                indexPut.add(indexKv);
                table.put(indexPut);
            }
        }
    }
}
说是高级开发,其实挺简单的。
以上程序打成jar包放到Hdfs文件系统上,例:/demo/test.jar
进入hbase shell
disable 'java_api'
alter 'java_api',
METHOD => 'table_att','coprocessor' => 'hdfs://192.168.201.2:9000/demo/test.jar|hbase.coprocessor.TestCoprocessor|1001|'
enable 'java_api'
然后再执行java_api的put操作的时候就会触发,记得事先要建好java_api_ind表