Flink 1.11 Practice Roundup: Table, Kafka, Hive, and More
Questions this post addresses:
1. How do you write aggregated/updating data to Kafka with Flink 1.11?
2. How do you work with the Table API in Flink 1.11, with an example?
3. When reading Kafka data and writing it into Hive, there is no partition info and the data cannot be read: how do you fix it?
4. How do you add a custom partition-time extractor class, MyPartTimeExtractor?
Practice 1: Writing aggregated/updating data to Kafka with Flink 1.11, following the community implementation
1. Community project and documentation:
https://github.com/ververica/flink-cdc-connectors
Add the dependencies:
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-format-changelog-json</artifactId>
    <version>1.0.0</version>
</dependency>
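The mysql-cdc connector pulled in above is not actually used in the test below (the source there reads plain JSON from Kafka), but for reference, a CDC source table can be declared roughly as follows; the column list and all connection details are placeholders, in the style of the README linked above:

CREATE TABLE mysql_orders (
  id INT,
  user_id INT,
  behavior STRING
) WITH (
  -- connection properties below are placeholders; see the flink-cdc-connectors README
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb',
  'table-name' = 'orders'
)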
Looking at the source, the changelog-json format follows the same pattern as canal-json: one factory class, one deserialization schema, and one serialization schema.
Test code:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object CdcSinkKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(30001)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val stenv = StreamTableEnvironment.create(env, bsSettings)

    // Kafka source table: plain JSON user-behavior events
    val source =
      s"""
         |CREATE TABLE kafka_table (
         |  category_id STRING,
         |  user_id INT,
         |  item_id STRING,
         |  behavior STRING,
         |  ts STRING
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'user_behavior',
         |  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |  'properties.group.id' = 'test1',
         |  'format' = 'json',
         |  'scan.startup.mode' = 'earliest-offset'
         |)
       """.stripMargin
    stenv.executeSql(source)

    // Kafka sink table: changelog-json so update/retract messages can be written to Kafka
    val sink =
      s"""
         |CREATE TABLE kafka_gmv (
         |  id STRING,
         |  gmv DECIMAL(10, 5)
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'kafka_gmv',
         |  'scan.startup.mode' = 'earliest-offset',
         |  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |  'format' = 'changelog-json'
         |)
       """.stripMargin
    stenv.executeSql(sink)

    // The grouped aggregation produces an updating (changelog) stream
    val insert =
      s"""
         |INSERT INTO kafka_gmv
         |SELECT behavior, SUM(user_id) AS gmv
         |FROM kafka_table
         |GROUP BY behavior
       """.stripMargin
    stenv.executeSql(insert)

    // Read the changelog topic back and print it
    val query =
      s"""
         |SELECT * FROM kafka_gmv
       """.stripMargin
    stenv.executeSql(query).print()
  }
}
Check the data in Kafka (the -U/+U pair is how changelog-json encodes an update: a retraction of the old aggregate value followed by the new value):
{"data":{"id":"pv","gmv":955},"op":"-U"}
{"data":{"id":"pv","gmv":955},"op":"+U"}
Practice 2: A Flink 1.11 Table API example
1. Starting with something simple. This took half a day of fiddling: yesterday Maven could not download flink-clients.jar, and after downloading it and importing it manually, even the simplest code would not run. Today it still failed with a missing class (org.apache.flink.optimizer.costs.CostEstimator), which lives in flink-clients.jar. After adding the dependency again it resolved successfully, and the code ran.
Official documentation (Chinese): https://ci.apache.org/projects/f ... A%E8%A7%86%E5%9B%BE
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class SqlTest03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // Ingest a DataStream from a simple source that emits one tuple per second
        DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(new SourceFunction<Tuple3<Long, String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple3<Long, String, Integer>> out) throws Exception {
                while (true) {
                    out.collect(new Tuple3<>(1L, "a", 11));
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Convert the DataStream to a Table, then back to a retract stream and print it
        Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
        DataStream<Tuple2<Boolean, Row>> dsRow = tableEnv.toRetractStream(table, Row.class);
        dsRow.print();

        env.execute();
    }
}
One more note about the final execute(): because this example converts the Table back to a DataStream and prints it with the DataStream API, the job is only launched by env.execute(); statements run through executeSql() are submitted by that call itself and do not need it.
Practice 3: Flink 1.11.0 reads Kafka data and writes it to Hive, but Hive has no partition info and the data cannot be read: a fix
1. Background
The previous post showed Flink 1.11.0 reading Kafka data and writing it into Hive, but Hive could not query the data that the Flink (Scala) job had written. After searching for the cause I found https://zhuanlan.zhihu.com/p/157899980, which explains why the Hive data cannot be read, but the solution it gives is to modify the Flink source code, which felt too heavy-handed. After checking the official docs and reading some Flink source code, I found what I consider a more convenient solution; the analysis is as follows.
For the complete demo of Flink reading Kafka and dynamically writing to Hive (a real-time data warehouse), see the previous post: https://blog.csdn.net/m0_37592814/article/details/108044830
2. Analysis
1. The commitPartitions method of StreamingFileCommitter calls the committablePartitions method of PartitionTimeCommitTigger to get the list of committable partitions, then iterates over that list and commits each partition.
2. The key is the committablePartitions method of PartitionTimeCommitTigger, which builds the list of partitions to commit. A partition is only added when watermark > toMills(partTime) + commitDelay holds. This is the crux of the problem: when toMills(partTime) converts the partition time to milliseconds it shifts partTime forward by 8 hours, so the result stays larger than the watermark and the partition is never added to the commit list (see the sketch after this list).
3. The extractor.extract() call (shown in the original screenshot) implements the partition-time extraction interface PartitionTimeExtractor. From the Flink docs (https://ci.apache.org/projects/f ... ors/filesystem.html) you can see that a custom class implementing PartitionTimeExtractor can be supplied, overriding its extract method.
4. The PartitionTimeExtractor instance extractor is created in the constructor of PartitionTimeCommitTigger based on the table configuration: when 'partition.time-extractor.kind'='custom', the user-defined PartitionTimeExtractor implementation is used; otherwise the default DefaultPartTimeExtractor is used.
5. Solution: use a custom partition-time extractor class MyPartTimeExtractor and override extract so that the returned partTime is reduced by 8 hours. Since, as described in step 2, toMills(partTime) adds 8 hours because of the time-zone handling, the subtraction and the addition cancel each other out.
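To make step 2 concrete, here is a simplified illustration of the commit check, written against the analysis above; it is not the actual Flink source, and the method and variable names are assumptions for illustration only:

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the partition-commit check described in step 2.
// NOT the actual Flink source; names are illustrative only.
public class CommitCheckSketch {

    // toMills converts the extracted partition time to epoch milliseconds without
    // compensating for the local time zone; per the analysis above, for UTC+8 data
    // this makes the result 8 hours larger than the watermark for the same instant.
    static long toMills(LocalDateTime partTime) {
        return partTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // A partition becomes committable only once the watermark passes the partition
    // time (in millis) plus the configured commit delay.
    static List<LocalDateTime> committablePartitions(
            List<LocalDateTime> pendingPartitionTimes, long watermark, long commitDelay) {
        List<LocalDateTime> committable = new ArrayList<>();
        for (LocalDateTime partTime : pendingPartitionTimes) {
            if (watermark > toMills(partTime) + commitDelay) {
                committable.add(partTime);
            }
        }
        return committable;
    }
}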
3. Implementing the fix
1. Add a custom partition-time extractor class MyPartTimeExtractor. The code essentially copies the logic of DefaultPartTimeExtractor (the built-in implementation of PartitionTimeExtractor) and changes a single line to subtract 8 hours, as in the sketch below.
For the complete implementation, see the previous post: https://blog.csdn.net/m0_37592814/article/details/108044830
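The original code screenshot is not reproduced here, so the following is a minimal sketch, assuming the Flink 1.11 interface org.apache.flink.table.filesystem.PartitionTimeExtractor and dt=yyyy-MM-dd / hr=HH partition columns (adjust the parsing to your own partition layout); the only functional change versus the default extractor is the minusHours(8):

import org.apache.flink.table.filesystem.PartitionTimeExtractor;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

public class MyPartTimeExtractor implements PartitionTimeExtractor {

    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        // Assumes two partition columns, e.g. dt=2020-08-20 and hr=10,
        // joined into "2020-08-20 10:00:00"; adjust to your own layout.
        String timestampString = String.join(" ", partitionValues) + ":00:00";
        // Subtract 8 hours so that the later toMills() conversion, which adds the
        // UTC+8 offset as described in step 2 of the analysis, is cancelled out.
        return LocalDateTime.parse(timestampString, FORMATTER).minusHours(8);
    }
}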
2. When creating the Hive table, add the partition.time-extractor.kind and partition.time-extractor.class table properties, for example as in the sketch below:
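The original DDL screenshot is also missing; this is a minimal sketch of what the table definition could look like (the table name, columns, commit settings, and the extractor's package are placeholders, to be adapted to the table from the previous post):

SET table.sql-dialect=hive;

CREATE TABLE hive_user_behavior (
  user_id INT,
  behavior STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.kind' = 'custom',
  'partition.time-extractor.class' = 'com.example.hive.MyPartTimeExtractor',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '0 s',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);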
3. Verify in Hive that the partitions now appear and the data can be queried.
Sources:
Writing aggregated/updating data to Kafka with Flink 1.11, following the community implementation
https://www.pianshen.com/article/58101740377/
A Flink 1.11 Table API example
https://www.pianshen.com/article/73321658980/
Flink 1.11.0 reads Kafka data into Hive: fixing missing partition info and unreadable data
https://www.pianshen.com/article/46751840422/