本帖最后由 bob007 于 2016-1-24 18:45 编辑
这几天为了满足业务需求,自己写了个Coprocessor,这里写一篇博客记录一下。 使用Coprocessor的目的是这样的,假如你的业务使你不得不进行全表查询,如果使用传统的Scan的话,那么全表查询将会给集群带来高的带宽压力,而且可能Client端也负载不了海量数据的计算。HBase提供了AggregateImplementation,可以进行简单的例如计算sum、average等操作,但是这些操作都是只针对一个列进行的,我遇到的情况是需要同时统计多个列,因此,就只能自己写Coprocessor了。 HBase的Coprocessor是运行在RegionServer上的一段程序,有点类似于关系型数据库中的触发器和存储过程。这个Coprocessor通常有两种类型,分别是Observer和Endpoint。Oberserver相当于触发器,可用于建立二级索引等操作。Endpoint相当于存储过程,可用于在各个RegionServer上做一些计算等,然后将计算的结果汇集到Client端来做最后的处理,这有点儿像Map/Reduce的过程。我这几天是将Coprocessor用作Endpoint,所以这里只对编写Endpoint的代码进行介绍。 在HBase版本0.96以后,HBase的RPC框架采用的就是protobuf作为通讯协议,关于protobuf的介绍网上有很多,这里就不再介绍了。
1.首先需要下载protobuf的解析器protobuf-2.5.0.tar.gz,然后,按照如下的方式安装解析器。
[mw_shl_code=bash,true]tar -zxvf protobuf-2.5.0.tar.gz
./configure --prefix=/home/cyber_space/protobuf
make
make install[/mw_shl_code]
2.编写.proto文件
这个.proto文件我是仿照hbase源代码中的两个Example写的,这两个Example的源代码路径在如下路径中,
[mw_shl_code=bash,true]/hbase-1.0.1.1_src/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
/hbase-1.0.1.1_src/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java[/mw_shl_code]
它们的.proto文件的路径在如下的路径中,
[mw_shl_code=bash,true]~/hbase-1.0.1.1_src/hbase-examples/src/main/protobuf/Examples.proto
~/hbase-1.0.1.1_src/hbase-examples/src/main/protobuf/BulkDelete.proto [/mw_shl_code]
简单解释一下,message是消息关键字,这里我定义了两个消息,用于Coprocessor交流,CountRequest是RPC调用端发出的请求消息,CountResponse是服务端返回的消息,然后又定义了一个Service,这个service里可以定义一个远程调用的方法,getsizes,然后调用的参数就是CountRequest消息,返回是CountResponse消息。[default=0]表示的是这些变量默认为0。每个变量后的=1、=2表示的是变量的优先级。
[mw_shl_code=bash,true]option java_package = "com.uestc.coprocessor";
option java_outer_classname = "JAggregateProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message CountRequest {
required string columns = 1;
}
message CountResponse {
required int64 qiniu_count = 1 [default = 0];
required int64 qiniu2_count = 2 [default = 0];
required int64 jinshan_count = 3 [default = 0];
optional int64 other_cloud = 4 [default=0];
}
service RowCountService {
rpc getSizes(CountRequest)
returns (CountResponse);
}[/mw_shl_code]
写完这个消息后,使用我们刚才安装的protoc工具将其编译生成我们需要的Java代码,使用的命令如下:
[mw_shl_code=bash,true]protoc --java_out=/home/cyber_space/ MultiColumnAggregationService.proto[/mw_shl_code]
这样就会生成我们在.proto文件中指定的JAVA类,类中包含了必要的Service和getter、setter方法等。
3.接下来要写一个类,这个类将在RegionServer端运行,然后被客户端通过RPC框架调用这个实现类需要继承我们用protoc工具生成的Service类,实现Coprocessor、CoprocessorService接口,以便给HBase的RPC框架调用。start方法我是照着Example写的,这个方法会在HBase启动时,或者是Coprocessor启动时被调用。stop方法可以暂时不实现内容。getService方法返回这个类的对象就可了。getSizes方法就是刚才写在.proto文件的service中的RPC方法。下面详细讲一下getSizes方法。 getSizes方法中可通过request的getter方法获取客户端的消息,这里我就使用了request.getColumns() 获取到CountRequest中定义的columns字符串变量。在getSizes方法中实例化了一个Scan,并设置了若干Filter,来过滤掉不需要的列。通过scanner = env.getRegion().getScanner(scan)方法来实例化一个InternalScanner,这个Scanner比较接近表信息在HBase底层的存储结构。它的next方法返回的是一行的所有Cell,每个Cell是有Key-ColumnFamilly-Qualifier-value组成的,可以用CellUtil类来对Cell做一些常用操作。最后Response对象,可以用构建器模式进行实例化,在RPCCallback中放入done即可。 [mw_shl_code=bash,true]package com.uestc.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.uestc.coprocessor.JAggregateProtocol.CountRequest;
import com.uestc.coprocessor.JAggregateProtocol.CountResponse;
public class MultiColumnAggregation extends JAggregateProtocol.RowCountService
implements Coprocessor, CoprocessorService {
public static final String COLUMNNAME_QINIU = "Qiniu";
public static final String COLUMNNAME_JINSHAN = "Jinshan";
public static final String COLUMNNAME_QINIU2 = "Qiniu2";
private RegionCoprocessorEnvironment env;
@Override
public void start(CoprocessorEnvironment arg0) throws IOException {
if (arg0 instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment) arg0;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment arg0) throws IOException {
}
@Override
public Service getService() {
return this;
}
@Override
public void getSizes(RpcController controller, CountRequest request, RpcCallback<CountResponse> done) {
long[] values = { 0, 0, 0, 0 };
String columns = request.getColumns();
if (columns == null || "".equals(columns))
throw new NullPointerException("you need specify the columns");
String[] columnArray = columns.split(";");
// 设置filter,只过滤出我们需要的列
Filter filter = null;
ArrayList<Filter> filters = new ArrayList<>();
for (String column : columnArray) {
filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("location"),
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(column));
filters.add(filter);
}
filter = new FilterList(FilterList.Operator.MUST_PASS_ONE, filters);
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("location"));
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("chunksize"));
scan.setFilter(filter);
JAggregateProtocol.CountResponse response = null;
InternalScanner scanner = null;
try {
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = false;
do {
hasMore = scanner.next(results);
if (results.size() < 2)
continue;
// chunksize 在前,location在后
Cell kv0 = results.get(0);
long chunksize = Long.parseLong(Bytes.toString(CellUtil.cloneValue(kv0)));
Cell kv1 = results.get(1);
String location = Bytes.toString(CellUtil.cloneValue(kv1));
switch (location) {
case COLUMNNAME_QINIU:
values[0] += chunksize;
break;
case COLUMNNAME_QINIU2:
values[1] += chunksize;
break;
case COLUMNNAME_JINSHAN:
values[2] += chunksize;
break;
default:
break;
}
results.clear();
} while (hasMore);
// 生成response
response = JAggregateProtocol.CountResponse.newBuilder().setQiniuCount(values[0]).setQiniu2Count(values[1])
.setJinshanCount(values[2]).build();
} catch (IOException e) {
e.printStackTrace();
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
}
}
done.run(response);
}
}
[/mw_shl_code]
4.再编写一个客户端类来调用我们刚才写的代码。
客户端代码通过HBase的RPC框架调用HRegionServer上的代码,HRegionServer上的代码完成工作后将结果返回给客户端,客户端汇集各个RegionServer上的操作结果,做最后的处理,如计算各个RegionServer返回结果的和等。我这里的代码如下,仅做一个示例,有很多不完善的地方: 代码中声明了一个RPCCallback,通过RPCCallBack的get方法,就可以得到在RegionServer执行调用获得的结果,各个调用的结果会返回到Map容器中,然后通过对Map容器的迭代,客户端即可获得各个RegionServer上的执行结果,从而进行自定义的计算。 [mw_shl_code=bash,true]package com.uestc.coprocessor;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import com.google.protobuf.ServiceException;
import com.uestc.util.JsonUtil;
import com.uestc.util.Log;
public class MultiColumnAggregateClient {
public static class CountInfo {
public long qiniuCount;
public long qiniu2Count;
public long jinshanCount;
public long otherCount;
@Override
public String toString() {
String rString = String.format("qiniuCount:%d;Qiniu2Count:%d;JinshanCount:%d;OtherCount:%d",
this.qiniuCount, this.qiniu2Count, this.jinshanCount, this.otherCount);
return rString;
}
/**
* 生成jsonString
*
* @return
* @throws IllegalArgumentException
* @throws IllegalAccessException
*/
public String toJsonString() throws IllegalArgumentException, IllegalAccessException {
Field[] fields = this.getClass().getFields();
Map<String, Object> map = new LinkedHashMap<>();
for (Field field : fields) {
map.put(field.getName(), field.get(this));
}
return JsonUtil.createJsonString(map);
}
}
public CountInfo getSeveralCounts(Table table, String columnNames) throws ServiceException, Throwable {
final JAggregateProtocol.CountRequest request = JAggregateProtocol.CountRequest.newBuilder()
.setColumns(columnNames).build();
Map<byte[], CountInfo> map = table.coprocessorService(JAggregateProtocol.RowCountService.class, null, null,
new Batch.Call<JAggregateProtocol.RowCountService, CountInfo>() {
@Override
public CountInfo call(JAggregateProtocol.RowCountService aggregate) throws IOException {
BlockingRpcCallback<JAggregateProtocol.CountResponse> rpcCallback = new BlockingRpcCallback<>();
aggregate.getSizes(null, request, rpcCallback);
JAggregateProtocol.CountResponse response = rpcCallback.get();
CountInfo countInfo = new CountInfo();
countInfo.qiniuCount = response.getQiniuCount();
countInfo.qiniu2Count = response.getQiniu2Count();
countInfo.jinshanCount = response.getJinshanCount();
countInfo.otherCount = response.getOtherCloud();
return countInfo;
}
});
CountInfo result = new CountInfo();
for (CountInfo countInfo : map.values()) {
result.qiniuCount += countInfo.qiniuCount;
result.qiniu2Count += countInfo.qiniu2Count;
result.jinshanCount += countInfo.jinshanCount;
result.otherCount += countInfo.otherCount;
}
Log.logger.info(result);
return result;
}
}
[/mw_shl_code]
5.部署我们的Coprocessor这里分成两步 (1)、可用Eclipse把MultiColumnAggregation类文件(即需要在RegionServer上运行的类)打成jar包,然后把它分发到各个HBase节点上,并把它们放到HBase目录的lib子目录下,这是最简单的方法;或者也可以去配置CLASSPATH、HBASE_CLASSPATH之类的,保证HBase能访问到我们的jar文件。 (2)、编辑HBASE_HOME/conf/hbase-site.xml文件,配置Coprocessor类 若有多个Coprocessor,则用逗号分隔,value标签中的第一个Coprocessor是HBase自带的,第二个是我们自己自定义的。
[mw_shl_code=bash,true]<property>
<name>hbase.coprocessor.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation,com.uestc.coprocessor.MultiColumnAggregation</value>
</property>[/mw_shl_code]
6.编写调用代码可以这样来调用刚才我们自己写的Coprocessor: 这里的MTable是我自己封装的Table类,它的getTable方法即返回Table类的实例。
[mw_shl_code=bash,true]public MultiColumnAggregateClient.CountInfo sumUpSizes() throws IOException {
MultiColumnAggregateClient client = new MultiColumnAggregateClient();
MultiColumnAggregateClient.CountInfo info = null;
MTable mTable = null;
try {
mTable = new MTable("file");
info = client.getSeveralCounts(mTable.getTable(), "Qiniu;Qiniu2;Jinshan");
} catch (Throwable e) {
Log.logException(e);
} finally {
if (mTable != null)
mTable.close();
}
return info;
}[/mw_shl_code]
以上的代码因为项目上赶时间临时写的,有很多不完善的地方,但是也完成了需求,通过这个自定义的Coprocessor,就可以一次调用同时统计多个列的的和了,极大的提高了计算的效率。
|