hyj 发表于 2020-12-31 09:02:31

Flink使用iceberg数据湖技术解决小文件问题


问题导读

1.hdfs小文件初始解决办法,存在哪些问题?
2.为了解决小文件问题,引入了什么技术?
3.压缩程序是如何实现的?
4.flink写入的资源为何减少,采用了什么文件格式?


背景
在大数据处理领域,有一个非常常见但是很麻烦的问题,即hdfs小文件问题,我们也被这个问题困扰了很久。开始的时候我们是自己写的一个小文件压缩工具,定期的去合并,原理就是把待压缩数据写入一个新的临时的文件夹,压缩完,和原来的数据进行检验,数据一致之后,用压缩的数据覆盖原来的数据,但是由于无法保证事务,所以出现了很多的问题,比如压缩的同时又有数据写入了,检验就会失败,导致合并小文件失败,而且无法实时的合并,只能按照分区合并一天之前的。或者一个小时之前的,最新的数据仍然有小文件的问题,导致查询性能提高不了。

所以基于以上的一些问题,我调研了数据湖技术,由于我们的流式数据主要是flink为主,查询引擎是presto,而hudi强耦合了spark,对flink的支持还不太友好,而且不支持presto查询,所以综合考虑了一下,决定引入iceberg。在对iceberg进行功能测试和简单代码review之后,发现iceberg在flink这块还有一些需要优化和提升,不过我觉得应该能hold的住,不完善的地方和需要优化的地方我们自己来补全,所以最终引入了iceberg来解决小文件的问题。

除此之外,对于一些其他的问题,比如cdc数据的接入,以及根据查询条件删除数据等,后续也可以通过数据湖技术来解决。


Flink流式数据写入iceberg
我们的主要使用场景是使用flink将kafka的流式数据写入到Iceberg,为了代码的简洁以及可维护性,我们尽量将程序使用sql来编写,示例代码如下:

// create catalog
CREATE CATALOG iceberg WITH (
'type'='iceberg',
'catalog-type'='hive'," +
   'warehouse'='hdfs://localhost/user/hive/warehouse',
   'uri'='thrift://localhost:9083'
)

// create table

CREATE TABLE iceberg.tmp.iceberg_table (
id BIGINT COMMENT 'unique id',
    data STRING,
   d int)
PARTITIONED BY (d)
WITH ('connector'='iceberg','write.format.default'='orc')

// insert into

insert into iceberg.tmp.iceberg_table select * from kafka_table提示:记得开启checkpoint


压缩小文件
目前社区提供了一个spark版本的合并小文件的Action,我们的环境以flink为主,所以我参考spark版本把这个压缩程序改了一个flink版本,并经过测试,进行了多处bug修改和优化。目前社区新发布的1.10版本中没有带这个功能,我自己基于master分支打了一个jar,并且里面包含了flink 版本压缩小文件的程序,以及所有的优化,需要的朋友,可以 到这下载一下,https://github.com/zhangjun0x01/bigdata-examples/blob/master/iceberg/libs/iceberg-flink-runtime-0.9.1.jar 。社区版本我觉得应该会在下一个版本发布。

这个压缩程序是单独启动的一个shell任务,逻辑就是先把iceberg表进行一次压缩。然后sleep五分钟。然后再启动压缩,是一个死循环任务。

之所以没有采取定时任务,是因为如果五分钟一个定时任务来压缩,那么如果五分钟之内没有压缩完成,或者压缩程序出现异常,导致本次压缩没完成的时候,下一个定时任务又起来了,就会把上次没有压缩完的数据一起压缩,这样就导致任务量就增大了,以后的任务压缩的文件越积累越多


代码示例参考:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Actions.forTable(env, table)
                .rewriteDataFiles()
      //.maxParallelism(parallelism)
      //.filter(Expressions.equal("day", day))
      //.targetSizeInBytes(targetSizeInBytes)
      .execute();


快照过期处理
我们的快照过期策略,我是和压缩小文件的批处理任务写在一起的,压缩完小文件之后,进行表的快照过期处理,目前保留的时间是一个小时,这是因为对于有一些比较大的表,分区比较多,而且checkpoint比较短,如果保留的快照过长的话,还是会保留过多小文件,我们暂时没有查询历史快照的需求,所以我将快照的保留时间设置了一个小时。

long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
   table.expireSnapshots()
   // .retainLast(20)
   .expireOlderThan(olderThanTimestamp)
   .commit();
数据管理
写入了数据之后,有时候我想查看一下相应的快照下面有多少数据文件,直接查询hdfs你不知道哪个是有用的,哪个是没用的。所以需要有对应的管理工具。目前flink这块还不太成熟,我们可以使用spark3提供的工具来查看。

ddl
目前create table 这些操作我们是通过flink sql client来做的。其他相关的ddl的操作可以使用spark来做:

https://iceberg.apache.org/spark/#ddl-commands

Dml
一些相关的数据的操作,比如删除数据等可以通过spark来实现,presto目前只支持分区级别的删除功能。


移除孤立的文件

