levycui posted on 2020-8-11 17:28:31

Flink Sink for the Clickhouse Database

Last edited by levycui on 2020-8-11 17:31



Download link:

Flink-Clickhouse-Sink

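Description
A Flink sink for the Clickhouse database, powered by an asynchronous HTTP client.
It is a high-performance library for loading data into Clickhouse.
It has two triggers for loading data: a timeout and a buffer size.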
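
To make the two triggers concrete, here is a minimal sketch of a buffer that flushes when either the buffer size or the timeout is reached. The class TriggerBufferSketch and all of its members are hypothetical names made up for illustration; this is not the library's implementation, and for simplicity the timeout is only checked when a record arrives, whereas a real sink would more likely use a background timer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two flush triggers; not the library's actual class.
class TriggerBufferSketch {
    private final int maxBufferSize;   // size trigger: flush once this many records are buffered
    private final long timeoutMillis;  // timeout trigger: flush once this long has passed since the last flush
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushedBatches = new ArrayList<>();
    private long lastFlushMillis;

    TriggerBufferSketch(int maxBufferSize, long timeoutMillis, long nowMillis) {
        this.maxBufferSize = maxBufferSize;
        this.timeoutMillis = timeoutMillis;
        this.lastFlushMillis = nowMillis;
    }

    // Buffers a record and flushes if either trigger fires.
    // The current time is passed in so the behaviour is deterministic.
    void add(String record, long nowMillis) {
        buffer.add(record);
        boolean sizeReached = buffer.size() >= maxBufferSize;
        boolean timedOut = nowMillis - lastFlushMillis >= timeoutMillis;
        if (sizeReached || timedOut) {
            flush(nowMillis);
        }
    }

    // Moves the buffered records into a finished batch (standing in for an HTTP insert).
    void flush(long nowMillis) {
        if (!buffer.isEmpty()) {
            flushedBatches.add(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushMillis = nowMillis;
    }

    List<List<String>> flushedBatches() {
        return flushedBatches;
    }

    int pending() {
        return buffer.size();
    }
}
```

With a buffer size of 2 and a timeout of 1000 ms, two quick records produce one batch through the size trigger, and a single late record is flushed on its own through the timeout trigger.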



Installation
Maven Central

<dependency>
<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.1.0</version>
</dependency>

Usage
Properties
flink-clickhouse-sink uses two groups of configuration properties: a common part, and a part for each sink in the operator chain.

The common part (used as global parameters):
clickhouse.sink.num-writers - number of writers, which build and send requests,
clickhouse.sink.queue-max-capacity - max capacity (in batches) of the blank's queue,
clickhouse.sink.timeout-sec - timeout for loading data, in seconds,
clickhouse.sink.retries - max number of retries,
clickhouse.sink.failed-records-path - path for failed records.
The sink part (used per sink in the chain):
clickhouse.sink.target-table - target table in Clickhouse,
clickhouse.sink.max-buffer-size - buffer size.
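
As a rough sketch, the two groups of properties could be assembled like this. The key strings are the ones listed above; the class name SinkPropertiesSketch, the method names, and every value are illustrative assumptions rather than recommended settings. How the common part is handed to the Flink environment is not shown here.

```java
import java.util.Properties;

// Hypothetical helper; only the key strings come from the property list above.
class SinkPropertiesSketch {

    // Common (global) part, shared by all sinks. Values are placeholders.
    static Properties commonProperties() {
        Properties props = new Properties();
        props.setProperty("clickhouse.sink.num-writers", "2");
        props.setProperty("clickhouse.sink.queue-max-capacity", "10");
        props.setProperty("clickhouse.sink.timeout-sec", "30");
        props.setProperty("clickhouse.sink.retries", "3");
        props.setProperty("clickhouse.sink.failed-records-path", "/tmp/failed-records");
        return props;
    }

    // Per-sink part: one such object for each sink in the operator chain.
    static Properties sinkProperties(String targetTable, int maxBufferSize) {
        Properties props = new Properties();
        props.setProperty("clickhouse.sink.target-table", targetTable);
        props.setProperty("clickhouse.sink.max-buffer-size", String.valueOf(maxBufferSize));
        return props;
    }
}
```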

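In code
The main point: clickhouse-sink works with events in string form (a Clickhouse insert format, such as CSV). You must convert each event to that format, as you would for a normal database insert.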

For example, suppose you have this event POJO:
class A {
   public final String str;
   public final int integer;
   
   public A(String str, int i){
       this.str = str;
       this.integer = i;
   }
}

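Then add the record to the sink.
You must also add the global parameters to the Flink environment. The POJO can be converted like this: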

public static String convertToCsv(A a) {
    StringBuilder builder = new StringBuilder();
    builder.append("(");
   
    // add a.str
    builder.append("'");
    builder.append(a.str);
    builder.append("', ");
   
    // add a.integer
    builder.append(String.valueOf(a.integer));
    builder.append(" )");
    return builder.toString();
}
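
Note that the converter above inserts a.str verbatim, so a value containing a single quote would produce a malformed row. A minimal sketch of a quoting helper follows, assuming backslash escaping of quotes and backslashes is accepted by the target Clickhouse insert format; the class CsvEscapeSketch and its methods are hypothetical and not part of the library.

```java
// Hypothetical helper, not part of flink-clickhouse-sink.
class CsvEscapeSketch {

    // Wraps a string in single quotes, escaping backslashes first and then single quotes.
    // Assumption: the target insert format accepts backslash escapes.
    static String quote(String value) {
        String escaped = value.replace("\\", "\\\\").replace("'", "\\'");
        return "'" + escaped + "'";
    }

    // Builds one row in the same "(str, integer)" shape as convertToCsv.
    static String toInsertFormat(String str, int integer) {
        return "(" + quote(str) + ", " + integer + ")";
    }
}
```

For instance, toInsertFormat("it's", 7) yields ('it\'s', 7).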

And add your sink like this:
// create converter
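public class YourEventConverter {
    // static, so that it can be used as YourEventConverter::toClickHouseInsertFormat below
    public static String toClickHouseInsertFormat(YourEvent yourEvent) {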
      String chFormat = ...;
      ....
      return chFormat;
    }
}

// create props for sink
Properties props = new Properties();
props.put(ClickhouseSinkConsts.TARGET_TABLE_NAME, "your_table");
props.put(ClickhouseSinkConsts.MAX_BUFFER_SIZE, "10000");

// build chain
DataStream<YourEvent> dataStream = ...;
dataStream.map(YourEventConverter::toClickHouseInsertFormat)
          .name("convert YourEvent to Clickhouse table format")
          .addSink(new ClickhouseSink(props))
          .name("your_table clickhouse sink");

Roadmap
[*] reading files from "failed-records-path"
[*] migrate to gradle

Author: ivi-ru
Source: https://github.com/ivi-ru/flink-clickhouse-sink
