分享

火山引擎 LAS 数据湖存储内核揭秘




问题导读:

1、如何解决不同引擎 SQL 语义的一致性?
2、Hudi Catalog 如何保证数据的一致性?
3、如何避免 Compact 和查询并发同时发生带来的对 Query 的影响?




01 LAS 介绍

LAS 全称(Lakehouse Analysis Service)湖仓一体分析服务,融合了湖与仓的优势,既能够利用湖的优势将所有数据存储到廉价存储中,供机器学习、数据分析等场景使用,又能基于数据湖构建数仓供 BI 报表等业务使用。
640.png

LAS 整体架构如图所示,第一层是湖仓开发工具,然后是分析引擎,分析引擎支持流批一体 SQL,一套 SQL 既能支持流作业又能支持批作业。分析引擎还支持引擎的智能选择及加速,根据 SQL 的特点自动路由到 Spark,Presto 或 Flink 中去执行。再往下一层是统一元数据层,第四层是流批一体存储层。

640 (1).png

LAS 的整体架构存算分离,计算存储可以按需扩展,避免资源浪费,因为存算分离,所以一份数据可以被多个引擎分析。相较于存算一体,成本 TCO 可以下降 30%-50%,并且 LAS 支持动态弹性扩缩容,可进一步降低用户成本。

640 (2).png

基于 LAS 构建企业级实时湖仓,无论离线数据还是实时数据,都可以放到 LAS 流批一体存储中。如果需要实时处理的数据,可以直接利用 LAS 的 Streaming 能力,流读流写,流式写入下一层表中,层层构建 ODS、DWD 等层级关系。如果需要进行离线回溯,不需要换存储,直接通过流批一体 SQL 运行离线任务。

02 问题与挑战

640 (3).png

LAS 流批一体存储是基于开源的 Apache Hudi 构建的,在整个落地过程中,我们遇到了一些问题。Apache Hudi 仅支持单表的元数据管理,缺乏统一的全局视图,会存在数据孤岛。Hudi 选择通过同步分区或者表信息到 Hive Metastore Server 的方式提供全局的元数据访问,但是两个系统之间的同步无法保证原子性,会有一致性问题,因此当前缺乏一个全局可靠视图。另外 Hudi 在 Snashot 的管理上,依赖底层存储系统的视图构建自己的 Snapshot 信息,而不是通过自己的元数据管理。这种机制无法保证底层的存储系统记录的文件信息和每次 Commit 的文件对齐,从而在下游消费的时候会产生读到赃数据,或者坏文件等问题。

针对数据孤岛和元数据一致性问题,LAS 设计了统一元数据服务 MetaServer,提供了一个全局的可靠视图。另外 Hudi 支持 Merge On Read方式,该方式会先将更新数据写入 Log 文件中,读时再和底层的 Base 文件进行合并。为了保障读取效率,Hudi 提供 Compaction 功能,定期将 Log 文件和 Base 文件进行合并后写成新的 Base File。在近实时或实时场景下,业务对于时间非常敏感, 在写入操作后顺序执行 Compaction 会导致产出时间不稳定,影响下游消费。对此社区提供了 Async Compaction 功能,将 Compaction 算子和 Commit 拆开,Compaction 和 Commit 可以在一个 Application 中共享资源,并行执行。

对于 Flink 入湖作业来说,增量导入数据所需的资源和存量 Compact 所需的资源很难对齐。往往后者对于资源的要求会更高,但执行频次会更低。将 Compaction 和增量导入混合到一起,共享资源执行,增量导入可能会因为 Compaction 作业运行不稳定而失败。所以为了节约资源,保障作业的稳定性,需要独立拆分资源供 Compaction 任务的执行。但随着生产任务增长,这些异步作业的管理就是一个新挑战。因此,LAS 提供表操作管理服务 Table Management Service,全托管所有异步任务,包括 Compaction、Clean、Clustering 等。用户无需感知作业的执行状态,也无需额外了解这些操作背后的逻辑,仅仅需要关注入湖任务的稳定性。总结下来,LAS 在数据湖存储的服务化上面主要做了两个工作,统一的元数据服务和表操作管理服务。

