分享

ClickHouse源码阅读计划(三)物化视图的概念、场景、用法和源码实现

上一篇:ClickHouse源码阅读计划(二) - IStorage和MergeTree的datapart/WAL部分梳理

问题导读:
1.POPULATE关键字有什么用?
2.ClickHouse的物化视图如何实现?
3.如何写入物化视图?


源码分析版本:21.7
物化视图是什么?
View 我理解为一个saved query,是一种虚拟表,不存储任何数据。每当从view读取数据的时候,对应一次从物理表的read操作。
而物化视图则是对应一份持久化的存储,可以是物理表的一份数据子集拷贝,也可以是多表JOIN或者预聚合的一个结果或子集。ClickHouse的物化视图实现更像是触发器,如果view中预先定义了聚合函数,那么(在不指定populate关键字的情况下)聚合函数仅适用于新插入的数据。对源表数据的更改都不会更改物化视图。

物化视图的场景?
假设场景:计算每个用户的日下载量
  1. CREATE TABLE download (
  2. when DateTime,
  3. userid UInt32,
  4. bytes Float32
  5. ) ENGINE=MergeTree
  6. PARTITION BY toYYYYMM(when)
  7. ORDER BY (userid, when);
  8. # 插入数据
  9. INSERT INTO download
  10. SELECT
  11. now() + number * 60 as when,
  12. 25,
  13. rand() % 100000000
  14. FROM system.numbers
  15. LIMIT 5000
  16. # 查看报表数据
  17. SELECT
  18. toStartOfDay(when) AS day,
  19. userid,
  20. count() as downloads,
  21. sum(bytes) AS bytes
  22. FROM download
  23. GROUP BY userid, day
复制代码

如果不使用物化视图,需要每次运行查询以交互方式统计结果。但是对于大型表,提前计算它们更快,更节省资源。因此,最好将结果放在单独的物化视图中,该表就可以连续跟踪每天每个用户的下载总数。
可以理解为把总计结果提前计算好,配合SummingMergeTree/AggregatingMergeTree 还会对新插入物化视图的数据做相应的求和/聚合/去重等操作,存到物化视图表中,实现实时的预聚合。那么就不需要每次运行查询去得到结果了。
另外一个用法,就是可以通过`AS SELECT`语法灵活改变表的排序顺序/表结构的变更。排序key变了,那么针对该key的filter scan能够实现更高效的索引剪枝。

语法介绍
  1. CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE] AS SELECT ...
复制代码
  • 不指定 TO [db].[table]的时候,必须要指定 ENGINE
  • 指定TO [db].[table] 目标表的时候,不能使用POPULATE关键字
POPULATE关键字有什么用?
若指定了POPULATE关键字,会把表中现有数据存储到视图中,否则只会写入创建视图之后的数据。
然而如果对数据的精确度要求比较高,不建议使用POPULATE关键字,因为在创建视图过程中插入表中的数据并不会写入视图,会造成数据的丢失。

使用指定ENGINE方法创建
  • 使用POPULATE 创建一个每日计数下载量的物化视图表
  1. CREATE MATERIALIZED VIEW download_daily_mv
  2. ENGINE = SummingMergeTree
  3. PARTITION BY toYYYYMM(day)
  4. ORDER BY (userid, day) POPULATE AS
  5. SELECT
  6.     toStartOfDay(when) AS day,
  7.     userid,
  8.     count() AS downloads,
  9.     sum(bytes) AS bytes
  10. FROM download
  11. GROUP BY
  12.     userid,
  13.     day;
  14.    
  15. SELECT * FROM download_daily_mv  ORDER BY day,userid LIMIT 5;
复制代码

源表中的插入的5000条数据已经存在在物化视图表中

a1.jpeg


  • 不使用POPULATE 创建一个每小时计数下载量的物化视图表
  1. CREATE MATERIALIZED VIEW download_hour_mv
  2. ENGINE = SummingMergeTree
  3. PARTITION BY toYYYYMM(hour) ORDER BY (userid, hour)
  4. AS SELECT
  5.   toStartOfHour(when) AS hour,
  6.   userid,
  7.   count() as downloads,
  8.   sum(bytes) AS bytes
  9. FROM download WHERE when >= toDateTime('2021-07-23 00:00:00')
  10. GROUP BY userid, hour
复制代码
这里加了个时间点,意味着只有在该时间段之后的数据才会物化到mv中。
可以看到现在还是个空表,如何物化数据呢?

