
How to Integrate Storm with Kafka Programmatically

Guiding questions
1. How do you integrate Storm with Kafka programmatically?
2. How is the Topology implemented in Storm?
3. How do you verify that the integration works?

I. Implementation Model
   Data flow:
    1. A Kafka producer publishes messages to the topic1 topic.
    2. A Storm Topology contains three components: KafkaSpout, SenqueceBolt, and KafkaBolt. KafkaSpout subscribes to topic1, passes each
       message to SenqueceBolt for processing, and KafkaBolt finally publishes the processed data back to Kafka as topic2 messages.
    3. A Kafka consumer consumes the messages on topic2.
     [Figure: Kafka-Storm integration data flow diagram]

II. Topology Implementation
    1. Create a Maven project and configure pom.xml
      The project depends on three artifacts: storm-core, kafka_2.10, and storm-kafka.

  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.2-incubating</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.2-incubating</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
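
      With the maven-assembly-plugin configured above, running mvn clean package builds a jar-with-dependencies under target/ that bundles the kafka_2.10 and storm-kafka classes (storm-core is marked provided because the cluster supplies it). Assuming the project's artifactId is storm-kafka and its version is 0.0.1-SNAPSHOT, this produces the storm-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar used in the submit command of Section III.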


2. KafkaSpout
      KafkaSpout is a spout that ships with Storm; the source is at https://github.com/apache/incubator-storm/tree/master/external
      To use KafkaSpout you implement the Scheme interface yourself; its job is to parse the data you need out of the raw message bytes.

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MessageScheme implements Scheme {

    /**
     * Decode the raw Kafka message bytes into a single-field tuple.
     */
    public List<Object> deserialize(byte[] ser) {
        try {
            String msg = new String(ser, "UTF-8");
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available; returning null simply drops the message
        }
        return null;
    }

    /**
     * Name of the single output field produced by deserialize().
     */
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
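
      To sanity-check the Scheme by itself before wiring it into the topology, a minimal local sketch such as the following can be used (MessageSchemeTest is just an illustrative name, not part of the original project):

public class MessageSchemeTest {
    public static void main(String[] args) throws Exception {
        MessageScheme scheme = new MessageScheme();
        // deserialize() wraps the decoded string in a one-element tuple
        System.out.println(scheme.deserialize("hello".getBytes("UTF-8"))); // [hello]
        // the declared output field name is what downstream bolts read
        System.out.println(scheme.getOutputFields().get(0));               // msg
    }
}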


 3. SenqueceBolt
       SenqueceBolt is trivial: it prepends "I'm " to every message it receives from the spout.

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SenqueceBolt extends BaseBasicBolt {

    /**
     * Prepend "I'm " to the incoming message and emit the result.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        String out = "I'm " + word + "!";
        System.out.println("out=" + out);
        collector.emit(new Values(out));
    }

    /**
     * Declare the single output field, "message", consumed downstream by KafkaBolt.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}


4. KafkaBolt
      KafkaBolt is a bolt that ships with Storm (storm-kafka); it publishes the tuples it receives to a Kafka topic.
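
      As the topology configuration in the next subsection shows, KafkaBolt reads its producer settings from the kafka.broker.properties entry of the topology config and the destination topic from the topic entry. Purely as an illustration of that behavior, the sketch below hand-rolls an equivalent bolt with the Kafka 0.8 producer API; it is not the storm-kafka source, the class name SimpleKafkaProducerBolt is made up, and the assumption that the incoming tuple carries a "message" field comes from SenqueceBolt's declared output field:

import java.util.Map;
import java.util.Properties;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleKafkaProducerBolt extends BaseBasicBolt {

    private transient Producer<String, String> producer;
    private String topic;

    @SuppressWarnings("unchecked")
    public void prepare(Map stormConf, TopologyContext context) {
        // Reuse the same config keys the topology sets for KafkaBolt
        Map<String, String> brokerProps = (Map<String, String>) stormConf.get("kafka.broker.properties");
        Properties props = new Properties();
        props.putAll(brokerProps);
        producer = new Producer<String, String>(new ProducerConfig(props));
        topic = (String) stormConf.get("topic");
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        // Forward the "message" field emitted by the upstream bolt to Kafka
        String message = input.getStringByField("message");
        producer.send(new KeyedMessage<String, String>(topic, message));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink: it emits nothing downstream
    }
}
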
5. Topology

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

public class StormKafkaTopo {

    public static void main(String[] args) throws Exception {
        // ZooKeeper addresses used by KafkaSpout to look up brokers and store offsets
        BrokerHosts brokerHosts = new ZkHosts("node04:2181,node05:2181,node06:2181");
        // Topic to subscribe to, plus the ZooKeeper node path and id for this spout's offset data
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout", "kafkaspout");

        // kafka.broker.properties consumed by KafkaBolt
        Config conf = new Config();
        Map<String, String> map = new HashMap<String, String>();
        // Kafka broker address
        map.put("metadata.broker.list", "node04:9092");
        // serializer.class is the message serializer
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // Topic that KafkaBolt publishes to
        conf.put("topic", "topic2");

        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
        builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");

        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is the topology name
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Local mode: run for 100 seconds, then shut down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topo", conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology("Topo");
            cluster.shutdown();
        }
    }
}
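
      Run with no arguments, the main method above starts a LocalCluster, lets the topology run for about 100 seconds, and then shuts it down; run with at least one argument, it submits the topology to the cluster with 3 workers, using the first argument as the topology name (so KafkaStorm in the submit command of Section III becomes the topology name). Note that the submit command in Section III refers to the main class by its simple name, which assumes StormKafkaTopo is in the default package; if it is declared inside a package, the fully qualified class name has to be used instead.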


III. Test and Verification
    1. Use the Kafka console client as the Kafka producer, publishing to topic1:
      bin/kafka-console-producer.sh --broker-list node04:9092 --topic topic1
    2. Use the Kafka console client as the Kafka consumer, subscribing to topic2:
      bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic2 --from-beginning
    3. Run the Storm Topology:
      bin/storm jar storm-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar  StormKafkaTopo KafkaStorm
    4. Results
     [Screenshot of the topology run results]
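
      If everything is wired up correctly, each line typed into the console producer on topic1 (for example hello) should appear in the console consumer on topic2 after passing through SenqueceBolt (for example I'm hello!).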


Original article; when reposting please credit the source: http://www.cnblogs.com/tovin/p/3974417.html

Comments (11)

duoduo2009 posted on 2014-9-28 10:00:37
Thanks for sharing, much appreciated.

hb1984 posted on 2014-9-28 11:35:46
Thanks for sharing, much appreciated.

ACE posted on 2015-8-31 11:40:31
Hi OP! I get an error at the last step. Could you take a look?
# storm jar kafka-storm.jar StormKafkaTopo KafkaStorm
Running: /usr/java/jdk1.7.0_71/bin/java -client -Dstorm.options= -Dstorm.home=/root/storm/apache-storm-0.9.4 -Dstorm.log.dir=/root/storm/apache-storm-0.9.4/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /root/storm/apache-storm-0.9.4/lib/disruptor-2.10.1.jar:/root/storm/apache-storm-0.9.4/lib/clj-time-0.4.1.jar:/root/storm/apache-storm-0.9.4/lib/reflectasm-1.07-shaded.jar:/root/storm/apache-storm-0.9.4/lib/commons-logging-1.1.3.jar:/root/storm/apache-storm-0.9.4/lib/core.incubator-0.1.0.jar:/root/storm/apache-storm-0.9.4/lib/ring-core-1.1.5.jar:/root/storm/apache-storm-0.9.4/lib/servlet-api-2.5.jar:/root/storm/apache-storm-0.9.4/lib/compojure-1.1.3.jar:/root/storm/apache-storm-0.9.4/lib/ring-servlet-0.3.11.jar:/root/storm/apache-storm-0.9.4/lib/logback-classic-1.0.13.jar:/root/storm/apache-storm-0.9.4/lib/storm-core-0.9.4.jar:/root/storm/apache-storm-0.9.4/lib/math.numeric-tower-0.0.1.jar:/root/storm/apache-storm-0.9.4/lib/ring-devel-0.3.11.jar:/root/storm/apache-storm-0.9.4/lib/clout-1.0.1.jar:/root/storm/apache-storm-0.9.4/lib/commons-fileupload-1.2.1.jar:/root/storm/apache-storm-0.9.4/lib/minlog-1.2.jar:/root/storm/apache-storm-0.9.4/lib/carbonite-1.4.0.jar:/root/storm/apache-storm-0.9.4/lib/log4j-over-slf4j-1.6.6.jar:/root/storm/apache-storm-0.9.4/lib/snakeyaml-1.11.jar:/root/storm/apache-storm-0.9.4/lib/asm-4.0.jar:/root/storm/apache-storm-0.9.4/lib/jline-2.11.jar:/root/storm/apache-storm-0.9.4/lib/jgrapht-core-0.9.0.jar:/root/storm/apache-storm-0.9.4/lib/tools.macro-0.1.0.jar:/root/storm/apache-storm-0.9.4/lib/commons-lang-2.5.jar:/root/storm/apache-storm-0.9.4/lib/jetty-util-6.1.26.jar:/root/storm/apache-storm-0.9.4/lib/joda-time-2.0.jar:/root/storm/apache-storm-0.9.4/lib/slf4j-api-1.7.5.jar:/root/storm/apache-storm-0.9.4/lib/jetty-6.1.26.jar:/root/storm/apache-storm-0.9.4/lib/kryo-2.21.jar:/root/storm/apache-storm-0.9.4/lib/commons-codec-1.6.jar:/root/storm/apache-storm-0.9.4/lib/hiccup-0.3.6.jar:/root/storm/apache-storm-0.9.4/lib/tools.cli-0.2.4.jar:/root/storm/apache-storm-0.9.4/lib/commons-exec-1.1.jar:/root/storm/apache-storm-0.9.4/lib/ring-jetty-adapter-0.3.11.jar:/root/storm/apache-storm-0.9.4/lib/clojure-1.5.1.jar:/root/storm/apache-storm-0.9.4/lib/json-simple-1.1.jar:/root/storm/apache-storm-0.9.4/lib/clj-stacktrace-0.2.2.jar:/root/storm/apache-storm-0.9.4/lib/tools.logging-0.2.3.jar:/root/storm/apache-storm-0.9.4/lib/commons-io-2.4.jar:/root/storm/apache-storm-0.9.4/lib/objenesis-1.2.jar:/root/storm/apache-storm-0.9.4/lib/chill-java-0.3.5.jar:/root/storm/apache-storm-0.9.4/lib/logback-core-1.0.13.jar:kafka-storm.jar:/root/storm/apache-storm-0.9.4/conf:/root/storm/apache-storm-0.9.4/bin -Dstorm.jar=kafka-storm.jar StormKafkaTopo KafkaStorm
Error: Could not find or load main class StormKafkaTopo

ACE posted on 2015-8-31 11:41:37
Code:


public class StormKafkaTopo {
    public static void main(String[] args) throws Exception {
        // ZooKeeper addresses
        BrokerHosts brokerHosts = new ZkHosts("storm1:2181,storm2:2181,storm3:2181");
        // Topic to subscribe to, plus the ZooKeeper node path and id
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout" , "kafkaspout");

        // kafka.broker.properties for KafkaBolt
        Config conf = new Config();
        Map<String, String> map = new HashMap<String, String>();
        // Kafka broker address
        map.put("metadata.broker.list", "storm3:9092");
        // serializer.class is the message serializer
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // Topic that KafkaBolt publishes to
        conf.put("topic", "topic2");

        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
        builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topo", conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology("Topo");
            cluster.shutdown();
        }
    }
}

OCG posted on 2015-12-3 15:35:35
So Kafka integrates with Storm out of the box and we don't have to write our own spout to receive the Kafka data? We just implement a Scheme to handle each message?

OCG posted on 2015-12-3 15:36:41
Kafka and Storm integration

rocky2015 posted on 2016-1-3 21:47:40
    1. Use the Kafka console client as the Kafka producer, publishing to topic1:

      bin/kafka-console-producer.sh --broker-list node04:9092 --topic topic1

    2. Use the Kafka console client as the Kafka consumer, subscribing to topic2:

      bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic2 --from-beginning
Shouldn't these two use the same topic, either topic1 or topic2?

火龙果先生 posted on 2016-5-9 16:21:24
rocky2015 posted on 2016-1-3 21:47
    1. Use the Kafka console client as the Kafka producer, publishing to topic1:

      bin/kafka-console- ...

Hi, how can this be fixed?



8329 [Thread-13-spout] INFO  backtype.storm.daemon.executor - Opening spout spout:(4)
8402 [Thread-13-spout] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: com.google.common.collect.Maps.newConcurrentMap()Ljava/util/concurrent/ConcurrentMap;
    at org.apache.curator.framework.listen.ListenerContainer.<init>(ListenerContainer.java:35) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:145) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:122) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:91) ~[curator-framework-2.5.0.jar:na]
    at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:529) ~[storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
8403 [Thread-13-spout] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: com.google.common.collect.Maps.newConcurrentMap()Ljava/util/concurrent/ConcurrentMap;
    at org.apache.curator.framework.listen.ListenerContainer.<init>(ListenerContainer.java:35) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:145) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:122) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:91) ~[curator-framework-2.5.0.jar:na]
    at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:529) ~[storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
8417 [Thread-13-spout] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$fn__4694$fn__4695.invoke(worker.clj:495) [storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:241) [storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
