1 pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.learn.LearnFlink</groupId>
<artifactId>LearnFlink</artifactId>
<version>1.0</version>
<properties>
<!-- Single place to bump the Flink version for all Flink artifacts below. -->
<flink.version>1.9.0</flink.version>
</properties>
<!-- Profiles switch the ${scope} placeholder used by the dependencies below:
     'local' (active by default) = compile, so Flink/logging jars are on the
     classpath for IDE runs; 'pro' = provided, so they are left out of the
     fat jar when building for a cluster (mvn clean package -Ppro). -->
<profiles>
<profile>
<id>local</id>
<properties>
<scope>compile</scope>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
</profile>
<profile>
<id>pro</id>
<properties>
<scope>provided</scope>
</properties>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>${scope}</scope>
</dependency>
<!-- No ${scope} on the connectors: they default to 'compile' and get bundled
     into the shaded jar in both profiles. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- NOTE(review): _2.10 Scala suffix while the other Flink artifacts use _2.12 —
     confirm this mix is binary-compatible or find a connector build matching _2.12. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Shade plugin builds the runnable fat jar at the 'package' phase. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Keep these artifacts out of the shaded jar entirely. -->
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy jar signature files from META-INF; stale signatures in the shaded jar cause SecurityExceptions. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- Default entry point of the fat jar; Driver dispatches to the job class named on the command line. -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.learn.Driver</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Compile for Java 8. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2 代码
2.1 入口类
public class Driver implements Serializable {

    /**
     * Generic job entry point: loads the class named by {@code args[0]} and
     * invokes its no-arg {@code execute} method via reflection, so a single
     * fat jar can host multiple jobs selected on the command line.
     *
     * @param args {@code args[0]} must be the fully-qualified job class name
     * @throws IllegalArgumentException if no class name was supplied
     * @throws Exception any reflection or job failure is propagated to the caller
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length == 0 || args[0] == null || args[0].isEmpty()) {
            throw new IllegalArgumentException(
                    "Expected the fully-qualified job class name as args[0]");
        }
        Class<?> clazz = Class.forName(args[0]);
        Method method = clazz.getDeclaredMethod("execute");
        // A static execute() needs no receiver; otherwise construct the job
        // object via its no-arg constructor (newInstance() is deprecated).
        Object target = Modifier.isStatic(method.getModifiers())
                ? null
                : clazz.getDeclaredConstructor().newInstance();
        method.invoke(target);
    }
}
2.2 执行类
public class KafkaToRedis implements Serializable {

    private static final Logger logger = LoggerFactory.getLogger(KafkaToRedis.class);

    /**
     * Builds and runs the streaming job: consumes CSV-like lines from the
     * Kafka topic "demoTopic", parses each line into a TeacherInfo, and
     * writes it to Redis via {@link SinkToRedis}.
     *
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5s; at-least-once suffices because the sink issues idempotent SETs.
        env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
        // Single-parallelism demo job.
        env.setParallelism(1);
        env.setMaxParallelism(1);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demoGroup");
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<String>("demoTopic", new SimpleStringSchema(), props);
        // Skip any backlog; only consume records produced after the job starts.
        kafkaConsumer.setStartFromLatest();

        env.addSource(kafkaConsumer)
                .map(new MapFunction<String, TeacherInfo>() {
                    @Override
                    public TeacherInfo map(String value) throws Exception {
                        // Parameterized logging avoids string concatenation per record.
                        logger.info("==========> begin map, value: {}", value);
                        // Expected record format: id,name,gender,age
                        String[] arr = value.split(",");
                        if (arr.length < 4) {
                            // Fail with a descriptive message instead of a bare
                            // ArrayIndexOutOfBoundsException on malformed input.
                            throw new IllegalArgumentException(
                                    "Malformed record, expected 4 comma-separated fields: " + value);
                        }
                        TeacherInfo info = new TeacherInfo();
                        info.setId(Integer.parseInt(arr[0]));
                        info.setName(arr[1]);
                        info.setGender(arr[2]);
                        // NOTE(review): age passed through as String — confirm TeacherInfo.setAge's parameter type.
                        info.setAge(arr[3]);
                        return info;
                    }
                })
                .addSink(new SinkToRedis());

        env.execute();
    }
}
public class SinkToRedis extends RichSinkFunction<TeacherInfo> {

    private static final Logger logger = LoggerFactory.getLogger(SinkToRedis.class);

    // Gson is stateless and thread-safe; build it once instead of once per record.
    private static final Gson GSON = new Gson();

    // One pooled connection per sink subtask, acquired in open() and held for
    // the task's lifetime. transient: Jedis is runtime state, not part of the
    // serialized function.
    private transient Jedis jedis = null;

    /** Borrows a Redis connection from the shared pool when the task starts. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = RedisUtil.getResource();
        logger.info("==========> open jedis: {}", jedis);
    }

    /** Writes the record as JSON keyed by its id; SET is idempotent, matching at-least-once delivery. */
    @Override
    public void invoke(TeacherInfo value, Context context) throws Exception {
        jedis.set(value.getId() + "", GSON.toJson(value));
        logger.info("==========> set value to redis: {}", value);
    }

    /** Returns the connection to the pool when the task shuts down. */
    @Override
    public void close() throws Exception {
        super.close();
        if (null != jedis) {
            logger.info("==========> close jedis: {}", jedis);
            jedis.close();
        }
    }
}
2.3 工具类
public class RedisUtil {

    private static final String HOST = "localhost";
    private static final int PORT = 6379;

    // Lazily-created shared pool; volatile is required for the double-checked
    // locking below to publish the pool safely.
    private static volatile JedisPool jedisPool = null;

    // Utility class: no instances.
    private RedisUtil() {
    }

    /**
     * Borrows a Jedis connection from a lazily-initialized shared pool.
     * Callers must close() the returned Jedis to return it to the pool.
     *
     * @return a pooled Jedis connection
     */
    public static Jedis getResource() {
        if (null == jedisPool) {
            synchronized (RedisUtil.class) {
                if (null == jedisPool) {
                    JedisPoolConfig config = new JedisPoolConfig();
                    config.setMaxTotal(4);
                    config.setMaxIdle(1);
                    config.setMaxWaitMillis(100);
                    // Validate connections on borrow so dead sockets are not handed out.
                    config.setTestOnBorrow(true);
                    jedisPool = new JedisPool(config, HOST, PORT);
                }
            }
        }
        return jedisPool.getResource();
    }
}
3 打包
mvn clean package -Ppro
4 启动
启动 Flink;
启动 Redis;
启动 Zookeeper 和 Kafka。
5 运行
5.1 提交 Job
flink.bat run F:\learn\LearnFlink-1.0.jar org.learn.datastream.KafkaToRedis
5.2 向 kafka 发数据
1,aaa,male,10
2,bbb,female,20
3,ccc,male,30
4,ddd,male,40
6 日志