问题导读
1.本文是基于哪个版本的例子?
2.新版本使用了哪个函数来处理逻辑?
3.是否支持select和flatSelect?
4.下面例子包含了哪些知识点?
Flink1.9已经出来了,这里尝试了下Flink1.9 CEP的例子,贴出来供大家参考和学习。
这里说下几个难点:
1. 包的引用：很多时候我们知道要用哪个函数，却找不到它所在的包。可以在 IDEA 中开启自动导入（Auto Import）功能，可参考下面的图示。
2. 创建项目时，最好在 IDEA 中创建 Maven 项目，然后在 pom 文件中引入下文列出的依赖。SBT 在国内不太好用，不推荐。
3. 本文使用了 Flink CEP 较新的 process 方法（PatternProcessFunction）来处理匹配结果；select 和 flatSelect 仍然支持。
[mw_shl_code=java,true]/**
* Created by aboutyun on 2019/10/8.
*/
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Flink 1.9 CEP demo: detects the pattern {@code b+ c} (one or more "b" events strictly followed
 * by a "c" event) in a small bounded event-time stream and prints one line per match.
 *
 * <p>Each element is a {@code Tuple3}: {@code f0} is the event type ("b" or "c"), {@code f1} is a
 * display name, and {@code f2} is used as the event timestamp. The pattern uses
 * {@link AfterMatchSkipStrategy#noSkip()}, so overlapping matches are all emitted. For the input
 * {@code b1 b2 b3 c}, that yields {@code b1-b2-b3-c}, {@code b2-b3-c} and {@code b3-c}.
 *
 * <p>Originally created by aboutyun on 2019/9/22.
 */
public class CEPDemo {

    private static final Logger LOG = LoggerFactory.getLogger(CEPDemo.class);

    /** Name of the looping pattern that collects the "b" events. */
    private static final String PATTERN_B = "pattern-b";

    /** Name of the pattern that matches the terminating "c" event. */
    private static final String PATTERN_C = "pattern-c";

    /** How far (in timestamp units of {@code f2}) the emitted watermark trails each element. */
    private static final long WATERMARK_DELAY = 1000L;

    /**
     * Builds and runs the CEP job, printing each formatted match to stdout.
     *
     * @param args unused
     * @throws IllegalStateException if executing the job fails; the original exception is the cause
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Single task: keeps the demo's printed matches from being spread across subtasks.
        env.setParallelism(1);

        DataStream<Tuple3<String, String, Long>> source = env.fromElements(
                Tuple3.of("b", "b1", 1L),
                Tuple3.of("b", "b2", 2L),
                Tuple3.of("b", "b3", 3L),
                Tuple3.of("c", "c", 4L));

        // pattern "b+ c": one or more "b" events, then a "c" that must come immediately after (next).
        // Optionally append .within(Time.seconds(1)) to bound how long a match may span.
        Pattern<Tuple3<String, String, Long>, ?> pattern =
                Pattern.<Tuple3<String, String, Long>>begin(PATTERN_B, AfterMatchSkipStrategy.noSkip())
                        .where(typeIs("b"))
                        .oneOrMore()
                        .next(PATTERN_C)
                        .where(typeIs("c"));

        PatternStream<Tuple3<String, String, Long>> patternStream =
                CEP.pattern(source.assignTimestampsAndWatermarks(new EventTimeAssigner()), pattern);

        // Matches are handled with process(); selectVariant/flatSelectVariant below show the
        // equivalent select()/flatSelect() APIs, which are still supported.
        patternStream.process(new MatchFormatter()).print();

        try {
            env.execute("orderness");
        } catch (Exception e) {
            // Do not swallow job failures: an exception escaping main makes the JVM exit non-zero.
            throw new IllegalStateException("Flink job 'orderness' failed", e);
        }
    }

    /**
     * Returns a condition that accepts events whose type field ({@code f0}) equals {@code type}.
     * Each accepted event is logged at DEBUG level.
     *
     * @param type the event type to accept; must not be null
     * @return a serializable CEP condition
     */
    private static SimpleCondition<Tuple3<String, String, Long>> typeIs(String type) {
        return new SimpleCondition<Tuple3<String, String, Long>>() {
            @Override
            public boolean filter(Tuple3<String, String, Long> value) {
                boolean matches = type.equals(value.f0);
                if (matches) {
                    LOG.debug("Found {}! {}", type, value);
                }
                return matches;
            }
        };
    }

    /**
     * Alternative to {@link MatchFormatter} using {@code select}. It returns one string per match,
     * built from the events' type field ({@code f0}). Not called by {@link #main}; kept to show that
     * {@code select} is still supported.
     */
    private static DataStream<String> selectVariant(
            PatternStream<Tuple3<String, String, Long>> patternStream) {
        return patternStream.select(new PatternSelectFunction<Tuple3<String, String, Long>, String>() {
            @Override
            public String select(Map<String, List<Tuple3<String, String, Long>>> match) {
                String bTypes = match.get(PATTERN_B).stream()
                        .map(event -> event.f0)
                        .collect(Collectors.joining("-"));
                return "AboutYun" + bTypes + "-" + match.get(PATTERN_C).get(0).f0 + "AboutYun";
            }
        });
    }

    /**
     * Alternative using {@code flatSelect}. It emits every matched "b" event, then the "c" event, as
     * separate records. Not called by {@link #main}; kept to show that {@code flatSelect} is still
     * supported.
     */
    private static DataStream<String> flatSelectVariant(
            PatternStream<Tuple3<String, String, Long>> patternStream) {
        return patternStream.flatSelect(
                new PatternFlatSelectFunction<Tuple3<String, String, Long>, String>() {
                    @Override
                    public void flatSelect(Map<String, List<Tuple3<String, String, Long>>> match,
                                           Collector<String> out) {
                        for (Tuple3<String, String, Long> event : match.get(PATTERN_B)) {
                            out.collect(event.toString());
                        }
                        for (Tuple3<String, String, Long> event : match.get(PATTERN_C)) {
                            out.collect(event.toString());
                        }
                    }
                });
    }

    /**
     * Uses {@code f2} as the event timestamp. After every element it emits a watermark trailing
     * that element's timestamp by {@link #WATERMARK_DELAY}.
     */
    private static final class EventTimeAssigner
            implements AssignerWithPunctuatedWatermarks<Tuple3<String, String, Long>> {
        @Override
        public Watermark checkAndGetNextWatermark(Tuple3<String, String, Long> lastElement,
                                                  long extractedTimestamp) {
            return new Watermark(extractedTimestamp - WATERMARK_DELAY);
        }

        @Override
        public long extractTimestamp(Tuple3<String, String, Long> element,
                                     long previousElementTimestamp) {
            return element.f2;
        }
    }

    /**
     * Formats each match as "AboutYun", then the "b" names ({@code f1}) joined by '-', then '-' and
     * the "c" name, then "AboutYun", followed by a fixed marker suffix.
     */
    private static final class MatchFormatter
            extends PatternProcessFunction<Tuple3<String, String, Long>, String> {
        @Override
        public void processMatch(Map<String, List<Tuple3<String, String, Long>>> match,
                                 Context ctx,
                                 Collector<String> out) {
            String bNames = match.get(PATTERN_B).stream()
                    .map(event -> event.f1)
                    .collect(Collectors.joining("-"));
            String cName = match.get(PATTERN_C).get(0).f1;
            out.collect("AboutYun" + bNames + "-" + cName + "AboutYun" + "在这里在这里在这里在这里");
        }
    }
}[/mw_shl_code]
相关pom文件:
[mw_shl_code=xml,true]
<!-- Flink 1.9.0, Scala 2.11 builds. The "provided" scopes are commented out so the job runs inside
     the IDE; uncomment them when packaging for a cluster whose Flink distribution supplies these jars. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.0</version>
    <!--<scope>provided</scope>-->
</dependency>
<!-- Java CEP API (org.apache.flink.cep.*) imported by CEPDemo, declared directly instead of relying on
     it arriving transitively via flink-cep-scala. CEP is not part of the Flink binary distribution,
     so keep it in the default (compile) scope so it is packaged with the job. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<!-- Scala API modules; presumably only needed if the project also contains Scala jobs. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.9.0</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.0</version>
    <!--<scope>provided</scope>-->
</dependency>
<!-- Scala CEP API. Like flink-cep, it is not in the Flink distribution, so marking it "provided" would
     leave it missing on a cluster. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.9.0</version>
    <!--<scope>provided</scope>-->
</dependency>
<!-- Not used by CEPDemo (Redis sink, RocksDB state backend, Kafka 0.11 connector); presumably for other
     examples in the same project. -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.9.0</version>
</dependency>[/mw_shl_code]
上面是一个 Flink CEP 的例子，涵盖了不少知识点，都体现在上面的模式中：匹配后跳过策略（AfterMatchSkipStrategy.noSkip()）、条件分类（这里使用的是 SimpleCondition）、独立模式（单个模式）、组合模式（next 严格连续）、循环模式（oneOrMore）等。
对于Flink CEP知识点,亦录制成视频,放到About云知识星球。
获取视频，可以加入星球或者联系微信 w3aboutyun。目前尚未更新完毕，预计 11 月份更新完毕。
此例子,亦会录制成视频,有问题欢迎交流。
|
|