本帖最后由 pig2 于 2020-8-26 15:47 编辑
问题导读
1.Flink写redis需要什么组件?
2.Flink Redis Connector支持哪三种Redis连接方式?
3.Flink Redis Sink 用法有哪些?
4.Flink写Redis本文由哪些应用和案例?
Flink经常需要和一些数据库进行交互,特别是redis,那么Flink如何实现和redis交互,这里咱们整理下:
使用 Flink Redis Connector
支持三种Redis连接方式
- Single Redis Server
- Redis Cluster
- Redis Sentinel
源码:https://github.com/apache/bahir-flink
链接: https://pan.baidu.com/s/1Rh0IbWzQ0xe6UvyGTSYh1w 提取码: rawm
相关使用说明:
https://blog.csdn.net/www_changer/article/details/102643876
Flink Redis Sink 基本用法
类
Redis Sink 提供用于向Redis发送数据的接口的类。接收器可以使用三种不同的方法与不同类型的Redis环境进行通信:
类 | 场景 | 备注 | FlinkJedisPoolConfig | 单Redis服务器 | 适用于本地、测试场景 | FlinkJedisClusterConfig | Redis集群 | | FlinkJedisSentinelConfig | Redis哨兵 | |
使用
Redis Sink 核心类是 RedisMappe 是一个接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法
更多可参考
https://www.jianshu.com/p/4e5b5d0d0bb9
FlinkDemo及案例
Flink写RedisDemo1:
上面是Flink Redis Connector的相关源码和用法,那么该如何使用他们
需要添加pom.xml
[mw_shl_code=xml,true] <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5</version>
</dependency>[/mw_shl_code]
更多可参考
https://www.aboutyun.com/home.php?mod=space&uid=61&do=blog&id=4358
Flink + Redis应用案例:
Flink + Redis来实现实时防刷接口的功能
随着人口红利的慢慢削减,互联网产品的厮杀愈加激烈,大家开始看好下沉市场的潜力,拼多多,趣头条等厂商通过拉新奖励,购物优惠等政策率先抢占用户,壮大起来。其他各厂商也紧随其后,纷纷推出自己产品的极速版,如今日头条极速版,腾讯新闻极速版等,也通过拉新奖励,阅读奖励等政策来吸引用户。
对于这类APP,实时风控是必不可少的,一个比较常见的实时风控场景就是防刷接口作弊。刷接口是黑产的一种作弊手段,APP上的各种操作,一般都会对应后台的某个接口,用户操作APP数据就会通过接口上报到后台,但如果黑产通过破解获取到了APP的新增用户接口,那他们就能跳过登陆APP步骤直接调后台接口构造虚假数据牟利了。对于这类业务,我们可以通过Flink + Redis来实现实时防刷接口的功能。
具体实现可参考:
链接:https://www.jianshu.com/p/1fbc9480cbf6
Flink从socket读取数据sink到redis案例
[mw_shl_code=bash,true]package com.lin.flink.stream.customPartition;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class StreamingDemoToRedis {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("node1", 9000, "\n");
//lpsuh l_words word
//对数据进行组装,把string转化为tuple2<String,String>
DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
return new Tuple2<String, String>("l_words", value);
}
});
//创建redis的配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("node1").setPort(6379).build();
//创建redissink
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<Tuple2<String, String>>(conf, new MyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("StreamingDemoToRedis");
}
public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
//表示从接收的数据中获取需要操作的redis key
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
//表示从接收的数据中获取需要操作的redis value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
}
}[/mw_shl_code]
Flink消费Kafka数据并把实时计算的结果导入到RedisDemo
1. 完成的场景
在很多大数据场景下,要求数据形成数据流的形式进行计算和存储。上篇博客介绍了Flink消费Kafka数据实现Wordcount计算,这篇博客需要完成的是将实时计算的结果写到redis。当kafka从其他端获取数据立刻到Flink计算,Flink计算完后结果写到Redis,整个过程就像流水一样形成了数据流的处理
2. 代码
添加第三方依赖
[mw_shl_code=xml,true] <dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifa ... link-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifa ... connector-kafka-0.9 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5</version>
</dependency>
</dependencies>[/mw_shl_code]
注意这里的版本最好统一选1.4.0,flink-redis的版本最好选1.1.5,用低版本或其他版本会遇到包冲突或者不同包的同一类不同等逻辑或者第版本有些类没有等java通用的一些问题
逻辑代码
[mw_shl_code=bash,true]package com.scn;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class FilnkCostKafka {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(myConsumer);
DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
//实例化Flink和Redis关联类FlinkJedisPoolConfig,设置Redis端口
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
//实例化RedisSink,并通过flink的addSink的方式将flink计算的结果插入到redis
counts.addSink(new RedisSink<Tuple2<String, Integer>>(conf,new RedisExampleMapper()));
env.execute("WordCount from Kafka data");
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
//指定Redis key并将flink数据类型映射到Redis数据类型
public static final class RedisExampleMapper implements RedisMapper<Tuple2<String,Integer>>{
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "flink");
}
public String getKeyFromData(Tuple2<String, Integer> data) {
return data.f0;
}
public String getValueFromData(Tuple2<String, Integer> data) {
return data.f1.toString();
}
}
}
[/mw_shl_code]
编写一个测试类
[mw_shl_code=bash,true]package com.scn;
import redis.clients.jedis.Jedis;
public class RedisTest {
public static void main(String args[]){
Jedis jedis=new Jedis("127.0.0.1");
System.out.println("Server is running: " + jedis.ping());
System.out.println("result:"+jedis.hgetAll("flink"));
}
}[/mw_shl_code]
3. 测试
启动Redis服务
redis-server
执行FilnkCostKafka main方法
没有跑出异常信息证明启动没有问题
在kafka producer端输出一些数据
执行测试类RedisTest的main方法
会输出:
[mw_shl_code=bash,true]Server is running: PONG
result:{flink=2, newyork=1, will=1, kafka=2, wolrd=2, go=1, i=1, meijiasheng=1, is=1, hello=6, myname=1, redis=2}[/mw_shl_code]
可以看到数据已经流到Redis
Flink 1.10 实战:自定义Redis的Sink函数
上面是1.10以前的版本,这里给咱们提供下比较新的代码实现
这里自定义了Ridis对应的Sink函数,为了方便直接从socket端接收数据,operator处理后,直接写入redis中,由于比较简单,详细内容直接看实例代码即可。
软件版本:
flink1.10
redis5.0.5
代码实战
1.添加redis对应pom依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
2.主函数代码:
[mw_shl_code=bash,true]package com.hadoop.ljs.flink110.redis;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import scala.Tuple2;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-05-02 10:30
* @version: v1.0
* @description: com.hadoop.ljs.flink110.redis
*/
public class RedisSinkMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv =StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = senv.socketTextStream("localhost", 9000);
DataStream<String> filter = source.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
if (null == value || value.split(",").length != 2) {
return false;
}
return true;
}
});
DataStream<Tuple2<String, String>> keyValue = filter.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple2<>(split[0], split[1]);
}
});
//创建redis的配置 单机redis用FlinkJedisPoolConfig,集群redis需要用FlinkJedisClusterConfig
FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("worker2.hadoop.ljs").setPort(6379).setPassword("123456a?").build();
keyValue.addSink(new RedisSink<Tuple2<String, String>>(redisConf, new RedisMapper<Tuple2<String, String>>() {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"table1");
}
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data._1;
}
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data._2;
}
}));
/*启动执行*/
senv.execute();
}
}[/mw_shl_code]
关于Flink写redis的专题到这里,如果大家有想学习的专题,可以留言。
https://www.cnblogs.com/jiashengmei/p/9084057.html
https://www.cnblogs.com/linkmust/p/10933536.html
|
|