分享

最简单的JStorm例子分为4个步骤

admin 发表于 2014-12-28 17:52:46 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 104938
问题导读

1.如何写Topology?
2.JStorm例子,本文讲哪四个步骤?








生成Topology
  1. Map conf = new HashMp();
  2. //topology所有自定义的配置均放入这个Map
  3. TopologyBuilder builder = new TopologyBuilder();
  4. //创建topology的生成器
  5. int spoutParal = get("spout.parallel", 1);
  6. //获取spout的并发设置
  7. SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
  8.                 new SequenceSpout(), spoutParal);
  9. //创建Spout, 其中new SequenceSpout() 为真正spout对象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 为spout的名字,注意名字中不要含有空格
  10. int boltParal = get("bolt.parallel", 1);
  11. //获取bolt的并发设置
  12. BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
  13.                 boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
  14. //创建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 为bolt名字,TotalCount 为bolt对象,boltParal为bolt并发数,
  15. //shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME),
  16. //表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的数据,并且以shuffle方式,
  17. //即每个spout随机轮询发送tuple到下一级bolt中
  18. int ackerParal = get("acker.parallel", 1);
  19. Config.setNumAckers(conf, ackerParal);
  20. //设置表示acker的并发数
  21. int workerNum = get("worker.num", 10);
  22. conf.put(Config.TOPOLOGY_WORKERS, workerNum);
  23. //表示整个topology将使用几个worker
  24. conf.put(Config.STORM_CLUSTER_MODE, "distributed");
  25. //设置topolog模式为分布式,这样topology就可以放到JStorm集群上运行
  26. StormSubmitter.submitTopology(streamName, conf,
  27.                 builder.createTopology());
  28. //提交topology
复制代码



IRichSpout
IRichSpout 为最简单的Spout接口
  1. IRichSpout{
  2.     @Override
  3.     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  4.     }
  5.     @Override
  6.     public void close() {
  7.     }
  8.     @Override
  9.     public void activate() {
  10.     }
  11.     @Override
  12.     public void deactivate() {
  13.     }
  14.     @Override
  15.     public void nextTuple() {
  16.     }
  17.     @Override
  18.     public void ack(Object msgId) {
  19.     }
  20.     @Override
  21.     public void fail(Object msgId) {
  22.     }
  23.     @Override
  24.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  25.     }
  26.     @Override
  27.     public Map<String, Object> getComponentConfiguration() {
  28.         return null;
  29.     }
复制代码


其中注意:
  • spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
  • spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
  • open是当task起来后执行的初始化动作
  • close是当task被shutdown后执行的动作
  • activate 是当task被激活时,触发的动作
  • deactivate 是task被deactive时,触发的动作
  • nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
  • ack, 当spout收到一条ack消息时,触发的动作,详情可以参考 ack机制
  • fail, 当spout收到一条fail消息时,触发的动作,详情可以参考 ack机制
  • declareOutputFields, 定义spout发送数据,每个字段的含义
  • getComponentConfiguration 获取本spout的component 配置
Bolt
  1. IRichBolt {
  2.     @Override
  3.     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  4.     }
  5.     @Override
  6.     public void execute(Tuple input) {
  7.     }
  8.     @Override
  9.     public void cleanup() {
  10.     }
  11.     @Override
  12.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  13.     }
  14.     @Override
  15.     public Map<String, Object> getComponentConfiguration() {
  16.         return null;
  17.     }
  18. }
复制代码



其中注意:
  • bolt对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
  • bolt可以有构造函数,但构造函数只执行一次,是在提交任务时,创建bolt对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将bolt序列化到文件中去,在worker起来时再将bolt从文件中反序列化出来)。
  • prepare是当task起来后执行的初始化动作
  • cleanup是当task被shutdown后执行的动作
  • execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。 ** 在executor中,当程序处理一条消息时,需要执行collector.ack, 详情可以参考 ack机制 ** 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail ,详情可以参考 ack机制
  • declareOutputFields, 定义bolt发送数据,每个字段的含义
  • getComponentConfiguration 获取本bolt的component 配置
编译
在Maven中配置
         
  1.      <dependency>
  2.             <groupId>com.alibaba.jstorm</groupId>
  3.             <artifactId>jstorm-client</artifactId>
  4.             <version>0.9.3.1</version>
  5.             <scope>provided</scope>
  6.         </dependency>
  7.          <dependency>
  8.             <groupId>com.alibaba.jstorm</groupId>
  9.             <artifactId>jstorm-client-extension</artifactId>
  10.             <version>0.9.3.1</version>
  11.             <scope>provided</scope>
  12.         </dependency>
复制代码


如果找不到jstorm-client和jstorm-client-extension包,可以自己下载jstorm源码进行编译,请参考 源码编译
打包时,需要将所有依赖打入到一个包中
  1. <build>
  2.         <plugins>
  3.             <plugin>
  4.                 <artifactId>maven-assembly-plugin</artifactId>
  5.                 <configuration>
  6.                     <descriptorRefs>
  7.                         <descriptorRef>jar-with-dependencies</descriptorRef>
  8.                     </descriptorRefs>
  9.                     <archive>
  10.                         <manifest>
  11.                             <mainClass>storm.starter.SequenceTopology</mainClass>
  12.                         </manifest>
  13.                     </archive>
  14.                 </configuration>
  15.                 <executions>
  16.                     <execution>
  17.                         <id>make-assembly</id>
  18.                         <phase>package</phase>
  19.                         <goals>
  20.                             <goal>single</goal>
  21.                         </goals>
  22.                     </execution>
  23.                 </executions>
  24.             </plugin>
  25.             <plugin>
  26.                 <groupId>org.apache.maven.plugins</groupId>
  27.                 <artifactId>maven-compiler-plugin</artifactId>
  28.                 <configuration>
  29.                     <source>1.6</source>
  30.                     <target>1.6</target>
  31.                 </configuration>
  32.             </plugin>
  33.         </plugins>
  34.     </build>
复制代码



提交jar
  1. jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter
复制代码



  • xxxx.jar 为打包后的jar
  • com.alibaba.xxxx.xx 为入口类,即提交任务的类
  • parameter即为提交参数


已有(2)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条