分享

Storm项目:流数据监控二:流数据监控代码详解

tntzbzc 发表于 2014-11-16 14:45:34 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 30732
问题导读
1.本文主要明白流量监控应该具有哪些功能?
2.你认为流量监控该如何实现,作者又是如何实现的?









1 项目概述

1.1 数据流向

流数据监控为storm模拟项目,模拟数据源从log文件中读取数据,并逐条发射到监控Bolt中,MonitorsBolt读取配置文件MonitorBolt.xml中的匹配规则,包括正则匹配、范围匹配、常规模糊匹配及常规完全匹配,多个条件可以组合多种匹配方式,多个条件字段可以有两种不同的逻辑关系。MonitorBolt在处理数据之后(过滤出符合匹配规则的数据),发射到数据持久化Bolt中,MysqlBolt读取配置文件MysqlBolt.xml中mysql相关信息,包括mysql的host及端口,username及password,database及from,最后将数据插入mysql中。
1.2 代码树                                               

图1.2 代码树
源码简单说明:
Storm包中为总体运行的Topology,Storm.base目前只有myslq预处理的一个类,storm.bolt为bolt处理类,包括monitorbolt及printbolt,storm.spout包中为spout源数据接口,storm.source为构造源数据的一个类(这个可以忽略),storm.xml为配置文件读取类,domain.log为源数据,MonitorBolt.xml及MyslqBolt.xml分别为配置文件。

2 代码详解2.1 Package storm
Toplology.java:
  1. /**
  2. * @author blogchong
  3. * @Blog www.blogchong.com
  4. * @email blogchong@gmail.com
  5. * @QQ_G 191321336
  6. * @version 2014年11月9日 上午11:26:29
  7. */
  8. // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
  9. builder.setSpout("readlog", new ReadLogSpout(), 1);
  10. //创建monitor监控过滤节点,指定该节点接收喷发节点的策略为随机方式。
  11. builder.setBolt("monitor", new MonitorBolt("MonitorBolt.xml"), 3)
  12.          .shuffleGrouping("readlog");
  13. //创建mysql数据存储节点,并传入配置文件
  14. builder.setBolt("mysql", new MysqlBolt("MysqlBolt.xml"), 3)
  15.                    .shuffleGrouping("monitor");
复制代码


注:该部分代码显示了整个topology的结构,每个节点与节点之间的关系(发布与订阅),并且指明了每个节点的喷发方式。

2.2 Package storm.xml
  1. MonitorXml.java:
  2. import java.io.File;
  3. import javax.xml.parsers.DocumentBuilderFactory;
  4. import javax.xml.parsers.DocumentBuilder;
  5. import org.w3c.dom.Document;
  6. import org.w3c.dom.Element;
  7. import org.w3c.dom.NodeList;
  8. File file = new File(fd);
  9. //创建xml文件模板
  10. DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
  11. DocumentBuilder db = dbf.newDocumentBuilder();
  12. Document doc = db.parse(file);
  13. //将Parameter里的项存入list中
  14. NodeList nl = doc.getElementsByTagName("Parameter");
  15. //从list的item中获取参数值
  16. Element e = (Element) nl.item(0);
  17. MatchLogic = e.getElementsByTagName("MatchLogic").item(0)
  18.                    .getFirstChild().getNodeValue();
  19. MatchType = e.getElementsByTagName("MatchType").item(0)
  20.                    .getFirstChild().getNodeValue();
  21. MatchField = e.getElementsByTagName("MatchField").item(0)
  22.                    .getFirstChild().getNodeValue();
  23. FieldValue = e.getElementsByTagName("FieldValue").item(0)
  24.                    .getFirstChild().getNodeValue();
复制代码


注:MyslqXml.java与MonitorXml.java核心代码相似,主要是调用java中解析xml的类,主要类见如上import。

2.3 Package storm.spout

  1. ReadLogSpout.java:
  2. public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {
  3.                    this.collector = collector;
  4.                    try {
  5.                             this.fis = new FileInputStream("domain.log");
  6.                             this.isr = new InputStreamReader(fis, "UTF-8");
  7.                             this.br = new BufferedReader(isr);
  8.                    } catch (Exception e) {
  9.                             e.printStackTrace();
  10.                    }
  11. }
  12. public void nextTuple() {
  13.                    String str = "";
  14.                    try {
  15.                             //逐行读取发射,直到末尾
  16.                             while ((str = this.br.readLine()) != null) {
  17.                                      this.collector.emit(new Values(str));
  18.                                      Thread.sleep(100);
  19.                             }
  20.                    } catch (Exception e) {
  21.                             // TODO Auto-generated catch block
  22.                             e.printStackTrace();
  23.                    }
  24.          }
