分享

Storm【开发实战】- 流方式的统计系统

问题导读
1、 初期硬件准备的条件是什么?
2、在数据发射过程之中,要注意什么?
3、如何完成一个Ip到 CITY,COUNTRY 的转换?






摘要
模拟电商的,铁道部,电力的流方式处理过程

1: 初期硬件准备:
                1 如果条件具备:请保证您安装好了 redis集群

                2 配置好您的Storm开发环境

                3 保证好您的开发环境的畅通: 主机与主机之间,Storm与redis之间


2:业务背景的介绍:
                1  在这里我们将模拟一个   流方式的数据处理过程

                 2 数据的源头保存在我们的redis 集群之中

                 3  发射的数据格式为: ip,url,client_key


数据发射器
  1. package storm.spout;
  2. import backtype.storm.spout.SpoutOutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.OutputFieldsDeclarer;
  5. import backtype.storm.topology.base.BaseRichSpout;
  6. import backtype.storm.tuple.Values;
  7. import backtype.storm.tuple.Fields;
  8. import org.json.simple.JSONObject;
  9. import org.json.simple.JSONValue;
  10. import redis.clients.jedis.Jedis;
  11. import storm.utils.Conf;
  12. import java.util.Map;
  13. import org.apache.log4j.Logger;
  14. /**
  15. * click Spout 从redis中间读取所需要的数据
  16. */
  17. public class ClickSpout extends BaseRichSpout {
  18.     private static final long serialVersionUID = -6200450568987812474L;
  19.     public static Logger LOG = Logger.getLogger(ClickSpout.class);
  20.     // 对于redis,我们使用的是jedis客户端
  21.     private Jedis jedis;
  22.     // 主机
  23.     private String host;
  24.     // 端口
  25.     private int port;
  26.     // Spout 收集器
  27.     private SpoutOutputCollector collector;
  28.     @Override
  29.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  30.      
  31.             // 这里,我们发射的格式为
  32.             // IP,URL,CLIENT_KEY
  33.         outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
  34.                 storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY));
  35.     }
  36.     @Override
  37.     public void open(Map conf, TopologyContext topologyContext,
  38.             SpoutOutputCollector spoutOutputCollector) {
  39.         host = conf.get(Conf.REDIS_HOST_KEY).toString();
  40.         port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
  41.         this.collector = spoutOutputCollector;
  42.         connectToRedis();
  43.     }
  44.     private void connectToRedis() {
  45.         jedis = new Jedis(host, port);
  46.     }
  47.     @Override
  48.     public void nextTuple() {
  49.         String content = jedis.rpop("count");
  50.         if (content == null || "nil".equals(content)) {
  51.             try {
  52.                 Thread.sleep(300);
  53.             } catch (InterruptedException e) {
  54.             }
  55.         } else {
  56.             // 将jedis对象 rpop出来的字符串解析为 json对象
  57.             JSONObject obj = (JSONObject) JSONValue.parse(content);
  58.             String ip = obj.get(storm.cookbook.Fields.IP).toString();
  59.             String url = obj.get(storm.cookbook.Fields.URL).toString();
  60.             String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY)
  61.                     .toString();
  62.             System.out.println("this is a clientKey");
  63.             // List<Object> tuple对象
  64.             collector.emit(new Values(ip, url, clientKey));
  65.         }
  66.     }
  67. }
复制代码



在这个过程之中,请注意:
1、我们在 OPEN 方法之中初始化   host,port,collector,以及Redis的连接,调用Connect方法并连接到redis数据库

2、我们在nextTupe 取出数据,并且将他转换为一个JSON对象,并且拿到 ip,url,clientKey,同时将他们包装成为一个Values对象

让我们来看看数据的流向图:
1.png


在我们的数据从clickSpout 读取以后,接下来,我们将采用2个bolt
                                    1  : repeatVisitBolt
                                    2   :  geographyBolt

共同来读取同一个数据源的数据:clickSpout


