问题导读
1、HBase上SQL解决方案有哪几种?
2、如何操作 Phoenix?
针对HBase上SQL解决方案,目前社区内比较热门的有Cloudera的Impala,Horntworks的Drill,以及Hive。根据与HBase的操作方式,可以分为三种:
以MapReduce为核心,单个任务使用hbase-client原始接口访问;
以Google Dremel为核心,单个任务使用hbase-client原始接口访问;
以HBase-Coprocessor为核心,结合Google Dremel的思想,客户端合并多个节点的处理结果。
Phoenix的安装:
1)git clone 地址
2)安装apache-maven,可以自行google
3)mvn process-sources
4)mvn package -DskipTests
5)拷贝phoenix-{versionid}.jar到HBASE_HOME/lib/下,重启RS
6)导入数据
java -jar phoenix-{versionid}-client.jar $(zkquorum) example/web_stat.sql example/web_stat.csv 复制代码
7)执行查询
java -jar phoenix-{versionid}-client.jar $(zkquorum) example/web_stat_query.sql 复制代码
对于SQL查询的解析过程:
antlr3/PhoenixSQL.g ===antlr===〉
ls ${PHOENIX_HOME}/target/generated-sources/antlr3/com/salesforce/phoenix/parse
PhoenixSQLLexer.java PhoenixSQLParser.java PhoenixSQL.tokens 复制代码
这里以一条Select语句为例子介绍其中的流程:
SELECT DOMAIN, AVG(CORE) Average_CPU_Usage, AVG(DB) Average_DB_Usage
FROM WEB_STAT
GROUP BY DOMAIN
ORDER BY DOMAIN DESC; 复制代码
1)提交的SQL语句, PhoenixSQLLexer执行词法解析。注意这里的PhoenixSQLLexer是从src/antlr3/PhoenixSQL.g,经过antlr的翻译,生成的java代码。
目前SQL语句中可以识别的Token有:
tokens
{
SELECT=’select’;
FROM=’from’;
USING=’using’;
WHERE=’where’;
NOT=’not’;
AND=’and’;
OR=’or’;
NULL=’null’;
TRUE=’true’;
FALSE=’false’;
LIKE=’like’;
AS=’as’;
OUTER=’outer’;
ON=’on’;
IN=’in’;
GROUP=’group’;
HAVING=’having’;
ORDER=’order’;
BY=’by’;
ASC=’asc’;
DESC=’desc’;
NULLS=’nulls’;
LIMIT=’limit’;
FIRST=’first’;
LAST=’last’;
DATA=’data’;
CASE=’case’;
WHEN=’when’;
THEN=’then’;
ELSE=’else’;
END=’end’;
EXISTS=’exists’;
IS=’is’;
FIRST=’first’;
DISTINCT=’distinct’;
JOIN=’join’;
INNER=’inner’;
LEFT=’left’;
RIGHT=’right’;
FULL=’full’;
BETWEEN=’between’;
UPSERT=’upsert’;
INTO=’into’;
VALUES=’values’;
DELETE=’delete’;
CREATE=’create’;
DROP=’drop’;
PRIMARY=’primary’;
KEY=’key’;
ALTER=’alter’;
COLUMN=’column’;
TABLE=’table’;
ADD=’add’;
SPLIT=’split’;
EXPLAIN=’explain’;
VIEW=’view’;
IF=’if’;
CONSTRAINT=’constraint’;
} 复制代码
2)根据PhoenixSQLParser的解析确定com.salesforce.phoenix.jdbc.PhoenixStatement.ExecutableStatement(Interface)的类型,目前有如下几类:
增删数据:ExecutableAddColumnStatement、ExecutableDropColumnStatement
创建/删除表格:ExecutableCreateTableStatement、ExecutableDropTableStatement
Select操作:ExecutableSelectStatement
导入数据:ExecutableUpsertStatement
解释执行:ExecutableExplainStatement
3)执行(2)中提供的实例化的ExecutableStatement提供executeQuery方法:
创建QueryCompiler。
执行compile过程。(识别limit、having、where、order、projector等操作,生成ScanPlan)
封装Scanner,并根据识别出的修饰词,对于结果进行修饰,整合出ResultIterator的各种功能的实现,具体在com.salesforce.phoenix.iterator包下。
该SQL对应的包装类为:OrderedAggregatingResultIterator.//它是如何组织数据,保证数据按照DESC或者ASC的方式展示?
在Delegator当中创建:
rowAggregators = {
instance of com.salesforce.phoenix.expression.function.CountAggregateFunction$1(id=2409), instance of com.salesforce.phoenix.expression.function.CountAggregateFunction$1(id=2410), instance of com.salesforce.phoenix.expression.aggregator.LongSumAggregator(id=2411), instance of com.salesforce.phoenix.expression.aggregator.LongSumAggregator(id=2412)
}
复制代码
对于创建表格的逻辑:
1)解析SQL,翻译可执行的ExecutableCreateTableStatement,实例化MutationPlan。
2)创建MetaDataClient对象,将解析出的Statement转换成PTable的模型,更新SYSTEM.TABLE中的内容.(如果SYSTEM.TABLE不存在,还需要创建该表)
3)调用PhoenixConnection.addTable操作,这里会根据ConnectionQueryServicesImpl执行相关的服务。
4)加载Coprocessor。
descriptor.addCoprocessor(ScanRegionObserver.class.getName(), phoenixJarPath, 1, null);
descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), phoenixJarPath, 1, null);
descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), phoenixJarPath, 1, null);
descriptor.addCoprocessor(HashJoiningRegionObserver.class.getName(), phoenixJarPath, 1, null); 复制代码
这里加载的Coprocessor有:
ScanRegionObserver:封装RegionObserver.postScannerOpen接口,捕获出现的异常。即在scanner开启之后,做基本遍历,属于基础类实现。
UngroupedAggregateRegionObserver:
GroupedAggregateRegionObserver
HashJoiningRegionObserver 复制代码
会在RegionCoprocessorHost的组织下,分别执行这四个类的doPostScanOpen操作,会根据QueryPlan以及Statement中包含的信息,进行功能筛选和组装,最终被返回的结果,是已经按照需求处理过的,从而实现类似于GroupBy、Sort等操作。
2、Coprocessor机制 :
包括两部分,Observer和Endpoint
Observer有RegionObserver、WALObserver、MasterObserver。用来实现固定执行点的”插桩”的功能,有点像关系型数据库当中的触发器的功能。
这里以RegionObserver的实现为例,介绍一下其中实现细节。
1)为Table加载Observer接口的实现类。
2)客户端调用某个操作的位置时,调用接口。例如,RegionObserver的postScannerOpen()会在执行scannerOpen之后执行。
3)每一个Region设置一个RegionCoprocessorHost,负责管理加载到该Region的Coprocessor。
4)每一个Region设置一个RegionCoprocesorEnvironment,封装在ObserverContext当中,作为执行Coprocessor的上下文环境。
Endpoint不同于Observer,虽然它也是被加载到Region上,但是它的执行方式,是由Client端借助Table.coprocessorExec执行,是client到Regions的一次或者多次RPC操作,有时可能还需要在Client端对获取到的数据进行合并。可以查看一例:使用Coprocessor进行RowCount统计