复制代码


注:该类为产生源数据的类,该类逐行读取log文件中的数据,发射到下一级处理Bolt中,读取文件时注意编码转换。

2.4 Package storm.base

MysqlOpt.java
  1. import java.io.Serializable;
  2. import java.sql.Connection;
  3. import java.sql.DriverManager;
  4. import java.sql.PreparedStatement;
  5. import java.sql.SQLException;
  6. public class MysqlOpt implements Serializable {
  7.          public Connection conn = null;
  8.          PreparedStatement statement = null;
  9.          // 连接数据库
  10.          public boolean connSQL(String host_p, String database, String username,
  11.                             String password) {
  12.                    String url = "jdbc:mysql://" + host_p + "/" + database
  13.                                      + "?characterEncoding=UTF-8";
  14.                    try {
  15.                             //使用jdbc驱动
  16.                             Class.forName("com.mysql.jdbc.Driver");
  17.                             conn = DriverManager.getConnection(url, username, password);
  18.                             return true;
  19.                    } catch (ClassNotFoundException cnfex) {
  20.                             System.out
  21.                                                .println("MysqlBolt-- Error: Loading JDBC/ODBC dirver failed!");
  22.                             cnfex.printStackTrace();
  23.                    } catch (SQLException sqlex) {
  24.                             System.out.println("MysqlBolt-- Error: Connect database failed!");
  25.                             sqlex.printStackTrace();
  26.                    }
  27.                    return false;
  28.          }
  29.          // 插入数据
  30.          public boolean insertSQL(String sql) {
  31.                    try {
  32.                             statement = conn.prepareStatement(sql);
  33.                             statement.executeUpdate();
  34.                             return true;
  35.                    } catch (SQLException e) {
  36.                             System.out.println("MysqlBolt-- Error: Insert database failed!");
  37.                             e.printStackTrace();
  38.                    } catch (Exception e) {
  39.                             System.out.println("MysqlBolt-- Error: Insert failed!");
  40.                             e.printStackTrace();
  41.                    }
  42.                    return false;
  43.          }
  44.          // 关闭连接
  45.          public void deconnSQL() {
  46.                    try {
  47.                             if (conn != null)
  48.                                      conn.close();
  49.                    } catch (Exception e) {
  50.                             System.out.println("MysqlBolt-- Error: Deconnect database failed!");
  51.                             e.printStackTrace();
  52.                    }
  53.          }
  54. }
复制代码


注:该类是mysql的操作类,包括mysql的链接、数据插入及数据库关闭等操作,供Mysqlbolt调用。


2.5 Package storm.bolt

