分享

大数据实战之App管理平台日志分析(二)

本帖最后由 a87758133 于 2019-4-18 13:13 编辑
问题导读:


1、如何通过GeoLite2-City获取手机真实IP地址?
2、如何对地理信息缓存?
3、如何将log消息发送给Kafka?
4、如何自定义Flume拦截器?



上一篇:
大数据实战之App管理平台日志分析(一)
http://www.aboutyun.com/forum.php?mod=viewthread&tid=27032&_dsign=2a49c0cc



一、引入GeoLite2-City,获取手机真实ip地址
-----------------------------------------------------------
    1.下载GeoLite数据库文件
        GeoLite2-City.mmdb

    2.引入pom.xml
[mw_shl_code=xml,true]        <dependency>
            <groupId>com.maxmind.db</groupId>
            <artifactId>maxmind-db</artifactId>
            <version>1.0.0</version>
        </dependency>[/mw_shl_code]
    3.测试提取国家和省份信息
[mw_shl_code=java,true]@Test
      public void test1() throws IOException {
         InputStream in = ClassLoader.getSystemResourceAsStream("GeoLite2-City.mmdb");
         Reader r = new Reader(in);
         JsonNode node = r.get(InetAddress.getByName("140.211.11.105"));
         //国家
         String country = node.get("country").get("names").get("zh-CN").textValue();
         System.out.println(country);
         //省份
         String area = node.get("subdivisions").get(0).get("names").get("zh-CN").textValue();
         //城市
         String city = node.get("city").get("names").get("zh-CN").textValue();

         System.out.println(country + "." + area + "." + city);
      }[/mw_shl_code]
   4.封装GeoUtil工具类在common模块
[mw_shl_code=java,true] package com.test.app.util;

      import com.fasterxml.jackson.databind.JsonNode;
      import com.maxmind.db.Reader;

      import java.io.InputStream;
      import java.net.InetAddress;

      /**
       * 地理工具类,实现通过ip查找地址区域
       */
      public class GeoUtil {
         private static InputStream in ;
         private static Reader reader ;
         static{
            try {
               in = ClassLoader.getSystemResourceAsStream("GeoLite2-City.mmdb");
               reader = new Reader(in);
            } catch (Exception e) {
               e.printStackTrace();
            }
         }

         /**
          *获得国家数据
          */
         public static String getCountry(String ip){
            try{
               JsonNode node = reader.get(InetAddress.getByName(ip));
               return node.get("country").get("names").get("zh-CN").textValue();
            }
            catch (Exception e){
               e.printStackTrace();
            }
            return "" ;
         }
         /**
          *获得省份数据
          */
         public static String getProvince(String ip){
            try{
               JsonNode node = reader.get(InetAddress.getByName(ip));
               return node.get("subdivisions").get(0).get("names").get("zh-CN").textValue();
            }
            catch (Exception e){
               e.printStackTrace();
            }
            return "" ;
         }
         /**
          *获得地理位置数据
          */
         public static String getCity(String ip){
            try{
               JsonNode node = reader.get(InetAddress.getByName(ip));
               return node.get("city").get("names").get("zh-CN").textValue();
            }
            catch (Exception e){
               e.printStackTrace();
            }
            return "" ;
         }
      }[/mw_shl_code]

二、对地理信息缓存处理
---------------------------------------------------------
   1.创建GeoInfo类
[mw_shl_code=java,true]      public class GeoInfo {

         private String country ;

         private String province ;

         //get/set

      }[/mw_shl_code]

   2.Controller中增加map,存放出现过的ip信息。
[mw_shl_code=java,true] public class CollectLogController{

         private Map<String,GeoInfo> cache = new HashMap<String, GeoInfo>();
         /**
          * 处理ip client地址问题
          */
         private void processIp(AppLogEntity e, String clientIp) {
            GeoInfo info = cache.get(clientIp);
            if(info == null){
               info = GeoUtil.getGeoInfo(clientIp);
               cache.put(clientIp,info) ;
            }
            for(AppStartupLog log : e.getAppStartupLogs()){
               log.setCountry(info.getCountry());
               log.setProvince(info.getProvince());
               log.setIpAddress(clientIp);
            }
         }
      }[/mw_shl_code]
三、将log消息发送给kafka
------------------------------------------------
   1.web项目中引入kafka依赖
[mw_shl_code=xml,true]        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
        </dependency>[/mw_shl_code]

   2.common模块中创建常量类
      [mw_shl_code=java,true]package com.test.app.common;

      /**
       * 常量类
       */
      public class Constants {
         //主题
            public static final String TOPIC_APP_STARTUP = "topic-app-startup" ;
            public static final String TOPIC_APP_ERRROR = "topic-app-error" ;
            public static final String TOPIC_APP_EVENT = "topic-app-event" ;
            public static final String TOPIC_APP_USAGE = "topic-app-usage" ;
            public static final String TOPIC_APP_PAGE = "topic-app-page" ;
      }[/mw_shl_code]

   3.在controller发送消息给主题
