分享

HBase编程入门实例

virgo 2014-8-19 22:22:04 发表于 入门帮助 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 43819
问题导读
1.Table中Family和Qualifier的关系与区别是什么?
2.Table中Family和Qualifier的如何设置?
3.HBaseConfiguration的作用是什么?





相关内容零基础学习hadoop生态系统到上手工作线路指导(HBASE篇)



1、Table中Family和Qualifier的关系与区别
就像用MySQL一样,我们要做的是表设计,MySQL中的表,行,列的在HBase已经有所区别了,在HBase中主要是Table和Family和Qualifier,这三个概念。Table可以直接理解为表,而Family和Qualifier其实都可以理解为列,一个Family下面可以有多个Qualifier,所以可以简单的理解为,HBase中的列是二级列,也就是说Family是第一级列,Qualifier是第二级列。两个是父子关系。
2、谈谈Table中Family和Qualifier的设置
对于传统关系型数据库中的一张table,在业务转换到hbase上建模时,从性能的角度应该如何设置family和qualifier呢?
最极端的,可以每一列都设置成一个family,也可以只有一个family,但所有列都是其中的一个qualifier,那么有什么区别呢?
family越多,那么获取每一个cell数据的优势越明显,因为io和网络都减少了,而如果只有一个family,那么每一次读都会读取当前rowkey的所有数据,网络和io上会有一些损失。
当然如果要获取的是固定的几列数据,那么把这几列写到一个family中比分别设置family要更好,因为只需一次请求就能拿回所有数据。
以上是从读的方面来考虑的,那么写呢?可以参考一下这篇文章:
http://hbase.apache.org/book/number.of.cfs.html