Monitorbolt.java:
  1.     public void prepare(Map stormConf, TopologyContext context,
  2.                             OutputCollector collector) {
  3.                    System.out.println("MonitorBolt   --       Start!");
  4.                    this.collector = collector;
  5.                    // 从conf中获取参数
  6.                    new MonitorXml(this.monitorXml).read();
  7.                    this.MatchLogic = MonitorXml.MatchLogic;
  8.                    this.MatchType = MonitorXml.MatchType;
  9.                    this.MatchField = MonitorXml.MatchField;
  10.                    this.FieldValue = MonitorXml.FieldValue;
  11.          }
  12.          public void execute(Tuple input) {
  13.                    //订阅str
  14.                    String str = input.getString(0);
  15.                    if (this.flag_par == false) {
  16.                             System.out
  17.                                                .println("MonitorBolt-- Erre: can't get the path of Monitor.xml!");
  18.                    } else {
  19.                             //调用Monitor进行条件判断,除了str,其他参数为配置文件中读取的列表
  20.                             boolean moni = Monitor(str, this.MatchLogic, this.MatchType,
  21.                                                this.MatchField, this.FieldValue);
  22.                             if (moni == true) {
  23.                                      // System.out.println("Monitor!!!");
  24.                                      this.collector.emit(new Values(str));
  25.                             }
  26.                    }
  27.          }
  28.          private boolean Monitor(String str, String logic, String type,
  29.                             String field, String value) {
  30.                    //将列表拆分
  31.                    String[] types = type.split("::");
  32.                    String[] fields = field.split("::");
  33.                    String[] values = value.split("::");
  34.                    int flag_init = types.length;
  35.                    int flag = 0;//判断标志
  36.                    if (logic.equals("AND")) {//逻辑AND
  37.                             for (int i = 0; i
  38.                                      if (types.equals("regular")) {
  39.                                                //调用正则匹配方法regular
  40.                                                boolean regu = regular(str, fields, values);
  41.                                                if (regu == true) {
  42.                                                         flag++;
  43.                                                }
  44.                                      } else if (types.equals("range")) {
  45.                                                //调用范围匹配方法range
  46.                                                boolean ran = range(str, fields, values);
  47.                                                if (ran == true) {
  48.                                                         flag++;
  49.                                                }
  50.                                      } else if (types.equals("routine0")) {
  51.                                                //调用常规模糊匹配方法routine0
  52.                                                boolean rou0 = routine0(str, fields, values);
  53.                                                if (rou0 == true) {
  54.                                                         flag++;
  55.                                                }
  56.                                      } else if (types.equals("routine1")) {
  57.                                                //调用常规完全匹配方法routine1
  58.                                                boolean rou1 = routine1(str, fields, values);
  59.                                                if (rou1 == true) {
  60.                                                         flag++;
  61.                                                }
  62.                                      }
  63.                             }
  64.                             if (flag == flag_init) {
  65.                                      //所有条件都满足时
  66.                                      return true;
  67.                             } else {
  68.                                      return false;
  69.                             }
  70.                    } else if (logic.equals("OR")) {//逻辑OR
  71.                             for (int i = 0; i
  72.                                      if (types.equals("regular")) {
  73.                                                boolean regu = regular(str, fields, values);
  74.                                                if (regu == true) {
  75.                                                         flag++;
  76.                                                }
  77.                                      } else if (types.equals("range")) {
  78.                                                boolean ran = range(str, fields, values);
  79.                                                if (ran == true) {
  80.                                                         flag++;
  81.                                                }
  82.                                      } else if (types.equals("routine0")) {
  83.                                                boolean rou0 = routine0(str, fields, values);
  84.                                                if (rou0 == true) {
  85.                                                         flag++;
  86.                                                }
  87.                                      } else if (types.equals("routine1")) {
  88.                                                boolean rou1 = routine1(str, fields, values);
  89.                                                if (rou1 == true) {
  90.                                                         flag++;
  91.                                                }
  92.                                      }
  93.                             }
  94.                             if (flag != 0) {
  95.                                      return true;
  96.                             } else {
  97.                                      return false;
  98.                             }
  99.                    }
  100.                    return false;
  101.          }
  102.          // 正则匹配判断
  103.          private boolean regular(String str, String field, String value) {
  104.                    String[] strs = str.split("\t");
  105.                    Pattern p = Pattern.compile(value);
  106.                    Matcher m = p.matcher(strs[Integer.parseInt(field) - 1]);
  107.                    boolean result = m.matches();
  108.                    if (result == true) {
  109.                             return true;
  110.                    } else {
  111.                             return false;
  112.                    }
  113.          }
  114.          // 范围匹配
  115.          private boolean range(String str, String field, String value) {
  116.                    String[] strs = str.split("\t");
  117.                    String[] values = value.split(",");
  118.                    int strss = Integer.parseInt(strs[Integer.parseInt(field) - 1]);
  119.                    if (values.length == 1) {
  120.                             if (strss > Integer.parseInt(values[0])) {
  121.                                      return true;
  122.                             } else {
  123.                                      return false;
  124.                             }
  125.                    } else if (values.length == 2 && values[0].length() == 0) {
  126.                             if (strss parseInt(values[1])) {
  127.                                      return true;
  128.                             } else {
  129.                                      return false;
  130.                             }
  131.                    } else if (values.length == 2 && values[0].length() != 0) {
  132.                             if (strss > Integer.parseInt(values[0])
  133.                                                && strss parseInt(values[1])) {
  134.                                      return true;
  135.                             } else {
  136.                                      return false;
  137.                             }
  138.                    } else {
  139.                             return false;
  140.                    }
  141.          }
  142.          // 常规模糊匹配
  143.          private boolean routine0(String str, String field, String value) {
  144.                    String[] strs = str.split("\t");
  145.                    String strss = strs[Integer.parseInt(field) - 1];
  146.                    if (strss.contains(value) && !strss.equals(value)) {
  147.                             return true;
  148.                    } else {
  149.                             return false;
  150.                    }
  151.          }
  152.          // 常规完全匹配
  153.          private boolean routine1(String str, String field, String value) {
  154.                    String[] strs = str.split("\t");
  155.                    String strss = strs[Integer.parseInt(field) - 1];
  156.                    if (strss.equals(value)) {
  157.                             return true;
  158.                    } else {
  159.                             return false;
  160.                    }
  161.          }
