分享

ClickHouse源码阅读计划(二) - IStorage和MergeTree的datapart/WAL部分梳理

上一篇:ClickHouse源码阅读计划(一) - Everything About MergeTree

问题导读:

1、WAL保存什么信息?
2、如何写入/读取In-Memory datapart的数据?
3、WAL如何恢复In-Memory datapart的数据?



本文基于社区21.4版本分析

本章着重从源码角度梳理Storage部分的代码,并着重研究MergeTree部分的代码.

我们首先讨论ClickHouse代码中对Table的抽象,其父类接口定义在IStorage.h中.该父类负责以下功能:

1.table数据的存储

2.数据的查找和插入

3.数据的存储结构,如压缩方式

4.对数据的并发控制,如锁


我们可以从其定义的虚函数和数据成员.来总结其提供的基本功能:

1.查询表引擎的名字,表的名字,是否属于视图/是否支持SAMPLE/FINAL/PREWHERE/复制功能/并行插入/去重/合并小block成大block等功能.

2.每个table都有一个全局唯一的id,为{database,table,UUID}三个字段组成的结构体

3.定义表的metadata,并且支持多版本以支持无锁读写元数据.

4.提供了三种并发控制的函数,两种类型的锁,用于保证数据库表操作时的数据安全.

三种并发控制:

lockForShare

用于保证占有lock时table不会被drop.用于SELECT,后台的merge

lockForAlter

用于保证占有lock时同时刻只会处理一个ALTER Query

lockExclusively

用于保证占有lock时没有任何其他线程(如select,merge,alter)对表进行操作

两种锁:

alter_lock

进行alter操作的时候必须要抢占的锁以保证任何时刻只有一个alter线程处理table,drop等操作也需要调用lockExclusively抢占该锁以保证进行操作前所有alter操作已经结束.

drop_lock

所有select/merge相关的query需要调用lockForShare抢占该锁保证表没有被drop.

所有drop相关操作时需要调用lockExclusively抢占该锁,保证进行drop时所有其他任何select/merge操作都已经结束.

5.getQueryProcessingStage方法提供查询该表处理Select Query的状态.

6.表的操作相关的方法定义,如

watch用于监控表中数据变化;read,write接口分别用于数据的读写,还有如drop,truncate,rename,alter,optimize,mutate等操作的定义.

7.表的属性相关方法定义,如

获取表的行数字节数,存储policy,插入的行数/字节数等

总结:从IStorage接口中,我们得知在设计表引擎的时候,其父类需要定义:表操作相关方法,表的元数据查询和修改的方法,唯一标识表的数据结构,保证数据安全的并发控制操作和数据结构等等.