a2.jpeg



你可以直接把源表数据直接insert到mv中,手动把自动同步时间点前的源数据插入到物化视图表中,这样就实现了原有数据表的同步。
  1. INSERT INTO download_hour_mv SELECT
  2.     toStartOfHour(when) AS hour,
  3.     userid,
  4.     count() AS downloads,
  5.     sum(bytes) AS bytes
  6. FROM download
  7. WHERE when <= toDateTime('2021-07-23 00:00:00')
  8. GROUP BY
  9.     userid,
  10.     hour
复制代码

a3.jpeg



当然你也可以直接写数据到源表,实现数据的自动同步
  1. INSERT INTO download SELECT
  2.     toDateTime('2021-09-01 04:00:00') + (number * (1 / 3)) AS when,
  3.     19,
  4.     rand() % 1000000
  5. FROM system.numbers
  6. LIMIT 10
复制代码

a4.jpeg


物化视图创建一个具有特殊名称的私有表来保存数据。如果通过键入“ DROP TABLE download_daily_mv”删除实例化视图,则私有表将消失。如果需要更改视图,则需要将其删除并使用新数据重新创建。

a5.jpeg


我们可以通过show tables看到私有表:

a6.jpeg



我们之前也讲了,物化视图实际上也是占用了disk上的datapart,我们可以看一下私有表和mv表对应的datapart情况:
  1. SELECT
  2.     partition,
  3.     name,
  4.     rows,
  5.     bytes_on_disk,
  6.     modification_time,
  7.     min_date,
  8.     max_date,
  9.     engine
  10. FROM system.parts
  11. WHERE table = '.inner_id.40fd3d25-a09b-4a4d-80fd-3d25a09bba4d'
  12. SELECT
  13.     partition,
  14.     name,
  15.     rows,
  16.     bytes_on_disk,
  17.     modification_time,
  18.     min_date,
  19.     max_date,
  20.     engine
  21. FROM system.parts
  22. WHERE table = 'download_hour_mv'
复制代码

a7.jpeg



可以看到物化表其实只是一个视图,不存储datapart,而只有其对应的私有表才存储相应的datapart。

使用 to db.table方法创建
用这种方法创建mv需要用[to db.table]手动指定目标私有表
先创建源表并且创建数据
  1. CREATE TABLE counter (
  2.   when DateTime DEFAULT now(),
  3.   device UInt32,
  4.   value Float32
  5. ) ENGINE=MergeTree
  6. PARTITION BY toYYYYMM(when)
  7. ORDER BY (device, when);
  8. INSERT INTO counter
  9.   SELECT
  10.     toDateTime('2015-01-01 00:00:00') + toInt64(number/10) AS when,
  11.     (number % 10) + 1 AS device,
  12.     (device * 3) +  (number/10000) + (rand() % 53) * 0.1 AS value
  13.   FROM system.numbers LIMIT 1000000;
复制代码

假设我们需要查询全时间段设备的点击率,最大最小和平均值,我们可用以下SQL
  1. SELECT
  2.   device,
  3.   sum(count) AS count,
  4.   maxMerge(max_value_state) AS max,
  5.   minMerge(min_value_state) AS min,
  6.   avgMerge(avg_value_state) AS avg
  7. FROM counter
  8. GROUP BY device
  9. ORDER BY device ASC
复制代码
但是这么做的话查询速度会很慢,因为需要扫全表的数据。如果我们创建一个物化视图表,计算一个日聚合的数据,那么就可以直接汇总每日聚合的数据返回结果。注意这里的maxMerge函数可以理解为在SQL语法层面暴露一个部分值局部聚合的这么一个功能。换句话说,使用SummingMergeTree的物化视图也可以实现AggregatingMergeTree的聚合功能,因此而推荐使用SummingMergeTree。

创建目标表
  1. CREATE TABLE counter_daily (
  2.   day DateTime,
  3.   device UInt32,
  4.   count UInt64,
  5.   max_value_state AggregateFunction(max, Float32),
  6.   min_value_state AggregateFunction(min, Float32),
  7.   avg_value_state AggregateFunction(avg, Float32)
  8. )
  9. ENGINE = SummingMergeTree()
  10. PARTITION BY tuple()
  11. ORDER BY (device, day);
复制代码