03 LAS数据湖服务化设计与实践

640 (4).png

接下来详细介绍这两个服务的实现。Service 层在 LAS 中连接了底层存储的存储格式和上层的查询引擎。LAS 作为一个 PAAS 服务(或者说 SAAS 服务),它要求服务层的设计需要满足云原生的架构,存算分离,支持多租户隔离以及高可用。

640 (5).png

这是服务层的整体架构,包括元数据管理服务 Hudi MetaServer 和表操作管理 Hudi Table Management Service。两者之间有交互,并且会和一些外部系统比如 K8s,Yarn,外部的 Datahub 等进行交互。

640 (6).png

首先来看一下 Hudi MetaServer 元数据管理服务。

640 (7).png

Hudi MetaServer 整体结构分为三大模块:

  • Hudi Catalog

  • 核心功能 MetaServer

  • Event Bus

其中 Hudi Catalog 是读表写表 Client 侧对单表访问的抽象,通过MetaServer Client 与 MetaServer 交互。Event Bus 是事件总线,用于将元数据相关的增删改查事件发送给监听者,监听者可以根据事件类型决定对应的执行操作(比如同步元数据信息到外部的元数据信息系统等)。Table Management Service 就是其中一个监听者,属于其中一个重要组成部分。MetaServer 整体分为两大块——存储层和服务层。存储层用于存储数据湖的所有元数据,服务层用于接受所有元数据的相关增删改查请求。整个服务层是无状态的,因此支持水平扩展。

640 (8).png

存储层存储的元数据信息包括:

  • 表的元数据信息,比如 Schema、Location 等。

  • 分区元数据信息 Location、Parameter 等。

  • 时间线信息,包括构成时间线的一个个 Commit,以及 Commit 对应的 Commit Metadata 信息,Commit Meta 会记录本次更新修改了哪些分区、文件以及统计信息。

  • Snapshot信息,即每次 Commit 的文件信息,包括文件名、大小等等。

640 (9).png

Service 层按照功能模块划分成:

  • Table serivice

  • Partition service

  • Timeline service

  • Snapshot service

用于处理对应的元数据请求。

640 (10).png

接下来看一下 Hudi 的读写过程中如何与 MetaServer 交互。

先看写入部分,当 Client 准备提交一个 Commit 时,它会请求 Hudi Catalog,由 Hudi Catalog 与 MetaServer 进行交互,最后进行提交。MetaServer 收到提交请求后会先路由给 Timeline Service 进行处理,修改对应 Commit 状态,并且记录本次提交 Commit 的 Metadata 信息。然后根据 Commit Metadata 信息将本次写入修改的分区和文件写入底层存储中,即 Partition 信息的同步和 Snapshot 的同步。

640 (11).png

在读取过程中,计算引擎会先解析 SQL,生成 Analysis Plan。这个时候就访问 Hudi Catalog 获取表信息,构建 Relation,接着经过 Optimizer 层执行分区下推等优化规则。MetaServer 会根据 Client 传递的 Predicate 返回下推后的分区,Relation 会获取本次需要读取的所有文件信息,MetaServer 就会响应这次请求,获取当前最新的 Snapshot,封装成 File Status 返回,最后由 Compute Engine 执行读取操作。

640 (12).png

MetaServer 的几个核心功能包括 Schema Evolution 和并发管理的支持。其中 Schema Evolution 本质上就是支持多版本的 Schema,并且把该 Schema 和某个 Commit 进行关联,这里不多赘述。

并发管理的核心设计包含四个部分:

  • 基于乐观锁

  • 底层存储支持 CAS

  • 在元数据引入版本概念,表示 Commit 提交的先后关系

  • 支持多种并发冲突策略,最大化的进行并发写入

640 (13).png

先看一下整个的并发控制流程图。

