Big Data in Practice: App Management Platform Log Analysis (Part 2)
Questions addressed in this post:
1. How do we resolve a device's real IP address to a geographic location with GeoLite2-City?
2. How do we cache the geographic information?
3. How do we send log messages to Kafka?
4. How do we write a custom Flume interceptor?
Previous post:
Big Data in Practice: App Management Platform Log Analysis (Part 1)
http://www.aboutyun.com/forum.php?mod=viewthread&tid=27032&_dsign=2a49c0cc
I. Adding GeoLite2-City to resolve the device's real IP address to a location
-----------------------------------------------------------
1. Download the GeoLite2 database file
GeoLite2-City.mmdb
2. Add the dependency to pom.xml
<dependency>
    <groupId>com.maxmind.db</groupId>
    <artifactId>maxmind-db</artifactId>
    <version>1.0.0</version>
</dependency>
3. Test extracting the country and province information
@Test
public void test1() throws IOException {
    InputStream in = ClassLoader.getSystemResourceAsStream("GeoLite2-City.mmdb");
    Reader r = new Reader(in);
    JsonNode node = r.get(InetAddress.getByName("140.211.11.105"));
    // country
    String country = node.get("country").get("names").get("zh-CN").textValue();
    System.out.println(country);
    // province
    String area = node.get("subdivisions").get(0).get("names").get("zh-CN").textValue();
    // city
    String city = node.get("city").get("names").get("zh-CN").textValue();
    System.out.println(country + "." + area + "." + city);
}
4. Wrap the lookup in a GeoUtil utility class in the common module
package com.test.app.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.maxmind.db.Reader;

import java.io.InputStream;
import java.net.InetAddress;

/**
 * Geo utility class: looks up the geographic region for an IP address.
 */
public class GeoUtil {
    private static InputStream in;
    private static Reader reader;

    static {
        try {
            in = ClassLoader.getSystemResourceAsStream("GeoLite2-City.mmdb");
            reader = new Reader(in);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Get the country for an IP address.
     */
    public static String getCountry(String ip) {
        try {
            JsonNode node = reader.get(InetAddress.getByName(ip));
            return node.get("country").get("names").get("zh-CN").textValue();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Get the province for an IP address.
     */
    public static String getProvince(String ip) {
        try {
            JsonNode node = reader.get(InetAddress.getByName(ip));
            return node.get("subdivisions").get(0).get("names").get("zh-CN").textValue();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Get the city for an IP address.
     */
    public static String getCity(String ip) {
        try {
            JsonNode node = reader.get(InetAddress.getByName(ip));
            return node.get("city").get("names").get("zh-CN").textValue();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }
}
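For a quick sanity check, the utility can be called directly, reusing the sample IP from the test above (the class name below is illustrative, and the printed values depend on the GeoLite2 database snapshot in use):

import com.test.app.util.GeoUtil;

public class GeoUtilDemo {
    public static void main(String[] args) {
        String ip = "140.211.11.105";
        // each call returns "" if the lookup fails
        System.out.println(GeoUtil.getCountry(ip));
        System.out.println(GeoUtil.getProvince(ip));
        System.out.println(GeoUtil.getCity(ip));
    }
}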
II. Caching the geographic information
---------------------------------------------------------
1. Create the GeoInfo class
public class GeoInfo {
    private String country;
    private String province;
    // getters/setters
}
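The getters and setters are elided above; a minimal fully written-out version, assuming nothing beyond the two fields shown, would be:

public class GeoInfo {
    private String country;
    private String province;

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }
}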
2. Add a map to the controller to cache the geo info of IP addresses that have already been seen.
public class CollectLogController {
    private Map<String, GeoInfo> cache = new HashMap<String, GeoInfo>();

    /**
     * Resolve the client IP to a location and attach it to the logs.
     */
    private void processIp(AppLogEntity e, String clientIp) {
        GeoInfo info = cache.get(clientIp);
        if (info == null) {
            info = new GeoInfo();
            info.setCountry(GeoUtil.getCountry(clientIp));
            info.setProvince(GeoUtil.getProvince(clientIp));
            cache.put(clientIp, info);
        }
        for (AppStartupLog log : e.getAppStartupLogs()) {
            log.setCountry(info.getCountry());
            log.setProvince(info.getProvince());
            log.setIpAddress(clientIp);
        }
    }
}
III. Sending the log messages to Kafka
------------------------------------------------
1. Add the Kafka dependency to the web project
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
</dependency>
2. Create a constants class in the common module
package com.test.app.common;

/**
 * Constants: the Kafka topic names.
 */
public class Constants {
    public static final String TOPIC_APP_STARTUP = "topic-app-startup";
    public static final String TOPIC_APP_ERRROR  = "topic-app-error";
    public static final String TOPIC_APP_EVENT   = "topic-app-event";
    public static final String TOPIC_APP_USAGE   = "topic-app-usage";
    public static final String TOPIC_APP_PAGE    = "topic-app-page";
}
3. Send the messages to the topics from the controller
@Controller
@RequestMapping("/coll")
public class CollectLogController {
    /**
     * Geo info cache, keyed by client IP.
     */
    private Map<String, GeoInfo> cache = new HashMap<String, GeoInfo>();

    /**
     * Collect the logs uploaded by the app.
     */
    @RequestMapping(value = "/index", method = RequestMethod.POST)
    @ResponseBody
    public AppLogEntity collect(@RequestBody AppLogEntity e, HttpServletRequest req) {
        System.out.println("=============================");
        // server time
        long myTime = System.currentTimeMillis();
        // client time, passed as a request header
        long clientTime = Long.parseLong(req.getHeader("clientTime"));
        // clock skew between server and client
        long diff = myTime - clientTime;
        // 1. correct the log timestamps
        verifyTime(e, diff);
        // 2. copy the shared properties of the entity onto each concrete log type
        copyBaseProperties(e);
        // 3. fill in the IP-based location information
        String clientIp = req.getRemoteAddr();
        processIp(e, clientIp);
        // 4. send the logs to the Kafka cluster
        sendMessageToKafka(e);
        return e;
    }
    /**
     * Send the logs to the Kafka cluster.
     * @param e the log entity
     */
    private void sendMessageToKafka(AppLogEntity e) {
        // producer configuration
        Properties props = new Properties();
        props.put("metadata.broker.list", "s100:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // create the producer
        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));
        sendSingleLog(producer, Constants.TOPIC_APP_STARTUP, e.getAppStartupLogs());
        sendSingleLog(producer, Constants.TOPIC_APP_ERRROR, e.getAppErrorLogs());
        sendSingleLog(producer, Constants.TOPIC_APP_EVENT, e.getAppEventLogs());
        sendSingleLog(producer, Constants.TOPIC_APP_PAGE, e.getAppPageLogs());
        sendSingleLog(producer, Constants.TOPIC_APP_USAGE, e.getAppUsageLogs());
        producer.close();
    }

    /**
     * Send one array of logs to a Kafka topic, one message per log.
     */
    private void sendSingleLog(Producer<Integer, String> producer, String topic, AppBaseLog[] logs) {
        for (AppBaseLog log : logs) {
            String logMsg = JSONObject.toJSONString(log);
            // build the message
            KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, logMsg);
            producer.send(data);
        }
    }
    /**
     * Resolve the client IP to a location and attach it to the logs.
     */
    private void processIp(AppLogEntity e, String clientIp) {
        GeoInfo info = cache.get(clientIp);
        if (info == null) {
            info = new GeoInfo();
            info.setCountry(GeoUtil.getCountry(clientIp));
            info.setProvince(GeoUtil.getProvince(clientIp));
            cache.put(clientIp, info);
        }
        for (AppStartupLog log : e.getAppStartupLogs()) {
            log.setCountry(info.getCountry());
            log.setProvince(info.getProvince());
            log.setIpAddress(clientIp);
        }
    }
    /**
     * Correct the creation time of every concrete log using the server/client clock skew (diff).
     */
    private void verifyTime(AppLogEntity e, long diff) {
        for (AppBaseLog log : e.getAppStartupLogs()) {
            log.setCreatedAtMs(log.getCreatedAtMs() + diff);
        }
        for (AppBaseLog log : e.getAppUsageLogs()) {
            log.setCreatedAtMs(log.getCreatedAtMs() + diff);
        }
        for (AppBaseLog log : e.getAppPageLogs()) {
            log.setCreatedAtMs(log.getCreatedAtMs() + diff);
        }
        for (AppBaseLog log : e.getAppEventLogs()) {
            log.setCreatedAtMs(log.getCreatedAtMs() + diff);
        }
        for (AppBaseLog log : e.getAppErrorLogs()) {
            log.setCreatedAtMs(log.getCreatedAtMs() + diff);
        }
    }
    /**
     * Copy the shared (base) properties of the entity into each concrete log type.
     */
    private void copyBaseProperties(AppLogEntity e) {
        PropertiesUtil.copyProperties(e, e.getAppStartupLogs());
        PropertiesUtil.copyProperties(e, e.getAppErrorLogs());
        PropertiesUtil.copyProperties(e, e.getAppEventLogs());
        PropertiesUtil.copyProperties(e, e.getAppPageLogs());
        PropertiesUtil.copyProperties(e, e.getAppUsageLogs());
    }
}
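PropertiesUtil is referenced above but not shown in this part of the series. The following is only a sketch of what copyProperties might look like, assuming the shared fields have matching getter/setter names on AppLogEntity and the log beans; the package name and the use of Spring's BeanUtils are my choices, not necessarily the original author's implementation:

package com.test.app.util;

import org.springframework.beans.BeanUtils;

import com.test.app.common.AppBaseLog;

/**
 * Hypothetical helper: copies the properties shared between the entity and the
 * concrete log beans onto every log in the array.
 */
public class PropertiesUtil {

    public static void copyProperties(Object source, AppBaseLog[] logs) {
        if (logs == null) {
            return;
        }
        for (AppBaseLog log : logs) {
            // Spring's BeanUtils copies only the properties whose names and types
            // match on both sides; everything else is left untouched.
            BeanUtils.copyProperties(source, log);
        }
    }
}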
4. Start the ZooKeeper and Kafka clusters.
5. Create the five topics.
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-startup
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-error
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-event
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-usage
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-page
6. Check that the topics were created and that messages arrive
a. List the topics
$> kafka-topics.sh --list --zookeeper s100:2181
b. Start a console consumer to test and watch the log output
$> kafka-console-consumer.sh --bootstrap-server s200:9092 --topic topic-app-startup --from-beginning
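As an alternative to the console consumer, a small Java consumer built on the new consumer API shipped with the kafka_2.11 0.10.0.1 dependency above can verify the same thing. The class name, group id, and poll loop below are illustrative only:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StartupTopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s200:9092");   // broker from the cluster above
        props.put("group.id", "topic-check");          // arbitrary test group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");    // read from the beginning, like --from-beginning

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("topic-app-startup"));
        try {
            // poll a few times and print whatever JSON log messages are found
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}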
IV. Collecting the Kafka messages with Flume and storing them in HDFS
----------------------------------------------------------------
1. The logs fall into five categories, each stored under its own HDFS directory:
/data/applogs/startup/201901/12/1213/xxx-xxxxxxx
/data/applogs/error/201901/12/1213/xxx-xxxxxxx
...
2. To generate the HDFS directory names above automatically:
a. deserialize each Kafka message into an object
b. extract the createdAtMs property value as a Flume header
c. format that header value with a fixed date pattern
d. wrap this logic in a custom interceptor so every Kafka message is processed the same way
e. let the HDFS sink build the directory path from the formatted value (see the sketch after this list)
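Purely as an illustration (this class is not part of the project code), here is how a createdAtMs value plus the logType header maps onto the %{logType}/%Y%m/%d/%H%M layout used by the HDFS sink later in the Flume configuration:

import java.text.SimpleDateFormat;
import java.util.Date;

public class PathPreview {
    public static void main(String[] args) {
        long createdAtMs = System.currentTimeMillis();   // value taken from the "timestamp" header
        String logType = "startup";                      // value taken from the "logType" header

        // same layout as hdfs.path = /data/applogs/%{logType}/%Y%m/%d/%H%M
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMM/dd/HHmm");
        String path = "/data/applogs/" + logType + "/" + fmt.format(new Date(createdAtMs));
        System.out.println(path);   // e.g. /data/applogs/startup/201901/12/1213
    }
}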
3. Create a new module, app-logs-flume, and add the Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>app-logs-flume</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.24</version>
        </dependency>
        <dependency>
            <groupId>com.test</groupId>
            <artifactId>app-analyze-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>
4. Write a custom Flume interceptor
package com.test.app.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import com.test.app.common.AppBaseLog;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*;

/**
 * Custom Flume interceptor: extracts the createdAtMs field from the event body
 * and writes it into the "timestamp" header, plus a "logType" header so a single
 * Flume agent can handle all five Kafka topics.
 */
public class LogCollInterceptor implements Interceptor {

    private final boolean preserveExisting;

    private LogCollInterceptor(boolean preserveExisting) {
        this.preserveExisting = preserveExisting;
    }

    public void initialize() {
    }

    /**
     * Modifies events in-place: overwrites Flume's timestamp header with the
     * log's own creation time.
     */
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // deserialize the Kafka message body back into an AppBaseLog
        byte[] json = event.getBody();
        String jsonStr = new String(json);
        AppBaseLog log = JSONObject.parseObject(jsonStr, AppBaseLog.class);
        // creation time of the log
        long time = log.getCreatedAtMs();

        // 1. overwrite the timestamp header with the log's creation time
        headers.put(TIMESTAMP, Long.toString(time));

        // 2. set the logType header (lets one Flume agent subscribe to all five Kafka topics)
        String logType = "";
        if (jsonStr.contains("pageId")) {                        // pageLog
            logType = "page";
        } else if (jsonStr.contains("eventId")) {                // eventLog
            logType = "event";
        } else if (jsonStr.contains("singleUseDurationSecs")) {  // usageLog
            logType = "usage";
        } else if (jsonStr.contains("errorBrief")) {             // errorLog
            logType = "error";
        } else if (jsonStr.contains("network")) {                // startupLog
            logType = "startup";
        }
        headers.put("logType", logType);
        return event;
    }

    /**
     * Delegates to {@link #intercept(Event)} in a loop.
     */
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        private boolean preserveExisting = PRESERVE_DFLT;

        public Interceptor build() {
            return new LogCollInterceptor(preserveExisting);
        }

        public void configure(Context context) {
            preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
        }
    }

    public static class Constants {
        public static String TIMESTAMP = "timestamp";
        public static String PRESERVE = "preserveExisting";
        public static boolean PRESERVE_DFLT = false;
    }
}
5. Export the Flume jar (package it with Build Artifacts and include all dependencies).
Copy it into Flume's lib/ directory and distribute it to every node.
6. Write the Flume configuration file
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.test.app.flume.interceptor.LogCollInterceptor$Builder
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = s200:9092
a1.sources.r1.kafka.zookeeperConnect = s200:2181,s300:2181,s400:2181
a1.sources.r1.kafka.topics.regex = ^topic-app-.*$
#a1.sources.r1.kafka.consumer.group.id = g3
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /data/applogs/%{logType}/%Y%m/%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
7. Start HDFS and create the /data/applogs/ directory.
8. Start Flume
$>flume-ng agent -f applog.conf -n a1
9. Start the web server and the log generator, then check whether log files appear on HDFS.
V. Configuring the Hive data warehouse -- periodically load the data from HDFS into Hive for later querying
-----------------------------------------------------------------------------
1. Notes
Because the data is stored as JSON, a third-party SerDe is required.
Download json-serde-1.3.8-jar-with-dependencies.jar
2. Copy the jar into Hive's lib directory and distribute it to all nodes.
3. Register the jar permanently by declaring it in hive-site.xml.
<property>
    <name>hive.aux.jars.path</name>
    <value>file:///soft/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar</value>
</property>
4. Disable compressed output
<property>
    <name>hive.exec.compress.output</name>
    <value>false</value>
</property>
5. Create the database
$hive> create database applogs_db ;
6. Create a test table
hive> use applogs_db;
hive> create table test(id int , name string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;
7. Run a test insert
$hive> insert into test(id,name) values(1,'tom') ;
8. After modifying the configuration files, re-enter the Hive CLI for the changes to take effect.
9. DDL for the applogs tables, e.g. the external startup-log table:
CREATE external TABLE ext_startup_logs(
createdAtMs bigint ,
name string)
PARTITIONED BY (
ym string,
day string,
hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
...
VI. Bug fixes
-------------------------------------------------------------
1. Null stream when the common module loads the Geo database
This is a class-loader problem: inside a web application running on Tomcat,
ClassLoader.getSystemResourceAsStream("GeoLite2-City.mmdb") returns null,
so the resource has to be loaded through the current thread's context class loader
instead of the system class loader.
public class GeoUtil {
    ...
    static {
        try {
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            in = loader.getResource("GeoLite2-City.mmdb").openStream();
            reader = new Reader(in);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2. Dependency and packaging issues between modules in IDEA
a. Add the dependent module in the project structure; there is no need to declare a dependency artifact in pom.xml.
b. In the web module's artifact settings, put the third-party modules it depends on into WEB-INF/classes.
Source: CSDN. Author: 葛红富. Original article: 《大数据项目实战之 --- 某App管理平台的手机app日志分析系统(二)》, https://blog.csdn.net/xcvbxv01/article/details/84256844