分享

Storm应用系列之——集成Kafka

howtodown 2014-8-30 16:35:00 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 5 100281

问题导读:
Kafka集群中的Broker地址,有哪两种方法指定?
TransactionalTridentKafkaSpout的作用是什么?
本地模式无法保存Offset该如何解决?








前言Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是作为其数据源的很好的选择。本文就如何集成Kafka进行介绍。

Kafka的基本介绍:什么是Kafka

准备工作
KafkaSpout其实网上已经有人写了,在github上开源了,不用我们自己造轮子。只是要注意版本问题:
0.7版本的Kafka,对应KafkaSpout可以使用Storm-contrib下面的例子
源码:https://github.com/nathanmarz/st ... /master/storm-kafka
Maven依赖:https://clojars.org/storm/storm-kafka

0.8版本的Kafka在API上和底层Offset的处理方式上发生了重大变化,所以老的KafkaSpout不再适用,必须使用新的KafkaAPI
源码:https://github.com/wurstmeister/storm-kafka-0.8-plus

这里因为0.8版本的Kafka必然是将来主流,所以我就不介绍0.7 的了,使用方式基本上是类似的。

PS:
是人写的,就会有bug,何况是别人分享出来的。所以,遇到bug,还请去github上提交一个issue告诉作者修正。

2014/7/29 更新:
wurstmeister/storm-kafka-0.8-plus 现在合并到Apache Storm了,在其external/storm-kakfa目录

Maven依赖直接更新成:

  1.   <dependency>  
  2.   <groupId>org.apache.storm</groupId>  
  3.   <artifactId>storm-kafka</artifactId>  
  4.   <version>0.9.2-incubating</version>  
  5. </dependency>  
复制代码

但是storm似乎没有直接把external的包加载到classpath,所以使用时,还得手动把该jar包从external/storm-kafka/下拷到storm的lib目录。
当然,也可以在maven中加上<scope>compile</scope>,直接把该jar打到你项目一起。


使用KafkaSpout一个KafkaSpout只能去处理一个topic的内容,所以,它要求初始化时提供如下与topic相关信息:
  • Kafka集群中的Broker地址 (IP+Port)

有两种方法指定:
1. 使用静态地址,即直接给定Kafka集群中所有Broker信息

  1. GlobalPartitionInformation info = new GlobalPartitionInformation();  
  2. info.addPartition(0, new Broker("10.1.110.24",9092));  
  3. info.addPartition(0, new Broker("10.1.110.21",9092));  
  4. BrokerHosts brokerHosts = new StaticHosts(info);  
复制代码

2. 从Zookeeper动态读取
  1. BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181");  
复制代码

推荐使用这种方法,因为Kafka的Broker可能会动态的增减

  • topic名字
  • 当前spout的唯一标识Id (以下代称$spout_id)
  • zookeeper上用于存储当前处理到哪个Offset了 (以下代称$zk_root)
  • 当前topic中数据如何解码

了解Kafka的应该知道,Kafka中当前处理到哪的Offset是由客户端自己管理的。所以,后面两个的目的,其实是在zookeeper上建立一个 $zk_root/$spout_id 的节点,其值是一个map,存放了当前Spout处理的Offset的信息。


在Topology中加入Spout的代码:

  1. String topic = "test";  
  2. String zkRoot = "kafkastorm";  
  3. String spoutId = "myKafka";  
  4.   
  5. SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);  
  6. spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());  
  7.   
  8. TopologyBuilder builder = new TopologyBuilder();  
  9. builder.setSpout("spout", new KafkaSpout(spoutConfig), spoutNum);  
复制代码

其中TestMessageScheme就是告诉KafkaSpout如何去解码数据,生成Storm内部传递数据
  1. public class TestMessageScheme implements Scheme {  
  2.   
  3.     private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);  
  4.       
  5.     @Override  
  6.     public List<Object> deserialize(byte[] bytes) {  
  7.         try {  
  8.             String msg = new String(bytes, "UTF-8");  
  9.             return new Values(msg);  
  10.         } catch (InvalidProtocolBufferException e) {  
  11.             LOGGER.error("Cannot parse the provided message!");  
  12.         }  
  13.          
  14.         //TODO: what happend if returns null?  
  15.         return null;  
  16.     }  
  17.   
  18.     @Override  
  19.     public Fields getOutputFields() {  
  20.         return new Fields("msg");  
  21.     }  
  22.   
  23. }  
复制代码

这个解码方式是与Producer端生成时塞入数据的编码方式配套的。这里我Producer端塞入的是String的byte,所以这里也还原成String,定义输出为一个名叫"msg"的field。

后面就可以自己添加Bolt处理tuple中该field的数据了。


使用TransactionalTridentKafkaSpoutTransactionalTridentKafkaSpout是为事务性的Trident而用的。用法与KafkaSpout有所不同。

  1. TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, topic, spoutId);  
  2. kafkaConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());  
  3.   
  4. TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);  
  5.   
  6. TridentTopology topology = new TridentTopology();  
  7. topology.newStream("test_str", kafkaSpout).shuffle().each(new Fields("msg", new PrintFunction());  
复制代码

看到它并没有要求我们提供zkRoot,因为直接代码里面写死了…… -_-T
地址是 /transactional/<STREAM_NAME>/<Spout_Id>,在上面的例子中,就是  /transactional/test_str/myKafaka


常见问题1. 本地模式无法保存Offset
KafkaSpout初始化时,会去取spoutConfig.zkServers 和 spoutConfig.zkPort 变量的值,而该值默认是没塞的,所以是空,那么它就会去取当前运行的Storm所配置的zookeeper地址和端口,而本地运行的Storm,是一个临时的zookeeper实例,并不会真正持久化。所以,每次关闭后,数据就没了。
本地模式,要显示的去配置

  1. spoutConfig.zkServers = new ArrayList<String>(){{  
  2.                 add("10.1.110.20");  
  3.                 add("10.1.110.21");  
  4.                 add("10.1.110.24");  
  5.             }};  
  6. spoutConfig.zkPort = 2181;  
复制代码
  1. <del><dependency>  
  2.   <groupId>net.wurstmeister.storm</groupId>  
  3.   <artifactId>storm-kafka-0.8-plus</artifactId>  
  4.   <version>0.2.0</version>  
  5.   <exclusion>  
  6.     <groupId>org.slf4j</groupId>  
  7.     <artifactId>slf4j-simple</artifactId>  
  8.   </exclusion>  
  9. </dependency></del>  
复制代码




已有(5)人评论

跳转到指定楼层
anyhuayong 发表于 2014-8-30 19:05:03
好文章,支持
回复

使用道具 举报

quenlang 发表于 2014-10-12 14:40:51
支持一下,感谢分享
回复

使用道具 举报

qzjqzjqzj 发表于 2015-8-23 22:16:36
楼主,我在本地模式下跑storm-kafka,然后我搭了个zookeeper集群,在集群的其中一台服务器上运行了kafka的brokers,然后就配置了kafkaSpout,zKhost是集群的ip+port列表,还配置了zkRoot(就叫/kafkastorm,这个zkRoot到底是做什么的?存消息消费到哪用的?按照这个路径在zk上创建相应地目录,存消息消费到哪?),但是本地模式运行的时候,报Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/logs_kafka/partitions;可是我通过zkCli的ls /命令去查zk上的目录,是有这个目录的,这是什么原因呢?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条