fc013 发表于 2020-9-4 18:24:53

HBASE中间件之Phoenix实战



问题导读

1.怎样安装Phoenix?
2.Phoenix是什么?
3.使用Phoenix?



一、安装步骤

1、      配置Phoenix的parcel

下载地址 https://archive.cloudera.com/phoenix/6.2.0/parcels/



2、      分配并激活Phoenix的parcel包



3、      将Phoenix的CSD jar包放在Cloudera-scm-server机器上的/opt/cloudera/csd/目录下;并重启cloduera-scm-server

4、      Cloudera-manager页面添加Phoenix服务并勾选开启kerberos即可。

5、      修改hbase配置:

hbase-site.xml 的 HBase 服务高级配置代码段(安全阀)和hbase-site.xml 的 HBase 客户端高级配置代码段(安全阀)都需要添加如下三个配置:



6、      重启Phoenix、hbase并重新部署客户端配置

7、      至此,Phoenix安装结束。

二、Phoenix简介

Phoenix作为应用层和HBASE之间的中间件,以下特性使它在大数据量的简单查询场景有着独有的优势

[*]二级索引支持(global index + local index)
[*]编译SQL成为原生HBASE的可并行执行的scan
[*]在数据层完成计算,server端的coprocessor执行聚合
[*]下推where过滤条件到server端的scan filter上
[*]利用统计信息优化、选择查询计划(5.x版本将支持CBO)
[*]skip scan功能提高扫描速度


对于CDH集群集成的Phoenix一般可以使用以下两种方式访问Phoenix

1.      phoenix-sqlline
2.      JDBC-API

三、连接方式

Phoenix-sqlline

1、      由于开启了kerberos,登录服务器后应先kinit成具有相应权限的用户。
kinit –kt phoenix.keytab phoenix/qdedhtest3@XXX.COM
2、      控制台输入phoenix-sqlline
3、      展示所有表:
4、      退出: !table
5、      其他详细命令见:!help

JDBC-API

      有两种连接方式,一是jdbc:phoenix:zookeeper,这种方式是直连zookeeper,比较重。

      我们推荐使用jdbc:phoenix:thin:url这种是连接phoenix query server,比较轻量化。

Maven依赖:
<dependency>
    <groupId>com.aliyun.phoenix</groupId>
    <artifactId>ali-phoenix-shaded-thin-client</artifactId>
    <version>5.2.1-HBase-2.x</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.12.0</version>
</dependency>
Java API 示例:

package util;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.sql.*;


public class phoenixUtil {
    private static Logger log = Logger.getLogger(phoenixUtil.class);

    /*
   * 获取连接
   */
    public Connection getConn() {

      /*
         *jar包部署在linux系统中,会默认使用/etc/krb5.conf文件
         * 若部署在windows系统中,则需自己指定krb5.conf路径
         */

      if (System.getProperty("os.name").toLowerCase().startsWith("win")) {
            System.setProperty("java.security.krb5.conf", "F:\\keytab\\krb5.conf");
      }

      Connection conn = null;
      try {
            Class.forName("org.apache.phoenix.queryserver.client.Driver");
            String url="jdbc:phoenix:thin:url=http://qdedhtest3:8765;serialization=PROTOBUF;authentication=SPNEGO;" +
                  "principal=phoenix/qdedhtest3@XXX.COM;keytab=F:\\keytab\\phoenix.keytab";
            conn = DriverManager.getConnection(url);
      } catch (ClassNotFoundException e) {
            log.error(e.getMessage());
            e.printStackTrace();
      } catch (SQLException e1) {
            log.error(e1.getMessage());
            e1.printStackTrace();
      }
      return conn;

    }

    /**
   * 关闭资源
   * @param conn
   * @param statement
   * @param rs
   */
    public void closeRes(Connection conn, Statement statement, ResultSet rs) {
      try {
            if (conn != null) {
                conn.close();
            }
            if (statement != null)
                statement.close();
            if (rs != null)
                rs.close();
      } catch (Exception e) {
            e.printStackTrace();
      }
    }


