分享

让你真正明白什么是storm

bioger_hit 发表于 2014-10-16 20:09:21 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 14 41127
本帖最后由 bioger_hit 于 2014-10-17 11:30 编辑

问题导读:
1.你认为什么图形可以显示hadoop与storm的区别?
2.本文是如何形象讲解hadoop与storm的?
3.hadoop map/reduce对应storm那两个概念?
4.storm流由谁来组成?
5.tuple具体是什么形式?


个人总结
当我们在学习一项新的技术时,由于大量信息的涌入,导致我们什么也记不住,如果初学storm,并且对于hadoop有所了解,那么只要知道hadoop中map/reduce所对应storm中的Spout/bolt即可,
那么明白了这两个概念有什么用?作用还是比较大的,因为最起码已经算是storm编程入门了。我们会编写maprecue,那么hadoop基本编程没有问题,所以如果我们会编写Spout/bolt,那么我们就会storm编程了,这是多么简单的一件事情。

更多的概念可以慢慢消化,比如hadoop有master和slave,那么strom是如何对应的,这些疑问可以留作以后。

本文以电梯的方式讲解了hadoop与storm的区别,凸显了它们一个为批量处理,一个为流式处理。


更详细内容参考下面:





什么是Storm?
Storm是:
• 快速且可扩展伸缩
• 容错
• 确保消息能够被处理
• 易于设置和操作
• 开源的分布式实时计算系统
- 最初由Nathan Marz开发
- 使用Java 和 Clojure 编写

我们知道hadoop是批处理,storm是流式处理,那么是什么是批处理,什么刘处理?
Storm和Hadoop主要区别是实时和批处理的区别:

1.jpg



Storm概念 组成:Spout 和Bolt组成Topology。


2.jpg



Tuple是Storm的数据模型,如['jdon',12346]
多个Tuple组成事件流:


3.jpg



Spout是读取需要分析处理的数据源,然后转为Tuples,这些数据源可以是Web日志、 API调用、数据库等等。Spout相当于事件流的生产者。
Bolt 处理Tuples然后再创建新的Tuples流,Bolt相当于事件流的消费者。

Bolt 作为真正业务处理者,主要实现大数据处理的核心功能,比如转换数据,应用相应过滤器,计算和聚合数据(比如统计总和等等) 。
以Twitter的某个Tweet为案例,看看Storm如何处理:

4.jpg



这些tweett贴内容是:“No Small Cell Lung #Cancer(没有小细胞肺癌#癌症)” "An #OnCology Consult...."
这些贴被Spout读取以后,产生Tuple,字段名是tweet,内容是"No Small Cell Lung #Cancer",格式类似:['No Small Cell Lung #Cancer',133221]。
然后进入被流 消费者Bolt进行处理,第一个Bolt是SplitSentence,将tuple内容进行分离,结果成为:一个个单词:"No" "Small" "Cell" "Lung" "#Cancer" ;然后经过第二个Bolt进行过滤HashTagFilter处理,Hash标签是单词中用#标注的,也就是Cancer;再经过HasTagCount计数,可以本地内存缓存这个计数结果,最后通过PrinterBolt打印出标签单词统计结果 。
我们使用Stom所要做的就是编制Spout和Bolt代码:
  1. public class RandomSentenceSpout extends BaseRichSpout {
  2.   SpoutOutputCollector collector;
  3.   Random random;
  4.   //读入外部数据
  5.   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  6.     this.collector = collector;
  7.     random = new Random();
  8.   }
  9.   //产生Tuple
  10.    public void nextTuple() {
  11.     String[] sentences = new String[] {
  12.       "No Small Cell Lung #Cancer",
  13.       "An #OnCology Consultant apple a day keeps the doctor away",
  14.       "four score and seven years ago",
  15.       "snow white and the seven dwarfs",
  16.       "i am at two with nature"
  17.     };
  18.     String tweet = sentences[random.nextInt(sentences.length)];
  19.     //定义字段名"tweet" 的值
  20.     collector.emit(new Values(tweet));
  21.   }
  22.   // 定义字段名"tweet"
  23.   public void declareOutputFields(OutputFieldsDeclarer declarer) {
  24.     declarer.declare(new Fields("tweet"));
  25.   }
  26.   @Override
  27.   public void ack(Object msgId) {}
  28.   @Override
  29.   public void fail(Object msgId) {}
  30. }
