分享

Apache Doris 整合 Iceberg+Flink CDC 构建实时湖仓一体的联邦查询分析架构

本帖最后由 levycui 于 2022-6-28 19:13 编辑
问题导读:
1、如何设计大数据架构?
2、如何配置HDFS?如何配置YARN?
3、Doris 如何查询 Iceberg?
4、如何
安装Doris?


本文概览

这篇教程将展示如何使用 Doris+Iceberg+Flink CDC 构建实时湖仓一体的联邦查询分析架构,Apache Doris 1.1 版本提供了 Iceberg 的支持,本文将主要展示 Doris 和 Iceberg 如何使用。

本教程中整个环境都是基于伪分布式环境搭建,按照步骤一步步完成,完整体验整个搭建操作的过程。

>>> 软件环境

本教程的演示环境如下:

  •     Centos7
  •     Apahce doris 1.1
  •     Hadoop 3.3.3
  •     hive 3.1.3
  •     Fink 1.14.4
  •     flink-sql-connector-mysql-cdc-2.2.1
  •     Apache Iceberg 0.13.2
  •     JDK 1.8.0_311
  •     MySQL 8.0.29

  1. wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.3/hadoop-3.3.3.tar.gz
  2. wget https://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
  3. wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
  4. wget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jar
  5. wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
复制代码

>>> 系统架构
2022-06-28_185356.jpg

  •     首先我们从 Mysql 数据中使用 Flink,通过 Binlog 完成数据的实时采集
  •     然后在 Flink 中创建 Iceberg 表,Iceberg 的元数据保存在 Hive 里
  •     最后我们在 Doris 中创建 Iceberg 外表
  •     再通过 Doris 统一查询入口完成对 Iceberg 里的数据查询分析,供前端应用调用,这里 Iceberg 外表的数据可以和 Doris 内部数据或者 Doris 其他外部数据源的数据进行关联查询分析

Doris 湖仓一体的联邦查询架构如下:
2022-06-28_185435.jpg

  •     Doris 通过 ODBC 方式支持:MySQL,Postgresql,Oracle ,SQLServer
  •     同时支持 Elasticsearch 外表
  •     1.0 版本支持 Hive 外表
  •     1.1 版本支持 Iceberg 外表
  •     1.2 版本支持 Hudi 外表


环境安装部署

>>> 安装 Hadoop、Hive
  1. tar zxvf hadoop-3.3.3.tar.gz
  2. tar zxvf apache-hive-3.1.3-bin.tar.gz
复制代码

配置系统环境变量:
  1. export HADOOP_HOME=/data/hadoop-3.3.3
  2. export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
  3. export HADOOP_HDFS_HOME=$HADOOP_HOME
  4. export HIVE_HOME=/data/hive-3.1.3
  5. export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin:$HIVE_HOME/conf
复制代码

>>> 配置 HDFS

  •    core-site.xml
  1. vi etc/hadoop/core-site.xml
  2. <configuration>
  3.    <property>
  4.        <name>fs.defaultFS</name>
  5.        <value>hdfs://localhost:9000</value>
  6.    </property>
  7. </configuration>
复制代码

  •    hdfs-site.xml

  1. vi etc/hadoop/hdfs-site.xml
  2. <configuration>
  3.    <property>
  4.      <name>dfs.replication</name>
  5.      <value>1</value>
  6.    </property>
  7.    <property>
  8.      <name>dfs.namenode.name.dir</name>
  9.      <value>/data/hdfs/namenode</value>
  10.    </property>
  11.    <property>
  12.      <name>dfs.datanode.data.dir</name>
  13.      <value>/data/hdfs/datanode</value>
  14.    </property>
  15. </configuration>
复制代码

  •    修改 Hadoop 启动脚本
  1. sbin/start-dfs.sh
  2. sbin/stop-dfs.sh
复制代码

在文件开始加上下面的内容:

  1. HDFS_DATANODE_USER=root
  2. HADOOP_SECURE_DN_USER=hdfs
  3. HDFS_NAMENODE_USER=root
  4. HDFS_SECONDARYNAMENODE_USER=rootsbin/start-yarn.sh
复制代码
  1. sbin/start-yarn.sh
  2. sbin/stop-yarn.sh
复制代码


在文件开始加上下面的内容:
  1. YARN_RESOURCEMANAGER_USER=root
  2. HADOOP_SECURE_DN_USER=yarn
  3. YARN_NODEMANAGER_USER=root
