Iceberg在基于Flink的流式数据入库场景中的应用
问题导读1.Iceberg作为数据落地的格式解决了Flink什么痛点?
2.Flink Iceberg sink有哪些模块?
3.Flink Iceberg sink如何实现?
导言
本文以流式数据入库的场景为基础,介绍引入Iceberg作为落地格式和嵌入Flink sink的收益,并分析了现有实现的框架和要点。
应用场景
流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。
上述的应用场景通常有如下的痛点,需要整个流程不断的优化:
支持流式数据写入,并保证端到端的不重不丢(即exactly-once);
尽量减少中间环节,能支持更实时(甚至是T+0)的读取或导出,给下游提供更实时更准确的基础数据;
支持ACID,避免脏读等错误发生;
支持修改已落地的数据。虽然大数据和数据湖长于处理静态的、或者缓慢变化的数据,即读多写少的场景,但方便的修改功能可以提升用户体验,避免用户因为极少的修改,手动更换整个数据文件,甚至是重新导出;
支持修改表结构,如增加或者变更列;而且变更不要引起数据的重新组织。
引入Iceberg作为Flink sink
为了解决上述痛点,我们引入了Iceberg作为数据落地的格式。Iceberg支持ACID事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。
同时,为了支持流式数据的写入,我们引入Flink作为流式处理框架,并将Iceberg作为Flink sink。
下文主要介绍Flink Iceberg sink的实现框架和要点。但在这之前,需要先介绍一些实现中用到的Flink基本概念。
Flink基本概念
从Flink的角度如何理解"流"和"批"
Flink使用DataFrame API来统一的处理流和批数据。
Stream, Transformation和Operator
[*]一个Flink程序由stream和transformation组成:
[*]Stream: Transformation之间的中间结果数据;
[*]Transformation:对(一个或多个)输入stream进行操作,输出(一个或多个)结果stream。
[*]当Flink程序执行时,其被映射成Streaming Dataflow,由如下的部分组成:
[*]Source (operator):接收外部输入给Flink;
[*]Transformation (operator):中间对stream做的任何操作;
[*]Sink (operator):Flink输出给外部。
下图为Flink官网的示例,展示了一个以Kafka作为输入Source,经过中间两个transformation,最终通过sink输出到Flink之外的过程。
State, Checkpoint and Snapshot
Flink依靠checkpoint和基于snapshot的恢复机制,保证程序state的一致性,实现容错。
Checkpoint是对分布式的数据流,以及所有operator的state,打snapshot的过程。
State
一个operator的state,即它包含的所有用于恢复当前状态的信息,可分为两类:
[*]系统state:如operator中对数据的缓存。
[*]用户自定义state:和用户逻辑相关,可以利用Flink提供的managed state,如ValueState、ListState,来存储。
[*]State的存储位置,可以分为:
[*]Local:内存,或者本地磁盘
[*]State backend:远端的持久化存储,如HDFS。
如下图所示:
Checkpoint
[*]Flink做checkpoint的过程如下:
[*]Checkpoint coordinator首先发送barrier给source。
[*]Source做snapshot,完成后向coordinator确认。
[*]Source向下游发送barrier。
[*]下游operator收到所有上游的barrier后,做snapshot,完成后向coordinator确认。
[*]继续往下游发送barrier,直到sink。
[*]Sink通知coordinator自己完成checkpoint。
[*]Coordinator确认本周期snapshot做完。
如下图所示:
Barrier
Barrier是Flink做分布式snapshot的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。
由于每个barrier唯一对应checkpoint id,所以数据流中的record实际被barrier分组,如下图所示,barrier n和barrier n-1之间的record,属于checkpoint n。
Barrier的作用是在分布式的数据流中,将operator的多个输入流按照checkpoint对齐(align),如下图所示:
Flink Iceberg sink
了解了上述Flink的基本概念,这些概念又是如何被应用和映射到Flink Iceberg sink当中的呢?
总体框架
如图,Flink Iceberg sink有两个主要模块和两个辅助模块组成:
实现要点
Writer
[*]在当前的实现中,Java的Map<String, Object>作为每条记录,输入给writer。内部逻辑先将其转化为作为中间格式的Avro IndexedRecord,而后通过Iceberg里的Parquet相关API,累积的写入DataFile。
[*]使用Avro作为中间格式是一个临时方案,为简化适配,并最大限度的利用现有逻辑。但长期来看,使用中间格式会影响处理效率,社区也在试图通过ISSUE-870来去掉Avro,进而使用Iceberg内建的数据类型作为输入,同时也需要加入一个到Flink内建数据类型的转换器。
[*]在做checkpoint的过程中,发送writer自己的barrier到下游的committer之前,关闭单个Parquet文件,构建DataFile,并发送DataFile的信息给下游。
Committer
[*]全局唯一的Committer在收到上游所有writer的barrier以后,将收到的DataFile的信息填入manifest file,并使用ListState把manifest file作为用户自定义的state,保存于snapshot中。
[*]当checkpoint完成以后,通过merge append将manifest file提交给Iceberg。Iceberg内部通过后续的一系列操作完成commit。最终让新加入的数据对其他的读任务可见。
试用Flink Iceberg sink
社区上https://github.com/apache/incubator-iceberg/pull/856提供了可以试用的原型代码。下载该patch放入master分支,编译并构建即可。如下的程序展示了如何将该sink嵌入到Flink数据流中:
// Configurate catalog
org.apache.hadoop.conf.Configuration hadoopConf =
new org.apache.hadoop.conf.Configuration();
hadoopConf.set(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,
META_STORE_URIS);
hadoopConf.set(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
META_STORE_WAREHOUSE);
Catalog icebergCatalog = new HiveCatalog(hadoopConf);
// Create Iceberg table
Schema schema = new Schema(
...
);
PartitionSpec partitionSpec = builderFor(schema)...
TableIdentifier tableIdentifier =
TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
// If needed, check the existence of table by loadTable() and drop it
// before creating it
icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);
// Obtain an execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing
env.enableCheckpointing(...);
// Add Source
DataStream<Map<String, Object>> dataStream =
env.addSource(source, typeInformation);
// Configure Ieberg sink
Configuration conf = new Configuration();
conf.setString(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
META_STORE_URIS);
conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);
conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);
// Append Iceberg sink to data stream
IcebergSinkAppender<Map<String, Object>> appender =
new IcebergSinkAppender<Map<String, Object>>(conf, "test")
.withSerializer(MapAvroSerializer.getInstance())
.withWriterParallelism(1);
appender.append(dataStream);
// Trigger the execution
env.execute("Sink Test");
后续规划
Flink Iceberg sink有很多需要完善的地方,例如:上文中提到的去掉Avro作为中间格式;以及在各种失败的情况下是否仍能保证端到端的exactly-once;按固定时长做checkpoint,在高低峰时生成不同大小的DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。
原文链接
https://mp.weixin.qq.com/s/3-0A-jhIdeliPwVFQgOyWg
感谢分享
页:
[1]