我在storm平台发布了一个Topology(例如 testTopology),通过kafkaSpout 从kafka上抓取消息进行处理;
storm会在 zookeeper的节点 /storm/testTopology/partition_0 中记录当前处理的消息的offset,例如:
{"topology":{"id":"e20a3347-f43b-4765-94fc-2aec6b8a7d25","name":"testTopology"},"offset":4305106,"partition":0,"broker":{"host":"gjzq-zz5-20","port":9092},"topic":"test-topic"}
说明当前处理到的消息的offset 是4305106
我的问题是:
假设我想指定任意的offset,让Topology从该offset开始进行处理,应该怎么做??
我曾经将/storm/testTopology/partition_0 节点中的offset设置为一个新值,但没有任何作用,反而storm无法抓取到kafka的消息了
请大神解答。。;。多谢!!!
|
|