Flink Sink for the ClickHouse Database
This post was last edited by levycui on 2020-8-11 17:31.
Download link:
Flink-Clickhouse-Sink
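Description
Flink sink for the ClickHouse database, powered by an asynchronous HTTP client.
A high-performance library for loading data into ClickHouse.
It has two triggers for loading data: a timeout and the buffer size.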
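The two triggers can be illustrated with a small buffer that flushes when it holds `maxBufferSize` records or when `timeoutMillis` has elapsed since the last flush. This is a minimal sketch under assumed names (`SizeOrTimeoutBuffer`, `add`, and `tick` are hypothetical, not the library's API). The clock is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the two load triggers: a batch is flushed when the
 * buffer reaches maxBufferSize records (size trigger) or when timeoutMillis
 * have passed since the last flush (timeout trigger).
 * Not the library's actual implementation.
 */
final class SizeOrTimeoutBuffer {
    private final int maxBufferSize;
    private final long timeoutMillis;
    private final Consumer<List<String>> flusher; // receives each flushed batch
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMillis;

    SizeOrTimeoutBuffer(int maxBufferSize, long timeoutMillis,
                        Consumer<List<String>> flusher, long nowMillis) {
        this.maxBufferSize = maxBufferSize;
        this.timeoutMillis = timeoutMillis;
        this.flusher = flusher;
        this.lastFlushMillis = nowMillis;
    }

    /** Adds one record and flushes if the size trigger fires. */
    void add(String record, long nowMillis) {
        buffer.add(record);
        if (buffer.size() >= maxBufferSize) {
            flush(nowMillis);
        }
    }

    /** Called periodically; flushes a non-empty buffer if the timeout trigger fires. */
    void tick(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - lastFlushMillis >= timeoutMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        flusher.accept(new ArrayList<>(buffer)); // hand off a copy of the batch
        buffer.clear();
        lastFlushMillis = nowMillis;
    }
}
```

Here `tick` stands in for whatever periodic check drives the timeout trigger. A real sink would also need to make the buffer thread-safe.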
Installation
Maven Central
<dependency>
<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.1.0</version>
</dependency>
Usage
Properties
flink-clickhouse-sink uses two groups of configuration properties: a common part, and a part for each sink in the operator chain.
The common part (used as global parameters):
clickhouse.sink.num-writers - number of writers, which build and send requests,
clickhouse.sink.queue-max-capacity - maximum capacity of the queue of blanks (batches waiting to be sent), in batches,
clickhouse.sink.timeout-sec - timeout for loading data, in seconds,
clickhouse.sink.retries - maximum number of retries,
clickhouse.sink.failed-records-path - path for failed records.
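Taken together, the common properties describe a producer/consumer setup. Batches wait in a bounded queue, a fixed number of writers send them, failed sends are retried, and batches that still fail are set aside. The sketch below is a hypothetical illustration of that reading, not the library's code. `WriterPool` and the `send` predicate are invented names. It assumes `retries` means extra attempts after the first one, and it collects failed batches in memory, whereas the real sink writes them to `failed-records-path`.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/**
 * Hypothetical sketch: numWriters threads take batches from a bounded queue
 * (queueMaxCapacity), attempt each send up to 1 + retries times, and keep
 * the batches that still fail.
 */
final class WriterPool {
    private final BlockingQueue<List<String>> queue;
    private final ExecutorService writers;
    private final int numWriters;
    private final List<List<String>> failed = new CopyOnWriteArrayList<>();

    WriterPool(int numWriters, int queueMaxCapacity, int retries,
               Predicate<List<String>> send) {
        this.numWriters = numWriters;
        this.queue = new ArrayBlockingQueue<>(queueMaxCapacity);
        this.writers = Executors.newFixedThreadPool(numWriters);
        for (int i = 0; i < numWriters; i++) {
            writers.execute(() -> {
                try {
                    while (true) {
                        List<String> batch = queue.take();
                        if (batch.isEmpty()) {
                            return; // an empty batch is the shutdown signal
                        }
                        boolean sent = false;
                        for (int attempt = 0; attempt <= retries && !sent; attempt++) {
                            sent = send.test(batch);
                        }
                        if (!sent) {
                            failed.add(batch); // the real sink would write to failed-records-path
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    /** Enqueues a non-empty batch; blocks while the queue is full. */
    void submit(List<String> batch) throws InterruptedException {
        if (!batch.isEmpty()) {
            queue.put(batch);
        }
    }

    /** Stops the writers after all queued batches are processed; returns the failed ones. */
    List<List<String>> close() throws InterruptedException {
        for (int i = 0; i < numWriters; i++) {
            queue.put(List.of()); // one shutdown signal per writer
        }
        writers.shutdown();
        writers.awaitTermination(30, TimeUnit.SECONDS);
        return failed;
    }
}
```

The bounded queue provides back-pressure: `submit` blocks while `queueMaxCapacity` batches are already waiting.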
The sink part (set per sink in the chain):
clickhouse.sink.target-table - target table in ClickHouse,
clickhouse.sink.max-buffer-size - buffer size, i.e. how many records are buffered before a batch is sent.
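A minimal sketch of reading the per-sink part from a `java.util.Properties`, using the literal key names listed above. The library's `ClickhouseSinkConsts` constants presumably hold these strings, but that is an assumption. `SinkSettings` is a hypothetical class, and treating both keys as required is a choice made for the sketch.

```java
import java.util.Properties;

/** Hypothetical holder for the per-sink settings, read by their literal key names. */
final class SinkSettings {
    static final String TARGET_TABLE = "clickhouse.sink.target-table";
    static final String MAX_BUFFER_SIZE = "clickhouse.sink.max-buffer-size";

    final String targetTable;
    final int maxBufferSize;

    private SinkSettings(String targetTable, int maxBufferSize) {
        this.targetTable = targetTable;
        this.maxBufferSize = maxBufferSize;
    }

    /** Reads and validates both keys; throws IllegalArgumentException if one is missing or invalid. */
    static SinkSettings from(Properties props) {
        String table = props.getProperty(TARGET_TABLE);
        if (table == null || table.isBlank()) {
            throw new IllegalArgumentException(TARGET_TABLE + " is required");
        }
        String size = props.getProperty(MAX_BUFFER_SIZE);
        if (size == null) {
            throw new IllegalArgumentException(MAX_BUFFER_SIZE + " is required");
        }
        int maxBufferSize;
        try {
            maxBufferSize = Integer.parseInt(size.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(MAX_BUFFER_SIZE + " must be an integer", e);
        }
        if (maxBufferSize <= 0) {
            throw new IllegalArgumentException(MAX_BUFFER_SIZE + " must be positive");
        }
        return new SinkSettings(table.trim(), maxBufferSize);
    }
}
```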
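Code
The key point: clickhouse-sink works with events that are strings in ClickHouse insert format (like CSV). You have to convert each event into that format yourself, as you would for an ordinary insert into the database.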
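Because each event becomes literal insert text, a string value containing a quote or backslash would break the statement; ClickHouse requires at least `'` and `\` to be escaped inside string literals. A minimal sketch of a hypothetical helper (`ChValues` is an invented name) that quotes strings this way and assembles a tuple in the `('str', 1)` style produced by the `convertToCsv` example in this section:

```java
/**
 * Hypothetical helper: formats a Java string as a ClickHouse single-quoted
 * string literal, escaping backslashes and single quotes with a backslash.
 */
final class ChValues {
    static String quote(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 2).append('\'');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '\\' || c == '\'') {
                sb.append('\\'); // escape the two characters ClickHouse requires
            }
            sb.append(c);
        }
        return sb.append('\'').toString();
    }

    /** Joins already-formatted column values into one tuple, e.g. ('abc', 1). */
    static String tuple(String... formattedValues) {
        return "(" + String.join(", ", formattedValues) + ")";
    }
}
```

Other control characters (such as newlines) may also need escaping depending on the input format; the sketch handles only the two that are always required.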
For example, suppose you have an event POJO:
class A {
    public final String str;
    public final int integer;

    public A(String str, int i) {
        this.str = str;
        this.integer = i;
    }
}
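You have to convert this POJO into that string format, for example:
public static String convertToCsv(A a) {
    // e.g. new A("abc", 1) becomes ('abc', 1); note that a.str is not escaped here
    StringBuilder builder = new StringBuilder();
    builder.append("(");
    // add a.str as a single-quoted string
    builder.append("'");
    builder.append(a.str);
    builder.append("', ");
    // add a.integer
    builder.append(String.valueOf(a.integer));
    builder.append(")");
    return builder.toString();
}
Then add the records to the sink. You have to add the common properties listed above as global parameters of the Flink environment.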
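A minimal sketch of collecting the common properties into a `Map<String, String>` under their literal key names. `GlobalSinkParams` is a hypothetical helper. The Flink call that registers the map is shown only as a comment so the sketch compiles without Flink; `ParameterTool.fromMap` and `ExecutionConfig.setGlobalJobParameters` are Flink APIs. Settings for connecting to the ClickHouse server itself are not part of the property list in this README and are not shown.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: collects the common ("global") sink properties into a
 * Map<String, String>, keyed by the property names listed in this README.
 */
final class GlobalSinkParams {
    static Map<String, String> build(int numWriters, int queueMaxCapacity,
                                     int timeoutSec, int retries,
                                     String failedRecordsPath) {
        Map<String, String> params = new HashMap<>();
        params.put("clickhouse.sink.num-writers", String.valueOf(numWriters));
        params.put("clickhouse.sink.queue-max-capacity", String.valueOf(queueMaxCapacity));
        params.put("clickhouse.sink.timeout-sec", String.valueOf(timeoutSec));
        params.put("clickhouse.sink.retries", String.valueOf(retries));
        params.put("clickhouse.sink.failed-records-path", failedRecordsPath);
        // With Flink on the classpath, register the map as global job parameters:
        //   env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(params));
        return params;
    }
}
```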
Then add your sink like this:
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;

// create a converter; the method is static so that
// YourEventConverter::toClickHouseInsertFormat can be passed to map()
public class YourEventConverter {
    public static String toClickHouseInsertFormat(YourEvent yourEvent) {
        String chFormat = ...;
        // ... build chFormat from the fields of yourEvent
        return chFormat;
    }
}

// create props for the sink
Properties props = new Properties();
props.put(ClickhouseSinkConsts.TARGET_TABLE_NAME, "your_table");
props.put(ClickhouseSinkConsts.MAX_BUFFER_SIZE, "10000");

// build the chain
DataStream<YourEvent> dataStream = ...;
dataStream.map(YourEventConverter::toClickHouseInsertFormat)
        .name("convert YourEvent to Clickhouse table format")
        .addSink(new ClickhouseSink(props))
        .name("your_table clickhouse sink");
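The converter method has to be `static`: only then does `YourEventConverter::toClickHouseInsertFormat` take one event and return one string, which is the shape `map` expects. A self-contained sketch with a hypothetical event type (`PageView` and `PageViewConverter` are invented for illustration), using `java.util.stream.Stream.map` as a stand-in for `DataStream.map` so it runs without Flink:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical event type standing in for YourEvent. */
final class PageView {
    final String url;
    final long userId;

    PageView(String url, long userId) {
        this.url = url;
        this.userId = userId;
    }
}

/** Hypothetical converter following the YourEventConverter pattern. */
final class PageViewConverter {
    // static, so the method reference is a one-argument function PageView -> String
    static String toClickHouseInsertFormat(PageView e) {
        // escape backslashes first, then single quotes, for a quoted literal
        String url = e.url.replace("\\", "\\\\").replace("'", "\\'");
        return "('" + url + "', " + e.userId + ")";
    }

    /** Applies the converter the way dataStream.map(...) would, but on a plain list. */
    static List<String> convertAll(List<PageView> events) {
        Function<PageView, String> f = PageViewConverter::toClickHouseInsertFormat;
        return events.stream().map(f).collect(Collectors.toList());
    }
}
```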
Roadmap
[*] reading files from "failed-records-path"
[*] migrate to gradle
Author: ivi-ru
Source: https://github.com/ivi-ru/flink-clickhouse-sink