不错不错,这storm和kafka还真是集成的太完美了 |
楼主 转载的时候能自己测试一下有没有错可以么? |
感谢分享 |
rocky2015 发表于 2016-1-3 21:47 你好,这个怎么解决呢? 8329 [Thread-13-spout] INFO backtype.storm.daemon.executor - Opening spout spout:(4) 8402 [Thread-13-spout] ERROR backtype.storm.util - Async loop died! java.lang.NoSuchMethodError: com.google.common.collect.Maps.newConcurrentMap()Ljava/util/concurrent/ConcurrentMap; at org.apache.curator.framework.listen.ListenerContainer.<init>(ListenerContainer.java:35) ~[curator-framework-2.5.0.jar:na] at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:145) ~[curator-framework-2.5.0.jar:na] at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:122) ~[curator-framework-2.5.0.jar:na] at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:91) ~[curator-framework-2.5.0.jar:na] at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating] at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating] at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:529) ~[storm-core-0.9.6.jar:0.9.6] at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.6.jar:0.9.6] at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25] 8403 [Thread-13-spout] ERROR backtype.storm.daemon.executor - java.lang.NoSuchMethodError: com.google.common.collect.Maps.newConcurrentMap()Ljava/util/concurrent/ConcurrentMap; at org.apache.curator.framework.listen.ListenerContainer.<init>(ListenerContainer.java:35) ~[curator-framework-2.5.0.jar:na] at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:145) ~[curator-framework-2.5.0.jar:na] at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:122) ~[curator-framework-2.5.0.jar:na] at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:91) ~[curator-framework-2.5.0.jar:na] at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating] at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating] at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:529) ~[storm-core-0.9.6.jar:0.9.6] at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.6.jar:0.9.6] at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25] 8417 [Thread-13-spout] ERROR backtype.storm.util - Halting process: ("Worker died") java.lang.RuntimeException: ("Worker died") at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.6.jar:0.9.6] at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na] at backtype.storm.daemon.worker$fn__4694$fn__4695.invoke(worker.clj:495) [storm-core-0.9.6.jar:0.9.6] at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:241) [storm-core-0.9.6.jar:0.9.6] at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.6.jar:0.9.6] at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25] |
1、使用Kafka client模拟Kafka Producter ,生成topic1主题 bin/kafka-console-producer.sh --broker-list node04:9092 --topic topic1 2、使用Kafka client模拟Kafka Consumer,订阅topic2主题 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic2 --from-beginning 这个地方是不是应该统一时 topic1 或者 topic2 |
kafka storm 集成 |
也就是说,kafka 无缝集成storm 不用自己写spout接收kafka数据了么?只要实现schema 对每天消息的处理就可以了么? |
代码: public class StormKafkaTopo { public static void main(String[] args) throws Exception { // 配置Zookeeper地址 BrokerHosts brokerHosts = new ZkHosts("storm1:2181,storm2:2181,storm3:2181"); // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout" , "kafkaspout"); // 配置KafkaBolt中的kafka.broker.properties Config conf = new Config(); Map<String, String> map = new HashMap<String, String>(); // 配置Kafka broker地址 map.put("metadata.broker.list", "storm3:9092"); // serializer.class为消息的序列化类 map.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put("kafka.broker.properties", map); // 配置KafkaBolt生成的topic conf.put("topic", "topic2"); spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout(spoutConfig)); builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt"); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Topo", conf, builder.createTopology()); Utils.sleep(100000); cluster.killTopology("Topo"); cluster.shutdown(); } } } |
楼主您好!我执行到最后一步报错,帮我看下呗 # storm jar kafka-storm.jar StormKafkaTopo KafkaStorm Running: /usr/java/jdk1.7.0_71/bin/java -client -Dstorm.options= -Dstorm.home=/root/storm/apache-storm-0.9.4 -Dstorm.log.dir=/root/storm/apache-storm-0.9.4/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /root/storm/apache-storm-0.9.4/lib/disruptor-2.10.1.jar:/root/storm/apache-storm-0.9.4/lib/clj-time-0.4.1.jar:/root/storm/apache-storm-0.9.4/lib/reflectasm-1.07-shaded.jar:/root/storm/apache-storm-0.9.4/lib/commons-logging-1.1.3.jar:/root/storm/apache-storm-0.9.4/lib/core.incubator-0.1.0.jar:/root/storm/apache-storm-0.9.4/lib/ring-core-1.1.5.jar:/root/storm/apache-storm-0.9.4/lib/servlet-api-2.5.jar:/root/storm/apache-storm-0.9.4/lib/compojure-1.1.3.jar:/root/storm/apache-storm-0.9.4/lib/ring-servlet-0.3.11.jar:/root/storm/apache-storm-0.9.4/lib/logback-classic-1.0.13.jar:/root/storm/apache-storm-0.9.4/lib/storm-core-0.9.4.jar:/root/storm/apache-storm-0.9.4/lib/math.numeric-tower-0.0.1.jar:/root/storm/apache-storm-0.9.4/lib/ring-devel-0.3.11.jar:/root/storm/apache-storm-0.9.4/lib/clout-1.0.1.jar:/root/storm/apache-storm-0.9.4/lib/commons-fileupload-1.2.1.jar:/root/storm/apache-storm-0.9.4/lib/minlog-1.2.jar:/root/storm/apache-storm-0.9.4/lib/carbonite-1.4.0.jar:/root/storm/apache-storm-0.9.4/lib/log4j-over-slf4j-1.6.6.jar:/root/storm/apache-storm-0.9.4/lib/snakeyaml-1.11.jar:/root/storm/apache-storm-0.9.4/lib/asm-4.0.jar:/root/storm/apache-storm-0.9.4/lib/jline-2.11.jar:/root/storm/apache-storm-0.9.4/lib/jgrapht-core-0.9.0.jar:/root/storm/apache-storm-0.9.4/lib/tools.macro-0.1.0.jar:/root/storm/apache-storm-0.9.4/lib/commons-lang-2.5.jar:/root/storm/apache-storm-0.9.4/lib/jetty-util-6.1.26.jar:/root/storm/apache-storm-0.9.4/lib/joda-time-2.0.jar:/root/storm/apache-storm-0.9.4/lib/slf4j-api-1.7.5.jar:/root/storm/apache-storm-0.9.4/lib/jetty-6.1.26.jar:/root/storm/apache-storm-0.9.4/lib/kryo-2.21.jar:/root/storm/apache-storm-0.9.4/lib/commons-codec-1.6.jar:/root/storm/apache-storm-0.9.4/lib/hiccup-0.3.6.jar:/root/storm/apache-storm-0.9.4/lib/tools.cli-0.2.4.jar:/root/storm/apache-storm-0.9.4/lib/commons-exec-1.1.jar:/root/storm/apache-storm-0.9.4/lib/ring-jetty-adapter-0.3.11.jar:/root/storm/apache-storm-0.9.4/lib/clojure-1.5.1.jar:/root/storm/apache-storm-0.9.4/lib/json-simple-1.1.jar:/root/storm/apache-storm-0.9.4/lib/clj-stacktrace-0.2.2.jar:/root/storm/apache-storm-0.9.4/lib/tools.logging-0.2.3.jar:/root/storm/apache-storm-0.9.4/lib/commons-io-2.4.jar:/root/storm/apache-storm-0.9.4/lib/objenesis-1.2.jar:/root/storm/apache-storm-0.9.4/lib/chill-java-0.3.5.jar:/root/storm/apache-storm-0.9.4/lib/logback-core-1.0.13.jar:kafka-storm.jar:/root/storm/apache-storm-0.9.4/conf:/root/storm/apache-storm-0.9.4/bin -Dstorm.jar=kafka-storm.jar StormKafkaTopo KafkaStorm Error: Could not find or load main class StormKafkaTopo |