复制代码

>>> 配置 Yarn
这里我改变了 Yarn 的一些端口,因为我是单机环境,和 Doris 的一些端口冲突。你可以不启动 Yarn。
  1. vi etc/hadoop/yarn-site.xml
  2. <property>        
  3.    <name>yarn.resourcemanager.address</name>  
  4.    <value>jiafeng-test:50056</value>
  5. </property>  
  6. <property>  
  7.    <name>yarn.resourcemanager.scheduler.address</name>
  8.    <value>jiafeng-test:50057</value>
  9. </property>
  10. <property>
  11.    <name>yarn.resourcemanager.resource-tracker.address</name>  
  12.    <value>jiafeng-test:50058</value>
  13. </property>
  14. <property>
  15.    <name>yarn.resourcemanager.admin.address</name>
  16.    <value>jiafeng-test:50059</value>
  17. </property>
  18. <property>
  19.    <name>yarn.resourcemanager.webapp.address</name>
  20.    <value>jiafeng-test:9090</value>
  21. </property>
  22. <property>
  23.    <name>yarn.nodemanager.localizer.address</name>
  24.    <value>0.0.0.0:50060</value>
  25. </property>
  26. <property>
  27.    <name>yarn.nodemanager.webapp.address</name>
  28.    <value>0.0.0.0:50062</value>  
  29. </property>
复制代码
  1. vi etc/hadoop/mapred-site.xm
  2. <property>      
  3.    <name>mapreduce.jobhistory.address</name>  
  4.    <value>0.0.0.0:10020</value>  
  5. </property>
  6. <property>
  7.    <name>mapreduce.jobhistory.webapp.address</name>
  8.    <value>0.0.0.0:19888</value>
  9. </property>
  10. <property>
  11.    <name>mapreduce.shuffle.port</name>
  12.    <value>50061</value>
  13. </property>
复制代码

  •    启动 Hadoop
  1. sbin/start-all.sh
复制代码
>>> 配置 Hive

  •    创建 HDFS 目录
  1. hdfs dfs -mkdir -p /user/hive/warehouse
  2. hdfs dfs -mkdir /tmp
  3. hdfs dfs -chmod g+w /user/hive/warehouse
  4. hdfs dfs -chmod g+w /tmp
复制代码

  •    配置 Hive-site.xml
  1. <?xml version="1.0"?>
  2. <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  3. <configuration>
  4.        <property>
  5.            <name>javax.jdo.option.ConnectionURL</name>
  6.            <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
  7.        </property>
  8.        <property>
  9.            <name>javax.jdo.option.ConnectionDriverName</name>
  10.            <value>com.mysql.jdbc.Driver</value>
  11.        </property>
  12.        <property>
  13.            <name>javax.jdo.option.ConnectionUserName</name>
  14.            <value>root</value>
  15.        </property>
  16.        <property>
  17.            <name>javax.jdo.option.ConnectionPassword</name>
  18.            <value>MyNewPass4!</value>
  19.        </property>
  20.        <property>
  21.                <name>hive.metastore.warehouse.dir</name>
  22.                <value>/user/hive/warehouse</value>
  23.                <description>location of default database for the warehouse</description>
  24.        </property>
  25.        <property>
  26.                <name>hive.metastore.uris</name>
  27.                <value/>
  28.                <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
  29.        </property>
  30.        <property>
  31.                <name>javax.jdo.PersistenceManagerFactoryClass</name>
  32.                <value>org.datanucleus.api.jdo.JDOPersistenceManagerFactory</value>
  33.        </property>
  34.        <property>
  35.                <name>hive.metastore.schema.verification</name>
  36.                <value>false</value>
  37.        </property>
  38.        <property>
  39.                <name>datanucleus.schema.autoCreateAll</name>
  40.                <value>true</value>
  41.        </property>
  42. </configuration>
复制代码

  •    配置 Hive-env.sh
加入以下内容:
  1. HADOOP_HOME=/data/hadoop-3.3.3
复制代码

  •    Hive 元数据初始化
  1. schematool -initSchema -dbType mysql
复制代码
  •    启动 Hive metaservice

后台运行:
  1. nohup bin/hive --service metaservice 1>/dev/null 2>&1 &
复制代码

