问题导读
1、 初期硬件准备的条件是什么?
2、在数据发射过程之中,要注意什么?
3、如何完成一个Ip到 CITY,COUNTRY 的转换?
摘要
模拟电商、铁道部、电力等行业的流式数据处理过程
1: 初期硬件准备:
1 如果条件具备:请保证您安装好了 redis集群
2 配置好您的Storm开发环境
3 保证好您的开发环境的畅通: 主机与主机之间,Storm与redis之间
2:业务背景的介绍:
1 在这里我们将模拟一个 流方式的数据处理过程
2 数据的源头保存在我们的redis 集群之中
3 发射的数据格式为: ip,url,client_key
数据发射器
- package storm.spout;
-
- import backtype.storm.spout.SpoutOutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.base.BaseRichSpout;
- import backtype.storm.tuple.Values;
- import backtype.storm.tuple.Fields;
- import org.json.simple.JSONObject;
- import org.json.simple.JSONValue;
- import redis.clients.jedis.Jedis;
- import storm.utils.Conf;
-
- import java.util.Map;
- import org.apache.log4j.Logger;
-
- /**
- * click Spout 从redis中间读取所需要的数据
- */
- public class ClickSpout extends BaseRichSpout {
-
- private static final long serialVersionUID = -6200450568987812474L;
-
- public static Logger LOG = Logger.getLogger(ClickSpout.class);
-
- // 对于redis,我们使用的是jedis客户端
- private Jedis jedis;
-
- // 主机
- private String host;
-
- // 端口
- private int port;
-
- // Spout 收集器
- private SpoutOutputCollector collector;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-
-
- // 这里,我们发射的格式为
- // IP,URL,CLIENT_KEY
- outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
- storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY));
- }
-
- @Override
- public void open(Map conf, TopologyContext topologyContext,
- SpoutOutputCollector spoutOutputCollector) {
-
- host = conf.get(Conf.REDIS_HOST_KEY).toString();
- port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
- this.collector = spoutOutputCollector;
- connectToRedis();
- }
-
- private void connectToRedis() {
- jedis = new Jedis(host, port);
- }
-
- @Override
- public void nextTuple() {
- String content = jedis.rpop("count");
- if (content == null || "nil".equals(content)) {
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- }
- } else {
-
- // 将jedis对象 rpop出来的字符串解析为 json对象
- JSONObject obj = (JSONObject) JSONValue.parse(content);
-
- String ip = obj.get(storm.cookbook.Fields.IP).toString();
- String url = obj.get(storm.cookbook.Fields.URL).toString();
- String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY)
- .toString();
-
- System.out.println("this is a clientKey");
-
- // List<Object> tuple对象
- collector.emit(new Values(ip, url, clientKey));
- }
- }
- }
复制代码
在这个过程之中,请注意:
1、我们在 open 方法之中初始化 host、port、collector,并调用 connectToRedis 方法创建到 redis 数据库的连接
2、我们在 nextTuple 方法中取出数据,将它转换为一个 JSON 对象,拿到 ip、url、clientKey,并将它们包装成为一个 Values 对象发射出去
让我们来看看数据的流向图:
在我们的数据从clickSpout 读取以后,接下来,我们将采用2个bolt
1 : repeatVisitBolt
2 : geographyBolt
共同来读取同一个数据源的数据:clickSpout
3、细细察看 repeatVisitBolt
- package storm.bolt;
-
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.base.BaseRichBolt;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Values;
- import redis.clients.jedis.Jedis;
- import storm.utils.Conf;
-
- import java.util.Map;
-
- public class RepeatVisitBolt extends BaseRichBolt {
-
- private OutputCollector collector;
-
- private Jedis jedis;
- private String host;
- private int port;
-
- @Override
- public void prepare(Map conf, TopologyContext topologyContext,
- OutputCollector outputCollector) {
- this.collector = outputCollector;
- host = conf.get(Conf.REDIS_HOST_KEY).toString();
- port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
- connectToRedis();
- }
-
- private void connectToRedis() {
- jedis = new Jedis(host, port);
- jedis.connect();
- }
-
- public boolean isConnected() {
- if (jedis == null)
- return false;
- return jedis.isConnected();
- }
-
- @Override
- public void execute(Tuple tuple) {
-
- String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
- String clientKey = tuple
- .getStringByField(storm.cookbook.Fields.CLIENT_KEY);
- String url = tuple.getStringByField(storm.cookbook.Fields.URL);
- String key = url + ":" + clientKey;
-
- String value = jedis.get(key);
-
- // redis中取,如果redis中没有,就插入新的一条访问记录。
- if (value == null) {
- jedis.set(key, "visited");
- collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
- } else {
- collector
- .emit(new Values(clientKey, url, Boolean.FALSE.toString()));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
- storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL,
- storm.cookbook.Fields.UNIQUE));
- }
- }
复制代码
在这里,我们把 url 和 clientKey 组合成【url:clientKey】格式的键,并依据这个键在 redis 中查找;如果没有,那么就把它 set 到 redis 中,并且判定这次访问为【unique】
4、接下来看 VisitStatsBolt:
- package storm.bolt;
-
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.base.BaseRichBolt;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
-
- import java.util.Map;
-
- public class VisitStatsBolt extends BaseRichBolt {
-
- private OutputCollector collector;
-
- private int total = 0;
- private int uniqueCount = 0;
-
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- this.collector = outputCollector;
- }
-
- @Override
- public void execute(Tuple tuple) {
-
- //在这里,我们在上游来判断这个Fields 是否是独特和唯一的
- boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
-
- total++;
- if(unique)uniqueCount++;
- collector.emit(new Values(total,uniqueCount));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT,
- storm.cookbook.Fields.TOTAL_UNIQUE));
- }
- }
复制代码
当 unique 为 true(即该 url:clientKey 第一次出现)时,uniqueCount(uv)加 1;total 则对每条记录都加 1
5、接下来,看看流水线2 :
- package storm.bolt;
-
- import backtype.storm.spout.SpoutOutputCollector;
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.base.BaseRichBolt;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Values;
- import org.json.simple.JSONObject;
-
- import storm.cookbook.IPResolver;
-
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
- /**
- * User: yin shaui Date: 2014/05/21 Time: 8:58 AM To change this template use
- * File | Settings | File Templates.
- */
- public class GeographyBolt extends BaseRichBolt {
-
- // ip解析器
- private IPResolver resolver;
-
- private OutputCollector collector;
-
- public GeographyBolt(IPResolver resolver) {
- this.resolver = resolver;
- }
-
- @Override
- public void prepare(Map map, TopologyContext topologyContext,
- OutputCollector outputCollector) {
- this.collector = outputCollector;
- }
-
- @Override
- public void execute(Tuple tuple) {
-
- // 1 从上级的目录之中拿到我们所要使用的ip
- String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
-
- // 将ip 转换为json
- JSONObject json = resolver.resolveIP(ip);
-
- // 将 city和country 组织成为一个新的元祖,在这里也就是我们的Values对象
- String city = (String) json.get(storm.cookbook.Fields.CITY);
- String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);
-
- collector.emit(new Values(country, city));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-
- // 确定了我们这次输出元祖的格式
- outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,
- storm.cookbook.Fields.CITY));
- }
- }
复制代码
以上Bolt,完成了一个Ip到 CITY,COUNTRY 的转换
- package storm.bolt;
-
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.base.BaseRichBolt;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
-
- import java.util.HashMap;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
-
- public class GeoStatsBolt extends BaseRichBolt {
-
- private class CountryStats {
-
- //
- private int countryTotal = 0;
-
- private static final int COUNT_INDEX = 0;
- private static final int PERCENTAGE_INDEX = 1;
- private String countryName;
-
- public CountryStats(String countryName) {
- this.countryName = countryName;
- }
-
- private Map<String, List<Integer>> cityStats = new HashMap<String, List<Integer>>();
-
- /**
- * @param cityName
- */
- public void cityFound(String cityName) {
- countryTotal++;
-
- // 已经有了值,一个加1的操作
- if (cityStats.containsKey(cityName)) {
- cityStats.get(cityName)
- .set(COUNT_INDEX,
- cityStats.get(cityName).get(COUNT_INDEX)
- .intValue() + 1);
- // 没有值的时候
- } else {
- List<Integer> list = new LinkedList<Integer>();
- list.add(1);
- list.add(0);
- cityStats.put(cityName, list);
- }
-
- double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)
- / (double) countryTotal;
-
- cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent);
-
- }
-
- /**
- * @return 拿到的国家总数
- */
- public int getCountryTotal() {
- return countryTotal;
- }
-
- /**
- * @param cityName 依据传入的城市名称,拿到城市总数
- * @return
- */
-
- public int getCityTotal(String cityName) {
- return cityStats.get(cityName).get(COUNT_INDEX).intValue();
- }
-
-
- public String toString() {
- return "Total Count for " + countryName + " is "
- + Integer.toString(countryTotal) + "\n" + "Cities: "
- + cityStats.toString();
- }
- }
-
- private OutputCollector collector;
-
- // CountryStats 是一个内部类的对象
- private Map<String, CountryStats> stats = new HashMap<String, CountryStats>();
-
- @Override
- public void prepare(Map map, TopologyContext topologyContext,
- OutputCollector outputCollector) {
- this.collector = outputCollector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
- String city = tuple.getStringByField(storm.cookbook.Fields.CITY);
-
- // 如果国家不存在的时候,新增加一个国家,国家的统计
- if (!stats.containsKey(country)) {
- stats.put(country, new CountryStats(country));
- }
-
- // 这里拿到新的统计,cityFound 是拿到某个城市的值
- stats.get(country).cityFound(city);
-
- collector.emit(new Values(country,
- stats.get(country).getCountryTotal(), city, stats.get(country)
- .getCityTotal(city)));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
- storm.cookbook.Fields.COUNTRY,
- storm.cookbook.Fields.COUNTRY_TOTAL,
- storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL));
- }
- }
复制代码
有关地理位置的统计,附带上程序其他的使用类
- package storm.cookbook;
-
/**
 * Names of the tuple fields (and JSON keys) shared by the spout and bolts of
 * the click-stream topology.
 */
