分享

Iceberg在基于Flink的流式数据入库场景中的应用

问题导读

1.Iceberg作为数据落地的格式解决了Flink什么痛点?
2.Flink Iceberg sink有哪些模块?
3.Flink Iceberg sink如何实现?

导言

本文以流式数据入库的场景为基础,介绍引入Iceberg作为落地格式和嵌入Flink sink的收益,并分析了现有实现的框架和要点。

应用场景
流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。

2.png

上述的应用场景通常有如下的痛点,需要整个流程不断的优化:

支持流式数据写入,并保证端到端的不重不丢(即exactly-once);
尽量减少中间环节,能支持更实时(甚至是T+0)的读取或导出,给下游提供更实时更准确的基础数据;
支持ACID,避免脏读等错误发生;
支持修改已落地的数据。虽然大数据和数据湖长于处理静态的、或者缓慢变化的数据,即读多写少的场景,但方便的修改功能可以提升用户体验,避免用户因为极少的修改,手动更换整个数据文件,甚至是重新导出;
支持修改表结构,如增加或者变更列;而且变更不要引起数据的重新组织。

引入Iceberg作为Flink sink
为了解决上述痛点,我们引入了Iceberg作为数据落地的格式。Iceberg支持ACID事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。

同时,为了支持流式数据的写入,我们引入Flink作为流式处理框架,并将Iceberg作为Flink sink。

下文主要介绍Flink Iceberg sink的实现框架和要点。但在这之前,需要先介绍一些实现中用到的Flink基本概念。

Flink基本概念

从Flink的角度如何理解"流"和"批"

3.png
Flink使用DataFrame API来统一的处理流和批数据。

Stream, Transformation和Operator
  • 一个Flink程序由stream和transformation组成:
  • Stream: Transformation之间的中间结果数据;
  • Transformation:对(一个或多个)输入stream进行操作,输出(一个或多个)结果stream。
  • 当Flink程序执行时,其被映射成Streaming Dataflow,由如下的部分组成:
  • Source (operator):接收外部输入给Flink;
  • Transformation (operator):中间对stream做的任何操作;
  • Sink (operator):Flink输出给外部。

下图为Flink官网的示例,展示了一个以Kafka作为输入Source,经过中间两个transformation,最终通过sink输出到Flink之外的过程。

1.png

State, Checkpoint and Snapshot
Flink依靠checkpoint和基于snapshot的恢复机制,保证程序state的一致性,实现容错。

Checkpoint是对分布式的数据流,以及所有operator的state,打snapshot的过程。

State
一个operator的state,即它包含的所有用于恢复当前状态的信息,可分为两类:

  • 系统state:如operator中对数据的缓存。
  • 用户自定义state:和用户逻辑相关,可以利用Flink提供的managed state,如ValueState、ListState,来存储。
  • State的存储位置,可以分为:
  • Local:内存,或者本地磁盘
  • State backend:远端的持久化存储,如HDFS。


如下图所示:
1.png

Checkpoint
  • Flink做checkpoint的过程如下:
  • Checkpoint coordinator首先发送barrier给source。
  • Source做snapshot,完成后向coordinator确认。
  • Source向下游发送barrier。
  • 下游operator收到所有上游的barrier后,做snapshot,完成后向coordinator确认。
  • 继续往下游发送barrier,直到sink。
  • Sink通知coordinator自己完成checkpoint。
  • Coordinator确认本周期snapshot做完。


如下图所示:
1.png

Barrier

Barrier是Flink做分布式snapshot的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。

由于每个barrier唯一对应checkpoint id,所以数据流中的record实际被barrier分组,如下图所示,barrier n和barrier n-1之间的record,属于checkpoint n。
1.png

Barrier的作用是在分布式的数据流中,将operator的多个输入流按照checkpoint对齐(align),如下图所示:
1.png


Flink Iceberg sink
了解了上述Flink的基本概念,这些概念又是如何被应用和映射到Flink Iceberg sink当中的呢?

总体框架
1.png

如图,Flink Iceberg sink有两个主要模块和两个辅助模块组成:

1.png

实现要点
Writer
  • 在当前的实现中,Java的Map<String, Object>作为每条记录,输入给writer。内部逻辑先将其转化为作为中间格式的Avro IndexedRecord,而后通过Iceberg里的Parquet相关API,累积的写入DataFile。
  • 使用Avro作为中间格式是一个临时方案,为简化适配,并最大限度的利用现有逻辑。但长期来看,使用中间格式会影响处理效率,社区也在试图通过ISSUE-870来去掉Avro,进而使用Iceberg内建的数据类型作为输入,同时也需要加入一个到Flink内建数据类型的转换器。
  • 在做checkpoint的过程中,发送writer自己的barrier到下游的committer之前,关闭单个Parquet文件,构建DataFile,并发送DataFile的信息给下游。


Committer
  • 全局唯一的Committer在收到上游所有writer的barrier以后,将收到的DataFile的信息填入manifest file,并使用ListState把manifest file作为用户自定义的state,保存于snapshot中。
  • 当checkpoint完成以后,通过merge append将manifest file提交给Iceberg。Iceberg内部通过后续的一系列操作完成commit。最终让新加入的数据对其他的读任务可见。


试用Flink Iceberg sink
社区上https://github.com/apache/incubator-iceberg/pull/856提供了可以试用的原型代码。下载该patch放入master分支,编译并构建即可。如下的程序展示了如何将该sink嵌入到Flink数据流中:

[mw_shl_code=bash,true]// Configurate catalog
org.apache.hadoop.conf.Configuration hadoopConf =
    new org.apache.hadoop.conf.Configuration();
hadoopConf.set(
    org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,
    META_STORE_URIS);
hadoopConf.set(
    org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
    META_STORE_WAREHOUSE);

Catalog icebergCatalog = new HiveCatalog(hadoopConf);

// Create Iceberg table
Schema schema = new Schema(
   ...
);
PartitionSpec partitionSpec = builderFor(schema)...
TableIdentifier tableIdentifier =
    TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
// If needed, check the existence of table by loadTable() and drop it
// before creating it
icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);

// Obtain an execution environment
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing
env.enableCheckpointing(...);
  
// Add Source
DataStream<Map<String, Object>> dataStream =
    env.addSource(source, typeInformation);

// Configure Ieberg sink
Configuration conf = new Configuration();
conf.setString(
    org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
    META_STORE_URIS);
conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);
conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);

// Append Iceberg sink to data stream
IcebergSinkAppender<Map<String, Object>> appender =
   new IcebergSinkAppender<Map<String, Object>>(conf, "test")
       .withSerializer(MapAvroSerializer.getInstance())
       .withWriterParallelism(1);
appender.append(dataStream);

// Trigger the execution
env.execute("Sink Test");[/mw_shl_code]

后续规划
Flink Iceberg sink有很多需要完善的地方,例如:上文中提到的去掉Avro作为中间格式;以及在各种失败的情况下是否仍能保证端到端的exactly-once;按固定时长做checkpoint,在高低峰时生成不同大小的DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。

原文链接
https://mp.weixin.qq.com/s/3-0A-jhIdeliPwVFQgOyWg

加微信w3aboutyun,可拉入技术爱好者群

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条