分享

Flink 1.9CEP例子代码诠释

本帖最后由 阿飞 于 2019-11-19 17:32 编辑

问题导读

1.Flink DataStream API分为哪三个部分?
2.Flink分流是如何演化的?
3.如何将Tuple3流转换为字符串流?



Flink CEP大家有的在研究,理论只要是通俗的文字,大多看得懂。对于代码,有些看得不怎么清晰,这里给大家解析一个比较全的Flink cep代码。

[mw_shl_code=java,true]import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;

/**
* Created by aboutyun on 2019/11/4.
*/
public class CEPsideOutput {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         *  接收source并将数据转换成一个tuple
         */
        DataStream<Tuple3<String, String, String>> myDataStream  = env.addSource(new MySource()).map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {

                com.alibaba.fastjson.JSONObject json = JSON.parseObject(value);
                return new Tuple3<>(json.getString("userid"),json.getString("orderid"),json.getString("behave"));
            }
        });

        /**
         * 定义一个规则
         * 接受到behave是order以后,下一个动作必须是pay才算符合这个需求
         */
        Pattern<Tuple3<String, String, String>, Tuple3<String, String, String>> myPattern = Pattern.<Tuple3<String, String, String>>begin("start").where(new IterativeCondition<Tuple3<String, String, String>>() {
            @Override
            public boolean filter(Tuple3<String, String, String> value, Context<Tuple3<String, String, String>> ctx) throws Exception {
                System.out.println("value:" + value);
                return value.f2.equals("order");
            }
        }).next("next").where(new IterativeCondition<Tuple3<String, String, String>>() {
            @Override
            public boolean filter(Tuple3<String, String, String> value, Context<Tuple3<String, String, String>> ctx) throws Exception {
                return value.f2.equals("pay");
            }
        }).within(Time.seconds(3));


        PatternStream<Tuple3<String, String, String>> pattern = CEP.pattern(myDataStream.keyBy(0), myPattern);

        //记录超时的订单
        OutputTag<String> outputTag = new OutputTag<String>("myOutput"){};

        SingleOutputStreamOperator<String> resultStream = pattern.select(outputTag,
                /**
                 * 超时的
                 */
                new PatternTimeoutFunction<Tuple3<String, String, String>, String>() {
                    @Override
                    public String timeout(Map<String, List<Tuple3<String, String, String>>> pattern, long timeoutTimestamp) throws Exception {
                        System.out.println("pattern:"+pattern);
                        List<Tuple3<String, String, String>> startList = pattern.get("start");
                        Tuple3<String, String, String> tuple3 = startList.get(0);
                        return tuple3.toString() + "迟到的";
                    }
                }, new PatternSelectFunction<Tuple3<String, String, String>, String>() {
                    @Override
                    public String select(Map<String, List<Tuple3<String, String, String>>> pattern) throws Exception {
                        //匹配上第一个条件的
                        List<Tuple3<String, String, String>> startList = pattern.get("start");
                        //匹配上第二个条件的
                        List<Tuple3<String, String, String>> endList = pattern.get("next");

                        Tuple3<String, String, String> tuple3 = endList.get(0);
                        return tuple3.toString();
                    }
                }
        );

        //输出匹配上规则的数据
        resultStream.print();

        //输出超时数据的流
        DataStream<String> sideOutput = resultStream.getSideOutput(outputTag);
        sideOutput.print();

        env.execute("Test CEP");
    }
}[/mw_shl_code]


1. org.apache.flink.streaming.api.datastream.DataStream
DataStream API主要可为分为三个部分

1.DataSource模块
Sources模块主要定义了数据接入功能,主要是将各种外部数据接入至Flink系统中,并将接入数据转换成对应的DataStream数据集。

2.Transformation模块
在Transformation模块定义了对DataStream数据集的各种转换操作,例如进行map、filter、windows等操作。

3.DataSink模块
将结果数据通过DataSink模块写出到外部存储介质中,例如将数据输出到文件或Kafka消息中间件等。



2.org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
1.原始分流(Split...Select)
分流可以将一个流拆分成多个流。

2.演化(Side-Output)
(1)Side-Output是从Flink 1.3.0开始提供的功能,支持了更灵活的多路输出。
(2)Side-Output可以以侧流的形式,以不同于主流的数据类型,向下游输出指定条件的数据、异常数据、迟到数据等等。
(3)Side-Output通过ProcessFunction将数据发送到侧路OutputTag。


3. com.alibaba.fastjson.JSON
Fastjson是一个Java语言编写的高性能功能完善的JSON库。它采用一种“假定有序快速匹配”的算法,把JSON Parse的性能提升到极致,是目前Java语言中最快的JSON库。

4. org.apache.flink.api.common.functions.MapFunction
将Tuple3流转换为字符串流


未完待续,后续将录制为视频,加入星球可获取,已经录制视频。

1.png


扫码可了解详情:
1_gaitubao_224x305.jpg
下一篇:
Flink 1.9CEP例子代码诠释【续】
https://www.aboutyun.com/forum.php?mod=viewthread&tid=28014



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条