复制代码



下面是Bolt的代码编写:
  1. public class SplitSentenceBolt extends BaseRichBolt {
  2.   OutputCollector collector;
  3.   @Override
  4.   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  5.     this.collector = collector;
  6.   }
  7.   @Override 消费者激活主要方法:分离成单个单词
  8.   public void execute(Tuple input) {
  9.     for (String s : input.getString(0).split("\\s")) {
  10.       collector.emit(new Values(s));
  11.     }
  12.   }
  13.   @Override 定义新的字段名
  14.   public void declareOutputFields(OutputFieldsDeclarer declarer) {
  15.     declarer.declare(new Fields("word"));
  16.   }
复制代码



最后是装配运行Spout和Bolt的客户端调用代码:
  1. public class WordCountTopology {
  2.   public static void main(String[] args) throws Exception {
  3.     TopologyBuilder builder = new TopologyBuilder();
  4.     builder.setSpout("tweet", new RandomSentenceSpout(), 2);
  5.     builder.setBolt("split", new SplitSentenceBolt(), 4)
  6.       .shuffleGrouping("tweet")
  7.       .setNumTasks(8);
  8.     builder.setBolt("count", new WordCountBolt(), 6)
  9.       .fieldsGrouping("split", new Fields("word"));
  10.     ..设置多个Bolt
  11.     Config config = new Config();
  12.     config.setNumWorkers(4);
  13.     
  14.     StormSubmitter.submitTopology("wordcount", config, builder.createTopology());
  15. // Local testing
  16. //LocalCluster cluster = new LocalCluster();
  17. // cluster.submitTopology("wordcount", config, builder.createTopology());
  18. //Thread.sleep(10000);
  19. //cluster.shutdown();
  20. }
  21. }
复制代码



在这个代码中定义了一些参数比如Works的数目是4,其含义在后面详细分析。

下面我们要将上面这段代码发布部署到Storm中,首先了解Storm物理架构图:

5.jpg




Nimbus是一个主后台处理器,主要负责:
1.发布分发代码
2.分配任务
3.监控失败。
Supervisor是负责当前这个节点的后台工作处理器的监听。
Work类似Java的线程,采取JDK的Executor 。
下面开始将我们的代码部署到这个网络拓扑中:


6.jpg


将代码Jar包上传到Nimbus的inbox,包括所有的依赖包,然后提交。
Nimbus将保存在本地文件系统,然后开始配置网络拓扑,分配开始拓扑。
见下图:


7.jpg



Nimbus服务器将拓扑Jar 配置和结构下载到 Supervisor,负载平衡ZooKeeper分配某个特定的Supervisor服务器,而Supervisor开始基于配置分配Work,Work调用JDK的Executor启动线程,开始任务处理。
下面是我们代码对拓扑分配的参数示意图:


8.jpg



Executor启动的线程数目是12个,组件的实例是16个,那么如何在实际服务器中分配呢?如下图:


9.jpg



图中RsSpout代表我们的代码中RandomSentenceSpout;SplitSentenceBolt简写为SSbolt;





相关文章:
storm入门介绍









欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

已有(14)人评论

跳转到指定楼层
sunny62520 发表于 2014-10-16 21:50:02
这篇不错,赞
回复

使用道具 举报

梦回三国 发表于 2014-10-16 22:01:37
可以帮忙看一下http://www.aboutyun.com/forum.ph ... amp;extra=#pid47531的问题吗?
回复

使用道具 举报

hahaxixi 发表于 2014-10-17 11:47:41
讲的很详细,不错,准备学来着,谢谢
回复

使用道具 举报

hb1984 发表于 2014-10-17 12:00:14
谢谢楼主分享。            
回复

使用道具 举报

韩克拉玛寒 发表于 2014-10-17 20:11:56
很不错,谢谢,分享以后学习了
回复

使用道具 举报

永无止进 发表于 2014-10-23 12:43:21
好好学习,谢谢
回复

使用道具 举报

break-spark 发表于 2014-10-31 10:42:00
看的有点费劲,当然是自己不懂,storm是部署在Yarn上吗,还是单独部署??
回复

使用道具 举报

caiyifeng 发表于 2014-12-15 16:30:35
好资料,多谢
回复

使用道具 举报

ningbufan 发表于 2014-12-25 14:31:48
谢谢分享,好好学习
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条