问题导读
1.本文主要明白流量监控应该具有哪些功能?
2.你认为流量监控该如何实现,作者又是如何实现的?
1 项目概述
1.1 数据流向
流数据监控为storm模拟项目,模拟数据源从log文件中读取数据,并逐条发射到监控Bolt中,MonitorsBolt读取配置文件MonitorBolt.xml中的匹配规则,包括正则匹配、范围匹配、常规模糊匹配及常规完全匹配,多个条件可以组合多种匹配方式,多个条件字段可以有两种不同的逻辑关系。MonitorBolt在处理数据之后(过滤出符合匹配规则的数据),发射到数据持久化Bolt中,MysqlBolt读取配置文件MysqlBolt.xml中mysql相关信息,包括mysql的host及端口,username及password,database及from,最后将数据插入mysql中。
1.2 代码树
图1.2 代码树
源码简单说明:
Storm包中为总体运行的Topology,Storm.base目前只有myslq预处理的一个类,storm.bolt为bolt处理类,包括monitorbolt及printbolt,storm.spout包中为spout源数据接口,storm.source为构造源数据的一个类(这个可以忽略),storm.xml为配置文件读取类,domain.log为源数据,MonitorBolt.xml及MyslqBolt.xml分别为配置文件。
2 代码详解2.1 Package storm
Toplology.java:
- /**
- * @author blogchong
- * @Blog www.blogchong.com
- * @email blogchong@gmail.com
- * @QQ_G 191321336
- * @version 2014年11月9日 上午11:26:29
- */
- // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
- builder.setSpout("readlog", new ReadLogSpout(), 1);
- //创建monitor监控过滤节点,指定该节点接收喷发节点的策略为随机方式。
- builder.setBolt("monitor", new MonitorBolt("MonitorBolt.xml"), 3)
- .shuffleGrouping("readlog");
- //创建mysql数据存储节点,并传入配置文件
- builder.setBolt("mysql", new MysqlBolt("MysqlBolt.xml"), 3)
- .shuffleGrouping("monitor");
复制代码
注:该部分代码显示了整个topology的结构,每个节点与节点之间的关系(发布与订阅),并且指明了每个节点的喷发方式。
2.2 Package storm.xml
- MonitorXml.java:
- import java.io.File;
- import javax.xml.parsers.DocumentBuilderFactory;
- import javax.xml.parsers.DocumentBuilder;
- import org.w3c.dom.Document;
- import org.w3c.dom.Element;
- import org.w3c.dom.NodeList;
- File file = new File(fd);
- //创建xml文件模板
- DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
- DocumentBuilder db = dbf.newDocumentBuilder();
- Document doc = db.parse(file);
- //将Parameter里的项存入list中
- NodeList nl = doc.getElementsByTagName("Parameter");
- //从list的item中获取参数值
- Element e = (Element) nl.item(0);
- MatchLogic = e.getElementsByTagName("MatchLogic").item(0)
- .getFirstChild().getNodeValue();
- MatchType = e.getElementsByTagName("MatchType").item(0)
- .getFirstChild().getNodeValue();
- MatchField = e.getElementsByTagName("MatchField").item(0)
- .getFirstChild().getNodeValue();
- FieldValue = e.getElementsByTagName("FieldValue").item(0)
- .getFirstChild().getNodeValue();
复制代码
注:MyslqXml.java与MonitorXml.java核心代码相似,主要是调用java中解析xml的类,主要类见如上import。
2.3 Package storm.spout
- ReadLogSpout.java:
- public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {
- this.collector = collector;
- try {
- this.fis = new FileInputStream("domain.log");
- this.isr = new InputStreamReader(fis, "UTF-8");
- this.br = new BufferedReader(isr);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public void nextTuple() {
- String str = "";
- try {
- //逐行读取发射,直到末尾
- while ((str = this.br.readLine()) != null) {
- this.collector.emit(new Values(str));
- Thread.sleep(100);
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
复制代码
注:该类为产生源数据的类,该类逐行读取log文件中的数据,发射到下一级处理Bolt中,读取文件时注意编码转换。
2.4 Package storm.base
MysqlOpt.java
- import java.io.Serializable;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
- public class MysqlOpt implements Serializable {
- public Connection conn = null;
- PreparedStatement statement = null;
- // 连接数据库
- public boolean connSQL(String host_p, String database, String username,
- String password) {
- String url = "jdbc:mysql://" + host_p + "/" + database
- + "?characterEncoding=UTF-8";
- try {
- //使用jdbc驱动
- Class.forName("com.mysql.jdbc.Driver");
- conn = DriverManager.getConnection(url, username, password);
- return true;
- } catch (ClassNotFoundException cnfex) {
- System.out
- .println("MysqlBolt-- Error: Loading JDBC/ODBC dirver failed!");
- cnfex.printStackTrace();
- } catch (SQLException sqlex) {
- System.out.println("MysqlBolt-- Error: Connect database failed!");
- sqlex.printStackTrace();
- }
- return false;
- }
- // 插入数据
- public boolean insertSQL(String sql) {
- try {
- statement = conn.prepareStatement(sql);
- statement.executeUpdate();
- return true;
- } catch (SQLException e) {
- System.out.println("MysqlBolt-- Error: Insert database failed!");
- e.printStackTrace();
- } catch (Exception e) {
- System.out.println("MysqlBolt-- Error: Insert failed!");
- e.printStackTrace();
- }
- return false;
- }
- // 关闭连接
- public void deconnSQL() {
- try {
- if (conn != null)
- conn.close();
- } catch (Exception e) {
- System.out.println("MysqlBolt-- Error: Deconnect database failed!");
- e.printStackTrace();
- }
- }
- }
复制代码
注:该类是mysql的操作类,包括mysql的链接、数据插入及数据库关闭等操作,供Mysqlbolt调用。
2.5 Package storm.bolt
Monitorbolt.java:
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- System.out.println("MonitorBolt -- Start!");
- this.collector = collector;
- // 从conf中获取参数
- new MonitorXml(this.monitorXml).read();
- this.MatchLogic = MonitorXml.MatchLogic;
- this.MatchType = MonitorXml.MatchType;
- this.MatchField = MonitorXml.MatchField;
- this.FieldValue = MonitorXml.FieldValue;
- }
- public void execute(Tuple input) {
- //订阅str
- String str = input.getString(0);
- if (this.flag_par == false) {
- System.out
- .println("MonitorBolt-- Erre: can't get the path of Monitor.xml!");
- } else {
- //调用Monitor进行条件判断,除了str,其他参数为配置文件中读取的列表
- boolean moni = Monitor(str, this.MatchLogic, this.MatchType,
- this.MatchField, this.FieldValue);
- if (moni == true) {
- // System.out.println("Monitor!!!");
- this.collector.emit(new Values(str));
- }
- }
- }
- private boolean Monitor(String str, String logic, String type,
- String field, String value) {
- //将列表拆分
- String[] types = type.split("::");
- String[] fields = field.split("::");
- String[] values = value.split("::");
- int flag_init = types.length;
- int flag = 0;//判断标志
- if (logic.equals("AND")) {//逻辑AND
- for (int i = 0; i
- if (types.equals("regular")) {
- //调用正则匹配方法regular
- boolean regu = regular(str, fields, values);
- if (regu == true) {
- flag++;
- }
- } else if (types.equals("range")) {
- //调用范围匹配方法range
- boolean ran = range(str, fields, values);
- if (ran == true) {
- flag++;
- }
- } else if (types.equals("routine0")) {
- //调用常规模糊匹配方法routine0
- boolean rou0 = routine0(str, fields, values);
- if (rou0 == true) {
- flag++;
- }
- } else if (types.equals("routine1")) {
- //调用常规完全匹配方法routine1
- boolean rou1 = routine1(str, fields, values);
- if (rou1 == true) {
- flag++;
- }
- }
- }
- if (flag == flag_init) {
- //所有条件都满足时
- return true;
- } else {
- return false;
- }
- } else if (logic.equals("OR")) {//逻辑OR
- for (int i = 0; i
- if (types.equals("regular")) {
- boolean regu = regular(str, fields, values);
- if (regu == true) {
- flag++;
- }
- } else if (types.equals("range")) {
- boolean ran = range(str, fields, values);
- if (ran == true) {
- flag++;
- }
- } else if (types.equals("routine0")) {
- boolean rou0 = routine0(str, fields, values);
- if (rou0 == true) {
- flag++;
- }
- } else if (types.equals("routine1")) {
- boolean rou1 = routine1(str, fields, values);
- if (rou1 == true) {
- flag++;
- }
- }
- }
- if (flag != 0) {
- return true;
- } else {
- return false;
- }
- }
- return false;
- }
- // 正则匹配判断
- private boolean regular(String str, String field, String value) {
- String[] strs = str.split("\t");
- Pattern p = Pattern.compile(value);
- Matcher m = p.matcher(strs[Integer.parseInt(field) - 1]);
- boolean result = m.matches();
- if (result == true) {
- return true;
- } else {
- return false;
- }
- }
- // 范围匹配
- private boolean range(String str, String field, String value) {
- String[] strs = str.split("\t");
- String[] values = value.split(",");
- int strss = Integer.parseInt(strs[Integer.parseInt(field) - 1]);
- if (values.length == 1) {
- if (strss > Integer.parseInt(values[0])) {
- return true;
- } else {
- return false;
- }
- } else if (values.length == 2 && values[0].length() == 0) {
- if (strss parseInt(values[1])) {
- return true;
- } else {
- return false;
- }
- } else if (values.length == 2 && values[0].length() != 0) {
- if (strss > Integer.parseInt(values[0])
- && strss parseInt(values[1])) {
- return true;
- } else {
- return false;
- }
- } else {
- return false;
- }
- }
- // 常规模糊匹配
- private boolean routine0(String str, String field, String value) {
- String[] strs = str.split("\t");
- String strss = strs[Integer.parseInt(field) - 1];
-
- if (strss.contains(value) && !strss.equals(value)) {
- return true;
- } else {
- return false;
- }
- }
- // 常规完全匹配
- private boolean routine1(String str, String field, String value) {
- String[] strs = str.split("\t");
- String strss = strs[Integer.parseInt(field) - 1];
-
- if (strss.equals(value)) {
- return true;
- } else {
- return false;
- }
- }
复制代码
注1:该类主要设计了匹配规则,支持多种匹配方式,包括正则、范围、常规模糊及完全匹配,且支持两种逻辑判断关系。
MyslqBolt.java:
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- System.out.println("MysqlBolt -- Start!");
- this.collector = collector;
- // 初始化mysql
- Loading();
- }
- // 参数初始化
- public void Loading() {
- new MysqlXml(this.mysqlXml).read();
- String host_port = MysqlXml.Host_port; // mysql地址及端口
- String database = MysqlXml.Database; // 数据库名
- String username = MysqlXml.Username; // 用户名
- String password = MysqlXml.Password; // 密码
- this.from = MysqlXml.From; // 表名
- if (this.mysql.connSQL(host_port, database, username, password) == false) {
- System.out
- .println("MysqlBolt--Config errer, Please check Mysql-conf: "
- + this.mysqlXml);
- flag_xml = false;
- } else {
- System.out.println("MysqlBolt-- Conf Loaded: " + this.mysqlXml);
- }
- }
- public void execute(Tuple input) {
- String str = input.getString(0);
- if (this.flag_par == false) {
- System.out
- .println("MysqlBolt-- Erre: can't get the path of Mysql.xml!");
- } else {
- if (this.flag_xml == true) {
- String insert = send_str(str);
- if (this.mysql.insertSQL(insert) == false) {
- System.out
- .println("MysqlBolt-- Erre: can't insert tuple into database!");
- System.out.println("MysqlBolt-- Error Tuple: " + str);
- }
- }
- }
- }
- //插入mysql语句构造方法
- public String send_str(String str) {
- String send_tmp = null;
- String field[] = str.split("\t");
- for (int i = 0; i length; i++) {
- if (i == 0) {
- send_tmp = "'" + field[0] + "', '";
- } else if (i == (field.length - 1)) {
- send_tmp = send_tmp + field + "'";
- } else {
- send_tmp = send_tmp + field + "', '";
- }
- }
- String send = "insert into " + this.from + " values (" + send_tmp
- + ");";
- return send;
- }
复制代码
注2:该类主要用于数据存储,调用了base包中的mysqlOpt类中的多个方法,对mysql进行连接,数据插入及数据库关闭等等。
文档总结注意:
文章搬迁至博客虫之后,代码已经重新整理,加了新的模块,具体参看源码。该文档为流数据监控模拟项目的代码解析部分,鉴于篇幅只解析部分核心代码,如果需要完整源代码请戳这里。
相关文档
Storm项目:流数据监控一流数据监控设计文档
http://www.aboutyun.com/thread-10042-1-1.html
Storm项目:流数据监控二:流数据监控代码详解
http://www.aboutyun.com/thread-10047-1-1.html
Storm项目:流数据监控三:流数据监控示例运行
http://www.aboutyun.com/thread-10046-1-1.html
Storm项目:流数据监控四---流数据监控MetaQ接口
http://www.aboutyun.com/thread-10045-1-1.html
Storm项目:流数据监控五Zookeeper统一配置
http://www.aboutyun.com/thread-10044-1-1.html
实时处理方案架构 - Storm实时处理
http://www.aboutyun.com/thread-10043-1-1.html
http://www.blogchong.com/post/storm_monitor_code.html
|