林宝宝 发表于 2019-8-8 12:10:09

Flink关于读取Kudu的源码实现及相关内容总结

本帖最后由 林宝宝 于 2019-8-8 12:28 编辑

问题导读:

1.Flink自定义kudu的source和sink是如何实现的?
2.flinkStreamSQL对于sql扩展了什么功能?
3.Flink Kudu Connector如何使用?

Flink 自定义 Kudu的 Sink 和 source
      参照github上bahir-flink项目写的,主要是将POJO类写入kudu,代码地址
自定义Flink的sink或source



FlinkStreamSQL的Kudu Sink

还有一种是根据fieldNames 和fieldTypes写入,详细可以看flinkStreamSQL中的kudu Sink,代码地址
基于开源的flink,对其实时sql进行扩展;主要实现了流与维表的join,支持原生flink SQL所有的语法
将原有的异步查询修改为callBack方式
kudu-sink 与现有pom保持一致 添加-${git.branch}


Flink Kudu Connector
Flink Kudu连接器提供可以读写Kudu的源(KuduInputFormat)和接收器/输出(分别为KuduSink和KuduOutputFormat)。 要使用此连接器,可以将下面这些依赖项添加到项目中:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-kudu_2.11</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
版本兼容性:此模块与Apache Kudu 1.7.1(最新稳定版本)兼容。 注意,流连接器并不是Flink二进制包的一部分。 你需要将它们链接到作业jar以进行集群执行。 在此处了解如何链接它们以进行群集执行。 安装Kudu 按照Kudu安装指南中的说明进行操作。 你可以选择性地使用dockers文件夹中提供的docker镜像。
Kudu输入格式ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(PARALLELISM);

// create a table info object
KuduTableInfo tableInfo = KuduTableInfo.Builder
      .create("books")
      .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
      .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
      .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
      .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
      .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
      .build();
   
// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips
env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo))
      .count();
      
env.execute();
Kudu输出格式ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(PARALLELISM);

// create a table info object
KuduTableInfo tableInfo = KuduTableInfo.Builder
      .create("books")
      .createIfNotExist(true)
      .replicas(1)
      .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
      .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
      .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
      .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
      .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
      .build();

...

env.fromCollection(books)
      .output(new KuduOutputFormat<>("172.25.0.6", tableInfo));

env.execute();
KuduSinkStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(PARALLELISM);

// create a table info object
KuduTableInfo tableInfo = KuduTableInfo.Builder
      .create("books")
      .createIfNotExist(true)
      .replicas(1)
      .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
      .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
      .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
      .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
      .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
      .build();

...

env.fromCollection(books)
    .addSink(new KuduSink<>("172.25.0.6", tableInfo));

env.execute();
本文来源:https://blog.csdn.net/yidan7063/article/details/90642743                https://bahir.apache.org/docs/flink/current/flink-streaming-kudu/

最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201907/20/110231gzd1ckizdv384iv3.jpg

页: [1]
查看完整版本: Flink关于读取Kudu的源码实现及相关内容总结