分享

storm入门及代码示例

pergrand 发表于 2016-8-5 22:42:26 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 25762
storm
什么是storm:
        类似Hadoop的实时数据处理框架
storm和Hadoop的区别
                        Hadoop                                       storm
        数据来源:处理的是hdfs上历史数据,        处理实时新增的数据
        处理过程:map阶段 reduce阶段                数据源spout和处理逻辑bolt
        处理速度:处理hdfs上TB级数据速度慢        处理新增数据,处理实时
        使用场景:不讲究时效,批量处理数据        处理某一新增数据
        hadoop是代码执行完结束,                storm处理的是数据,不会结束
storm设计思想:
        storm是对流stream的抽象,流是一个不间断的无界的连续的tuple.
        topology是storm种最高层次的抽象,它被提交到storm集群,
        拓扑每个节点都要声明它所发射的元组的字段的name,其他节点订阅接收出处理。
storm中的核心概念
        Nimbus: 控制节点,负责集群内分发代码,为worker分配任务,故障检测
        Supervisor: 工作节点
        Worker: 一个节点可以启动1个或多个worker
        Executor: ,一个worker进程运行一个topology的一个或多个excutor线程
        Task:一个executor 运行同一个组件的一个或多个task
        Spout :组件,消息源,从数据库或者文档中读取数据向topology发送tuple
        Bolt :组件,消息处理者,处理输入的数据流并产生新的输出数据流,可执行过滤聚合查询数据库等操作
        Topology:集群中运行的一个个job任务。
        Tuple:一个tuple就是一个值列表value list,
        Stream: 消息流,
        Stream Grouping:消息分发策略,定义如何分配给bolt

   每个topology默认一个worker,一个superbisor节点最大可以启动4个worker,一个spout或者一个bolt会占用一个executor线程,每个线程executor默认启动一个task;
   zookeeper集群负责协调工作,master和worker之间不依赖,而是将信息放到zookeeper,这样可以快速恢复失败一方。
storm本地目录:
        目录位置:在配置文件中storm.local.dir: "/opt/storm/tmp"
        nimbus 目录下有两个目录
                inbox: 客户端上传jar的临时目录
                stormdist:存放正在运行所有的jar包;没有任务就为空
                        jar包下有三个文件:stormjar.jar提交拓扑的jar包
                        stormcode.ser中包含拓扑中各个组件和拓扑的关系
                        stormconf.ser保存着conf对象(代码中设置的conf)                       
        supervisor:
        在zk上状态:启动zkCli.sh查看
                ls /storm下 有workerbeats, errors, supervisors, storms, assignments
                ls /storm/storms 目录内容和配置文件中的tmp下的ls nimbus/stormdist/的内容一样
               

storm安装:(3个节点的集群)
        安装zookeeper集群(略)
        安装storm:修改配置文件vi storm/conf/storm.yaml注意顶格,之间用空格
                storm.zookeeper.servers:
                    - "192.168.1.160"
                    - "192.168.1.161"
                    - "192.168.1.162"
                nimbus.host: "192.168.1.160"
                storm.local.dir: "/usr/local/storm/tmp"
                supervisor.slots.ports:
                    - 6700
                    - 6701     
                    - 6702
                    - 6703

                复制到其他两个节点:scp -rq storm/ 192.168.1.161:/usr/local/

启动storm:
        1.先启动zk:zkServer.sh start (如果时间不一致简单点可以手动修改 date -s 时间/日期)
        2.主节点启动nimbus: nohup bin/storm nimbus >/dev/null 2>&1 &
                启动ui:      nohup bin/storm ui >/dev/null 2>&1 &
           注:nohup是为了关掉窗口进程仍然存在,后面的是把那些启动时的日志信息丢到黑洞中
        3.从节点启动supervisor: nohup bin/storm supervisor >/dev/null 2>&1 &
                启动logviewer: nohup bin/storm logviewer >/dev/null 2>&1 &(可以在浏览器查看日志)
        4.浏览器查看192.168.1.160:8080;如果ui界面不能查看日志
                原因可能是没启动logviewer进程或者是hosts没有配置将主机名和ip映射
        提交一个topology到集群:bin/storm jar 打的jar包 全类名  参数
        停止:在ui界面kill
                使用命令bin/storm list 查看storm中的所有topology信息
                手动bin/storm kill Topology_name