我们借助cpptree工具来看一下IStorage的类继承关系:从下面的树状图可以看到ClickHouse提供了非常丰富的表引擎定义.

  1. IStorage
  2.       ├── IStorageSystemOneBlock
  3.       │   ├── StorageSystemAggregateFunctionCombinators
  4.       │   ├── StorageSystemAsynchronousMetrics
  5.       │   ├── StorageSystemBuildOptions
  6.       │   ├── StorageSystemClusters
  7.       │   ├── StorageSystemCollations
  8.       │   ├── StorageSystemContributors
  9.       │   ├── StorageSystemCurrentRoles
  10.       │   ├── StorageSystemDDLWorkerQueue
  11.       │   ├── StorageSystemDataTypeFamilies
  12.       │   ├── StorageSystemDatabases
  13.       │   ├── StorageSystemDictionaries
  14.       │   ├── StorageSystemDistributionQueue
  15.       │   ├── StorageSystemEnabledRoles
  16.       │   ├── StorageSystemErrors
  17.       │   ├── StorageSystemEvents
  18.       │   ├── StorageSystemFormats
  19.       │   ├── StorageSystemFunctions
  20.       │   ├── StorageSystemGrants
  21.       │   ├── StorageSystemGraphite
  22.       │   ├── StorageSystemLicenses
  23.       │   ├── StorageSystemMacros
  24.       │   ├── StorageSystemMerges
  25.       │   ├── StorageSystemMetrics
  26.       │   ├── StorageSystemModels
  27.       │   ├── StorageSystemMutations
  28.       │   ├── StorageSystemPrivileges
  29.       │   ├── StorageSystemProcesses
  30.       │   ├── StorageSystemQuotaLimits
  31.       │   ├── StorageSystemQuotaUsage
  32.       │   ├── StorageSystemQuotas
  33.       │   ├── StorageSystemQuotasUsage
  34.       │   ├── StorageSystemReplicatedFetches
  35.       │   ├── StorageSystemReplicationQueue
  36.       │   ├── StorageSystemRoleGrants
  37.       │   ├── StorageSystemRoles
  38.       │   ├── StorageSystemRowPolicies
  39.       │   ├── StorageSystemSettings
  40.       │   ├── StorageSystemSettingsProfileElements
  41.       │   ├── StorageSystemSettingsProfiles
  42.       │   ├── StorageSystemStackTrace
  43.       │   ├── StorageSystemTableEngines
  44.       │   ├── StorageSystemTableFunctions
  45.       │   ├── StorageSystemTimeZones
  46.       │   ├── StorageSystemUserDirectories
  47.       │   ├── StorageSystemUsers
  48.       │   ├── StorageSystemZooKeeper
  49.       │   └── SystemMergeTreeSettings
  50.       ├── IStorageURLBase
  51.       │   ├── StorageXDBC
  52.       │   └── StorageURL
  53.       ├── MergeTreeData
  54.       │   ├── StorageMergeTree
  55.       │   └── StorageReplicatedMergeTree
  56.       ├── StorageBlocks
  57.       ├── StorageProxy
  58.       │   ├── StorageTableFunctionProxy
  59.       │   └── StorageMaterializeMySQL
  60.       ├── StorageSetOrJoinBase
  61.       │   ├── StorageJoin
  62.       │   └── StorageSet
  63.       ├── StorageSystemPartsBase
  64.       │   ├── StorageSystemParts
  65.       │   └── StorageSystemPartsColumns
  66.       ├── StorageBuffer
  67.       ├── StorageDictionary
  68.       ├── StorageDistributed
  69.       ├── StorageEmbeddedRocksDB
  70.       ├── StorageFile
  71.       ├── StorageFromMergeTreeDataPart
  72.       ├── StorageGenerateRandom
  73.       ├── StorageHDFS
  74.       ├── StorageInput
  75.       ├── StorageKafka
  76.       ├── StorageLiveView
  77.       ├── StorageLog
  78.       ├── StorageMaterializedView
  79.       ├── StorageMemory
  80.       ├── StorageMerge
  81.       ├── StorageMongoDB
  82.       ├── StorageMySQL
  83.       ├── StorageNull
  84.       ├── StoragePostgreSQL
  85.       ├── StorageRabbitMQ
  86.       ├── StorageS3
  87.       ├── StorageStripeLog
  88.       ├── StorageSystemColumns
  89.       ├── StorageSystemDetachedParts
  90.       ├── StorageSystemDisks
  91.       ├── StorageSystemNumbers
  92.       ├── StorageSystemOne
  93.       ├── StorageSystemReplicas
  94.       ├── StorageSystemStoragePolicies
  95.       ├── StorageSystemTables
  96.       ├── StorageSystemZeros
  97.       ├── StorageTinyLog
  98.       ├── StorageValues
  99.       └── StorageView
复制代码

我们首先来重点看MergeTree相关部分的代码,MergeTree和ReplicatedMergeTree引擎的父类定义在MergeTreeData.h中,MergeTree的datapart数据结构在上篇文章中已经着重介绍,而该头文件中定义的函数和数据成员用于维护一个包含所有datapart的list,以及datapart和partition的相关数据结构和CRUD等操作.

而读取和修改数据的具体实现则具体在{MergeTreeDataSelectExecutor,MergeTreeDataWriter,MergeTreeDataMergerMutator}这三个类中,datapart的具体实现则在IMergeTreeDataPart类中.

这里介绍几个重点:

1.Datapart的事务性提交
在内存中的datapart为mutable datapart,而当datapart被加入到working set后就会变为immutable datapart,这个加入working set的过程提供事务支持,是一个两阶段提交的过程.其中commit()和rollback()函数在MergeTreeData.cpp中实现

  • commit()


