问题导读
1.什么是维表?
2.Flink中如何实现与维表join?
3.本文例子实现了什么功能?
FlinkSQL维表功能是Flink1.9新增功能,在Flink1.6版本的时候,阿里已经实现了流与维表的join。在新版本中融合了Blink功能。大家想学习,可以参考
https://github.com/DTStack/flinkStreamSQL
什么是维表
维表是动态表,表里所存储的数据有可能不变,也有可能定时更新,但是更新频率不是很频繁。在业务开发中一般的维表数据存储在关系型数据库如mysql,oracle等,也可能存储在hbase,redis等nosql数据库。
FlinkSQL实现流与维表的join 分两步
第一是用flink api实现维表的功能:
第二是解析流与维表join的sql语法转化成底层的flinkAPI
关联考虑问题
流与维表的join会碰到两个问题,一个是性能问题,因为流速要是很快,每一条数据都需要到维表做下join,但是维表的数据是存在第三方存储系统,如果实时访问第三方存储系统,一个是join的性能会差,每次都要走网络io;还有就是对第三方存储系统带来很大的压力,有可能会把第三方存储系统搞挂掉。所以解决的方法就是维表里的数据要缓存,可以全量缓存,这个主要是维表数据不大的情况,还有一个是LRU缓存,维表数据量比较大的情况。
第二个问题是流延迟过来的数据这么跟之前的维表数据做关联;这个就涉及到维表数据需要存储快照数据,所以这样的场景用hbase 做维表是比较适合的,因为hbase 是天生支持数据多版本的。
Flink 中的维表
如果要使用该功能,那就需要自己引入 Blink 的 Planner,而不是引用社区的 Planner。由于新合入的 Blink 相关功能,使得 Flink 1.9 实现维表功能很简单,只要自定义实现 LookupableTableSource 接口,同时实现里面的方法就可以进行,下面来分析一下 LookupableTableSource的代码:
[mw_shl_code=bash,true]public interface LookupableTableSource<T> extends TableSource<T> {
TableFunction<T> getLookupFunction(String[] lookupKeys);
AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
boolean isAsyncEnabled();
}[/mw_shl_code]
isAsyncEnabled 方法主要表示该表是否支持异步访问外部数据源获取数据,当返回 true 时,那么在注册到 TableEnvironment 后,使用时会返回异步函数进行调用,当返回 false 时,则使同步访问函数。
可以看到 LookupableTableSource 这个接口中有三个方法
getLookupFunction 方法返回一个同步访问外部数据系统的函数,什么意思呢,就是你通过 Key 去查询外部数据库,需要等到返回数据后才继续处理数据,这会对系统处理的吞吐率有影响。
getAsyncLookupFunction 方法则是返回一个异步的函数,异步访问外部数据系统,获取数据,这能极大的提升系统吞吐率。具体是否要实现异步函数方法,这需要用户自己判定是否需要对异步访问的支持,如果同步方法的吞吐率已经满足要求,那可以先不用考虑异步的实现情况。
(1) 同步访问函数getLookupFunction
getLookupFunction 会返回同步方法,这里你需要自定义 TableFuntion 进行实现,TableFunction 本质是 UDTF,输入一条数据可能返回多条数据,也可能返回一条数据。用户自定义 TableFunction 格式如下:
[mw_shl_code=bash,true]public class MyLookupFunction extends TableFunction<Row> {
@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
}
public void eval(Object... paramas) {
}
}[/mw_shl_code]
open 方法在进行初始化算子实例的进行调用,异步外部数据源的client要在类中定义为 transient,然后在 open 方法中进行初始化,这样每个任务实例都会有一个外部数据源的 client。防止同一个 client 多个任务实例调用,出现线程不安全情况。
eval 则是 TableFunction 最重要的方法,它用于关联外部数据。当程序有一个输入元素时,就会调用eval一次,用户可以将产生的数据使用 collect() 进行发送下游。paramas 的值为用户输入元素的值,比如在 Join 的时候,使用 A.id = B.id and A.name = b.name, B 是维表,A 是用户数据表,paramas 则代表 A.id,A.name 的值。
(2) 异步访问函数
getAsyncLookupFunction 会返回异步访问外部数据源的函数,如果你想使用异步函数,前提是 LookupableTableSource 的 isAsyncEnabled 方法返回 true 才能使用。使用异步函数访问外部数据系统,一般是外部系统有异步访问客户端,如果没有的话,可以自己使用线程池异步访问外部系统。至于为什么使用异步访问函数,无非就是为了提高程序的吞吐量,不需要每条记录访问返回数据后,才去处理下一条记录。异步函数格式如下:
[mw_shl_code=bash,true]public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {
@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
}
public void eval(CompletableFuture<Collection<Row>> future, Object... params) {
}
}[/mw_shl_code]
维表异步访问函数总体和同步函数实现类似,这里说一下注意点:
1. 外部数据源异步客户端初始化。如果是线程安全的(多个客户端一起使用),你可以不加 transient 关键字,初始化一次。否则,你需要加上 transient,不对其进行初始化,而在 open 方法中,为每个 Task 实例初始化一个。
2. eval 方法中多了一个 CompletableFuture,当异步访问完成时,需要调用其方法进行处理.
为了减少每条数据都去访问外部数据系统,提高数据的吞吐量,一般我们会在同步函数和异步函数中加入缓存,如果以前某个关键字访问过外部数据系统,我们将其值放入到缓存中,在缓存没有失效之前,如果该关键字再次进行处理时,直接先访问缓存,有就直接返回,没有再去访问外部数据系统,然后在进行缓存,进一步提升我们实时程序处理的吞吐量。
一般缓存类型有以下几种类型:
- 数据全部缓存,定时更新。
- LRU Cache,设置一个超时时间。
- 用户自定义缓存。
流与维表例子
从HDFS中读取数据源,join Redis维表,最终结果落到kafka。服务器上8个并行度大概可以达到60w+/s。
全部依赖项:
[mw_shl_code=bash,true] <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--1.9 SQL-->
<!-- Either... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- <dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.30</version>
</dependency>-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.11.0.0</version>
</dependency>
<!--redis异步连接-->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.24.Final</version>
</dependency>
</dependencies>[/mw_shl_code]
[mw_shl_code=java,true]import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {
private final String[] fieldNames;
private final TypeInformation[] fieldTypes;
private transient RedisAsyncCommands<String, String> async;
public MyAsyncLookupFunction(String[] fieldNames, TypeInformation[] fieldTypes) {
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
}
@Override
public void open(FunctionContext context) {
//配置redis异步连接
RedisClient redisClient = RedisClient.create("redis://172.16.44.28:6379/0");
StatefulRedisConnection<String, String> connection = redisClient.connect();
async = connection.async();
}
//每一条流数据都会调用此方法进行join
public void eval(CompletableFuture<Collection<Row>> future, Object... paramas) {
//表名、主键名、主键值、列名
String[] info = {"userInfo", "userId", paramas[0].toString(), "userName"};
String key = String.join(":", info);
RedisFuture<String> redisFuture = async.get(key);
redisFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
future.complete(Collections.singletonList(Row.of(key, value)));
//todo
// BinaryRow row = new BinaryRow(2);
}
});
}
@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(fieldTypes, fieldNames);
}
public static final class Builder {
private String[] fieldNames;
private TypeInformation[] fieldTypes;
private Builder() {
}
public static Builder getBuilder() {
return new Builder();
}
public Builder withFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
return this;
}
public Builder withFieldTypes(TypeInformation[] fieldTypes) {
this.fieldTypes = fieldTypes;
return this;
}
public MyAsyncLookupFunction build() {
return new MyAsyncLookupFunction(fieldNames, fieldTypes);
}
}
}[/mw_shl_code]
[mw_shl_code=java,true]import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
public class RedisAsyncLookupTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {
private final String[] fieldNames;
private final TypeInformation[] fieldTypes;
public RedisAsyncLookupTableSource(String[] fieldNames, TypeInformation[] fieldTypes) {
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
}
//同步方法
@Override
public TableFunction<Row> getLookupFunction(String[] strings) {
return null;
}
//异步方法
@Override
public AsyncTableFunction<Row> getAsyncLookupFunction(String[] strings) {
return MyAsyncLookupFunction.Builder.getBuilder()
.withFieldNames(fieldNames)
.withFieldTypes(fieldTypes)
.build();
}
//开启异步
@Override
public boolean isAsyncEnabled() {
return true;
}
@Override
public DataType getProducedDataType() {
return TypeConversions.fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames));
}
@Override
public TableSchema getTableSchema() {
return TableSchema.builder()
.fields(fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes))
.build();
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment environment) {
throw new UnsupportedOperationException("do not support getDataStream");
}
public static final class Builder {
private String[] fieldNames;
private TypeInformation[] fieldTypes;
private Builder() {
}
public static Builder newBuilder() {
return new Builder();
}
public Builder withFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
return this;
}
public Builder withFieldTypes(TypeInformation[] fieldTypes) {
this.fieldTypes = fieldTypes;
return this;
}
public RedisAsyncLookupTableSource build() {
return new RedisAsyncLookupTableSource(fieldNames, fieldTypes);
}
}
}[/mw_shl_code]
[mw_shl_code=java,true]import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;
import java.util.Properties;
public class LookUpAsyncTest {
@Test
public void test() throws Exception {
LookUpAsyncTest.main(new String[]{});
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
final ParameterTool params = ParameterTool.fromArgs(args);
String fileName;
if (params.get("f") != null)
fileName = params.get("f");
else
fileName = "/flink/userClick_Random_100W";
DataStream<String> source = env.readTextFile("hdfs://172.16.44.28:8020" + fileName, "UTF-8");
TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};
String[] fields = new String[]{"id", "user_click", "time"};
RowTypeInfo typeInformation = new RowTypeInfo(types, fields);
DataStream<Row> stream = source.map(new MapFunction<String, Row>() {
private static final long serialVersionUID = 2349572544179673349L;
@Override
public Row map(String s) {
String[] split = s.split(",");
Row row = new Row(split.length);
for (int i = 0; i < split.length; i++) {
Object value = split;
if (types.equals(Types.STRING)) {
value = split;
}
if (types.equals(Types.LONG)) {
value = Long.valueOf(split);
}
row.setField(i, value);
}
return row;
}
}).returns(typeInformation);
tableEnv.registerDataStream("user_click_name", stream, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");
RedisAsyncLookupTableSource tableSource = RedisAsyncLookupTableSource.Builder.newBuilder()
.withFieldNames(new String[]{"id", "name"})
.withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING})
.build();
tableEnv.registerTableSource("info", tableSource);
String sql = "select t1.id,t1.user_click,t2.name" +
" from user_click_name as t1" +
" join info FOR SYSTEM_TIME AS OF t1.proctime as t2" +
" on t1.id = t2.id";
Table table = tableEnv.sqlQuery(sql);
DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
// result.print().setParallelism(1);
DataStream<String> printStream = result.map(new MapFunction<Row, String>() {
@Override
public String map(Row value) throws Exception {
return value.toString();
}
});
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "172.16.12.148:9094");
FlinkKafkaProducer011<String> kafkaProducer = new FlinkKafkaProducer011<>(// broker list
"user_click_name", // target topic
new SimpleStringSchema(),
properties);
printStream.addSink(kafkaProducer);
tableEnv.execute(Thread.currentThread().getStackTrace()[1].getClassName());
}
}[/mw_shl_code]
加微信w3aboutyun,获取更多资源
领取100本书+1T资源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26480
大数据5个项目视频
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25235
名企资源、名企面试题、最新BAT面试题、专题面试题等资源汇总
https://www.aboutyun.com/forum.php?mod=viewthread&tid=27732
参考:
https://zhuanlan.zhihu.com/p/79800113
https://www.codeleading.com/article/25642073324/
https://yq.aliyun.com/articles/638910
|
|