分享

Apache Hudi集成Spark SQL抢先体验

问题导读

1.Hudi集成Spark SQL需要哪些准备?
2.如何设置并发度?
3.如何实现操作Hudi表数据?


1. 摘要
社区小伙伴一直期待的Hudi整合Spark SQL的[HUDI-1659](https://github.com/apache/hudi/pull/2645)正在积极Review中并已经快接近尾声,Hudi集成Spark SQL预计会在下个版本正式发布,在集成Spark SQL后,会极大方便用户对Hudi表的DDL/DML操作,下面来看看如何使用Spark SQL操作Hudi表。

2. 环境准备
首先需要将[HUDI-1659](https://github.com/apache/hudi/pull/2645)拉取到本地打包,生成SPARK_BUNDLE_JAR(hudi-spark-bundle_2.11-0.9.0-SNAPSHOT.jar)包

2.1 启动spark-sql
在配置完spark环境后可通过如下命令启动spark-sql
  1. spark-sql --jars $PATH_TO_SPARK_BUNDLE_JAR  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
复制代码
2.2 设置并发度
由于Hudi默认upsert/insert/delete的并发度是1500,对于演示的小规模数据集可设置更小的并发度。


  1. set hoodie.upsert.shuffle.parallelism = 1;
  2. set hoodie.insert.shuffle.parallelism = 1;
  3. set hoodie.delete.shuffle.parallelism = 1;
复制代码
同时设置不同步Hudi表元数据
  1. set hoodie.datasource.meta.sync.enable=false;
复制代码


3. Create Table
使用如下SQL创建表
  1. create table test_hudi_table (
  2.   id int,
  3.   name string,
  4.   price double,
  5.   ts long,
  6.   dt string
  7. ) using hudi
  8. partitioned by (dt)
  9. options (
  10.   primaryKey = 'id',
  11.   type = 'mor'
  12. )
  13. location 'file:///tmp/test_hudi_table'
复制代码
说明:表类型为MOR,主键为id,分区字段为dt,合并字段默认为ts。

创建Hudi表后查看创建的Hudi表


  1. show create table test_hudi_table
复制代码


640.png

4. Insert Into

4.1 Insert

使用如下SQL插入一条记录
  1. insert into test_hudi_table select 1 as id, 'hudi' as name, 10 as price, 1000 as ts, '2021-05-05' as dt
复制代码
insert完成后查看Hudi表本地目录结构,生成的元数据、分区和数据与Spark Datasource写入均相同。



640.png

4.2 Select
使用如下SQL查询Hudi表数据

  1. select * from test_hudi_table
复制代码
查询结果如下


640.png

5. Update

5.1 Update

使用如下SQL将id为1的price字段值变更为20
  1. update test_hudi_table set price = 20.0 where id = 1
复制代码
5.2 Select
再次查询Hudi表数据


  1. select * from test_hudi_table
复制代码
查询结果如下,可以看到price已经变成了20.0


640.png

查看Hudi表的本地目录结构如下,可以看到在update之后又生成了一个deltacommit,同时生成了一个增量log文件。


640.png

6. Delete

6.1 Delete
使用如下SQL将id=1的记录删除

  1. delete from test_hudi_table where id = 1
复制代码
查看Hudi表的本地目录结构如下,可以看到delete之后又生成了一个deltacommit,同时生成了一个增量log文件。


640.png

6.2 Select
再次查询Hudi表

  1. select * from test_hudi_table;
复制代码
查询结果如下,可以看到已经查询不到任何数据了,表明Hudi表中已经不存在任何记录了。


640.png

7. Merge Into

7.1 Merge Into Insert

使用如下SQL向test_hudi_table插入数据
  1. merge into test_hudi_table as t0
  2. using (
  3.   select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
  4. ) as s0
  5. on t0.id = s0.id
  6. when not matched and s0.id % 2 = 1 then insert *
复制代码
7.2 Select
查询Hudi表数据


  1. select * from test_hudi_table
复制代码
查询结果如下,可以看到Hudi表中存在一条记录


640.png

7.3 Merge Into Update
使用如下SQL更新数据

  1. merge into test_hudi_table as t0
  2. using (
  3.   select 1 as id, 'a1' as name, 12 as price, 1001 as ts, '2021-03-21' as dt
  4. ) as s0
  5. on t0.id = s0.id
  6. when matched and s0.id % 2 = 1 then update set *
复制代码
7.4 Select
查询Hudi表

  1. select * from test_hudi_table
复制代码
查询结果如下,可以看到Hudi表中的分区已经更新了


640.png

7.6 Merge Into Delete
使用如下SQL删除数据

  1. merge into test_hudi_table t0
  2. using (
  3.   select 1 as s_id, 'a2' as s_name, 15 as s_price, 1001 as s_ts, '2021-03-21' as dt
  4. ) s0
  5. on t0.id = s0.s_id
  6. when matched and s_ts = 1001 then delete
复制代码
查询结果如下,可以看到Hudi表中已经没有数据了


640.png

8. 删除表
使用如下命令删除Hudi表

  1. drop table test_hudi_table;
复制代码
使用show tables查看表是否存在
  1. show tables;
复制代码
可以看到已经没有表了



640.png


9. 总结
通过上面示例简单展示了通过Spark SQL Insert/Update/Delete Hudi表数据,通过SQL方式可以非常方便地操作Hudi表,降低了使用Hudi的门槛。另外Hudi集成Spark SQL工作将继续完善语法,尽量对标Snowflake和BigQuery的语法,如插入多张表(INSERT ALL WHEN condition1 INTO t1 WHEN condition2 into t2),变更Schema以及CALL Cleaner、CALL Clustering等Hudi表服务。


最新经典文章,欢迎关注公众号



原文链接:
https://mp.weixin.qq.com/s/1ElF9fh5k7j7HD_3HKRDqQ


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条