本帖最后由 levycui 于 2021-2-23 21:06 编辑
问题导读:
1、如何理解物理存储抽象?
2、如何设计Rowset和Tablet?
3、TabletMeta和RowsetMeta的持久化存储的方式有什么不同?
4、Service 和 StorageEngine如何设计?
BE存储引擎部分代码设计文档(2019)
主要工作
- 类名和文件名尽量一致,看代码的时候比较方便便。
- 逻辑上分层,理理论上只能从上往下调用,不不能从下往上调用,明确每个层次的作用。
- 划分概念,现在是乱的,看了了类名不不知道意思。
- 写好注释,看懂了的代码就加上注释,新来的同学看代码比较容易一些,如果英文不行就使用中文注释吧。
- 重构完之后,理理论上各个模块应该可以单独测试。
- 本次不涉及功能裁剪,只是重新组合各个模块的功能让模块之间的职责划分更清晰,现在接口内部的行为的实现代码能不动就不动了。
主要概念
storageengine –> tablet –> rowset –> segment group –> segment, storageengine 包含多个tablet, tablet包含多个rowset。
将BE原来的table这个概念换成tablet,每个tablet 有唯一的tabletid和schema hash, rollup 和 schema change都是产生不同的tablet。
rowset相当于原来的一个版本,rowset可以是一个版本的数据,rowset也可以包含多个版本的数据。
version:只是代表一个版本号,version不能表示数据文件。
Delta 特定指导入的一个批次,delta这个概念可能只是出现在我们的讨论中,不会出现在代码中。
接口设计
物理存储抽象
整个BE是建立在以下2个子存储系统上来完成存储的管理工作的:
- 文件系统,这个主要是底层的rowset读写文件需要这个,比如创建文件,读取文件,删除文件,拷贝文件等。这块考虑到未来我们可能用BOS作为我们的存储系统,所以这块我们做了了一个抽象叫做 FsEnv。
- 元数据存储系统,是底层存储tablet和rowset的元数据的一个系统,目前我们使用rocksdb实现,未来比如用BOS作为存储的时候这个可能也会变化,所以我们我们把这块也做了了抽象叫做 MetaEnv。
- FsEnv和MetaEnv其实都跟底层的存储介质相关,并且它们经常需要一起使用,而且需要在多个对象之 间传递,所以在FsEnv和MetaEnv之上我们构建了一个DataDir层,封装了FsEnv和MetaEnv,每个 DataDir都绑定了一个存储介质StorageMedium。下面分别介绍一下它们的主要接口。
- // StoreType 描述底层的物理理存储的类型 StoreType
- {
- POSIX; // 基于本地文件系统的一种dir实现
- BOS; // 基于百度对象存储系统的dir实现
- }
-
- // data dir 并不不仅仅封装了了FsEnv和MetaEnv, 还封装了部分磁盘录管理的逻辑,如当前我们个磁盘目录下最多允许1000个子目录
- // 当我们要create个新的tablet的时候,data dir就需要做下这个录的选择了
- // 未来汇报存储状态的时候我们也直接让DataDir来汇报就可以了了
- DataDir {
- Status init();
- // 根据tablet的元数据,创建tablet存储的文件结构,并不不修改元数据 Status create_tablet_dir(TabletMeta* tablet_meta);
- // 删除tablet的物理文件结构
- Status delete_tablet_dir(TabletMeta* tablet_meta); Status get_fs_env(FsEnv* fs_env);
- Status get_meta_env(MetaEnv* meta_env);
- }
- // 抽象下文件系统操作的逻辑,未来我们可能使用bos作为我们的存储介质,到时候只要重新实现下fs env就可以了
- FsEnv
- {
- create_file(); create_dir(); pread(); pwrite();
- }
- // 抽象tablet和rowset的元数据操作的逻辑,未来假如对接bos的话需要实现个基于bos的元数据管理模块
- // 比如我们基于bos做管理的时候,metastore可能实现为个件,到时候把本地的数据直接保存到bos上作为件就可以了
- MetaEnv
- {
- Status get_all_tablet_meta(vector<TabletMeta*> tablet_metas);
- Status get_all_rowset_meta(vector<RowsetMeta*> rowset_metas);
- Status save_tablet_meta(TabletMeta* tablet_meta);
- Status delete_tablet_meta(TabletMeta* tablet_meta);
- Status save_rowset_meta(RowsetMeta* rowset_meta);
- Status delete_rowset_meta(RowsetMeta* rowset_meta);
- }
复制代码
Rowset设计
Rowset代表了一个或多个批次的数据,抽象它的原因是想以后能支持更多的文件存储格式。在设计过程中考虑了一下几点:
- tablet是不感知磁盘的,真正感知存储路径的是rowset,每个rowset都应该有一个单独的磁盘目录,比如我们在做磁盘迁移的时候,一个tablet的数据可能跨越了多个磁盘,这样实际上就要求每个rowset有⾃己的存储目录。
- rowset一旦生成了就应该是一个不可变的结构,理论上任何状态都不能改变,这样有利于控制我们系统的读写并发。
- rowset的元数据尽量少一些,因为一个节点可能有几十万个tablet,每个tablet⼜可能有几百到几千个rowset,而rowset的元数据位于内存中,如果数据量太大内存可能爆掉。
- rowset的设计要满⾜我们磁盘拷贝的需求,比如我们把一个磁盘的数据和元数据都拷贝一下到另外一个磁盘上或者把磁盘目录变更一下,系统应该是能够加载的。所以不能直接在rowset meta中存储data dir。
- rowset的id在一个be内要唯一, 但是在不同的be上会有重复。rowset id只是一个唯一的标识,并不代表任何含义,两个BE上如果有两个rowset的id相同,他们的数据内容不一定一样。
复制代码
Tablet的设计
在新的架构里tablet只是一个rowset的容器器,tablet可以根据rowset创建相应的reader。
tablet里所有的rowset都是生效的rowset,不生效的rowset不放在这⾥面,这样简化了tablet的逻辑。
- TabletState
- {
-
- }
-
- TabletMeta
- {
- int64_t version; // 表示meta修改的版本号,当有多个属于同一个tablet的meta存在时,选择version最的那个
- tableid; // 前没想到有什么用
- partition_id; // publish version的时候需要根据partition id来获取所有关联的rowset, 因为version是绑定在partition上的,不能按照tablet来partition的原因是怕tabletid太多了
- shardid; // tablet 所在的shard的id,这个是跟磁盘管理理相关的逻辑,是个optional
- TabletState state;
- vector<RowsetMeta> rowsets;
- serialize();
- deserialize();
- }
-
- Tablet {
- 方法:
- // 当compaction、写结束、clone结束的时候调用这个api来修改
- // 这把要增加的rowset和要删除的rowset放在起可以保持个原子性
- // 当compaction的时候可能会增加些rowset,删除些rowset,并且希望这个原性
- Status modify_rowsets(vector<Rowset*> added_rowsets, vector<Rowset*> deleted_r
- owsets);
-
- Status create_snapshot();
-
- Status release_snapshot();
-
- // 这个API 只有在垃圾清理理的时候用
- Status get_all_rowsets(vector<Rowset*> rowsets)
- {
- // 根据文件列列表的前缀返回所有的独立的rowsetid
- }
- 属性:
- DataDir data_dir; // 当前tablet 默认的data dir,当写入新的rowset的时候,需要用这个dat
- adir来存储数据
- }
-
- TabletReader
- {
- set_read_version(int64_t end_version);
- set_start_key();
- set_end_key();
- set_filter_expr();
- }
复制代码
TabletMeta和RowsetMeta的持久化存储的⽅式:
当前存在的问题
- 过去tablet meta中保存了一个rowset meta的列表,每次更新rowset都需要更新整个tablet meta,这样写放大非常严重。
- 在这种⽅方式下未生效的rowset【COMMITTED】和已生效的rowset【VISIBLE】都存储在tablet meta中,比较混乱。
所以这次我们计划更新tablet meta和rowset meta的管理方式,具体逻辑如下:
- 当rowset commit的时候,需要修改rowset的状态为COMMITTED,需要把rowsetmeta信息持久化到meta store中。
- 当rowset publish的时候,需要修改rowset的状态为VISIBLE,修改rowset的version,把rowset信息持久化到meta env中。然后把rowset meta添加到tablet meta中,此时tablet meta可以不持久化。
- 这会带来一个问题,就是系统重启的时候如果rowset特别多,那么启动会非常慢。所以需要engine定期的把tablet meta持久化,一旦持久化,那么就把已经visible的rowset meta从meta store中删除,因为他们已经保存到tablet meta中了。 这样做还带来一个好处,当rowset向tablet中增加时,只是内存操作,加一个lock就可以,不涉及io,速度可以做到⾮常快。
系统启动加载过程
在上述结构下,整个系统重启的时候的tablet和rowset加载过程如下:
- 根据系统的配置文件读取data dir,目前doris的架构⾥,每个data dir就是配置文件的一个磁盘目录。
- 读取每个data dir下的meta store中的tablet meta和rowset meta,根据tablet meta中的shard id构建 rowset对象的shard id信息。
- 根据加载后的所有的tablet meta构建tablet对象
如果在不同的data dir目录下存在多个tablet meta,那么选择version最大的tablet meta构建tablet对象。
如果rowset的状态是PUBLISHED,那么就把他追加到对应的tablet里。
如果rowset的状态是COMMITTED,那么就根据txn id,把他加到txn map里。
Service 和 StorageEngine设计
我们把BE向外部暴露的能力封装为一个的Service,Service有两类一类是轮询fe获得task,然后执行 task;另一类是作为rpc的服务端供rpc调用的。Service假如需要访问rowset或者tablet的时候,我们都首先注册一个task,由storageengine负责生成task的各种状态,然后暴露对应的task给service,然后service调用 task的execute方法来执行。这样就把所有的并发和状态控制都封装在StorageEngine中,简单的逻辑如下:
- service模块调用StorageEngine对应的⽅法create一个task,在create task中storageengine做一系列的检查,加锁的逻辑
- service拿到task之后调用task的execute方法,来执行具体的⼯作。service调用storageengine的finish方法结束一个task。
- 如果task执行失败,那么调用cancel task方法来取消task,storageengine需要清除对应的状态。在上述逻辑下,所有的状态维护,锁的维护都由storageengine完成,外部不再看到锁,看到状态。以目前为例我们有以下⼏种task:
- ` IngestService { // batch ingest的逻辑 storageengine->create_rowset_writer();while(1) { download_file(); rowset_writer.write_block(); } storageengine->finish_rowset_writer(rowset_writer);
-
- // 流式导的逻辑 storageengine->create_rowset_writer(); // 每次写都调 rowset_writer.write_block(); // 写完之后 storageengine->finish_rowset_writer(rowset_writer); }
-
- AlterService { storageengine->create_alter_Task(); alter_task.execute() { } storageengine->finish_alter_Task(alter_Task); }
-
- // 从文件恢复一个rowset CloneService { // 当全量clone的时候,tablet不存在,那么先调create tablet创建一个tablet // 增量clone的时候,tablet存在,不需要create tablet if (tablet not exists) { storageengine->create_tablet(); } storageengine->create_clone_task(); clone_Task.execute(); storageengine->finish_clone_task(clone_task); }
-
- AlterTask { Status execute() { // 1. 读取老老的数据 // 2. 把数据写入到新的tablet中 }
-
- Status get_rowsets(vector<RowSet*> rowsets); }
-
- CloneTask { // 开始执一个clone Task // 1. 调远程的节点,make snapshot // 2. 从远程节点下载文件,放到本地临时文件夹,包括meta,index,data Status execute() { make_snapshot(); }
-
- // 当clone任务结束后,上层调用这个API,这个API返回clone的rowset的列表
- // 上层API把对应的rowset根据条件添加到tablet中
- Status get_rowsets(vector<RowSet*> rowsets); }
-
- MigrateTask { Status execute() { // 修改tablet的meta,把tablet meta持久化到目标的 // 1. 遍历migrate task中的rowset 列列表 // 2. 一个个rowset进行行转化。 }
-
- Status get_rowsets(vector<RowSet*> rowsets); }
-
- CompactionTask { Status execute(); Status get_rowsets(vector<RowSet*> rowsets); }
-
-
- ### StorageEngine部分
-
- StorageEngine { // 当上游的导入任务需要导一份数据的时候调这个api,这个api会创建个rowset writer给上层 // 上层调用这个api完成数据的写入 Status open_rowset_writer(int64_t tablet_id, int64_t schema_hash, int64_t txnId, RowsetWriter* rowset_writer) { // 1. 检测当前tablet是否在做schema change,如果在做,那么就把schema change的tablet也拿出来 // 2. 成rowset id 和 rowset writer // 3. 根据tablet的属性,设置rowset writer写文件的录 // 4. 在txn map中注册下txn与rowset,tablet的关系 // 5. 在一些场景下【clone,schema change,rollup】的时候没有txn id,此时就不需要建立关联了。 }
-
- // 当向个rowset writer 写完数据之后调用这个API
- Status finish_rowset_writer(RowsetWriter* rowset_writer)
- {
- // 1. 生成对应的rowset, 如果有alter task在执那么就生成两个tablet的rowset
- // 2. 将rowset的状态变更为committed状态。
- // 3. 当户调用finish的时候,需要把rowset元数据持久化,因为调完finish之后,户会调用txn.commit
- // 旦commit成功,那么数据就不能丢失了。
- }
-
- // 当写错误的时候调用这个API,表示取消一个rowset writer
- Status cancel_rowset_writer(RowsetWriter* rowset_writer)
- {
- // 1. 关闭rowset 打开的文件句柄
- // 2. 把rowset 从txn map中移除
- // 3. 这可以不删除rowset对应的件,因为垃圾清理线程会清理它们
- }
-
- // 当service 层收到publish version task的时候调用这个api
- Status publish_txn(int64_t txn_id)
- {
- // 1. 找到txn对应的rowset,把rowset的状态变更更为 VISIBLE
- // 2. 持久化rowset meta到meta store中
- // 3. 把rowset添加到tablet对应的rowset 列列表中
- }
-
- // 清理掉txn关联的rowset
- Status clear_txn(int64_t txn_id) {
- // 1. 如果txn 关联的rowset meta已经持久化到meta store中,那么需要从meta store中清理。
- }
-
- // 创建个alter task, 在be端不区分schema change和rollup,统一用alter task来区分。
- Status create_alter_task(AlterTableReq* req, AlterTask* alter_task)
- {
- // 1. 选择个data dir
- // 2. 在data dir下创建个tablet
- // 3. 把tablet信息填充到 alter_task
- // 4. 把tablet信息添加到storage engine的map
- // 5. 在tablet的meta中记录一个状态,这个状态要持久化,否则会有问题。
- }
-
- // 当alter task结束后,调用这个api,把新的rowset添加到tablet的rowset列列表中
- Status finish_alter_task(AlterTask* alter_task)
- {
- // 1. 读取创建的新的rowset,把他们加入到新的tablet中
- // 2. 清理一下alter task的状态
- // 3. 持久化2个tablet的meta状态
- }
-
- Status cancel_alter_task(AlterTask* alter_task) {
- // 1. 删除创建的新的tablet
- // 2. 清理正在操作的tablet的alter status
- }
-
- // 清除一个tablet上标记的alter状态
- Status clear_alter_status(int64_t tablet_id);
-
- Status create_clone_task(CloneTask* clone_task)
- {
- }
-
- // clone task本质上是从远端复制一堆rowset到本地,当cloneTask结束的时候,调用这个api
- // 这个api会拿到clone Task成的rowset,把他们加入到tablet的rowset列列表中。
- Status finish_clone_task(CloneTask* clone_task)
- {
- // 1. 取出clone task中生成的rowset 把rowset 添加到tablet的meta中
- // 2. 持久化tabletmeta
- // 3. 把rowset 添加到tablet的active rowset列表中
- }
-
- Status make_snapshot(int64_t tablet_id, TabletSnapshot* snapshot)
- {
- // 创建一个snapshot
- }
-
- // 把个tablet从一个录移动到另外一个录。
- Status create_migrate_task(int64_t tablet_id, Path dest_path)
- {
- // 1. 修改tablet的default dir,这样新来的数据就都可以往新的dir写了
- // 2. 持久化tablet meta
- // 3. 返回所有visible的rowset和正在写的rowset。
- // 4. 把要转化的rowset的信息填充到migrate task中。
- }
-
- Status finish_migrate_task(MigrateTask* migrate_task);
-
- Status create_compaction_task(CompactionTask* compaction_task)
- {
- // 根据compaction policy选择个最合适的tablet
- // 选择一堆rowset,创建一个rowset writer 填充到task中
- }
-
- Status finish_compaction_task()
- {
- // 读取compaction task 成的rowset
- // 变更tablet meta,持久化tabletmeta
- // 变更tablet 中的rowset列表
- }
-
- Status cancel_compaction_task();
-
- Status drop_tablet()
- {
- tablet_meta->set_state(TabletState.DELETING);
- store = tablet_info->get_store();
- store->save_meta(tablet_meta);
- store->delete_tablet(tablet_meta);
- tablet_meta->set_state(TabletState.DELETED);
- store->save_meta(tablet_meta);
- }
-
- Status create_tablet(CreateTabletReq* req)
- {
- OlapStore* store = _select_store();
- TabletMeta* tablet_meta = _create_new_meta(req);
- store->create_tablet(tablet_meta);store->save_meta(tablet_meta);
- }
-
- // 系统启动的时候只是根据data dir的配置,读取meta store中所有的tablet和rowset的元数据信息
- // 并没有读取具体的物理件的信息,因为读取物理文件信息在rowset规模特别大的时候会非常慢
- Status load()
- {
- vector<DataDir> data_dirs = 根据配置文件读取所有的dir
- for (DataDir dir : data_dirs) {
- dir.load_tablets(vector<TabletMeta*> tablets);
- for (TabletMeta* talbet_meta : tablets) {
- // 构建tablet 对象
- for (RowsetMeta* rowset_meta : tablet_meta -> get_rowsets) {
- // 构建txn map
- }
- }
- }
- }
-
- // 这是个垃圾回收的操作,BE运过程中会产许多垃圾,如当rowset在写的时候,写了部分,此时文件系统core掉了会产
- // rowset 的垃圾文件;当tablet被删除的时候可能会产生tablet的垃圾文件
- //
- Status garbge_collection()
- {
- for (TabletInfo* tablet_info : tablet_infos)
- {
- // 清理理所有的垃圾的tablet
- if (tablet_info->tablet_meta->tablet_state == TabletState.DELETING)
- {
- tablet_info->store()->delete_tablet(tablet_meta);
- tablet_meta->set_state(TabletState.DELETED);
- store->save_meta(tablet_meta);
- continue;
- }
- // 清理tablet中用的rowset文件, get all tablets 的实现目前考虑的就是根据文件名的前缀作为rowsetid
- rowset_ids = tablet_info->get_tablet()->get_all_rowsets();
- for (int64_t rowset_id : rowset_ids)
- {
- if (rowset_id not in committed rowsets
- && rowset id not in prepare rowsets
- && rowset id not in writing rowsets
- )
- {
- tablet_info->get_tablet()->delete_rowset(rowset_id);
- }
- }
- }
- // 垃圾的txn不要在这清理, storage engine会**定时的把txn向fe汇报,fe会下发清理txn的任务,收到clear txn任务的时候清理**
- } } ```
复制代码
错误处理逻辑
抽象一个错误处理的队列,发生错误后,如果在当前线程中不能直接处理,那么需要把错误消息封装一下直接发送给错误队列,由各个不同的任务线程,从队列中读取错误信息来处理。
最新经典文章,欢迎关注公众号
作者:wingsgo
来源:https://wingsgo.github.io/2020/0 ... _refactor_2019.html
|