遍历所有PreCommited的datapart,把所有将要被覆盖的datapart设置为Outdated

  1. MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData::DataPartsLock * acquired_parts_lock)
  2. {
  3.     DataPartsVector total_covered_parts;
  4.     if (!isEmpty())
  5.     {
  6.         for (const DataPartPtr & part : precommitted_parts)
  7.         {
  8.             DataPartPtr covering_part;
  9.             DataPartsVector covered_parts = data.getActivePartsToReplace(part->info, part->name, covering_part, *owing_parts_lock);
  10.             if (covering_part)
  11.             {
  12.                 part->remove_time.store(0, std::memory_order_relaxed); /// The part will be removed without waiting for old_parts_lifetime seconds.
  13.                 data.modifyPartState(part, DataPartState::Outdated);
  14.             }
  15.             else
  16.             {
  17.                 total_covered_parts.insert(total_covered_parts.end(), covered_parts.begin(), covered_parts.end());
  18.                 for (const DataPartPtr & covered_part : covered_parts)
  19.                 {
  20.                     covered_part->remove_time.store(current_time, std::memory_order_relaxed);
  21.                     reduce_bytes += covered_part->getBytesOnDisk();
  22.                     reduce_rows += covered_part->rows_count;
  23.                     data.modifyPartState(covered_part, DataPartState::Outdated);
  24.                     data.removePartContributionToColumnSizes(covered_part);
  25.                 }
  26.                 reduce_parts += covered_parts.size();
  27.                 add_bytes += part->getBytesOnDisk();
  28.                 add_rows += part->rows_count;
  29.                 ++add_parts;
  30.                 data.modifyPartState(part, DataPartState::Committed);
  31.                 data.addPartContributionToColumnSizes(part);
  32.             }
  33.         }
  34.         data.decreaseDataVolume(reduce_bytes, reduce_rows, reduce_parts);
  35.         data.increaseDataVolume(add_bytes, add_rows, add_parts);
  36.     }
  37.     clear();
  38.     return total_covered_parts;
  39. }
复制代码

2.MergeTree的变种

  1. /// Merging mode. See above.
  2.         enum Mode
  3.         {
  4.             Ordinary            = 0,    /// Enum values are saved. Do not change them.
  5.             Collapsing          = 1,
  6.             Summing             = 2,
  7.             Aggregating         = 3,
  8.             Replacing           = 5,
  9.             Graphite            = 6,
  10.             VersionedCollapsing = 7,
  11.         };
复制代码

各种MergeTree表引擎的功能上异同点和实现细节留到下一篇文章

3.Datapart的具体代码实现

上篇文章讲了datapart的文件组织,其具体代码是定义在IMergeTreeDataPart.h中

我们侧重关心checksum,datapart的生命周期,索引部分的实现

  • 生命周期--状态机转换


datapart以六种状态来定义其生命周期:

  1.   enum class State
  2.     {
  3.         Temporary,       /// the part is generating now, it is not in data_parts list
  4.         PreCommitted,    /// the part is in data_parts, but not used for SELECTs
  5.         Committed,       /// active data part, used by current and upcoming SELECTs
  6.         Outdated,        /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
  7.         Deleting,        /// not active data part with identity refcounter, it is deleting right now by a cleaner
  8.         DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
  9.     };
复制代码

v2-902b0491108be67dd327d9d303b5db06_1440w.jpeg

  • 每个表引擎会维护一个datapart的list,所有刚从disk上获取或刚插入或者刚merged生成的新datapart都属于Temporary状态
  • 把Temporary状态的datapart加入到working set是一个两阶段提交过程,第一阶段会进入PreCommited状态
  • 如果失败就会回滚到Outdated状态等待销毁,当前状态的datapart还能够被select语句查询到,后台会定时清理这部分的datapart,正在清理过程称为Deleting状态,不能被select查询
  • 如果成功就会进入到Commited状态,即成功加入到working set中
  • Commited状态的datapart经过DROP操作后会转变为Outdated状态,等待被清除

  • 校验和


定义在MergeTreeDataPartChecksum.h中