首先写入端会提交一个 Requested Commit,并且从 Server 侧拿到最新的 Snapshot 信息;这个 SnapShot  信息对应一个 VREAD 的版本号,然后写入端基于 Snapshot 去构建 Work Profile,并且修改 Commit 状态为 Inflight 状态。完成后开始正式写入数据,写入完成后准备提交本次 Commit。此时 Service 侧会尝试将该 Commit 提交到 VREAD+1 版本,如果发现提交失败,说明当前最新版本号被改变了,不是 VREAD 版本,那么需要将 VREAD 版本到最新的版本之间所有提交  Commit 拿出来,判断已经完成的 Commit 是否与本次提交冲突,如果冲突的话需要放弃本次提交,不冲突的话提交本次 Commit 到最新的 Version+1 上。整个提交 Commit 到固定的版本过程(图上步骤7)是原子操作。

640 (14).png

上述整个过程是在 Commit 最后阶段进行并发拦截,此时数据已经写入。如果我们能在数据写入前及早发现冲突,就可以使因冲突导致本次写入失败的代价尽可能的小。所以我们在 Commit Inflight 阶段状态变化过程也增加了冲突检查功能。因为在这个时候,写入侧已经完成了 Work Profile 构建,知道本次 Commit 会写入哪些文件。Server 侧可以感知到该表所有正在写入的 Client,所以可以判断本次 Commit 与其它正在写入的 Client 是否有冲突,有冲突的话直接拒绝本次 Commit Inflight 的转换,这个时候写入侧还没有正式写入数据,代价非常小。

640 (15).png

基于 Version 的 Timeline 如何保障一致性?原先的 Timeline 仅仅是由所有 Completed 状态的 Instant 构成,现在的 Timeline 是由一个确定 Version 的 Completed 状态的 Instant 构成。这个 Instant 在提交过程中需要满足两个条件:

  • 状态必须是 Completed 状态

  • 必须有一个 Version 版本号相对应

这个 Version ID 是单调递增的并且支持 CAS 更新,就不会有一致性问题。

640 (16).png

最后介绍冲突检查部分的多种冲突检查策略,我们可以根据业务场景选择不同冲突检查策略,满足业务侧不同的并发写需求,比如:

  • 基于表级别的,一张表不能同时有两个 Instant 提交,其实就是不支持并发写的冲突检查策略

  • 基于分区级别的,两个 Instant 不能同时写入同一个分区

  • 基于 FileGroup 级别的,两个 Instant 不能同时写入同一个 FileGroup

  • 基于文件级别的,两个 Instant 不能同时写同一个文件

  • 锁力度越往下粒度越细,支持的并发场景也会更宽一些。


640 (17).png

最后介绍 MetaServer Event Bus 事件总线这个组件。事件总线是将元数据的增删改封装成一个个事件发送到消息总线中,由各个 Server 监听事件并且根据事件类型进行响应,从而让下游组件感受到元数据的变化(如平台侧的元数据管理服务,Table Management Service 等等)。以 External Catalog Listener为例,假设写入端提交了一个加列的 DDL,那么在 MetaServer 处理完请求后,会将本次的 Table Schema 的修改信息封装成一个 Change Schema(如 Change Schema Event),发送到 Event Bus 中。Hive Catalog Listener 在收到事件之后就会调用 Hive Client 同步新的 Schema 给 Hive Metastore。

640 (18).png

接下来介绍表级别管理服务 Table Management Service 的详细设计,以及它是如何跟 Hudi MetaServer 去进行交互的。

640 (19).png

Table Management Service 主要解决的是异步任务全托管的问题。Service 由两个部分组成:

  • Plan Generator

Plan Generator 主要跟 MetaServer 交互,主要用于生成 Action Plan,通过监听 MetaServer Event 触发 Plan 生成。

  • Job Manager

Job Manager 主要跟 Yarn/K8s 交互,用于管理任务。它按照功能分为Job Scheduler 和 Job Manager 两个部分,Job Scheduler 用于调度需要被执行的 Action Plan,而 Job Manger  用于管理 Action Plan 需要对应的执行任务。

640 (21).png