在定义物化视图的时候使用select从源表把数据注入到目标表中
  1. CREATE MATERIALIZED VIEW counter_daily_mv
  2. TO counter_daily
  3. AS SELECT
  4.     toStartOfDay(when) as day,
  5.     device,
  6.     count(*) as count,
  7.     maxState(value) AS max_value_state,
  8.     minState(value) AS min_value_state,
  9.     avgState(value) AS avg_value_state
  10. FROM counter
  11. WHERE when >= toDate('2019-01-01 00:00:00')
  12. GROUP BY device, day
  13. ORDER BY device, day;
复制代码

一开始物化视图是没有数据的,需要手动把数据加载到目标表中
  1. INSERT INTO counter_daily
  2. SELECT
  3.   toStartOfDay(when) as day,
  4.   device,
  5.   count(*) AS count,
  6.   maxState(value) AS max_value_state,
  7.   minState(value) AS min_value_state,
  8.   avgState(value) AS avg_value_state
  9. FROM counter
  10. WHERE when < toDateTime('2019-01-01 00:00:00')
  11. GROUP BY device, day
  12. ORDER BY device, day
复制代码

现在数据都写入到目标表中了,就可以查了
  1. SELECT
  2.   device,
  3.   sum(count) AS count,
  4.   maxMerge(max_value_state) AS max,
  5.   minMerge(min_value_state) AS min,
  6.   avgMerge(avg_value_state) AS avg
  7. FROM counter_daily_mv
  8. GROUP BY device
  9. ORDER BY device ASC
复制代码

a8.jpeg


这么一对比,通过引入预聚合的物化视图,就可以大大减少要计算聚合结果时候扫描的数据量,从而提高了查询速度。

ClickHouse的物化视图如何实现?创建物化视图
StorageMaterializedView::StorageMaterializedView 构造函数

a9.jpeg


1.校验语法
2.从`AS SELECT ...` 子句的AST提取出SELEC subquery的信息
3.如果没有指定目标表,会自己生成一条创建inner table的query交由InterpreterCreateQuery执行,同时mv还可以attach上inner table来指定目标表
4.在Database 的catalog中会维护一个
  1. ViewDependencies = std::map<StorageID, std::set<StorageID>>;
复制代码
保存Table -> set of 实际上依赖的源表的映射关系,因此需要更新物化视图表到其映射的源表的映射关系。注意,这里的源表不是自动生成的inner table,而是创建mv时指定的AS SELECT子句中的表

查询物化视图
mv实际上都是基于目标表来进行plan的构建,步骤如下:
1.构建query_plan,详细逻辑看下面的read函数
2.从query_plan构建Pipeline
  1. plan.convertToPipe
  2. buildQueryPipeline
复制代码
3.交由Pipeline Executor执行
  1. /// Remove columns from target_header that does not exists in src_header
  2. static void removeNonCommonColumns(const Block & src_header, Block & target_header)
  3. {
  4.     std::set<size_t> target_only_positions;
  5.     for (const auto & column : target_header)
  6.     {
  7.         if (!src_header.has(column.name))
  8.             target_only_positions.insert(target_header.getPositionByName(column.name));
  9.     }
  10.     target_header.erase(target_only_positions);
  11. }
  12. void StorageMaterializedView::read(
  13.     QueryPlan & query_plan,
  14.     const Names & column_names,
  15.     const StorageMetadataPtr & metadata_snapshot,
  16.     SelectQueryInfo & query_info,
  17.     ContextPtr local_context,
  18.     QueryProcessingStage::Enum processed_stage,
  19.     const size_t max_block_size,
  20.     const unsigned num_streams){
  21.         // 实际操作的是目标表
  22.         auto storage = getTargetTable();
  23.         // 获取mv的datastream的第一个block
  24.         auto mv_header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, local_context, processed_stage);
  25.         
  26.         // 获取查询语句的列对应的第一个block
  27.         auto target_header = query_plan.getCurrentDataStream().header;
  28.         //...
  29.         /// No need to convert columns that does not exists in MV
  30.         // 从查询的列中去除那些mv不存在的列
  31.         removeNonCommonColumns(mv_header, target_header);
  32.         /// No need to convert columns that does not exists in the result header.
  33.         ///
  34.         /// Distributed storage may process query up to the specific stage, and
  35.         /// so the result header may not include all the columns from the
  36.         /// materialized view.
  37.         // 从mv的列中去除那些查询不需要的列
  38.         removeNonCommonColumns(target_header, mv_header);
  39.         // 如果两个block含的列不同,则把目标所要查询的列结构转化为mv的结构,并把
  40.         // 转换算子写入query plan
  41.         if (!blocksHaveEqualStructure(mv_header, target_header))
  42.         {
  43.             auto converting_actions = ActionsDAG::makeConvertingActions(target_header.getColumnsWithTypeAndName(),
  44.                                                                         mv_header.getColumnsWithTypeAndName(),
  45.                                                                         ActionsDAG::MatchColumnsMode::Name);
  46.             auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);
  47.             converting_step->setStepDescription("Convert target table structure to MaterializedView structure");
  48.             query_plan.addStep(std::move(converting_step));
  49.         }   
  50.         // 给目标表上lock,限流等参数,
  51.         auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
  52.                 query_plan.getCurrentDataStream(),
  53.                 storage,
  54.                 std::move(lock),
  55.                 limits,
  56.                 leaf_limits,
  57.                 nullptr,
  58.                 nullptr);
  59.         // 转换算子写入query plan
  60.         adding_limits_and_quota->setStepDescription("Lock destination table for MaterializedView");
  61.         query_plan.addStep(std::move(adding_limits_and_quota));
  62. }
