HBase Compaction Source Code Analysis

Posted by hyj, 2014-03-02 00:07, in Code Analysis
Questions to keep in mind while reading:
1. Who generates the compaction request for each store?
2. Can compactions of different stores run concurrently?


The entry point for HBase compaction is right after a flush: once a region has been flushed, the region server checks whether a split or a compaction is needed, and if a compaction is needed it issues a compaction request.
    boolean shouldCompact = region.flushcache();
    // We just want to check the size
    boolean shouldSplit = region.checkSplit() != null;
    if (shouldSplit) {
      this.server.compactSplitThread.requestSplit(region);
    } else if (shouldCompact) {
      server.compactSplitThread.requestCompaction(region, getName());
    }
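So what does shouldCompact actually mean? flushcache() reports whether the flush has left the store with enough files to be worth compacting. Below is a minimal, self-contained model of that decision, assuming the check compares the count of storefiles not already being compacted against a minimum-files threshold; the class, names, and the default value here are simplified assumptions, not the real Store API.

    public class CompactionCheckSketch {
      static final int MIN_FILES_TO_COMPACT = 3; // assumed default

      // A store wants compaction when enough of its storefiles are not
      // already claimed by an in-flight compaction request.
      static boolean needsCompaction(int storefileCount, int filesCompacting) {
        return (storefileCount - filesCompacting) > MIN_FILES_TO_COMPACT;
      }

      public static void main(String[] args) {
        System.out.println(needsCompaction(5, 0)); // true: 5 eligible files
        System.out.println(needsCompaction(5, 3)); // false: only 2 eligible
      }
    }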
The requestCompaction method is shown below. As you can see, a region creates one CompactionRequest per store and hands each request to a thread pool, so compactions of multiple stores proceed concurrently.
    public synchronized void requestCompaction(final HRegion r,
        final String why) throws IOException {
      for (Store s : r.getStores().values()) {
        requestCompaction(r, s, why, Store.NO_PRIORITY);
      }
    }
    public synchronized void requestCompaction(final HRegion r, final Store s,
        final String why, int priority) throws IOException {
      if (this.server.isStopped()) {
        return;
      }
Here the store applies its file-selection algorithm to decide which StoreFiles (HFiles) to compact first, and marks whether the compaction is major or minor. The real work begins in CompactionRequest's run method, which we will step into shortly.
      CompactionRequest cr = s.requestCompaction(priority);
      if (cr != null) {
        cr.setServer(server);
        if (priority != Store.NO_PRIORITY) {
          cr.setPriority(priority);
        }
        ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
            ? largeCompactions : smallCompactions;
        pool.execute(cr);
        if (LOG.isDebugEnabled()) {
          String type = (pool == smallCompactions) ? "Small " : "Large ";
          LOG.debug(type + "Compaction requested: " + cr
              + (why != null && !why.isEmpty() ? "; Because: " + why : "")
              + "; " + this);
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Not compacting " + r.getRegionNameAsString() +
              " because compaction request was cancelled");
        }
      }
    }
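Note the throttleCompaction branch above: requests are routed to one of two pools by size, so one huge compaction cannot block a queue of quick minor ones. Here is a self-contained sketch of that routing, assuming throttleCompaction simply compares the request size against the "hbase.regionserver.thread.compaction.throttle" point; the threshold value and pool sizes below are illustrative, not HBase defaults.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ThrottleSketch {
      static final long THROTTLE_POINT = 256L * 1024 * 1024; // assumed: 256 MB

      final ExecutorService largeCompactions = Executors.newFixedThreadPool(1);
      final ExecutorService smallCompactions = Executors.newFixedThreadPool(1);

      // Big requests get their own pool so a long-running (often major)
      // compaction cannot starve the small minor compactions behind it.
      void submit(long requestSize, Runnable compaction) {
        ExecutorService pool =
            requestSize > THROTTLE_POINT ? largeCompactions : smallCompactions;
        pool.execute(compaction);
      }
    }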
The run method of CompactionRequest is also simple: it is just a wrapper with hardly any logic of its own. The key line is boolean completed = r.compact(this); — the work moves yet again, this time to the region's compact method. Quite a winding path:
    @Override
    public void run() {
      Preconditions.checkNotNull(server);
      if (server.isStopped()) {
        return;
      }
      try {
        long start = EnvironmentEdgeManager.currentTimeMillis();
        boolean completed = r.compact(this);
        long now = EnvironmentEdgeManager.currentTimeMillis();
        LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
        if (completed) {
          server.getMetrics().addCompaction(now - start, this.totalSize);
          // degenerate case: blocked regions require recursive enqueues
          if (s.getCompactPriority() <= 0) {
            server.compactSplitThread
              .requestCompaction(r, s, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
            server.compactSplitThread.requestSplit(r);
          }
        }
      } catch (IOException ex) {
        LOG.error("Compaction failed " + this, RemoteExceptionHandler
            .checkIOException(ex));
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
        server.checkFileSystem();
      } finally {
        s.finishRequest(this);
        LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
      }
    }
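The interesting branch in run() is the "recursive enqueue": if the store's compact priority is still <= 0 after this compaction, another request is queued immediately. Below is a small self-contained model of how that priority might be computed; the formula (blocking file count minus current file count) is my assumption, not a quotation from the source.

    public class PrioritySketch {
      static int compactPriority(int blockingStoreFiles, int storefileCount) {
        return blockingStoreFiles - storefileCount;
      }

      public static void main(String[] args) {
        // Priority <= 0: even after the compaction that just finished, the
        // store still has too many files, so run() re-enqueues it right away.
        int p = compactPriority(7, 9);
        System.out.println(p <= 0 ? "recursive enqueue" : "consider split");
      }
    }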
Next let's look at the region's compact method, whose parameter is the compaction request. It too is just a proxy: the essential steps are to take the read lock on lock, call the compact method of the store carried in the CompactionRequest, and release the lock afterwards. So the real compaction work sits in the store.

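A minimal sketch of that wrapper, assuming a ReentrantReadWriteLock guards the region; the real HRegion.compact also checks closing and read-only state, omitted here, and the interface types are stand-ins for the real classes.

    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class RegionCompactSketch {
      interface StoreLike { void compact(RequestLike cr) throws IOException; }
      interface RequestLike { StoreLike getStore(); }

      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      boolean compact(RequestLike cr) throws IOException {
        lock.readLock().lock();      // a close takes the write lock, so it must wait
        try {
          cr.getStore().compact(cr); // the store does the real work
          return true;
        } finally {
          lock.readLock().unlock();  // released even if compaction throws
        }
      }
    }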
The key call is cr.getStore().compact(cr); once it finishes, the read lock is released. The store's compact method does not do much work itself either; its key code is as follows:
    try {
      StoreFile.Writer writer =
        // The real work happens here; we analyze it below.
        this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
      // Move the compaction into place.
      if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
        sf = completeCompaction(filesToCompact, writer);
        if (region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().postCompact(this, sf);
        }
      } else {
        // Create storefile around what we wrote with a reader on it.
        sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
          this.family.getBloomFilterType(), this.dataBlockEncoder);
        sf.createReader();
      }
    } finally {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(filesToCompact);
      }
    }
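What does completeCompaction do with the writer's output? Roughly: move the new HFile into place, swap it into the store's file list, and drop the inputs it replaces. Below is a simplified sketch using java.nio on a local filesystem instead of the HDFS API; the method shape and step details are assumptions for illustration.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.util.List;

    class CompleteCompactionSketch {
      // tmpFile: the writer's output under the region's .tmp directory
      // familyDir: the column-family directory this store serves
      Path completeCompaction(Path tmpFile, Path familyDir,
                              List<Path> compactedInputs) throws IOException {
        // 1. Move the new file from .tmp into the family directory.
        Path dest = familyDir.resolve(tmpFile.getFileName());
        Files.move(tmpFile, dest, StandardCopyOption.ATOMIC_MOVE);
        // 2. (Real code: open a reader on dest and swap it into the store's
        //    storefile list in place of the inputs, under the store lock.)
        // 3. Delete the now-obsolete input files.
        for (Path old : compactedInputs) {
          Files.deleteIfExists(old);
        }
        return dest;
      }
    }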
Although the main compaction code is fairly long, it is worth quoting in full before we analyze it:
    StoreFile.Writer compact(final Store store,
        final Collection<StoreFile> filesToCompact,
        final boolean majorCompaction, final long maxId)
    throws IOException {
      // Calculate maximum key count after compaction (for blooms)
      // Also calculate earliest put timestamp if major compaction
      int maxKeyCount = 0;
      long earliestPutTs = HConstants.LATEST_TIMESTAMP;
      for (StoreFile file : filesToCompact) {
        StoreFile.Reader r = file.getReader();
        if (r == null) {
          LOG.warn("Null reader for " + file.getPath());
          continue;
        }
        // NOTE: getFilterEntries could cause under-sized blooms if the user
        //       switches bloom type (e.g. from ROW to ROWCOL)
        long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType()) ?
            r.getFilterEntries() : r.getEntries();
        maxKeyCount += keyCount;
        // For major compactions calculate the earliest put timestamp
        // of all involved storefiles. This is used to remove
        // family delete marker during the compaction.
        if (majorCompaction) {
          byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
          if (tmp == null) {
            // There's a file with no information, must be an old one
            // assume we have very old puts
            earliestPutTs = HConstants.OLDEST_TIMESTAMP;
          } else {
            earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Compacting " + file +
            ", keycount=" + keyCount +
            ", bloomtype=" + r.getBloomFilterType().toString() +
            ", size=" + StringUtils.humanReadableInt(r.length()) +
            ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
            (majorCompaction? ", earliestPutTs=" + earliestPutTs: ""));
        }
      }
      // keep track of compaction progress
      this.progress = new CompactionProgress(maxKeyCount);
      // Get some configs
      int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
      Compression.Algorithm compression = store.getFamily().getCompression();
      // Avoid overriding compression setting for major compactions if the user
      // has not specified it separately
      Compression.Algorithm compactionCompression =
        (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
        store.getFamily().getCompactionCompression(): compression;
      // For each file, obtain a scanner:
      List<StoreFileScanner> scanners = StoreFileScanner
        .getScannersForStoreFiles(filesToCompact, false, false, true);
      // Make the instantiation lazy in case compaction produces no product; i.e.
      // where all source cells are expired or deleted.
      StoreFile.Writer writer = null;
      // Find the smallest read point across all the Scanners.
      long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
      MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
      try {
        InternalScanner scanner = null;
        try {
          if (store.getHRegion().getCoprocessorHost() != null) {
            scanner = store.getHRegion()
                .getCoprocessorHost()
                .preCompactScannerOpen(store, scanners,
                    majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
          }
          if (scanner == null) {
            Scan scan = new Scan();
            scan.setMaxVersions(store.getFamily().getMaxVersions());
            /* Include deletes, unless we are doing a major compaction */
            scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
              majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
              smallestReadPoint, earliestPutTs);
          }
          if (store.getHRegion().getCoprocessorHost() != null) {
            InternalScanner cpScanner =
              store.getHRegion().getCoprocessorHost().preCompact(store, scanner);
            // NULL scanner returned from coprocessor hooks means skip normal processing
            if (cpScanner == null) {
              return null;
            }
            scanner = cpScanner;
          }
          int bytesWritten = 0;
          // since scanner.next() can return 'false' but still be delivering data,
          // we have to use a do/while loop.
          List<KeyValue> kvs = new ArrayList<KeyValue>();
          // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
          boolean hasMore;
          do {
            hasMore = scanner.next(kvs, compactionKVMax);
            if (writer == null && !kvs.isEmpty()) {
              writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
            }
            if (writer != null) {
              // output to writer:
              for (KeyValue kv : kvs) {
                if (kv.getMemstoreTS() <= smallestReadPoint) {
                  kv.setMemstoreTS(0);
                }
                writer.append(kv);
                // update progress per key
                ++progress.currentCompactedKVs;
                // check periodically to see if a system stop is requested
                if (Store.closeCheckInterval > 0) {
                  bytesWritten += kv.getLength();
                  if (bytesWritten > Store.closeCheckInterval) {
                    bytesWritten = 0;
                    isInterrupted(store, writer);
                  }
                }
              }
            }
            kvs.clear();
          } while (hasMore);
        } finally {
          if (scanner != null) {
            scanner.close();
          }
        }
      } finally {
        if (writer != null) {
          writer.appendMetadata(maxId, majorCompaction);
          writer.close();
        }
      }
      return writer;
    }
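One detail worth calling out in the copy loop above: instead of testing for shutdown on every KeyValue, the code only probes after roughly Store.closeCheckInterval bytes have been appended, then calls isInterrupted(store, writer) to abort if the store or region is closing. Below is a self-contained model of that pattern; the field names and the interval default are assumptions.

    import java.io.InterruptedIOException;

    class CloseCheckSketch {
      static final int CLOSE_CHECK_INTERVAL = 10 * 1024 * 1024; // assumed ~10 MB

      private volatile boolean closing = false;
      private int bytesWritten = 0;

      void append(int cellLength) throws InterruptedIOException {
        bytesWritten += cellLength;
        if (bytesWritten > CLOSE_CHECK_INTERVAL) {
          bytesWritten = 0;  // reset the counter and probe for shutdown
          if (closing) {
            // The real isInterrupted() also cleans up the half-written tmp file.
            throw new InterruptedIOException("Aborting compaction: store is closing");
          }
        }
      }
    }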
