分享

Apache Hudi实战之回调功能简介及示例实现

问题导读:
1、Hudi0.6.0版本有哪些新功能?
2、支持的回调方式有哪些?
3、为什么回调接收服务需使用 callbackMsg字段接收信息?
4、Hudi 如何使用HTTP方式?


1. 功能简介

从0.6.0版本开始,Hudi开始支持 commit 回调功能,即每当Hudi成功提交一次 commit, 其内部的回调服务就会向外部系统发出一条回调信息,用户可以根据该回调信息查询Hudi表的增量数据,并根据具体需求进行相应的业务处理。

1.1 支持的回调方式

当前 HoodieDeltaStreamer 可通过 HTTP(默认) 和 Kafka 两种方式向外部发送回调信息,而 SparkDataSource 暂只支持 HTTP 一种。两种数据摄入方式在使用回调功能上没有区别(除了回调方式支持不同外),均通过参数配置实现。

配置相应参数后,启动的任务会在每次成功提交后像外部系统发送 Json 格式的回调信息,信息样例:

  1. {"commitTime":"20201225152956","tableName":"callback_test","basePath":"file:///tmp/hudi_callback"}
复制代码

1.2 参数配置
1.2.1 HTTP

HTTP 回调是默认方式,可通过配置下列参数启用.

必配参数:
  1. ## 是否开启回调功能,默认false
  2. hoodie.write.commit.callback.on=true
  3. ## 回调地址(必填)
  4. hoodie.write.commit.callback.http.url=http://ip:端口/callback
复制代码

可选参数:
  1. ## 超时时间,默认三秒
  2. hoodie.write.commit.callback.http.timeout.seconds=xxx
  3. ## api key 默认:hudi_write_commit_http_callback
  4. hoodie.write.commit.callback.http.api.key=fake_api_key
复制代码

    Note: 回调接收服务需使用 callbackMsg 字段接收信息。

1.2.2 Kafka

Kafka回调目前只支持 HoodieDeltaStreamer,使用方式与 HTTP类似.

必配参数:

  1. ## 是否开启回调功能,默认false
  2. hoodie.write.commit.callback.on=true
  3. ## 回调方式,使用Kafka实现类
  4. hoodie.write.commit.callback.class=org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback
  5. ## Kafka server
  6. hoodie.write.commit.callback.kafka.bootstrap.servers=xxx:9092
  7. ## 回调 kafka 主题
  8. hoodie.write.commit.callback.kafka.topic=xxx_topic
复制代码

可选参数:
  1. ## 输出写出的分区,默认 0
  2. hoodie.write.commit.callback.kafka.partition=分区数
  3. ## ack 级别,默认 all
  4. hoodie.write.commit.callback.kafka.acks=all
  5. ## 失败重试次数,默认 3
  6. hoodie.write.commit.callback.kafka.retries=重试次数
复制代码

2. HTTP方式使用示例

由于使用方式相同,这里方便起见我们使用HTTP 方式示例。

数据写入代码:
  1. @Test
  2. def insert(): Unit = {
  3.   val inserts = convertToStringList(dataGen.generateInserts(100))
  4.   val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
  5.   df.write.format("hudi").
  6.     options(getQuickstartWriteConfigs).
  7.     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  8.     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  9.     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  10.     option(HoodieWriteCommitCallbackConfig.CALLBACK_ON, "true").
  11.     option(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP, "http://localhost:8080/callback").
  12.     option(TABLE_NAME, tableName).
  13.     mode(Overwrite).
  14.     save(basePath)
  15. }
  16. Note:
  17. tableName = "callback_test"
  18. basePath = "file:///tmp/hudi_callback"
复制代码

接收回调信息的服务:
  1. @RestController
  2. public class HoodieCallbackController {
  3.   @RequestMapping("/callback")
  4.   public void callback(@RequestBody String callbackMsg) {
  5.     System.out.println(callbackMsg);
  6.   }
  7. }
复制代码

启动insert方法,控制台打印出回调信息:

  1.   .   ____          _            __ _ _
  2. /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
  3. ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
  4. \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  5.   '  |____| .__|_| |_|_| |_\__, | / / / /
  6. =========|_|==============|___/=/_/_/_/
  7. :: Spring Boot ::                (v2.4.1)
  8. 2020-12-25 15:29:37.732  INFO 3860 --- [           main] c.h.h.HudiCallbackApplication            : Starting HudiCallbackApplication using Java 1.8.0_211 on Mathieu with PID 3860 (/Users/wangxianghu/github/hudi-callback/target/classes started by wangxianghu in /Users/wangxianghu/github/hudi-callback)
  9. 2020-12-25 15:29:37.735  INFO 3860 --- [           main] c.h.h.HudiCallbackApplication            : No active profile set, falling back to default profiles: default
  10. 2020-12-25 15:29:38.386  INFO 3860 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
  11. 2020-12-25 15:29:38.394  INFO 3860 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
  12. 2020-12-25 15:29:38.394  INFO 3860 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.41]
  13. 2020-12-25 15:29:38.443  INFO 3860 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
  14. 2020-12-25 15:29:38.443  INFO 3860 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 674 ms
  15. 2020-12-25 15:29:38.577  INFO 3860 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
  16. 2020-12-25 15:29:38.737  INFO 3860 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
  17. 2020-12-25 15:29:38.747  INFO 3860 --- [           main] c.h.h.HudiCallbackApplication            : Started HudiCallbackApplication in 1.279 seconds (JVM running for 1.698)
  18. 2020-12-25 15:29:59.893  INFO 3860 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
  19. 2020-12-25 15:29:59.894  INFO 3860 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
  20. 2020-12-25 15:29:59.894  INFO 3860 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
  21. {"commitTime":"20201225152956","tableName":"callback_test","basePath":"file:///tmp/hudi_callback"}
复制代码

3. 总结

本文简要介绍了Hudi支持的回调方式以及各种方式的详细配置,并以 HTTP回调方式为例做了简要示范。
2020-12-29_215915.png

作者:wangxianghu
来源:https://mp.weixin.qq.com/s/M-ptQ3olf14IxaAPu9t5ww

最新经典文章,欢迎关注公众号


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条