分享

hbase源码之如何查询出来下一个KeyValue

xioaxu790 2014-10-14 21:26:24 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 16250
问题导读
1、KeyValueHeap的构造函数与seek操作有什么关系?
2、在KeyValue里是怎么被调用的?
3、populateResult方法有什么作用?





在讲《Get、Scan在服务端是如何处理?》当中的nextInternal流程,它的第一步从storeHeap当中取出当前kv,这块其实有点儿小复杂的,因为它存在异构的Scanner(一个MemStoreScanner和多个StoreFileScanner),那怎么保证从storeHeap里面拿出来的总是离上一个kv最接近的kv呢?

这里我们知道,在打开这些Scanner之后,就对他们进行了一下seek操作,它们就已经调整到最佳位置了。

我们看看KeyValueHeap的构造函数里面去看看吧。

  1. public KeyValueHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator) throws IOException {
  2.     this.comparator = new KVScannerComparator(comparator);
  3.     if (!scanners.isEmpty()) {
  4.       this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
  5.           this.comparator);
  6.       //...
  7.      this.current = pollRealKV();
  8.     }
  9. }
复制代码


它内部有一个叫heap的PriorityQueue<KeyValueScanner>队列,它会对所有的Scanner进行排序,排序的比较器是KVScannerComparator, 然后current又调用了pollRealKV通过比较获得当前的Scanner,后面会讲。

那好,我们直接进去KVScannerComparator看看它的compare方法就能知道怎么回事了。

  1. public int compare(KeyValueScanner left, KeyValueScanner right) {
  2.       // 先各取出来一个KeyValue进行比较
  3.       int comparison = compare(left.peek(), right.peek());
  4.       if (comparison != 0) {
  5.         return comparison;
  6.       } else {
  7.         // key相同,选择最新的那个
  8.         long leftSequenceID = left.getSequenceID();
  9.         long rightSequenceID = right.getSequenceID();
  10.         if (leftSequenceID > rightSequenceID) {
  11.           return -1;
  12.         } else if (leftSequenceID < rightSequenceID) {
  13.           return 1;
  14.         } else {
  15.           return 0;
  16.         }
  17.       }
  18. }
复制代码


额,从上面代码看得出来,把left和right各取出一个kv来进行比较,如果一样就比较SequenceID,SequenceID越大说明这个文件越新,返回-1,在升序的情况下,这个Scanner就跑到前面去了。
这样就实现了heap里面拿出来的第一个就是最小的kv的最新版。

在继续将之前,我们看一下在KeyValue是怎么被调用的,这样我们好理清思路。
  1. //从storeHeap里面取出一个来
  2. KeyValue current = this.storeHeap.peek();
  3. //后面是一顿比较,比较通过,把结果保存到results当中
  4. KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset, length);
复制代码



接着看populateResult方法。

  1. private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
  2.         byte[] currentRow, int offset, short length) throws IOException {
  3.       KeyValue nextKv;
  4.       do {
  5.         //从heap当中取出剩下的结果保存在results当中
  6.         heap.next(results, limit - results.size());
  7.         //如果够数了,就返回了
  8.         if (limit > 0 && results.size() == limit) {
  9.           return KV_LIMIT;
  10.         }
  11.         nextKv = heap.peek();
  12.       } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
  13.       return nextKv;
  14. }
复制代码


我们对KeyValueHeap的使用,就是先peek,然后再next,我们接下来就按这个顺序看吧。

先从peek取出来一个,peek就是从heap队列取出来的current的scanner取出来的当前的KeyValue。
  1. if (this.current == null) {
  2.       return null;
  3. }
  4. return this.current.peek();
复制代码



然后我们看next方法。
  1. public boolean next(List<Cell> result, int limit) throws IOException {
  2.     if (this.current == null) {
  3.       return false;
  4.     }
  5.     InternalScanner currentAsInternal = (InternalScanner)this.current;
  6.     boolean mayContainMoreRows = currentAsInternal.next(result, limit);
  7.     KeyValue pee = this.current.peek();
  8.     if (pee == null || !mayContainMoreRows) {
  9.       this.current.close();
  10.     } else {
  11.       this.heap.add(this.current);
  12.     }
  13.     this.current = pollRealKV();
  14.     return (this.current != null);
  15. }
复制代码



1、通过currentAsInternal.next继续获取kv,它是只针对通过通过检查的当前行的剩下的KeyValue,这个过程在之前那篇文章讲过了。

2、如果后面没有值了,就关闭这个Scanner。

3、然后还有,就把这个Scanner放回heap上,等待下一次调用。

4、使用pollRealKV再去一个新的Scanner出来。
  1. private KeyValueScanner pollRealKV() throws IOException {
  2.     KeyValueScanner kvScanner = heap.poll();
  3.     if (kvScanner == null) {
  4.       return null;
  5.     }
  6.     while (kvScanner != null && !kvScanner.realSeekDone()) {
  7.       if (kvScanner.peek() != null) {
  8.         //查询之前没有查的
  9.         kvScanner.enforceSeek();
  10.         //把之前的查到位置的kv拿出来
  11.         KeyValue curKV = kvScanner.peek();
  12.         if (curKV != null) {
  13.           //再选出来下一个的scanner
  14.           KeyValueScanner nextEarliestScanner = heap.peek();
  15.           if (nextEarliestScanner == null) {
  16.             // 后面没了,只能是它了
  17.             return kvScanner;
  18.           }
  19.          
  20.           // 那下一个Scanner的kv也出来比较比较
  21.           KeyValue nextKV = nextEarliestScanner.peek();
  22.           if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
  23.             // 它确实小,那么就把它放出去吧
  24.             return kvScanner;
  25.           }
  26.           // 把它放回去,和别的kv进行竞争
  27.           heap.add(kvScanner);
  28.         } else {
  29.           // 它没东西了,关闭完事
  30.           kvScanner.close();
  31.         }
  32.       } else {
  33.         // 它没东西了,关闭完事
  34.         kvScanner.close();
  35.       }
  36.       kvScanner = heap.poll();
  37.     }
  38.     return kvScanner;
  39. }
复制代码



鉴于它每次都要比较的情况,如果一个列族下的HFile比较多的话,它的比较次数也会增大,会影响查询效率,查询时间和HFile的数量成线性关系。

另外补充点内容,是前面写Scan的时候拉下的:

由于写入同一个rowkey相关的KeyValue的时候时间戳在前的先写入,查询的时候又需要总是读该rowkey最新的KeyValue,所以在查询的时候会先seek到该rowkey的时间戳最大的位置,具体查的时候,不断的向前seekBefore,直到这个rowkey的KeyValue全部查完位置,然后再向前定位到一个rowkey的位置。

简而言之:

不同rowkey的向前查,从rowkey小的查到rowkey大的;查相同rowkey的向后查,从最新的时间戳到查到最久的时间戳。



总结:
这就把如何查询出来下一个KeyValue的过程讲完了,它的peek方法、next方法、比较的方法,希望对大家有帮助,这个系列的文章到此也就结束了,下个目标是跟随超哥学习Spark源码。

已有(2)人评论

跳转到指定楼层
buildhappy 发表于 2014-10-14 21:47:34
学习  谢谢
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条