Previous post: ClickHouse Source Code Reading Plan (1) - Everything About MergeTree
Questions this post addresses:
1. What information does the WAL store?
2. How is the data of an In-Memory data part written and read?
3. How does the WAL restore the data of In-Memory data parts?
This analysis is based on community version 21.4.
This chapter surveys the Storage code from a source-code perspective, with particular attention to the MergeTree code.
We begin with ClickHouse's abstraction of a table. The parent interface is defined in IStorage.h and is responsible for:
1. storing the table's data
2. looking up and inserting data
3. the data's storage layout, e.g. the compression scheme
4. concurrency control over the data, e.g. locks
From its virtual functions and data members we can summarize the basic functionality it provides:
1. Querying the table engine name and table name, whether the table is a view, and whether it supports SAMPLE/FINAL/PREWHERE, replication, parallel inserts, deduplication, merging small blocks into large ones, and so on.
2. Every table has a globally unique id: a struct composed of the three fields {database, table, UUID}.
3. Defining the table's metadata, with multi-versioning so metadata can be read and written without locks.
4. Three concurrency-control functions and two kinds of locks that keep table operations safe.
The three concurrency-control functions:
lockForShare
Guarantees the table cannot be dropped while the lock is held. Used by SELECT and by background merges.
lockForAlter
Guarantees that only one ALTER query is processed at a time while the lock is held.
lockExclusively
Guarantees that no other thread (select, merge, alter, ...) operates on the table while the lock is held.
The two locks:
alter_lock
The lock that must be taken for ALTER, guaranteeing that at any moment only one alter thread works on the table. Operations such as DROP must also take this lock via lockExclusively, so that they only proceed after all in-flight alters have finished.
drop_lock
All SELECT/merge-related queries take this lock via lockForShare to guarantee the table is not dropped underneath them; all DROP-related operations take it via lockExclusively, guaranteeing that every other SELECT/merge has finished before the drop proceeds. (A sketch of how these locks fit together follows the summary below.)
5. getQueryProcessingStage reports the stage to which this table can process a SELECT query.
6. Definitions of the table-operation methods, e.g. watch for observing data changes in the table, the read and write interfaces for reading and writing data, plus drop, truncate, rename, alter, optimize, mutate and so on.
7. Definitions of the table-property methods, e.g. getting the table's row count and size in bytes, its storage policy, the number of rows/bytes inserted, etc.
Summary: the IStorage interface tells us that when designing a table engine, the parent class must define the table-operation methods, the methods for querying and modifying table metadata, a data structure that uniquely identifies a table, and the concurrency-control operations and data structures that keep the data safe.
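To make the locking scheme concrete, here is a minimal sketch of the acquisition pattern described above. It uses std::shared_mutex as a stand-in; the real IStorage uses its own RWLockImpl with timeouts and query ids, and returns holder objects rather than raw locks.

#include <mutex>
#include <shared_mutex>
#include <utility>

/// Illustrative stand-in for IStorage's concurrency control; only the
/// acquisition pattern matches the description above, not the real API.
struct TableLocks
{
    std::shared_mutex alter_lock;
    std::shared_mutex drop_lock;

    /// SELECT / background merge: shared on drop_lock,
    /// so the table cannot be dropped while the query runs.
    std::shared_lock<std::shared_mutex> lockForShare()
    {
        return std::shared_lock(drop_lock);
    }

    /// ALTER: exclusive on alter_lock, so only one ALTER runs at a time,
    /// while SELECTs may proceed concurrently.
    std::unique_lock<std::shared_mutex> lockForAlter()
    {
        return std::unique_lock(alter_lock);
    }

    /// DROP and similar: exclusive on both locks, so no ALTER,
    /// SELECT or merge can be in flight.
    std::pair<std::unique_lock<std::shared_mutex>, std::unique_lock<std::shared_mutex>>
    lockExclusively()
    {
        return {std::unique_lock(alter_lock), std::unique_lock(drop_lock)};
    }
};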
Let's use the cpptree tool to look at IStorage's inheritance hierarchy. The tree below shows that ClickHouse ships a very rich set of table engines.
IStorage
├── IStorageSystemOneBlock
│   ├── StorageSystemAggregateFunctionCombinators
│   ├── StorageSystemAsynchronousMetrics
│   ├── StorageSystemBuildOptions
│   ├── StorageSystemClusters
│   ├── StorageSystemCollations
│   ├── StorageSystemContributors
│   ├── StorageSystemCurrentRoles
│   ├── StorageSystemDDLWorkerQueue
│   ├── StorageSystemDataTypeFamilies
│   ├── StorageSystemDatabases
│   ├── StorageSystemDictionaries
│   ├── StorageSystemDistributionQueue
│   ├── StorageSystemEnabledRoles
│   ├── StorageSystemErrors
│   ├── StorageSystemEvents
│   ├── StorageSystemFormats
│   ├── StorageSystemFunctions
│   ├── StorageSystemGrants
│   ├── StorageSystemGraphite
│   ├── StorageSystemLicenses
│   ├── StorageSystemMacros
│   ├── StorageSystemMerges
│   ├── StorageSystemMetrics
│   ├── StorageSystemModels
│   ├── StorageSystemMutations
│   ├── StorageSystemPrivileges
│   ├── StorageSystemProcesses
│   ├── StorageSystemQuotaLimits
│   ├── StorageSystemQuotaUsage
│   ├── StorageSystemQuotas
│   ├── StorageSystemQuotasUsage
│   ├── StorageSystemReplicatedFetches
│   ├── StorageSystemReplicationQueue
│   ├── StorageSystemRoleGrants
│   ├── StorageSystemRoles
│   ├── StorageSystemRowPolicies
│   ├── StorageSystemSettings
│   ├── StorageSystemSettingsProfileElements
│   ├── StorageSystemSettingsProfiles
│   ├── StorageSystemStackTrace
│   ├── StorageSystemTableEngines
│   ├── StorageSystemTableFunctions
│   ├── StorageSystemTimeZones
│   ├── StorageSystemUserDirectories
│   ├── StorageSystemUsers
│   ├── StorageSystemZooKeeper
│   └── SystemMergeTreeSettings
├── IStorageURLBase
│   ├── StorageXDBC
│   └── StorageURL
├── MergeTreeData
│   ├── StorageMergeTree
│   └── StorageReplicatedMergeTree
├── StorageBlocks
├── StorageProxy
│   ├── StorageTableFunctionProxy
│   └── StorageMaterializeMySQL
├── StorageSetOrJoinBase
│   ├── StorageJoin
│   └── StorageSet
├── StorageSystemPartsBase
│   ├── StorageSystemParts
│   └── StorageSystemPartsColumns
├── StorageBuffer
├── StorageDictionary
├── StorageDistributed
├── StorageEmbeddedRocksDB
├── StorageFile
├── StorageFromMergeTreeDataPart
├── StorageGenerateRandom
├── StorageHDFS
├── StorageInput
├── StorageKafka
├── StorageLiveView
├── StorageLog
├── StorageMaterializedView
├── StorageMemory
├── StorageMerge
├── StorageMongoDB
├── StorageMySQL
├── StorageNull
├── StoragePostgreSQL
├── StorageRabbitMQ
├── StorageS3
├── StorageStripeLog
├── StorageSystemColumns
├── StorageSystemDetachedParts
├── StorageSystemDisks
├── StorageSystemNumbers
├── StorageSystemOne
├── StorageSystemReplicas
├── StorageSystemStoragePolicies
├── StorageSystemTables
├── StorageSystemZeros
├── StorageTinyLog
├── StorageValues
└── StorageView
Let's focus first on the MergeTree-related code. The parent class of the MergeTree and ReplicatedMergeTree engines is defined in MergeTreeData.h. The MergeTree data part structure was covered in depth in the previous post; the functions and data members defined in this header maintain the list of all data parts, plus the data structures and CRUD operations for data parts and partitions.
The concrete implementations for reading and modifying data live in three classes: {MergeTreeDataSelectExecutor, MergeTreeDataWriter, MergeTreeDataMergerMutator}; the data part implementation itself is in the IMergeTreeDataPart class.
A few highlights:
1. Transactional commit of data parts
A data part in memory is mutable; once it is added to the working set it becomes immutable. Adding a part to the working set is transactional and implemented as a two-phase commit; the commit() and rollback() functions are implemented in MergeTreeData.cpp.
commit() walks all PreCommitted data parts and marks every data part that is about to be covered as Outdated:
MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData::DataPartsLock * acquired_parts_lock)
{
    DataPartsVector total_covered_parts;

    if (!isEmpty())
    {
        /// ... lock acquisition and counter declarations (add_*/reduce_*, current_time) elided ...

        for (const DataPartPtr & part : precommitted_parts)
        {
            DataPartPtr covering_part;
            DataPartsVector covered_parts = data.getActivePartsToReplace(part->info, part->name, covering_part, *owing_parts_lock);
            if (covering_part)
            {
                part->remove_time.store(0, std::memory_order_relaxed); /// The part will be removed without waiting for old_parts_lifetime seconds.
                data.modifyPartState(part, DataPartState::Outdated);
            }
            else
            {
                total_covered_parts.insert(total_covered_parts.end(), covered_parts.begin(), covered_parts.end());
                for (const DataPartPtr & covered_part : covered_parts)
                {
                    covered_part->remove_time.store(current_time, std::memory_order_relaxed);

                    reduce_bytes += covered_part->getBytesOnDisk();
                    reduce_rows += covered_part->rows_count;

                    data.modifyPartState(covered_part, DataPartState::Outdated);
                    data.removePartContributionToColumnSizes(covered_part);
                }
                reduce_parts += covered_parts.size();

                add_bytes += part->getBytesOnDisk();
                add_rows += part->rows_count;
                ++add_parts;

                data.modifyPartState(part, DataPartState::Committed);
                data.addPartContributionToColumnSizes(part);
            }
        }
        data.decreaseDataVolume(reduce_bytes, reduce_rows, reduce_parts);
        data.increaseDataVolume(add_bytes, add_rows, add_parts);
    }

    clear();

    return total_covered_parts;
}
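The caller side of this two-phase commit looks roughly as follows; this is a simplified sketch of the write/merge paths (real call sites also pass a parts lock and a block-number increment):

/// Sketch of the caller-side two-phase commit (simplified).
MergeTreeData::Transaction transaction(data);

/// Phase 1: rename the freshly written part out of its tmp_ directory and
/// insert it into the working set in the PreCommitted state.
data.renameTempPartAndReplace(new_part, nullptr, &transaction);

/// Phase 2: flip the part to Committed and mark everything it covers as
/// Outdated. If an exception escapes before this line, the Transaction's
/// destructor rolls back and the part never becomes visible to SELECTs.
transaction.commit();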
2. MergeTree variants
/// Merging mode. See above.
enum Mode
{
    Ordinary = 0, /// Enum values are saved. Do not change them.
    Collapsing = 1,
    Summing = 2,
    Aggregating = 3,
    Replacing = 5,
    Graphite = 6,
    VersionedCollapsing = 7,
};
The functional similarities and differences among the MergeTree variants, and their implementation details, are left to the next post.
3. The data part implementation
The previous post described a data part's file layout; the code is defined in IMergeTreeDataPart.h.
We focus on the checksums, the data part life cycle, and the index implementation.
A data part's life cycle is described by six states:
enum class State
{
    Temporary,       /// the part is generating now, it is not in data_parts list
    PreCommitted,    /// the part is in data_parts, but not used for SELECTs
    Committed,       /// active data part, used by current and upcoming SELECTs
    Outdated,        /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
    Deleting,        /// not active data part with identity refcounter, it is deleting right now by a cleaner
    DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
};
- Every table engine maintains a list of data parts; every new data part just fetched from disk, just inserted, or just produced by a merge starts in the Temporary state.
- Adding a Temporary data part to the working set is a two-phase commit; the first phase moves it to the PreCommitted state.
- On failure it rolls back to the Outdated state and awaits destruction. A part in this state can still be read by SELECT queries already in flight; a background task periodically cleans such parts up, and while one is being cleaned it is in the Deleting state and can no longer be read by SELECTs.
- On success it enters the Committed state, i.e. it has joined the working set.
- A Committed data part becomes Outdated after a DROP and waits to be cleaned up.
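The bullets above imply a small state machine. The sketch below (illustrative only, using the State enum shown earlier; the real checks are scattered through MergeTreeData) summarizes the legal transitions:

/// Illustrative only: which life-cycle transitions the text above allows.
bool isLegalTransition(State from, State to)
{
    switch (from)
    {
        case State::Temporary:    return to == State::PreCommitted; /// first phase of 2PC
        case State::PreCommitted: return to == State::Committed     /// commit()
                                      || to == State::Outdated;     /// rollback()
        case State::Committed:    return to == State::Outdated;     /// covered by a merge, or DROP
        case State::Outdated:     return to == State::Deleting;     /// background cleaner
        default:                  return false;                     /// Deleting / DeleteOnDestroy end the life cycle
    }
}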
Checksums are defined in MergeTreeDataPartChecksum.h.
For every file in a data part's directory, a UInt64 file size and a UInt128 hash are recorded; for .bin files the file size and 128-bit hash are recorded both before and after compression:
struct MergeTreeDataPartChecksum
{
    using uint128 = CityHash_v1_0_2::uint128;

    UInt64 file_size {};
    uint128 file_hash {};

    bool is_compressed = false;
    UInt64 uncompressed_size {};
    uint128 uncompressed_hash {};

    /// ... serialization and checking helpers elided ...
};
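A sketch of how these fields are used when a part is validated: the real comparison is MergeTreeDataPartChecksum::checkEqual; this boolean version is illustrative only.

/// Illustrative check of one file against its recorded checksum entry.
/// For compressed .bin files the uncompressed size/hash pair is compared too.
bool matches(const MergeTreeDataPartChecksum & expected,
             const MergeTreeDataPartChecksum & actual)
{
    if (expected.file_size != actual.file_size
        || expected.file_hash != actual.file_hash)
        return false;
    if (expected.is_compressed
        && (expected.uncompressed_size != actual.uncompressed_size
            || expected.uncompressed_hash != actual.uncompressed_hash))
        return false;
    return true;
}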
The data part storage formats are defined in MergeTreeDataPartType.h:
enum Value
{
    /// Data of each column is stored in one or several (for complex types) files.
    /// Every data file is followed by marks file.
    WIDE,

    /// Data of all columns is stored in one file. Marks are also stored in single file.
    COMPACT,

    /// Format with buffering data in RAM.
    IN_MEMORY,

    UNKNOWN,
};
Index-related structures
The .mrk file
One important job of the .mrk file is recording how many rows lie between consecutive marks (i.e. the size of each granule); the default is 8192 rows. This is stored as a vector of partial (cumulative) row sums, which stays resident in memory:
/// Class contains information about index granularity in rows of IMergeTreeDataPart
/// Inside it contains vector of partial sums of rows after mark:
/// |-----|---|----|----|
/// |  5  | 8 | 12 | 16 |
/// If user doesn't specify setting adaptive_index_granularity_bytes for MergeTree* table
/// all values in inner vector would have constant stride (default 8192).
class MergeTreeIndexGranularity
{
private:
    std::vector<size_t> marks_rows_partial_sums;

    /// ... accessors elided ...
};
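Storing partial sums (rather than per-granule sizes) lets both queries a reader needs run in O(1). A sketch of the arithmetic, with method names mirroring MergeTreeIndexGranularity and using the 5/8/12/16 example from the comment above:

#include <cstddef>
#include <vector>

/// Sketch of how the partial sums answer granularity queries in O(1)
/// (method names mirror MergeTreeIndexGranularity; bodies are illustrative).
struct GranularitySketch
{
    std::vector<size_t> marks_rows_partial_sums; /// e.g. {5, 8, 12, 16}

    /// First row covered by the given mark: 0 for mark 0, else the previous sum.
    size_t getMarkStartingRow(size_t mark_index) const
    {
        return mark_index == 0 ? 0 : marks_rows_partial_sums[mark_index - 1];
    }

    /// Number of rows in the granule starting at the given mark:
    /// for {5, 8, 12, 16}, mark 1 covers 8 - 5 = 3 rows.
    size_t getMarkRows(size_t mark_index) const
    {
        return marks_rows_partial_sums[mark_index] - getMarkStartingRow(mark_index);
    }
};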
The rest of the .mrk information, i.e. (the granule's offset within the compressed file, the granule's offset within the decompressed block), is defined in MarkInCompressedFile.h:
struct MarkInCompressedFile
{
    size_t offset_in_compressed_file;
    size_t offset_in_decompressed_block;

    /// ... comparison and formatting helpers elided ...
};
This information lives in a cache rather than being permanently resident in RAM. The cache is defined in MarkCache.h and is backed by ClickHouse's own LRUCache implementation; I'll cover that LRUCache in detail in a later post.
The minmax index
Stores, for each data part, the minimum and maximum values of the columns used in the partition key; it is used to prune parts:
/// Index that for each part stores min and max values of a set of columns. This allows quickly excluding
/// parts based on conditions on these columns imposed by a query.
/// Currently this index is built using only columns required by partition expression, but in principle it
/// can be built using any set of columns.
struct MinMaxIndex
{
    /// A direct product of ranges for each key column. See Storages/MergeTree/KeyCondition.cpp for details.
    std::vector<Range> hyperrectangle;

    /// ... loading/updating helpers elided ...
};
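A sketch of how the hyperrectangle enables pruning: a part can be skipped whenever the query's range on some partition-key column does not intersect the part's stored [min, max] range. The real logic lives in KeyCondition; this version is simplified to scalar ranges:

#include <cstddef>
#include <vector>

/// Simplified pruning check: each key column i has the part's value range
/// part_ranges[i] = [min, max] and the range query_ranges[i] implied by the
/// WHERE clause. The part may contain matching rows only if every pair of
/// ranges intersects. (Illustrative; the real code is KeyCondition.)
struct ScalarRange { double min; double max; };

bool mayContainMatches(const std::vector<ScalarRange> & part_ranges,
                       const std::vector<ScalarRange> & query_ranges)
{
    for (size_t i = 0; i < part_ranges.size(); ++i)
        if (part_ranges[i].max < query_ranges[i].min
            || query_ranges[i].max < part_ranges[i].min)
            return false; /// disjoint on some column: prune the whole part
    return true;
}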
primary.idx
This information is always resident in RAM and holds the columns of the table's primary index:
/// Primary key (correspond to primary.idx file).
/// Always loaded in RAM. Contains each index_granularity-th value of primary key tuple.
/// Note that marks (also correspond to primary key) is not always in RAM, but cached. See MarkCache.h.
using Index = Columns;
Index index;
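Because the index keeps only every index_granularity-th key, serving a condition is a binary search over it for the candidate mark range, and only those granules are read and decompressed. A simplified sketch for a single integer key and an equality condition (the real logic, in MergeTreeDataSelectExecutor::markRangesFromPKRange, handles key tuples and general ranges):

#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

/// Sketch: the sparse index stores the primary-key value of the first row of
/// every granule. For a condition `key = v`, the candidate granules are those
/// between the last index entry <= v and the first entry > v.
std::pair<size_t, size_t> candidateMarkRange(const std::vector<int64_t> & index, int64_t v)
{
    auto right = std::upper_bound(index.begin(), index.end(), v); /// first entry > v
    auto left = right == index.begin() ? index.begin() : right - 1;
    return {static_cast<size_t>(left - index.begin()),
            static_cast<size_t>(right - index.begin())}; /// half-open mark range
}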
Skipping indexes
Defined in IndexDescription.h; holds the description of a non-primary (data-skipping) index, likewise resident in RAM:
/// Description of non-primary index for Storage
struct IndexDescription
{
    /// Definition AST of index
    ASTPtr definition_ast;

    /// List of expressions for index calculation
    ASTPtr expression_list_ast;

    /// Index name
    String name;

    /// Index type (minmax, set, bloom filter, etc.)
    String type;

    /// Prepared expressions for index calculations
    ExpressionActionsPtr expression;

    /// Index arguments, for example probability for bloom filter
    FieldVector arguments;

    /// Names of index columns (not to be confused with required columns)
    Names column_names;

    /// Data types of index columns
    DataTypes data_types;

    /// Sample block with index columns. (NOTE: columns in block are empty, but not nullptr)
    Block sample_block;

    /// Index granularity, make sense for skip indices
    size_t granularity;

    /// Parse index from definition AST
    static IndexDescription getIndexFromAST(const ASTPtr & definition_ast, const ColumnsDescription & columns, const Context & context);
};
4. The WAL mechanism
In an LSM-Tree, to improve write performance, data is accumulated up to a threshold before being flushed to disk as an SST file. Unflushed data can be lost on a crash, so a WAL is introduced to keep the in-memory data safe. Reference 1 points out a second benefit of the WAL here: it avoids high-frequency small writes generating a large number of data parts, keeping the number of data part files as low as possible.
The questions to answer:
1. What is the WAL's on-disk format? What does the WAL record for each inserted data part?
2. How is the data of an In-Memory data part written and read?
3. After a crash, when the database restarts, how does the WAL restore the In-Memory data parts?
We focus on MergeTreeData::loadDataParts() and on the definition and implementation in MergeTreeWriteAheadLog.h/.cpp.
The WAL's on-disk layout: what does the WAL record for each inserted data part?
/** WAL stores addditions and removals of data parts in in-memory format.
 * Format of data in WAL:
 * - version
 * - type of action (ADD or DROP)
 * - part name
 * - part's block in Native format. (for ADD action)
 */
The main layout of the wal.bin file:
1. version
2. the type of the data part action (ADD or DROP)
3. the data part's name
4. the block data in Native (binary) format
Every insertion of an In-Memory data part appends one record to wal.bin, in the binary form
WAL_VERSION + ActionMetadata + ActionType + PartName + Block
where:
- WAL_VERSION: UInt8, currently 1
- ActionMetadata:
struct ActionMetadata
{
    /// The minimum version of WAL reader that can understand metadata written by current ClickHouse version.
    /// This field must be increased when making backwards incompatible changes.
    ///
    /// The same approach can be used recursively inside metadata.
    UInt8 min_compatible_version = 0;

    /// Actual metadata.
    UUID part_uuid = UUIDHelpers::Nil;

    /// ... read()/write() helpers elided ...
};
Serialized, this comprises the UInt8 minimum compatible WAL version, a UInt32 length of the serialized metadata, and the 128-bit UUID that uniquely identifies the data part.
- a UInt8 ActionType marking whether the action is a DROP or an ADD
- the data part's name, i.e. partitionid_minblock_maxblock_level
- the data part's block data, serialized by NativeBlockOutputStream
void MergeTreeWriteAheadLog::addPart(DataPartInMemoryPtr & part)
{
    std::unique_lock lock(write_mutex);

    auto part_info = MergeTreePartInfo::fromPartName(part->name, storage.format_version);
    min_block_number = std::min(min_block_number, part_info.min_block);
    max_block_number = std::max(max_block_number, part_info.max_block);

    writeIntBinary(WAL_VERSION, *out);

    ActionMetadata metadata{};
    metadata.part_uuid = part->uuid;
    metadata.write(*out);

    writeIntBinary(static_cast<UInt8>(ActionType::ADD_PART), *out);
    writeStringBinary(part->name, *out);
    block_out->write(part->block);
    block_out->flush();
    sync(lock);

    auto max_wal_bytes = storage.getSettings()->write_ahead_log_max_bytes;
    if (out->count() > max_wal_bytes)
        rotate(lock);
}
When wal.bin grows past write_ahead_log_max_bytes (the maximum size of a single WAL), it is rotated into a new wal_minblock_maxblock.bin file.
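A sketch of what the rotation step does, assuming the naming convention just described (the call shapes below are illustrative, not the exact 21.4 code):

/// Illustrative rotation: retire the current wal.bin under a name recording
/// the block-number range it covers, then start a fresh wal.bin.
void rotateSketch()
{
    String new_name = "wal_" + toString(min_block_number) + "_"
                             + toString(max_block_number) + ".bin";
    disk->replaceFile(path, storage.getRelativeDataPath() + new_name);
    init(); /// reopen an empty wal.bin and reset the min/max block numbers
}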
Likewise, dropping a data part appends a record to wal.bin, but without the block's binary data:
void MergeTreeWriteAheadLog::dropPart(const String & part_name)
{
    std::unique_lock lock(write_mutex);

    writeIntBinary(WAL_VERSION, *out);

    ActionMetadata metadata{};
    metadata.write(*out);

    writeIntBinary(static_cast<UInt8>(ActionType::DROP_PART), *out);
    writeStringBinary(part_name, *out);
    out->next();
    sync(lock);
}
How is the data of an In-Memory data part written and read?
At query time, part of the data lives in data parts already flushed to disk, while the rest is read from in-memory data parts.
The read logic is implemented in MergeTreeReaderInMemory::readRows().
Its core loop walks the requested columns; if part_in_memory contains the requested column, it calls getColumnFromBlock() to fetch the column from the Block. If all rows of the part are requested, the column is returned directly; otherwise a new mutable_column is created and only the required rows are copied into it:
size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
    /// ... validity checks elided ...
    size_t rows_to_read = std::min(max_rows_to_read, part_rows - total_rows_read);
    auto column_it = columns.begin();
    for (size_t i = 0; i < num_columns; ++i, ++column_it)
    {
        auto name_type = getColumnFromPart(*column_it);

        /// Copy offsets, if array of Nested column is missing in part.
        auto offsets_it = positions_for_offsets.find(name_type.name);
        if (offsets_it != positions_for_offsets.end() && !name_type.isSubcolumn())
        {
            const auto & source_offsets = assert_cast<const ColumnArray &>(
                *part_in_memory->block.getByPosition(offsets_it->second).column).getOffsets();

            if (res_columns[i] == nullptr)
                res_columns[i] = name_type.type->createColumn();

            auto mutable_column = res_columns[i]->assumeMutable();
            auto & res_offstes = assert_cast<ColumnArray &>(*mutable_column).getOffsets();
            size_t start_offset = total_rows_read ? source_offsets[total_rows_read - 1] : 0;
            for (size_t row = 0; row < rows_to_read; ++row)
                res_offstes.push_back(source_offsets[total_rows_read + row] - start_offset);

            res_columns[i] = std::move(mutable_column);
        }
        else if (part_in_memory->hasColumnFiles(name_type))
        {
            auto block_column = getColumnFromBlock(part_in_memory->block, name_type);
            if (rows_to_read == part_rows)
            {
                res_columns[i] = block_column;
            }
            else
            {
                if (res_columns[i] == nullptr)
                    res_columns[i] = name_type.type->createColumn();

                auto mutable_column = res_columns[i]->assumeMutable();
                mutable_column->insertRangeFrom(*block_column, total_rows_read, rows_to_read);
                res_columns[i] = std::move(mutable_column);
            }
        }
    }

    total_rows_read += rows_to_read;
    return rows_to_read;
}
The write side is implemented in MergeTreeDataPartWriterInMemory::write():
void MergeTreeDataPartWriterInMemory::write(
    const Block & block, const IColumn::Permutation * permutation)
{
    if (part_in_memory->block)
        throw Exception("DataPartWriterInMemory supports only one write", ErrorCodes::LOGICAL_ERROR);

    Block primary_key_block;
    if (settings.rewrite_primary_key)
        primary_key_block = getBlockAndPermute(block, metadata_snapshot->getPrimaryKeyColumns(), permutation);

    Block result_block;
    if (permutation)
    {
        for (const auto & col : columns_list)
        {
            if (primary_key_block.has(col.name))
                result_block.insert(primary_key_block.getByName(col.name));
            else
            {
                auto permuted = block.getByName(col.name);
                permuted.column = permuted.column->permute(*permutation, 0);
                result_block.insert(permuted);
            }
        }
    }
    else
    {
        for (const auto & col : columns_list)
            result_block.insert(block.getByName(col.name));
    }

    index_granularity.appendMark(result_block.rows());
    if (with_final_mark)
        index_granularity.appendMark(0);
    part_in_memory->block = std::move(result_block);

    if (settings.rewrite_primary_key)
        calculateAndSerializePrimaryIndex(primary_key_block);
}
The core logic: every column of the inserted Block is permuted into primary-key order and then moved into the In-Memory part's block. Each writer accepts only a single write, and the written block forms one granule; after the write the mark information is updated and the primary index is recalculated.
The previous post said that the primary index holds the primary-key values of the first row of each granule; the source of that claim is the line index_columns[i]->insertFrom(primary_column, 0); in the function below:
void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Block & primary_index_block)
{
    size_t rows = primary_index_block.rows();
    if (!rows)
        return;

    size_t primary_columns_num = primary_index_block.columns();
    index_columns.resize(primary_columns_num);
    for (size_t i = 0; i < primary_columns_num; ++i)
    {
        const auto & primary_column = *primary_index_block.getByPosition(i).column;
        index_columns[i] = primary_column.cloneEmpty();
        index_columns[i]->insertFrom(primary_column, 0);
        if (with_final_mark)
            index_columns[i]->insertFrom(primary_column, rows - 1);
    }
}
When the database restarts, the table engine's constructor calls loadDataParts(), which restores data from two sources: part_names_with_disks and parts_from_wal.
The MergeTree engine then tracks every data part in a global DataPartsIndexes. It is worth noting that this index is a Boost::multi_index container.
(Figure: schematic of loadDataParts() when the DB engine restarts.)
This raises another question: how is stale WAL data (records whose parts have already been flushed to disk) cleaned up? That is handled by MergeTree's clearOldWriteAheadLogs() function.
As for flushing to disk: LevelDB calls the fdatasync() system call to guarantee data is fully written to disk, while ClickHouse's WAL flush goes through std::streambuf::pubsync, which here wraps the sync() call. For the differences among sync(), fsync() and fdatasync(), see the second reference.
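For context, a minimal POSIX sketch of a durable append (illustrative, not ClickHouse code): fsync() flushes data plus file metadata such as size and mtime, fdatasync() flushes only what is needed to read the data back, and sync() schedules all dirty buffers system-wide and historically may return before they reach disk.

#include <cstring>
#include <fcntl.h>
#include <unistd.h>

/// Append a record and make it durable with fdatasync(), which can save a
/// metadata write compared to fsync() while still making the data readable
/// after a crash.
bool durableAppend(const char * path, const void * data, size_t size)
{
    int fd = ::open(path, O_WRONLY | O_APPEND | O_CREAT, 0644);
    if (fd < 0)
        return false;
    bool ok = ::write(fd, data, size) == static_cast<ssize_t>(size)
           && ::fdatasync(fd) == 0;
    ::close(fd);
    return ok;
}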
How does the WAL restore the In-Memory data parts?
The implementation is:
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
The core logic:
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
{
    /// ... local declarations (version, action_type, part_name, block, parts, dropped_parts, ...) elided ...

    while (!in->eof())
    {
        try
        {
            ActionMetadata metadata;

            readIntBinary(version, *in);
            if (version > 0)
            {
                metadata.read(*in);
            }

            readIntBinary(action_type, *in);
            readStringBinary(part_name, *in);

            if (action_type == ActionType::DROP_PART)
            {
                dropped_parts.insert(part_name);
            }
            else if (action_type == ActionType::ADD_PART)
            {
                auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);

                part = storage.createPart(
                    part_name,
                    MergeTreeDataPartType::IN_MEMORY,
                    MergeTreePartInfo::fromPartName(part_name, storage.format_version),
                    single_disk_volume,
                    part_name);

                part->uuid = metadata.part_uuid;

                block = block_in.read();
            }
        }
        /// ... catch block elided: a corrupted record at the tail of the WAL
        /// is tolerated and reading stops there ...

        if (action_type == ActionType::ADD_PART)
        {
            MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));

            part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
            part->partition.create(metadata_snapshot, block, 0);
            if (metadata_snapshot->hasSortingKey())
                metadata_snapshot->getSortingKey().expression->execute(block);

            part_out.writePrefix();
            part_out.write(block);
            part_out.writeSuffixAndFinalizePart(part);

            min_block_number = std::min(min_block_number, part->info.min_block);
            max_block_number = std::max(max_block_number, part->info.max_block);
            parts.push_back(std::move(part));
        }
    }

    MergeTreeData::MutableDataPartsVector result;
    std::copy_if(parts.begin(), parts.end(), std::back_inserter(result),
        [&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; });

    return result;
}
1. Read the version, metadata, action_type and so on from the WAL file.
2. For an ADD action, read the block's binary data from wal.bin and rebuild the data part, reconstructing the minmax index, partition key and other information from the in-memory metadata.
3. Return the data parts that were ADDed and not subsequently DROPped.