问题导读
1.本文的背景是什么?
2.本文数据是什么格式的?
3.Flink是如何优化的?
4.Flink优化时需要考虑哪些点?
1. 项目背景
物联网环境,网关采集设备上测点实时上传的数值,目前大概2000台左右设备,每个设备有几十个测点到几百个测点不等,上传频率最低为5秒。我需要根据报警规则和测点名称信息对上传的数据进行处理,处理结果包括生成的报警数据、带有测点名称的实时数据。实时数据、报警规则和测点名称信息均在kafka,处理完的所有数据均写入kafka。
所有kafka中的value数据均为json字符串,不管是flink接收到的数据,还是flink写出到kafka的数据。
以下涉及到的数据格式全是中文举例,并不是项目实际使用真实数据,所以看看就行。
2. 数据处理细节
1. 数据格式
1. 数据上传说明
实时数据在一个主题中,实时数据使用到kafka的key和value信息。报警规则和测点名称在另一个主题中,具体数据在kafka的value中,使用不同的kafka的key来区分。
2. 实时数据
[mw_shl_code=bash,true]key:租户/模板编码/测点编码
value:
{
"fields": {
"测点编码1": {
"时间戳": 1567078756367,
"数值": 31.17904826098411
},
"测点编码2": {
"时间戳": 1567078756367,
"数值": 98.67622987146498
}
}
}[/mw_shl_code]
一条数据就是一个设备的所有测点数据。
3. 报警规则
[mw_shl_code=bash,true][
{
"AL_VALUE": [
{
"VALUE_MAX(上限)": "99",
"VALUE_MIN(下限)": "1",
}
],
"POINTCODE(测点编码)": "001",
"DEVICECODE(设备编码)": "设备编码1",
"PRODCODE(模板编码)": "GF-TMP-004"
},
{
"AL_VALUE": [
{
"VALUE_MAX(上限)": "50",
"VALUE_MIN(下限)": "10",
}
],
"POINTCODE(测点编码)": "002",
"DEVICECODE(设备编码)": "设备编码1",
"PRODCODE(模板编码)": "GF-TMP-004"
}
][/mw_shl_code]
4. 测点名称
[mw_shl_code=bash,true]{
"模板1~测点编码1": "镀铜槽2的溢流泵",
"模板1~测点编码2": "远程锁机",
}[/mw_shl_code]
2. 处理流程
1. 测点名称
首先给每个测点添加测点名称,处理完之后的数据key不动,value格式如下:
[mw_shl_code=bash,true]{
"fields": {
"测点编码1": {
"时间戳": 1567078756367,
"测点名称": "测点1",
"数值": 31.17904
},
"测点编码2": {
"时间戳": 1567078756367,
"测点名称": "测点2",
"数值": 98.676
}
}
}[/mw_shl_code]
2. 报警
拿到添加了测点名称的数据,然后遍历其中的每个测点,通过模板编码、设备编码、测点编码组合信息去报警规则数据中去查找是否有符合这个点的报警规则,然后根据是否能找到报警规则来进行判断:
1. 没有报警规则
直接将带有测点名称的数据写入kafka主题主题即可。
2. 有报警规则
① 判断这个点的数据是否报警
1) 报警
查看redis中是否存在这个点的报警信息
a. 存在
说明之前这个点触发过报警,现在什么都不用做。
b. 不存在
说明这个点之前没有触发过报警,发送tag为0的报警数据到kafka报警数据主题,同时将报警信息保存到redis中。
2) 不报警
查看redis中是否存在这个点的报警信息
a. 存在
说明之前这个点触发过报警,发送tag为2的结束报警数据到kafka报警数据主题,同时删除redis中的报警信息。
b. 不存在
说明之前这个点之前没有触发报警,现在什么都不做。
3. redis
刚开始计划将测点的报警状态信息保存在算子的内部变量中,由算子自己维护。由于使用到的是jvm内存变量,这种方式的操作速度是最快的。然后通过flink的checkPoint来保证在flink程序停止之后,可以将算子内部的缓存变量保存下来,以便重新启动之后,还能够继续使用之前保存的缓存数据。但是如下的几个原因使我放弃了这个做法:
1. 产生重复报警数据
生产环境中,数据处理的算子并行度并不是1。在旧版本中,由于没有没有对设备进行分区操作,而是使用了默认的reblance,因此同一个设备的数据在不同时刻,可能会发到不同的算子中,因此每次算子内维护的并不是全局的测点报警状态信息。后来通过自定义分区,使同一个设备的数据始终发送到了同一个算子,但是每次重启依然会重新进行分区计算,还是会发送到不同的分区。而且这其中还有数据倾斜的问题,并且无法解决.
2. checkPoint不稳定
在线上运行时,一分钟做一次checkPoint,但还是会经常失败,后来放弃了。
3. 优化
1. 广播模式
由于之前的flink学习中,对于 broadcast 算子的误解,并没有对报警规则和测点名称数据进行广播,而是其定义kafka source,在source内部缓存最新的报警规则和测点名称数据,接收到一条实时数据之后,九江实时数据和报警规则和测点名称数据一同发送出去。
后来仔细研究了 broadcast 算子之后,就使用了广播模式,将低频率数据直接广播到下个算子所有的并行度,然后缓存在算子内部。
2. redis测点报警状态数据保存方式
经过线上实际测试,发现从redis中get一次数据,耗时大都在50毫秒左右,set一次数据,大都在100-150毫秒。
在旧版本中,测点报警状态是通过java的序列化和反序列化将报警信息对象转化为字节数据保存在redis中的,这一部操作十分耗时,因为一条实时数据,根据设备上的测点数据,要查询redis几十到几百次不等。
新版本中是将对象转化为json字符串保存到redis中,使用到阿里的fastJSON,速度还是很快的。
通过这个优化,实际吞吐量提高大约20%。
3. redis测点报警状态数据保存格式
在旧版本中,redis中的数据是通过模板编码、设备编码、测点编码来唯一确定一个测点,因此处理一条实时数据,需要查询几十到几百次redis。
在新版本中,将一个设备所有测点的数据组合起来,作为一个设备的上所有点的测点状态信息,处理一条实时数据时,只需要get一次redis,然后再setredis即可。
这个优化,减少了处理一条实时数据处理中redis的请求次数,具体吞吐量提高幅度,没有测试。
4. 测点名称处理优化
下面直接上两个版本的代码。
旧版本:
[mw_shl_code=bash,true]/**
* 根据测点名称数据给实时数据添加测点名称信息,key为:pointName
*
* @param pointInfoMap 测点名称存储表
* @param key 实时数据key
* @param data 实时数据值
* @return 添加了测点名称的实时数据
*/
public static JSONObject pointNameHandle(Map<String, String> pointInfoMap, String key, String data) {
String templateCode = key.split("/")[1];
JSONObject object = JSON.parseObject(data);
Map<String, Object> fields = JSON.parseObject(object.get("fields").toString());
for (Map.Entry<String, Object> entry : fields.entrySet()) {
String pointCode = entry.getKey();
Map<String, Object> field = JSON.parseObject(entry.getValue().toString());
field.put("pointName", pointInfoMap.getOrDefault(templateCode + "~" + pointCode, "未知"));
//修改完的内容一定要重新放回原map表中
fields.put(pointCode, field);
}
//修改完的内容一定要重新放回原map表中
object.put("fields", fields);
return object;
}[/mw_shl_code]
新版本:
[mw_shl_code=bash,true]/**
* 根据测点名称数据给实时数据添加测点名称信息,key为:pointName
*
* @param pointInfoMap 测点名称存储表
* @param key 实时数据key
* @param data 实时数据值
* @return 添加了测点名称的实时数据
*/
public static JSONObject pointNameHandle(Map<String, String> pointInfoMap, String key, String data) {
String templateCode = key.split("/")[1];
JSONObject object = JSON.parseObject(data);
JSONObject fields = object.getJSONObject("fields");
for (String key1 : fields.keySet()) {
fields.getJSONObject(key1).put("pointName", pointInfoMap.getOrDefault(templateCode + "~" + key1, "未知"));
}
return object;
}[/mw_shl_code]
乍看起来差别不大,但是性能却是差了好几千倍。
[mw_shl_code=bash,true]{
"fields": {
"测点编码1": {
"时间戳": 1567078756367,
"数值": 31.17904826098411
},
"测点编码2": {
"时间戳": 1567078756367,
"数值": 98.67622987146498
}
}
}[/mw_shl_code]
上面是实时数据。
在旧版本中,通过JSONObject对象解析到fields这一层时,遍历所有测点,然后通过测点名称map表,去查找对应测点的测点名称。旧版本是获取到实时数据的每个测点信息之后,将其转化为字符串,然后再将其转化为JSONObject对象,设置完测点名称之后,再将新对象重新设置到最外成的JSONObject对象中。因此对于一条实时数据,需要新建几十到几百次对象,并由JVM销毁几十到几百次对象。再加上线上实时数据量大概在一千多条每秒,所以flink运行会越来越慢。
在新版中,直接通过JSONObject对象的 getJSONObject 方法获取每一个测点信息,然后直接添加测点名称即可,而不用新建对象。
线上实际测试中这一部分的运行时间直接从1000-3000毫秒不动降低到1毫秒左右!!!
组里大牛帮我看出来的。
5. 测点报警状态保存方式
旧版本中是直接在算子内部操作redis,每条试试数据均会查询到redis,因此redis的请求会大大影响数据处理速度。
新版本中将测点报警状态数据直接缓存在算子内部,然后在处理完一条实时数据之后对整个设备的测点报警状态数据进行前后对比,如果有变化,再将新的数据写入到redis中,并且将该数据写入交由flink sink处理。由于线上一个设备上的测点,大部分时间都是正常状态,一个设备的测点报警状态信息变化并不是很频繁,因此请求redis的频率是很低的。
下面给出一个判断map内部数据在操作前后是否一致的方法:
[mw_shl_code=bash,true]String originalString = JSON.toJSONString(deviceDataMapCache);
dataHandle(deviceDataMapCache, rules, keys, fields, fullFieldsMap, warningDatas, faultStandbyRules, faultStandbyDataList);
String currentString = JSON.toJSONString(deviceDataMapCache);[/mw_shl_code]
在操作之前,将map对象转化为JSON字符串,操作完之后,再将其转化为JSON字符串,然后对两个字符串进行对比即可。
刚开始考虑过在操作之前将map对象克隆一下,操作完之后对比克隆对象,但是hashmap的克隆方法只是浅克隆,不能实现需求。
4. 总结
这么多优化,总共提升了几千倍甚至上万倍的吞吐量。
flink程序优化需要考虑一下几点:
1. 减少数据传输,比如将低频数据直接广播并缓存到下游所有算子。
2. 减少对象创建次数,能复用一个对象,就复用一个对象。
3. 减少外部数据请求次数,比如redis,第三方接口等。
最新经典文章,欢迎关注公众号
————————————————
作者:「第一片心意」
原文链接:https://blog.csdn.net/u012443641/article/details/103459821
|
|