Hentai 发表于 2016-12-7 11:43:03

storm请问如何定时执行emit操作?

我想每60s 执行一次emit操作 同时清理一个map 该怎么实现呢?

yuwenge 发表于 2016-12-7 12:32:28

storm中的定时任务可以这样使用

1:在main中设置
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);// 设置本Bolt定时发射数据
2:在bolt中使用代码判断是否是触发用的bolt
tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
如果为true,则执行定时任务需要执行的代码,最后return,如果为false,则执行正常的tuple处理的业务逻辑

至于逻辑楼主可以自己添加

代码示例
                int sum = 0;
                public void execute(Tuple input) {
                        if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
                              //定时执行的任务
                              System.out.println("定时输出:"+sum);
                              return;
                        }else{
                              //执行正常的tuple处理逻辑      
                              Integer value = input.getIntegerByField("num");
                              sum+=value;
                        }

                }

完整示例:


import java.util.Map;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.LocalCluster;
import backtype.storm.messaging.local;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LocalStormTopology {

      /**
         * 数据源spout
         * @author Administrator
         *
         */
      public static class DataSourceSpout extends BaseRichSpout{
                private Map conf;
                private TopologyContext context;
                private SpoutOutputCollector collector;
                /**
               * 本实例运行时,首先被调用,并且只会被调用一次
               */
                public void open(Map conf, TopologyContext context,
                              SpoutOutputCollector collector) {
                        this.conf = conf;
                        this.context = context;
                        this.collector = collector;
                }
                /**
               * 死循环,会一直执行
               */
                int i = 0;
                public void nextTuple() {
                        System.out.println("spout:"+i);
                        //把数据发送给下一个bolt
                        //values是一个list列表
                        collector.emit(new Values(i++));
                        try {
                              Thread.sleep(1000);
                        } catch (InterruptedException e) {
                              e.printStackTrace();
                        }
                }
                /**
               * 声明输出的字段
               */
                public void declareOutputFields(OutputFieldsDeclarer declarer) {
                        //这里的fields和nexttuple中的values是一一对应的。
                        declarer.declare(new Fields("num"));
                }
      }

      /**
         * 对spout发射过来的额数据进行求和
         * @author Administrator
         *
         */
      public static class sumbolt extends BaseRichBolt{
                private Map stormConf;
                private TopologyContext context;
                private OutputCollector collector;


                public void prepare(Map stormConf, TopologyContext context,
                              OutputCollector collector) {
                        this.stormConf = stormConf;
                        this.context = context;
                        this.collector = collector;
                }

                /**
               * 相当于是死循环--接收spout发射过来的数据
               */
                int sum = 0;
                public void execute(Tuple input) {
                        if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
                              //定时执行的任务
                              System.out.println("定时输出:"+sum);
                              return;
                        }else{
                              //执行正常的tuple处理逻辑      
                              Integer value = input.getIntegerByField("num");
                              sum+=value;
                        }

                }

                public void declareOutputFields(OutputFieldsDeclarer declarer) {

                }
      }

      public static void main(String[] args) {
                TopologyBuilder topologyBuilder = new TopologyBuilder();
                topologyBuilder.setSpout("spout_id", new DataSourceSpout());
                topologyBuilder.setBolt("bolt_id", new sumbolt()).shuffleGrouping("spout_id");

                LocalCluster localCluster = new LocalCluster();
                Config config = new Config();
                config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
                localCluster.submitTopology(LocalStormTopology.class.getSimpleName(), config, topologyBuilder.createTopology());

      }


}





Hentai 发表于 2016-12-7 13:45:41

yuwenge 发表于 2016-12-7 12:32
storm中的定时任务可以这样使用

1:在main中设置


你写的这个是所有的bolt都是定时的吧?我只想设置某一个bolt定时

xuanxufeng 发表于 2016-12-7 17:04:49

Hentai 发表于 2016-12-7 13:45
你写的这个是所有的bolt都是定时的吧?我只想设置某一个bolt定时
楼主参考这篇文章,可以指定特定的bolt

storm定时的三种方式及tick详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20502


页: [1]
查看完整版本: storm请问如何定时执行emit操作?