3、细细察看 repeatVisitBolt
  1. package storm.bolt;
  2. import backtype.storm.task.OutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.OutputFieldsDeclarer;
  5. import backtype.storm.topology.base.BaseRichBolt;
  6. import backtype.storm.tuple.Tuple;
  7. import backtype.storm.tuple.Fields;
  8. import backtype.storm.tuple.Values;
  9. import redis.clients.jedis.Jedis;
  10. import storm.utils.Conf;
  11. import java.util.Map;
  12. public class RepeatVisitBolt extends BaseRichBolt {
  13.     private OutputCollector collector;
  14.     private Jedis jedis;
  15.     private String host;
  16.     private int port;
  17.     @Override
  18.     public void prepare(Map conf, TopologyContext topologyContext,
  19.             OutputCollector outputCollector) {
  20.         this.collector = outputCollector;
  21.         host = conf.get(Conf.REDIS_HOST_KEY).toString();
  22.         port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
  23.         connectToRedis();
  24.     }
  25.     private void connectToRedis() {
  26.         jedis = new Jedis(host, port);
  27.         jedis.connect();
  28.     }
  29.     public boolean isConnected() {
  30.         if (jedis == null)
  31.             return false;
  32.         return jedis.isConnected();
  33.     }
  34.     @Override
  35.     public void execute(Tuple tuple) {
  36.         String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
  37.         String clientKey = tuple
  38.                 .getStringByField(storm.cookbook.Fields.CLIENT_KEY);
  39.         String url = tuple.getStringByField(storm.cookbook.Fields.URL);
  40.         String key = url + ":" + clientKey;
  41.         String value = jedis.get(key);
  42.          
  43.         // redis中取,如果redis中没有,就插入新的一条访问记录。
  44.         if (value == null) {
  45.             jedis.set(key, "visited");
  46.             collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
  47.         } else {
  48.             collector
  49.                     .emit(new Values(clientKey, url, Boolean.FALSE.toString()));
  50.         }
  51.     }
  52.     @Override
  53.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  54.         outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
  55.                 storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL,
  56.                 storm.cookbook.Fields.UNIQUE));
  57.     }
  58. }
复制代码


  在这里,我们把url 和 clientKey 组合成为 【url:clientKey】的格式组合,并依据这个对象,在redis中去查找,如果没有,那那Set到redis中间去,并且判定它为【unique】

4:
  1. package storm.bolt;
  2. import backtype.storm.task.OutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.OutputFieldsDeclarer;
  5. import backtype.storm.topology.base.BaseRichBolt;
  6. import backtype.storm.tuple.Tuple;
  7. import backtype.storm.tuple.Values;
  8. import java.util.Map;
  9. public class VisitStatsBolt extends BaseRichBolt {
  10.     private OutputCollector collector;
  11.     private int total = 0;
  12.     private int uniqueCount = 0;
  13.     @Override
  14.     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
  15.         this.collector = outputCollector;
  16.     }
  17.     @Override
  18.     public void execute(Tuple tuple) {
  19.          
  20.         //在这里,我们在上游来判断这个Fields 是否是独特和唯一的
  21.         boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
  22.          
  23.         total++;
  24.         if(unique)uniqueCount++;
  25.         collector.emit(new Values(total,uniqueCount));
  26.     }
  27.     @Override
  28.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  29.         outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT,
  30.                 storm.cookbook.Fields.TOTAL_UNIQUE));
  31.     }
  32. }
复制代码

第一次出现,uv ++


