如何在Storm编程实现与Kafka的集成

查看数: 63612 | 评论数: 11 | 收藏 5
关灯 | 提示:支持键盘翻页<-左 右->
    组图打开中,请稍候......
发布时间: 2014-9-27 12:19

正文摘要:

问题导读1.如何编程实现Storm与Kafka集成?2.Storm中Topology如何实现的?3.如何验证集成效果? 一、实现模型   数据流程:    1、Kafka Producter生成topic1主题的消息     2、Storm中有个Top ...

回复

daozhu 发表于 2017-1-18 17:01:38
不错不错,这storm和kafka还真是集成的太完美了
Hentai 发表于 2016-10-12 14:17:49
楼主 转载的时候能自己测试一下有没有错可以么?
浮云 发表于 2016-10-10 15:38:05
感谢分享
火龙果先生 发表于 2016-5-9 16:21:24
rocky2015 发表于 2016-1-3 21:47
    1、使用Kafka client模拟Kafka Producter ,生成topic1主题   

      bin/kafka-console- ...

你好,这个怎么解决呢?



8329 [Thread-13-spout] INFO  backtype.storm.daemon.executor - Opening spout spout:(4)
8402 [Thread-13-spout] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: com.google.common.collect.Maps.newConcurrentMap()Ljava/util/concurrent/ConcurrentMap;
    at org.apache.curator.framework.listen.ListenerContainer.<init>(ListenerContainer.java:35) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:145) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:122) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:91) ~[curator-framework-2.5.0.jar:na]
    at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:529) ~[storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
8403 [Thread-13-spout] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: com.google.common.collect.Maps.newConcurrentMap()Ljava/util/concurrent/ConcurrentMap;
    at org.apache.curator.framework.listen.ListenerContainer.<init>(ListenerContainer.java:35) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:145) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:122) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:91) ~[curator-framework-2.5.0.jar:na]
    at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:529) ~[storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
8417 [Thread-13-spout] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$fn__4694$fn__4695.invoke(worker.clj:495) [storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:241) [storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
rocky2015 发表于 2016-1-3 21:47:40
    1、使用Kafka client模拟Kafka Producter ,生成topic1主题   

      bin/kafka-console-producer.sh --broker-list node04:9092 --topic topic1

    2、使用Kafka client模拟Kafka Consumer,订阅topic2主题

      bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic2 --from-beginning
这个地方是不是应该统一时 topic1 或者 topic2
OCG 发表于 2015-12-3 15:36:41
kafka storm 集成
OCG 发表于 2015-12-3 15:35:35
也就是说,kafka 无缝集成storm 不用自己写spout接收kafka数据了么?只要实现schema 对每天消息的处理就可以了么?
ACE 发表于 2015-8-31 11:41:37
代码:


public class StormKafkaTopo {   
    public static void main(String[] args) throws Exception {
// 配置Zookeeper地址
        BrokerHosts brokerHosts = new ZkHosts("storm1:2181,storm2:2181,storm3:2181");
        // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout" , "kafkaspout");
      
// 配置KafkaBolt中的kafka.broker.properties
        Config conf = new Config();  
        Map<String, String> map = new HashMap<String, String>();
// 配置Kafka broker地址      
        map.put("metadata.broker.list", "storm3:9092");
        // serializer.class为消息的序列化类
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
// 配置KafkaBolt生成的topic
        conf.put("topic", "topic2");
        
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());  
        TopologyBuilder builder = new TopologyBuilder();   
        builder.setSpout("spout", new KafkaSpout(spoutConfig));  
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
        builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");        

        if (args != null && args.length > 0) {  
            conf.setNumWorkers(3);  
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  
        } else {  
  
            LocalCluster cluster = new LocalCluster();  
            cluster.submitTopology("Topo", conf, builder.createTopology());  
            Utils.sleep(100000);  
            cluster.killTopology("Topo");  
            cluster.shutdown();  
        }  
    }  
}
ACE 发表于 2015-8-31 11:40:31
楼主您好!我执行到最后一步报错,帮我看下呗
# storm jar kafka-storm.jar StormKafkaTopo KafkaStorm
Running: /usr/java/jdk1.7.0_71/bin/java -client -Dstorm.options= -Dstorm.home=/root/storm/apache-storm-0.9.4 -Dstorm.log.dir=/root/storm/apache-storm-0.9.4/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /root/storm/apache-storm-0.9.4/lib/disruptor-2.10.1.jar:/root/storm/apache-storm-0.9.4/lib/clj-time-0.4.1.jar:/root/storm/apache-storm-0.9.4/lib/reflectasm-1.07-shaded.jar:/root/storm/apache-storm-0.9.4/lib/commons-logging-1.1.3.jar:/root/storm/apache-storm-0.9.4/lib/core.incubator-0.1.0.jar:/root/storm/apache-storm-0.9.4/lib/ring-core-1.1.5.jar:/root/storm/apache-storm-0.9.4/lib/servlet-api-2.5.jar:/root/storm/apache-storm-0.9.4/lib/compojure-1.1.3.jar:/root/storm/apache-storm-0.9.4/lib/ring-servlet-0.3.11.jar:/root/storm/apache-storm-0.9.4/lib/logback-classic-1.0.13.jar:/root/storm/apache-storm-0.9.4/lib/storm-core-0.9.4.jar:/root/storm/apache-storm-0.9.4/lib/math.numeric-tower-0.0.1.jar:/root/storm/apache-storm-0.9.4/lib/ring-devel-0.3.11.jar:/root/storm/apache-storm-0.9.4/lib/clout-1.0.1.jar:/root/storm/apache-storm-0.9.4/lib/commons-fileupload-1.2.1.jar:/root/storm/apache-storm-0.9.4/lib/minlog-1.2.jar:/root/storm/apache-storm-0.9.4/lib/carbonite-1.4.0.jar:/root/storm/apache-storm-0.9.4/lib/log4j-over-slf4j-1.6.6.jar:/root/storm/apache-storm-0.9.4/lib/snakeyaml-1.11.jar:/root/storm/apache-storm-0.9.4/lib/asm-4.0.jar:/root/storm/apache-storm-0.9.4/lib/jline-2.11.jar:/root/storm/apache-storm-0.9.4/lib/jgrapht-core-0.9.0.jar:/root/storm/apache-storm-0.9.4/lib/tools.macro-0.1.0.jar:/root/storm/apache-storm-0.9.4/lib/commons-lang-2.5.jar:/root/storm/apache-storm-0.9.4/lib/jetty-util-6.1.26.jar:/root/storm/apache-storm-0.9.4/lib/joda-time-2.0.jar:/root/storm/apache-storm-0.9.4/lib/slf4j-api-1.7.5.jar:/root/storm/apache-storm-0.9.4/lib/jetty-6.1.26.jar:/root/storm/apache-storm-0.9.4/lib/kryo-2.21.jar:/root/storm/apache-storm-0.9.4/lib/commons-codec-1.6.jar:/root/storm/apache-storm-0.9.4/lib/hiccup-0.3.6.jar:/root/storm/apache-storm-0.9.4/lib/tools.cli-0.2.4.jar:/root/storm/apache-storm-0.9.4/lib/commons-exec-1.1.jar:/root/storm/apache-storm-0.9.4/lib/ring-jetty-adapter-0.3.11.jar:/root/storm/apache-storm-0.9.4/lib/clojure-1.5.1.jar:/root/storm/apache-storm-0.9.4/lib/json-simple-1.1.jar:/root/storm/apache-storm-0.9.4/lib/clj-stacktrace-0.2.2.jar:/root/storm/apache-storm-0.9.4/lib/tools.logging-0.2.3.jar:/root/storm/apache-storm-0.9.4/lib/commons-io-2.4.jar:/root/storm/apache-storm-0.9.4/lib/objenesis-1.2.jar:/root/storm/apache-storm-0.9.4/lib/chill-java-0.3.5.jar:/root/storm/apache-storm-0.9.4/lib/logback-core-1.0.13.jar:kafka-storm.jar:/root/storm/apache-storm-0.9.4/conf:/root/storm/apache-storm-0.9.4/bin -Dstorm.jar=kafka-storm.jar StormKafkaTopo KafkaStorm
Error: Could not find or load main class StormKafkaTopo
关闭

推荐上一条 /2 下一条