分享

storm运行时bolt报异常

hai3wei 发表于 2016-3-6 15:14:23 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 11419
storm运行时bolt报异常,也不是经常出现这个异常,我查了下网上的解释:这个信息表示你在将一个可变的对象作为 tuple 发送出去。你发送到 outputcollector 中的所有对象必须是非可变的。这个错误表明对象在被序列化并发送到网络中时你的 bolt 正在修改这个对象。


不知道有没有人碰到跟我一样的问题啊?这是什么原因啊


bolt代码如下:
public class DataParseBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    private final static Logger logger = LoggerFactory.getLogger(DataParseBolt.class);

    public void cleanup() {
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        if (sentence == null || "".equals(sentence)) {
            return;
        }
        Map<String, Object> infoMap = null;
        try{
            infoMap = LogParser.getInfoMap(sentence);
        }catch(Exception e)
        {
            logger.error("DataParseBolt execute() Sth wrong whith :" + sentence);
            logger.error("DataParseBolt execute() failed: ", e);
        }
        if (infoMap == null || "".equals(infoMap.get("table"))) {
            return;
        }
        collector.emit(new Values(infoMap));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("source_line"));
    }
LogParser:public class LogParser {

    private static final Logger logger = LoggerFactory.getLogger(LogParser.class);

    private static final String keyword = "bigdata_lingxi_realtime_msg";

    public static Map<String, Object> getInfoMap(String sentence) {
        Map<String, Object> infoMap = new HashMap<>();
        if (!ifValidMsg(sentence)) {
            return null;
        }
        try {
            JSONObject tempObject = (JSONObject) JSON.parse(sentence);
            String message = (String) tempObject.get("message");
            JSONObject jsonObject = (JSONObject) JSON.parse(message);
            String messageType = (String) jsonObject.get("messageType");
            String messageTopic = (String) jsonObject.get("messageTopic");
            JSONObject messageContent = (JSONObject) jsonObject.get("messageContent");
            Map<String, String> contentMap = getContentMap(messageContent);
            String tableName = getTableName(messageTopic);
            infoMap.put("type", messageType);
            infoMap.put("topic", messageTopic);
            infoMap.put("table", tableName);
            infoMap.put("content", contentMap);
        } catch (Exception e) {
            logger.error("LogParser getInfoMap() parse sentence error :" + sentence);
            return null;
        }
        return infoMap;
    }

    private static String getTableName(String topic) {
        String dateStr = getDate();
        String tableName = topic + "_" + dateStr;
        return tableName;
    }

    private static String getDate() {
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
        return df.format(new Date()).toString();
    }

    private static Map<String, String> getContentMap(JSONObject jsonObject) {
        Map<String, String> contentMap = new HashMap<>();
        if (jsonObject != null) {
            contentMap.put("custom_empty_field","");
            Iterator iter = jsonObject.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry entry = (Map.Entry) iter.next();
                String key = entry.getKey().toString();
                String val = entry.getValue().toString();
                contentMap.put(key, val);
            }
            contentMap.put("custom_random_id", contentMap.get("id"));//no use
        }
        return contentMap;
    }

    private static boolean ifValidMsg(String message) {
        boolean flag = false;
        Pattern p = Pattern.compile(keyword);
        Matcher m = p.matcher(message);
        if (m.find()) {
            flag = true;
        }
        return flag;
    }
}
2016-03-06 12:22:19.980 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.util.ConcurrentModificationException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__3209.invoke(disruptor.clj:94) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.util$async_loop$fn__544.invoke(util.clj:475) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
Caused by: java.util.ConcurrentModificationException
        at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[?:1.8.0_60]
        at java.util.HashMap$EntryIterator.next(HashMap.java:1463) ~[?:1.8.0_60]
        at java.util.HashMap$EntryIterator.next(HashMap.java:1461) ~[?:1.8.0_60]
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:72) ~[kryo-2.21.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17) ~[kryo-2.21.jar:?]
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[kryo-2.21.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86) ~[kryo-2.21.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17) ~[kryo-2.21.jar:?]
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[kryo-2.21.jar:?]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75) ~[kryo-2.21.jar:?]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) ~[kryo-2.21.jar:?]
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486) ~[kryo-2.21.jar:?]
        at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5543.invoke(worker.clj:142) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3539.invoke(executor.clj:274) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.disruptor$clojure_handler$reify__3196.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        ... 6 more
2016-03-06 12:22:19.986 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.util.ConcurrentModificationException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__3209.invoke(disruptor.clj:94) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.util$async_loop$fn__544.invoke(util.clj:475) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3539.invoke(executor.clj:274) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.disruptor$clojure_handler$reify__3196.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        ... 6 more
2016-03-06 12:22:20.005 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:332) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.6.0.jar:?]
        at backtype.storm.daemon.worker$fn__5927$fn__5928.invoke(worker.clj:636) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.daemon.executor$mk_executor_data$fn__3530$fn__3531.invoke(executor.clj:256) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at backtype.storm.util$async_loop$fn__544.invoke(util.clj:485) [storm-core-0.10.0.2.3.4.0-3485.jar:0.10.0.2.3.4.0-3485]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]

已有(3)人评论

跳转到指定楼层
hai3wei 发表于 2016-3-6 22:17:25
哪位大神知道什么问题啊?是Iterator的问题么?
回复

使用道具 举报

s060403072 发表于 2016-3-6 22:30:12
hai3wei 发表于 2016-3-6 22:17
哪位大神知道什么问题啊?是Iterator的问题么?

楼主似乎已经找到了应该跟storm没有关系的。
参考下,希望有帮助
http://www.aboutyun.com/home.php ... do=blog&id=2904
回复

使用道具 举报

hai3wei 发表于 2016-3-8 08:58:37
s060403072 发表于 2016-3-6 22:30
楼主似乎已经找到了应该跟storm没有关系的。
参考下,希望有帮助
http://www.aboutyun.com/home.php?mo ...

你好,现在没有报这个异常了,以前kafka的topic建的是单分区,给topic增加了一个分区之后,就好了。单分区的时候bolt的任务分配很不均衡,不知道是不是因为这个。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条