本帖最后由 pig2 于 2014-7-24 00:56 编辑
问题导读
1、什么是分组和消息流?
2、Storm以什么方式向每个bolt实例发送消息?
3、怎么创建、使用Storm项目?
本章我们将会创建一个Storm工程和我们的第一个Storm topology。
提示:下述假设你已经安装JRE1.6或者更高级版本。推荐使用Oracle提供的JRE:下载地址
操作模式
在开始创建项目之前,了解Storm的操作模式(operation modes)是很重要的。Storm有两种运行方式:
本地模式
在本地模式下,Storm topologies 运行在本地机器的一个JVM中。因为本地模式下查看所有topology组件共同工作最为简单,所以这种模式被用于开发、测试和调试。例如,我们可以调整参数,这使得我们可以看到我们的topology在不同的Storm配置环境中是如何运行的。为了以本地模式运行topologies,我们需要下载Storm的开发依赖包,这是我们开发、测试topologies所需的所有东西。当我们建立自己的第一个Storm工程的时候我们很快就可以看到是怎么回事了。
提示:本地模式下运行一个topology同Storm集群中运行类似。然而,确保所有组件线程安全非常重要,因为当它们被部署到远程模式时,它们可能运行在不同的JVM或者不同的物理机器上,此时,它们之间不能直接交流或者共享内存。
在本章的所有示例中,我们都以本地模式运行。
远程模式
在远程模式下,我们将topology提交到Storm集群中,Storm集群由许多进程组成,这些进程通常运行在不同的机器上。远程模式下不显示调试信息,这也是它被认为是产品模式的原因。然而,在一台机器上创建一个Storm集群也是可能的,并且在部署至产品前这样做还是一个好方法,它可以确保将来在一个成熟的产品环境中运行topology不会出现任何问题。
Hello World Storm
在这个项目中,我们会建立一个简单的topology来统计单词个数,我们可以将它看成是Storm topologies中的“Hello World”。然而,它又是一个非常强大的topology,因为它几乎可以扩展到无限大小,并且经过小小的修改,我们甚至可以使用它创建一个统计系统。例如,我们可以修改本项目来找到Twitter上的热门话题。
为了建立这个topology,我们将使用一个spout来负责从文件中读取单词,第一个bolt来标准化单词,第二个bolt去统计单词个数,如图2-1所示:
topology入门图2-1.topology入门
你可以在https://github.com/storm-book/ex ... rted/zipball/master下载本例源码的ZIP文件。
译者的话:本站有备份:下载地址
提示:如果你使用git(一个分布式的版本控制和源码管理工具),则可以运行命令:git clone git@github.com:storm-book/examplesch02-getting_started.git进入你想要下载的源码所在的目录。
检查Java安装
搭建环境的第一步就是检查正在运行的Java版本。运行java -version命令,我们可以看到类似如下信息:
- java -version
- java version “1.6.0_26″
- Java(TM) SE Runtime Environment(build 1.6.0_26-b03)
- Java HotSpot(TM) Server VM (build 20.1-b02,mixed mode)
复制代码
如果没有,检查下你的Java安装。下载地址
创建项目
首先,创建一个文件夹,用于存放这个应用(就像对于任何Java应用一样),该文件夹包含了整个项目的源代码。
接着我们需要下载Storm的依赖包——添加到本应用classpath的jar包集合。可以通过下面两种方式完成:
下载依赖包,解压,并将它们加入classpath路径
使用Apache Maven
提示:Maven是一个软件项目管理工具,可以用于管理一个项目开发周期中的多个方面(从从依赖包到发布构建过程),在本书中我们会广泛使用Maven。可以使用mvn命令检查maven是否安装,如果未安装,可以从下载,地址。
下一步我们需要新建一个pom.xml文件(pom:project object model,项目的对象模型)去定义项目的结构,该文件描述了依赖包、封装、源码等等。这里我们将使用由nathanmarz构建的依赖包和Maven库,这些依赖包可以在这里找到。
提示:Storm的Maven依赖包引用了在本地模式下运行Storm所需的所有函数库。
使用这些依赖包,我们可以写一个包含运行topology基本的必要组件的pom.xml文件:
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
- <groupId>storm.book</groupId>
- <artifactId>Getting-Started</artifactId>
- <version>0.0.1-SNAPSHOT</version>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.3.2</version>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- <compilerVersion>1.6</compilerVersion>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <repositories>
- <!-- Repository where we can found the storm dependencies -->
- <repository>
- <id>clojars.org</id>
- <url>http://clojars.org/repo</url>
- </repository>
- </repositories>
-
- <dependencies>
- <!-- Storm Dependency -->
- <dependency>
- <groupId>storm</groupId>
- <artifactId>storm</artifactId>
- <version>0.6.0</version>
- </dependency>
- </dependencies>
-
- </project>
复制代码
前几行指定了项目名称、版本;然后我们添加了一个编译器插件,该插件告诉Maven我们的代码应该用Java1.6编译;接着我们定义库(repository)(Maven支持同一个项目的多个库),clojars是Storm依赖包所在的库,Maven会自动下载本地模式运行Storm需要的所有子依赖包。
本项目的目录结构如下,它是一个典型的Maven Java项目。
java目录下的文件夹包含了我们的源代码,并且我们会将我们的单词文件放到resources文件夹中来处理。
创建第一个topology
为建立我们第一个topology,我们要创建运行本例(统计单词个数)的所有的类。本阶段例子中的有些部分不清楚很正常,我们将在接下来的几个章节中进一步解释它们。
Spout(WordReader类)
WordReader类实现了IRichSpout接口,该类负责读取文件并将每一行发送到一个bolt中去。
提示:spout发送一个定义字段(field)的列表,这种架构允许你有多种bolt读取相同的spout流,然后这些bolt可以定义字段(field)供其他bolt消费。
例2-1包含WordReader类的完整代码(后面会对代码中的每个部分进行分析)
例2-1.src/main/java/spouts/WordReader.java
- package spouts;
-
- import java.io.BufferedReader;
- import java.io.FileNotFoundException;
- import java.io.FileReader;
- import java.util.Map;
-
- import backtype.storm.spout.SpoutOutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.IRichSpout;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Values;
-
- public class WordReader implements IRichSpout{
- private SpoutOutputCollector collector;
- private FileReader fileReader;
- private boolean completed=false;
- private TopologyContext context;
-
- public boolean isDistributed(){return false;}
-
- public void ack(Object msgId) {
- System.out.println("OK:"+msgId);
- }
-
- public void close(){}
-
- public void fail(Object msgId) {
- System.out.println("FAIL:"+msgId);
- }
-
- /**
- * 该方法用于读取文件并发送文件中的每一行
- */
-
- public void nextTuple() {
- /**
- * The nextuple it is called forever, so if we have beenreaded the file
- * we will wait and then return
- */
- if(completed){
- try {
- Thread.sleep(1000);
- } catch(InterruptedException e) {
- //Do nothing
- }
- return;
- }
- String str;
- //Open the reader
- BufferedReader reader =new BufferedReader(fileReader);
- try{
- //Read all lines
- while((str=reader.readLine())!=null){
- /**
- * By each line emmit a new value with the line as a their
- */
- this.collector.emit(new Values(str),str);
- }
- }catch(Exception e){
- throw new RuntimeException("Errorreading tuple",e);
- }finally{
- completed = true;
- }
- }
-
- /**
- * We will create the file and get the collector object
- */
-
- public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) {
- try {
- this.context=context;
- this.fileReader=new FileReader(conf.get("wordsFile").toString());
- } catch(FileNotFoundException e) {
- throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]");
- }
- this.collector=collector;
- }
-
- /**
- * 声明输出字段“line”
- */
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("line"));
- }
-
- }
复制代码
在任何spout中调用的第一个方法都是open()方法,该方法接收3个参数:
TopologyContext:它包含了所有的topology数据
conf对象:在topology定义的时候被创建
SpoutOutputCollector:该类的实例可以让我们发送将被bolts处理的数据。
下面的代码块是open()方法的实现:
- public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) {
- try {
- this.context=context;
- this.fileReader=new FileReader(conf.get("wordsFile").toString());
- } catch(FileNotFoundException e) {
- throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]");
- }
- this.collector=collector;
- }
复制代码
在open()方法中,我们也创建了reader,它负责读文件。接着,我们需要实现nextTuple()方法,在该方法中发送要被bolt处理的值(values)。在我们的例子中,这个方法读文件并且每行发送一个值。
- public void nextTuple() {
- if(completed){
- try {
- Thread.sleep(1000);
- } catch(InterruptedException e) {
- //Do nothing
- }
- return;
- }
- String str;
- //Open the reader
- BufferedReader reader =new BufferedReader(fileReader);
- try{
- //Read all lines
- while((str=reader.readLine())!=null){
- /**
- * By each line emmit a new value with the line as a their
- */
- this.collector.emit(new Values(str),str);
- }
- }catch(Exception e){
- throw new RuntimeException("Errorreading tuple",e);
- }finally{
- completed = true;
- }
- }
复制代码
提示:Values类是ArrayList的一个实现,将列表中的元素传递到构造方法中。
nextTuple()方法被周期性地调用(和ack()、fail()方法相同的循环),当没有工作要做时,nextTuple()方法必须释放对线程的控制,以便其他的方法有机会被调用。因此必须在nextTuple()第一行检查处理是否完成,如果已经完成,在返回前至少应该休眠1秒来降低处理器的负载,如果还有工作要做,则将文件中的每一行读取为一个值并发送出去。
提示:元组(tuple)是一个值的命名列表,它可以是任何类型的Java对象(只要这个对象是可以序列化的)。默认情况下,Storm可以序列化的常用类型有strings、byte arrays、ArrayList、HashMap和HashSet。
Bolt(WordNormalizer&WordCounter类)
上面我们设计了一个spout来读取文件,并且每读取一行发送一个元组(tuple)。现在,我们需要创建两个bolt处理这些元组(见图2-1)。这些bolt实现了IRichBolt接口。
在bolt中,最重要的方法是execute()方法,每当bolt收到一个元组,该方法就会被调用一次,对于每个收到的元组,该bolt处理完之后又会发送几个bolt。
提示:一个spout或bolt可以发送多个tuple,当nextTuple()或execute()方法被调用时,它们可以发送0、1或者多个元组。在第五章中你将会了解到更多。
第一个bolt,WordNormalizer,负责接收每一行,并且将行标准化——它将行分解为一个个的单词后转化成小写,并且消除单词前后的空格。
首先,我们需要声明bolt的输出参数:
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
复制代码
这儿,我们声明bolt发送一个命名为“word”的字段。
接着,我们实现execute方法,输入的tuple将会在这个方法中被处理:
- public void execute(Tuple input) {
- String sentence = input.getString(0);
- String[]words= sentence.split(" ");
- for(String word:words){
- word =word.trim();
- if(!word.isEmpty()){
- word =word.toLowerCase();
- //Emit the word
- List a =new ArrayList();
- a.add(input);
- collector.emit(a,new Values(word));
- }
- }
- // Acknowledge the tuple
- collector.ack(input);
- }
复制代码
第一行读取元组中的值,可以按照位置或者字段命名读取。值被处理后使用collector对象发送出去。当每个元组被处理完之后,就会调用collector的ack()方法,表明该tuple成功地被处理。如果tuple不能被处理,则应该调用collector的fail()方法。
例2-2包含这个类的完整代码。
例2-2.src/main/java/bolts/WordNormalizer.java
- package bolts;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
-
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.IRichBolt;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
-
- public class WordNormalizer implements IRichBolt{
- private OutputCollector collector;
-
- public void cleanup(){}
-
- /**
- * The bolt will receive the line from the
- * words file and process it to Normalize this line
- *
- * The normalize will be put the words in lower case
- * and split the line to get all words in this
- */
-
- public void execute(Tuple input) {
- String sentence = input.getString(0);
- String[]words= sentence.split(" ");
- for(String word:words){
- word =word.trim();
- if(!word.isEmpty()){
- word =word.toLowerCase();
- //Emit the word
- List a =new ArrayList();
- a.add(input);
- collector.emit(a,new Values(word));
- }
- }
- // Acknowledge the tuple
- collector.ack(input);
- }
-
- public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
- this.collector=collector;
- }
-
- /**
- * The bolt will only emit the field "word"
- */
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
- }
复制代码
提示:在这个类中,每调用一次execute()方法,会发送多个元组。例如,当execute()方法收到“This is the Storm book”这个句子时,该方法会发送5个新元组。
第二个bolt,WordCounter,负责统计每个单词个数。当topology结束时(cleanup()方法被调用时),显示每个单词的个数。
提示:第二个bolt中什么也不发送,本例中,将数据添加到一个map对象中,但是现实生活中,bolt可以将数据存储到一个数据库中。
- package bolts;
-
- import java.util.HashMap;
- import java.util.Map;
-
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.IRichBolt;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.tuple.Tuple;
-
- public class WordCounter implements IRichBolt{
- Integer id;
- String name;
- Map<String,Integer>counters;
-
- private OutputCollector collector;
-
- /**
- * At the end of the spout (when the cluster is shutdown
- * We will show the word counters
- */
-
- @Override
-
- public void cleanup(){
- System.out.println("-- Word Counter ["+name+"-"+id+"]--");
- for(Map.Entry<String,Integer>entry: counters.entrySet()){
- System.out.println(entry.getKey()+": "+entry.getValue());
- }
- }
-
- /**
- * On each word We will count
- */
- @Override
-
- public void execute(Tuple input) {
- String str =input.getString(0);
- /**
- * If the word dosn't exist in the map we will create
- * this, if not We will add 1
- */
- if(!counters.containsKey(str)){
- counters.put(str,1);
- }else{
- Integer c =counters.get(str) +1;
- counters.put(str,c);
- }
- //Set the tuple as Acknowledge
- collector.ack(input);
- }
-
- /**
- * On create
- */
-
- @Override
-
- public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
- this.counters=newHashMap<String,Integer>();
- this.collector=collector;
- this.name=context.getThisComponentId();
- this.id=context.getThisTaskId();
- }
-
- @Override
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {}
-
- }
复制代码
execute()方法使用一个映射(Map类型)采集单词并统计这些单词个数。当topology结束的时候,cleanup()方法被调用并且打印出counter映射。(这仅仅是个例子,通常情况下,当topology关闭时,你应该使用cleanup()方法关闭活动链接和其他资源。)
主类
在主类中,你将创建topology和一个LocalCluster对象,LocalCluster对象使你可以在本地测试和调试topology。LocalCluster结合Config对象允许你尝试不同的集群配置。例如,如果不慎使用一个全局变量或者类变量,当配置不同数量的worker测试topology的时候,你将会发现这个错误。(关于config对象在第三章会有更多介绍)
提示:所有的topology结点应该可以在进程间没有数据共享的情形下独立运行(也就是说没有全局或者类变量),因为当topology运行在一个真实的集群上时,这些进程可能运行在不同的机器上。
你将使用TopologyBuilder创建topology,TopologyBuilder会告诉Storm怎么安排节点顺序、它们怎么交换数据。
- TopologyBuilder builder =new TopologyBuilder();
- builder.setSpout("word-reader",new WordReader());
- builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");
- builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));
复制代码
本例中spout和bolt之间使用随机分组(shuffleGrouping)连接,这种分组类型告诉Storm以随机分布的方式从源节点往目标节点发送消息。
接着,创建一个包含topology配置信息的Config对象,该配置信息在运行时会与集群配置信息合并,并且通过prepare()方法发送到所有节点。
- Config conf =new Config();
- conf.put("wordsFile",args[0]);
- conf.setDebug(false);
复制代码
将wordFile属性设置为将要被spout读取的文件名称(文件名在args参数中传入),并将debug属性设置为true,因为你在开发过程中,当debug为true时,Storm会打印节点间交换的所有消息和其他调试数据,这些信息有助于理解topology是如何运行的。
前面提到,你将使用LocalCluster来运行topology。在一个产品环境中,topology会持续运行,但是在本例中,你仅需运行topology几秒钟就能看到结果。
- LocalCluster cluster =new LocalCluster();
- cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());
- Thread.sleep(1000);
- cluster.shutdown();
复制代码
使用createTopology和submitTopology创建、运行topology,睡眠两秒(topology运行在不同的线程中),然后通过关闭集群来停止topology。
例2-3将上面代码拼凑到一起。
例2-3.src/main/java/TopologyMain.java
- import spouts.WordReader;
- import bolts.WordCounter;
- import bolts.WordNormalizer;
-
- import backtype.storm.Config;
- import backtype.storm.LocalCluster;
- import backtype.storm.topology.TopologyBuilder;
- import backtype.storm.tuple.Fields;
-
- public class TopologyMain{
- public static void main(String[]args)throws InterruptedException{
- //Topology definition
- TopologyBuilder builder =new TopologyBuilder();
- builder.setSpout("word-reader",new WordReader());
- builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");
- builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));
-
- //Configuration
- Config conf =new Config();
- conf.put("wordsFile",args[0]);
- conf.setDebug(false);
-
- //Topology run
- conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
- LocalCluster cluster =new LocalCluster();
- cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());
- Thread.sleep(1000);
- cluster.shutdown();
- }
-
- }
复制代码
运行本项目
现在开始准备运行第一个topology!如果你新建一个文本文件(src/main/resources/words.txt)并且每行一个单词,则可以通过如下命令运行这个topology:
- mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt”
复制代码
例如,如果你使用如下words.txt文件:
- Storm
- test
- are
- great
- is
- an
- Storm
- simple
- application
- but
- very
- powerful
- really
- Storm
- is
- great
复制代码
在日志中,你将会看到类似如下信息:
- is: 2
- application: 1
- but: 1
- great: 1
- test: 1
- simple: 1
- Storm: 3
- really: 1
- are: 1
- great: 1
- an: 1
- powerful: 1
- very: 1
复制代码
在本例中,你只使用了每个结点的一个单一实例,假如此时有一个非常大的日志文件怎么去统计每个单词的个数?此时可以很方便地改系统中节点数量来并行工作,如创建WordCounter的两个实例:
- builder.setBolt("word-counter",new WordCounter(),2).shuffleGrouping("word-normalizer");
复制代码
重新运行这个程序,你将看到:
- – Word Counter [word-counter-2] –
- application: 1
- is: 1
- great: 1
- are: 1
- powerful: 1
- Storm: 3
- – Word Counter [word-counter-3] –
- really: 1
- is: 1
- but: 1
- great: 1
- test: 1
- simple: 1
- an: 1
- very: 1
复制代码
太棒了!改变并行度,so easy(当然,在实际生活中,每个实例运行在不同的机器中)。但仔细一看似乎还有点问题:“is”和“great”这两个单词在每个WordCounter实例中都被计算了一次。Why?当使用随机分组(shuffleGrouping)时,Storm以随机分布的方式向每个bolt实例发送每条消息。在这个例子中,将相同的单词发送到同一个WordCounter实例是更理想的。为了实现这个,你可以将shuffleGrounping(“word-normalizer”)改成fieldsGrouping(“word-normalizer”,new Fields(“word”))。尝试一下并重新运行本程序来确认结果。后面的章节你将看到更多关于分组和消息流的内容。
总结
本章我们讨论了Storm的本地操作模式和远程操作模式的不同,以及用Storm开发的强大和简便。同时也学到了更多关于Storm的基本概念,我们将在接下来的章节深入解释这些概念。
上一篇:Storm入门指南第一章 基础知识
最后,感谢原作者的分享:东风化宇
|