Using Flume + Kafka + Storm Together in a Distributed Environment

Posted by pig2 on 2014-8-26 02:32:48 in 实操演练 (Hands-on Practice); last edited by pig2 on 2014-8-26 02:32
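Guiding questions:
1. What are Flume, Kafka, and Storm, and how are they installed?
2. How are Flume, Kafka, and Storm used together?
3. What is the principle behind combining Flume, Kafka, and Storm?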





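  I. What are Flume, Kafka, and Storm, and how are they installed?
  For an introduction to Flume, see the article 《Flume1.5.0入门:安装、部署、及flume的案例》 (Getting started with Flume 1.5.0: installation, deployment, and Flume examples).
  For an introduction to Kafka, see the article 《kafka2.9.2的分布式集群安装和demo(java api)测试》 (Kafka 2.9.2 distributed cluster installation and a Java API demo test).
  For an introduction to Storm, see the article 《storm(ubuntu12.04)入门及分布式集群的搭建》 (Getting started with Storm on Ubuntu 12.04 and building a distributed cluster).
  The examples below are tested with the configuration described in those three articles.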

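  II. How are Flume, Kafka, and Storm used together?
    1) What is the principle?
  If you have read the introductions to Flume, Kafka, and Storm carefully, you already know how each of them sends messages to and receives messages from the outside.
  In the examples below, we write a custom Flume sink that calls a Kafka producer to send messages. On the Storm side, we write a spout that implements the IRichSpout interface and calls a Kafka consumer to receive messages. The messages then pass through a few custom bolts, each of which prints its own processed output.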
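The flow described above can be pictured with a toy, in-memory model. This is only a sketch that uses standard-library queues in place of the real systems: the class PipelineSketch and all of its methods are hypothetical names, not part of Flume, Kafka, or Storm. One queue stands in for the Flume channel, another for the Kafka topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of the Flume -> Kafka -> Storm flow; every name here is hypothetical. */
final class PipelineSketch {
    // Stands in for the Flume memory channel between source and sink.
    static final BlockingQueue<String> flumeChannel = new LinkedBlockingQueue<>();
    // Stands in for the Kafka topic between producer and consumer.
    static final BlockingQueue<String> kafkaTopic = new LinkedBlockingQueue<>();

    /** Flume source: accepts an incoming line and buffers it in the channel. */
    static void source(String line) {
        flumeChannel.add(line);
    }

    /** Flume sink: takes one event from the channel and "produces" it to the topic. */
    static boolean sink() {
        String event = flumeChannel.poll();
        if (event == null) {
            return false; // channel empty, nothing to forward
        }
        kafkaTopic.add(event);
        return true;
    }

    /** Storm side: the spout "consumes" from the topic, then two bolts transform each message. */
    static List<String> topology() {
        List<String> out = new ArrayList<>();
        String msg;
        while ((msg = kafkaTopic.poll()) != null) {
            out.add(bolt2(bolt1(msg)));
        }
        return out;
    }

    static String bolt1(String msg) { return msg + "bolt1"; }

    static String bolt2(String msg) { return msg + "bolt2"; }

    public static void main(String[] args) {
        source("hello idoall.org syslog");
        while (sink()) { /* drain the channel into the topic */ }
        System.out.println(topology()); // prints [hello idoall.org syslogbolt1bolt2]
    }
}
```

The real pipeline differs in every operational respect (network transport, persistence, partitioning, acknowledgements); the sketch only shows which component hands data to which.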

    2) Integrating Flume and Kafka
     # Copy the Kafka jars that Flume needs into the lib directory under the Flume installation.
  root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/flume-1.5.0-bin/lib
  root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/flume-1.5.0-bin/lib
  root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib

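# Write the sink source file (KafkaSink.java), export it from Eclipse as a jar, and put the jar in the flume-1.5.0-bin/lib directory. The project must reference flume-ng-configuration-1.5.0.jar, flume-ng-sdk-1.5.0.jar, flume-ng-core-1.5.0.jar, zkclient-0.3.jar, and commons-logging-1.1.1.jar. These jars can be found under the Flume directory; if you cannot locate one, search for it with the find command.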
  package idoall.cloud.flume.sink;

  import java.util.Properties;

  import kafka.javaapi.producer.Producer;
  import kafka.producer.KeyedMessage;
  import kafka.producer.ProducerConfig;

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.flume.Channel;
  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.Transaction;
  import org.apache.flume.conf.Configurable;
  import org.apache.flume.sink.AbstractSink;

  public class KafkaSink extends AbstractSink implements Configurable {
      private static final Log logger = LogFactory.getLog(KafkaSink.class);
      private String topic;
      private Producer<String, String> producer;

      // Called by Flume when the agent is configured; builds the Kafka producer.
      @Override
      public void configure(Context context) {
          // The topic is hard-coded; consumers must subscribe to the same name.
          topic = "idoall_testTopic";
          Properties props = new Properties();
          props.setProperty("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
          props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
          // Custom partitioner; this class is not shown in this post and must be on Flume's classpath.
          props.put("partitioner.class", "idoall.cloud.kafka.Partitionertest");
          props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka");
          props.setProperty("num.partitions", "4");
          // Wait for the partition leader to acknowledge each message.
          props.put("request.required.acks", "1");
          ProducerConfig config = new ProducerConfig(props);
          producer = new Producer<String, String>(config);
          logger.info("KafkaSink初始化完成."); // "KafkaSink initialization complete."
      }

      // Called repeatedly by the sink runner; moves one event from the channel to Kafka.
      @Override
      public Status process() throws EventDeliveryException {
          Channel channel = getChannel();
          Transaction tx = channel.getTransaction();
          try {
              tx.begin();
              Event event = channel.take();
              if (event == null) {
                  // The channel is empty: end the transaction and ask Flume to back off.
                  tx.rollback();
                  return Status.BACKOFF;
              }
              // Note: new String(byte[]) decodes with the JVM's default charset.
              String body = new String(event.getBody());
              KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, body);
              producer.send(data);
              logger.info("flume向kafka发送消息:" + body); // "Flume sent a message to Kafka: ..."
              tx.commit();
              return Status.READY;
          } catch (Exception e) {
              logger.error("Flume KafkaSinkException:", e);
              tx.rollback();
              return Status.BACKOFF;
          } finally {
              tx.close();
          }
      }
  }
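One detail of the sink worth a small illustration: it turns the event body into a String with the JVM's default charset, so the same bytes can decode differently on machines with different locale settings. The sketch below assumes the payloads are UTF-8 (an assumption, the post does not say) and uses a hypothetical helper named decode to make the charset explicit.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: decode an event body with an explicit charset. */
final class BodyDecoding {
    /** Decodes the bytes as UTF-8 regardless of the JVM's default charset. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "caf\u00e9" contains one non-ASCII character, encoded as two bytes in UTF-8.
        byte[] body = "caf\u00e9".getBytes(StandardCharsets.UTF_8);
        System.out.println(body.length);                      // 5 bytes
        System.out.println(decode(body).equals("caf\u00e9")); // true
    }
}
```

In the sink, the equivalent change would be to pass a charset as the second argument wherever the body is converted to a String.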

 # On m1, configure the agent that connects Flume to Kafka
  root@m1:/home/hadoop/flume-1.5.0-bin# vi /home/hadoop/flume-1.5.0-bin/conf/kafka.conf
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  # Describe/configure the source
  a1.sources.r1.type = syslogtcp
  a1.sources.r1.port = 5140
  a1.sources.r1.host = localhost
  # Describe the sink
  a1.sinks.k1.type = idoall.cloud.flume.sink.KafkaSink
  # Use a channel which buffers events in memory
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000
  a1.channels.c1.transactionCapacity = 100
  # Bind the source and sink to the channel
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
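A Flume agent file uses the Java properties format, so a quick sanity check can be written with java.util.Properties. The sketch below is a hypothetical helper (AgentConfigCheck is not part of Flume) that only verifies one thing for the agent above: that source r1 and sink k1 are bound to the same channel. The agent, source, and sink names are taken from kafka.conf and hard-coded.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical sanity check for the kafka.conf agent definition shown above. */
final class AgentConfigCheck {
    /** Parses properties-format text such as the contents of kafka.conf. */
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** Returns true when source r1 and sink k1 of agent a1 share a channel. */
    static boolean sourceAndSinkShareChannel(Properties conf) {
        String sourceChannels = conf.getProperty("a1.sources.r1.channels", "").trim();
        String sinkChannel = conf.getProperty("a1.sinks.k1.channel", "").trim();
        if (sourceChannels.isEmpty() || sinkChannel.isEmpty()) {
            return false;
        }
        // A source may list several channels separated by whitespace.
        for (String channel : sourceChannels.split("\\s+")) {
            if (channel.equals(sinkChannel)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String conf = "a1.sources = r1\n"
                + "a1.sinks = k1\n"
                + "a1.channels = c1\n"
                + "# Bind the source and sink to the channel\n"
                + "a1.sources.r1.channels = c1\n"
                + "a1.sinks.k1.channel = c1\n";
        System.out.println(sourceAndSinkShareChannel(parse(conf))); // true
    }
}
```

If the two keys name different channels, events accepted by the source would never reach the sink, which is the mistake this check is meant to catch.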

# Start Kafka on each of m1, m2, s1, and s2 (if you do not know how, the article 《kafka2.9.2的分布式集群安装和demo(java api)测试》 covers Kafka installation, configuration, and startup), then start a message consumer on s1.
  root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties &

 # Start Flume on m1
  root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
  # Only part of the log output is shown below
  14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink初始化完成.
  14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
  14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
  14/08/19 11:36:34 INFO node.Application: Starting Channel c1
  14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
  14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
  14/08/19 11:36:34 INFO node.Application: Starting Sink k1
  14/08/19 11:36:34 INFO node.Application: Starting Source r1
  14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...

 # Open another terminal on m1 and send a test syslog message to Flume
  root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140
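The Flume log for this test contains the line "WARN source.SyslogUtils: Event created from Invalid Syslog data". A likely reason is that the bare string sent with nc carries no syslog priority header. As a hedged sketch, the hypothetical class below frames a message with the `<PRI>` prefix defined by RFC 3164 (PRI = facility * 8 + severity) and can optionally send it over TCP. It omits the timestamp and hostname fields of the full format, and whether Flume then accepts the line without a warning was not verified here.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that frames and sends a syslog-style line over TCP. */
final class SyslogLine {
    /** Builds "<PRI>message\n", where PRI = facility * 8 + severity (RFC 3164). */
    static String frame(int facility, int severity, String message) {
        if (facility < 0 || facility > 23 || severity < 0 || severity > 7) {
            throw new IllegalArgumentException("facility must be 0..23 and severity 0..7");
        }
        int pri = facility * 8 + severity;
        return "<" + pri + ">" + message + "\n";
    }

    /** Sends one framed line to a syslog TCP listener, e.g. the Flume source on port 5140. */
    static void send(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(line.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Facility 1 (user-level) and severity 5 (notice) give PRI 13.
        String line = frame(1, 5, "hello idoall.org syslog");
        System.out.print(line);
        // Only touch the network when a host and port are given, e.g.: localhost 5140
        if (args.length == 2) {
            send(args[0], Integer.parseInt(args[1]), line);
        }
    }
}
```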

 # In the Flume window on m1, look at the last line: Flume has sent the message to Kafka
  # Startup lines identical to the capture above are omitted
  14/08/19 11:38:05 WARN source.SyslogUtils: Event created from Invalid Syslog data.
  14/08/19 11:38:05 INFO client.ClientUtils$: Fetching metadata from broker id:3,host:s2,port:9092 with correlation id 0 for 1 topic(s) Set(idoall_testTopic)
  14/08/19 11:38:05 INFO producer.SyncProducer: Connected to s2:9092 for producing
  14/08/19 11:38:05 INFO producer.SyncProducer: Disconnecting from s2:9092
  14/08/19 11:38:05 INFO producer.SyncProducer: Connected to m1:9092 for producing
  14/08/19 11:38:05 INFO sink.KafkaSink: flume向kafka发送消息:hello idoall.org syslog

# In the Kafka consumer opened earlier on s1, the message sent from Flume also appears, which shows that Flume and Kafka are working together. (Note: this capture subscribes to the topic flume-kafka-storm-001, whereas the sink code above publishes to idoall_testTopic; the consumer's --topic must match the topic the sink writes to.)
  root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic flume-kafka-storm-001 --from-beginning
  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  SLF4J: Defaulting to no-operation (NOP) logger implementation
  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  [2014-08-11 14:22:12,165] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [flume-kafka-storm-001,1] (kafka.server.ReplicaFetcherManager)
  [2014-08-11 14:22:12,218] WARN [KafkaApi-3] Produce request with correlation id 2 from client  on partition [flume-kafka-storm-001,1] failed due to Topic flume-kafka-storm-001 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
  [2014-08-11 14:22:12,223] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 0 (kafka.log.Log)
  [2014-08-11 14:22:12,250] INFO Created log for partition [flume-kafka-storm-001,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
  [2014-08-11 14:22:12,267] WARN Partition [flume-kafka-storm-001,1] on broker 3: No checkpointed highwatermark is found for partition [flume-kafka-storm-001,1] (kafka.cluster.Partition)
  [2014-08-11 14:22:12,375] INFO Closing socket connection to /192.168.1.50. (kafka.network.Processor)
  hello idoall.org syslog

3) Integrating Kafka and Storm
# We write the code in Eclipse. Before writing any code, configure Maven; the contents of pom.xml are as follows:

  <?xml version="1.0" encoding="utf-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>idoall.cloud</groupId>
    <artifactId>idoall.cloud</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>idoall.cloud</name>
    <url>http://maven.apache.org</url>
    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <repositories>
      <repository>
        <id>github-releases</id>
        <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
      </repository>
      <repository>
        <id>clojars.org</id>
        <url>http://clojars.org/repo</url>
      </repository>
    </repositories>
    <dependencies>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>com.sksamuel.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.0-beta1</version>
      </dependency>
      <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.14</version>
      </dependency>
      <dependency>
        <groupId>storm</groupId>
        <artifactId>storm</artifactId>
        <version>0.9.0.1</version>
        <!-- keep storm out of the jar-with-dependencies -->
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.1</version>
      </dependency>
    </dependencies>
  </project>
# Write KafkaSpouttest.java
  package idoall.cloud.storm;

  import java.text.SimpleDateFormat;
  import java.util.Date;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Properties;

  import kafka.consumer.ConsumerConfig;
  import kafka.consumer.ConsumerIterator;
  import kafka.consumer.KafkaStream;
  import kafka.javaapi.consumer.ConsumerConnector;

  import backtype.storm.spout.SpoutOutputCollector;
  import backtype.storm.task.TopologyContext;
  import backtype.storm.topology.IRichSpout;
  import backtype.storm.topology.OutputFieldsDeclarer;
  import backtype.storm.tuple.Fields;
  import backtype.storm.tuple.Values;

  public class KafkaSpouttest implements IRichSpout {

      private SpoutOutputCollector collector;
      private ConsumerConnector consumer;
      private String topic;

      public KafkaSpouttest() {
      }

      public KafkaSpouttest(String topic) {
          this.topic = topic;
      }

      // Intentionally empty: in this demo all consumption happens in activate().
      public void nextTuple() {
      }

      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
          this.collector = collector;
      }

      public void ack(Object msgId) {
      }

      // Connects to Kafka and emits every message it receives.
      // The while loop blocks for as long as the stream stays open. Storm calls
      // nextTuple(), ack() and fail() from the same thread, so none of them run
      // while this loop is blocking; that is acceptable for a demo only.
      public void activate() {
          consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

          // Ask for one stream (one consumer thread) for the topic.
          Map<String, Integer> topicMap = new HashMap<String, Integer>();
          topicMap.put(topic, 1);
          System.out.println("*********Results********topic:" + topic);
          Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
          KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
          ConsumerIterator<byte[], byte[]> it = stream.iterator();
          while (it.hasNext()) {
              String value = new String(it.next().message());
              SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
              Date curDate = new Date(System.currentTimeMillis()); // current time
              String str = formatter.format(curDate);

              // "Storm received a message from Kafka ------->"
              System.out.println("storm接收到来自kafka的消息------->" + value);
              // Emit (word, id, time); the message itself doubles as the message id.
              collector.emit(new Values(value, 1, str), value);
          }
      }

      private static ConsumerConfig createConsumerConfig() {
          Properties props = new Properties();
          // ZooKeeper connection string
          props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
          // Consumer group id
          props.put("group.id", "1");
          // Kafka stores each group's consumed offsets in ZooKeeper, but not in real time;
          // they are committed at this interval.
          props.put("auto.commit.interval.ms", "1000");
          props.put("zookeeper.session.timeout.ms", "10000");
          return new ConsumerConfig(props);
      }

      public void close() {
      }

      public void deactivate() {
      }

      public void fail(Object msgId) {
      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word", "id", "time"));
      }

      public Map<String, Object> getComponentConfiguration() {
          System.out.println("getComponentConfiguration被调用"); // "getComponentConfiguration was called"
          // Workaround: the topology constructs this spout with an empty topic,
          // so the real topic name is set here.
          topic = "idoall_testTopic";
          return null;
      }
  }
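The spout above does all of its work in a blocking loop inside activate(). A common alternative shape is to let a background thread do the blocking reads into a bounded queue and have nextTuple() hand over at most one buffered message per call without waiting. The sketch below models only that shape with the standard library: QueueBackedSpoutSketch and its methods are hypothetical, the Iterator stands in for the Kafka stream iterator, and the emitted list stands in for collector.emit(...). It is not a drop-in Storm spout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Stdlib-only model of a spout that never blocks in nextTuple(); all names are hypothetical. */
final class QueueBackedSpoutSketch {
    // Bounded buffer between the reader thread and nextTuple().
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(1000);
    private final List<String> emitted = new ArrayList<>();
    private Thread reader;

    /** Like activate(): start a background reader and return immediately. */
    void activate(final Iterator<String> source) {
        reader = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    buffer.put(source.next()); // may block, but only this reader thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.setDaemon(true);
        reader.start();
    }

    /** Like nextTuple(): emit at most one buffered message and never wait. */
    boolean nextTuple() {
        String value = buffer.poll();
        if (value == null) {
            return false; // nothing buffered right now
        }
        emitted.add(value); // a real spout would call collector.emit(...) here
        return true;
    }

    List<String> emitted() {
        return emitted;
    }

    /** Test helper: wait until the reader has drained its source. */
    void awaitReader() {
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        QueueBackedSpoutSketch spout = new QueueBackedSpoutSketch();
        spout.activate(Arrays.asList("a", "b", "c").iterator());
        spout.awaitReader();
        while (spout.nextTuple()) { /* drain the buffer */ }
        System.out.println(spout.emitted()); // prints [a, b, c]
    }
}
```

With this shape the framework thread stays free to deliver acknowledgements and failures between calls, which the blocking version cannot do.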

# Write KafkaTopologytest.java
  package idoall.cloud.storm;

  import java.util.HashMap;
  import java.util.Map;

  import backtype.storm.Config;
  import backtype.storm.LocalCluster;
  import backtype.storm.topology.BasicOutputCollector;
  import backtype.storm.topology.OutputFieldsDeclarer;
  import backtype.storm.topology.TopologyBuilder;
  import backtype.storm.topology.base.BaseBasicBolt;
  import backtype.storm.tuple.Fields;
  import backtype.storm.tuple.Tuple;
  import backtype.storm.tuple.Values;
  import backtype.storm.utils.Utils;

  public class KafkaTopologytest {
      public static void main(String[] args) {
          // spout -> bolt1 (shuffle grouping) -> bolt2 (grouped by the "word" field)
          TopologyBuilder builder = new TopologyBuilder();
          builder.setSpout("spout", new KafkaSpouttest(""), 1);
          builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
          builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1", new Fields("word"));

          Map conf = new HashMap();
          conf.put(Config.TOPOLOGY_WORKERS, 1);
          conf.put(Config.TOPOLOGY_DEBUG, true);

          // LocalCluster runs the topology inside this JVM for testing.
          LocalCluster cluster = new LocalCluster();
          cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());

          Utils.sleep(1000 * 60 * 5); // let the local cluster run for five minutes
          cluster.shutdown();
      }

      // First processing step: appends "bolt1" to the message.
      public static class Bolt1 extends BaseBasicBolt {

          public void execute(Tuple input, BasicOutputCollector collector) {
              try {
                  String msg = input.getString(0);
                  if (msg == null) {
                      return; // nothing to process
                  }
                  int id = input.getInteger(1);
                  String time = input.getString(2);
                  msg = msg + "bolt1";
                  // "Message processed, pass 1"
                  System.out.println("对消息加工第1次-------[arg0]:" + msg + "---[arg1]:" + id + "---[arg2]:" + time + "------->" + msg);
                  collector.emit(new Values(msg));
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }

          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("word"));
          }
      }

      // Second processing step: appends "bolt2" and emits the message with a count of 1.
      public static class Bolt2 extends BaseBasicBolt {

          public void execute(Tuple tuple, BasicOutputCollector collector) {
              String msg = tuple.getString(0);
              msg = msg + "bolt2";
              // "Message processed, pass 2"
              System.out.println("对消息加工第2次---------->" + msg);
              collector.emit(new Values(msg, 1));
          }

          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("word", "count"));
          }
      }
  }
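The topology connects bolt1 to bolt2 with fieldsGrouping on the "word" field, which means tuples with equal "word" values always go to the same bolt2 task. The idea can be illustrated as "hash the grouping values, take the result modulo the number of tasks". The sketch below is only a model of that idea with hypothetical names; Storm's actual implementation differs in detail.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of how a fields grouping routes tuples to tasks. */
final class FieldsGroupingSketch {
    /** Picks a task index in [0, numTasks) from the values of the grouping fields. */
    static int chooseTask(List<Object> groupingValues, int numTasks) {
        // floorMod keeps the index non-negative even when the hash code is negative.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 2; // bolt2 is declared with a parallelism of 2 in the topology above
        List<Object> first = Arrays.<Object>asList("hello welcome idoall.orgbolt1");
        List<Object> second = Arrays.<Object>asList("hello welcome idoall.orgbolt1");
        // Equal field values always map to the same task.
        System.out.println(chooseTask(first, tasks) == chooseTask(second, tasks)); // true
    }
}
```

By contrast, the shuffleGrouping used between the spout and bolt1 distributes tuples without regard to their contents.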

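# Test Kafka and Storm together
  Open two terminals (they can be on two different machines; in the example below I use m2 and s1). Run the Kafka producer on m2 and the Kafka consumer on s1 (if the consumer from earlier is still open, reuse it). First check that Kafka works correctly on its own.
  As shown below, I run the producer on m2 and type "hello welcome idoall.org", and the consumer on s1 receives the same message. This shows that Kafka is running normally and that messages are being delivered.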


Output on m2:
  root@m2:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-producer.sh --broker-list m1:9092 --sync --topic idoall_testTopic
  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  SLF4J: Defaulting to no-operation (NOP) logger implementation
  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  hello welcome idoall.org

Messages received on s1:
  root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic idoall_testTopic --from-beginning
  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  SLF4J: Defaulting to no-operation (NOP) logger implementation
  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  hello welcome idoall.org

# Now run KafkaTopologytest.java in Eclipse. The console shows the message that was just sent through Kafka from m2, which means Kafka and Storm are connected as well.
  # There is a lot of output; only the important part is shown:
  *********Results********topic:idoall_testTopic
  storm接收到来自kafka的消息------->hello welcome idoall.org
  5268 [Thread-24-spout] INFO backtype.storm.daemon.task - Emitting: spout default [hello welcome idoall.org, 1, 2014年08月19日 11:21:15 051]
  对消息加工第1次-------[arg0]:hello welcome idoall.orgbolt1---[arg1]:1---[arg2]:2014年08月19日 11:21:15 051------->hello welcome idoall.orgbolt1
  5269 [Thread-18-bolt1] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2000523200413433507=6673316475127546409}, [hello welcome idoall.org, 1, 2014年08月19日 11:21:15 051]
  5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 default [hello welcome idoall.orgbolt1]
  5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2000523200413433507 4983764025617316501]
  5269 [Thread-20-bolt2] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2000523200413433507=1852530874180384956}, [hello welcome idoall.orgbolt1]
  对消息加工第2次---------->hello welcome idoall.orgbolt1bolt2
  5270 [Thread-20-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 default [hello welcome idoall.orgbolt1bolt2, 1]

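4) Integrating Flume, Kafka, and Storm
  As the two examples above show, Flume and Kafka are already deployed and communicating, and Kafka and Storm communicate normally. All that remains is to package the Storm code as a jar and deploy it to Storm; the three systems are then connected end to end.
  If you are not familiar with installing, configuring, and deploying Storm, see the article 《ubuntu12.04+storm0.9.2分布式集群的搭建》 (Building a distributed Storm 0.9.2 cluster on Ubuntu 12.04).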


     # Copy the Kafka-related jars into Storm's lib directory. (As explained above, integrating Kafka and Storm mainly means writing a custom Storm spout that calls the Kafka consumer to receive and print messages, so these jars are needed.)

  root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/storm-0.9.2-incubating/lib
  root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/storm-0.9.2-incubating/lib
  root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/storm-0.9.2-incubating/lib
  root@m1:/home/hadoop# cp /home/hadoop/zookeeper-3.4.5/dist-maven/zookeeper-3.4.5.jar /home/hadoop/storm-0.9.2-incubating/lib
  root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/zkclient-0.3.jar /home/hadoop/storm-0.9.2-incubating/lib

# Start Storm nimbus on m1
  root@m1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm nimbus &

# Start the Storm supervisor on s1 and s2
  root@s1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm supervisor &

# Start the Storm UI on m1

  root@m1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm ui &

# Package the project in Eclipse as a jar, copy it to any directory, and run it with storm
  root@m1:/home/hadoop/storm-0.9.2-incubating# ll
  total 25768
  drwxr-xr-x 11 root   root       4096 Aug 19 11:53 ./
  drwxr-xr-x 46 hadoop hadoop     4096 Aug 17 15:06 ../
  drwxr-xr-x  2 root   root       4096 Aug  1 14:38 bin/
  -rw-r--r--  1    502 staff     34239 Jun 13 08:46 CHANGELOG.md
  drwxr-xr-x  2 root   root       4096 Aug  2 12:31 conf/
  -rw-r--r--  1    502 staff       538 Mar 13 11:17 DISCLAIMER
  drwxr-xr-x  3    502 staff      4096 May  6 03:13 examples/
  drwxr-xr-x  3 root   root       4096 Aug  1 14:38 external/
  -rw-r--r--  1 root   root   26252342 Aug 19 11:36 idoall.cloud.jar
  drwxr-xr-x  3 root   root       4096 Aug  2 12:51 ldir/
  drwxr-xr-x  2 root   root       4096 Aug 19 11:53 lib/
  -rw-r--r--  1    502 staff     22822 Jun 12 04:07 LICENSE
  drwxr-xr-x  2 root   root       4096 Aug  1 14:38 logback/
  drwxr-xr-x  2 root   root       4096 Aug  1 15:07 logs/
  -rw-r--r--  1    502 staff       981 Jun 11 01:10 NOTICE
  drwxr-xr-x  5 root   root       4096 Aug  1 14:38 public/
  -rw-r--r--  1    502 staff      7445 Jun 10 02:24 README.markdown
  -rw-r--r--  1    502 staff        17 Jun 17 00:22 RELEASE
  -rw-r--r--  1    502 staff      3581 May 30 00:20 SECURITY.md
  root@m1:/home/hadoop/storm-0.9.2-incubating# /home/hadoop/storm-0.9.2-incubating/bin/storm jar idoall.cloud.jar idoall.cloud.storm.KafkaTopologytest
# Send a message through Flume and check whether Storm receives it
Message sent through Flume:
  root@m1:/home/hadoop# echo "flume->kafka->storm message" | nc localhost 5140
  root@m1:/home/hadoop#

Output shown in Storm:
  # There is a lot of output; only the important part is shown
  storm接收到来自kafka的消息------->flume->kafka->storm message
  174218 [Thread-16-spout] INFO  backtype.storm.daemon.task - Emitting: spout default [flume->kafka->storm message, 1, 2014年08月19日 12:06:39 360]
  174220 [Thread-10-bolt1] INFO  backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2345821945306343027=-7738131487327750388}, [flume->kafka->storm message, 1, 2014年08月19日 12:06:39 360]
  对消息加工第1次-------[arg0]:flume->kafka->storm messagebolt1---[arg1]:1---[arg2]:2014年08月19日 12:06:39 360------->flume->kafka->storm messagebolt1
  174221 [Thread-10-bolt1] INFO  backtype.storm.daemon.task - Emitting: bolt1 default [flume->kafka->storm messagebolt1]
  174221 [Thread-10-bolt1] INFO  backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2345821945306343027 -2191137958679040397]
  174222 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: __ack_ack, id: {}, [-2345821945306343027 -2191137958679040397]
  174222 [Thread-12-bolt2] INFO  backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2345821945306343027=8433871885621516671}, [flume->kafka->storm messagebolt1]
  对消息加工第2次---------->flume->kafka->storm messagebolt1bolt2
  174223 [Thread-12-bolt2] INFO  backtype.storm.daemon.task - Emitting: bolt2 default [flume->kafka->storm messagebolt1bolt2, 1]
  174223 [Thread-12-bolt2] INFO  backtype.storm.daemon.task - Emitting: bolt2 __ack_ack [-2345821945306343027 8433871885621516671]
  174224 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: bolt2:4, stream: __ack_ack, id: {}, [-2345821945306343027 8433871885621516671]
  174228 [Thread-16-spout] INFO  backtype.storm.daemon.task - Emitting: spout __ack_init [-2345821945306343027 -7738131487327750388 6]
  174228 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: spout:6, stream: __ack_init, id: {}, [-2345821945306343027 -7738131487327750388 6]
  174228 [Thread-20-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 6; __acker __ack_ack [-2345821945306343027]

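With the examples above, we have completed communication among Flume, Kafka, and Storm. Combined with the earlier articles 《Flume1.5.0入门:安装、部署、及flume的案例》 and 《使用Thrift0.9.1实现跨语言调用Golang、Php、Python、Java》 (Cross-language calls among Golang, PHP, Python, and Java with Thrift 0.9.1), this should cover most of the problems you meet in projects that involve real-time big-data computation and calls between multiple languages. I hope this recent series of articles is helpful to you.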



Author: 迦壹
Original blog post: Using Flume + Kafka + Storm Together in a Distributed Environment


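8 comments

linhai1023, posted 2014-9-10 13:50:57
Good stuff, studying it now.

kanwei163, posted 2014-11-7 08:43:38
Good stuff, bookmarked.

lexihc, posted 2015-1-6 09:03:03
Good stuff, bookmarked.

ohano_javaee, posted 2015-1-30 19:14:03
A question for the original poster: I do not want to use Storm and would rather read from Kafka with my own threads. How can I do that?

Reply, posted 2015-1-30 19:48: You could look into the Kafka API.

ainubis, posted 2015-3-29 01:34:43
Good stuff, thumbs up!

小伙425, posted 2015-9-25 16:22:41 (no text)

461735590, posted 2016-10-24 10:45:49
Hello! When configuring the agent on m1 that connects Flume and Kafka, why is this step also needed: "write the sink.java file, export a jar from Eclipse, and put it in the flume-1.5.1-bin/lib directory"? I do not understand its purpose.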
