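Background: I installed Kafka on a remote Linux server. Sending and receiving messages works when I test on the Linux server itself. On my local machine I run Storm in local mode from IDEA and try to read the data in topic1 from Kafka on the remote Linux server. That is roughly the setup, but it keeps failing with an error. Please take a look; I really cannot find the problem. Thanks in advance!
Part of the error log: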
9596 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
9674 [Thread-10-spout-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
9759 [Thread-10-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092}}
9761 [Thread-10-spout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=localhost:9092, partition=0}]
9761 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
9761 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=localhost:9092, partition=0}]
9963 [Thread-10-spout] INFO storm.kafka.PartitionManager - Read partition information from: /order/kafkaspoutid/partition_0 --> null
11045 [Thread-10-spout] INFO kafka.consumer.SimpleConsumer - Reconnect due to socket error: java.nio.channels.ClosedChannelException
12048 [Thread-10-spout] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
Caused by: java.nio.channels.ClosedChannelException: null
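
One thing that may be worth checking, going only by the log above: the partition information read from ZooKeeper is localhost:9092, so a spout running on the local machine would try to reach port 9092 on the local machine itself rather than on the remote server. This is a guess from the log, not a confirmed diagnosis. The sketch below uses only the Java standard library to test whether a host and port accept TCP connections from where the topology runs. The class name BrokerReachability and the method isReachable are made up for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeout, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the host and port shown in the log; pass others as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: "
                + isReachable(host, port, 2000));
    }
}
```

Run it once with no arguments and once with the remote server's address. If localhost:9092 is not reachable but the remote address is, the broker is probably registering itself in ZooKeeper under a name that only resolves on the server, and the host name settings in the broker's server.properties (host.name or advertised.host.name in Kafka 0.8.x, if that is the version in use) would be the place to look.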