hbase中自带了一个 AggregationClient的协处理器,可以进行count、avg等函数,但是感觉不好用,查看了 AggregationClient的源码发现在使用是必须指定 列簇,而且只指定一个列簇,如果指定多个列簇会报Exception in thread "main" java.io.IOException: There must be only one family,但是在统计是如果有一个列簇下的有些列没有值,统计结果就是0。红色部分就是 AggregateImplementation的内部使用规则,只获取第一个列簇。
源码内容:
public <T, S> long getRowNum(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
long counter = 0l;
List<KeyValue> results = new ArrayList<KeyValue>();
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
if (scan.getFilter() == null && qualifier == null)
scan.setFilter(new FirstKeyOnlyFilter());
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
try {</font>
boolean hasMoreRows = false;
do {</font>
hasMoreRows = scanner.next(results);
if (results.size() > 0) {
counter++;</font>
}
results.clear();
} while (hasMoreRows);
} finally {
scanner.close();
}
log.info("Row counter from this region is "
+ ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
.getRegionNameAsString() + ": " + counter);
return counter;
} 复制代码
因为使用有些不方便,所以介绍一下如何进行自定义协处理器。
hbase中协处理器分两种类型,一个是观察者(observer),类似于关系数据库的触发器。另一个是终端(endpoint),动态的终端有点像存储过程,现在我介绍的是第二种endpoint。下面是一个统计的例子。
1、定义一个接口CustomProtocol实现CoprocessorProtocol;
CustomProtocol中可以添加自己需要的方法,比如count方法;
public interface CustomProtocol extends CoprocessorProtocol{
public long rowCount(Scan scan,Filter filter) throws IOException;
} 复制代码
2、定义一个类CustomProtocolIm继承BaseEndpointCoprocessor,实现CoprocessorProtocol;
public class CustomProtocolIm extends BaseEndpointCoprocessor implements CustomProtocol{
@Override
public long rowCount(Scan scan, Filter filter) throws IOException {
//在服务端进行统计
long counter = 0l;
List<KeyValue> results = new ArrayList<KeyValue>();
if (scan.getFilter() == null)
scan.setFilter(new FirstKeyOnlyFilter());
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
try {
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
if (results.size() > 0) {
counter++;
}
results.clear();
} while (hasMoreRows);
} finally {
scanner.close();
}
return counter;
}
} 复制代码
以上是现对服务端,也就是regionserver端来说的。
3、定义一个客户端类CustomRowCountClient;
public class CustomRowCountClient {
public long rowcount(HTable htable) throws Throwable{
Map<byte[],Long> results = htable.coprocessorExec(CustomProtocol.class, null, null, new Batch.Call<CustomProtocol,Long>() {//Long代表CustomProtocol的返回值类型
@Override
public Long call(CustomProtocol rcp) throws IOException {
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
return rcp.rowCount(scan, null);
}
});
//将regionserver统计的数据进行汇总
long total = 0;
for(Map.Entry<byte[],Long> e : results.entrySet()){
total+=e.getValue().longValue();
}
return total;
}
} 复制代码
4、在hbase集群进行部署,在hbase安装目录conf下,修改hbase-site.xml,添加
<property>
<name>hbase.coprocessor.region.classes</name>
<value>custom.coprocessor.CustomProtocolIm</value>
</property> 复制代码
如果有多个协处理器使用逗号进行分隔。
5、重启集群,就可以通过访问CustomRowCountClient进行统计了。也可以在shell命令中这样操作
hbase(main):003:0> disable 'test'
hbase(main):003:0>alter 'test', METHOD => 'table_att','coprocessor'=>'|custom.coprocessor.CustomProtocolIm||'
hbase> enable 'test' 复制代码
这种方法只会对指定的表生效。