Flink 1.9 CEP Example Code, Explained
Last edited by 阿飞 on 2019-11-19 17:32.
Reading guide:
1. What are the three parts of the Flink DataStream API?
2. How did stream splitting evolve in Flink?
3. How is a Tuple3 stream converted into a string stream?
Many readers are studying Flink CEP. The theory, written in plain language, is easy enough to follow; the code is where people get lost. This post walks through a fairly complete Flink CEP example.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;

/**
 * Created by aboutyun on 2019/11/4.
 */
public class CEPsideOutput {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * Consume the source and turn each record into a Tuple3.
         * (MySource is the example's custom JSON source; its code is not shown here.)
         */
        DataStream<Tuple3<String, String, String>> myDataStream = env.addSource(new MySource())
                .map(new MapFunction<String, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(String value) throws Exception {
                        JSONObject json = JSON.parseObject(value);
                        return new Tuple3<>(json.getString("userid"), json.getString("orderid"), json.getString("behave"));
                    }
                });

        /**
         * Define a pattern:
         * after a behave of "order", the very next action must be "pay" to count as a match.
         */
        Pattern<Tuple3<String, String, String>, Tuple3<String, String, String>> myPattern =
                Pattern.<Tuple3<String, String, String>>begin("start").where(new IterativeCondition<Tuple3<String, String, String>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, String> value, Context<Tuple3<String, String, String>> ctx) throws Exception {
                        System.out.println("value:" + value);
                        return value.f2.equals("order");
                    }
                }).next("next").where(new IterativeCondition<Tuple3<String, String, String>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, String> value, Context<Tuple3<String, String, String>> ctx) throws Exception {
                        return value.f2.equals("pay");
                    }
                }).within(Time.seconds(3));

        PatternStream<Tuple3<String, String, String>> pattern = CEP.pattern(myDataStream.keyBy(0), myPattern);

        // Tag for orders that time out
        OutputTag<String> outputTag = new OutputTag<String>("myOutput"){};

        SingleOutputStreamOperator<String> resultStream = pattern.select(outputTag,
                /**
                 * Handles partial matches that timed out.
                 */
                new PatternTimeoutFunction<Tuple3<String, String, String>, String>() {
                    @Override
                    public String timeout(Map<String, List<Tuple3<String, String, String>>> pattern, long timeoutTimestamp) throws Exception {
                        System.out.println("pattern:" + pattern);
                        List<Tuple3<String, String, String>> startList = pattern.get("start");
                        Tuple3<String, String, String> tuple3 = startList.get(0);
                        return tuple3.toString() + " timed out";
                    }
                },
                new PatternSelectFunction<Tuple3<String, String, String>, String>() {
                    @Override
                    public String select(Map<String, List<Tuple3<String, String, String>>> pattern) throws Exception {
                        // Events that matched the first condition
                        List<Tuple3<String, String, String>> startList = pattern.get("start");
                        // Events that matched the second condition
                        List<Tuple3<String, String, String>> endList = pattern.get("next");
                        Tuple3<String, String, String> tuple3 = endList.get(0);
                        return tuple3.toString();
                    }
                }
        );

        // Print the records that matched the pattern
        resultStream.print();

        // Get and print the side-output stream of timed-out records
        DataStream<String> sideOutput = resultStream.getSideOutput(outputTag);
        sideOutput.print();

        env.execute("Test CEP");
    }
}
1. org.apache.flink.streaming.api.datastream.DataStream
The DataStream API breaks down into three parts:
1. The DataSource module
The Sources module defines data ingestion: it connects external systems to Flink and turns the incoming data into DataStream datasets.
2. The Transformation module
The Transformation module defines the operations applied to DataStream datasets, such as map, filter, and window operations.
3. The DataSink module
The DataSink module writes result data out to external storage, for example to files or to a message broker such as Kafka.
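The source → transformation → sink shape described above can be illustrated without a Flink cluster. The sketch below is a plain-Java analogy, not the Flink API: the class and method names are hypothetical, and `java.util.stream` stands in for `DataStream` operators.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy of the three DataStream API stages (illustration only,
// not Flink): a "source" supplies records, a "transformation" filters and
// maps them, and a "sink" receives the results.
public class PipelineAnalogy {
    // Source stage: pretend these records arrived from Kafka or a socket.
    static List<String> source() {
        return List.of("order", "pay", "browse");
    }

    // Transformation stage: filter and map, like DataStream#filter / #map.
    static List<String> transform(List<String> events) {
        return events.stream()
                .filter(e -> !e.equals("browse")) // drop uninteresting events
                .map(String::toUpperCase)         // transform each record
                .collect(Collectors.toList());
    }

    // Sink stage: here we just return the list; in Flink this would be
    // a file sink, a Kafka sink, or print().
    static List<String> run() {
        return transform(source());
    }
}
```

In the real API the same three stages appear as `env.addSource(...)`, the chained `map`/`filter`/`window` calls, and a sink such as `print()` at the end of the job above.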
2. org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
1. The original approach (Split...Select)
Splitting divides one stream into several streams.
2. The evolution (Side-Output)
(1) Side-Output has been available since Flink 1.3.0 and supports more flexible multi-way output.
(2) A side output is a secondary stream that may carry a data type different from the main stream; it is used to emit records that meet a given condition, abnormal records, late records, and so on, downstream.
(3) Side-Output sends data to a side OutputTag through a ProcessFunction.
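The routing idea behind Side-Output can be shown without Flink. In Flink itself, a ProcessFunction calls `ctx.output(outputTag, value)` to divert a record to the tagged side stream; the sketch below is a plain-Java analogy of that routing, with hypothetical names (`route`, the `"main"`/`"timeout-side"` keys), not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustration of the Side-Output idea without Flink: one pass over the
// input routes matching records to the main output and everything else to a
// named side channel (the analogue of an OutputTag).
public class SideOutputAnalogy {
    static Map<String, List<String>> route(List<String> events) {
        List<String> main = new ArrayList<>();
        List<String> side = new ArrayList<>();
        for (String e : events) {
            if (e.startsWith("pay")) {
                main.add(e);   // matched the condition: main stream
            } else {
                side.add(e);   // diverted, like ctx.output(outputTag, e)
            }
        }
        return Map.of("main", main, "timeout-side", side);
    }
}
```

Note that unlike the old Split...Select, the two channels are free to carry different types; in the CEP job above the side output carries the timeout strings while the main output carries the matched results.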
3. com.alibaba.fastjson.JSON
Fastjson is a feature-complete, high-performance JSON library written in Java. It uses an "assume ordered, match fast" algorithm to push JSON parsing performance to the limit and is among the fastest JSON libraries available for Java.
4. org.apache.flink.api.common.functions.MapFunction
In this example, a MapFunction turns each raw JSON string into a Tuple3; the pattern's select functions later turn the matched Tuple3 records back into strings via toString().
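The record-level transformation described above can be traced for a single record. The sketch below mirrors what the job does to one JSON line, but it is self-contained: a naive substring extraction stands in for fastjson's `JSON.parseObject`, and the `Triple` class is a hypothetical stand-in for Flink's `Tuple3`.

```java
// Mirrors the job's per-record flow: extract userid, orderid and behave
// from a flat JSON line into a 3-field value, then render it as a string.
public class RecordMapper {
    // Hypothetical stand-in for Flink's Tuple3<String, String, String>.
    static class Triple {
        final String f0, f1, f2;
        Triple(String f0, String f1, String f2) { this.f0 = f0; this.f1 = f1; this.f2 = f2; }
        @Override public String toString() { return "(" + f0 + "," + f1 + "," + f2 + ")"; }
    }

    // Naive field extraction for flat JSON like
    // {"userid":"u1","orderid":"o1","behave":"pay"} -- fastjson's
    // JSON.parseObject(value).getString(name) does this robustly.
    static String field(String json, String name) {
        int i = json.indexOf("\"" + name + "\":\"") + name.length() + 4;
        return json.substring(i, json.indexOf('"', i));
    }

    static String mapToString(String json) {
        Triple t = new Triple(field(json, "userid"), field(json, "orderid"), field(json, "behave"));
        return t.toString(); // the record as it appears in the string stream
    }
}
```

This is the round trip the job performs: string in from the source, Tuple3 through the pattern, string back out of select().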
To be continued.
Next post:
Flink 1.9 CEP Example Code, Explained (Continued)
https://www.aboutyun.com/forum.php?mod=viewthread&tid=28014