本帖最后由 pig2 于 2014-4-14 16:13 编辑
1.Storm中与各个进程与hadoop进程对应关系是怎么样的?
2.Storm中与hadoop对应的mapreduce是什么?
Storm中的很多Bolt都有一个最常见的处理步骤:
读入一个tuple;
根据这个输入tuple,提取后发射0个,1个或多个tuple;
最后,通过ack操作确认这个tuple被成功处理。
按照上述处理步骤,依次处理发向这个Bolt的各个tuple元组。
这种模式可以实现像ETL这类的简单函数或过滤器功能,Storm中专门为这种模式封装了相应接口:IBasicBolt。BaseBasicBolt等类实现了这一接口。
为了能更好的理解Storm,及出现的术语,这里提供一张图:
下面是以BaseBasicBolt为基础,按照上述模式实现词频统计的Bolt(代码参考链接:storm-starter):
- public static class WordCount extends BaseBasicBolt {
- //记录每个单词及单词出现的次数
- Map<String, Integer> counts = new HashMap<String, Integer>();
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String word = tuple.getString(0);
- Integer count = counts.get(word); //提取单词出现次数
- if(count==null)
- count = 0;
- count++;
- counts.put(word, count); //更新单词出现次数
- collector.emit(new Values(word, count)); //发射统计结果
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
- }
复制代码
|