上一篇:ClickHouse源码阅读计划(二) - IStorage和MergeTree的datapart/WAL部分梳理
问题导读:
1.POPULATE关键字有什么用? 2.ClickHouse的物化视图如何实现? 3.如何写入物化视图?
源码分析版本:21.7
物化视图是什么?View 我理解为一个saved query,是一种虚拟表,不存储任何数据。每当从view读取数据的时候,对应一次从物理表的read操作。 而物化视图则是对应一份持久化的存储,可以是物理表的一份数据子集拷贝,也可以是多表JOIN或者预聚合的一个结果或子集。ClickHouse的物化视图实现更像是触发器,如果view中预先定义了聚合函数,那么(在不指定populate关键字的情况下)聚合函数仅适用于新插入的数据。对源表数据的更改都不会更改物化视图。
物化视图的场景?假设场景:计算每个用户的日下载量
- CREATE TABLE download (
- when DateTime,
- userid UInt32,
- bytes Float32
- ) ENGINE=MergeTree
- PARTITION BY toYYYYMM(when)
- ORDER BY (userid, when);
-
- # 插入数据
- INSERT INTO download
- SELECT
- now() + number * 60 as when,
- 25,
- rand() % 100000000
- FROM system.numbers
- LIMIT 5000
-
- # 查看报表数据
- SELECT
- toStartOfDay(when) AS day,
- userid,
- count() as downloads,
- sum(bytes) AS bytes
- FROM download
- GROUP BY userid, day
复制代码
如果不使用物化视图,需要每次运行查询以交互方式统计结果。但是对于大型表,提前计算它们更快,更节省资源。因此,最好将结果放在单独的物化视图中,该表就可以连续跟踪每天每个用户的下载总数。 可以理解为把总计结果提前计算好,配合SummingMergeTree/AggregatingMergeTree 还会对新插入物化视图的数据做相应的求和/聚合/去重等操作,存到物化视图表中,实现实时的预聚合。那么就不需要每次运行查询去得到结果了。 另外一个用法,就是可以通过`AS SELECT`语法灵活改变表的排序顺序/表结构的变更。排序key变了,那么针对该key的filter scan能够实现更高效的索引剪枝。
语法介绍- CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE] AS SELECT ...
复制代码
- 不指定 TO [db].[table]的时候,必须要指定 ENGINE
- 指定TO [db].[table] 目标表的时候,不能使用POPULATE关键字
POPULATE关键字有什么用? 若指定了POPULATE关键字,会把表中现有数据存储到视图中,否则只会写入创建视图之后的数据。
然而如果对数据的精确度要求比较高,不建议使用POPULATE关键字,因为在创建视图过程中插入表中的数据并不会写入视图,会造成数据的丢失。
使用指定ENGINE方法创建- 使用POPULATE 创建一个每日计数下载量的物化视图表
- CREATE MATERIALIZED VIEW download_daily_mv
- ENGINE = SummingMergeTree
- PARTITION BY toYYYYMM(day)
- ORDER BY (userid, day) POPULATE AS
- SELECT
- toStartOfDay(when) AS day,
- userid,
- count() AS downloads,
- sum(bytes) AS bytes
- FROM download
- GROUP BY
- userid,
- day;
-
- SELECT * FROM download_daily_mv ORDER BY day,userid LIMIT 5;
复制代码
源表中的插入的5000条数据已经存在在物化视图表中
- 不使用POPULATE 创建一个每小时计数下载量的物化视图表
- CREATE MATERIALIZED VIEW download_hour_mv
- ENGINE = SummingMergeTree
- PARTITION BY toYYYYMM(hour) ORDER BY (userid, hour)
- AS SELECT
- toStartOfHour(when) AS hour,
- userid,
- count() as downloads,
- sum(bytes) AS bytes
- FROM download WHERE when >= toDateTime('2021-07-23 00:00:00')
- GROUP BY userid, hour
复制代码
这里加了个时间点,意味着只有在该时间段之后的数据才会物化到mv中。 可以看到现在还是个空表,如何物化数据呢?
你可以直接把源表数据直接insert到mv中,手动把自动同步时间点前的源数据插入到物化视图表中,这样就实现了原有数据表的同步。 - INSERT INTO download_hour_mv SELECT
- toStartOfHour(when) AS hour,
- userid,
- count() AS downloads,
- sum(bytes) AS bytes
- FROM download
- WHERE when <= toDateTime('2021-07-23 00:00:00')
- GROUP BY
- userid,
- hour
复制代码
当然你也可以直接写数据到源表,实现数据的自动同步 - INSERT INTO download SELECT
- toDateTime('2021-09-01 04:00:00') + (number * (1 / 3)) AS when,
- 19,
- rand() % 1000000
- FROM system.numbers
- LIMIT 10
复制代码
物化视图创建一个具有特殊名称的私有表来保存数据。如果通过键入“ DROP TABLE download_daily_mv”删除实例化视图,则私有表将消失。如果需要更改视图,则需要将其删除并使用新数据重新创建。
我们可以通过show tables看到私有表:
我们之前也讲了,物化视图实际上也是占用了disk上的datapart,我们可以看一下私有表和mv表对应的datapart情况: - SELECT
- partition,
- name,
- rows,
- bytes_on_disk,
- modification_time,
- min_date,
- max_date,
- engine
- FROM system.parts
- WHERE table = '.inner_id.40fd3d25-a09b-4a4d-80fd-3d25a09bba4d'
-
-
- SELECT
- partition,
- name,
- rows,
- bytes_on_disk,
- modification_time,
- min_date,
- max_date,
- engine
- FROM system.parts
- WHERE table = 'download_hour_mv'
复制代码
可以看到物化表其实只是一个视图,不存储datapart,而只有其对应的私有表才存储相应的datapart。
使用 to db.table方法创建用这种方法创建mv需要用[to db.table]手动指定目标私有表 先创建源表并且创建数据 - CREATE TABLE counter (
- when DateTime DEFAULT now(),
- device UInt32,
- value Float32
- ) ENGINE=MergeTree
- PARTITION BY toYYYYMM(when)
- ORDER BY (device, when);
-
- INSERT INTO counter
- SELECT
- toDateTime('2015-01-01 00:00:00') + toInt64(number/10) AS when,
- (number % 10) + 1 AS device,
- (device * 3) + (number/10000) + (rand() % 53) * 0.1 AS value
- FROM system.numbers LIMIT 1000000;
复制代码
假设我们需要查询全时间段设备的点击率,最大最小和平均值,我们可用以下SQL - SELECT
- device,
- sum(count) AS count,
- maxMerge(max_value_state) AS max,
- minMerge(min_value_state) AS min,
- avgMerge(avg_value_state) AS avg
- FROM counter
- GROUP BY device
- ORDER BY device ASC
复制代码
但是这么做的话查询速度会很慢,因为需要扫全表的数据。如果我们创建一个物化视图表,计算一个日聚合的数据,那么就可以直接汇总每日聚合的数据返回结果。注意这里的maxMerge函数可以理解为在SQL语法层面暴露一个部分值局部聚合的这么一个功能。换句话说,使用SummingMergeTree的物化视图也可以实现AggregatingMergeTree的聚合功能,因此而推荐使用SummingMergeTree。
创建目标表
- CREATE TABLE counter_daily (
- day DateTime,
- device UInt32,
- count UInt64,
- max_value_state AggregateFunction(max, Float32),
- min_value_state AggregateFunction(min, Float32),
- avg_value_state AggregateFunction(avg, Float32)
- )
- ENGINE = SummingMergeTree()
- PARTITION BY tuple()
- ORDER BY (device, day);
复制代码
在定义物化视图的时候使用select从源表把数据注入到目标表中
- CREATE MATERIALIZED VIEW counter_daily_mv
- TO counter_daily
- AS SELECT
- toStartOfDay(when) as day,
- device,
- count(*) as count,
- maxState(value) AS max_value_state,
- minState(value) AS min_value_state,
- avgState(value) AS avg_value_state
- FROM counter
- WHERE when >= toDate('2019-01-01 00:00:00')
- GROUP BY device, day
- ORDER BY device, day;
复制代码
一开始物化视图是没有数据的,需要手动把数据加载到目标表中 - INSERT INTO counter_daily
- SELECT
- toStartOfDay(when) as day,
- device,
- count(*) AS count,
- maxState(value) AS max_value_state,
- minState(value) AS min_value_state,
- avgState(value) AS avg_value_state
- FROM counter
- WHERE when < toDateTime('2019-01-01 00:00:00')
- GROUP BY device, day
- ORDER BY device, day
复制代码
现在数据都写入到目标表中了,就可以查了 - SELECT
- device,
- sum(count) AS count,
- maxMerge(max_value_state) AS max,
- minMerge(min_value_state) AS min,
- avgMerge(avg_value_state) AS avg
- FROM counter_daily_mv
- GROUP BY device
- ORDER BY device ASC
复制代码
这么一对比,通过引入预聚合的物化视图,就可以大大减少要计算聚合结果时候扫描的数据量,从而提高了查询速度。
ClickHouse的物化视图如何实现?创建物化视图StorageMaterializedView::StorageMaterializedView 构造函数
1.校验语法 2.从`AS SELECT ...` 子句的AST提取出SELEC subquery的信息 3.如果没有指定目标表,会自己生成一条创建inner table的query交由InterpreterCreateQuery执行,同时mv还可以attach上inner table来指定目标表 4.在Database 的catalog中会维护一个 - ViewDependencies = std::map<StorageID, std::set<StorageID>>;
复制代码
保存Table -> set of 实际上依赖的源表的映射关系,因此需要更新物化视图表到其映射的源表的映射关系。注意,这里的源表不是自动生成的inner table,而是创建mv时指定的AS SELECT子句中的表
查询物化视图mv实际上都是基于目标表来进行plan的构建,步骤如下: 1.构建query_plan,详细逻辑看下面的read函数 2.从query_plan构建Pipeline - plan.convertToPipe
- buildQueryPipeline
复制代码
3.交由Pipeline Executor执行 - /// Remove columns from target_header that does not exists in src_header
- static void removeNonCommonColumns(const Block & src_header, Block & target_header)
- {
- std::set<size_t> target_only_positions;
- for (const auto & column : target_header)
- {
- if (!src_header.has(column.name))
- target_only_positions.insert(target_header.getPositionByName(column.name));
- }
- target_header.erase(target_only_positions);
- }
-
-
- void StorageMaterializedView::read(
- QueryPlan & query_plan,
- const Names & column_names,
- const StorageMetadataPtr & metadata_snapshot,
- SelectQueryInfo & query_info,
- ContextPtr local_context,
- QueryProcessingStage::Enum processed_stage,
- const size_t max_block_size,
- const unsigned num_streams){
- // 实际操作的是目标表
- auto storage = getTargetTable();
-
- // 获取mv的datastream的第一个block
- auto mv_header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, local_context, processed_stage);
-
- // 获取查询语句的列对应的第一个block
- auto target_header = query_plan.getCurrentDataStream().header;
-
- //...
- /// No need to convert columns that does not exists in MV
- // 从查询的列中去除那些mv不存在的列
- removeNonCommonColumns(mv_header, target_header);
-
- /// No need to convert columns that does not exists in the result header.
- ///
- /// Distributed storage may process query up to the specific stage, and
- /// so the result header may not include all the columns from the
- /// materialized view.
- // 从mv的列中去除那些查询不需要的列
- removeNonCommonColumns(target_header, mv_header);
-
- // 如果两个block含的列不同,则把目标所要查询的列结构转化为mv的结构,并把
- // 转换算子写入query plan
- if (!blocksHaveEqualStructure(mv_header, target_header))
- {
- auto converting_actions = ActionsDAG::makeConvertingActions(target_header.getColumnsWithTypeAndName(),
- mv_header.getColumnsWithTypeAndName(),
- ActionsDAG::MatchColumnsMode::Name);
- auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);
- converting_step->setStepDescription("Convert target table structure to MaterializedView structure");
- query_plan.addStep(std::move(converting_step));
- }
-
- // 给目标表上lock,限流等参数,
- auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
- query_plan.getCurrentDataStream(),
- storage,
- std::move(lock),
- limits,
- leaf_limits,
- nullptr,
- nullptr);
- // 转换算子写入query plan
- adding_limits_and_quota->setStepDescription("Lock destination table for MaterializedView");
- query_plan.addStep(std::move(adding_limits_and_quota));
- }
复制代码
写入物化视图
- InterpreterInsertQuery.cpp
-
- 有两处与物化视图相关:
-
- // 构造物化到view的输出流
- out = std::make_shared<PushingToViewsBlockOutputStream>(table, metadata_snapshot, getContext(), query_ptr, no_destination);
-
- // 在执行pipeline中增加对源表table/内部表的引用
- res.pipeline.addStorageHolder(table);
- if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))
- {
- if (auto inner_table = mv->tryGetTargetTable())
- res.pipeline.addStorageHolder(inner_table);
- }
复制代码
写入源表的数据如何与自动生成的inner table对应起来呢?靠的就是PushingToViewsBlockOutputStream。在写入源表的同时还会写入依赖其的所有物化视图的目标表中。 - PushingToViewsBlockOutputStream.cpp
复制代码
构造函数:
具体的写入函数实现在process函数: 每调用一次process写入一个Block - void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & view)
-
- // 绑定mv的数据源为写入表
- local_context->addViewSource(
- StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals()));
-
- in = std::make_shared<MaterializingBlockInputStream>(select->execute().getInputStream());
- in = std::make_shared<SquashingBlockInputStream>(
- in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes);
- in = std::make_shared<ConvertingBlockInputStream>(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
-
-
- while (Block result_block = in->read())
- {
- Nested::validateArraySizes(result_block);
- // 写入物化视图的inner table
- view.out->write(result_block);
- }
复制代码
- MaterializingBlockInputStream
把从写入表select出来的data block stream的列数据进行物化.因为从select语句从写入表读出来的block stream中,block的column都是ColumnConst类型。 所谓ColumnConst可以代表另外一个列的任意常数引用,但是列中的元素不是所代表列中的元素,而只存储一个值,还存储着真是列数据的一个指针。我才想这么实现是为了加快block中传输,从stream读取出block之后再做物化操作,从指针还原数据。 - SquashingBlockInputStream
把连续的几个block合并成一个block,减少因group by操作产生的block数量 - ConvertingBlockInputStream
把写入表block转化为物化视图内部表block的结构
删除物化视图
- void StorageMaterializedView::drop()
- {
- auto table_id = getStorageID();
- const auto & select_query = getInMemoryMetadataPtr()->getSelectQuery();
- if (!select_query.select_table_id.empty())
- DatabaseCatalog::instance().removeDependency(select_query.select_table_id, table_id);
-
- dropInnerTableIfAny(true, getContext());
- }
-
- void StorageMaterializedView::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
- {
- if (has_inner_table && tryGetTargetTable())
- InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, no_delay);
- }
复制代码
1.删除物化视图到其对应的目标表集合的映射关系 2.drop inner表;如果其目标表不是inner 表,那么目标表不会被删除
---------------------
|