验证:
  1. lsof -i:9083
  2. COMMAND   PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
  3. java   20700 root 567u IPv6 54605348     0t0 TCP *:emc-pp-mgmtsvc (LISTEN)
复制代码

>>> 安装 MySql

具体请参照这里:使用 Flink CDC 实现 MySQL 数据实时入 Apache Doris

复制到浏览器打开:
  1. https://doris.apache.org/zh-CN/blogs/PracticalCases/flink-cdc-to-doris.html#_4-3-%E5%AE%89%E8%A3%85%E9%85%8D%E7%BD%AE-mysql
复制代码

  •    创建 MySql 数据库表并初始化数据

  1. CREATE DATABASE demo;
  2. USE demo;
  3. CREATE TABLE userinfo (
  4. id int NOT NULL AUTO_INCREMENT,
  5. name VARCHAR(255) NOT NULL DEFAULT 'flink',
  6. address VARCHAR(1024),
  7. phone_number VARCHAR(512),
  8. email VARCHAR(255),
  9. PRIMARY KEY (`id`)
  10. )ENGINE=InnoDB ;
  11. INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);
  12. INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);
  13. INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);
  14. INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);
  15. INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);
  16. INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);
  17. INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);
  18. INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);
  19. INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);
复制代码

>>> 安装 Flink
  1. tar zxvf flink-1.14.4-bin-scala_2.12.tgz
复制代码

然后将下面的依赖拷贝到 Flink 安装目录下的 lib 目录下,具体的依赖的 lib 文件如下:
2022-06-28_185527.jpg

下面将几个 Hadoop 和 Flink 里没有的依赖下载地址放在下面:
  1. wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
  2. wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
  3. wget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jar
  4. wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
复制代码

其他的:
  1. hadoop-3.3.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jar
  2. hadoop-3.3.3/share/hadoop/common/lib/commons-logging-1.1.3.jar
  3. hadoop-3.3.3/share/hadoop/tools/lib/hadoop-archive-logs-3.3.3.jar
  4. hadoop-3.3.3/share/hadoop/common/lib/hadoop-auth-3.3.3.jar
  5. hadoop-3.3.3/share/hadoop/common/lib/hadoop-annotations-3.3.3.jar
  6. hadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jar
  7. adoop-3.3.3/share/hadoop/hdfs/hadoop-hdfs-3.3.3.jar
  8. hadoop-3.3.3/share/hadoop/client/hadoop-client-api-3.3.3.jar
  9. hive-3.1.3/lib/hive-exec-3.1.3.jar
  10. hive-3.1.3/lib/hive-metastore-3.1.3.jar
  11. hive-3.1.3/lib/hive-hcatalog-core-3.1.3.jar
复制代码

  •    启动 Flink
  1. bin/start-cluster.sh
复制代码

启动后的界面如下:
2022-06-28_185559.jpg

  •    进入 Flink SQL Client
  1. bin/sql-client.sh embedded
复制代码


2022-06-28_185635.jpg

开启 Checkpoint,每隔 3 秒做一次 Checkpoint

Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。并且,MySql-CDC 在 Binlog 读取阶段开始前,需要等待一个完整的 Checkpoint 来避免 Binlog 记录乱序的问题。

注意:这里是演示环境,Checkpoint 的间隔设置比较短,线上使用,建议设置为3-5分钟一次 Checkpoint。

  1. Flink SQL> SET execution.checkpointing.interval = 3s;
  2. [INFO] Session property has been set.
复制代码

创建 Iceberg Catalog
  1. CREATE CATALOG hive_catalog WITH (
  2. 'type'='iceberg',
  3. 'catalog-type'='hive',
  4. 'uri'='thrift://localhost:9083',
  5. 'clients'='5',
  6. 'property-version'='1',
  7. 'warehouse'='hdfs://localhost:8020/user/hive/warehouse'
  8. );
复制代码

查看 Catalog
  1. Flink SQL> show catalogs;
  2. +-----------------+
  3. |   catalog name |
  4. +-----------------+
  5. | default_catalog |
  6. |   hive_catalog |
  7. +-----------------+
  8. 2 rows in set
复制代码