[mw_shl_code=java,true] /**
         */
        @Controller()
        @RequestMapping("/coll")
        public class CollectLogController {

           /**
            * 地理信息缓存
            */
           private Map<String,GeoInfo> cache = new HashMap<String, GeoInfo>();

           /**
            * 启动日志收集
            */
           @RequestMapping(value = "/index", method = RequestMethod.POST)
           @ResponseBody
           public AppLogEntity collect(@RequestBody AppLogEntity e, HttpServletRequest req) {

              System.out.println("=============================");
              //server时间
              long myTime = System.currentTimeMillis() ;
              //客户端时间
              long clientTime = Long.parseLong(req.getHeader("clientTime"));
              //时间校对
              long diff = myTime - clientTime ;

              //1.修正日志时间
              verifyTime(e,diff);
              //2.对e进行处理,将具体日志分类的属性值填充完毕
              copyBaseProperties(e);
              //3.修正日志的ip位置等信息
              String clientIp = req.getRemoteAddr();
              processIp(e , clientIp);
              //4.发送日志到kafka集群
              sendMessageToKafka(e);
              return e;
           }

           /**
            * 发送消息到kafka集群
            * @param e
            */
           private void sendMessageToKafka(AppLogEntity e) {
              //创建配置对象
              Properties props = new Properties();
              props.put("metadata.broker.list", "s100:9092");
              props.put("serializer.class", "kafka.serializer.StringEncoder");
              props.put("request.required.acks", "1");

              //创建生产者
              Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));
              sendSingleLog(producer,Constants.TOPIC_APP_STARTUP,e.getAppStartupLogs());
              sendSingleLog(producer,Constants.TOPIC_APP_ERRROR,e.getAppErrorLogs());
              sendSingleLog(producer,Constants.TOPIC_APP_EVENT,e.getAppEventLogs());
              sendSingleLog(producer,Constants.TOPIC_APP_PAGE,e.getAppPageLogs());
              sendSingleLog(producer,Constants.TOPIC_APP_USAGE,e.getAppUsageLogs());
              producer.close();
           }

           /**
            * 发送单个的log消息给kafka
            */
           private void sendSingleLog(Producer<Integer, String> producer,String topic , AppBaseLog[] logs){
              for (AppBaseLog log : logs) {
                 String logMsg = JSONObject.toJSONString(log);
                 //创建消息
                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, logMsg);
                 producer.send(data);
              }
           }

           /**
            * 处理ip client地址问题
            */
           private void processIp(AppLogEntity e, String clientIp) {
              GeoInfo info = cache.get(clientIp);
              if(info == null){
                 info = new GeoInfo();
                 info.setCountry(GeoUtil.getCountry(clientIp));
                 info.setProvince(GeoUtil.getProvince(clientIp));
                 cache.put(clientIp,info) ;
              }
              for(AppStartupLog log : e.getAppStartupLogs()){
                 log.setCountry(info.getCountry());
                 log.setProvince(info.getProvince());
                 log.setIpAddress(clientIp);
              }
           }


           /**
            * 校对各个具体日志的创建时间(使用服务器时间差diff)
            */
           private void verifyTime(AppLogEntity e, long diff)
           {
              //启动修正
              //startuplog
              for(AppBaseLog log : e.getAppStartupLogs()){
                 log.setCreatedAtMs(log.getCreatedAtMs() + diff );
              }
              for(AppBaseLog log : e.getAppUsageLogs()){
                 log.setCreatedAtMs(log.getCreatedAtMs() + diff );
              }
              for(AppBaseLog log : e.getAppPageLogs()){
                 log.setCreatedAtMs(log.getCreatedAtMs() + diff );
              }
              for(AppBaseLog log : e.getAppEventLogs()){
                 log.setCreatedAtMs(log.getCreatedAtMs() + diff );
              }
              for(AppBaseLog log : e.getAppErrorLogs()){
                 log.setCreatedAtMs(log.getCreatedAtMs() + diff );
              }
           }

           /**
            * 将Log的属性分类复制到各个具体的log中
            */
           private void copyBaseProperties(AppLogEntity e){
              PropertiesUtil.copyProperties(e,e.getAppStartupLogs());
              PropertiesUtil.copyProperties(e,e.getAppErrorLogs());
              PropertiesUtil.copyProperties(e,e.getAppEventLogs());
              PropertiesUtil.copyProperties(e,e.getAppPageLogs());
              PropertiesUtil.copyProperties(e,e.getAppUsageLogs());
           }
        }[/mw_shl_code]
    4.启动zk集群和kafka集群

    5.创建5个主题。
      [mw_shl_code=shell,true]$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-startup$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-error
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-event
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-usage
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-page[/mw_shl_code]

    6.查看并验证主题是否创建和发送成功
        a.查看主题