复制代码


写入物化视图
  1. InterpreterInsertQuery.cpp
  2. 有两处与物化视图相关:
  3.    
  4.     // 构造物化到view的输出流
  5.     out = std::make_shared<PushingToViewsBlockOutputStream>(table, metadata_snapshot, getContext(), query_ptr, no_destination);
  6.     // 在执行pipeline中增加对源表table/内部表的引用
  7.     res.pipeline.addStorageHolder(table);
  8.     if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))
  9.     {
  10.         if (auto inner_table = mv->tryGetTargetTable())
  11.             res.pipeline.addStorageHolder(inner_table);
  12.     }
复制代码

写入源表的数据如何与自动生成的inner table对应起来呢?靠的就是PushingToViewsBlockOutputStream。在写入源表的同时还会写入依赖其的所有物化视图的目标表中。
  1. PushingToViewsBlockOutputStream.cpp
复制代码
构造函数:

a10.jpeg


具体的写入函数实现在process函数:
每调用一次process写入一个Block
  1. void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & view)
  2.             // 绑定mv的数据源为写入表
  3.             local_context->addViewSource(
  4.                 StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals()));
  5.    
  6.             in = std::make_shared<MaterializingBlockInputStream>(select->execute().getInputStream());
  7.             in = std::make_shared<SquashingBlockInputStream>(
  8.                     in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes);
  9.             in = std::make_shared<ConvertingBlockInputStream>(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
  10.             while (Block result_block = in->read())
  11.            {
  12.             Nested::validateArraySizes(result_block);
  13.             // 写入物化视图的inner table
  14.             view.out->write(result_block);
  15.             }
复制代码

  • MaterializingBlockInputStream
把从写入表select出来的data block stream的列数据进行物化.因为从select语句从写入表读出来的block stream中,block的column都是ColumnConst类型。
所谓ColumnConst可以代表另外一个列的任意常数引用,但是列中的元素不是所代表列中的元素,而只存储一个值,还存储着真是列数据的一个指针。我才想这么实现是为了加快block中传输,从stream读取出block之后再做物化操作,从指针还原数据。
  • SquashingBlockInputStream
把连续的几个block合并成一个block,减少因group by操作产生的block数量
  • ConvertingBlockInputStream
把写入表block转化为物化视图内部表block的结构

删除物化视图
  1. void StorageMaterializedView::drop()
  2. {
  3.     auto table_id = getStorageID();
  4.     const auto & select_query = getInMemoryMetadataPtr()->getSelectQuery();
  5.     if (!select_query.select_table_id.empty())
  6.         DatabaseCatalog::instance().removeDependency(select_query.select_table_id, table_id);
  7.     dropInnerTableIfAny(true, getContext());
  8. }
  9. void StorageMaterializedView::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
  10. {
  11.     if (has_inner_table && tryGetTargetTable())
  12.         InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, no_delay);
  13. }
复制代码

1.删除物化视图到其对应的目标表集合的映射关系
2.drop inner表;如果其目标表不是inner 表,那么目标表不会被删除


最新经典文章,欢迎关注公众号



---------------------






已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条