首先,不同的family是在同一个region下面。而每一个family都会分配一个memstore,所以更多的family会消耗更多的内存。
其次,目前版本的hbase,在flush和compaction都是以region为单位的,也就是说当一个family达到flush条件时,该region的所有family所属的memstore都会flush一次,即使memstore中只有很少的数据也会触发flush而生成小文件。这样就增加了compaction发生的机率,而compaction也是以region为单位的,这样就很容易发生compaction风暴从而降低系统的整体吞吐量。
第三,由于hfile是以family为单位的,因此对于多个family来说,数据被分散到了更多的hfile中,减小了split发生的机率。这是把双刃剑。更少的split会导致该region的体积比较大,由于balance是以region的数目而不是大小为单位来进行的,因此可能会导致balance失效。而从好的方面来说,更少的split会让系统提供更加稳定的在线服务。
上述第三点的好处对于在线应用来说是明显的,而坏处我们可以通过在请求的低谷时间进行人工的split和balance来避免掉。
因此对于写比较多的系统,如果是离线应该,我们尽量只用一个family好了,但如果是在线应用,那还是应该根据应用的情况合理地分配family。hbase实例代码:
  1. /**
  2. * Hbase 基本CRUD 样例代码   覆盖Put Get Delete checkAndPut checkAndDelete  Scan
  3. * 通过上面的各种操作的例子, 会基本覆盖Htable可以用的的所有方法
  4. * 这里不涉及Hbase 管理代码的操作
  5. * @author Administrator
  6. *
  7. */
  8. public class HbaseCRUDTest_New {
  9.     private static org.apache.hadoop.conf.Configuration conf = null;
  10.     private static HTablePool pool = null;
  11.     private static HBaseAdmin admin = null;
  12.     private static final int MAX_TABLE_COUNT = 10;
  13.     @BeforeClass
  14.     public static void before()throws Exception{
  15.         /**服务器端缓存客户端的连接 是以conf为单位的(可能不准确:通常一个客户端
  16.          * 连接过来, 服务器端会有一个线程与之对应, 缓存的是这个服务器端的线程),
  17.          * 所以最好不要到处创建conf实例, 一个就够了, 所有共用conf创建的到Hbase
  18.          * 的连接和操作, 会共用一个连接  这样可以提高性能, 也会减小服务器端的压力
  19.          * 实际上创建Htable pool admin都是通过HConnection接口的实现类(
  20.          * HConnectionImplementation)来完成的, 多个HConnection会由
  21.          * HConnectionManager来管理, 而conf是HConnectionImplementation的最
  22.          * 重要的构造参数 , 上面就以conf 来 标识和替代Hconnection 可能会带来歧义
  23.          * 以为conf就是连接本身
  24.          */
  25.         conf = HBaseConfiguration.create();
  26.         //这是一个10台集群的daily 日常性能测试环境
  27.         conf.set("hbase.zookeeper.quorum", "10.232.31.209,10.232.31.210,10.232.31.211");
  28.         conf.set("hbase.zookeeper.property.clientPort", "3325");
  29.         //conf.addResource("dataSource.xml");//也可以载入一个标准的Hbase配置文件
  30.         /**HTable是非线程安全的  在多线程环境下使用HTablePool是一个好的解决方案,
  31.          * 参数MAX_TABLE_COUNT 是 pool保持的每个Htable实例的最大数量  ,
  32.          * 比如为10   如果有100个线程getTable() 同一张表   则他们会共用 pool中的该
  33.          * 表的10个实例   有些可能要排队等 用完的要回收放回去
  34.          * 使用的时候 就不要new Htable了, 直接从pool中取
  35.          * 用完再putTable 放回去
  36.          *
  37.          * 在0.92以上的版本  则不用放回去   直接table.close() 即可    putTable 被标记
  38.          * 为 @Deprecated.  0.90.2 版本使用 putTable 下面的代码都没有 做 这些操作
  39.          * 避免 不同版本 出问题
  40.          */
  41.         pool = new HTablePool(conf, MAX_TABLE_COUNT);
  42.         admin = new HBaseAdmin(conf);
  43.     }
  44.     /**
  45.      * 注意一下:Put Get Delete Scan等操作的对象 都提供一个空的构造函数, 一般不要直接使用, 他们存在主要是在rpc传输的反序列化的时候要用到(了解Java RMI的应该很清楚)
  46.      * @throws IOException
  47.      * @throws InterruptedException
  48.      */
  49.     @Test public void putTest() throws IOException, InterruptedException{
  50.         HTable table = (HTable)pool.getTable("user_test_xuyang");
  51.         //批量操作 共两种 底层都是调用  HConnection的processBatch方法(
  52.         // table.batch(List<Put>) 和table.flushCommits()会直接调用)
  53.         //首先  自动flush 关闭    就像 JDBC中的 auto_commit,  否则 加每一条 提交
  54.         // 一次,影响性能     不过table.put(List<Put>) table.batch(List<Row>)不受这
  55.         // 个影响, 设置false,只有当put总大小超过writeBufferSize 才提交  或者手工
  56.         // table.flushCommits() (table.put(List<Put>)操作完成后会手工提交一次),
  57.         // writeBufferSize 也可以调整
  58.         table.setAutoFlush(false);
  59.         //writeBufferSize 默认为2M ,调大可以增加批量处理的吞吐量, 丢失数据的风险也会加大
  60.         table.setWriteBufferSize(1024*1024*5);
  61.         //这样可以看到 当前客户端缓存了多少put
  62.         ArrayList<Put> putx = table.getWriteBuffer();
  63.         // 批量操作方法一,单一操作的批量 比如Htable.put delete get 都提供了List作
  64.         // 为参数的批处理.   默认每10条 或List<Put>数据量 超过writeBufferSize 提交
  65.         // 如果AutoFlush为true 一次性table.put(List<Put>)只提交一次
  66.         List<Put> puts = new ArrayList<Put>(10);
  67.         for (int i = 0,len=10; i < len; i++) {
  68.             Put put = new Put(Bytes.toBytes("row-"+i),new Date().getTime());
  69.             put.add(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes("value"+i));
  70.             // 这里可以自定义添加时间戳, 默认就是当前时间(RegionServer服务器端的
  71.             // 时间) 也可以自己定义, 多版本时候(默认3)比如想插入一条比现在最新的记
  72.             // 录老的, 一些特殊情况下可能会有这种需求
  73.             put.add(Bytes.toBytes("data"), Bytes.toBytes("email"), System.currentTimeMillis(),
  74.                     Bytes.toBytes("value"+i+"@sina.com"));
  75.             //也可以直接加入一个KeyValue,实际上底层就是存储为KeyValue的, 如果对
  76.             // 底层较熟悉, 这种操行更加高效, 一般上面的就可以完成日常工作了
  77.             put.add(new KeyValue(Bytes.toBytes("row-"+i), Bytes.toBytes("data"),
  78.                     Bytes.toBytes("age"),Bytes.toBytes(20+i)));
  79.             puts.add(put);
  80.             //写操作日志  这个对性能影响比较大,  但有很重要, 如果设为true, 只要写
  81.             // 成功, 就算 机器挂掉 也不会丢失,
  82.             put.setWriteToWAL(false);
  83.             /**
  84.              * Put还有一些额外的东西
  85.              */
  86.             //put.has(family, qualifier,ts,value)
  87.             //put 当前在内存中的大小  这个在setWriteBufferSize 可能会用到
  88.             /**实际上底层是 这么干的(当然还有其他比如put数量对table.flushCommits()的触发)
  89.              * for(Put put:puts){
  90.              *  total+=put.heapSize();
  91.              *  if(total>=table.getWriteBufferSize())
  92.              *      table.flushCommits();
  93.              * }
  94.              */
  95.             put.heapSize();
  96.             //put 中 每次调用add 底层都会添加一个KeyValue,这个是添加的KeyValue数量
  97.             put.size();
  98.             //判断put中是否已经存在了 给定的family qualifier ts value
  99. //          put.has(family, qualifier)
  100. //          put.has(family, qualifier, value)
  101. //          put.has(family, qualifier, ts)
  102. //          put.has(family, qualifier, ts, value)
  103.             //下面的方法 从字面上基本上就可以知道
  104.             put.isEmpty();
  105.             put.getRow();
  106.             put.getRowLock();
  107.             put.getLockId();
  108.             put.numFamilies();
  109.         }
  110.         table.put(puts);
  111.         table.flushCommits();
  112.         System.out.println(table.get(new Get(Bytes.toBytes("row-1"))));
  113.         admin.flush("user_test_xuyang");
  114.         System.out.println(table.get(new Get(Bytes.toBytes("row-1"))));
  115.         //批量操作方法一, 使用batch,可以混合各种操作 ( Put Delete Get 都是接口Row的实现)
  116.         //主要 这个如果处理Put操作 是不会使用客户端缓存的   会直接异步的发送到服务器端
  117.         List<Row> rows = new ArrayList<Row>(10);
  118.         for (int i = 10,len=20; i < len; i++) {
  119.             Put put = new Put(Bytes.toBytes(("row-"+i)));
  120.             put.add(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes(("value"+i)));
  121.             put.add(Bytes.toBytes("data"), Bytes.toBytes("email"), Bytes.toBytes(("value"+i+"@sina.com")));
  122.             rows.add(put);
  123.         }
  124.         //可以添加 删除操作   但是 最好不要把对同一行的Put Delete用batch操作 ,
  125.         // 因为 为了更好的性能  发到服务器端操作的顺序  是会改变的   很有可能不是放入的顺序
  126.         rows.add(new Delete(Bytes.toBytes("row-9")));
  127.         table.batch(rows);
  128.             /**  一些需要注意的地方:
  129.              * 1. 提交到服务器  处理如果出现问题  会从服务器端返回RetriesExhaustedWithDetailsException
  130.              * 包含出错的原因 和重试的次数
  131.              * 如果 服务器端还是操作失败 , 这些put还会缓存在客户端  等到下次buffer 被flush,
  132.              * 注意  如果客户端挂掉了   这些数据是会丢失的
  133.              * 当然如果是NoSuchColumnFamilyException只会重试一次 并且不会恢复
  134.              * 下面的情况要注意了
  135.              * table.put(puts); 是会抛出异常的,而且不会再提交  这样数据会丢失的
  136.              * 捕获这个异常手工table.flushCommits() 可以确保已经写入缓存的还可以有可能写入成功
  137.              * try {
  138.                     table.put(puts);
  139.                 } catch (Exception e) {
  140.                     table.flushCommits();
  141.                 }
  142.              * table.flushCommits(); 也会有异常   也要捕获
  143.              *
  144.              * 2. 还时候 启用缓存   正常操作发生异常时候并不会被正常报出来, 有时候
  145.              * 会等到buffer被flush后才报出来  这也是要注意的地方
  146.              *
  147.              * 3.在缓存中的puts 被发送到服务器端的顺序和服务器处理的顺序 是控制不
  148.              * 到的, 如果想指定顺序 , 只能使用较小的批处理  强制他们按照批处理的顺序执行
  149.              */
  150.         /**
  151.          * 完备的一条记录就是一个KeyValue 一个rowkey可能有多个KeyValue(比如
  152.          * 多个版本, 一个版本是一条)
  153.          * rowkey ColumnFamily  Column  TimeStamp Type Value
  154.          * 其中的Type就是区别Put和Delete等操作的类别, 实际上Delete也是添加一条记录
  155.          * (Hbase存储的HDFS文件是只读的, 更新用 添加+删除 组合完成, 删除实际上
  156.          * 也是添加一条删除,实际操作都是添加,在Hbase Compact时候 合并数据时候会剔除标记为删除的rowkey)
  157.          * 这种 增 改 删的一致性操作  在客户端给我们的操作带来了便利
  158.          *
  159.          * 实际上ColumnFamily Column的名字是会以byte的形式存储在数据中的,
  160.          * 因此, 它们在设计的时候名字应该尽可能的短 这样可以节省不少的空间
  161.          */
  162.     }
  163.     /**
  164.      * Delete与Put一致 把全部的Put改成Delete  table.put -->table.delete 就可以了,
  165.      * 不过有些需要注意, 看下面
  166.      */
  167.     @Test public void deleteTest(){
  168.         HTable table = (HTable)pool.getTable("user_test_xuyang");
  169.         try {
  170.             //如果上面介绍的KeyValue 有点印象, 通过delete提供的构造函数可以知道
  171.             //不指定会删除所有的版本
  172.             Delete delete = new Delete(Bytes.toBytes("row-1"));
  173.             table.delete(delete);
  174.         } catch (Exception e) {
  175.             e.printStackTrace();
  176.         }
  177.     }
  178.     /**
  179.      * 一些原子性操作   对于java并发工具包有所了解的 应该会知道 轻量级锁的核心就是CAS机制(Compare and swap),
  180.      * 这里在概念上有些类似, 也可以类似于  SQL中  select 出来然后   insert or update的 操作  Hbase这里可以保证他们在一个原子操作
  181.      * 这个在高并发 场景下  更新值  是个好的选择
  182.      * table.checkAndPut(row, family, qualifier, value, put)
  183.      * table.checkAndDelete(row, family, qualifier, value, delete)
  184.      * @throws IOException
  185.      */
  186.     @Test public void atomicOP() throws IOException{
  187.         byte[] row = Bytes.toBytes("row-12");
  188.         byte[] family = Bytes.toBytes("data");
  189.         HTable table = (HTable)pool.getTable("user_test_xuyang");
  190.         //操作成功会返回 true,否则false;  如果是个不存在的qualifier, 把value置为null  check是会成功的
  191.         Put put = new Put(row);
  192.         put.add(family, Bytes.toBytes("namex"), Bytes.toBytes("value12"));
  193.         //check 和put是同一个row
  194.         boolean result1 = table.checkAndPut(row, family, Bytes.toBytes("namex"), null, put);  //true
  195.         boolean result2 = table.checkAndPut(row, family, Bytes.toBytes("namex"), null, put);   //false
  196.         Put put2 = new Put(row);
  197.         put2.add(family, Bytes.toBytes("namex"), Bytes.toBytes("value12"));
  198.         boolean result3 = table.checkAndPut(row, family, Bytes.toBytes("namex"),
  199.                 Bytes.toBytes("value12"), put2);  //true
  200.         Put put3 = new Put(Bytes.toBytes("row-13"));
  201.         put3.add(family, Bytes.toBytes("namex"), Bytes.toBytes("value13"));
  202.         boolean result4 = table.checkAndPut(row, family, Bytes.toBytes("namex2"),
  203.                 Bytes.toBytes("value12"), put3);  //org.apache.hadoop.hbase.DoNotRetryIOException
  204.         //注意:check 和put的一定要是同一行 否则会报错
  205. //      table.checkAndDelete类似
  206.     }
  207.     /**
  208.      * 上面的一些操作有些方法可能涉及到Row Locks 但并没有说明   这里详细介绍下
  209.      *
  210.      * 一些会使数据发生变化的操作  比如like put(), delete(), checkAndPut()等等 , 操作都是以一个row为单位的,
  211.      * 使用row lock 可以保证  一次性只能有一个客户端修改一个row
  212.      * 虽然 实践中  客户端应用程序 并没有明确的使用lock, 但服务端会在适当的时机保护每一个独立的操作
  213.      *
  214.      * 如果可能应当尽量避免使用lock, 就像RSBMS一样会有死锁问题
  215.      * @throws IOException
  216.      */
  217.     @Test public void rowLocksTest() throws IOException{
  218.         HTable table = (HTable)pool.getTable("user_test_xuyang");
  219.         byte[] row = Bytes.toBytes("row-8");
  220.         RowLock lock = table.lockRow(row);
  221.         //.....相关操作
  222.         table.unlockRow(lock);
  223.         //锁有效时间 默认时间是1分钟
  224.     }
  225.     @SuppressWarnings("deprecation")
  226.     @Test public void getTest(){
  227.         HTable table = (HTable)pool.getTable("user_test_xuyang");
  228.         Get get = new Get(Bytes.toBytes("row-10"));
  229.         //默认 get 只会取得最新的记录, 使用下面的方法可以获取其他的版本
  230.         //有两个方法 一个带参数的可以指定版本数量, 可能会抛出异常;另外一个没有
  231.         // 参数, 默认Integer.MAX_VALUE, 不会抛出异常
  232.         get.setMaxVersions();
  233.     //  get.setFilter(filter); get 一般数据比较少比较少使用filter, 在Scan的时候会详细介绍Filter
  234.         //通过get.addColumn提供了各种重载方法, 可以过滤只获取哪些ColumnFamily
  235.         // 和Column,get实现这种过滤只能使用这种方法, 接下来的Scan还可以使用Filter来实现
  236.         get.addColumn(Bytes.toBytes("data"),Bytes.toBytes("email"));
  237.         try {
  238.             Result result = table.get(get);
  239.             //这是一个简单的 获取返回结果的方法, 还有其他的通过遍历Map的方式
  240.             List<KeyValue> values = result.list();
  241.             //由于KeyValue靠近底层, 对于一些一些Offset,Length结尾的方法 可以忽略,
  242.             // 比较感兴趣的可以关注下Hbase的底层存储
  243.             for (KeyValue keyValue : values) {
  244.                 StringBuilder sb = new StringBuilder(Bytes.toString(keyValue.getFamily()));
  245.                 sb.append(":").append(Bytes.toString(keyValue.getQualifier())).append("--:");
  246.                 sb.append(Bytes.toString(keyValue.getValue())).append("  ").append(
  247.                         new Date(keyValue.getTimestamp()).toLocaleString());
  248.                 System.out.println(sb.toString());
  249.             }
  250.             //这是另外一种获取返回结果的方式, 这种在Scan的返回多个Result的时候
  251.             // 相对实用, 一个rowkey的都在一起, 一个ColumnFamily的也聚合在一起
  252.             NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> nMap = result.getMap();
  253.             for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry:nMap.entrySet() ) {
  254.                 //entry.getKey()为family key
  255.                 String family = Bytes.toString(entry.getKey());
  256.                 System.out.print(family+":");
  257.                 for (Map.Entry<byte[], NavigableMap<Long, byte[]>> entry2 : entry.getValue().entrySet() ) {
  258.                     // entry2.getKey()为qualifier  当然qualifier有可能为空  这个不是问题  但为null的只能有一个
  259.                     String qualifier = Bytes.toString(entry2.getKey());
  260.                     System.out.print(qualifier+"--:");
  261.                     for (Map.Entry<Long, byte[]> entry3:entry2.getValue().entrySet() ) {
  262.                         //entry3.getKey()为 timestamp  entry3.getValue()为 value
  263.                         System.out.print(Bytes.toString(entry3.getValue())+" "
  264.                                 +new Date(entry3.getKey()).toLocaleString());
  265.                     }
  266.                 }
  267.             }
  268.             System.out.println("------------------");
  269.             //Get的批处理类似于 SQL中的in操作,但操作起来也相当的简单, 和上面
  270.             // 的Put Delete非常类似,也可以混合使用
  271.             List<Row> rows = new ArrayList<Row>();
  272.             rows.add(new Get(Bytes.toBytes("row-10")));
  273.             rows.add(new Get(Bytes.toBytes("row-11")));
  274.             rows.add(new Put(Bytes.toBytes("row-1222221")));
  275.             try {
  276.                 Object[] objs = table.batch(rows);
  277.                 for (Object obj : objs) {
  278.                     printKeyValue((Result)obj);
  279.                 }
  280.             } catch (InterruptedException e) {
  281.                 e.printStackTrace();
  282.             }
  283.         } catch (IOException e) {
  284.             e.printStackTrace();
  285.         }
  286.         /**
  287.          * Get获取单个 或随机的几个row 使用起来非常方便, 对于访问多个连续的row
  288.          * 使用下面将要介绍的Scan操作,通常情况下, 完成一个业务 需要多个操作, 而
  289.          * ORM无法将一个业务所有的操作SQL封装在一起, 除非直接使用JDBC
  290.          * Hbase的这种 Put Get Delete 是不是很棒 , 对了没有Update, update的就
  291.          * 是新增一条, 由于有版本, 旧的不会被立即淘汰掉
  292.          */
  293.     }
  294.     public void printKeyValue(Result result){
  295.         List<KeyValue> values = result.list();
  296.         //由于KeyValue靠近底层, 对于一些一些Offset,Length结尾的方法 可以忽略,
  297.         // 比较感兴趣的可以关注下Hbase的底层存储
  298.         for (KeyValue keyValue : values) {
  299.             StringBuilder sb = new StringBuilder(Bytes.toString(keyValue.getFamily()));
  300.             sb.append(":").append(Bytes.toString(keyValue.getQualifier())).append("--:");
  301.             sb.append(Bytes.toString(keyValue.getValue())).append("  ").append(
  302.                     new Date(keyValue.getTimestamp()).toLocaleString());
  303.             System.out.println(sb.toString());
  304.         }
  305.     }
  306.     /**
  307.      * 对于连续记录的顺序访问  就是类似于 最常见的Select操作
  308.      * 实际上Scan 非常类似于Hibernate 的DetachedCriteria, 而scan 使用的Filter就相当于Criteria的Expression或Restrictions
  309.      * 可以实现离线封装查询条件   这个是相当的给力啊
  310.      */
  311.     @Test public void ScanTest(){
  312.         ResultScanner resultScanner = null;
  313.         try {
  314.             /**
  315.              * 如Scan的名字, Scan是在一定的范围内startkey(StartRow)和endkey(StopRow)
  316.              * 之间 顺序的扫描, 配合Filter 可以跳过不满足条件的记录  返回需要的结果
  317.              * 当然startkey和endkey只是标识一个范围, 它们对应的rowkey可能并不存在,
  318.              * 但如果存在(startkey) 扫描的范围是[startkey,endkey),否则就是(startkey,endkey)
  319.              * 可以看到 Scan 有一个包裹Get的构造, 可以利用该get的rowkey作为startkey
  320.              */
  321.             Scan scan  = new Scan();
  322.             /**
  323.              * ResultScanner 就是table scanner返回的结果集, 类似于游标 可以迭代获取结果,
  324.              * batch 就是每次迭代从服务器获取的记录数, 设置太小 会频繁到服务器取数据,
  325.              * 太大 会对客户端造成比较大的压力,  具体根据需要使用 , 正常使用可以不必管
  326.              * 它, 大批量读取可以考虑用它改善性能
  327.              * 这里要注意了: 这个记录数是qualifier不是row, 如果一个row有17个qualifier,
  328.              * setBatch(5),一个row就会分散到4个Result中, 分别持有5,5,5,2个qualifier
  329.              * (默认一个row的所有qualifier会在一个Result中)
  330.              *
  331.              * ColumnPaginationFilter 对于一个Row会在一个Result 但是只返回前面一部分
  332.              *
  333.              * 如果使用FirstKeyOnlyFilter等 不是扫描Row全部的Filter 会有冲突 会有异常抛出 */
  334.             scan.setBatch(10);
  335.             /**发给scanners的缓存的Row的数量, 如果没有设置会使用 HTable#getScannerCaching()的值
  336.              * 一般 越大 Scan速度越快, 但消耗的内存也越大*/
  337.             scan.setCaching(10);
  338.             //简而言之就是  batch 是qualifier column级别的   caching是row级别的
  339.             //RegionServer是否应当缓存 当前客户端访问过的数据块    如果是随机的get 这个最好为false
  340.             scan.setCacheBlocks(true);
  341.             /** Scan 最复杂, 也最有用的就是Filter, 特别是FilterList对Filter进行的组合
  342.              * 这里只先介绍Scan的其他参数 ;对于Filter,后面会单独介绍*/
  343.             //startrow和stoprow 可以改变
  344.             scan.setStartRow(Bytes.toBytes("row-12"));
  345.             scan.setStopRow(Bytes.toBytes("row-1110"));
  346.             scan.setMaxVersions(3);//同Get
  347.             /** 可以指定一个时间范围, 扫描指定时间或时间范围的的记录,  */
  348.             scan.setTimeRange(System.currentTimeMillis()-1000000, System.currentTimeMillis());
  349.             /**
  350.              * 也可以指定timestamp 查询
  351.              */
  352.             scan.setTimeStamp(System.currentTimeMillis());
  353.             /**可以使用Get中类似的方法 来限制获取的ColumnFamily Column*/
  354.             scan.addColumn(Bytes.toBytes(""),Bytes.toBytes(""));
  355.             //Scan中使用最多的还是Filter
  356.             HTable table = (HTable)pool.getTable("user_test_xuyang");
  357.             //看下面
  358.             table.setScannerCaching(1000);
  359.             resultScanner = table.getScanner(scan);
  360.             //这是foreach格式    是调用resultScanner.next()的
  361.             //默认情况下  每次调用next() 都要RPC一下服务器   每个row一次, 即时resultScanner(int nbRows)
  362.             //table.setScannerCaching() 默认是1  可以手工设置  设置后 该table实例的所有scan都有效
  363.             //也可以每个scan单设置定就是上面有说过的scan.setCaching(1024*10); 这个会覆盖table设置的值
  364.             for (Result result : resultScanner) {
  365.                 //这里就不多说了   和Get中一样的解析
  366.             }
  367.         } catch (IOException e) {
  368.             e.printStackTrace();
  369.         }finally{
  370.             //这样一定要记住 用完close
  371.             if(resultScanner!=null)resultScanner.close();
  372.         }
  373.     }
  374.     /**
  375.      * 高级的Scan,就是Filter中的FilterList  可以组合各个Filter
  376.      * select cf1.column1,cf2.column2* from table_name where rowkey>10 or value like 'xxx%' limit 10
  377.      * 如果上面的SQL解析出来 and 表示MUST_PASS_ALL, or 表示MUST_PASS_ONE
  378.      * 就是下面这个样(虽然理解可能不同,但下图的代码如下:)
  379.      *  ( (cf1 and column1) or (cf2 and column2*)  ) and (rowkey>10 or value like 'xxx%')
  380.      * setFilter(
  381.      *                                                        -ColumnFamilyFilter  cf1
  382.      *                                     -filterList(ALL)--|
  383.      *                                    |                   -ColumnFilter   column1
  384.      *                  -filterList(ONE)->|
  385.      *                 |                  |                   -ColumnFamilyFilter cf2
  386.      *                 |                   -filterList(ALL)--|
  387.      *                 |                                      -ColumnPrefixFilter column2
  388.      *filterList(ALL)->|                -RowFilter 10
  389.      *                 |-filterList(ONE)->|
  390.      *                 |                   -ValueFilter xx
  391.      *                  -PageFilter 10
  392.      * @author Administrator
  393.      *
  394.      */
  395.     @Test public void scanAdvance(){
  396.         Scan scan  = new Scan();
  397.         List<Filter> rootList = new ArrayList<Filter>();
  398.             List<Filter> selectList = new ArrayList<Filter>();
  399.                 List<Filter> select_1 = new ArrayList<Filter>();
  400.                     select_1.add(new FamilyFilter(CompareFilter.CompareOp.EQUAL,
  401.                             new BinaryComparator(Bytes.toBytes("cf1"))));
  402.                     select_1.add(new QualifierFilter(CompareFilter.CompareOp.EQUAL,
  403.                             new BinaryComparator(Bytes.toBytes("column1"))));
  404.                 List<Filter> select_2 = new ArrayList<Filter>();
  405.                     select_2.add(new FamilyFilter(CompareFilter.CompareOp.EQUAL,
  406.                             new BinaryComparator(Bytes.toBytes("cf2"))));
  407.                     select_2.add(new QualifierFilter(CompareFilter.CompareOp.EQUAL,
  408.                             new BinaryPrefixComparator(Bytes.toBytes("column"))));
  409.             selectList.add(new FilterList(Operator.MUST_PASS_ALL, select_1));
  410.             selectList.add(new FilterList(Operator.MUST_PASS_ALL, select_2));
  411.         rootList.add(new FilterList(Operator.MUST_PASS_ONE,selectList));
  412.             List<Filter> whereList = new ArrayList<Filter>();
  413.                 whereList.add(new RowFilter(CompareFilter.CompareOp.GREATER,
  414.                         new BinaryComparator(Bytes.toBytes(10))));
  415.                 whereList.add(new RowFilter(CompareFilter.CompareOp.EQUAL,
  416.                         new BinaryPrefixComparator(Bytes.toBytes("xxx"))));
  417.         rootList.add(new FilterList(Operator.MUST_PASS_ONE,whereList));
  418.         scan.setFilter(new FilterList(Operator.MUST_PASS_ALL, rootList));
  419.         //这样的嵌套 写起来着实很烦, 可以自己封装成程序
  420.     }
  421.     /**
  422.      * 一个不得不说的操作  分页操作, RDBMS 比如mysql :select * from table_name where sss=sss limit 1 10;oracle 利用rownum也可以迂回实现,
  423.      * Hbase这方面支持的不是太好, 也可以支持翻页
  424.      */
  425.     @Test public void pageTest(){
  426.         //与传统的分页的不同  start 是个起始的row  而不是一个数字 ,   下一页 的时候
  427.         // 需要将上一页的最后一条记录作为分页条件传回来
  428.         //这个start要是byte[],页面上只能暂时保存字符串  怎么办呢??
  429.         //Bytes.toStringBinary(byte[])与Bytes.toBytesBinary(String) 可以完美的实现字符串和byte[]的相互转换
  430.         // Bytes.toStringBinary(Bytes.toBytesBinary("abc")) equals "abc" 是true
  431.         byte[] start = Bytes.toBytes("row-13");
  432.         int limit = 10;
  433.         Scan scan = new Scan();
  434.         scan.setStartRow(start);
  435.         List<Filter> rootList = new ArrayList<Filter>();
  436.         rootList.add(new PageFilter(limit));
  437.         ////root.add(new Filter()) 添加其他的过滤条件
  438.         scan.setFilter(new FilterList(Operator.MUST_PASS_ALL, rootList));
  439.     }
  440.     /**除了上面用到的
  441.      * Htable 还有一些其他的有用方法
  442.      * @throws IOException
  443.      */
  444.     @SuppressWarnings("unused")
  445.     @Test public void htableOthers() throws IOException{
  446.         HTable table = (HTable)pool.getTable("user_test_xuyang");
  447.         byte [] row = Bytes.toBytes("row-13");
  448.         //获取指定row的 数据所在的Region的信息 :名字, 编码后名字(Hadoop 中的
  449.         // 路径名), startKey endkey等--->hri  ;还有该Region所在的主机的地址信息--->addr
  450.         HRegionLocation hrl = table.getRegionLocation(row);
  451.         HRegionInfo hri= hrl.getRegionInfo();
  452.         HServerAddress addr = hrl.getServerAddress();
  453.         //获取所有Region的信息
  454.         Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
  455.         //获取该表所在的所有Region的  startKey 和 endKey
  456.         Pair<byte[][], byte[][]> startendKeys = table.getStartEndKeys();
  457.         //下面的是通过上面的实现的
  458.         table.getStartKeys();
  459.         table.getEndKeys();
  460.         //table.getRowOrBefore(row, family) 这个一般用不到  0.92时候 就要被废弃了
  461.     }
  462.     /**
  463.      * 操作完成后, 清理下资源还是很有必要的,
  464.      * 在系统的ServletContextListener
  465.      */
  466.     @AfterClass
  467.     public static void after(){
  468.         try {
  469.             if(conf!=null) HConnectionManager.deleteConnection(conf, false);
  470.             if(pool!=null)pool.close();
  471.         } catch (IOException e) {
  472.             e.printStackTrace();
  473.         }
  474.     }
  475. }
  476.         // 路径名), startKey endkey等--->hri  ;还有该Region所在的主机的地址信息--->addr
  477.         HRegionLocation hrl = table.getRegionLocation(row);
  478.         HRegionInfo hri= hrl.getRegionInfo();
  479.         HServerAddress addr = hrl.getServerAddress();
  480.         //获取所有Region的信息
  481.         Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
  482.         //获取该表所在的所有Region的  startKey 和 endKey
  483.         Pair<byte[][], byte[][]> starte
