Questions to keep in mind while reading:
1. Who generates the compaction request for each store?
2. Can compactions of multiple stores run concurrently?
The entry point for compaction in HBase is right after a flush: once the region has been flushed, the region server checks whether a compaction is needed and, if so, issues a compaction request.
boolean shouldCompact = region.flushcache();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
if (shouldSplit) {
  this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
  server.compactSplitThread.requestCompaction(region, getName());
}
The requestCompaction method is shown below. As you can see, the region builds one CompactionRequest per store and hands each request to a thread pool, so compactions of multiple stores do proceed concurrently (a minimal stand-alone sketch of this fan-out pattern follows the snippet).

public synchronized void requestCompaction(final HRegion r,
    final String why) throws IOException {
  for (Store s : r.getStores().values()) {
    requestCompaction(r, s, why, Store.NO_PRIORITY);
  }
}
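To make the concurrency answer concrete, here is a minimal, self-contained sketch of the same fan-out pattern: one task per store submitted to a shared pool. The class and store names are purely illustrative; this is not HBase code.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative analogue of requestCompaction(): one task per store,
// so several stores can be compacted by the pool concurrently.
public class StoreCompactionFanOut {
  public static void main(String[] args) throws InterruptedException {
    List<String> stores = Arrays.asList("cf1", "cf2", "cf3");
    ExecutorService pool = Executors.newFixedThreadPool(2);
    for (String store : stores) {
      pool.execute(() -> System.out.println(
          Thread.currentThread().getName() + " compacting store " + store));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}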
public synchronized void requestCompaction(final HRegion r, final Store s,
    final String why, int priority) throws IOException {
  if (this.server.isStopped()) {
    return;
  }
Here the store runs its file-selection algorithm to decide which StoreFiles (HFiles) to compact first, and marks whether the compaction is major or minor. The real work starts in CompactionRequest's run method, which we will step into next. Also note the throttleCompaction call below, which routes the request to either the large or the small compaction pool; a sketch of that size check follows the snippet.

  CompactionRequest cr = s.requestCompaction(priority);
  if (cr != null) {
    cr.setServer(server);
    if (priority != Store.NO_PRIORITY) {
      cr.setPriority(priority);
    }
    ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
        ? largeCompactions : smallCompactions;
    pool.execute(cr);
    if (LOG.isDebugEnabled()) {
      String type = (pool == smallCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + cr
          + (why != null && !why.isEmpty() ? "; Because: " + why : "")
          + "; " + this);
    }
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not compacting " + r.getRegionNameAsString() +
          " because compaction request was cancelled");
    }
  }
}
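The pool choice is driven by the total size of the selected files. Below is a rough, self-contained sketch of that size-threshold routing; the throttlePoint value here is a made-up constant (in HBase the threshold is derived from configuration such as hbase.regionserver.thread.compaction.throttle), so treat the numbers as illustrative only.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: route a compaction to a "large" or "small" pool by its
// total input size, the same idea as throttleCompaction() above.
public class CompactionThrottleSketch {
  private final ThreadPoolExecutor largeCompactions =
      new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
  private final ThreadPoolExecutor smallCompactions =
      new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
  // Hypothetical threshold (256 MB); HBase computes its own from configuration.
  private final long throttlePoint = 256L * 1024 * 1024;

  void submit(long totalInputSize, Runnable compaction) {
    ThreadPoolExecutor pool =
        totalInputSize > throttlePoint ? largeCompactions : smallCompactions;
    pool.execute(compaction);
  }
}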
CompactionRequest's run method is also simple: it is just a wrapper with little logic of its own. The key line is boolean completed = r.compact(this); — the work is handed off yet again, this time to the region's compact method, so the call chain winds around quite a bit:

@Override
public void run() {
  Preconditions.checkNotNull(server);
  if (server.isStopped()) {
    return;
  }
  try {
    long start = EnvironmentEdgeManager.currentTimeMillis();
    boolean completed = r.compact(this);
    long now = EnvironmentEdgeManager.currentTimeMillis();
    LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
        this + "; duration=" + StringUtils.formatTimeDiff(now, start));
    if (completed) {
      server.getMetrics().addCompaction(now - start, this.totalSize);
      // degenerate case: blocked regions require recursive enqueues
      if (s.getCompactPriority() <= 0) {
        server.compactSplitThread
            .requestCompaction(r, s, "Recursive enqueue");
      } else {
        // see if the compaction has caused us to exceed max region size
        server.compactSplitThread.requestSplit(r);
      }
    }
  } catch (IOException ex) {
    LOG.error("Compaction failed " + this, RemoteExceptionHandler
        .checkIOException(ex));
    server.checkFileSystem();
  } catch (Exception ex) {
    LOG.error("Compaction failed " + this, ex);
    server.checkFileSystem();
  } finally {
    s.finishRequest(this);
    LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
  }
}
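About the "Recursive enqueue" branch above: the request is re-queued when the store's compact priority is still not positive after this compaction, which roughly means the store still holds at least as many files as its blocking threshold. A hedged sketch of that rule, assuming the priority is computed as the blocking-file limit minus the current file count (names are illustrative, not the actual HBase fields):

// Illustrative sketch of the priority rule behind "Recursive enqueue".
public class CompactPrioritySketch {
  static int compactPriority(int blockingStoreFiles, int currentStoreFileCount) {
    return blockingStoreFiles - currentStoreFileCount;
  }

  public static void main(String[] args) {
    // e.g. a blocking limit of 7 files while 9 files remain after compaction
    int priority = compactPriority(7, 9);
    System.out.println(priority <= 0
        ? "re-enqueue another compaction for this store"
        : "no backlog; check whether the region should split instead");
  }
}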
Next, let's look at the region's compact method, whose parameter is the compaction request. It too is essentially a delegate: the key steps are taking the read lock on the region's lock and then calling compact on the store carried by the CompactionRequest. So the real compaction work lives in the store. The key line is:
cr.getStore().compact(cr);
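A simplified, self-contained sketch of that acquire/delegate/release pattern, using a plain ReentrantReadWriteLock instead of HRegion's internal fields (names are illustrative):

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified analogue of HRegion.compact(): hold the read lock so the region
// cannot be closed underneath the compaction, delegate to the store, release.
public class RegionCompactSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  boolean compact(Runnable storeCompaction) {
    lock.readLock().lock();
    try {
      storeCompaction.run();   // stands in for cr.getStore().compact(cr)
      return true;
    } finally {
      lock.readLock().unlock();
    }
  }
}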
Once it returns, the read lock is released. The store's compact method does not do much work either; the key code is:

try {
  StoreFile.Writer writer =
      // The real work happens here; we will dig into it below.
      this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
  // Move the compaction into place.
  if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
    sf = completeCompaction(filesToCompact, writer);
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postCompact(this, sf);
    }
  } else {
    // Create storefile around what we wrote with a reader on it.
    sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
        this.family.getBloomFilterType(), this.dataBlockEncoder);
    sf.createReader();
  }
} finally {
  synchronized (filesCompacting) {
    filesCompacting.removeAll(filesToCompact);
  }
}
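Note the hbase.hstore.compaction.complete flag checked above: when it is true (the default), completeCompaction moves the new file into place and swaps it in for the compacted inputs; when it is false, the result is only wrapped in a StoreFile with a reader, without replacing the inputs. A small illustrative way to read the flag from a Hadoop Configuration; in a real deployment it would be set in hbase-site.xml:

import org.apache.hadoop.conf.Configuration;

public class CompactionCompleteFlag {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Defaults to true: the compacted file replaces the input store files.
    boolean complete = conf.getBoolean("hbase.hstore.compaction.complete", true);
    System.out.println("hbase.hstore.compaction.complete = " + complete);
  }
}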
The main body of the compaction code is fairly long, but it is worth pasting in full before walking through it.

StoreFile.Writer compact(final Store store,
    final Collection<StoreFile> filesToCompact,
    final boolean majorCompaction, final long maxId)
    throws IOException {
  // Calculate maximum key count after compaction (for blooms)
  // Also calculate earliest put timestamp if major compaction
  int maxKeyCount = 0;
  long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  for (StoreFile file : filesToCompact) {
    StoreFile.Reader r = file.getReader();
    if (r == null) {
      LOG.warn("Null reader for " + file.getPath());
      continue;
    }
    // NOTE: getFilterEntries could cause under-sized blooms if the user
    // switches bloom type (e.g. from ROW to ROWCOL)
    long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType()) ?
        r.getFilterEntries() : r.getEntries();
    maxKeyCount += keyCount;
    // For major compactions calculate the earliest put timestamp
    // of all involved storefiles. This is used to remove
    // family delete marker during the compaction.
    if (majorCompaction) {
      byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
      if (tmp == null) {
        // There's a file with no information, must be an old one
        // assume we have very old puts
        earliestPutTs = HConstants.OLDEST_TIMESTAMP;
      } else {
        earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + StringUtils.humanReadableInt(r.length()) +
          ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
          (majorCompaction ? ", earliestPutTs=" + earliestPutTs : ""));
    }
  }

  // keep track of compaction progress
  this.progress = new CompactionProgress(maxKeyCount);
  // Get some configs
  int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
  Compression.Algorithm compression = store.getFamily().getCompression();
  // Avoid overriding compression setting for major compactions if the user
  // has not specified it separately
  Compression.Algorithm compactionCompression =
      (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
          store.getFamily().getCompactionCompression() : compression;

  // For each file, obtain a scanner:
  List<StoreFileScanner> scanners = StoreFileScanner
      .getScannersForStoreFiles(filesToCompact, false, false, true);

  // Make the instantiation lazy in case compaction produces no product; i.e.
  // where all source cells are expired or deleted.
  StoreFile.Writer writer = null;
  // Find the smallest read point across all the Scanners.
  long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
  MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
  try {
    InternalScanner scanner = null;
    try {
      if (store.getHRegion().getCoprocessorHost() != null) {
        scanner = store.getHRegion()
            .getCoprocessorHost()
            .preCompactScannerOpen(store, scanners,
                majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
      }
      if (scanner == null) {
        Scan scan = new Scan();
        scan.setMaxVersions(store.getFamily().getMaxVersions());
        /* Include deletes, unless we are doing a major compaction */
        scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
            majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
            smallestReadPoint, earliestPutTs);
      }
      if (store.getHRegion().getCoprocessorHost() != null) {
        InternalScanner cpScanner =
            store.getHRegion().getCoprocessorHost().preCompact(store, scanner);
        // NULL scanner returned from coprocessor hooks means skip normal processing
        if (cpScanner == null) {
          return null;
        }
        scanner = cpScanner;
      }

      int bytesWritten = 0;
      // since scanner.next() can return 'false' but still be delivering data,
      // we have to use a do/while loop.
      List<KeyValue> kvs = new ArrayList<KeyValue>();
      // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
      boolean hasMore;
      do {
        hasMore = scanner.next(kvs, compactionKVMax);
        if (writer == null && !kvs.isEmpty()) {
          writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
        }
        if (writer != null) {
          // output to writer:
          for (KeyValue kv : kvs) {
            if (kv.getMemstoreTS() <= smallestReadPoint) {
              kv.setMemstoreTS(0);
            }
            writer.append(kv);
            // update progress per key
            ++progress.currentCompactedKVs;

            // check periodically to see if a system stop is requested
            if (Store.closeCheckInterval > 0) {
              bytesWritten += kv.getLength();
              if (bytesWritten > Store.closeCheckInterval) {
                bytesWritten = 0;
                isInterrupted(store, writer);
              }
            }
          }
        }
        kvs.clear();
      } while (hasMore);
    } finally {
      if (scanner != null) {
        scanner.close();
      }
    }
  } finally {
    if (writer != null) {
      writer.appendMetadata(maxId, majorCompaction);
      writer.close();
    }
  }
  return writer;
}
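One tunable worth calling out from the method above is hbase.hstore.compaction.kv.max (default 10), the batch size passed to scanner.next() so the compaction never buffers too many KeyValues at once. An illustrative way to override it programmatically, assuming the HBase client libraries are on the classpath (normally this would go in hbase-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CompactionKvMaxTuning {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Larger batches mean fewer next() calls but more KeyValues held in memory.
    conf.setInt("hbase.hstore.compaction.kv.max", 20);
    System.out.println(conf.getInt("hbase.hstore.compaction.kv.max", 10));
  }
}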