Plan Generator 和 MetaServer 之间的交互逻辑为,当 Table Management Service监听到 MetaServer 侧传递的 Instant Commit 事件之后,Plan Generator 决定是否本次需要生成一个新的 Action Plan。如果需要的话,就向 MetaServer 提交一个 Request 状态对应异步操作的 Instant,表示该 Action 后续需要被执行。提交成功后会记录本次 Action Requested 状态的相关信息,比如表名、时间戳、状态等等,然后等待调度执行。举个例子,比如 Client 端提交一个 Commit 事件之后,Plan Generator 监听到之后它可能会去判断本次 Commit 是否需要调度 Compaction Plan 去生成,如果需要的话,它就会创建一个 Compaction Requested 的时间戳,提交到 MetaServer 上,提交完成之后,Table Management Service 会获取到自己提交完成,把这些信息放到自己的存储中,表示这个 Instant 的 Compaction 需要被执行,然后就会由 Manager 再去调度 Compaction 进行执行。

Plan Generator 决定是否需要生成 Action Plan 或者 Compaction Plan,在本质上是由策略决定的。以 Compaction 为例,默认是需要等到 n 个 Delta Commit 完成之后才能进行 Client 调度。Comapction Plan 的生成策略也有多种,基于 Log File Size 决定 FileGroup 是否需要被 Compact;或者是直接基于 Bounded IO 去决定是否需要 Compact。比方说这次 Compact 的总的 IO 不能超过 500M 的策略。这些策略是一开始建表的时候由用户指定的。Table Management Service 会从 MetaServer 的表的元数据信息中获取策略信息。如果用户需要修改策略的话需要通过 DDL 修改表的相关配置。之所以这么做,而不是通过写入侧去提交策略信息,是因为考虑到并发场景。如果通过写入侧指定策略会出现两个写入端提交的策略不对齐的问题,比方说一个 Compaction 的调度策略是 12 个  Delta Commit 之后触发,而另外一个写入端提交提交的是 1 个 Delta Commit 之后触发,这块就会有不对齐的问题。

640 (23).png

Job Management 中的 Job Scheduler 会定期轮询尚未被执行的 Action Plan,再分发给 Job Manager,由 Job Manager 启动一个 Spark 或者 Flink 任务执行。然后它会定期轮询作业的执行状态,监控并记录作业的相关信息。其中 Job Scheduler 支持多种调度策略,比如 FIFO,或者按照优先级方式选择需要被执行的 Pending 的 Action Plan。而 Job Manager 的主要职责是适配多种引擎用于任务的执行,并且支持任务的自动重试,支持任务运维所需要的报警信息。

640 (24).png

另一个需要提的点是 Table Management Service 的架构设计。如果说和 MetaServer 一样,作为一个无状态的服务的话,那么在 Trigger Plan 生成选择 Plan 执行的时候会出现并发问题。所以整个服务架构为主从结构,主节点负责接收 MetaServer 的 Event,收到 Event 之后,如果决定需要调度 Plan 进行执行的话,会选择对应的 Worker,由 Worker 去负责  Plan 的生成。主节点负责任务的调度,会定期的去 Storage 里找到 Pending 的 Action Plan,交给 Worker 去做任务的执行,以及监控报警。

640 (25).png

04 未来规划

640 (26).png

围绕数据湖加速方向:

  • 元数据加速 (元数据获取加速,构建和获取索引的加速)

  • 数据加速 (底层存储数据本身的加速)

  • 索引加速 (基于索引的加速查询)

元数据加速和索引获取加速部分会和 MetaServer 之间做一些结合,MetaServer 本身也会做一些 Cache 来加速一些元数据信息的获取。数据加速和索引加速部分,会在底层存储之上加一层缓存层,比如 Alluxio 就是一个比较适合的缓存层,可以结合查询 SQL Pattern 的一些信息,去支持智能的缓存策略,来加速整个查询的过程。

05 问答环节

Q1:这个路由是怎么做的?会使用统一的优化器,只是用各个引擎的 Runtime?