复制代码



本文固定链接:http://www.xiaoyaochong.net/wordpress/index.php/2013/05/23/hbase%e5%85%a5%e9%97%a8%e5%ae%9e%e4%be%8b/ | 逍遥冲





已有(4)人评论

跳转到指定楼层
yuchtao 发表于 2015-4-16 17:13:02
看了觉得不错,等下操作一下
回复

使用道具 举报

benhui 发表于 2015-11-25 11:10:46
不错,正在学习hbase,有很大的帮助
回复

使用道具 举报

孤独的战神 发表于 2015-11-30 14:07:57
好好学习,天天向上
回复

使用道具 举报

宁哥哥哥 发表于 2016-10-27 20:54:00
基于微博数据应用的HBase实战开发
课程观看地址:http:// www. xuetuwuyou. com/course/150
课程出自学途无忧网:http:// www. xuetuwuyou. com

一、课程用到的软件
1.centos6.7
2.apache-tomcat-7.0.47
3.solr-5.5
4.zookeeper 3.4.6
5.eclipse-jee-neon-R-win32-x86_64
6.jdk1.7_49
7.HBase1.2.2
8.Ganglia3.7.2
9.Sqoop1.99.7
10.Hadoop2.7.2

二、课程目标
当数据量达到TB或PB级的时候,传统关系型数据型已力不从心。在大数据热潮中,推出了NoSQL数据库,这种天生就为分布式存储而设计的技术,尤其以Apache HBase为代表,占领海量数据存储技术的大半壁江山。本教视从实战角度出来,向学员们手把手掌握HBase使用精髓,让学员达到如下目标:
1. 了解分布式存储的原理及架构。
2. 掌握如何使用HBase实现海量数据存储与检索。
3. 掌握HBase在开发中常见的技术大坑与调优技术。

