分享

storm定时的三种方式及tick详解


问题导读


1.tick是什么功能?
2.如何指定某个bolt每隔一段时间做一些操作?
3.如何实现Topology中的每个bolt都每隔一段时间做一些操作?






背景:

我们知道在java中可以有最少3钟方式来实现定时任务:
1、普通thread(里面使用while循环以及sleep)

2、Timer和TimerTask

3、ScheduledExecutorService ,另外还有功能更全的Quartz框架或者是spring集成的Quartz。当然从标题就知道我们今天不是讲这些东西,而是讲讲storm中自带的定时功能使用,可以使用场景如:每分钟统计订单数据累计数据总和等。当然这其中最好的搭配就是使用kafka来做订单消息推送,目前我们只讲个本地main demo。


一、tick全解

1、tick的功能
Apache Storm中内置了一种定时机制——tick,它能够让任何bolt的所有task每隔一段时间(精确到秒级,用户可以自定义)收到一个来自systemd的tick stream的tick tuple,bolt收到这样的tuple后可以根据业务需求完成相应的处理。Tick功能从Apache Storm 0.8.0版本开始支持,本文在Apache Storm 0.9.5上测试。

2、为bolt设置tick
若希望某个bolt每隔一段时间做一些操作,那么可以将bolt继承BaseBasicBolt/BaseRichBolt,并重写getComponentConfiguration()方法。在方法中设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值,单位是秒。
getComponentConfiguration()是backtype.storm.topology.IComponent接口中定义的方法,在此方法的实现中可以定义以Topology开头的此bolt特定的Config。

1.jpg


这样设置之后,此bolt的所有task都会每隔一段时间收到一个来自systemd的tick stream的tick tuple,因此execute()方法可以实现如下:

2.jpg

3、全局tick
若希望Topology中的每个bolt都每隔一段时间做一些操作,那么可以定义一个Topology全局的tick,同样是设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值:

3.jpg


当我们在整个Topology上设置tick和我们单个运算bolt上冲突时,其优先级如何呢?事实是在更小范围的bolt设置的tick优先级更高


4、定时精度问题
Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS是精确到秒级的。例如某bolt设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS为10s,理论上说bolt的每个task应该每个10s收到一个tick tuple。实际测试发现,这个时间间隔的精确性是很高的,一般延迟(而不是提前)时间在1-2ms左右。

二、代码实现

1、spout代码

[mw_shl_code=bash,true]public class TickWordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] sentences = {"a","b","c"};
    private int index = 0;
    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index ++;
        if(index >= sentences.length){
            index = 0;
        }
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
        }
    }
    @SuppressWarnings("rawtypes")
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
         this.collector = collector;
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

[/mw_shl_code]


2、bolt代码
[mw_shl_code=bash,true]public class TickWordCountBolt extends BaseBasicBolt{
    Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            System.err.println("TickWordCount bolt: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
            //模拟聚合打印结果
            for (String key : counts.keySet()) {
                System.err.println("key: " + key + "   count: " + counts.get(key));
            }
            //模拟10秒钟的结果处理以后清空操作
            counts.clear();
        } else {
            String result = tuple.getStringByField("word");
            if(counts.get(result) == null){
                counts.put(result, 1);
            }else{
                counts.put(result, counts.get(result) + 1);
            }
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
    //设置10秒发送一次tick心跳
    @SuppressWarnings("static-access")
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }
}

[/mw_shl_code]


3、main调试代码

[mw_shl_code=bash,true]public class TickTest {
    @SuppressWarnings("static-access")
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new TickWordSpout());
        //启动3个线程按word值进行分组处理
        builder.setBolt("count", new TickWordCountBolt(),3).fieldsGrouping("spout", new Fields("word"));
        Config conf = new Config();
        //设置一个全局的Topology发送tick心跳时间,测试优先级
        conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
        }
      }
}
---------------------------------------输出结果------------------------------------------------
TickWordCount bolt: 2016-09-17 12:41:23:031
key: b   count: 3014
TickWordCount bolt: 2016-09-17 12:41:23:041
key: c   count: 3017
TickWordCount bolt: 2016-09-17 12:41:23:053
key: a   count: 3021

TickWordCount bolt: 2016-09-17 12:41:33:031
key: b   count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:041
key: c   count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:053
key: a   count: 3295

TickWordCount bolt: 2016-09-17 12:41:43:031
key: b   count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:041
key: c   count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:053
key: a   count: 3293

TickWordCount bolt: 2016-09-17 12:41:53:031
key: b   count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:041
key: c   count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:053
key: a   count: 3298

TickWordCount bolt: 2016-09-17 12:42:03:031
key: b   count: 3293
TickWordCount bolt: 2016-09-17 12:42:03:041
key: c   count: 3294
TickWordCount bolt: 2016-09-17 12:42:03:053
key: a   count: 3293

[/mw_shl_code]

从这组测试数据来看,每组都是相隔10s执行0延迟,不过在测试中也有发现延迟1-2ms的情况,还是比较精准的。

三、tick实现代码浅显分析

TopologyBuilder.setBolt

[mw_shl_code=bash,true] public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
        return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
    }

[/mw_shl_code]
[mw_shl_code=bash,true]
  public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
        validateUnusedId(id);
        initCommon(id, bolt, parallelism_hint);
        _bolts.put(id, bolt);
        return new BoltGetter(id);
    }[/mw_shl_code]


[mw_shl_code=bash,true]
//Map conf = component.getComponentConfiguration();能够获取设置的tick发送心跳的设置
  private void initCommon(String id, IComponent component, Number parallelism) {
        ComponentCommon common = new ComponentCommon();
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
        if(parallelism!=null) common.set_parallelism_hint(parallelism.intValue());
        Map conf = component.getComponentConfiguration();
        if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
        _commons.put(id, common);
    }


[/mw_shl_code]

作者:
hello_coke

来自:jianshu




已有(2)人评论

跳转到指定楼层
yuwenge 发表于 2016-12-7 17:07:35
补充:
如何在storm应用中实现定时调度

在storm中实时定时调度, 较好的方式是利用spout的nextTuple()不断循环触发你的定时任务。

    首先, 把需要定时调度的逻辑用一个bolt实现,其execute方法的伪代码如下:

[mw_shl_code=bash,true] // 从topology的全局对象 backtype.storm.Config取得上次调度时间
    Date 上次调度时间 = config.get("上次调度时间");

    if ( 如果当前时间 - 上次调度时间 >= 调度间隔 ) {

        // 执行具体业务逻辑
        doTask();

        // 将本次执行时间存入全局对象 backtype.storm.Config
        config.put("上次调度时间", 当前时间);
    }[/mw_shl_code]

    然后, 定义一个专门的定时发送消息的spout,如以下每隔8秒发送一条消息:
[mw_shl_code=bash,true] @Override
    public void nextTuple() {
        try {
            Thread.sleep(8 * 1000);
            collector.emit(new Values("Let's go!"));
        } catch (Exception e) {
            log.error("", e);
        }
    }[/mw_shl_code]


这个spout发出的消息没有任何业务意义,只是为了触发上面的bolt。

补充说明,如果以上代码在执行 config.put() 时抛出异常 “storm Async loop died!” , 那就改为用redis保存最近一次的执行时间。


作者:
林中漫步  




回复

使用道具 举报

Hentai 发表于 2016-12-13 16:20:44
楼主 请问一下 我有多个bolt,每个bolt我都重新了getComponentConfiguration方法,但是只有第一个bolt能定时执行,其他的下游bolt都不能定时,请问该怎么解决呢
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条