每个datapart目录下的文件都会分别记录Uint64的文件大小和Uint128的hash值,对于.bin文件还会分别记录压缩前后的文件大小和128bit的hash值

  1. struct MergeTreeDataPartChecksum
  2. {
  3.     using uint128 = CityHash_v1_0_2::uint128;
  4.     UInt64 file_size {};
  5.     uint128 file_hash {};
  6.     bool is_compressed = false;
  7.     UInt64 uncompressed_size {};
  8.     uint128 uncompressed_hash {};
复制代码

  • datapart的类型


定义在MergeTreeDataPartType.h中

  1. enum Value
  2.     {
  3.         /// Data of each column is stored in one or several (for complex types) files.
  4.         /// Every data file is followed by marks file.
  5.         WIDE,
  6.         /// Data of all columns is stored in one file. Marks are also stored in single file.
  7.         COMPACT,
  8.         /// Format with buffering data in RAM.
  9.         IN_MEMORY,
  10.         UNKNOWN,
  11.     };
复制代码

index相关

.mrk文件

v2-cae698543628ae8f97ab5aa14522af1c_1440w.jpeg

.mrk文件一个重要功能是记录着每个mark之间间隔了多少行(即每个granule的大小);默认是8192行,该部分具体是用一个记录累加和的vector来存储,该部分信息会常驻在内存中

  1. /// Class contains information about index granularity in rows of IMergeTreeDataPart
  2. /// Inside it contains vector of partial sums of rows after mark:
  3. /// |-----|---|----|----|
  4. /// |  5  | 8 | 12 | 16 |
  5. /// If user doesn't specify setting adaptive_index_granularity_bytes for MergeTree* table
  6. /// all values in inner vector would have constant stride (default 8192).
  7. class MergeTreeIndexGranularity
  8. {
  9. private:
  10.     std::vector<size_t> marks_rows_partial_sums;
复制代码

.mrk的其余信息即(graunle在解压前文件的偏移量,graunle在解压后的block的偏移量)则定义在MarkInCompressedFile.h中

  1. struct MarkInCompressedFile
  2. {
  3.     size_t offset_in_compressed_file;
  4.     size_t offset_in_decompressed_block;
复制代码

该部分信息常驻Cache而不是RAM,定义在MarkCache.h中,由Clickhouse自己写的LRUCache实现,这个LRUCache的具体实现方式后续有时间再讲.

minmax index

记录着每个datapart分区键对应的column的最大最小值,用于分支裁剪

  1.   /// Index that for each part stores min and max values of a set of columns. This allows quickly excluding
  2.     /// parts based on conditions on these columns imposed by a query.
  3.     /// Currently this index is built using only columns required by partition expression, but in principle it
  4.     /// can be built using any set of columns.
  5.     struct MinMaxIndex
  6.     {
  7.         /// A direct product of ranges for each key column. See Storages/MergeTree/KeyCondition.cpp for details.
  8.         std::vector<Range> hyperrectangle;
复制代码

primary.idx

该部分信息常驻RAM,记录该表引擎primary索引对应的列


  1.   /// Primary key (correspond to primary.idx file).
  2.     /// Always loaded in RAM. Contains each index_granularity-th value of primary key tuple.
  3.     /// Note that marks (also correspond to primary key) is not always in RAM, but cached. See MarkCache.h.
  4.     using Index = Columns;
  5.     Index index;
复制代码

skipping index

定义在IndexDescription.h,记录着非主键索引的索引信息,同样常驻RAM


  1. /// Description of non-primary index for Storage
  2. struct IndexDescription
  3. {
  4.     /// Definition AST of index
  5.     ASTPtr definition_ast;
  6.     /// List of expressions for index calculation
  7.     ASTPtr expression_list_ast;
  8.     /// Index name
  9.     String name;
  10.     /// Index type (minmax, set, bloom filter, etc.)
  11.     String type;
  12.     /// Prepared expressions for index calculations
  13.     ExpressionActionsPtr expression;
  14.     /// Index arguments, for example probability for bloom filter
  15.     FieldVector arguments;
  16.     /// Names of index columns (not to be confused with required columns)
  17.     Names column_names;
  18.     /// Data types of index columns
  19.     DataTypes data_types;
  20.     /// Sample block with index columns. (NOTE: columns in block are empty, but not nullptr)
  21.     Block sample_block;
  22.     /// Index granularity, make sense for skip indices
  23.     size_t granularity;
  24.     /// Parse index from definition AST
  25.     static IndexDescription getIndexFromAST(const ASTPtr & definition_ast, const ColumnsDescription & columns, const Context & context);
  26. };
复制代码

4.WAL机制

在LSM-Tree中,为了提高数据写入性能,会把数据积攒到一定阈值后再落盘成SST文件,而未落盘的文件会可能因为宕机而产生丢失,因此引入了WAL机制来保证内存数据的安全.而参考资料1则提出,WAL机制还可以带来一个好处:避免高频的小量写操作生成大量的datapart,尽可能减少生成的datapart文件的数量.

我们要解决的问题有:

1.WAL的物理格式,每插入一条datapart,WAL保存什么信息?

2.如何写入/读取In-Memory datapart的数据?

3.宕机发生,重启数据库,WAL如何恢复In-Memory datapart的数据?

我们重点关注MergeTreeData::loadDataParts()方法的过程,MergeTreeWriteAheadLog.h/.cpp的定义和实现.

WAL的内存布局,每插入一条datapart,WAL保存什么信息?

  1. /** WAL stores addditions and removals of data parts in in-memory format.
  2.   * Format of data in WAL:
  3.   * - version
  4.   * - type of action (ADD or DROP)
  5.   * - part name
  6.   * - part's block in Native format. (for ADD action)
  7.   */
复制代码

wal.bin 文件的主要内存布局:

1.版本

2.datapart操作的类型(ADD还是DROP)

3.datapart的name

4.binary格式的block数据

每插入一条In-Memory Datapart,就会在wal.bin插入一条记录,为

WAL_VERSION+ActionMetadata+ActionType+PartName+Block

的binary格式,其中:

  • WAL_VERSION UInt8, 现版本为1
  • ActionMetadata

  1. struct ActionMetadata
  2.     {
  3.         /// The minimum version of WAL reader that can understand metadata written by current ClickHouse version.
  4.         /// This field must be increased when making backwards incompatible changes.
  5.         ///
  6.         /// The same approach can be used recursively inside metadata.
  7.         UInt8 min_compatible_version = 0;
  8.         /// Actual metadata.
  9.         UUID part_uuid = UUIDHelpers::Nil;
复制代码

包括UInt8的最小WAL兼容版本,UInt32的序列化后UUID的长度值,以及UINT28的UUID,用于唯一标识datapart

  • Uint8的ActionType,用于标记是DROP还是ADD操作
  • datapart的Name,即partitionid_minblock_maxblock_level
  • datapart的block数据,由NativeBlockOutputStream封装负责block数据序列化

  1. void MergeTreeWriteAheadLog::addPart(DataPartInMemoryPtr & part)
  2. {
  3.     std::unique_lock lock(write_mutex);
  4.     auto part_info = MergeTreePartInfo::fromPartName(part->name, storage.format_version);
  5.     min_block_number = std::min(min_block_number, part_info.min_block);
  6.     max_block_number = std::max(max_block_number, part_info.max_block);
  7.     writeIntBinary(WAL_VERSION, *out);
  8.     ActionMetadata metadata{};
  9.     metadata.part_uuid = part->uuid;
  10.     metadata.write(*out);
  11.     writeIntBinary(static_cast<UInt8>(ActionType::ADD_PART), *out);
  12.     writeStringBinary(part->name, *out);
  13.     block_out->write(part->block);
  14.     block_out->flush();
  15.     sync(lock);
  16.     auto max_wal_bytes = storage.getSettings()->write_ahead_log_max_bytes;
  17.     if (out->count() > max_wal_bytes)
  18.         rotate(lock);
  19. }
复制代码

当wal.bin的大小超过write_ahead_log_maxbytes即单个wal的最大字节数时,会生成一个新的wal_minblock_maxblock.bin文件

同样,删除一条datapart也会在wal.bin插入一条对应记录,但是不会插入block的binary数据.

  1. void MergeTreeWriteAheadLog::dropPart(const String & part_name)
  2. {
  3.     std::unique_lock lock(write_mutex);
  4.     writeIntBinary(WAL_VERSION, *out);
  5.     ActionMetadata metadata{};
  6.     metadata.write(*out);
  7.     writeIntBinary(static_cast<UInt8>(ActionType::DROP_PART), *out);
  8.     writeStringBinary(part_name, *out);
  9.     out->next();
  10.     sync(lock);
  11. }
复制代码

如何写入/读取In-Memory datapart的数据?

那我们查询数据的时候,数据一部分在已经落盘了的datapart中,而另一部分则从in-memory datapart中读取;

  • 读取in-memory datapart


具体逻辑实现在MergeTreeReaderInMemory::readRows()

核心逻辑为:从每一列开始遍历,若该part_in_memory存在所要读取的Column,调用getColumnFromBlock()从Block中获取Column数据,并且如果要读取该列所有行,则直接读取,否则创建新的列mutable_column,选取所需要的行来读取;

  1. size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
  2. {
  3.     //校检   
  4.     ...   
  5.     size_t rows_to_read = std::min(max_rows_to_read, part_rows - total_rows_read);
  6.     auto column_it = columns.begin();
  7.     for (size_t i = 0; i < num_columns; ++i, ++column_it)
  8.     {
  9.         auto name_type = getColumnFromPart(*column_it);
  10.         /// Copy offsets, if array of Nested column is missing in part.
  11.         auto offsets_it = positions_for_offsets.find(name_type.name);
  12.         if (offsets_it != positions_for_offsets.end() && !name_type.isSubcolumn())
  13.         {
  14.             const auto & source_offsets = assert_cast<const ColumnArray &>(
  15.                 *part_in_memory->block.getByPosition(offsets_it->second).column).getOffsets();
  16.             if (res_columns[i] == nullptr)
  17.                 res_columns[i] = name_type.type->createColumn();
  18.             auto mutable_column = res_columns[i]->assumeMutable();
  19.             auto & res_offstes = assert_cast<ColumnArray &>(*mutable_column).getOffsets();
  20.             size_t start_offset = total_rows_read ? source_offsets[total_rows_read - 1] : 0;
  21.             for (size_t row = 0; row < rows_to_read; ++row)
  22.                 res_offstes.push_back(source_offsets[total_rows_read + row] - start_offset);
  23.             res_columns[i] = std::move(mutable_column);
  24.         }
  25.         else if (part_in_memory->hasColumnFiles(name_type))
  26.         {
  27.             auto block_column = getColumnFromBlock(part_in_memory->block, name_type);
  28.             if (rows_to_read == part_rows)
  29.             {
  30.                 res_columns[i] = block_column;
  31.             }
  32.             else
  33.             {
  34.                 if (res_columns[i] == nullptr)
  35.                     res_columns[i] = name_type.type->createColumn();
  36.                 auto mutable_column = res_columns[i]->assumeMutable();
  37.                 mutable_column->insertRangeFrom(*block_column, total_rows_read, rows_to_read);
  38.                 res_columns[i] = std::move(mutable_column);
  39.             }
  40.         }
  41.     }
  42.     total_rows_read += rows_to_read;
  43.     return rows_to_read;
  44. }
复制代码

  • 写入in-memory datapart

  1. void MergeTreeDataPartWriterInMemory::write(
  2.     const Block & block, const IColumn::Permutation * permutation)
  3. {
  4.     if (part_in_memory->block)
  5.         throw Exception("DataPartWriterInMemory supports only one write", ErrorCodes::LOGICAL_ERROR);
  6.     Block primary_key_block;
  7.     if (settings.rewrite_primary_key)
  8.         primary_key_block = getBlockAndPermute(block, metadata_snapshot->getPrimaryKeyColumns(), permutation);
  9.     Block result_block;
  10.     if (permutation)
  11.     {
  12.         for (const auto & col : columns_list)
  13.         {
  14.             if (primary_key_block.has(col.name))
  15.                 result_block.insert(primary_key_block.getByName(col.name));
  16.             else
  17.             {
  18.                 auto permuted = block.getByName(col.name);
  19.                 permuted.column = permuted.column->permute(*permutation, 0);
  20.                 result_block.insert(permuted);
  21.             }
  22.         }
  23.     }
  24.     else
  25.     {
  26.         for (const auto & col : columns_list)
  27.             result_block.insert(block.getByName(col.name));
  28.     }
  29.     index_granularity.appendMark(result_block.rows());
  30.     if (with_final_mark)
  31.         index_granularity.appendMark(0);
  32.     part_in_memory->block = std::move(result_block);
  33.     if (settings.rewrite_primary_key)
  34.         calculateAndSerializePrimaryIndex(primary_key_block);
  35. }
复制代码

核心逻辑是插入的Block的每一列按照primary key排序,然后移动到In-Memory part的block中.每次只能插入一个Graunle的block数据,插入完后会更新mark文件,同时重新计算主键.

在上一篇文章说道,primary index是每个graunle的第一行primary Index数值,这个说法的出处就来自于下面函数的index_columns->insertFrom(primary_column, 0); 一行代码;

  1. void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Block & primary_index_block)
  2. {
  3.     size_t rows = primary_index_block.rows();
  4.     if (!rows)
  5.         return;
  6.     size_t primary_columns_num = primary_index_block.columns();
  7.     index_columns.resize(primary_columns_num);
  8.     for (size_t i = 0; i < primary_columns_num; ++i)
  9.     {
  10.         const auto & primary_column = *primary_index_block.getByPosition(i).column;
  11.         index_columns[i] = primary_column.cloneEmpty();
  12.         index_columns[i]->insertFrom(primary_column, 0);
  13.         if (with_final_mark)
  14.             index_columns[i]->insertFrom(primary_column, rows - 1);
  15.     }
  16. }
复制代码

当重启数据库的时候,则表引擎的构造函数会调用loadDataparts(),从part_names_with_disks和parts_from_wal两部分来恢复数据.

然后MergeTree引擎以全局的DataPartsIndexes来维护全局的datapart.这里值得一提的是datapartindex使用了Boost::multi_index容器.

v2-144752147c9015cda1315fc21a70bece_1440w.jpeg


上图为重启db引擎时loadDataparts()的示意图

这时产生了另外一个问题: 那么WAL的旧数据(那些已经落盘的)如何清理呢?

MergeTree的clearOldWriteAheadLogs函数复制

而关于落盘,LevelDB调用fdatasync()系统调用,保证数据完整写入到磁盘.而CK的WAL的flush操作是通过std::streambuf::pubsync, 具体是对sync()系统调用的封装.至于sync(),fsync(),fdatasync()到底有何区别,请参考ref第二篇资料.

WAL如何恢复In-Memory datapart的数据

具体实现在:

  1. MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
复制代码

核心逻辑有:

  1. MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
  2. {
  3.     while (!in->eof())
  4.     {
  5.    
  6.         try
  7.         {
  8.             ActionMetadata metadata;
  9.             readIntBinary(version, *in);
  10.             if (version > 0)
  11.             {
  12.                 metadata.read(*in);
  13.             }
  14.             readIntBinary(action_type, *in);
  15.             readStringBinary(part_name, *in);
  16.             if (action_type == ActionType::DROP_PART)
  17.             {
  18.                 dropped_parts.insert(part_name);
  19.             }
  20.             else if (action_type == ActionType::ADD_PART)
  21.             {
  22.                 auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
  23.                 part = storage.createPart(
  24.                     part_name,
  25.                     MergeTreeDataPartType::IN_MEMORY,
  26.                     MergeTreePartInfo::fromPartName(part_name, storage.format_version),
  27.                     single_disk_volume,
  28.                     part_name);
  29.                 part->uuid = metadata.part_uuid;
  30.                 block = block_in.read();
  31.             }
  32.    
  33.         }
  34.       
  35.         if (action_type == ActionType::ADD_PART)
  36.         {
  37.             MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
  38.             part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
  39.             part->partition.create(metadata_snapshot, block, 0);
  40.             if (metadata_snapshot->hasSortingKey())
  41.                 metadata_snapshot->getSortingKey().expression->execute(block);
  42.             part_out.writePrefix();
  43.             part_out.write(block);
  44.             part_out.writeSuffixAndFinalizePart(part);
  45.             min_block_number = std::min(min_block_number, part->info.min_block);
  46.             max_block_number = std::max(max_block_number, part->info.max_block);
  47.             parts.push_back(std::move(part));
  48.         }
  49.     }
  50.     MergeTreeData::MutableDataPartsVector result;
  51.     std::copy_if(parts.begin(), parts.end(), std::back_inserter(result),
  52.         [&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; });
  53.     return result;
  54. }
复制代码

1.读取WAL文件的版本,metadata,action_type等数据

2.如果是ADD操作,读取wal.bin的block的binary数据,并且根据常驻内存的metadata来重建minmax index,分区键等信息重建datapart

3.返回属于ADD操作而不属于DROP操作的datapart







最新经典文章,欢迎关注公众号



---------------------


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条