public class Fields {

    // ---- Raw click record ----

    /** Client identifier of the visitor. */
    public static final String CLIENT_KEY = "clientKey";

    /** IP address of the visitor. */
    public static final String IP = "ip";

    /** URL that was visited. */
    public static final String URL = "url";

    // ---- Geography ----

    /** City name. */
    public static final String CITY = "city";

    /** Country, as used in tuple field names. */
    public static final String COUNTRY = "country";

    /** Country name key, as read from the IP resolver's JSON result. */
    public static final String COUNTRY_NAME = "country_name";

    /** Running visit total for a city. */
    public static final String CITY_TOTAL = "cityTotal";

    /** Running visit total for a country. */
    public static final String COUNTRY_TOTAL = "countryTotal";

    // ---- Visit statistics ----

    /** Flag telling whether a visit is the first one for its url/client pair. */
    public static final String UNIQUE = "unique";

    /** Running total of all visits. */
    public static final String TOTAL_COUNT = "totalCount";

    /** Running total of unique visits. */
    public static final String TOTAL_UNIQUE = "totalUnique";
}
-
-
- package storm.cookbook;
-
- import org.json.simple.JSONObject;
- import org.json.simple.JSONValue;
-
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.Serializable;
- import java.net.MalformedURLException;
- import java.net.URL;
- import java.net.URLConnection;
-
- public class HttpIPResolver implements IPResolver, Serializable {
-
- static String url = "http://api.hostip.info/get_json.php";
-
- @Override
- public JSONObject resolveIP(String ip) {
- URL geoUrl = null;
- BufferedReader in = null;
- try {
- geoUrl = new URL(url + "?ip=" + ip);
- URLConnection connection = geoUrl.openConnection();
- in = new BufferedReader(new InputStreamReader(
- connection.getInputStream()));
- String inputLine;
-
- JSONObject json = (JSONObject) JSONValue.parse(in);
-
- in.close();
-
- return json;
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
-
- // 每当in为空的时候我们不进行如下的close操作,只有在in不为空的时候进行close操作
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- }
- }
- }
- return null;
- }
- }
复制代码
- package storm.cookbook;
-
- import org.json.simple.JSONObject;
-
- /**
- * Created with IntelliJ IDEA.
- * User: admin
- * Date: 2012/12/07
- * Time: 5:29 PM
- * To change this template use File | Settings | File Templates.
- */
- public interface IPResolver {
-
- public JSONObject resolveIP(String ip);
- }
复制代码
至此,整个流程完毕。对于统计以后数据如何持久化,亦或是将数据写回 redis 的过程,请自行实践~
|