[mw_shl_code=shell,true] $> kafka-topics.sh --list --zookeeper s100:2181[/mw_shl_code]
        b.启动控制台消费者,进行测试,查看日志输出
[mw_shl_code=shell,true]$> kafka-console-consumer.sh --bootstrap-server s200:9092 --topic topic-app-startup --from-beginning[/mw_shl_code]

四、通过flume收集kafka消息,然后上传到hdfs进行储存
----------------------------------------------------------------
    1.日志分成5个方面,hdfs中存放在不同目录下。
      /data/applogs/startup/201901/12/1213/xxx-xxxxxxx
      /data/applogs/error/201901/12/1213/xxx-xxxxxxx
      ...

    2.如果想实现上面的自动命名hdfs目录名
        a.将kafka消息转换成对象
        b.抽取createTimeMs属性值作为flume的Header
        c.按照固定的格式化串对Header进行格式化
        d.创建自定义拦截器,对每条kafka消息都进行类似format处理
        f.按照格式化之后的串创建HDFS目录

    3.创建新的模块app-logs-flume,添加maven依赖
        [mw_shl_code=xml,true]<?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>

            <groupId>com.test</groupId>
            <artifactId>app-logs-flume</artifactId>
            <version>1.0-SNAPSHOT</version>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flume</groupId>
                    <artifactId>flume-ng-core</artifactId>
                    <version>1.7.0</version>
                </dependency>
                <dependency>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                    <version>4.11</version>
                </dependency>
                <dependency>
                    <groupId>com.alibaba</groupId>
                    <artifactId>fastjson</artifactId>
                    <version>1.2.24</version>
                </dependency>
                <dependency>
                    <groupId>com.test</groupId>
                    <artifactId>app-analyze-common</artifactId>
                    <version>1.0-SNAPSHOT</version>
                </dependency>
            </dependencies>
        </project>[/mw_shl_code]

   4.自定义flume拦截器
[mw_shl_code=java,true]package com.test.app.flume.interceptor;

        import com.alibaba.fastjson.JSONObject;
        import com.test.app.common.AppBaseLog;
        import org.apache.flume.Context;
        import org.apache.flume.Event;
        import org.apache.flume.interceptor.Interceptor;

        import java.util.List;
        import java.util.Map;

        import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*;

        /**
         * 自定义flume的拦截器,提取body中的createTimeMS字段作为header
         */
        public class LogCollInterceptor implements Interceptor {

            private final boolean preserveExisting;

            private LogCollInterceptor(boolean preserveExisting) {
                this.preserveExisting = preserveExisting;
            }

            public void initialize() {
            }

            /**
             * Modifies events in-place.
             * 将flume的时间戳全部盖掉。换成startTimeMs
             */
            public Event intercept(Event event) {
                Map<String, String> headers = event.getHeaders();
                //得到kafka传递过来的消息,反转成AppBaseLog对象
                byte[] json = event.getBody();
                String jsonStr = new String(json);
                AppBaseLog log = JSONObject.parseObject(jsonStr , AppBaseLog.class);
                //获取日志创建时间
                long time = log.getCreatedAtMs();

                //处理log类型的头
                //1.盖掉flume的头信息的时间戳
                headers.put(TIMESTAMP, Long.toString(time));

                //2.处理头的logType[实现1个flume订阅5个kafka主题]
                String logType = "" ;
                if(jsonStr.contains("pageId")){
                    logType = "page" ;
                }
                //eventLog
                else if (jsonStr.contains("eventId")) {
                    logType = "event";
                }
                //usageLog
                else if (jsonStr.contains("singleUseDurationSecs")) {
                    logType = "usage";
                }
                //error
                else if (jsonStr.contains("errorBrief")) {
                    logType = "error";
                }
                //startup
                else if (jsonStr.contains("network")) {
                    logType = "startup";
                }
                headers.put("logType", logType);
                return event;
            }

            /**
             * Delegates to {@link #intercept(Event)} in a loop.
             *
             * @param events
             * @return
             */
            public List<Event> intercept(List<Event> events) {
                for (Event event : events) {
                    intercept(event);
                }
                return events;
            }

            public void close() {
            }

            /**
             */
            public static class Builder implements Interceptor.Builder {

                private boolean preserveExisting = PRESERVE_DFLT;

                public Interceptor build() {
                    return new LogCollInterceptor(preserveExisting);
                }

                public void configure(Context context) {
                    preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
                }

            }

            public static class Constants {
                public static String TIMESTAMP = "timestamp";
                public static String PRESERVE = "preserveExisting";
                public static boolean PRESERVE_DFLT = false;
            }

        }[/mw_shl_code]
   5.导出flumejar包[使用Build Artfacts打包,加入所有的依赖进行打包]
      复制到flume的/lib下,并分发到所有节点

   6.配置flume配置文件[flume/conf/applog.conf]
        [mw_shl_code=text,true]a1.sources=r1
        a1.channels=c1
        a1.sinks=k1

        a1.sources.r1.interceptors = i1
        a1.sources.r1.interceptors.i1.type = com.test.app.flume.interceptor.LogCollInterceptor$Builder
        a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
        a1.sources.r1.batchSize = 5000
        a1.sources.r1.batchDurationMillis = 2000
        a1.sources.r1.kafka.bootstrap.servers = s200:9092
        a1.sources.r1.kafka.zookeeperConnect = s200:2181,s300:2181,s400:2181
        a1.sources.r1.kafka.topics.regex = ^topic-app-.*$
        #a1.sources.r1.kafka.consumer.group.id = g3

        a1.channels.c1.type=memory
        a1.channels.c1.capacity=100000
        a1.channels.c1.transactionCapacity=10000

        a1.sinks.k1.type = hdfs
        a1.sinks.k1.hdfs.path = /data/applogs/%{logType}/%Y%m/%d/%H%M
        a1.sinks.k1.hdfs.filePrefix = events-
        a1.sinks.k1.hdfs.round = false
        a1.sinks.k1.hdfs.roundValue = 30
        a1.sinks.k1.hdfs.roundUnit = second

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel= c1[/mw_shl_code]

    7.启动hdfs,并且创建好目录/data/applogs/

   8.启动flume