5、接下来,看看流水线2 :
  1. package storm.bolt;
  2. import backtype.storm.spout.SpoutOutputCollector;
  3. import backtype.storm.task.OutputCollector;
  4. import backtype.storm.task.TopologyContext;
  5. import backtype.storm.topology.OutputFieldsDeclarer;
  6. import backtype.storm.topology.base.BaseRichBolt;
  7. import backtype.storm.tuple.Tuple;
  8. import backtype.storm.tuple.Fields;
  9. import backtype.storm.tuple.Values;
  10. import org.json.simple.JSONObject;
  11. import storm.cookbook.IPResolver;
  12. import java.util.HashMap;
  13. import java.util.List;
  14. import java.util.Map;
  15. /**
  16. * User: yin shaui Date: 2014/05/21 Time: 8:58 AM To change this template use
  17. * File | Settings | File Templates.
  18. */
  19. public class GeographyBolt extends BaseRichBolt {
  20.     // ip解析器
  21.     private IPResolver resolver;
  22.     private OutputCollector collector;
  23.     public GeographyBolt(IPResolver resolver) {
  24.         this.resolver = resolver;
  25.     }
  26.     @Override
  27.     public void prepare(Map map, TopologyContext topologyContext,
  28.             OutputCollector outputCollector) {
  29.         this.collector = outputCollector;
  30.     }
  31.     @Override
  32.     public void execute(Tuple tuple) {
  33.         // 1 从上级的目录之中拿到我们所要使用的ip
  34.         String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
  35.         // 将ip 转换为json
  36.         JSONObject json = resolver.resolveIP(ip);
  37.         // 将 city和country 组织成为一个新的元祖,在这里也就是我们的Values对象
  38.         String city = (String) json.get(storm.cookbook.Fields.CITY);
  39.         String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);
  40.         collector.emit(new Values(country, city));
  41.     }
  42.     @Override
  43.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  44.         // 确定了我们这次输出元祖的格式
  45.         outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,
  46.                 storm.cookbook.Fields.CITY));
  47.     }
  48. }
复制代码



以上Bolt,完成了一个Ip到 CITY,COUNTRY 的转换
  1. package storm.bolt;
  2. import backtype.storm.task.OutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.OutputFieldsDeclarer;
  5. import backtype.storm.topology.base.BaseRichBolt;
  6. import backtype.storm.tuple.Tuple;
  7. import backtype.storm.tuple.Values;
  8. import java.util.HashMap;
  9. import java.util.LinkedList;
  10. import java.util.List;
  11. import java.util.Map;
  12. public class GeoStatsBolt extends BaseRichBolt {
  13.     private class CountryStats {
  14.         //
  15.         private int countryTotal = 0;
  16.         private static final int COUNT_INDEX = 0;
  17.         private static final int PERCENTAGE_INDEX = 1;
  18.         private String countryName;
  19.         public CountryStats(String countryName) {
  20.             this.countryName = countryName;
  21.         }
  22.         private Map<String, List<Integer>> cityStats = new HashMap<String, List<Integer>>();
  23.         /**
  24.          * @param cityName
  25.          */
  26.         public void cityFound(String cityName) {
  27.             countryTotal++;
  28.             // 已经有了值,一个加1的操作
  29.             if (cityStats.containsKey(cityName)) {
  30.                 cityStats.get(cityName)
  31.                         .set(COUNT_INDEX,
  32.                                 cityStats.get(cityName).get(COUNT_INDEX)
  33.                                         .intValue() + 1);
  34.                 // 没有值的时候
  35.             } else {
  36.                 List<Integer> list = new LinkedList<Integer>();
  37.                 list.add(1);
  38.                 list.add(0);
  39.                 cityStats.put(cityName, list);
  40.             }
  41.             double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)
  42.                     / (double) countryTotal;
  43.             cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent);
  44.         }
  45.         /**
  46.          * @return 拿到的国家总数
  47.          */
  48.         public int getCountryTotal() {
  49.             return countryTotal;
  50.         }
  51.         /**
  52.          * @param cityName  依据传入的城市名称,拿到城市总数
  53.          * @return
  54.          */
  55.         public int getCityTotal(String cityName) {
  56.             return cityStats.get(cityName).get(COUNT_INDEX).intValue();
  57.         }
  58.          
  59.         public String toString() {
  60.             return "Total Count for " + countryName + " is "
  61.                     + Integer.toString(countryTotal) + "\n" + "Cities:  "
  62.                     + cityStats.toString();
  63.         }
  64.     }
  65.     private OutputCollector collector;
  66.     // CountryStats 是一个内部类的对象
  67.     private Map<String, CountryStats> stats = new HashMap<String, CountryStats>();
  68.     @Override
  69.     public void prepare(Map map, TopologyContext topologyContext,
  70.             OutputCollector outputCollector) {
  71.         this.collector = outputCollector;
  72.     }
  73.     @Override
  74.     public void execute(Tuple tuple) {
  75.         String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
  76.         String city = tuple.getStringByField(storm.cookbook.Fields.CITY);
  77.         // 如果国家不存在的时候,新增加一个国家,国家的统计
  78.         if (!stats.containsKey(country)) {
  79.             stats.put(country, new CountryStats(country));
  80.         }
  81.         // 这里拿到新的统计,cityFound 是拿到某个城市的值
  82.         stats.get(country).cityFound(city);
  83.         collector.emit(new Values(country,
  84.                 stats.get(country).getCountryTotal(), city, stats.get(country)
  85.                         .getCityTotal(city)));
  86.     }
  87.     @Override
  88.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  89.         outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
  90.                 storm.cookbook.Fields.COUNTRY,
  91.                 storm.cookbook.Fields.COUNTRY_TOTAL,
  92.                 storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL));
  93.     }
  94. }