定时任务删除
在使用iceberg的过程中,有时候会有这样的情况,我提交了一个flink任务,由于各种原因,我把它给停了,这个时候iceberg还没提交相应的快照。还有由于一些异常导致程序失败,就会产生一些不在iceberg元数据里面的孤立的数据文件,这些文件对iceberg来说是不可达的,也是没用的。所以我们需要像jvm的垃圾回收一样来清理这些文件。

目前iceberg提供了一个spark版本的action来进行处理这些没用的文件,我们采取的策略和压缩小文件一样,获取hive中的所有的iceberg表。每隔一个小时执行一次定时任务来删除这些没用的文件。

SparkSession spark = ......
Actions.forTable(spark, table)
         .removeOrphanFiles()
         //.deleteWith(...)
         .execute();踩坑
在程序运行过程中出现了正常的数据文件被删除的问题,经过调研,由于我的快照保留设置是一小时,这个清理程序清理时间也是设置一个小时,通过日志发现是这个清理程序删除了正常的数据。查了查代码,觉得应该是他们设置了一样的时间,在清理孤立文件的时候,有其他程序正在读写表,由于这个清理程序是没有事务的,导致删除了正常的数据。最后把这个清理程序的清理时间改成默认的三天,没有再出现删除数据文件的问题。当然,为了保险起见,我们可以覆盖原来的删除文件的方法,改成将文件到一个备份文件夹,检查没有问题之后,手工删除。


使用presto进行查询

目前我们使用的版本是prestosql 346,这个版本安装的时候需要jdk11,presto查询iceberg比较简单。官方提供了相应的conncter,我们配置一下就行,
//iceberg.properties

connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083


批任务处理

手工执行sql批任务

目前查询iceberg的批处理任务,使用的flink的客户端,首先我们启动一个基于yarn session 的flink集群,然后通过sql客户端提交任务到集群。

主要的配置就是我们需要根据数据的大小设置sql任务执行的并行度,可以通过以下参数设置。

set table.exec.resource.default-parallelism = 100;
此外我在sql客户端的配置文件里配置了hive和iceberg相应的catalog,这样每次客户端启动的时候就不需要建catalog了。
catalogs:# empty list
- name: iceberg
    type: iceberg
    warehouse: hdfs://localhost/user/hive2/warehouse
    uri: thrift://localhost:9083
    catalog-type: hive
    cache-enabled: false
- name: hive
    type: hive
    hive-conf-dir: /Users/user/work/hive/conf
    default-database: default


定时任务
目前对于定时调度中的批处理任务,flink的sql客户端还没hive那样做的很完善,比如执行hive -f来执行一个文件。而且不同的任务需要不同的资源,并行度等。所以我自己封装了一个flinK程序,通过调用这个程序来进行处理,读取一个指定文件里面的sql,来提交批任务。在命令行控制任务的资源和并行度等。

/home/flink/bin/flink run -p 10 -m yarn-cluster/home/work/iceberg-scheduler.jar my.sql


优化
批任务的查询这块,做了一些优化,比如limit下推,filter下推,查询并行度优化等,可以大大提高查询的速度,这些优化都已经推回给社区。


数据迁移
目前我们的所有数据都是存储在hive表的,在验证完iceberg之后,我们决定将hive的数据迁移到iceberg,所以我写了一个工具,可以使用hive的数据,然后新建一个iceberg表,为其建立相应的元数据,但是测试的时候发现,如果采用这种方式,就需要把写入hive的程序停止,因为如果iceberg和hive使用同一个数据文件,而压缩程序会不断地压缩iceberg表的小文件,压缩完之后,不会马上删除旧数据,所以hive表就会查到双份的数据。鉴于iceberg测试的时候还有一些不稳定,所以我们采用双写的策略,原来写入hive的程序不动,新启动一套程序写入iceberg,这样能对iceberg表观察一段时间。还能和原来hive中的数据进行比对,来验证程序的正确性。

经过一段时间观察,每天将近20亿数据的hive表和iceberg表,一条数据也不差。所以在最终对比数据没有问题之后,把hive表停止写入,使用新的iceberg表,然后把hive中的旧数据导入到iceberg。


总结
1.flink写入的资源减少,
举一个例子,原来一个flink读取kafka写入hive的任务,需要60个并行度才不会让kafka产生积压。改成写入iceberg之后,只需要20个并行度就够了,这得益于写入iceberg的时候,首先写入avro格式,这个要比写入列格式快的多。

2.查询速度变快
由于iceberg查询的时候不会像hive一样去扫描文件夹下面的所有数据,而是先从元数据查询对应的数据文件。只扫描需要的文件,这样查询的文件变少了,查询的性能得到了显著的提升。一些报表的速度从50秒提高到30秒。

3.并发读写
由于iceberg的事务支持,我们可以实现对一个表进行并发读写,flink流式数据实时入湖,压缩程序同时压缩小文件,清理过期文件和快照的程序同时清理无用的文件,这样就能更及时的提供数据,做到分钟级的延迟,并保证查询性能。

目前对iceberg的优化和修改,绝大部分已经贡献给社区,剩下的我后续也会陆续提交相应的pr,推回社区。




最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg

原文链接
https://mp.weixin.qq.com/s/AdGroTPNtXtwAdC-KLdi4g
页: [1]
查看完整版本: Flink使用iceberg数据湖技术解决小文件问题