This post was last edited by nettman on 2022-6-27 23:40
Reading guide
1. Which parameter must be set for Spark SQL schema evolution?
2. How do you add a column?
3. How do you modify a column?
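Schema evolution lets users change the current schema of a Hudi table so that it keeps up with data that changes over time. Since version 0.11.0, Hudi provides DDL support for schema evolution through Spark SQL (Spark 3.1.x and Spark 3.2.1); the feature is marked experimental.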
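Scenarios
- Columns (including nested columns) can be added, dropped, modified, and moved
- Partition columns cannot evolve
- Nested columns of Array type cannot be added, dropped, or otherwise changed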
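The three rules above can be read as a pre-flight check before a DDL statement is issued. The following is a minimal Python sketch of that reading, not Hudi's actual validation logic. The names `check_evolvable`, `partition_columns`, and `array_columns` are hypothetical and introduced only for illustration.

```python
def check_evolvable(column_path, partition_columns, array_columns):
    """Return None if the column may be evolved, else a reason string.

    column_path is a dotted path such as "users.col1" (hypothetical input).
    partition_columns and array_columns are sets of dotted paths supplied by
    the caller; this sketch does not inspect a real Hudi table.
    """
    # Rule: partition columns cannot evolve.
    if column_path in partition_columns:
        return "partition column cannot evolve"
    # Rule: nested columns inside an Array type cannot be changed.
    # Check every proper prefix of the path, e.g. "a" and "a.b" for "a.b.c".
    parts = column_path.split(".")
    for i in range(1, len(parts)):
        prefix = ".".join(parts[:i])
        if prefix in array_columns:
            return "nested column of Array type cannot be changed"
    return None
```

A caller would run this check first and only send the ALTER TABLE statement when it returns None.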
Spark SQL schema evolution and syntax description
Before using schema evolution, set spark.sql.extensions. For Spark 3.2.x, also set spark.sql.catalog.spark_catalog.
- # Spark SQL for spark 3.1.x
- spark-sql --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.1.2 \
- --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
- --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
-
- # Spark SQL for spark 3.2.1
- spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.2.1 \
- --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
- --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
- --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
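After starting the Spark app, run set schema.on.read.enable=true to enable schema evolution. (Hudi's configuration reference lists the full key as hoodie.schema.on.read.enable; use that form if the short one is not recognized.)
Once schema evolution has been enabled, it currently cannot be disabled.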
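The difference between the two launch commands is just one extra --conf entry for Spark 3.2.x. As a rough illustration, the Python sketch below assembles the --conf arguments for a given Spark version. The helper names `spark_sql_confs` and `to_cli_args` are hypothetical; the configuration keys and class names are copied from the commands above.

```python
HUDI_EXTENSION = "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
HUDI_CATALOG = "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
KRYO = "org.apache.spark.serializer.KryoSerializer"


def spark_sql_confs(spark_version):
    """Return the --conf key/value pairs for a version such as "3.1.2"."""
    major, minor = (int(p) for p in spark_version.split(".")[:2])
    if (major, minor) not in {(3, 1), (3, 2)}:
        # The post only covers Spark 3.1.x and 3.2.x.
        raise ValueError("unsupported Spark version: " + spark_version)
    confs = {
        "spark.serializer": KRYO,
        "spark.sql.extensions": HUDI_EXTENSION,
    }
    # Spark 3.2.x additionally needs the Hudi catalog.
    if (major, minor) == (3, 2):
        confs["spark.sql.catalog.spark_catalog"] = HUDI_CATALOG
    return confs


def to_cli_args(confs):
    """Flatten a conf dict into ["--conf", "k=v", ...] for spark-sql."""
    args = []
    for key, value in confs.items():
        args += ["--conf", f"{key}={value}"]
    return args
```

For example, `to_cli_args(spark_sql_confs("3.2.1"))` yields three --conf pairs, while the 3.1.x variant yields two.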
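Adding columns
Syntax
-- add columns
ALTER TABLE tableName ADD COLUMNS(col_spec[, col_spec ...])
Parameter description
col_name : name of the new column; required. To add a sub-column to a nested type, specify the full path of the sub-column.
Examples
- To add sub-column col1 to the nested type users struct<name: string, age: int>, set the field to users.col1
- To add sub-column col1 to the nested map type member map<string, struct<n: string, a: int>>, set the field to member.value.col1
col_type : type of the new column
nullable : whether the new column may be null; optional, and currently not used by Hudi
comment : comment for the new column; optional
col_position : where the new column is added; the value can be FIRST or AFTER an existing column
- FIRST places the new column first in the table
- AFTER an existing column places the new column right after that column
- The position is optional. FIRST may be used only when a new sub-column is added to a nested column; do not use FIRST for top-level columns. AFTER has no such restriction.
Examples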
- alter table h0 add columns(ext0 string);
- alter table h0 add columns(new_col int not null comment 'add new column' after col1);
- alter table complex_table add columns(col_struct.col_name string comment 'add new column to a struct col' after col_from_col_struct);
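To make the path and position rules concrete, here is a small Python sketch that builds ADD COLUMNS statements as strings. It is only an illustration of the rules described above: `nested_column_path` and `add_columns_sql` are hypothetical helpers, the comment text is assumed to contain no single quotes, and nothing here talks to Spark.

```python
def nested_column_path(parent, child, parent_is_map=False):
    """Full path of a sub-column, following the two examples above.

    struct parent "users" + "col1"  -> "users.col1"
    map parent "member" + "col1"    -> "member.value.col1"
    """
    if parent_is_map:
        return f"{parent}.value.{child}"
    return f"{parent}.{child}"


def add_columns_sql(table, col_name, col_type, comment=None, position=None):
    """Build one ALTER TABLE ... ADD COLUMNS statement as a string.

    position is None, "first", or "after <existing column>".
    """
    is_nested = "." in col_name
    if position is not None and position.strip().upper() == "FIRST" and not is_nested:
        # FIRST is only allowed for sub-columns of a nested column.
        raise ValueError("FIRST cannot be used for a top-level column")
    spec = f"{col_name} {col_type}"
    if comment is not None:
        spec += f" comment '{comment}'"
    if position is not None:
        spec += f" {position}"
    return f"alter table {table} add columns({spec});"
```

Calling `add_columns_sql("h0", "ext0", "string")` reproduces the first example statement above.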
Modifying columns
Syntax
- -- alter table ... alter column
- ALTER TABLE tableName ALTER [COLUMN] col_old_name TYPE column_type [COMMENT] col_comment [FIRST|AFTER] column_name
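Parameter description
col_old_name : name of the column to modify; for a nested column, use its full path
column_type : new type of the column
col_comment : new comment for the column
column_name : existing column used with AFTER to give the new position
Examples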
- --- Changing the column type
- ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint
-
- --- Altering other attributes
- ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
- ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
- ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
- ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL
Column type change compatibility table (the table itself is not reproduced in this post)
Dropping columns
Syntax
-- alter table ... drop columns
ALTER TABLE tableName DROP COLUMN|COLUMNS cols
Examples
- ALTER TABLE table1 DROP COLUMN a.b.c
- ALTER TABLE table1 DROP COLUMNS a.b.c, x, y
Renaming columns
Syntax
-- alter table ... rename column
ALTER TABLE tableName RENAME COLUMN old_columnName TO new_columnName
Example
- ALTER TABLE table1 RENAME COLUMN a.b.c TO x
Modifying table properties
Syntax
-- alter table ... set|unset
ALTER TABLE tableName SET|UNSET tblproperties
Examples
- ALTER TABLE table SET TBLPROPERTIES ('table_property' = 'property_value')
- ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key')
Renaming a table
Syntax
-- alter table ... rename
ALTER TABLE tableName RENAME TO newTableName
Example
- ALTER TABLE table1 RENAME TO table2
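Schema evolution before 0.11.0
Schema evolution is a very important aspect of data management. Hudi supports common schema evolution scenarios out of the box, such as adding a nullable field or promoting a field's data type. The evolved schema can also be queried across engines such as Presto, Hive, and Spark SQL. The table below summarizes the schema change types compatible with the different Hudi table types.
Let's walk through an example of Hudi's schema evolution support. In the example below, we add a new string field and change the data type of a field from int to long.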
- Welcome to
- ____ __
- / __/__ ___ _____/ /__
- _\ \/ _ \/ _ `/ __/ '_/
- /___/ .__/\_,_/_/ /_/\_\ version 3.1.2
- /_/
-
- Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
- Type in expressions to have them evaluated.
- Type :help for more information.
-
- scala> import org.apache.hudi.QuickstartUtils._
- import org.apache.hudi.QuickstartUtils._
-
- scala> import scala.collection.JavaConversions._
- import scala.collection.JavaConversions._
-
- scala> import org.apache.spark.sql.SaveMode._
- import org.apache.spark.sql.SaveMode._
-
- scala> import org.apache.hudi.DataSourceReadOptions._
- import org.apache.hudi.DataSourceReadOptions._
-
- scala> import org.apache.hudi.DataSourceWriteOptions._
- import org.apache.hudi.DataSourceWriteOptions._
-
- scala> import org.apache.hudi.config.HoodieWriteConfig._
- import org.apache.hudi.config.HoodieWriteConfig._
-
- scala> import org.apache.spark.sql.types._
- import org.apache.spark.sql.types._
-
- scala> import org.apache.spark.sql.Row
- import org.apache.spark.sql.Row
-
- scala> val tableName = "hudi_trips_cow"
- tableName: String = hudi_trips_cow
- scala> val basePath = "file:///tmp/hudi_trips_cow"
- basePath: String = file:///tmp/hudi_trips_cow
- scala> val schema = StructType( Array(
- | StructField("rowId", StringType,true),
- | StructField("partitionId", StringType,true),
- | StructField("preComb", LongType,true),
- | StructField("name", StringType,true),
- | StructField("versionId", StringType,true),
- | StructField("intToLong", IntegerType,true)
- | ))
- schema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,IntegerType,true))
-
- scala> val data1 = Seq(Row("row_1", "part_0", 0L, "bob", "v_0", 0),
- | Row("row_2", "part_0", 0L, "john", "v_0", 0),
- | Row("row_3", "part_0", 0L, "tom", "v_0", 0))
- data1: Seq[org.apache.spark.sql.Row] = List([row_1,part_0,0,bob,v_0,0], [row_2,part_0,0,john,v_0,0], [row_3,part_0,0,tom,v_0,0])
-
- scala> var dfFromData1 = spark.createDataFrame(data1, schema)
- scala> dfFromData1.write.format("hudi").
- | options(getQuickstartWriteConfigs).
- | option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
- | option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
- | option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
- | option("hoodie.index.type","SIMPLE").
- | option(TABLE_NAME.key, tableName).
- | mode(Overwrite).
- | save(basePath)
-
- scala> var tripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*/*")
- tripsSnapshotDF1: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]
-
- scala> tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
-
- scala> spark.sql("desc hudi_trips_snapshot").show()
- +--------------------+---------+-------+
- | col_name|data_type|comment|
- +--------------------+---------+-------+
- | _hoodie_commit_time| string| null|
- |_hoodie_commit_seqno| string| null|
- | _hoodie_record_key| string| null|
- |_hoodie_partition...| string| null|
- | _hoodie_file_name| string| null|
- | rowId| string| null|
- | partitionId| string| null|
- | preComb| bigint| null|
- | name| string| null|
- | versionId| string| null|
- | intToLong| int| null|
- +--------------------+---------+-------+
-
- scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong from hudi_trips_snapshot").show()
- +-----+-----------+-------+----+---------+---------+
- |rowId|partitionId|preComb|name|versionId|intToLong|
- +-----+-----------+-------+----+---------+---------+
- |row_3| part_0| 0| tom| v_0| 0|
- |row_2| part_0| 0|john| v_0| 0|
- |row_1| part_0| 0| bob| v_0| 0|
- +-----+-----------+-------+----+---------+---------+
-
- // In the new schema, we are going to add a String field and
- // change the datatype `intToLong` field from int to long.
- scala> val newSchema = StructType( Array(
- | StructField("rowId", StringType,true),
- | StructField("partitionId", StringType,true),
- | StructField("preComb", LongType,true),
- | StructField("name", StringType,true),
- | StructField("versionId", StringType,true),
- | StructField("intToLong", LongType,true),
- | StructField("newField", StringType,true)
- | ))
- newSchema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,LongType,true), StructField(newField,StringType,true))
-
- scala> val data2 = Seq(Row("row_2", "part_0", 5L, "john", "v_3", 3L, "newField_1"),
- | Row("row_5", "part_0", 5L, "maroon", "v_2", 2L, "newField_1"),
- | Row("row_9", "part_0", 5L, "michael", "v_2", 2L, "newField_1"))
- data2: Seq[org.apache.spark.sql.Row] = List([row_2,part_0,5,john,v_3,3,newField_1], [row_5,part_0,5,maroon,v_2,2,newField_1], [row_9,part_0,5,michael,v_2,2,newField_1])
-
- scala> var dfFromData2 = spark.createDataFrame(data2, newSchema)
- scala> dfFromData2.write.format("hudi").
- | options(getQuickstartWriteConfigs).
- | option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
- | option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
- | option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
- | option("hoodie.index.type","SIMPLE").
- | option(TABLE_NAME.key, tableName).
- | mode(Append).
- | save(basePath)
-
- scala> var tripsSnapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*")
- tripsSnapshotDF2: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 10 more fields]
-
- scala> tripsSnapshotDF2.createOrReplaceTempView("hudi_trips_snapshot")
-
- scala> spark.sql("desc hudi_trips_snapshot").show()
- +--------------------+---------+-------+
- | col_name|data_type|comment|
- +--------------------+---------+-------+
- | _hoodie_commit_time| string| null|
- |_hoodie_commit_seqno| string| null|
- | _hoodie_record_key| string| null|
- |_hoodie_partition...| string| null|
- | _hoodie_file_name| string| null|
- | rowId| string| null|
- | partitionId| string| null|
- | preComb| bigint| null|
- | name| string| null|
- | versionId| string| null|
- | intToLong| bigint| null|
- | newField| string| null|
- +--------------------+---------+-------+
-
-
- scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong, newField from hudi_trips_snapshot").show()
- +-----+-----------+-------+-------+---------+---------+----------+
- |rowId|partitionId|preComb| name|versionId|intToLong| newField|
- +-----+-----------+-------+-------+---------+---------+----------+
- |row_3| part_0| 0| tom| v_0| 0| null|
- |row_2| part_0| 5| john| v_3| 3|newField_1|
- |row_1| part_0| 0| bob| v_0| 0| null|
- |row_5| part_0| 5| maroon| v_2| 2|newField_1|
- |row_9| part_0| 5|michael| v_2| 2|newField_1|
- +-----+-----------+-------+-------+---------+---------+----------+
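The final query result in the transcript can be reasoned about with a small model: rows written before the schema change get null for the new field, and a row with an existing key is replaced by the newer write. The Python sketch below mimics that outcome with plain dicts. It is a simplified model, not Hudi's merge implementation. The function `upsert` is hypothetical, and the rule "the incoming row wins when its preComb is not smaller" is an assumption chosen because it matches the output shown. The partitionId column is omitted for brevity, and Python integers have no int/long distinction, so the type promotion is not modelled.

```python
def upsert(existing, incoming, new_fields, key="rowId", precombine="preComb"):
    """Merge incoming rows into existing rows, keyed by `key`.

    Rows are plain dicts. Fields listed in new_fields are filled with None
    for rows that were written before the schema change.
    """
    merged = {}
    for row in existing:
        widened = dict(row)
        for field in new_fields:
            widened.setdefault(field, None)
        merged[widened[key]] = widened
    for row in incoming:
        current = merged.get(row[key])
        # Keep the incoming row if the key is new or its precombine value
        # is at least as large as the stored one (assumed merge rule).
        if current is None or row[precombine] >= current[precombine]:
            merged[row[key]] = dict(row)
    return list(merged.values())


old_rows = [
    {"rowId": "row_1", "preComb": 0, "name": "bob", "versionId": "v_0", "intToLong": 0},
    {"rowId": "row_2", "preComb": 0, "name": "john", "versionId": "v_0", "intToLong": 0},
    {"rowId": "row_3", "preComb": 0, "name": "tom", "versionId": "v_0", "intToLong": 0},
]
new_rows = [
    {"rowId": "row_2", "preComb": 5, "name": "john", "versionId": "v_3", "intToLong": 3, "newField": "newField_1"},
    {"rowId": "row_5", "preComb": 5, "name": "maroon", "versionId": "v_2", "intToLong": 2, "newField": "newField_1"},
    {"rowId": "row_9", "preComb": 5, "name": "michael", "versionId": "v_2", "intToLong": 2, "newField": "newField_1"},
]
result = upsert(old_rows, new_rows, ["newField"])
```

As in the transcript, `result` holds five rows: row_1 and row_3 keep their old values with newField set to None, and row_2 carries the values from the second write.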
Link:
https://mp.weixin.qq.com/s/4ThnSLhp2N8No2_PzaBa9A