复制代码



有关地理位置的统计,附带上程序其他的使用类
  1. package storm.cookbook;
  2. /**
  3. */
  4. public class Fields {
  5.     public static final String IP = "ip";
  6.      
  7.     public static final String URL = "url";
  8.      
  9.     public static final String CLIENT_KEY = "clientKey";
  10.      
  11.     public static final String COUNTRY = "country";
  12.      
  13.     public static final String COUNTRY_NAME = "country_name";
  14.      
  15.     public static final String CITY = "city";
  16.      
  17.     //唯一的,独一无二的
  18.     public static final String UNIQUE = "unique";
  19.      
  20.     //城镇整数
  21.     public static final String COUNTRY_TOTAL = "countryTotal";
  22.      
  23.     //城市整数
  24.     public static final String CITY_TOTAL = "cityTotal";
  25.      
  26.     //总共计数
  27.     public static final String TOTAL_COUNT = "totalCount";
  28.      
  29.     //总共独一无二的
  30.     public static final String TOTAL_UNIQUE = "totalUnique";
  31. }
  32. package storm.cookbook;
  33. import org.json.simple.JSONObject;
  34. import org.json.simple.JSONValue;
  35. import java.io.BufferedReader;
  36. import java.io.IOException;
  37. import java.io.InputStreamReader;
  38. import java.io.Serializable;
  39. import java.net.MalformedURLException;
  40. import java.net.URL;
  41. import java.net.URLConnection;
  42. public class HttpIPResolver implements IPResolver, Serializable {
  43.     static String url = "http://api.hostip.info/get_json.php";
  44.     @Override
  45.     public JSONObject resolveIP(String ip) {
  46.         URL geoUrl = null;
  47.         BufferedReader in = null;
  48.         try {
  49.             geoUrl = new URL(url + "?ip=" + ip);
  50.             URLConnection connection = geoUrl.openConnection();
  51.             in = new BufferedReader(new InputStreamReader(
  52.                     connection.getInputStream()));
  53.             String inputLine;
  54.             JSONObject json = (JSONObject) JSONValue.parse(in);
  55.             in.close();
  56.             return json;
  57.         } catch (IOException e) {
  58.             e.printStackTrace();
  59.         } finally {
  60.             // 每当in为空的时候我们不进行如下的close操作,只有在in不为空的时候进行close操作
  61.             if (in != null) {
  62.                 try {
  63.                     in.close();
  64.                 } catch (IOException e) {
  65.                 }
  66.             }
  67.         }
  68.         return null;
  69.     }
  70. }
复制代码
  1. package storm.cookbook;
  2. import org.json.simple.JSONObject;
  3. /**
  4. * Created with IntelliJ IDEA.
  5. * User: admin
  6. * Date: 2012/12/07
  7. * Time: 5:29 PM
  8. * To change this template use File | Settings | File Templates.
  9. */
  10. public interface IPResolver {
  11. public JSONObject resolveIP(String ip);
  12. }
复制代码





至此,整个流程完毕。 对于统计以后,数据如何持久,亦或是数据数据写回redis的过程,请实践~

已有(3)人评论

跳转到指定楼层
GreenArrow 发表于 2014-9-24 21:32:44
学习学习啦
回复

使用道具 举报

limengyu 发表于 2014-9-25 11:59:22
strom流处理那本书上的例子 学习啦
回复

使用道具 举报

bbfj 发表于 2016-3-23 15:13:25
很好的参考
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条