Scheduled (tick) tasks in Storm can be set up like this:
1: In main(), set the tick frequency on the topology config:
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60); // every bolt in the topology will then receive a tick tuple every 60 seconds (for a per-bolt setting, see the sketch after the code example below)
2: In the bolt, check whether the incoming tuple is a tick tuple:
tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
If it is true, run the code for the scheduled task and then return; if it is false, run the normal tuple-processing business logic.
As for the actual logic, the OP can fill in whatever is needed.
Code example:
private int sum = 0;

public void execute(Tuple input) {
    if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)) {
        // the scheduled task: runs once per tick tuple
        System.out.println("tick output: " + sum);
        return;
    } else {
        // normal tuple-processing logic
        Integer value = input.getIntegerByField("num");
        sum += value;
    }
}
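Note that putting TOPOLOGY_TICK_TUPLE_FREQ_SECS into the topology config makes every bolt receive tick tuples. If only one bolt should be ticked, that bolt can request the setting itself by overriding getComponentConfiguration(). A minimal sketch, assuming the same 60-second frequency; the class name TickingBolt is just for illustration:

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.topology.base.BaseRichBolt;

// Sketch: a bolt that asks for tick tuples only for itself, so the
// topology-wide TOPOLOGY_TICK_TUPLE_FREQ_SECS setting is not needed.
public abstract class TickingBolt extends BaseRichBolt {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        // this bolt (and only this bolt) gets a tick tuple every 60 seconds
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
        return conf;
    }
}

The same effect can also be achieved from main() by calling .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60) on the declarer returned by setBolt(...).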
Full example:

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LocalStormTopology {

    /**
     * Data source spout
     * @author Administrator
     */
    public static class DataSourceSpout extends BaseRichSpout {
        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;
        private int i = 0;

        /**
         * Called first when this instance starts running, and only called once
         */
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * Called over and over for as long as the topology runs
         */
        public void nextTuple() {
            System.out.println("spout: " + i);
            // send the data to the downstream bolt; Values is a list
            collector.emit(new Values(i++));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * Declare the output fields
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // the fields here correspond one-to-one with the Values emitted in nextTuple
            declarer.declare(new Fields("num"));
        }
    }

    /**
     * Sums the data emitted by the spout
     * @author Administrator
     */
    public static class SumBolt extends BaseRichBolt {
        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;
        private int sum = 0;

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * Called for every tuple the spout emits (effectively an endless loop)
         */
        public void execute(Tuple input) {
            if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)) {
                // the scheduled task: runs once per tick tuple
                System.out.println("tick output: " + sum);
                return;
            } else {
                // normal tuple-processing logic
                Integer value = input.getIntegerByField("num");
                sum += value;
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout_id", new DataSourceSpout());
        topologyBuilder.setBolt("bolt_id", new SumBolt()).shuffleGrouping("spout_id");

        LocalCluster localCluster = new LocalCluster();
        Config config = new Config();
        // every bolt in the topology receives a tick tuple every 60 seconds
        config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
        localCluster.submitTopology(LocalStormTopology.class.getSimpleName(), config, topologyBuilder.createTopology());
    }
}
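One more point: tick tuples come from the __system component on a dedicated tick stream, so a slightly stricter check compares the stream id as well. A small helper sketch you could drop into the project (the class and method names here are illustrative, not part of the Storm API):

import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

// Sketch: a stricter tick check that looks at both the source component
// and the source stream, so only genuine tick tuples match.
public final class TickUtil {
    private TickUtil() {}

    public static boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
}

In execute() the condition then becomes if (TickUtil.isTickTuple(input)) { ... }. Also note that with a 60-second frequency the local topology has to run for at least a minute before the first tick output shows up.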