    /**
   * 通过phoenix 创建表、插入数据、索引、查询数据
   * 写入数据conn.setAutoCommit(false);
   * 删除数据conn.setAutoCommit(true);
   * phoenix demo
   */
    public void phoenixDemo() {
      Connection conn = getConn();
      ResultSet rs = null;
      Statement stmt = null;
      try {
            stmt = conn.createStatement();

            stmt.execute("drop table if exists dl_yh.test_java");
            stmt.execute("create table dl_yh.test_java (mykey integer not null primary key, mycolumn varchar)");
            stmt.execute("create index test_idx on dl_yh.test_java(mycolumn)");
            for (int i = 0; i < 100; i++) {
               stmt.executeUpdate("upsert into dl_yh.test_java values ("+ i +",'The num is "+ i +"')");
            }
            conn.commit();
            PreparedStatement statement = conn.prepareStatement("select mykey from dl_yh.test_java where mycolumn='The num is 88'");
            rs = statement.executeQuery();
            while (rs.next()) {
                System.out.println("-------------The num is ---------------" + rs.getInt(1));
            }
      } catch (SQLException e) {
            log.error(e.getMessage());
            e.printStackTrace();
      } finally {
            closeRes(conn,stmt,rs);
      }
    }

    public static void main(String[] args) {
      phoenixUtil phoenixUtil = new phoenixUtil();
      phoenixUtil.phoenixDemo();
    }


}
使用步骤

1、建表

(1)注意namespace必须为大写,表名大小写都可,但是生成的hbase底表为大写,如下hbase表名为DL_YH:TT_UHOMEUSER_MAC_BINDING(hbase中命名空间后面是冒号)

create table DL_YH.tt_uhomeuser_mac_binding(
"f1"."user_num" VARCHAR,
rela_grp_user_id VARCHAR NOT NULL PRIMARY KEY,
"f1"."mac" VARCHAR,
"f1"."trg_etl_date" VARCHAR,
"f1"."dl_etl_date" VARCHAR,
"f1"."family_id" VARCHAR,
"f1"."family_name" VARCHAR)COMPRESSION='snappy'; --指定压缩方式为snappy
(2) 映射已存在的表必须加column_encoded_bytes=0:关闭列映射规则,会影响查询性能。建议创建视图。删除表会删掉hbase底表,视图没事。对于数据量大的表要加盐SALT。

create table DL_YH.TESTDZ(
id VARCHAR NOT NULL PRIMARY KEY,
"f1"."c1" VARCHAR
)COMPRESSION='snappy',column_encoded_bytes=0;

create view DL_YH.TESTDZ2(
id VARCHAR NOT NULL PRIMARY KEY,
"f1"."c1" VARCHAR
)COMPRESSION='snappy';

CREATE TABLE () SALT_BUCKETS = N
N 与 regionServer数量相同或为rs数量的倍数。

2、导入数据

用sql语句导入数据有两种方式:

1、upsert values



Example:


UPSERT INTO TEST VALUES('foo','bar',3);
UPSERT INTO TEST(NAME,ID) VALUES('foo',123);
UPSERT INTO TEST(ID, COUNTER) VALUES(123, 0) ON DUPLICATE KEY UPDATE COUNTER = COUNTER + 1;
UPSERT INTO TEST(ID, MY_COL) VALUES(123, 0) ON DUPLICATE KEY IGNORE;


2、upsert select



Example:

UPSERT INTO test.targetTable(col1, col2) SELECT col3, col4 FROM test.sourceTable WHERE col5 < 100
UPSERT INTO foo SELECT * FROM bar;


注意事项:

1、Updating data witch UPSERT VALUES-- 注意设置commitSize()
2、使用UPSERT VALUES 写大量记录,关闭自动提交和合理的小批量记录。driver.executeBatch()不是很有优势的,多次 UPDATE VALUES ,再commit()。executeBatch能够最小化 RPC调用在client和服务之间。
4、Deleting data
当删除数据前,开启autocommit,因为client不需要记住需要删除的row的key.

使用bulkload的方式导入数据

kinit –kt admin.keytab admin

HADOOP_CLASSPATH=opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol-2.1.0-cdh6.3.2.jar:/opt/cloudera/parcels/CDH/lib/hbase/conf hadoop jar /opt/cloudera/parcels/PHOENIX/lib/phoenix/phoenix-5.0.0-cdh6.2.0-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool \
-s DL_YH \
-t TT_BK_OES_MODEL_INFO \
-i /tmp/model_info \
-z qdedhtest0,qdedhtest1,qdedhtest2:2181 \
-d '\001'

注意:

