分享

Storm实战之WordCount

fc013 发表于 2015-10-24 19:03:41 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 3 21570


问题导读:

1.核心组件中的一个spout的作用是什么?
2.核心组件中的两个bolt的作用是什么?
3.核心组件中的一个Topology的作用是什么?





在storm环境部署完毕,并正确启动之后,现在就可以真正进入storm开发了,按照惯例,以wordcount作为开始。
这个例子很简单,核心组件包括:一个spout,两个bolt,一个Topology。

spout从一个路径读取文件,然后readLine,向bolt发射,一个文件处理完毕后,重命名,以不再重复处理。

第一个bolt将从spout接收到的字符串按空格split,产生word,发射给下一个bolt。

第二个bolt接收到word后,统计、计数,放到HashMap容器中。

1,定义一个spout,作用是源源不断滴向bolt发射字符串。

[mw_shl_code=java,true]import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {
    private static final long serialVersionUID = 2197521792014017918L;
    private String inputPath;
    private SpoutOutputCollector collector;

    @Override
    @SuppressWarnings(\"rawtypes\")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        inputPath = (String) conf.get(\"INPUT_PATH\");
    }

    @Override
    public void nextTuple() {
        Collection<File> files = FileUtils.listFiles(new File(inputPath),
                FileFilterUtils.notFileFilter(FileFilterUtils.suffixFileFilter(\".bak\")), null);
        for (File f : files) {
            try {
                List<String> lines = FileUtils.readLines(f, \"UTF-8\");
                for (String line : lines) {
                    collector.emit(new Values(line));
                }
                FileUtils.moveFile(f, new File(f.getPath() + System.currentTimeMillis() + \".bak\"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(\"line\"));
    }

}[/mw_shl_code]


2,定义一个bolt,作用是接收spout发过来的字符串,并分割成word,发射给下一个bolt。

[mw_shl_code=java,true]import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordSpliter extends BaseBasicBolt {

    private static final long serialVersionUID = -5653803832498574866L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);
        String[] words = line.split(\" \");
        for (String word : words) {
            word = word.trim();
            if (StringUtils.isNotBlank(word)) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(\"word\"));

    }

}[/mw_shl_code]


3,定义一个bolt,接收word,并统计。

[mw_shl_code=java,true]import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {
    private static final long serialVersionUID = 5683648523524179434L;
    private HashMap<String, Integer> counters = new HashMap<String, Integer>();
    private volatile boolean edit = false;

    @Override
    @SuppressWarnings(\"rawtypes\")
    public void prepare(Map stormConf, TopologyContext context) {
        final long timeOffset = Long.parseLong(stormConf.get(\"TIME_OFFSET\").toString());
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    if (edit) {
                        for (Entry<String, Integer> entry : counters.entrySet()) {
                            System.out.println(entry.getKey() + \" : \" + entry.getValue());
                        }
                        System.out.println(\"WordCounter---------------------------------------\");
                        edit = false;
                    }
                    try {
                        Thread.sleep(timeOffset * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        edit = true;
        System.out.println(\"WordCounter+++++++++++++++++++++++++++++++++++++++++++\");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}[/mw_shl_code]


注意WordCounter类的prepare方法,里面定义了一个Thread,持续监控容器的变化(word个数增加或者新增word)。

4,定义一个Topology,提交作业。

[mw_shl_code=java,true]public class WordCountTopo {
    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println(\"Usage: inputPaht timeOffset\");
            System.err.println(\"such as : java -jar WordCount.jar D://input/ 2\");
            System.exit(2);
        }
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(\"word-reader\", new WordReader());
        builder.setBolt(\"word-spilter\", new WordSpliter()).shuffleGrouping(\"word-reader\");
        builder.setBolt(\"word-counter\", new WordCounter()).shuffleGrouping(\"word-spilter\");
        String inputPaht = args[0];
        String timeOffset = args[1];
        Config conf = new Config();
        conf.put(\"INPUT_PATH\", inputPaht);
        conf.put(\"TIME_OFFSET\", timeOffset);
        conf.setDebug(false);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(\"WordCount\", conf, builder.createTopology());
    }
}[/mw_shl_code]


5,代码完成后,导出jar(导出时不要指定Main class),然后上传至storm集群,通过命令./storm jar com.x.x.WordCountTopo /data/tianzhen/input 2来提交作业。

Topo启动,spout、bolt执行过程:

28912557_1425635927TGq6.jpg


Thread监控的统计结果:


28912557_142563610347II.jpg


源文件处理之后被重命名为*.bak。

和Hadoop不同,在任务执行完之后,Topo不会停止,spout会一直监控数据源,不停地往bolt发射数据。
所以现在如果源数据发生变化,应该能够立马体现出来。我往path下再放一个文本文件,结果:


28912557_1425636745Ir7I.jpg


可见,结果立刻更新了,storm的实时性就体现在这里。




原文链接:http://blog.itpub.net/28912557/viewspace-1450885/

已有(3)人评论

跳转到指定楼层
536528395 发表于 2015-12-15 17:29:37
你好,我没有权限发贴,就在这问个问题把:
我的spolt{"storm", "hadoop", "hive", "flume"} 死循环随机发送这四个中的一个 单词,
然后 第一个bolt 接受 然后啥也没干直接发走了,第二个bolt 按字段接受 并写到文件中。。(writer = new FileWriter("/home/test-hadoop/storm/" + this);) 我的worker设置为2个,bolt都设置的为4个。spolt设置为2个。 集群一共三个节点。那么我一共应该会生成几个文件呢?四个?每个单词写一个文件?

我的结果:生成了四个文件,但是有个文件是空的没有往里面写东西,然后有个文件里面有两个单词,这是怎么个情况阿?
回复

使用道具 举报

leegh1992 发表于 2016-1-1 23:13:10
536528395 发表于 2015-12-15 17:29
你好,我没有权限发贴,就在这问个问题把:
我的spolt{"storm", "hadoop", "hive", "flume"} 死循环随机发 ...

Storm里面有很多消息分发策略,你可以看下。常用的是前两种,你说的应该是用的第二种,你可以Spout可以多输入一些。
1、Shuffle Grouping:随机分组,随机派发stream里面的tuple,保证每个bolt接收到的tuple数目相同。
2、Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts,而不同的userid则会被分配到不同的Bolts。
3、All Grouping:广播发送,对于每一个tuple,所有的Bolts都会收到。
4、Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
5、Non Grouping:不分组,这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
6、Direct Grouping:直接分组,  这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)
7、Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条