复制代码


注1:该类主要设计了匹配规则,支持多种匹配方式,包括正则、范围、常规模糊及完全匹配,且支持两种逻辑判断关系。
MyslqBolt.java:
  1.          public void prepare(Map stormConf, TopologyContext context,
  2.                             OutputCollector collector) {
  3.                    System.out.println("MysqlBolt       --       Start!");
  4.                    this.collector = collector;
  5.                    // 初始化mysql
  6.                    Loading();
  7.          }
  8.          // 参数初始化
  9.          public void Loading() {
  10.                    new MysqlXml(this.mysqlXml).read();
  11.                    String host_port = MysqlXml.Host_port; // mysql地址及端口
  12.                    String database = MysqlXml.Database; // 数据库名
  13.                    String username = MysqlXml.Username; // 用户名
  14.                    String password = MysqlXml.Password; // 密码
  15.                    this.from = MysqlXml.From; // 表名
  16.                   if (this.mysql.connSQL(host_port, database, username, password) == false) {
  17.                             System.out
  18.                                                .println("MysqlBolt--Config errer, Please check Mysql-conf: "
  19.                                                                  + this.mysqlXml);
  20.                             flag_xml = false;
  21.                    } else {
  22.                             System.out.println("MysqlBolt-- Conf Loaded: " + this.mysqlXml);
  23.                    }
  24.          }
  25.          public void execute(Tuple input) {
  26.                    String str = input.getString(0);
  27.                    if (this.flag_par == false) {
  28.                             System.out
  29.                                                .println("MysqlBolt-- Erre: can't get the path of Mysql.xml!");
  30.                    } else {
  31.                             if (this.flag_xml == true) {
  32.                                      String insert = send_str(str);
  33.                                      if (this.mysql.insertSQL(insert) == false) {
  34.                                                System.out
  35.                                                                  .println("MysqlBolt-- Erre: can't insert tuple into database!");
  36.                                                System.out.println("MysqlBolt-- Error Tuple: " + str);
  37.                                      }
  38.                             }
  39.                    }
  40.          }
  41.          //插入mysql语句构造方法
  42.          public String send_str(String str) {
  43.                    String send_tmp = null;
  44.                    String field[] = str.split("\t");
  45.                    for (int i = 0; i length; i++) {
  46.                             if (i == 0) {
  47.                                      send_tmp = "'" + field[0] + "', '";
  48.                             } else if (i == (field.length - 1)) {
  49.                                      send_tmp = send_tmp + field + "'";
  50.                             } else {
  51.                                      send_tmp = send_tmp + field + "', '";
  52.                             }
  53.                    }
  54.                    String send = "insert into " + this.from + " values (" + send_tmp
  55.                                      + ");";
  56.                    return send;
  57.          }
复制代码


注2:该类主要用于数据存储,调用了base包中的mysqlOpt类中的多个方法,对mysql进行连接,数据插入及数据库关闭等等。


文档总结注意:
文章搬迁至博客虫之后,代码已经重新整理,加了新的模块,具体参看源码。该文档为流数据监控模拟项目的代码解析部分,鉴于篇幅只解析部分核心代码,如果需要完整源代码请戳这里

相关文档

Storm项目:流数据监控一流数据监控设计文档
http://www.aboutyun.com/thread-10042-1-1.html


Storm项目:流数据监控二:流数据监控代码详解
http://www.aboutyun.com/thread-10047-1-1.html

Storm项目:流数据监控三:流数据监控示例运行
http://www.aboutyun.com/thread-10046-1-1.html

Storm项目:流数据监控四---流数据监控MetaQ接口
http://www.aboutyun.com/thread-10045-1-1.html

Storm项目:流数据监控五Zookeeper统一配置
http://www.aboutyun.com/thread-10044-1-1.html

实时处理方案架构 - Storm实时处理
http://www.aboutyun.com/thread-10043-1-1.html








http://www.blogchong.com/post/storm_monitor_code.html

已有(3)人评论

跳转到指定楼层
hahaxixi 发表于 2014-11-16 19:14:02
顶,谢谢分享~~~~~~~~~~~~~
回复

使用道具 举报

caiyifeng 发表于 2014-12-15 16:39:07
收藏了,多谢
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条