流分组stream Grouping
        1.shuffle Grouping :随机分组,可以达到负载均衡
        2.Feild Grouping :字段分组,可以根据分组字段把分组内容相同的数据交给同一个线程处理
        3.Global Grouping :全局分组,用于汇总,交给一个线程处理
        4.All Grouping :广播分组,每个线程都会手动组件发送的数据
        5.Non Grouping :不分组,类似shuffle Grouping
        6.localOrShuffle Grouping :本地或者随机分组,发射组件和接受组件的线程在同一个worker内部,
                只会让这一个线程去接受数据,其他线程不工作
        7.custom Grouping : 自定义
并行度rebalance
        task是storm最小逻辑单元,task是spout或者nolt的实例,storm中执行的就是task;
        task运行在线程中(executor)
        线程运行在进程worker中
        提高并行度:task-->executor-->worker
                1.提高线程(默认一个线程只有一个task)
                2.提高task数量,不会直接提高并行度,
                        后期动态提高storm的并行度
                        topology提交之后,topology的task不会变了。
                3.提高worker数量。
可靠性
        worker进程挂掉,storm集群重启一个worker进程
        supervisor进程挂掉,不会影响之前提交的topology,这个节点不在是集群一员不会分配任务给他
        nimbus进程挂掉,不会影响之前提交的topoligy,不能再向集群提交新的topology
acker消息确认机制:
        使用acker线程去监控数据的处理状态,ack();fail()
        完全处理:
                只有原始的tuple以及它所有衍生的tuple都被处理成功了,spout的ack方法才被调用
                只要有一个衍生tuple处理失败,那么spout的fail方法就会被调用       
定时任务
        1:全局的定时任务
        在main方法中通过config设置
                config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
                //表示把定时间隔设置为5秒,这个时候strom每隔5秒就会发射一个系统级别的tuple
        2.局部定时任务(经常用;例如实时查询时往往会间隔几秒把把数据存到数据库,实时的话数据太快会看花眼的)
                在对应的bolt类中覆盖一个方法,getComponentConfiguration
                @Override
                public Map<String, Object> getComponentConfiguration() {
                        HashMap<String, Object> hashMap = new HashMap<String,Object>();
                        hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//局部定时任务,时间间隔5秒
                        return hashMap;
                }
stormui
        主要参考值:Bolts下面的Capacity :这个值接近1时,说明它很忙,需要考虑提高并行度
        其他:process latency:   bolt收到一个tuple到bolt ack这个tuple的平均时间
             execute latency:bolt处理一个tuple的平均时间,不包含acker操作
         execute latency和proces latnecy是处理消息的时效性,
         而capacity则表示处理能力是否已经饱和,从这3个参数可以知道topology的瓶颈所在