1、先通过Phoenix建一张空表,然后通过Phoenix自带的CsvBulkLoadTool导入时,字段中不要包含中文,否则会出现字段分隔问题。
2、通过Phoenix建一张空表,然后直接bulkload到hbase的底表,无法查询
3、先通过hbase建一张空表,然后bulkload初始化数据,然后在Phoenix建一张映射表,若数据量特别大,则映射表会失败,查询超时。可以建视图。
4、视图可以建索引,也可查询,但是不可以更新数据,视图是只读。

3、创建索引

1、全局索引

对于读较多的表,使用全局索引,全局索引会维护一张单独的hbase表用来存储索引字段,全局索引表的RowKey存储了索引列的值和原表RowKey的值,这样编码更有利于提高查询的性能。

实际上全局索引的RowKey将会按照如下格式进行编码。




[*]SALT BYTE: 全局索引表和普通phoenix表一样,可以在创建索引时指定SALT_BUCKETS或者split key。此byte正是存储着salt。
[*]TENANT_ID: 当前数据对应的多租户ID。
[*]NDEX VALUE: 索引数据。
[*]PK VALUE: 原表的RowKey。

CREATE INDEX IDX_COL1 ON test(COL1);


2、本地索引

因为本地索引和原数据是存储在同一个表中的,所以更适合写多的场景。对于本地索引,查询中无论是否指定hint或者是查询的列是否都在索引表中,都会使用索引表。

创建本地索引:

create local index LOCAL_IDX_COL1 ON TEST(COL1);

通过HBASE SHELL观察表'TEST', 我们可以看到表中多了一行column为L#0:_0的索引数据。

hbase(main):001:0> scan 'TEST'
ROW                        COLUMN+CELL
\x00\x002\x001            column=L#0:_0, timestamp=1520935997600, value=_0
1                         column=0:COL1, timestamp=1520935997600, value=2
1                         column=0:COL2, timestamp=1520935997600, value=3
1                         column=0:_0, timestamp=1520935997600, value=x
2 row(s) in 0.1680 seconds
本地索引的RowKey将会按照如下格式进行编码:



•      REGION START KEY : 当前row所在region的start key。加上这个start key的好处是,可以让索引数据和原数据尽量在同一个region, 减小IO,提升性能。
•      INDEX ID : 每个ID对应不同的索引表。
•      TENANT ID :当前数据对应的多租户ID。
•      INDEX VALUE: 索引数据。
•      PK VALUE: 原表的RowKey。


3、覆盖索引

覆盖索引的特点是把原数据存储在索引数据表中,这样在查询到索引数据时就不需要再次返回到原表查询,可以直接拿到查询结果。

创建覆盖索引:

createindex IDX_COL1_COVER_COL2 on TEST(COL1) include(COL2);
通过HBASE SHELL 查询表IDX_COL1_COVER_COL2, 我们发现include的列的值被写入到了value中。

hbase(main):003:0> scan 'IDX_COL1_COVER_COL2'
ROW                   COLUMN+CELL
2\x001               column=0:0:COL2, timestamp=1520943893821, value=3
2\x001               column=0:_0, timestamp=1520943893821, value=x
1 row(s) in 0.0180 seconds
对于类似select col2 from TEST where COL1='2'的查询,查询一次索引表就能获得结果。

4. 函数索引

函数索引的特点是能根据表达式创建索引,适用于对查询表,过滤条件是表达式的表创建索引。例如:

//创建函数索引
CREATE INDEX CONCATE_IDX ON TEST (UPPER(COL1||COL2))
4、索引数据

同步索引:

在执行CREATE INDEX IDX_COL1 ON test(COL1)时会进行索引数据的同步。此方法适用于数据量较小的情况。

若数据量较大出现了超时的情况,可在客户端配置文件hbase-site.xml中,把超时参数设置大一些,足够build索引数据的时间。

<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>
异步索引:

CREATE index of not exists event_object_id_idx_b on trans.event(object_id) ASYNC UPDATE_CACHE_FREQUENCY=6000;
创建索引数据:

HADOOP_CLASSPATH=opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol-2.1.0-cdh6.3.2.jar:/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/PHOENIX/lib/phoenix/phoenix-5.0.0-cdh6.2.0-client.jar hbase org.apache.phoenix.mapreduce.index.IndexTool \
--schema DL_YH \
--data-table TT_BK_OES_MODEL_INFO \
--index-table ASYNC_IDX_MODES_CODE2 \
--output-path /tmp/ASYNC_IDX_MODES_CODE

5、查询



Example:

