分享

详解Apache Hudi Schema Evolution(模式演进)

本帖最后由 nettman 于 2022-6-27 23:40 编辑

问题导读

1.SparkSQL模式演进需要设置哪个参数?
2.如何添加列?
3.如何修改列?


Schema Evolution(模式演进)允许用户轻松更改 Hudi 表的当前模式,以适应随时间变化的数据。从 0.11.0 版本开始,支持 Spark SQL(spark3.1.x 和 spark3.2.1)对 Schema 演进的 DDL 支持并且标志为实验性的。

场景
  • 可以添加、删除、修改和移动列(包括嵌套列)
  • 分区列不能演进
  • 不能对 Array 类型的嵌套列进行添加、删除或操作

SparkSQL模式演进以及语法描述
使用模式演进之前,请先设置spark.sql.extensions,对于spark 3.2.x,需要设置spark.sql.catalog.spark_catalog

  1. # Spark SQL for spark 3.1.x
  2. spark-sql --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.1.2 \
  3. --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  4. --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
  5. # Spark SQL for spark 3.2.1
  6. spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.2.1 \
  7. --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  8. --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  9. --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
复制代码
启动spark app后,请执行set schema.on.read.enable=true开启模式演进

当前模式演进开启后不能关闭


添加列

语法

-- add columns
ALTER TABLE Table name ADD COLUMNS(col_spec[, col_spec ...])


参数描述


1.png

col_name : 新列名,强制必须存在,如果在嵌套类型中添加子列,请指定子列的全路径

示例

  • 在嵌套类型users struct<name: string, age int>中添加子列col1,设置字段为users.col1

  • 在嵌套map类型member map<string, struct<n: string, a: int>>中添加子列col1, 设置字段为member.value.col1

col_type : 新列的类型

nullable : 新列是否可为null,可为空,当前Hudi中并未使用

comment : 新列的注释,可为空

col_position : 列添加的位置,值可为FIRST或者AFTER 某字段

  • 如果设置为FIRST,那么新加的列在表的第一列
  • 如果设置为AFTER 某字段,将在某字段后添加新列
  • 如果设置为空,只有当新的子列被添加到嵌套列时,才能使用 FIRST。不要在顶级列中使用 FIRST。AFTER 的使用没有限制。

示例
  1. alter table h0 add columns(ext0 string);
  2. alter table h0 add columns(new_col int not null comment 'add new column' after col1);
  3. alter table complex_table add columns(col_struct.col_name string comment 'add new column to a struct col' after col_from_col_struct);
复制代码

修改列

语法
  1. -- alter table ... alter column
  2. ALTER TABLE Table name ALTER [COLUMN] col_old_name TYPE column_type [COMMENT] col_comment[FIRST|AFTER] column_name
复制代码
参数描述

1.png

示例
  1. --- Changing the column type
  2. ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint
  3. --- Altering other attributes
  4. ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
  5. ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
  6. ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
  7. ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL
复制代码

列类型变更兼容表

1.png

删除列

语法

-- alter table ... drop columns
ALTER TABLE tableName DROP COLUMN|COLUMNS cols

示例

  1. <div>ALTER TABLE table1 DROP COLUMN a.b.c</div><div>ALTER TABLE table1 DROP COLUMNS a.b.c, x, y</div>
复制代码

修改列名

语法

-- alter table ... rename column
ALTER TABLE tableName RENAME COLUMN old_columnName TO new_columnName

示例

  1. ALTER TABLE table1 RENAME COLUMN a.b.c TO x
复制代码

修改表属性

语法

-- alter table ... set|unset
ALTER TABLE Table name SET|UNSET tblproperties

示例
  1. ALTER TABLE table SET TBLPROPERTIES ('table_property' = 'property_value')</div><div>ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key')
复制代码

修改表名

语法

-- alter table ... rename
ALTER TABLE tableName RENAME TO newTableName

示例
  1. ALTER TABLE table1 RENAME TO table2
复制代码

0.11.0之前的模式演进

模式演进是数据管理的一个非常重要的方面。Hudi 支持开箱即用的常见模式演进场景,例如添加可为空的字段或提升字段的数据类型。此外,演进后的模式可以跨引擎查询,例如 Presto、Hive 和 Spark SQL。下表总结了与不同 Hudi 表类型兼容的Schema变更类型。