伪代码:
        1.单词计数和单词出现总次数
        <1>spout 生成数据的spout
        需要继承BaseRichSpout;nextuple将监视的文本按行读取并且发射出去(this.collector.emit(new Values(line));)
        声明字段line,其他组件可以根据这个字段获取declarer.declare(new Fields("line"));代码如下
                public static class MySpout extends BaseRichSpout{
                /* Map conf, 配置参数类,保存的是storm的配置信息
                 * TopologyContext context, 上下文,保存一些全局参数
                 * SpoutOutputCollector collector,发射器,负责发送spout产生的tuple
                 */
                private Map conf;
                private TopologyContext context;
                private SpoutOutputCollector collector;
                //初始化
                @Override
                public void open(Map conf, TopologyContext context,
                                SpoutOutputCollector collector) {
                        this.conf = conf;
                        this.context = context;
                        this.collector = collector;
                }

                @Override
                public void nextTuple() {
                        Collection<File> listFiles = FileUtils.listFiles(new File("d:\\test"), new String[]{"txt"}, true);
                        for (File file : listFiles) {
                                try {
                                        List<String> readLines = FileUtils.readLines(file);
                                        for (String line : readLines) {
                                                this.collector.emit(new Values(line));
                                        }
                                        FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
                                } catch (IOException e) {
                                        e.printStackTrace();
                                }
                        }
                }
               
               
                @Override
                public void declareOutputFields(OutputFieldsDeclarer declarer) {
                        //fields中的参数和values中的参数需要一一对应
                        declarer.declare(new Fields("line"));
                }

        <2>bolt 按行分割单词的bolt
        prepare初始化方法;execute(Tuple input)执行方法,接收spout发射的tuple。String line = input.getStringByField("line");
        这里前面spout按行读取发射,有多少行这里执行多少次并且将分割后的每行的单词发射出去this.collector.emit(new Values(word));
        declareOutputFields声明输出字段word
        public static class SplitBolt extends BaseRichBolt{

                private Map stormConf;
                private TopologyContext context;
                private OutputCollector collector;               
                @Override
                public void prepare(Map stormConf, TopologyContext context,
                                OutputCollector collector) {
                        this.stormConf = stormConf;
                        this.context = context;
                        this.collector = collector;
                }
                @Override
                public void execute(Tuple input) {
                        //接收到一行内容,切割出单词
                        String line = input.getStringByField("line");
                        String[] words = line.split("\t");
                        //把单词发射出去
                        for (String word : words) {
                                this.collector.emit(new Values(word));
                                }
                }
                @Override
                public void declareOutputFields(OutputFieldsDeclarer declarer) {
                        declarer.declare(new Fields("word"));
                }               
        }

        <3>Bolt
        局部汇总单词的bolt
        基本和前面的bolt一样,this.collector.emit(new Values(word,integer));将单词和单词出现的次数都统计发射出去
        声明输出字段"word"和"count" ;declarer.declare(new Fields("word","count"));
        public static class JuBuBolt extends BaseRichBolt{

                private OutputCollector collector;
                @Override
                public void prepare(Map stormConf, TopologyContext context,
                                OutputCollector collector) {
                        this.collector = collector;
                }               
                HashMap<String, Integer> hashMap = new HashMap<String,Integer>();
                @Override
                public void execute(Tuple input) {
                        //获取所有单词进行汇总
                        String word = input.getStringByField("word");
                        Integer integer = hashMap.get(word);
                        if(integer==null){
                                integer=0;
                        }
                        integer++;
                        hashMap.put(word, integer);
                        this.collector.emit(new Values(word,integer));
                }
                @Override
                public void declareOutputFields(OutputFieldsDeclarer declarer) {
                        declarer.declare(new Fields("word","count"));
                }               
        }
        <4>Bolt 获取所有单词汇总这里不需要往外发射tuple所以declareOutputFields方法为空
                public static class AllCountBolt extends BaseRichBolt{
                private Map stormConf;
                private TopologyContext context;
                private OutputCollector collector;
                @Override
                public void prepare(Map stormConf, TopologyContext context,
                                OutputCollector collector) {
                        this.stormConf = stormConf;
                        this.context = context;
                        this.collector = collector;
                }
                HashMap<String, Integer> hashMap = new HashMap<String,Integer>();
                @Override
                public void execute(Tuple input) {
                        //获取所有单词进行汇总
                        String word = input.getStringByField("word");
                        Integer count = input.getIntegerByField("count");
                        hashMap.put(word, count);
                        Utils.sleep(1000);
                        System.err.println("===============================");
                        int num  =0;
                        System.out.println("所有单词去重后的个数:"+hashMap.size());
                        for (Entry<String, Integer> entry : hashMap.entrySet()) {
                                System.out.println(entry);
                                num+=entry.getValue();
                        }
                        System.out.println("所有单词出现的总次数:"+num);
                }
                @Override
                public void declareOutputFields(OutputFieldsDeclarer declarer) {
                       
                }               
        }
        <5>main
        集群上一个topology就是StormTopology;
        一个task 就是new MySpout();new MyBolt()
        分组策略就是shuffleGrouping("spout1")随机分,达到负载均衡
                fieldsGrouping("bolt1", new Fields("word"))相同的单词都会进入同一线程处理进行统计次数
                globalGrouping("bolt2"),保证所有都汇总到一个线程统计总次数
        config的信息就是被保存在ls nimbus/stormdist/stormconf.ser
        提交到集群StormSubmitter.submitTopology
        提交到本地localCluster.submitTopology
        public static void main(String[] args) {
                        TopologyBuilder topologyBuilder = new TopologyBuilder();
                topologyBuilder.setSpout("spout1", new MySpout());
                topologyBuilder.setBolt("bolt1", new SplitBolt(),2).shuffleGrouping("spout1");
                topologyBuilder.setBolt("bolt2", new JuBuBolt(),2).fieldsGrouping("bolt1", new Fields("word"));
                topologyBuilder.setBolt("bolt3", new AllCountBolt()).globalGrouping("bolt2");
               
                //创建topology
                StormTopology createTopology = topologyBuilder.createTopology();
                //设置storm的参数信息
                Config config = new Config();
                String topology_name = RemoteStormTopology.class.getSimpleName();
               
                if(args.length==0){
                        //创建本地storm集群
                        LocalCluster localCluster = new LocalCluster();
                        //提交topology
                        localCluster.submitTopology(topology_name, config, createTopology);
                }else{
                        try {
                                //提交到集群运行
                                StormSubmitter.submitTopology(topology_name, config, createTopology);
                        } catch (AlreadyAliveException e) {
                                e.printStackTrace();
                        } catch (InvalidTopologyException e) {
                                e.printStackTrace();
                        }
                }               
        }

        2.统计网站PV、UV、DV
        类似这样的日志文档27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
        soput读取每行数据发射出去
        bolt1分割每行将ip发射出去
        bolt2局部汇总,将ip放到hashmap中并ip出现的次数累加
        最后将ip和ip出现的次数发射出去,同时声明字段ip和次数
                String ip = input.getStringByField("ip");
                Integer i = hashMap.get(ip);
                if(i==null){
                        i=0;
                }
                i++;
                hashMap.put(ip, i);
                this.collector.emit(new Values(ip,i));
        bolt3全局汇总:遍历map
                HashMap<String, Integer> hashMap = new HashMap<String,Integer>();
                public void execute(Tuple input) {
                        String ip = input.getStringByField("ip");
                        Integer count = input.getIntegerByField("count");
                        hashMap.put(ip, count);
                        int num  =0;
                        System.out.println("UV个数:"+hashMap.size());
                        for (Entry<String, Integer> entry : hashMap.entrySet()) {       
                                System.out.println(entry+"DV:"+entry.getValue() );                               
                                num+=entry.getValue();
                        }
                        System.out.println(" PV:"+num);
                }
        main :
                        TopologyBuilder topologyBuilder = new TopologyBuilder();
                topologyBuilder.setSpout("spout", new MySpout());
                topologyBuilder.setBolt("bolt1", new SplitBolt()).shuffleGrouping("spout1");
                topologyBuilder.setBolt("bolt2", new JuBuBolt()).fieldsGrouping("bolt1", new Fields("ip"));
                topologyBuilder.setBolt("bolt3", new AllCountBolt()).globalGrouping("bolt2");
其他代码:
        acker机制代码实现:
        1.在spout中的nextTuple()方法中发射时开启acker机制覆盖ack(Object msgId) 和fail(Object msgId) 方法
        通过设置messageid开启消息确认机制
                         * messageid如何赋值呢?
                         * 注意:这个messageid的值一般设置为tuple里面封装的这一条数据的唯一id【在这要保证messageid和tuple中数据的唯一性,需要一一对应】
                         * messageid和tuple中内容的关系需要程序员来维护
                         * 例如:mysql article表:id  content
        messageid = num;
        this.collector.emit(new Values("哈 哈 呵 呵"),messageid);
        2.在bolt的execute(Tuple input)方法中进行处理
        String word = input.getStringByField("word");
                        if(word.equals("哈")){
                                this.collector.fail(input);
                        }else{
                                this.collector.ack(input);
                        }

已有(3)人评论

跳转到指定楼层
super_麒麟 发表于 2016-9-7 16:57:58
初次接触storm,看了文章虽然没有全部记住理解,但是对storm整体有了个很好的认识,谢谢楼主。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条