Guiding questions:
1. How does Flink 1.11 write aggregated / updating data to Kafka?
2. How do you work with tables in Flink 1.11 (a Table API example)?
3. When reading Kafka data and writing it into Hive, how do you fix missing partition information and data that cannot be read?
4. How do you add a custom partition-time extractor class, MyPartTimeExtractor?
Practice 1: Writing aggregated / updating data to Kafka with Flink 1.11, following the community implementation
1. Community documentation (quick link):
https://github.com/ververica/flink-cdc-connectors
Dependencies to add:
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.0.0</version>
</dependency>

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  <version>1.0.0</version>
</dependency>

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.0.0</version>
</dependency>
Looking at the changelog-json code, its structure is the same as canal-json: a format factory class, a deserialization schema and a serialization schema.
Test code:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object CdcSinkKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(30001)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val stenv = StreamTableEnvironment.create(env, bsSettings)

    // Source table: plain JSON user-behavior events from Kafka
    val source =
      s"""
         |CREATE TABLE kafka_table (
         |  category_id STRING,
         |  user_id INT,
         |  item_id STRING,
         |  behavior STRING,
         |  ts STRING
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'user_behavior',
         |  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |  'properties.group.id' = 'test1',
         |  'format' = 'json',
         |  'scan.startup.mode' = 'earliest-offset'
         |)
      """.stripMargin
    stenv.executeSql(source)

    // Sink table: the changelog-json format lets the updating aggregate be written to Kafka
    val sink =
      s"""
         |CREATE TABLE kafka_gmv (
         |  id STRING,
         |  gmv DECIMAL(10, 5)
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'kafka_gmv',
         |  'scan.startup.mode' = 'earliest-offset',
         |  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |  'format' = 'changelog-json'
         |)
      """.stripMargin
    stenv.executeSql(sink)

    // Continuously updating aggregation written to the changelog sink
    val insert =
      s"""
         |INSERT INTO kafka_gmv
         |SELECT behavior, SUM(user_id) as gmv
         |FROM kafka_table
         |GROUP BY behavior
      """.stripMargin
    stenv.executeSql(insert)

    // Read the changelog topic back and print it for verification
    val query =
      s"""
         |SELECT * FROM kafka_gmv
      """.stripMargin
    stenv.executeSql(query).print()
  }
}
Check the data in Kafka:
{"data":{"id":"pv","gmv":955},"op":"-U"}
{"data":{"id":"pv","gmv":955},"op":"+U"}
Each update of the aggregate is encoded as a retraction of the old row (op "-U") followed by the new row (op "+U").
Practice 2: A Flink 1.11 Table API example
1. Let's start with something simple. Yesterday I lost half a day because Maven would not download flink-clients.jar; even after downloading it and importing it manually, the simplest program would not run. Today it still failed with a missing class (org.apache.flink.optimizer.costs.CostEstimator), which lives in flink-clients.jar. After adding that dependency again it finally resolved, and the code ran successfully.
Official documentation (Chinese): https://ci.apache.org/projects/f ... A%E8%A7%86%E5%9B%BE
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class SqlTest03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // Ingest a DataStream from a simple in-code source that emits one record per second
        DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(new SourceFunction<Tuple3<Long, String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple3<Long, String, Integer>> out) throws Exception {
                while (true) {
                    out.collect(new Tuple3<>(1L, "a", 11));
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Turn the DataStream into a Table with named fields, then back into a retract stream
        Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
        DataStream<Tuple2<Boolean, Row>> dsRow = tableEnv.toRetractStream(table, Row.class);
        dsRow.print();
        env.execute();
    }
}
One note on the execution at the very end: because the Table is converted back into a DataStream (toRetractStream) and printed, the pipeline is only submitted by the final env.execute(); statements run through executeSql(), as in Practice 1, are submitted immediately and do not need it.
Practice 3: Flink 1.11.0 reading Kafka and writing to Hive, fixing missing partition information and unreadable data
I. Preface
In a previous post I described Flink 1.11.0 reading Kafka data and writing it into Hive, and found that the data written by the Flink (Scala) job could not be queried from Hive. While looking for the cause I came across https://zhuanlan.zhihu.com/p/157899980, which explains why the Hive data cannot be read, but the solution it gives is to modify the Flink source code, which I found too heavy. After checking the official docs and reading some Flink source, I found a solution I consider more convenient; the analysis is below.
For the complete demo of Flink reading Kafka and dynamically writing to Hive (a real-time data warehouse), see the previous post: https://blog.csdn.net/m0_37592814/article/details/108044830
II. Analysis process
1. The commitPartitions method of StreamingFileCommitter calls committablePartitions on PartitionTimeCommitTigger to obtain the list of partitions that can be committed, then iterates over that list and commits each partition.
2. The key is PartitionTimeCommitTigger.committablePartitions, which builds the list of partitions to commit. A partition is only added when watermark > toMills(partTime) + commitDelay holds. This is the crux of the problem: when toMills(partTime) converts the partition time to milliseconds it effectively shifts partTime forward by 8 hours, so the result stays larger than the watermark and the partition is never added to the commit list. (A short sketch after this list illustrates the 8-hour offset.)
3. The extractor.extract() call used there is a method of the partition-time extraction interface PartitionTimeExtractor. As the Flink docs show (https://ci.apache.org/projects/f ... ors/filesystem.html), you can provide your own class that implements PartitionTimeExtractor and overrides its extract method.
4. The PartitionTimeExtractor instance extractor is created in the constructor of PartitionTimeCommitTigger based on the table configuration: when 'partition.time-extractor.kind' = 'custom' it uses the user-supplied implementation of PartitionTimeExtractor, otherwise it falls back to the default DefaultPartTimeExtractor.
5. Solution: use a custom partition-time extractor class, MyPartTimeExtractor, whose extract method returns partTime minus 8 hours. As described in step 2, toMills(partTime) adds 8 hours because of the time-zone handling, so the subtraction and the addition cancel out and the time-zone effect disappears.
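To make the 8-hour gap from step 2 concrete, here is a minimal, self-contained sketch (plain java.time, not Flink internals; the partition value is made up) showing that treating the same partition time as UTC yields an epoch-millisecond value 8 hours larger than treating it as local Asia/Shanghai time, which is exactly why watermark > toMills(partTime) + commitDelay never becomes true:

import java.time.{LocalDateTime, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter

object PartitionTimeOffsetDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical partition time, e.g. extracted from dt=2020-08-20/hr=12
    val partTime = LocalDateTime.parse("2020-08-20 12:00:00",
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))

    // What toMills(partTime) effectively does: treat the wall-clock time as UTC
    val asUtcMillis = partTime.toInstant(ZoneOffset.UTC).toEpochMilli

    // What the watermark is based on: the same wall-clock time in the local UTC+8 zone
    val asLocalMillis = partTime.atZone(ZoneId.of("Asia/Shanghai")).toInstant.toEpochMilli

    // The difference is 8 hours, so the commit condition keeps failing
    println(s"as UTC:   $asUtcMillis")
    println(s"as UTC+8: $asLocalMillis")
    println(s"gap in hours: ${(asUtcMillis - asLocalMillis) / 3600000.0}")
  }
}

Running it prints a gap of exactly 8.0 hours, which matches the delay observed before any partition becomes committable.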
III. Implementing the solution
1. Add a custom partition-time extractor class, MyPartTimeExtractor. The code is essentially a copy of DefaultPartTimeExtractor (the other implementation of PartitionTimeExtractor), with a single line changed so the extracted time is shifted back by 8 hours; a minimal sketch follows.
For the complete implementation see the previous post: https://blog.csdn.net/m0_37592814/article/details/108044830
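A minimal sketch of that one-line change, assuming Flink 1.11's org.apache.flink.table.filesystem.PartitionTimeExtractor interface and a dt=.../hr=... partition layout; the class name, package and timestamp format here are placeholders, and the real DefaultPartTimeExtractor additionally supports a configurable timestamp pattern that is omitted:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.{List => JList}

import org.apache.flink.table.filesystem.PartitionTimeExtractor

// Hypothetical custom extractor: parses the partition values into a LocalDateTime
// and shifts it back 8 hours so the time-zone offset added by toMills() is cancelled out.
class MyPartTimeExtractor extends PartitionTimeExtractor {

  override def extract(partitionKeys: JList[String], partitionValues: JList[String]): LocalDateTime = {
    // Assumes partitions like dt=2020-08-20/hr=12 (adjust to your own layout)
    val timestampString = s"${partitionValues.get(0)} ${partitionValues.get(1)}:00:00"
    // The one-line change compared with the default extractor: minus 8 hours
    LocalDateTime.parse(timestampString, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
      .minusHours(8)
  }
}

The class is referenced by name from the table properties (step 2 below), so it has to be on the job's classpath and instantiable through a no-argument constructor.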
2. When creating the Hive table, add the partition.time-extractor.kind and partition.time-extractor.class properties, as follows:
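Since the original screenshot is not reproduced here, the following is only an illustrative sketch of such a table definition: the database/table name and columns are made up, and apart from the two extractor properties the remaining options are the standard Flink 1.11 partition-commit settings such a job would typically set. It assumes a StreamTableEnvironment named tableEnv with a HiveCatalog already registered:

import org.apache.flink.table.api.SqlDialect

// Switch to the Hive dialect so the DDL below is parsed as Hive DDL
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

tableEnv.executeSql(
  """
    |CREATE TABLE IF NOT EXISTS ods.user_behavior_hive (
    |  user_id  INT,
    |  item_id  STRING,
    |  behavior STRING
    |) PARTITIONED BY (dt STRING, hr STRING)
    |STORED AS parquet
    |TBLPROPERTIES (
    |  'partition.time-extractor.kind' = 'custom',
    |  'partition.time-extractor.class' = 'com.example.MyPartTimeExtractor',
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '0s',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
    |)
  """.stripMargin)

The same TBLPROPERTIES can also be set on a table created directly from the Hive CLI; Flink picks them up through the HiveCatalog when the streaming sink commits partitions.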
3. Verify in Hive: after the fix, the new partitions appear in the metastore and the data can be queried.
Sources:
Writing aggregated / updating data to Kafka with Flink 1.11, following the community implementation
https://www.pianshen.com/article/58101740377/
A Flink 1.11 Table API example
https://www.pianshen.com/article/73321658980/
Flink 1.11.0 reading Kafka and writing to Hive: fixing missing partition information and unreadable data
https://www.pianshen.com/article/46751840422/