分享

storm请问如何定时执行emit操作?

Hentai 发表于 2016-12-7 11:43:03 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 8193
我想每60s 执行一次emit操作 同时清理一个map 该怎么实现呢?

已有(3)人评论

跳转到指定楼层
yuwenge 发表于 2016-12-7 12:32:28
storm中的定时任务可以这样使用

1:在main中设置
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);// 设置本Bolt定时发射数据
2:在bolt中使用代码判断是否是触发用的bolt
tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
如果为true,则执行定时任务需要执行的代码,最后return,如果为false,则执行正常的tuple处理的业务逻辑

至于逻辑楼主可以自己添加

代码示例
                int sum = 0;
                public void execute(Tuple input) {
                        if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
                                //定时执行的任务
                                System.out.println("定时输出:"+sum);
                                return;
                        }else{
                                //执行正常的tuple处理逻辑        
                                Integer value = input.getIntegerByField("num");
                                sum+=value;
                        }

                }

完整示例:


import java.util.Map;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.LocalCluster;
import backtype.storm.messaging.local;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LocalStormTopology {

        /**
         * 数据源spout
         * @author Administrator
         *
         */
        public static class DataSourceSpout extends BaseRichSpout{
                private Map conf;
                private TopologyContext context;
                private SpoutOutputCollector collector;
                /**
                 * 本实例运行时,首先被调用,并且只会被调用一次
                 */
                public void open(Map conf, TopologyContext context,
                                SpoutOutputCollector collector) {
                        this.conf = conf;
                        this.context = context;
                        this.collector = collector;
                }
                /**
                 * 死循环,会一直执行
                 */
                int i = 0;
                public void nextTuple() {
                        System.out.println("spout:"+i);
                        //把数据发送给下一个bolt
                        //values是一个list列表
                        collector.emit(new Values(i++));
                        try {
                                Thread.sleep(1000);
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                }
                /**
                 * 声明输出的字段
                 */
                public void declareOutputFields(OutputFieldsDeclarer declarer) {
                        //这里的fields和nexttuple中的values是一一对应的。
                        declarer.declare(new Fields("num"));
                }
        }

        /**
         * 对spout发射过来的额数据进行求和
         * @author Administrator
         *
         */
        public static class sumbolt extends BaseRichBolt{
                private Map stormConf;
                private TopologyContext context;
                private OutputCollector collector;


                public void prepare(Map stormConf, TopologyContext context,
                                OutputCollector collector) {
                        this.stormConf = stormConf;
                        this.context = context;
                        this.collector = collector;
                }

                /**
                 * 相当于是死循环--接收spout发射过来的数据
                 */
                int sum = 0;
                public void execute(Tuple input) {
                        if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
                                //定时执行的任务
                                System.out.println("定时输出:"+sum);
                                return;
                        }else{
                                //执行正常的tuple处理逻辑        
                                Integer value = input.getIntegerByField("num");
                                sum+=value;
                        }

                }

                public void declareOutputFields(OutputFieldsDeclarer declarer) {

                }
        }

        public static void main(String[] args) {
                TopologyBuilder topologyBuilder = new TopologyBuilder();
                topologyBuilder.setSpout("spout_id", new DataSourceSpout());
                topologyBuilder.setBolt("bolt_id", new sumbolt()).shuffleGrouping("spout_id");

                LocalCluster localCluster = new LocalCluster();
                Config config = new Config();
                config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
                localCluster.submitTopology(LocalStormTopology.class.getSimpleName(), config, topologyBuilder.createTopology());

        }


}





回复

使用道具 举报

Hentai 发表于 2016-12-7 13:45:41
yuwenge 发表于 2016-12-7 12:32
storm中的定时任务可以这样使用

1:在main中设置

你写的这个是所有的bolt都是定时的吧?我只想设置某一个bolt定时
回复

使用道具 举报

xuanxufeng 发表于 2016-12-7 17:04:49
Hentai 发表于 2016-12-7 13:45
你写的这个是所有的bolt都是定时的吧?我只想设置某一个bolt定时

楼主参考这篇文章,可以指定特定的bolt

storm定时的三种方式及tick详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20502


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条