[mw_shl_code=shell,true]      $>flume-ng agent -f applog.conf -n a1
[/mw_shl_code]
    9.启动web服务器和日志生成程序,查看hdfs上是否成功生成日志

五、配置hive数据仓库 -- 周期性的加载hdfs上的数据到hive仓库中,用于后期查询
-----------------------------------------------------------------------------
    1.说明
      因为使用json格式存放数据,需要第三方serde库。
      下载json-serde-1.3.8-jar-with-dependencies.jar

   2.复制以上的jar包hive的lib下,分发

   3.配置hive-site.xml文件,添加jar包的声明,永久注册。
      [mw_shl_code=xml,true][hive-site.xml]
        <property>
            <name>hive.aux.jars.path</name>
            <value>file:///soft/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar</value>
        </property>[/mw_shl_code]
   4.设置不压缩存储
      [mw_shl_code=xml,true][hive-site.xml]
      <property>
         <name>hive.exec.compress.output</name>
         <value>false</value>
      </property>[/mw_shl_code]
   5.创建数据库
     [mw_shl_code=shell,true] $hive> create database applogs_db ;[/mw_shl_code]
   6.创建测试表
       [mw_shl_code=shell,true]hive> use applogs_db;
hive> create table test(id int , name string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;[/mw_shl_code]
   7.执行插入
[mw_shl_code=sql,true]      $hive> insert into test(id,name) values(1,'tom') ;
[/mw_shl_code]
   8.修改配置文件需要重新进入hive命令行

   9.创建applogs表语句
      [mw_shl_code=applescript,true]CREATE external TABLE ext_startup_logs(
        createdAtMs bigint ,
        name string)
PARTITIONED BY (
        ym string,
        day string,
        hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
        ...[/mw_shl_code]

六、BUG解决
-------------------------------------------------------------
    1.公共模块加载Geo数据库的Null Stream问题
        类加载问题。
        ClassLoader.getSystemSystemAsStream("Geo.mmdb") ;
        web
        tomcat

        使用线程获得当前的类加载器.[因为是一个web程序,所以不能使用传统的classloader方式]
        [mw_shl_code=java,true]public class GeoUtil {
            ...
            static{
                try {
                    ClassLoader loader = Thread.currentThread().getContextClassLoader();
                    in = loader.getResource("GeoLite2-City.mmdb").openStream();
                    reader = new Reader(in);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }[/mw_shl_code]

    2.idea下模块之间存在依赖关系的时候,添加依赖和打包问题
       a.只需要在项目结构中添加依赖的模块即可,不需要在pom.xml中添加依赖工件。

       b.在web模块工件部分,将依赖的第三方模块put into web-info/classes下。



最新经典文章,欢迎关注公众号

来源:CSDN

作者:葛红富

原文:《大数据项目实战之 --- 某App管理平台的手机app日志分析系统(二)》

https://blog.csdn.net/xcvbxv01/article/details/84256844




已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条