levycui 发表于 2020-9-2 21:20:05

flink实战之测试带窗口pattern性能

本帖最后由 levycui 于 2020-9-2 21:21 编辑




测试环境


[*] flink 版本:1.4.2
[*] cpu:Intel(R) Core(TM) i7-4790K CPU @ 4.00GHz
[*] 内存:16G (分配给flink的)

测试背景


[*]控制pattern的输出为1%
[*]典型pattern测试
[*]n分钟m次构造测试程序

Q:怎么控制pattern的输出为1%?
A:先确定pattern 的次数设置阈值是多少,然后,阈值 * 100 * 事件发送间隔时间(5s) 即为异常事件插入时间间隔。即,每发送100个事件后连续插入异常事件,插入异常事件个数大于设定的阈值。需要注意的是,设定事件次数的阈值 * 事件发送间隔时间需要小于等于窗口时间,否则pattern不会触发。
pattern代码

Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start")
                .where(new SimpleCondition<JSONObject>() {
                  @Override
                  public boolean filter(JSONObject value) throws Exception {
                        return value.getString("user").equals("kevin");
                  }
                })
                .timesOrMore(10).greedy()
                .within(Time.minutes(20));


细分场景

满足simple condition的数据很少

测试代码
速度极快,在一个并行度的情况下都超过42万eps

job 截图:

满足simple condition的数据很多,但是被次数设定给限制住了(连续产生threshold - 1的异常事件)

测试代码
速度还是很快,在一个并行度11万eps左右

job 截图:


N分钟移动M公里

Q: 异常点构造方式?
A: 每一百个事件插入一个异常位置的登录事件。

pattern 代码

Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("prev")
                .where(new SimpleCondition<JSONObject>() {
                  @Override
                  public boolean filter(JSONObject event) throws Exception {
                        return event.getString("event_type").equals("logon");
                  }
                })
                .followedBy("curr")
                .where(new IterativeCondition<JSONObject>() {
                  @Override
                  public boolean filter(JSONObject currentEvent, Context<JSONObject> ctx) throws Exception {
                        if (!currentEvent.getString("event_type").equals("logon")) {
                            return false;
                        }
                        Iterable<JSONObject> iterator = ctx.getEventsForPattern("prev");
                        JSONObject previousEvent = null;
                        for (JSONObject jsonObject : iterator) {
                            previousEvent = jsonObject;
                        }
                        return CEPUtil.geoDistance(previousEvent.getString("geo"), currentEvent.getString("geo")) > 100;
                  }
                })
                .within(Time.minutes(10));

细分场景

时间范围内的数据很少,绝大多数的事件都被window time给过滤掉了。

测试代码
性能情况不错,差不多26w eps

job 截图:


时间范围内的数据很多(每个窗口内包含100条数据),但是距离不足100km

测试代码
性能情况很差了,3.5k eps

job 截图:


使用jvsisualvm查看cpu 抽样耗时70%消耗在 geo运算上面,因为窗口内的数据会两两进行位置运算,所以每一条数据到来都会进行100次的geo运算。

时间范围内的数据减少至10个,每10个事件插入一个异常登录点的事件

测试代码
性能情况变好,接近9w eps

job 截图:


再看jvisualvm 查看cpu 抽样耗时64在fastjson 的jsonobject hashcode方法上了(CEP的SharedBuffer在查询数据的时候会调用事件的hashCode方法)。改用自己写的实体类替换,性能提升2倍。(对比测试代码为:改前,改后)这个留待以后需要的时候进行优化。


自己写的Event实体类:


作者:liujiawinds
来源:https://github.com/liujiawinds/flink-cep-perf


若无梦何远方 发表于 2020-9-3 08:21:27

这是CEP的代码吗

admin 发表于 2020-9-3 08:24:18

若无梦何远方 发表于 2020-9-3 08:21
这是CEP的代码吗

都是CEP例子的

学海无涯 发表于 2020-9-5 23:09:54

谢谢6666
页: [1]
查看完整版本: flink实战之测试带窗口pattern性能