创建 MySql CDC 表
  1. CREATE TABLE user_source (
  2.   database_name STRING METADATA VIRTUAL,
  3.    table_name STRING METADATA VIRTUAL,
  4.    `id` DECIMAL(20, 0) NOT NULL,
  5.   name STRING,
  6.   address STRING,
  7.   phone_number STRING,
  8.   email STRING,
  9.    PRIMARY KEY (`id`) NOT ENFORCED
  10. ) WITH (
  11.    'connector' = 'mysql-cdc',
  12.    'hostname' = 'localhost',
  13.    'port' = '3306',
  14.    'username' = 'root',
  15.    'password' = 'MyNewPass4!',
  16.    'database-name' = 'demo',
  17.    'table-name' = 'userinfo'
  18. );
复制代码

查询 CDC 表:
  1. select * from user_source;
复制代码


2022-06-28_185708.jpg

创建 Iceberg 表
  1. ---查看catalog
  2. show catalogs;
  3. ---使用catalog
  4. use catalog hive_catalog;
  5. --创建数据库
  6. CREATE DATABASE iceberg_hive;
  7. --使用数据库
  8. use iceberg_hive;
复制代码

创建表
  1. CREATE TABLE all_users_info (
  2.   database_name STRING,
  3.   table_name   STRING,
  4.   `id`         DECIMAL(20, 0) NOT NULL,
  5.   name         STRING,
  6.   address       STRING,
  7.   phone_number STRING,
  8.   email         STRING,
  9.   PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  10. ) WITH (
  11.   'catalog-type'='hive'
  12. );
复制代码

从 CDC 表里插入数据到 Iceberg 表里
  1. use catalog default_catalog;
  2. insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;
复制代码

在 Web 界面可以看到任务的运行情况
2022-06-28_185739.jpg

然后停掉任务,我们去查询 Iceberg 表
  1. select * from hive_catalog.iceberg_hive.all_users_info
复制代码

可以看到下面的结果:
2022-06-28_185805.jpg

我们去 HDFS 上可以看到 Hive 目录下的数据及对应的元数据:
2022-06-28_185831.jpg

我们也可以通过 Hive 建好 Iceberg 表,然后通过 Flink 将数据插入到表里:

下载 Iceberg Hive 运行依赖
  1. wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.2/iceberg-hive-runtime-0.13.2.jar
复制代码

在 Hive Shell 下执行:
  1. SET engine.hive.enabled=true;
  2. SET iceberg.engine.hive.enabled=true;
  3. SET iceberg.mr.catalog=hive;
  4. add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;
复制代码

创建表
  1. CREATE EXTERNAL TABLE iceberg_hive(
  2. `id` int,
  3. `name` string)
  4. STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
  5. LOCATION 'hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
  6. TBLPROPERTIES (
  7. 'iceberg.mr.catalog'='hadoop',
  8. 'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
  9. );
复制代码

然后在 Flink SQL Client 下执行下面语句将数据插入到 Iceber 表里
  1. INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, 'c');
  2. INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, 'zhangfeng');');
复制代码

查询这个表
  1. select * from hive_catalog.iceberg_hive.iceberg_hive
复制代码

可以看到下面的结果:
2022-06-28_185901.jpg


Doris 查询 Iceberg

Apache Doris 提供了直接访问 Iceberg 外部表的能力,外部表省去了繁琐的数据导入工作,并借助 Doris 本身 OLAP 的能力来解决 Iceberg 表的数据分析问题:

  •     支持 Iceberg 数据源接入 Doris
  •     支持 Doris 与 Iceberg 数据源中的表联合查询,进行更加复杂的分析操作

>>> 安装 Doris
这里我们不再详细讲解 Doris 的安装,如果你不知道怎么安装 Doris请参照官方文档:快速入门
复制到浏览器打开:

https://doris.apache.org/zh-CN/d ... /get-starting.html#环境准备

  •    创建 Iceberg 外表

  1. CREATE TABLE `all_users_info`
  2. ENGINE = ICEBERG
  3. PROPERTIES (
  4. "iceberg.database" = "iceberg_hive",
  5. "iceberg.table" = "all_users_info",
  6. "iceberg.hive.metastore.uris" = "thrift://localhost:9083",
  7. "iceberg.catalog.type" = "HIVE_CATALOG"
  8. );
复制代码

参数说明:
ENGINE 需要指定为 ICEBERG
PROPERTIES 属性:

  1.     iceberg.hive.metastore.uris:Hive Metastore 服务地址
  2.     iceberg.database:挂载 Iceberg 对应的数据库名
  3.     iceberg.table:挂载 Iceberg 对应的表名,挂载 Iceberg database 时无需指定。
  4.     iceberg.catalog.type:Iceberg 中使用的 Catalog 方式,默认为 HIVE_CATALOG,当前仅支持该方式,后续会支持更多的 Iceberg catalog 接入方式。