A1:底层确实会用各个引擎的 Runtime,SQL 解析层统一对 SQL 进行优化,然后按照 SQL 本身的特性决定底层 Runtime 的引擎( 比如一个简单的查询,会路由到 Presto 执行。复杂查询会路由到 Spark 执行。

Q2:如何解决不同引擎 SQL 语义的一致性?

A2:Spark 和 Presto 的差异不是很大,主要差异在与流和批 SQL 语义对齐。针对 Spark/Presto 我们用 Ansi SQL/Hive 语义对齐,以这个为标准,让 Spark/Presto 向上对齐。对于流批 SQL 一体,我们以批处理相关逻辑来对齐,或者根据实时或离线场景需求,然后判断按照某个场景来对齐。

Q3:LAS 支持入湖模版吗?允许整库入湖吗?

A3:目前底层还不支持整库入湖,主要支持单表入湖。我们是在上层,即面向用户层实现。

Q4:Hudi Catalog 如何保证数据的一致性?

A4:Hudi Catalog 本质上就是一个 MetaServer  的 Client,所以不太会有一致性问题。出现一致性问题的主要点在因为 Catalog 会存 Table 的元数据信息和 Timeline 的元数据信息。但是所有对 Timeline 的修改(比如提交 Instant)都会通过 Catalog,Catalog 能感知到这次修改,就可以将本次修改提交到 MetaServer 侧,MetaServer 侧就会返回修改成功和失败到给 Catalog,Catalog 就能构建跟 MetaServer 一致的 Timeline 信息。

Q5:是自己实现了 Hudi 元数据表的数据组织吗?

A5.:是的,大部分设计会跟社区的实现类似,因为社区元数据管理本质上是基于在表路径下面建了个.hudi/的目录用于存 Timline 的元数据信息,以及 Commit Metadata,这部分能力我们通过 MetaServer 进行了支持,我们是基于 0.6 的版本开发。现在新版社区提供了 MDT Metadata Table 功能存储 Snapshot 信息,MOR 对于底层 Snapshot 的管理 Metadata Table 还是依赖 HDFS 的组织,也没有记录 Snapshot 信息。这块 Hudi 是有缺失的,我们补上这块缺失,增加 Snapshot 管理,在实现上更像 Iceberg 一样存一些 Manifest 的信息。

Q6:查询引擎需要缓存 Hudi 元数据吗?

A6:查询引擎目前对接的就是 Hudi Catalog,依赖 Hudi Catalog 的支持,这块我们目前没有做缓存。

Q7:是关于冲突检查级别,是用户选择还是引擎默认?

A7:我们有一个默认的冲突检查策略,是基于分区级别。但是对于业务场景来说(比如多流  Join,有两个 Job,每个 Job  写部分列数据,这时冲突检查策略就是基于文件/基于列的冲突检查策略),在这种场景下,需要建表时特殊指定冲突检查策略,Server 就会根据表指定的冲突检查策略做冲突检查。

Q8:事件通知模式如何保证事件不丢失?

A8:我们没有做强一致性的保证,但是会定期拉取没对齐的部分,发送给下游监听方。另外我们会记录一些 Metrics(比如这次事件发送成功与否), 如果对一致性敏感的场景可以做一些监控、告警。

Q9 :是否支持双流驱动的  Join,一个包含主键,一个不包含主键?

A9:双流 Join 一般都需要共同的主键才能做到。

Q10:一个典型的 SQL Binlog 到 Hudi 的 Pipeline一般做到的新鲜度是多少,消耗的成本是多少?

A10:目前做到稳定的场景是分钟级别,我们在尝试做秒级的数据可见。但是如果做秒级可见的话存在一个问题,事务的可见级别不能强保证,可能会读到 Read Uncommit 的数据。

Q11:异步 Compact 资源是用户的还是公共资源,如何避免 Compact 和查询并发同时发生带来的对 Query 的影响?

A11:首先 Compact 使用的是公共资源不是用户的,我们提供的一个大池子去跑。根据任务优先级,比如 p0 作业会调度到高优队列。另外 Compact 执行是完全异步的,不会影响查询也不会影响写入。







最新经典文章,欢迎关注公众号



---------------------

作者:DataFunSummit
来源:weixin
原文:火山引擎 LAS 数据湖存储内核揭秘



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条