输入过大时storm的bolt无法成功接受到来自spout的数据
我在使用storm 0.9.2-incubating, local mode.
我的topology如下: KafkaSpout -> SplitBolt -> StoreBolt我写了一个程序每隔一毫秒来给kafka发送信息,当发送的信息条数比较小时,没有出现什么问题,但是当数据量比较大的时候,比如发送了3000条信息之后,Bolt无法继续收到Spout的信息了,日志如下:
268150 INFO backtype.storm.daemon.task - Emitting: spout default 268150 INFO backtype.storm.daemon.task - Emitting: spout __ack_init [-3381955863165195850 3298380998172808616 10]2014-10-11 10:52:22,974 DEBUG - reset consume offset of 14:0: fetched offset = 69009: consumed offset = 69012 to 69012 (consumer)--> at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:852)268151 INFO backtype.storm.daemon.task - Emitting: spout default 268151 INFO backtype.storm.daemon.task - Emitting: spout __ack_init 也就是说,只有来自KafkaSpout的__ack_init 但是没有 来自__acker, split or store的__ack_ack.但是,这些线程还是活着的:
2898095 INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, 输出的就是他们在一定时间内没有收到数据的信息。同时我没有观察到任何的错误信息。当我重新启动Topology的时候,这些没收到的信息会得到处理。我的KafkaSpout emit的代码是:mCollector.emit(new Values(topicId, value));有人碰到过类似的问题或者知道可能哪里有问题吗?谢谢!
你测试一个临界点试试,是不是做了什么特殊的设置,导致超过这个临界点,就会被卡住。
是不是资源的问题,把间隔时间和数量做一些相应的调整
可能变成僵尸线程了 我们就是冲着storm流量大的特性才选择这个的,storm连每毫秒一条信息的流量都处理不了吗?我刚才看了下,CPU使用顶峰时60%左右,Memory几乎没怎么用,感觉不像是资源的瓶颈。 howtodown 发表于 2014-10-14 17:43
可能变成僵尸线程了
Bolt都还活着。。隔了一些时间就会发送接受数据的信息。
spout如果发送失败,你是怎么处理的。
jixianqiuxue 发表于 2014-10-15 01:38
spout如果发送失败,你是怎么处理的。
我觉得应该没有失败,因为重新启动之后那些未处理的消息会继续被处理,目前我发现可能是一个buffer的默认值太小的原因。见这个:
http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
formlog 发表于 2014-10-15 11:16
我觉得应该没有失败,因为重新启动之后那些未处理的消息会继续被处理,目前我发现可能是一个buffer的默认 ...
设置生效了吗
s060403072 发表于 2014-10-15 18:10
设置生效了吗
是的,按照上面说的设置了之后,现在已经正常运行了。
页:
[1]
2