nettman 发表于 2022-10-9 19:21:14

Flink CDC 2.2.0同步Mysql数据到Hudi数据湖表实践


问题导读

1.Flink CDC底层是使用什么来进行数据变化更改?
2.如何添加Flink CDC依赖?
3.如何使用SQL方式同步Mysql数据到Hudi数据湖?


目录
介绍
Deserialization序列化和反序列化
添加Flink CDC依赖
3.1 sql-client
3.2 Java/Scala API
使用SQL方式同步Mysql数据到Hudi数据湖
4.1 Mysql表结构和数据
4.2 Flink开启checkpoint
4.3 在Flink中创建Mysql的映射表
4.4 在Flink中创建Hudi Sink的映射表
4.5 流式写入Hudi


1. 介绍
Flink CDC底层是使用Debezium来进行data changes的capture

特色:
snapshot能并行读取。根据表定义的primary key中的第一列划分成chunk。如果表没有primary key,需要通过参数scan.incremental.snapshot.enabled关闭snapshot增量读取
snapshot读取时的checkpoint粒度为chunk
snapshot读取不需要global read lock(FLUSH TABLES WITH READ LOCK)
reader读取snapshot和binlog的一致性过程:
标记当前的binlog position为low
多个reader读取各自的chunk
标记当前的binlog position为high
一个reader读取low ~ high之间的binlog
一个reader读取high之后的binlog

2. Deserialization序列化和反序列化
下面用json格式,展示了change event

{
"before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
},
"after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
},
"source": {...},
"op": "u",// operation type, u表示这是一个update event
"ts_ms": 1589362330904,// connector处理event的时间
"transaction": null
}在DataStrea API中,用户可以使用Constructor:JsonDebeziumDeserializationSchema(true),在message中包含schema。但是不推荐使用

JsonDebeziumDeserializationSchema也可以接收JsonConverter的自定义配置。如下示例在output中包含小数的数据

Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
JsonDebeziumDeserializationSchema schema =
      new JsonDebeziumDeserializationSchema(true, customConverterConfigs);

3. 添加Flink CDC依赖
3.1 sql-client
集成步骤如下:

从github flink cdc下载flink-sql-connector-mysql-cdc-2.2.0.jar包
将jar包放到Flink集群所有服务器的lib目录下
重启Flink集群
启动sql-client.sh
3.2 Java/Scala API
添加如下依赖到pom.xml中

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.2.0</version>
</dependency>

4. 使用SQL方式同步Mysql数据到Hudi数据湖
4.1 Mysql表结构和数据
建表语句如下:

CREATE TABLE `info_message` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`msg_title` varchar(100) DEFAULT NULL COMMENT '消息名称',
`msg_ctx` varchar(2048) DEFAULT NULL COMMENT '消息内容',
`msg_time` datetime DEFAULT NULL COMMENT '消息发送时间',
PRIMARY KEY (`id`)
)部分数据内容如下:
mysql>
mysql> select * from d_general.info_message limit 3;
+--------------------+-----------+-------------------------------------------------------+---------------------+
| id               | msg_title | msg_ctx                                             | msg_time            |
+--------------------+-----------+-------------------------------------------------------+---------------------+
|         1          |   title1|                         content1                      | 2019-03-29 15:27:21 |
|         2          |   title2|                         content2                      | 2019-03-29 15:38:36 |
|         3          |   title3|                         content3                      | 2019-03-29 15:38:36 |
+--------------------+-----------+-------------------------------------------------------+---------------------+
3 rows in set (0.00 sec)

mysql>4.2 Flink开启checkpoint
Checkpoint默认是不开启的,开启Checkpoint让Hudi可以提交事务
并且mysql-cdc在binlog读取阶段开始前,需要等待一个完整的checkpoint来避免binlog记录乱序的情况
binlog读取的并行度为1,checkpoint的粒度为数据行级别
可以在任务失败的情况下,达到Exactly-once语义


Flink SQL> set 'execution.checkpointing.interval' = '10s';
Session property has been set.

Flink SQL>4.3 在Flink中创建Mysql的映射表

