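Guiding questions
1. What does tick do?
2. How do you make a particular bolt perform some action at a fixed interval?
3. How do you make every bolt in a Topology perform some action at a fixed interval?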
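Background:
In Java there are at least three ways to implement a scheduled task:
1. A plain thread (a while loop with sleep inside)
2. Timer and TimerTask
3. ScheduledExecutorService
Beyond these there is the more full-featured Quartz framework, or Quartz integrated through Spring. As the title suggests, none of these is today's topic. This post covers the timer facility built into Storm, which suits scenarios such as computing a running total of order data every minute. The best companion for that kind of job is Kafka as the source of order messages, but for now we only walk through a local main demo.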
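I. Tick in detail
1. What tick does
Apache Storm has a built-in timer mechanism called tick. It lets every task of any bolt receive a tick tuple at a fixed interval (with one-second granularity, configurable by the user). The tick tuple arrives on the `__tick` stream of the `__system` component, and the bolt can then run whatever processing the business logic requires. Tick has been supported since Apache Storm 0.8.0; this post was tested on Apache Storm 0.9.5.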
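2. Setting tick for a bolt
To make a bolt perform some action at a fixed interval, have it extend BaseBasicBolt or BaseRichBolt and override getComponentConfiguration(). Inside that method, set the value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS; the unit is seconds.
getComponentConfiguration() is a method defined in the backtype.storm.topology.IComponent interface. Its implementation can define bolt-specific Config entries whose names start with TOPOLOGY.
With this in place, every task of the bolt receives a tick tuple from the `__tick` stream of the `__system` component at the configured interval, so execute() has to tell tick tuples apart from ordinary data tuples by checking each tuple's source component and source stream.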
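A minimal sketch of how execute() can recognise a tick tuple, written without the Storm jar so that it runs on its own. The two string literals are the values of `Constants.SYSTEM_COMPONENT_ID` and `Constants.SYSTEM_TICK_STREAM_ID`; the class name `TickCheck` and the method `isTick` are hypothetical. In a real bolt the arguments would come from `tuple.getSourceComponent()` and `tuple.getSourceStreamId()`.

```java
// Stand-alone sketch with no Storm dependency; TickCheck and isTick are hypothetical names.
class TickCheck {
    // Same values as Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // A tuple is a tick tuple only if both its source component and its stream match.
    static boolean isTick(String sourceComponent, String sourceStreamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
    }

    public static void main(String[] args) {
        System.out.println(isTick("__system", "__tick")); // prints true (tick tuple)
        System.out.println(isTick("spout", "default"));   // prints false (ordinary data tuple)
    }
}
```

Checking both fields matters: a match on the component alone would also accept any other tuple the system component might send.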
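3. Global tick
To make every bolt in the Topology perform some action at a fixed interval, define a topology-wide tick. This again means setting Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, this time on the Config object that is passed in when the Topology is submitted.
What happens when the tick set on the whole Topology conflicts with the one set on an individual bolt? The narrower scope wins: the tick configured on the bolt takes precedence.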
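The precedence rule can be pictured as a map merge in which the bolt's own settings are applied last. The sketch below only models that rule with plain maps; it is not Storm's actual merge code, and `TickPrecedence` and `effectiveConf` are hypothetical names. The key string is the value of `Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS`, and the values 7 and 10 mirror the demo in this post.

```java
import java.util.HashMap;
import java.util.Map;

// Models "bolt-level config overrides topology-level config"; not Storm's real implementation.
class TickPrecedence {
    // Value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS.
    static final String TICK_FREQ = "topology.tick.tuple.freq.secs";

    // Start from the topology-wide settings, then let the bolt's settings overwrite them.
    static Map<String, Object> effectiveConf(Map<String, Object> topologyConf,
                                             Map<String, Object> boltConf) {
        Map<String, Object> merged = new HashMap<>(topologyConf);
        if (boltConf != null) {
            merged.putAll(boltConf);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put(TICK_FREQ, 7);   // global tick: every 7 seconds
        Map<String, Object> boltConf = new HashMap<>();
        boltConf.put(TICK_FREQ, 10);      // bolt-level tick: every 10 seconds

        System.out.println(effectiveConf(topologyConf, boltConf).get(TICK_FREQ)); // prints 10
        System.out.println(effectiveConf(topologyConf, null).get(TICK_FREQ));     // prints 7
    }
}
```

A bolt that does not override getComponentConfiguration() corresponds to the second call: it simply inherits the topology-wide interval.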
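4. Timer precision
Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS has one-second granularity. For example, if a bolt sets Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS to 10, each task of the bolt should in theory receive a tick tuple every 10 s. In testing, the interval turned out to be quite accurate: ticks arrive late rather than early, and the delay is typically around 1-2 ms.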
II. Code implementation
1. Spout code
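[mw_shl_code=bash,true]import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Emits the words "a", "b", "c" in a loop, pausing about 1 ms between tuples.
public class TickWordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final String[] sentences = {"a", "b", "c"};
    private int index = 0;

    @Override
    public void nextTuple() {
        collector.emit(new Values(sentences[index]));
        // Wrap around to the first word after the last one.
        index = (index + 1) % sentences.length;
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the exception.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
[/mw_shl_code]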
2. Bolt code
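[mw_shl_code=bash,true]import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class TickWordCountBolt extends BaseBasicBolt {
    private final Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
                && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            System.err.println("TickWordCount bolt: "
                    + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
            // Simulate an aggregation step by printing the current counts.
            for (String key : counts.keySet()) {
                System.err.println("key: " + key + " count: " + counts.get(key));
            }
            // Simulate clearing the state once the 10-second window has been processed.
            counts.clear();
        } else {
            // Ordinary data tuple: increment the count for this word.
            String word = tuple.getStringByField("word");
            Integer current = counts.get(word);
            counts.put(word, current == null ? 1 : current + 1);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing downstream.
    }

    // Ask Storm to send this bolt a tick tuple every 10 seconds.
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }
}
[/mw_shl_code]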
3. Main code for local debugging
[mw_shl_code=bash,true]import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TickTest {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new TickWordSpout());
        // Run the bolt with a parallelism of 3, grouping tuples by the value of "word".
        builder.setBolt("count", new TickWordCountBolt(), 3)
                .fieldsGrouping("spout", new Fields("word"));

        Config conf = new Config();
        // Set a topology-wide tick interval as well, to test which setting takes precedence.
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);
        conf.setDebug(false);

        if (args != null && args.length > 0) {
            // A topology name was given: submit to a real cluster.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // No arguments: run in an in-process local cluster.
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
        }
    }
}
---------------------------------------Output------------------------------------------------
TickWordCount bolt: 2016-09-17 12:41:23:031
key: b count: 3014
TickWordCount bolt: 2016-09-17 12:41:23:041
key: c count: 3017
TickWordCount bolt: 2016-09-17 12:41:23:053
key: a count: 3021
TickWordCount bolt: 2016-09-17 12:41:33:031
key: b count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:053
key: a count: 3295
TickWordCount bolt: 2016-09-17 12:41:43:031
key: b count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:053
key: a count: 3293
TickWordCount bolt: 2016-09-17 12:41:53:031
key: b count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:041
key: c count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:053
key: a count: 3298
TickWordCount bolt: 2016-09-17 12:42:03:031
key: b count: 3293
TickWordCount bolt: 2016-09-17 12:42:03:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:42:03:053
key: a count: 3293
[/mw_shl_code]
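In this test run each task's ticks are exactly 10 s apart, with zero delay. Other runs did show delays of 1-2 ms, so the timer is still fairly precise. The 10 s spacing also confirms the precedence rule: the bolt-level setting (10 s) overrode the topology-wide setting (7 s).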
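To check a log like this without reading it by eye, the gap between two consecutive tick timestamps of the same task can be computed and compared with the configured period. A small sketch, using the same timestamp pattern as the bolt above; `TickDrift` and `driftMillis` are hypothetical names, and the sample timestamps are taken from the output shown earlier.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Computes how far the gap between two logged ticks deviates from the configured period.
class TickDrift {
    // Returns (later - earlier) minus the expected period, in milliseconds.
    static long driftMillis(String earlier, String later, int periodSecs) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        try {
            long gap = fmt.parse(later).getTime() - fmt.parse(earlier).getTime();
            return gap - periodSecs * 1000L;
        } catch (ParseException e) {
            throw new IllegalArgumentException("timestamp does not match the log pattern", e);
        }
    }

    public static void main(String[] args) {
        // Two consecutive ticks of the same task from the output above.
        System.out.println(driftMillis("2016-09-17 12:41:23:031",
                                       "2016-09-17 12:41:33:031", 10)); // prints 0
    }
}
```

A positive result means the tick arrived late; a value of 1 or 2 would correspond to the 1-2 ms delays mentioned above.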
III. A brief look at the code behind tick
TopologyBuilder.setBolt
[mw_shl_code=bash,true] public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
[/mw_shl_code]
[mw_shl_code=bash,true]
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
validateUnusedId(id);
initCommon(id, bolt, parallelism_hint);
_bolts.put(id, bolt);
return new BoltGetter(id);
}[/mw_shl_code]
[mw_shl_code=bash,true]
// The call component.getComponentConfiguration() below is where the bolt's tick frequency setting is picked up.
private void initCommon(String id, IComponent component, Number parallelism) {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if(parallelism!=null) common.set_parallelism_hint(parallelism.intValue());
Map conf = component.getComponentConfiguration();
if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
_commons.put(id, common);
}
[/mw_shl_code]
Author: hello_coke
Source: jianshu