复制代码
  1. mysql> CREATE TABLE `all_users_info`
  2.     -> ENGINE = ICEBERG
  3.     -> PROPERTIES (
  4.     -> "iceberg.database" = "iceberg_hive",
  5.     -> "iceberg.table" = "all_users_info",
  6.     -> "iceberg.hive.metastore.uris"  =  "thrift://localhost:9083",
  7.     -> "iceberg.catalog.type"  =  "HIVE_CATALOG"
  8.     -> );
  9. Query OK, 0 rows affected (0.23 sec)
复制代码
  1. mysql> select * from all_users_info;
  2. +---------------+------------+-------+----------+-----------+--------------+-------+
  3. | database_name | table_name | id    | name     | address   | phone_number | email |
  4. +---------------+------------+-------+----------+-----------+--------------+-------+
  5. | demo          | userinfo   | 10004 | user_113 | shenzheng | 13347420870  | NULL  |
  6. | demo          | userinfo   | 10005 | user_114 | hangzhou  | 13347420870  | NULL  |
  7. | demo          | userinfo   | 10002 | user_111 | xian      | 13347420870  | NULL  |
  8. | demo          | userinfo   | 10003 | user_112 | beijing   | 13347420870  | NULL  |
  9. | demo          | userinfo   | 10001 | user_110 | Shanghai  | 13347420870  | NULL  |
  10. | demo          | userinfo   | 10008 | user_117 | guangzhou | 13347420870  | NULL  |
  11. | demo          | userinfo   | 10009 | user_118 | xian      | 13347420870  | NULL  |
  12. | demo          | userinfo   | 10006 | user_115 | guizhou   | 13347420870  | NULL  |
  13. | demo          | userinfo   | 10007 | user_116 | chengdu   | 13347420870  | NULL  |
  14. +---------------+------------+-------+----------+-----------+--------------+-------+
  15. 9 rows in set (0.18 sec)
复制代码

  •    同步挂载

当 Iceberg 表 Schema 发生变更时,可以通过 REFRESH 命令手动同步,该命令会将 Doris 中的 Iceberg 外表删除重建。

-- 同步 Iceberg 表
  1. REFRESH TABLE t_iceberg;
复制代码

-- 同步 Iceberg 数据库
  1. REFRESH DATABASE iceberg_test_db;
复制代码

  •    Doris 和 Iceberg 数据类型对应关系

支持的 Iceberg 列类型与 Doris 对应关系如下表:
2022-06-28_185935.jpg

   注意事项
  •     Iceberg 表 Schema 变更不会自动同步,需要在 Doris 中通过 REFRESH 命令同步 Iceberg 外表或数据库。
  •     当前默认支持的 Iceberg 版本为 0.12.0,0.13.x,未在其他版本进行测试。后续后支持更多版本。
  •    Doris FE 配置

下面几个配置属于 Iceberg 外表系统级别的配置,可以通过修改 fe.conf 来配置,也可以通过 ADMIN SET CONFIG 来配置。

  •     iceberg_table_creation_strict_mode

创建 Iceberg 表默认开启 strict mode。strict mode 是指对 Iceberg 表的列类型进行严格过滤,如果有 Doris 目前不支持的数据类型,则创建外表失败。

  •     iceberg_table_creation_interval_second

自动创建 Iceberg 表的后台任务执行间隔,默认为 10s。

  •     max_iceberg_table_creation_record_size

Iceberg 表创建记录保留的最大值,默认为 2000,仅针对创建 Iceberg 数据库记录。


总结

这里 Doris On Iceberg 我们只演示了 Iceberg 单表的查询,你还可以联合 Doris 的表,或者其他的 ODBC 外表,Hive 外表,ES 外表等进行联合查询分析,通过 Doris 对外提供统一的查询分析入口。
到这里我们从完整搭建 Hadoop、Hive、Flink 、MySql、Doris 及 Doris On Iceberg 的使用已经全部介绍完了。


作者:张家锋
来源:https://mp.weixin.qq.com/s/VnqlJ763nGWyOe1FJ8gt2g


最新经典文章,欢迎关注公众号




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条