SELECT * FROM TEST LIMIT 1000;
SELECT * FROM TEST LIMIT 1000 OFFSET 100;
SELECT full_name FROM SALES_PERSON WHERE ranking >= 5.0
    UNION ALL SELECT reviewer_name FROM CUSTOMER_REVIEW WHERE score >= 8.0
注意事项:

1.在频繁的查找中,避免 joins,除非一个join表是很小的。
2.where 字句中,过滤头字段采用主键
3.where 字句中,过渡头字段采用 in or可能使where字句不开启优化
4. = < > 在where字句中,会开启扫描优化
5.查询返回字段中包括非索引字段时,不会走二级索引表。
6.Hints:
在查询时,强制使用全局index,当查询的column不包含索引时
SELECT /*+ INDEX(dl_yh.TD_HCC_HSICRM_WOS_STIMULATERECORDSBASE,IDX_VIEW_HCC) */ "f1"."hsicrm_stimulatesubclassname" from dl_yh.TD_HCC_HSICRM_WOS_STIMULATERECORDSBASE where "f1"."hsicrm_workorderid"='BJ19013006118716';如下图测试:22176445条数据,90多个字段,如果直接select * from,这种方式不会走索引,而是触发全表扫描,直接查询超时。
如果是查建索引时include的字段,那么速度很快,0.25秒





6、修改



Example:

ALTER TABLE my_schema.my_table ADD d.dept_id char(10) VERSIONS=10
ALTER TABLE my_table ADD dept_name char(50), parent_id char(15) null primary key
ALTER TABLE my_table DROP COLUMN d.dept_id, parent_id;
ALTER VIEW my_view DROP COLUMN new_col;
ALTER TABLE my_table SET IMMUTABLE_ROWS=true,DISABLE_WAL=true;



Alters the state of an existing index.DISABLE will cause the no further index maintenance to be performed on the index and it will no longer be considered for use in queries. REBUILD will completely rebuild the index and upon completion will enable the index to be used in queries again. UNUSABLE will cause the index to no longer be considered for use in queries, however index maintenance will continue to be performed. USABLE will cause the index to again be considered for use in queries. Note that a disabled index must be rebuild and cannot be set as USABLE.

Example:

ALTER INDEX my_idx ON sales.opportunity DISABLE
ALTER INDEX IF EXISTS my_idx ON server_metrics REBUILD

7、删除



Drops a table. The optional CASCADE keyword causes any views on the table to be dropped as well. When dropping a table, by default the underlying HBase data and index tables are dropped. The phoenix.schema.dropMetaData may be used to override this and keep the HBase table for point-in-time queries.

Example:

DROP TABLE my_schema.my_table;
DROP TABLE IF EXISTS my_table;
DROP TABLE my_schema.my_table CASCADE;


Drops an index from a table. When dropping an index, the data in the index is deleted. Note that since metadata is versioned, snapshot queries connecting at an earlier time stamp may still use the index, as the HBase table backing the index is not deleted.

Example:

DROP INDEX my_idx ON sales.opportunity
DROP INDEX IF EXISTS my_idx ON server_metrics
DROP INDEX "ASYNC_IDX_MODES_CODE" ON DL_YH.TT_BK_OES_MODEL_INFO;
性能调优

待后续使用起来根据实际情况进行调优。




最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg

阿飞 发表于 2020-9-6 09:07:48

为梦狂野 发表于 2020-9-5 13:43
被这个玩意坑过一回,不是说不好,是由于应用场景上的失误。去年使用这个做日志数据亿数据量多维分析,简直 ...

hbase就rowkey比较快的,其他就不行了。可以跟其他组件整合

为梦狂野 发表于 2020-9-5 13:43:54

被这个玩意坑过一回,不是说不好,是由于应用场景上的失误。去年使用这个做日志数据亿数据量多维分析,简直就是慢的不行,还挂服务。不过不得不公平的说句,只有对命中索引和rowkey这类查询才快。其他都不行,高版本后建立索引最多10个,所以多维是不行,特定查询还行。特别是对于要查询hbase上的特定数据时简单统计稍微方便点。一旦复杂多样就不行。 说多了都是泪水。。。

为梦狂野 发表于 2020-9-7 13:45:56

阿飞 发表于 2020-9-6 09:07
hbase就rowkey比较快的,其他就不行了。可以跟其他组件整合

嗯,踩坑多了,才知道得乖乖利用组件特性来搞。
页: [1]
查看完整版本: HBASE中间件之Phoenix实战