levycui 发表于 2020-11-4 18:38:04

Flink1.11实践汇总:含table、kafka、hive等

本帖最后由 levycui 于 2020-11-4 22:12 编辑

问题导读:
1、Flink1.11如何将 聚合,update数据写入到kafka?
2、Flink 1.11 如何操作table案例?
3、读取kafka数据写入hive中,无分区信息及读取不到数据如何解决?
4、如何添加自定义分区时间抽取类 MyPartTimeExtractor ?



实践一、参考社区实现 Flink1.11将 聚合,update数据写入到kafka

1,快速链接社区文档:
https://github.com/ververica/flink-cdc-connectors

下载依赖包:

   <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <!-- add the dependency matching your database -->
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <!-- add the dependency matching your database -->
      <artifactId>flink-sql-connector-mysql-cdc</artifactId>
      <version>1.0.0</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <!-- add the dependency matching your database -->
      <artifactId>flink-format-changelog-json</artifactId>
      <version>1.0.0</version>
    </dependency>

我们观察到changelog- json 代码逻辑与canal-json一样 :一个工厂类,一个反序列化,一个序列化类



上代码测试:
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    object CdcSinkKafka {
      def main(args: Array): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.enableCheckpointing(30001)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
      val stenv = StreamTableEnvironment.create(env, bsSettings)

      val source =

          s"""
             |CREATE TABLE kafka_table (
             | category_id STRING,
             | user_id INT,
             | item_id STRING,
             | behavior STRING,
             | ts STRING
             |) WITH (
             | 'connector' = 'kafka',
             | 'topic' = 'user_behavior',
             | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
             | 'properties.group.id' = 'test1',
             | 'format' = 'json',
             | 'scan.startup.mode' = 'earliest-offset'
             |)
         """.stripMargin

      stenv.executeSql(source)

      val sink =
          s"""
             |CREATE TABLE kafka_gmv (
             |id STRING,
             |gmv DECIMAL(10, 5)
             |) WITH (
             |    'connector' = 'kafka',
             |    'topic' = 'kafka_gmv',
             |    'scan.startup.mode' = 'earliest-offset',
             |    'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
             |    'format' = 'changelog-json'
             |)
         """.stripMargin

      stenv.executeSql(sink)
      val insert =
          s"""
             | INSERT INTO kafka_gmv
             |    SELECT behavior, SUM(user_id) as gmv
             |    FROM kafka_table
             |    GROUP BY behavior
         """.stripMargin
      stenv.executeSql(insert)
      val query =
          s"""
             |SELECT * FROM kafka_gmv;
         """.stripMargin

      stenv.executeSql(query).print()


      }
    }



kafka查看数据:

{"data":{"id":"pv","gmv":955},"op":"-U"}
{"data":{"id":"pv","gmv":955},"op":"+U"}


实践二、Flink 1.11 table案例

1,先来的简单点的,折腾了半天 昨天是maven下载不了flink-clients.jar ,下载之后手动导入,然后最简单的代码都运行不了

今天没办法,还是报错,缺包(org.apache.flink.optimizer.costs.CostEstimator),没法子,缺的包就是flink-clients.jar里面的,

再次尝试加入依赖,今天是成功了,代码就执行成功了。

官网中文文档地址:https://ci.apache.org/projects/f ... A%E8%A7%86%E5%9B%BE

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    public class SqlTest03 {
      public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,bsSettings);

    // ingest a DataStream from an external source
            DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(new SourceFunction<Tuple3<Long, String, Integer>>() {
                @Override
                public void run(SourceContext<Tuple3<Long, String, Integer>> out) throws Exception {
                  while (true){
                        out.collect(new Tuple3<>(1L,"a",11));
                        Thread.sleep(1000L);
                  }
                }
                @Override
                public void cancel() {

                }
            });
            Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
            DataStream<Tuple2<Boolean, Row>> dsRow = tableEnv.toRetractStream(table, Row.class);
            dsRow.print();
            env.execute();

      }
    }
顺便提一句关于 最后面的执行:



实践三、flink1.11.0读取kafka数据写入hive中hive无分区信息及读取不到数据解决

一、前言

在上一博客中写了flink1.11.0读取kafka数据写入到hive中,发现hive中无法查询flink通过scala写入的数据,搜了些资料查找原因,参考了下文章:https://zhuanlan.zhihu.com/p/157899980 里无法读取hive数据的原因,但里面比较明确给出的解决方案是修改源码,我觉得太麻烦了。查了下官方和阅读些flink源码,终于找到一种我认为比较便捷的解决方案,具体分析方法如下:

完整的flink读取kafka数据动态写出hive,实现实时数仓的代码demo请参考上一篇文章:https://blog.csdn.net/m0_37592814/article/details/108044830

二、分析过程

1.StreamingFileCommitter 类中 commitPartitions 方法会调用PartitionTimeCommitTigger 类中的committablePartitions方法获取可提交的分区列表,然后变量分区提交分区。

2.主要是 PartitionTimeCommitTigger类中的方法committablePartitions 用来获取需要提交分区的列表




里面需要watermark > toMills(partTime) + commitDelay 成立 时才会把分区添加到需要提交分区列表中,这里是问题的关键,里面的 toMills(partTime) 方法转为毫秒时间时 会把 partTime 时间 往后8小时,所以会一直大于watermark 的值一直无法添加分区到需提交分区列表中


3.PartitionTimeExtractor 类   上图中的 extractor.extract()方法是实现了分区时间抽取接口 PartitionTimeExtractor 中的方法,查看flink官网:https://ci.apache.org/projects/f ... ors/filesystem.html 可以看出,可以自定义类实现 PartitionTimeExtractor 接口,重写里面的extract方法



4. PartitionTimeExtractor 实例对象 extractor 是由PartitionTimeCommitTigger 类的构造方法根据读取配置信息来创建的,

当 'partition.time-extractor.kind'='custom' 是 使用自定义的实现PartitionTimeExtractor接口的分区时间抽取类,否则使用的是默认的DefaultPartTimeExtractor





5. 解决方案:使用自定义的分区抽取时间实现类MyPartTimeExtractor,重写 extract方法,方法的返回值partTime 时间减少8小时,如步骤2中所说toMills(partTime)方法会把partTime时间因为时区问题多加8小时,这样一减一加则抵消掉时区的影响了。

三、解决方案实现

1.添加自定义分区时间抽取类 MyPartTimeExtractor 。代码主要是复制默认PartitionTimeExtractor 另一实现类DefaultPartTimeExtractor 的逻辑,修改如下一行代码,减去8小时。


实现类完整代码参考 上一篇文章:https://blog.csdn.net/m0_37592814/article/details/108044830

2.hive建表是添加partition.time-extractor.kind 和 partition.time-extractor.class 属性

如下:


3.hive验证


来源:
参考社区实现 Flink1.11将 聚合,update数据写入到kafka
https://www.pianshen.com/article/58101740377/

Flink 1.11 table案例
https://www.pianshen.com/article/73321658980/

flink1.11.0读取kafka数据写入hive中hive无分区信息及读取不到数据解决
https://www.pianshen.com/article/46751840422/

最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
页: [1]
查看完整版本: Flink1.11实践汇总:含table、kafka、hive等