分享

使用 Apache Hudi 实现 SCD-2(渐变维度)

问题导读
1.Hudi提供哪些功能?
2.Hudi默认显示表中的快照数据,是否正确?
3.Hudi 是否允许通过时间点查询旧版本数据?


数据是当今分析世界的宝贵资产。在向最终用户提供数据时,跟踪数据在一段时间内的变化非常重要。渐变维度 (SCD) 是随时间推移存储和管理当前和历史数据的维度。在 SCD 的类型中,我们将特别关注类型 2(SCD 2),它保留了值的完整历史。每条记录都包含有效时间和到期时间,以标识记录处于活动状态的时间段。这可以通过少数审计列来实现。例如:生效开始日期、生效结束日期和活动记录指示器。让我们了解如何使用 Apache Hudi 来实现这种 SCD-2 表设计。

Apache Hudi 是下一代流数据湖平台。Apache Hudi 将核心仓库和数据库功能直接引入数据湖。Hudi 提供表、事务、高效的 upserts/deletes、高级索引、流式摄取服务、数据Clustering/压缩优化和并发性,同时将数据保持为开源文件格式。

Apache Hudi 默认显示表中的快照数据,即最近提交的最新数据。如果我们想跟踪历史变化,我们需要利用 Hudi 的时间点查询(https://hudi.apache.org/docs/qui ... point-in-time-query

Hudi 允许通过时间点查询旧版本数据或最新数据和时间旅行,通过时间点查询遍历历史数据变化是不高效的,需要对给定数据进行多次时间间隔分析。让我们看看如何通过使用经典方法的解决方法来克服这个问题。让我们考虑一个包含产品详细信息和卖家折扣的表。


  1. +---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
  2. |seller_id|prod_category |product_name   |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
  3. +---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
  4. |3412     |Healthcare    |Dolo 650       |10             |10                 |2022-04-01 16:30:45|9999-12-31 23:59:59|1       |
  5. |1234     |Detergent     |Tide 2L        |6              |15                 |2021-12-15 15:20:30|9999-12-31 23:59:59|1       |
  6. |1234     |Home Essential|Hand Towel     |12             |20                 |2021-10-20 06:55:22|9999-12-31 23:59:59|1       |
  7. |4565     |Gourmet       |Dairy Milk Silk|6              |30                 |2021-06-12 20:30:40|9999-12-31 23:59:59|1       |
  8. +---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
复制代码


步骤

1. 让我们使用 Spark 将这些数据写入 Hudi 表中
  1. spark-shell \
  2. --packages org.apache.hudi:hudi-spark-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:2.4.7,org.apache.avro:avro:1.8.2 \
  3. --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  4. --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  5. --conf "spark.sql.hive.convertMetastoreParquet=false"
复制代码
启动 spark shell 后,我们可以导入库,并创建 Hudi 表,如下所示。
  1. Welcome to
  2.       ____              __
  3.      / __/__  ___ _____/ /__
  4.     _\ \/ _ \/ _ `/ __/  '_/
  5.    /___/ .__/\_,_/_/ /_/\_\   version 2.4.8
  6.       /_/
  7.          
  8. Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_312)
  9. Type in expressions to have them evaluated.
  10. Type :help for more information.
  11. scala> spark.sql("""create table hudi_product_catalog (
  12.      | seller_id int,
  13.      | prod_category string,
  14.      | product_name string,
  15.      | product_package string,
  16.      | discount_percentage string,
  17.      | eff_start_ts timestamp,
  18.      | eff_end_ts timestamp,
  19.      | actv_ind int
  20.      |  ) using hudi
  21.      | tblproperties (
  22.      |   type = 'cow',
  23.      |   primaryKey = 'seller_id,prod_category,eff_end_ts',
  24.      |   preCombineField = 'eff_start_ts'
  25.      |  )
  26.      | partitioned by (actv_ind)
  27.      |  location 'gs://target_bucket/hudi_product_catalog/'""")
复制代码
将数据写入到存储桶后,如下是 Hudi 目标表的数据格式。
  1. +-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
  2. |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                                                       |_hoodie_partition_path|_hoodie_file_name                                                         |seller_id|prod_category |product_name   |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
  3. +-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
  4. |20220722113258101  |20220722113258101_0_0|seller_id:3412,prod_category:Healthcare,eff_end_ts:253402300799000000    |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-29-1219_20220722113258101.parquet|3412     |Healthcare    |Dolo 650       |10             |10                 |2022-04-01 16:30:45|9999-12-31 23:59:59|1       |
  5. |20220722113258101  |20220722113258101_0_1|seller_id:1234,prod_category:Home Essential,eff_end_ts:253402300799000000|actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-29-1219_20220722113258101.parquet|1234     |Home Essential|Hand Towel     |12             |20                 |2021-10-20 06:55:22|9999-12-31 23:59:59|1       |
  6. |20220722113258101  |20220722113258101_0_2|seller_id:4565,prod_category:Gourmet,eff_end_ts:253402300799000000       |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-29-1219_20220722113258101.parquet|4565     |Gourmet       |Dairy Milk Silk|6              |30                 |2021-06-12 20:30:40|9999-12-31 23:59:59|1       |
  7. |20220722113258101  |20220722113258101_0_3|seller_id:1234,prod_category:Detergent,eff_end_ts:253402300799000000     |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-29-1219_20220722113258101.parquet|1234     |Detergent     |Tide 2L        |6              |15                 |2021-12-15 15:20:30|9999-12-31 23:59:59|1       |
  8. +-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
复制代码
2.假设我们的增量数据存储在下表中(非Hudi格式,可以是Hive)。
  1. +---------+-------------+-----------------+---------------+-------------------+-------------------+
  2. |seller_id|prod_category|product_name     |product_package|discount_percentage|eff_start_ts       |
  3. +---------+-------------+-----------------+---------------+-------------------+-------------------+
  4. |1234     |Detergent    |Tide 5L          |6              |25                 |2022-01-31 10:00:30|
  5. |4565     |Gourmet      |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|
  6. |3345     |Stationary   |Sticky Notes     |4              |12                 |2022-07-09 21:30:45|
  7. +---------+-------------+-----------------+---------------+-------------------+-------------------+
复制代码
3. 现在让我们通过对目标表进行Left Anti Join[1]过滤掉增量表中的所有 Insert only 记录。
  1. val updFileDf = spark.read.option("header",true).csv("gs://target_bucket/hudi_product_catalog/hudi_product_update.csv")
  2. val tgtHudiDf = spark.sql("select * from hudi_product_catalog")
  3. hudiTableData.createOrReplaceTempView("hudiTable")
  4. //Cast as needed
  5. val stgDf = updFileDf.withColumn("eff_start_ts",to_timestamp(col("eff_start_ts")))
  6. .withColumn("seller_id",col("seller_id").cast("int"))
  7. //Prepare an insert DF from incremental temp DF
  8. val instmpDf = stgDf.as("stg")
  9.       .join(tgtHudiDf.as("tgt"),
  10.         col("stg.seller_id") === col("tgt.seller_id") &&
  11.           col("stg.prod_category") === col("tgt.prod_category"),"left_anti")
  12. .select("stg.*")
  13. val insDf = instmpDf.withColumn("eff_end_ts",to_timestamp(lit("9999-12-31 23:59:59")))
  14. .withColumn("actv_ind",lit(1))
  15. insDf.show(false)
  16. +---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
  17. |seller_id|prod_category|product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
  18. +---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
  19. |     3345|   Stationary|Sticky Notes|              4|                 12|2022-07-09 21:30:45|9999-12-31 23:59:59|       1|
  20. +---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
复制代码
4. 我们有一个只插入记录的DataFrame。接下来让我们创建一个DataFrame,其中将包含来自 delta 表和目标表的属性,并在目标上使用内连接,它将获取需要更新的记录。
  1. //Prepare an update DF from incremental temp DF, select columns from both the tables
  2. val updDf = stgDf.as("stg")
  3.       .join(tgtHudiDf.as("tgt"),
  4.         col("stg.seller_id") === col("tgt.seller_id") &&
  5.           col("stg.prod_category") === col("tgt.prod_category"),"inner")
  6.           .where(col("stg.eff_start_ts") > col("tgt.eff_start_ts"))
  7. .select((stgDf.columns.map(c => stgDf(c).as(s"stg_$c"))++ tgtHudiDf.columns.map(c => tgtHudiDf(c).as(s"tgt_$c"))):_*)
  8. updDf.show(false)
  9. +-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
  10. |stg_seller_id|stg_prod_category| stg_product_name|stg_product_package|stg_discount_percentage|   stg_eff_start_ts|tgt__hoodie_commit_time|tgt__hoodie_commit_seqno|tgt__hoodie_record_key|tgt__hoodie_partition_path|tgt__hoodie_file_name|tgt_seller_id|tgt_prod_category|tgt_product_name|tgt_product_package|tgt_discount_percentage|   tgt_eff_start_ts|     tgt_eff_end_ts|tgt_actv_ind|
  11. +-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
  12. |         1234|        Detergent|          Tide 5L|                  6|                     25|2022-01-31 10:00:30|      20220710113622931|    20220710113622931...|  seller_id:1234,pr...|                actv_ind=1| 2dd6109f-2173-429...|         1234|        Detergent|         Tide 2L|                  6|                     15|2021-12-15 15:20:30|9999-12-31 23:59:59|           1|
  13. |         4565|          Gourmet|Dairy Milk Almond|                 12|                     45|2022-06-12 20:30:40|      20220710113622931|    20220710113622931...|  seller_id:4565,pr...|                actv_ind=1| 2dd6109f-2173-429...|         4565|          Gourmet| Dairy Milk Silk|                  6|                     30|2021-06-12 20:30:40|9999-12-31 23:59:59|           1|
  14. +-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
复制代码
5. 现在我们有一个DataFrame,它在一条记录中包含新旧数据,让我们在各自单独的DataFrame中拉取更新记录的活动和非活动实例。








640.png

在进行上述练习时,我们将通过更改活动(新)记录的 eff_end_tsto eff_start_ts -1 并更新 actv_ind = 0 来废弃非活动记录
  1. //Prepare Active updates
  2. val updActiveDf = updDf.select(col("stg_seller_id").as("seller_id"),
  3. col("stg_prod_category").as("prod_category"),
  4. col("stg_product_name").as("product_name"),
  5. col("stg_product_package").as("product_package"),
  6. col("stg_discount_percentage").as("discount_percentage"),
  7. col("stg_eff_start_ts").as("eff_start_ts"),
  8. to_timestamp(lit("9999-12-31 23:59:59")) as ("eff_end_ts"),
  9. lit(1) as ("actv_ind"))
  10. updActiveDf.show(false)
  11. +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
  12. |seller_id|prod_category|product_name     |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
  13. +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
  14. |1234     |Detergent    |Tide 5L          |6              |25                 |2022-01-31 10:00:30|9999-12-31 23:59:59|1       |
  15. |4565     |Gourmet      |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|9999-12-31 23:59:59|1       |
  16. +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
  17. //Prepare inactive updates, which will become obsolete records
  18. val updInactiveDf = updDf.select(col("tgt_seller_id").as("seller_id"),
  19. col("tgt_prod_category").as("prod_category"),
  20. col("tgt_product_name").as("product_name"),
  21. col("tgt_product_package").as("product_package"),
  22. col("tgt_discount_percentage").as("discount_percentage"),
  23. col("tgt_eff_start_ts").as("eff_start_ts"),
  24. (col("stg_eff_start_ts") - expr("interval 1 seconds")).as("eff_end_ts"),
  25. lit(0) as ("actv_ind"))
  26. scala> updInactiveDf.show
  27. +---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
  28. |seller_id|prod_category|   product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
  29. +---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
  30. |     1234|    Detergent|        Tide 2L|              6|                 15|2021-12-15 15:20:30|2022-01-31 10:00:29|       0|
  31. |     4565|      Gourmet|Dairy Milk Silk|              6|                 30|2021-06-12 20:30:40|2022-06-12 20:30:39|       0|
  32. +---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
复制代码
6. 现在我们将使用union运算符将插入、活动更新和非活动更新拉入单个DataFrame。将此DataFrame作为最终 Hudi 写入逻辑的增量源。
  1. scala> val upsertDf = insDf.union(updActiveDf).union(updInactiveDf)
  2. scala> upsertDf.show
  3. +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
  4. |seller_id|prod_category|     product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
  5. +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
  6. |     3345|   Stationary|     Sticky Notes|              4|                 12|2022-07-09 21:30:45|9999-12-31 23:59:59|       1|
  7. |     4565|      Gourmet|Dairy Milk Almond|             12|                 45|2022-06-12 20:30:40|9999-12-31 23:59:59|       1|
  8. |     1234|    Detergent|          Tide 5L|              6|                 25|2022-01-31 10:00:30|9999-12-31 23:59:59|       1|
  9. |     4565|      Gourmet|  Dairy Milk Silk|              6|                 30|2021-06-12 20:30:40|2022-06-12 20:30:39|       0|
  10. |     1234|    Detergent|          Tide 2L|              6|                 15|2021-12-15 15:20:30|2022-01-31 10:00:29|       0|
  11. +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
  12. val path = "gs://target_bucket/hudi_product_catalog"
  13. upsertDf.write.format("org.apache.hudi")
  14. .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
  15. .option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator")
  16. .option(RECORDKEY_FIELD_OPT_KEY, "seller_id,prod_category,eff_end_ts")
  17. .option(PRECOMBINE_FIELD_OPT_KEY, "eff_start_ts")
  18. .option("hoodie.table.name","hudi_product_catalog")
  19. .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "target_schema")
  20. .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_product_catalog")
  21. .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
  22. .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
  23. .option(PARTITIONPATH_FIELD_OPT_KEY, "actv_ind")
  24. .mode(Append)
  25. .save(s"$path")
  26. scala> spark.sql("refresh table stg_wmt_ww_fin_rtn_mb_dl_secure.hudi_product_catalog")
  27. scala> spark.sql("select * from stg_wmt_ww_fin_rtn_mb_dl_secure.hudi_product_catalog").show(false)
  28. +-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
  29. |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                                                       |_hoodie_partition_path|_hoodie_file_name                                                         |seller_id|prod_category |product_name     |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
  30. +-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
  31. |20220722113258101  |20220722113258101_0_0|seller_id:3412,prod_category:Healthcare,eff_end_ts:253402300799000000    |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|3412     |Healthcare    |Dolo 650         |10             |10                 |2022-04-01 16:30:45|9999-12-31 23:59:59|1       |
  32. |20220722113258101  |20220722113258101_0_1|seller_id:1234,prod_category:Home Essential,eff_end_ts:253402300799000000|actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|1234     |Home Essential|Hand Towel       |12             |20                 |2021-10-20 06:55:22|9999-12-31 23:59:59|1       |
  33. |20220722114049500  |20220722114049500_0_2|seller_id:4565,prod_category:Gourmet,eff_end_ts:253402300799000000       |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|4565     |Gourmet       |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|9999-12-31 23:59:59|1       |
  34. |20220722114049500  |20220722114049500_0_3|seller_id:1234,prod_category:Detergent,eff_end_ts:253402300799000000     |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|1234     |Detergent     |Tide 5L          |6              |25                 |2022-01-31 10:00:30|9999-12-31 23:59:59|1       |
  35. |20220722114049500  |20220722114049500_0_4|seller_id:3345,prod_category:Stationary,eff_end_ts:253402300799000000    |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|3345     |Stationary    |Sticky Notes     |4              |12                 |2022-07-09 21:30:45|9999-12-31 23:59:59|1       |
  36. |20220722114049500  |20220722114049500_1_0|seller_id:4565,prod_category:Gourmet,eff_end_ts:1655065839000000         |actv_ind=0            |789e0317-d499-4d74-a5d9-ad6e6517d6b8-0_1-72-2452_20220722114049500.parquet|4565     |Gourmet       |Dairy Milk Silk  |6              |30                 |2021-06-12 20:30:40|2022-06-12 20:30:39|0       |
  37. |20220722114049500  |20220722114049500_1_1|seller_id:1234,prod_category:Detergent,eff_end_ts:1643623229000000       |actv_ind=0            |789e0317-d499-4d74-a5d9-ad6e6517d6b8-0_1-72-2452_20220722114049500.parquet|1234     |Detergent     |Tide 2L          |6              |15                 |2021-12-15 15:20:30|2022-01-31 10:00:29|0       |
  38. +-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
复制代码
注意项
• 对于现有记录的每次更新,parquet 文件将在存储中重新写入/移动,这可能会影响写入时的性能

• 在查询数据期间,根据代表主要过滤器的属性对目标表进行分区总是一个更好的主意。例如:销售表中的销售日期,注册产品目录的卖家。上述示例中选择了 actv_ind ,因为我们希望使其易于解释并将所有活动记录保存在一个分区中。

结论
随着我们持续使用 Apache Hudi 编写 Spark 应用程序,我们将继续改进加载数据的策略,上述尝试只是用 Hudi 实现 SCD-2 功能的一个开始。


最新经典文章,欢迎关注公众号


原文链接
https://mp.weixin.qq.com/s/e4MgGY4pFcVkgjAmrhW5Bw


加微信w3aboutyun,可拉入技术爱好者群

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条