pig2 发表于 2019-10-8 18:33:04

Flink1.9 CEP Demo例子

问题导读

1.本文是基于哪个版本的例子?
2.新版本使用了哪个函数来处理逻辑?
3.是否支持select和flatSelect?
4.下面例子包含了哪些知识点?


Flink1.9已经出来了,这里尝试了下Flink1.9 CEP的例子,贴出来供大家参考和学习。
这里说下几个难点:
1.包的引用,很多我们知道相关函数,可是相关包可能会找不到。在IDEA里面可以设置自动引入包。可参考下面图示



2.创建项目过程中,最好创建IDEA项目,然后引入pom文件。SBT在国内还是不太好用,不推荐。
3.本文引用了Flink CEP最新版本process函数来处理逻辑。select和flatSelect还是支持的.

/**
* Created by aboutyun on 2019/10/8.
*/


import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Created by aboutyun on 2019/9/22.
*/
public class CEPDemo {
    private static final Logger LOG = LoggerFactory.getLogger(CEPDemo.class);

    public static void main(String[] args) {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(1);
      DataStream<Tuple3<String, String, Long>> source = env.fromElements(Tuple3.of("b", "b1", 1L),
                Tuple3.of("b", "b2", 2L),
                Tuple3.of("b", "b3", 3L),
                Tuple3.of("c", "c", 4L));

      // pattern "b+ c"
      Pattern<Tuple3<String, String, Long>, ?> pattern = Pattern.<Tuple3<String, String, Long>>begin("pattern-b", AfterMatchSkipStrategy.noSkip()).where(new SimpleCondition<Tuple3<String, String, Long>>() {
            @Override
            public boolean filter(Tuple3<String, String, Long> value) throws Exception {
                if ("b".equals(value.f0)) {
                  LOG.debug("Found b! {}", value);
                  return true;
                } else
                  return false;
            }
      }).oneOrMore().next("pattern-c").where(new SimpleCondition<Tuple3<String, String, Long>>() {
            @Override
            public boolean filter(Tuple3<String, String, Long> value) throws Exception {
                if ("c".equals(value.f0)) {
                  LOG.debug("Found c! {}", value);
                  return true;
                } else
                  return false;
            }
      });//.within(Time.seconds(1));

      PatternStream<Tuple3<String, String, Long>> patternStream = CEP.pattern(source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple3<String, String, Long>>() {

            @Override
            public Watermark checkAndGetNextWatermark(Tuple3<String, String, Long> lastElement, long extractedTimestamp) {
                return new Watermark(extractedTimestamp - 1000);
            }

            @Override
            public long extractTimestamp(Tuple3<String, String, Long> element, long previousElementTimestamp) {
                return element.f2;
            }
      }), pattern);
      patternStream.process(new PatternProcessFunction<Tuple3<String,String,Long>, String>() {
            @Override
            public void processMatch(Map<String, List<Tuple3<String, String, Long>>> pattern,
                                     Context ctx,
                                     Collector<String> out )throws Exception {
                String phenomenon = pattern.get("pattern-b").stream().map(tuple -> tuple.f1).collect(Collectors.joining("-"));
                phenomenon = String.join("-", phenomenon, pattern.get("pattern-c").get(0).f1);
                phenomenon= "AboutYun" + phenomenon + "AboutYun";

                out.collect(phenomenon+"在这里在这里在这里在这里");

            }


      }).print();

/*       patternStream.select(new PatternSelectFunction<Tuple3<String, String, Long>, String>() {
            @Override
            public String select(Map<String, List<Tuple3<String, String, Long>>> pattern) throws Exception {
                String phenomenon = pattern.get("pattern-b").stream().map(tuple -> tuple.f0).collect(Collectors.joining("-"));
                phenomenon = String.join("-", phenomenon, pattern.get("pattern-c").get(0).f0);
                return "AboutYun" + phenomenon + "AboutYun";
            }
      }).print();*/

       /* patternStream.flatSelect (new PatternFlatSelectFunction<Tuple3<String, String, Long> ,String>(){
            @Override
            public void flatSelect(Map<String, List<Tuple3<String, String, Long> >> pattern,
                  Collector<String> collector1) throws Exception {
               List first = pattern.get("pattern-b");
               List second = pattern.get("pattern-c");
                Iterator it = first.iterator();
                while(it.hasNext()){
                  System.out.println(it.next());
                }
                Iterator itTwo = second.iterator();
                while(itTwo.hasNext()){
                  System.out.println(itTwo.next());
                }
            //collector1.collect(new String());
            }
      });*/
   /*   patternStream.select(new PatternSelectFunction<Tuple3<String, String, Long>, String>() {
            @Override
            public String select(Map<String, List<Tuple3<String, String, Long>>> pattern) throws Exception {
                List second = pattern.get("pattern-b");
                Iterator itTwo = second.iterator();
                while(itTwo.hasNext()){
                  System.out.println(itTwo.next());
                }
                return "";
            //return "AboutYun" + phenomenon + "AboutYun";
            }
      });*/
      try {
            env.execute("orderness");
      } catch (Exception e) {
            e.printStackTrace();
      }

    }
}

相关pom文件:
>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.0</version>
            <!--<scope>provided</scope>-->
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.9.0</version>
            <!--<scope>provided</scope>-->
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.9.0</version>
            <!--<scope>provided</scope>-->
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_2.11</artifactId>
            <version>1.9.0</version>
            <!--<scope>provided</scope>-->
      </dependency>

      <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
      </dependency>

      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>1.9.0</version>
      </dependency>

      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.9.0</version>
      </dependency>



上面例子:是对Flink cep的例子,里面包含了很多的知识点,包括Flink CEP跳过策略,条件分类,独立模式,组合模式,循环模式等,都包含在了上面模式。
对于Flink CEP知识点,亦录制成视频,放到About云知识星球。


获取视频,可以加入星球或则联系微信w3aboutyun获取。当前尚未更新完毕,大概11月份更新完毕。
此例子,亦会录制成视频,有问题欢迎交流。





琅琊榜尾 发表于 2019-10-9 00:11:30

flink是基于JDK8以上开发的么

美丽天空 发表于 2019-10-9 09:22:28

很好的资料
页: [1]
查看完整版本: Flink1.9 CEP Demo例子