1.png

让我们通过一个示例来演示 Hudi 中的模式演进支持。在下面的示例中,我们将添加一个新的字符串字段并将字段的数据类型从 int 更改为 long。

  1. Welcome to
  2.     ____              __
  3.     / __/__  ___ _____/ /__
  4.     _\ \/ _ \/ _ `/ __/  '_/
  5.     /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
  6.     /_/
  7.     Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
  8.     Type in expressions to have them evaluated.
  9.     Type :help for more information.
  10. scala> import org.apache.hudi.QuickstartUtils._
  11. import org.apache.hudi.QuickstartUtils._
  12. scala> import scala.collection.JavaConversions._
  13. import scala.collection.JavaConversions._
  14. scala> import org.apache.spark.sql.SaveMode._
  15. import org.apache.spark.sql.SaveMode._
  16. scala> import org.apache.hudi.DataSourceReadOptions._
  17. import org.apache.hudi.DataSourceReadOptions._
  18. scala> import org.apache.hudi.DataSourceWriteOptions._
  19. import org.apache.hudi.DataSourceWriteOptions._
  20. scala> import org.apache.hudi.config.HoodieWriteConfig._
  21. import org.apache.hudi.config.HoodieWriteConfig._
  22. scala> import org.apache.spark.sql.types._
  23. import org.apache.spark.sql.types._
  24. scala> import org.apache.spark.sql.Row
  25. import org.apache.spark.sql.Row
  26. scala> val tableName = "hudi_trips_cow"
  27.     tableName: String = hudi_trips_cow
  28. scala> val basePath = "file:///tmp/hudi_trips_cow"
  29.     basePath: String = file:///tmp/hudi_trips_cow
  30. scala> val schema = StructType( Array(
  31.     | StructField("rowId", StringType,true),
  32.     | StructField("partitionId", StringType,true),
  33.     | StructField("preComb", LongType,true),
  34.     | StructField("name", StringType,true),
  35.     | StructField("versionId", StringType,true),
  36.     | StructField("intToLong", IntegerType,true)
  37.     | ))
  38.     schema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,IntegerType,true))
  39.    
  40. scala> val data1 = Seq(Row("row_1", "part_0", 0L, "bob", "v_0", 0),
  41.     |                Row("row_2", "part_0", 0L, "john", "v_0", 0),
  42.     |                Row("row_3", "part_0", 0L, "tom", "v_0", 0))
  43.     data1: Seq[org.apache.spark.sql.Row] = List([row_1,part_0,0,bob,v_0,0], [row_2,part_0,0,john,v_0,0], [row_3,part_0,0,tom,v_0,0])
  44. scala> var dfFromData1 = spark.createDataFrame(data1, schema)
  45. scala> dfFromData1.write.format("hudi").
  46.     |   options(getQuickstartWriteConfigs).
  47.     |   option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
  48.     |   option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
  49.     |   option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
  50.     |   option("hoodie.index.type","SIMPLE").
  51.     |   option(TABLE_NAME.key, tableName).
  52.     |   mode(Overwrite).
  53.     |   save(basePath)
  54. scala> var tripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*/*")
  55.     tripsSnapshotDF1: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]
  56. scala> tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
  57. scala> spark.sql("desc hudi_trips_snapshot").show()
  58.     +--------------------+---------+-------+
  59.     |            col_name|data_type|comment|
  60.     +--------------------+---------+-------+
  61.     | _hoodie_commit_time|   string|   null|
  62.     |_hoodie_commit_seqno|   string|   null|
  63.     |  _hoodie_record_key|   string|   null|
  64.     |_hoodie_partition...|   string|   null|
  65.     |   _hoodie_file_name|   string|   null|
  66.     |               rowId|   string|   null|
  67.     |         partitionId|   string|   null|
  68.     |             preComb|   bigint|   null|
  69.     |                name|   string|   null|
  70.     |           versionId|   string|   null|
  71.     |           intToLong|      int|   null|
  72.     +--------------------+---------+-------+
  73.    
  74. scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong from hudi_trips_snapshot").show()
  75.     +-----+-----------+-------+----+---------+---------+
  76.     |rowId|partitionId|preComb|name|versionId|intToLong|
  77.     +-----+-----------+-------+----+---------+---------+
  78.     |row_3|     part_0|      0| tom|      v_0|        0|
  79.     |row_2|     part_0|      0|john|      v_0|        0|
  80.     |row_1|     part_0|      0| bob|      v_0|        0|
  81.     +-----+-----------+-------+----+---------+---------+
  82. // In the new schema, we are going to add a String field and
  83. // change the datatype `intToLong` field from  int to long.
  84. scala> val newSchema = StructType( Array(
  85.     | StructField("rowId", StringType,true),
  86.     | StructField("partitionId", StringType,true),
  87.     | StructField("preComb", LongType,true),
  88.     | StructField("name", StringType,true),
  89.     | StructField("versionId", StringType,true),
  90.     | StructField("intToLong", LongType,true),
  91.     | StructField("newField", StringType,true)
  92.     | ))
  93.     newSchema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,LongType,true), StructField(newField,StringType,true))
  94. scala> val data2 = Seq(Row("row_2", "part_0", 5L, "john", "v_3", 3L, "newField_1"),
  95.     |                Row("row_5", "part_0", 5L, "maroon", "v_2", 2L, "newField_1"),
  96.     |                Row("row_9", "part_0", 5L, "michael", "v_2", 2L, "newField_1"))
  97.     data2: Seq[org.apache.spark.sql.Row] = List([row_2,part_0,5,john,v_3,3,newField_1], [row_5,part_0,5,maroon,v_2,2,newField_1], [row_9,part_0,5,michael,v_2,2,newField_1])
  98. scala> var dfFromData2 = spark.createDataFrame(data2, newSchema)
  99. scala> dfFromData2.write.format("hudi").
  100.     |   options(getQuickstartWriteConfigs).
  101.     |   option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
  102.     |   option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
  103.     |   option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
  104.     |   option("hoodie.index.type","SIMPLE").
  105.     |   option(TABLE_NAME.key, tableName).
  106.     |   mode(Append).
  107.     |   save(basePath)
  108. scala> var tripsSnapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*")
  109.     tripsSnapshotDF2: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 10 more fields]
  110. scala> tripsSnapshotDF2.createOrReplaceTempView("hudi_trips_snapshot")
  111. scala> spark.sql("desc hudi_trips_snapshot").show()
  112.     +--------------------+---------+-------+
  113.     |            col_name|data_type|comment|
  114.     +--------------------+---------+-------+
  115.     | _hoodie_commit_time|   string|   null|
  116.     |_hoodie_commit_seqno|   string|   null|
  117.     |  _hoodie_record_key|   string|   null|
  118.     |_hoodie_partition...|   string|   null|
  119.     |   _hoodie_file_name|   string|   null|
  120.     |               rowId|   string|   null|
  121.     |         partitionId|   string|   null|
  122.     |             preComb|   bigint|   null|
  123.     |                name|   string|   null|
  124.     |           versionId|   string|   null|
  125.     |           intToLong|   bigint|   null|
  126.     |            newField|   string|   null|
  127.     +--------------------+---------+-------+
  128. scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong, newField from hudi_trips_snapshot").show()
  129.     +-----+-----------+-------+-------+---------+---------+----------+
  130.     |rowId|partitionId|preComb|   name|versionId|intToLong|  newField|
  131.     +-----+-----------+-------+-------+---------+---------+----------+
  132.     |row_3|     part_0|      0|    tom|      v_0|        0|      null|
  133.     |row_2|     part_0|      5|   john|      v_3|        3|newField_1|
  134.     |row_1|     part_0|      0|    bob|      v_0|        0|      null|
  135.     |row_5|     part_0|      5| maroon|      v_2|        2|newField_1|
  136.     |row_9|     part_0|      5|michael|      v_2|        2|newField_1|
  137.     +-----+-----------+-------+-------+---------+---------+----------+
复制代码



最新经典文章,欢迎关注公众号


链接:
https://mp.weixin.qq.com/s/4ThnSLhp2N8No2_PzaBa9A


加微信w3aboutyun,可拉入技术爱好者群

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条