storm运行时bolt报异常,也不是经常出现这个异常,我查了下网上的解释:这个信息表示你在将一个可变的对象作为 tuple 发送出去。你发送到 outputcollector 中的所有对象必须是非可变的。这个错误表明对象在被序列化并发送到网络中时你的 bolt 正在修改这个对象。
不知道有没有人碰到跟我一样的问题啊?这是什么原因啊
bolt代码如下:
public class DataParseBolt extends BaseBasicBolt {
private static final long serialVersionUID = 1L;
private final static Logger logger = LoggerFactory.getLogger(DataParseBolt.class);
public void cleanup() {
}
public void execute(Tuple input, BasicOutputCollector collector) {
String sentence = input.getString(0);
if (sentence == null || "".equals(sentence)) {
return;
}
Map<String, Object> infoMap = null;
try{
infoMap = LogParser.getInfoMap(sentence);
}catch(Exception e)
{
logger.error("DataParseBolt execute() Sth wrong whith :" + sentence);
logger.error("DataParseBolt execute() failed: ", e);
}
if (infoMap == null || "".equals(infoMap.get("table"))) {
return;
}
collector.emit(new Values(infoMap));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source_line"));
}
LogParser:public class LogParser {
private static final Logger logger = LoggerFactory.getLogger(LogParser.class);
private static final String keyword = "bigdata_lingxi_realtime_msg";
public static Map<String, Object> getInfoMap(String sentence) {
Map<String, Object> infoMap = new HashMap<>();
if (!ifValidMsg(sentence)) {
return null;
}
try {
JSONObject tempObject = (JSONObject) JSON.parse(sentence);
String message = (String) tempObject.get("message");
JSONObject jsonObject = (JSONObject) JSON.parse(message);
String messageType = (String) jsonObject.get("messageType");
String messageTopic = (String) jsonObject.get("messageTopic");
JSONObject messageContent = (JSONObject) jsonObject.get("messageContent");
Map<String, String> contentMap = getContentMap(messageContent);
String tableName = getTableName(messageTopic);
infoMap.put("type", messageType);
infoMap.put("topic", messageTopic);
infoMap.put("table", tableName);
infoMap.put("content", contentMap);
} catch (Exception e) {
logger.error("LogParser getInfoMap() parse sentence error :" + sentence);
return null;
}
return infoMap;
}
private static String getTableName(String topic) {
String dateStr = getDate();
String tableName = topic + "_" + dateStr;
return tableName;
}
private static String getDate() {
SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
return df.format(new Date()).toString();
}
private static Map<String, String> getContentMap(JSONObject jsonObject) {
Map<String, String> contentMap = new HashMap<>();
if (jsonObject != null) {
contentMap.put("custom_empty_field","");
Iterator iter = jsonObject.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry entry = (Map.Entry) iter.next();
String key = entry.getKey().toString();
String val = entry.getValue().toString();
contentMap.put(key, val);
}
contentMap.put("custom_random_id", contentMap.get("id"));//no use
}
return contentMap;
}
private static boolean ifValidMsg(String message) {
boolean flag = false;
Pattern p = Pattern.compile(keyword);
Matcher m = p.matcher(message);
if (m.find()) {
flag = true;
}
return flag;
}
}
2016-03-06 12:22:19.980 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.util.ConcurrentModificationException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.disruptor$consume_loop_STAR_$fn__3209.invoke(disruptor.clj:94) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.util$async_loop$fn__544.invoke(util.clj:475) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[?:1.8.0_60]
at java.util.HashMap$EntryIterator.next(HashMap.java:1463) ~[?:1.8.0_60]
at java.util.HashMap$EntryIterator.next(HashMap.java:1461) ~[?:1.8.0_60]
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:72) ~[kryo-2.21.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17) ~[kryo-2.21.jar:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[kryo-2.21.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86) ~[kryo-2.21.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17) ~[kryo-2.21.jar:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[kryo-2.21.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75) ~[kryo-2.21.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) ~[kryo-2.21.jar:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486) ~[kryo-2.21.jar:?]
at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5543.invoke(worker.clj:142) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3539.invoke(executor.clj:274) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.disruptor$clojure_handler$reify__3196.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
... 6 more
2016-03-06 12:22:19.986 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.util.ConcurrentModificationException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.disruptor$consume_loop_STAR_$fn__3209.invoke(disruptor.clj:94) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.util$async_loop$fn__544.invoke(util.clj:475) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3539.invoke(executor.clj:274) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.disruptor$clojure_handler$reify__3196.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
... 6 more
2016-03-06 12:22:20.005 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:332) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.6.0.jar:?]
at backtype.storm.daemon.worker$fn__5927$fn__5928.invoke(worker.clj:636) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.daemon.executor$mk_executor_data$fn__3530$fn__3531.invoke(executor.clj:256) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at backtype.storm.util$async_loop$fn__544.invoke(util.clj:485) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
|
|