Flink SQL> create table mysql_source(
> database_name string metadata from 'database_name' virtual,
> table_name string metadata from 'table_name' virtual,
> id decimal(20,0) not null,
> msg_title string,
> msg_ctx string,
> msg_time timestamp(9),
> primary key (id) not enforced
> ) with (
>   'connector' = 'mysql-cdc',
>   'hostname' = '192.168.8.124',
>   'port' = '3306',
>   'username' = 'hnmqet',
>   'password' = 'hnmq123456',
> 'server-time-zone' = 'Asia/Shanghai',
> 'scan.startup.mode' = 'initial',
>   'database-name' = 'd_general',
>   'table-name' = 'info_message'
> );
Execute statement succeed.

Flink SQL>说明如下:

Flink的table中添加了两个metadata列。还可以定义op_ts列,类型为TIMESTAMP_LTZ(3),表示binlog在数据库创建的时间,如果是snapshot,则值为0
如果Mysql中有很多个列,这里只获取Flink Table中定义的列
Mysql的用户需要的权限:SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT
server-time-zone: Mysql数据库的session time zone,用来控制如何将Mysql的timestamp类型转换成string类型
scan.startup.mode:mysql-cdc启动时消费的模式,initial表示同步snapshot和binlog,latest-offset表示同步最新的binlog
database-name和table-name可以使用正则表达式匹配多个数据库和多个表,例如"d_general+"可以匹配d_general0、d_general999等

4.4 在Flink中创建Hudi Sink的映射表

Flink SQL> create table hudi_sink(
> database_name string,
> table_name string,
> id decimal(20,0) not null,
> msg_title string,
> msg_ctx string,
> msg_time timestamp(6),
> primary key (database_name, table_name, id) not enforced
> ) with (
>   'connector' = 'hudi',
> 'path' = 'hdfs://nnha/user/hudi/warehouse/hudi_db/info_message',
> 'table.type' = 'MERGE_ON_READ',
> 'hoodie.datasource.write.recordkey.field' = 'database_name.table_name.id',
> 'write.precombine.field' = 'msg_time',
> 'write.rate.limit' = '2000',
> 'write.tasks' = '2',
> 'write.operation' = 'upsert',
> 'compaction.tasks' = '2',
> 'compaction.async.enabled' = 'true',
> 'compaction.trigger.strategy' = 'num_commits',
> 'compaction.delta_commits' = '5',
> 'read.tasks' = '2',
> 'changelog.enabled' = 'true'
> );
Execute statement succeed.

Flink SQL>

说明如下:

不同数据库和表的id字段可能会相同,定义复合主键
hoodie.datasource.write.recordkey.field:默指定表的主键,多个字段用.分隔。认为uuid字段
如果upstream不能保证数据的order,则需要显式指定write.precombine.field,且选取的字段不能包含null。默认为ts字段。作用是如果在一个批次中,有两条key相同的数据,取较大的precombine数据,插入到Hudi中
write.rate.limit:每秒写入数据的条数,默认为0表示不限制
默认write的并行度为4
write.operation:默认是upsert
默认compaction的并行度为4
compaction.async.enabled:是否开启online compaction,默认为true
compaction.trigger.strategy:compaction触发的策略,可选值:num_commits、time_elapsed、num_and_time、num_or_time,默认值为num_commits
compaction.delta_commits:每多少次commit进行一次compaction,默认值为5
MOR类型的表,还不能处理delete,所以会导致数据不一致。可以通过changelog.enabled转换到change log模式

4.5 流式写入Hudi
先同步snapshot,再同步事务日志

Flink SQL> insert into hudi_sink select database_name, table_name, id, msg_title, msg_ctx, msg_time from mysql_source /*+ OPTIONS('server-id'='5401') */ where msg_time is not null;
Submitting SQL update statement to the cluster...
SQL update statement has been successfully submitted to the cluster:
Job ID: afa575f5451af65d1ee7d225d77888ac


Flink SQL>注意:这里如果where条件如果添加了"msg_time > timestamp ‘2021-04-14 09:49:00’",任务会一直卡在write_stream这一步,write_stream的状态一直是bush(max): 100%,并且checkpoint也会一直卡住,查看HDFS上的表是没有数据
默认查询的并行度是1。如果并行度大于1,需要为每个slot设置server-id,4个slot的设置方法为:'server-id'='5401-5404'。这样Mysql server就能正确维护network connection和binlog position



最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg


原文连接:
https://mp.weixin.qq.com/s/MxP_5rdBKdDyomd1Uap8VQ

momo1 发表于 2022-10-10 08:09:44

谢谢分享
页: [1]
查看完整版本: Flink CDC 2.2.0同步Mysql数据到Hudi数据湖表实践