三、适用人群
开发人员、架构师、对分布式存储有兴趣的朋友。

四、课程内容及目录

课时1:HBase简介与部署
课时2:HBase架构与索引算法剖析
课时3:HBase建库建表与CRUD实战
课时4:基于HBase Client API的CRUD实战
课时5:批处理与扫描器实战
课时6:使用Ganglia监控HBase
课时7:过滤器实战之比较过滤器
课时8:过滤器实战之专用过滤器与FilterList
课时9:过滤器实战之自定义过滤器
课时10:Observer协处理器实战之Master级别原理剖析
课时11:Observer协处理器实战之Region级别原理剖析
课时12:Observer协处理器实战之表复制应用实战
课时13:Endpoint协处理器实战之原理剖析
课时14:Endpoint协处理器实战之数据统计应用实战
课时15:使用API管理HBase之核心理论
课时16:使用API管理HBase之编程实战
课时17:使用API管理HBase之编程实战(续)
课时18:搭建分布式HBase集群之Hadoop部署
课时19:搭建分布式HBase集群之HBase部署
课时20:sqoop2部署
课时21:使用sqoop2将mysql数据导入到HBase
课时22:集群管理之节点管理与数据任务
课时23:Rowkey设计与集群常见故障处理
课时24:集群调优经验分享
课时25:项目介绍与Solr环境搭建
课时26:数据层设计与中文分词器配置
课时27:Spring集成HBase之核心操作
课时28:Spring集成HBase之核心操作(续)
课时29:基于dom4j进行数据文件解析
课时30:数据层设计与实现之二级索引开发
课时31:数据层设计与实现之二级索引开发(续)
课时32:Spring集成Solrj之入门操作
课时33:Spring集成Solrj之高级操作
课时34:高亮查询功能开发之一
课时35:高亮查询功能开发之二
课时36:课程总结
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条