我的topology如下:
KafkaSpout -> SplitBolt -> StoreBolt
我写了一个程序每隔一毫秒来给kafka发送信息,当发送的信息条数比较小时,没有出现什么问题,但是当数据量比较大的时候,比如发送了3000条信息之后,Bolt无法继续收到Spout的信息了,日志如下:
268150 [Thread-25-spout] INFO backtype.storm.daemon.task - Emitting: spout default [14, at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:914)]
268150 [Thread-25-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [-3381955863165195850 3298380998172808616 10]
2014-10-11 10:52:22,974 DEBUG [kafka.consumer.PartitionTopicInfo] - reset consume offset of 14:0: fetched offset = 69009: consumed offset = 69012 to 69012 (consumer)--> at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:852)
268151 [Thread-25-spout] INFO backtype.storm.daemon.task - Emitting: spout default [14, at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:852)] 268151 [Thread-25-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [2346958040565628056 -8239789527920435743 10]
也就是说,只有来自KafkaSpout的__ack_init 但是没有 来自__acker, split or store的__ack_ack.
但是,这些线程还是活着的:
2898095 [Thread-9-__acker] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
输出的就是他们在一定时间内没有收到数据的信息。同时我没有观察到任何的错误信息。
当我重新启动Topology的时候,这些没收到的信息会得到处理。
我的KafkaSpout emit的代码是:
mCollector.emit(new Values(topicId, value));
有人碰到过类似的问题或者知道可能哪里有问题吗?
谢谢!