本帖最后由 levycui 于 2016-5-31 11:40 编辑
问题导读:
1、如何使用Mapreduce清洗日志文件?
2、如何使用HIVE建立数据仓库?
Flume+Hadoop+Hive的离线分析系统基本架构(一)
第三步,清洗第二步生成的Session信息,生成PageViews信息表
[mw_shl_code=applescript,true]package com.guludada.clickstream;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.clickstream.logSession.sessionMapper;
import com.guludada.clickstream.logSession.sessionReducer;
import com.guludada.dataparser.PageViewsParser;
import com.guludada.dataparser.SessionParser;
import com.guludada.dataparser.WebLogParser;
import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.WebLogSessionBean;
/**
 * Third cleaning step: reads the session-tagged click log produced by the
 * previous job and writes one PageViews record per request, enriched with the
 * time spent on the page and the position (step) of the request inside its
 * session.
 *
 * <p>Input records are space separated. As read by {@code PageViewsParser.loadBean}
 * the fields are: date, time, IP, session id, visited URL.
 */
public class PageViews {

    /** Position of the session id in a space-split input record (date, time, IP, session, URL). */
    private static final int SESSION_FIELD_INDEX = 3;

    /** Number of leading fields the reducer side reads from every record. */
    private static final int MIN_FIELD_COUNT = 5;

    /**
     * Emits every input record keyed by its session id so that all requests of
     * one session reach the same reduce call.
     */
    public static class pageMapper extends Mapper<Object,Text,Text,Text> {

        private Text word = new Text();

        /**
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of the session log
         * @param context job context used to emit (session id, line)
         * @throws IOException          if the framework fails to write the pair
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] webLogContents = line.split(" ");

            // A record without the expected fields cannot be grouped or parsed later;
            // count it and move on instead of killing the task.
            if (webLogContents.length < MIN_FIELD_COUNT) {
                context.getCounter("PageViews", "MALFORMED_RECORDS").increment(1);
                return;
            }

            // Group by session. The timestamp occupies two fields (date and time),
            // which pushes the IP to index 2 and the session id to index 3.
            word.set(webLogContents[SESSION_FIELD_INDEX]);

            // Let write failures propagate so the framework can fail/retry the task
            // rather than silently losing records.
            context.write(word,value);
        }
    }

    /**
     * Orders the requests of one session by time, derives the stay time and step
     * number of every request and writes the resulting PageViews lines.
     */
    public static class pageReducer extends Reducer<Text, Text, Text, NullWritable>{

        private Text content = new Text();
        private NullWritable v = NullWritable.get();
        PageViewsParser pageViewsParser = new PageViewsParser();

        /**
         * @param key     session id
         * @param values  all log lines belonging to that session
         * @param context job context used to emit the PageViews lines
         * @throws IOException          if the framework fails to write a record
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            // Load every record of the session and sort them chronologically.
            ArrayList<PageViewsBean> pageViewsBeanGroup = new ArrayList<PageViewsBean>();
            for (Text pageView : values) {
                pageViewsBeanGroup.add(pageViewsParser.loadBean(pageView.toString()));
            }

            Collections.sort(pageViewsBeanGroup, new Comparator<PageViewsBean>() {
                public int compare(PageViewsBean pageViewsBean1, PageViewsBean pageViewsBean2) {
                    Date date1 = pageViewsBean1.getTimeWithDateFormat();
                    Date date2 = pageViewsBean2.getTimeWithDateFormat();
                    // Records whose time could not be parsed sort first; this keeps the
                    // ordering total and avoids a NullPointerException.
                    if (date1 == null) {
                        return date2 == null ? 0 : -1;
                    }
                    if (date2 == null) {
                        return 1;
                    }
                    return date1.compareTo(date2);
                }
            });

            // First pass: assign the step numbers and the stay times. The "previous
            // record" state is local on purpose: the same reducer object serves many
            // sessions, so keeping it in fields would leak the last page of one
            // session into the next one.
            PageViewsBean lastStayPageBean = null;
            Date lastVisitTime = null;
            int step = 0;
            for (PageViewsBean pageViewsBean : pageViewsBeanGroup) {
                Date curVisitTime = pageViewsBean.getTimeWithDateFormat();
                if (lastStayPageBean != null && lastVisitTime != null && curVisitTime != null) {
                    // The stay time of the previous page is the gap to this request, in seconds.
                    long timeDiff = (curVisitTime.getTime() - lastVisitTime.getTime()) / 1000;
                    lastStayPageBean.setStayTime(Long.toString(timeDiff));
                }
                step++;
                pageViewsBean.setStep(String.valueOf(step));

                lastStayPageBean = pageViewsBean;
                lastVisitTime = curVisitTime;
            }

            // Second pass: emit. Writing must wait until the following record has
            // been seen, otherwise every line would be written with the initial
            // stay time "0". The last page of a session keeps "0" because no later
            // request exists to measure against.
            for (PageViewsBean pageViewsBean : pageViewsBeanGroup) {
                content.set(pageViewsParser.parser(pageViewsBean));
                context.write(content,v);
            }
        }
    }

    /**
     * Configures and submits the job. Input is today's session data directory,
     * output is today's pageviews directory; the process exit code is 0 on
     * success and 1 on failure.
     */
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(PageViews.class);

        // Mapper / reducer implementations of this job
        job.setMapperClass(pageMapper.class);
        job.setReducerClass(pageReducer.class);

        // Key/value types emitted by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Directory names use a two-digit year (yy-MM-dd).
        Date curDate = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
        String dateStr = sdf.format(curDate);

        // Input: every file in today's session data directory
        FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/"+dateStr+"/*"));

        // Output: today's pageviews directory
        FileOutputFormat.setOutputPath(job, new Path("/clickstream/pageviews/"+dateStr+"/"));

        // Submit the configuration and the jar to YARN and wait for completion
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}
[/mw_shl_code]
[mw_shl_code=applescript,true]package com.guludada.dataparser;
import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.WebLogSessionBean;
/**
 * Converts between the text lines written by the logSession job and
 * {@link PageViewsBean} objects.
 */
public class PageViewsParser {

    /**
     * Builds a {@link PageViewsBean} from one space-separated line of logSession output.
     *
     * <p>The first two fields are joined into the time, followed by IP, session
     * and visited URL. Stay time and step both start at "0".
     *
     * @param sessionContent one line of logSession output
     * @return the populated bean
     */
    public PageViewsBean loadBean(String sessionContent) {
        final PageViewsBean bean = new PageViewsBean();
        final String[] fields = sessionContent.split(" ");

        final String timestamp = fields[0] + " " + fields[1];
        bean.setTime(timestamp);
        bean.setIP_addr(fields[2]);
        bean.setSession(fields[3]);
        bean.setVisit_URL(fields[4]);

        // Both values are filled in later by the PageViews reducer.
        bean.setStayTime("0");
        bean.setStep("0");
        return bean;
    }

    /**
     * Renders a bean as one output line.
     *
     * @param pageBean the bean to serialize
     * @return the bean's {@code toString()} representation
     */
    public String parser(PageViewsBean pageBean) {
        final String line = pageBean.toString();
        return line;
    }
}
[/mw_shl_code]
[mw_shl_code=applescript,true]package com.guludada.javabean;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * One row of the PageViews table: a single page request inside a session.
 * All values are kept as strings exactly as they are written to the output file.
 */
public class PageViewsBean {

    /** Pattern of the {@code time} field. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    String session;
    String IP_addr;
    String time;
    String visit_URL;
    String stayTime;
    String step;

    public String getSession() {
        return session;
    }

    public void setSession(String session) {
        this.session = session;
    }

    public String getIP_addr() {
        return IP_addr;
    }

    public void setIP_addr(String iP_addr) {
        IP_addr = iP_addr;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getVisit_URL() {
        return visit_URL;
    }

    public void setVisit_URL(String visit_URL) {
        this.visit_URL = visit_URL;
    }

    public String getStayTime() {
        return stayTime;
    }

    public void setStayTime(String stayTime) {
        this.stayTime = stayTime;
    }

    public String getStep() {
        return step;
    }

    public void setStep(String step) {
        this.step = step;
    }

    /**
     * Parses the {@code time} field using the pattern {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @return the parsed date, or {@code null} when the time is unset, empty or
     *         does not match the pattern
     */
    public Date getTimeWithDateFormat() {
        // Compare by content: "!=" on strings only compares object identity, so an
        // empty string created at runtime would slip through and hit the parser.
        if (this.time == null || this.time.isEmpty()) {
            return null;
        }
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        SimpleDateFormat sdf_final = new SimpleDateFormat(TIME_PATTERN);
        try {
            return sdf_final.parse(this.time);
        } catch (ParseException e) {
            // Callers treat null as "no usable time"; report the bad value and continue.
            System.err.println("PageViewsBean: cannot parse time '" + this.time + "': " + e.getMessage());
            return null;
        }
    }

    /**
     * @return the space-separated output line:
     *         session, IP, time, visited URL, stay time, step
     */
    @Override
    public String toString() {
        return session + " " + IP_addr + " " + time + " "
                + visit_URL + " " + stayTime + " " + step;
    }
}
[/mw_shl_code]
第三次日志清洗产生的PageViews数据结构如下图:
| SessionID | IP | 访问时间 | 访问页面 | 停留时间 | 第几步 |
| Session1 | 192.168.12.130 | 2016-05-30 15:17:30 | /blog/me | 30000 | 1 |
| Session1 | 192.168.12.130 | 2016-05-30 15:18:00 | /blog/me/admin | 30000 | 2 |
| Session1 | 192.168.12.130 | 2016-05-30 15:18:30 | /home | 30000 | 3 |
| Session2 | 192.168.12.150 | 2016-05-30 15:16:30 | /products | 30000 | 1 |
| Session2 | 192.168.12.150 | 2016-05-30 15:17:00 | /products/details | 30000 | 2 |
第四步,再次清洗Session日志,并生成Visits信息表
[mw_shl_code=applescript,true]package com.guludada.clickstream;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.guludada.clickstream.PageViews.pageMapper;
import com.guludada.clickstream.PageViews.pageReducer;
import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.dataparser.PageViewsParser;
import com.guludada.dataparser.VisitsInfoParser;
import com.guludada.javabean.PageViewsBean;
/**
 * Fourth cleaning step: reads the session-tagged click log and writes one
 * Visits record per session (start/end time, entry/leave page, number of
 * distinct pages, IP, referal).
 *
 * <p>Input records are space separated. As read by {@code VisitsInfoParser}
 * the fields are: date, time, IP, session id, visited URL, referal.
 */
public class VisitsInfo {

    /** Position of the session id in a space-split input record. */
    private static final int SESSION_FIELD_INDEX = 3;

    /** Position of the visited URL in a space-split input record. */
    private static final int URL_FIELD_INDEX = 4;

    /** Number of leading fields the reducer reads from every record. */
    private static final int MIN_FIELD_COUNT = 5;

    /**
     * Emits every input record keyed by its session id so that all requests of
     * one session reach the same reduce call.
     */
    public static class visitMapper extends Mapper<Object,Text,Text,Text> {

        private Text word = new Text();

        /**
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of the session log
         * @param context job context used to emit (session id, line)
         * @throws IOException          if the framework fails to write the pair
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] webLogContents = line.split(" ");

            // Count and skip records that lack the fields read further down.
            if (webLogContents.length < MIN_FIELD_COUNT) {
                context.getCounter("VisitsInfo", "MALFORMED_RECORDS").increment(1);
                return;
            }

            // Group by session. The timestamp occupies two fields (date and time),
            // which pushes the IP to index 2 and the session id to index 3.
            word.set(webLogContents[SESSION_FIELD_INDEX]);

            // Let write failures propagate so the framework can fail/retry the task.
            context.write(word,value);
        }
    }

    /**
     * Orders the requests of one session by time, counts the distinct pages
     * visited and writes a single Visits line for the session.
     */
    public static class visitReducer extends Reducer<Text, Text, Text, NullWritable>{

        private Text content = new Text();
        private NullWritable v = NullWritable.get();
        VisitsInfoParser visitsParser = new VisitsInfoParser();

        /**
         * @param key     session id
         * @param values  all log lines belonging to that session
         * @param context job context used to emit the Visits line
         * @throws IOException          if the framework fails to write the record
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            // Collect the session's records and sort them chronologically.
            ArrayList<String> browseInfoGroup = new ArrayList<String>();
            for (Text browseInfo : values) {
                browseInfoGroup.add(browseInfo.toString());
            }

            Collections.sort(browseInfoGroup, new Comparator<String>() {

                private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                public int compare(String browseInfo1, String browseInfo2) {
                    Date date1 = parseTime(browseInfo1);
                    Date date2 = parseTime(browseInfo2);
                    // Records with an unparseable time sort first. Reporting them as
                    // "equal to everything" would break the comparator contract.
                    if (date1 == null) {
                        return date2 == null ? 0 : -1;
                    }
                    if (date2 == null) {
                        return 1;
                    }
                    return date1.compareTo(date2);
                }

                /** Parses the leading "date time" fields of a record; null if not parseable. */
                private Date parseTime(String browseInfo) {
                    String[] fields = browseInfo.split(" ");
                    if (fields.length < 2) {
                        return null;
                    }
                    try {
                        return sdf.parse(fields[0] + " " + fields[1]);
                    } catch (ParseException e) {
                        return null;
                    }
                }
            });

            // Count the distinct pages of THIS session. The map is local on purpose:
            // the same reducer object serves many sessions, so a field would keep
            // accumulating the pages of every session processed before.
            Map<String,Integer> viewedPagesMap = new HashMap<String,Integer>();
            for (String browseInfo : browseInfoGroup) {
                String[] browseInfoStrArr = browseInfo.split(" ");
                String curVisitURL = browseInfoStrArr[URL_FIELD_INDEX];
                if (!viewedPagesMap.containsKey(curVisitURL)) {
                    viewedPagesMap.put(curVisitURL, 1);
                }
            }
            int total_visit_pages = viewedPagesMap.size();

            // Entry page, leave page, start and end time are derived by the parser
            // from the first and last record of the sorted list.
            String visitsInfo = visitsParser.parser(browseInfoGroup, total_visit_pages+"");
            content.set(visitsInfo);
            context.write(content,v);
        }
    }

    /**
     * Configures and submits the job. Input is today's session data directory,
     * output is today's visitsinfo directory; the process exit code is 0 on
     * success and 1 on failure.
     */
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(VisitsInfo.class);

        // Mapper / reducer implementations of this job
        job.setMapperClass(visitMapper.class);
        job.setReducerClass(visitReducer.class);

        // Key/value types emitted by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Directory names use a two-digit year (yy-MM-dd).
        Date curDate = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
        String dateStr = sdf.format(curDate);

        // Input: every file in today's session data directory
        FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/"+dateStr+"/*"));

        // Output: today's directory below /clickstream/visitsinfo. The separator
        // before the date is required; without it the job writes to a sibling
        // directory named "visitsinfo<date>" instead.
        FileOutputFormat.setOutputPath(job, new Path("/clickstream/visitsinfo/"+dateStr+"/"));

        // Submit the configuration and the jar to YARN and wait for completion
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}
[/mw_shl_code]
[mw_shl_code=applescript,true]package com.guludada.dataparser;
import java.util.ArrayList;
import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.VisitsInfoBean;
import com.guludada.javabean.WebLogSessionBean;
/**
 * Builds the Visits output line of one session from its time-ordered log records.
 */
public class VisitsInfoParser {

    /**
     * Summarizes a session.
     *
     * <p>The first record supplies the entry page, start time, session id, IP and
     * referal; the last record supplies the leave page and end time. Records are
     * space separated: date, time, IP, session, URL, referal.
     *
     * @param pageViewsGroup the session's records, expected to be sorted by time
     * @param totalVisitNum  number of pages visited, already formatted as text
     * @return the {@code toString()} form of the populated {@code VisitsInfoBean}
     */
    public String parser(ArrayList<String> pageViewsGroup,String totalVisitNum) {
        final VisitsInfoBean summary = new VisitsInfoBean();

        // Split the two records of interest once instead of once per field.
        final String[] first = pageViewsGroup.get(0).split(" ");
        final String[] last = pageViewsGroup.get(pageViewsGroup.size() - 1).split(" ");

        final String entryPage = first[4];
        final String leavePage = last[4];
        final String startTime = first[0] + " " + first[1];
        final String endTime = last[0] + " " + last[1];
        final String session = first[3];
        final String ip = first[2];
        final String referal = first[5];

        summary.setSession(session);
        summary.setStart_time(startTime);
        summary.setEnd_time(endTime);
        summary.setEntry_page(entryPage);
        summary.setLeave_page(leavePage);
        summary.setVisit_page_num(totalVisitNum);
        summary.setIP_addr(ip);
        summary.setReferal(referal);

        return summary.toString();
    }
}
[/mw_shl_code]
[mw_shl_code=applescript,true]package com.guludada.javabean;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * One row of the Visits table: the summary of a single session.
 * All values are kept as strings exactly as they are written to the output file.
 */
public class VisitsInfoBean {

    /** Separator placed between the fields of the output line. */
    private static final String FIELD_SEPARATOR = " ";

    String session;
    String start_time;
    String end_time;
    String entry_page;
    String leave_page;
    String visit_page_num;
    String IP_addr;
    String referal;

    // --- session id ---

    public String getSession() {
        return this.session;
    }

    public void setSession(String session) {
        this.session = session;
    }

    // --- time of the first and last request ---

    public String getStart_time() {
        return this.start_time;
    }

    public void setStart_time(String start_time) {
        this.start_time = start_time;
    }

    public String getEnd_time() {
        return this.end_time;
    }

    public void setEnd_time(String end_time) {
        this.end_time = end_time;
    }

    // --- first and last page of the session ---

    public String getEntry_page() {
        return this.entry_page;
    }

    public void setEntry_page(String entry_page) {
        this.entry_page = entry_page;
    }

    public String getLeave_page() {
        return this.leave_page;
    }

    public void setLeave_page(String leave_page) {
        this.leave_page = leave_page;
    }

    // --- number of pages visited ---

    public String getVisit_page_num() {
        return this.visit_page_num;
    }

    public void setVisit_page_num(String visit_page_num) {
        this.visit_page_num = visit_page_num;
    }

    // --- client IP and referal ---

    public String getIP_addr() {
        return this.IP_addr;
    }

    public void setIP_addr(String iP_addr) {
        this.IP_addr = iP_addr;
    }

    public String getReferal() {
        return this.referal;
    }

    public void setReferal(String referal) {
        this.referal = referal;
    }

    /**
     * @return the space-separated output line: session, start time, end time,
     *         entry page, leave page, page count, IP, referal
     */
    @Override
    public String toString() {
        final StringBuilder line = new StringBuilder();
        line.append(session).append(FIELD_SEPARATOR);
        line.append(start_time).append(FIELD_SEPARATOR);
        line.append(end_time).append(FIELD_SEPARATOR);
        line.append(entry_page).append(FIELD_SEPARATOR);
        line.append(leave_page).append(FIELD_SEPARATOR);
        line.append(visit_page_num).append(FIELD_SEPARATOR);
        line.append(IP_addr).append(FIELD_SEPARATOR);
        line.append(referal);
        return line.toString();
    }
}
[/mw_shl_code]
第四次清洗日志产生的访问记录表结构如下图:
| SessionID | 访问时间 | 离开时间 | 第一次访问页面 | 最后一次访问的页面 | 访问的页面总数 | IP | Referal |
| Session1 | 2016-05-30 15:17:00 | 2016-05-30 15:19:00 | /blog/me | /blog/others | 5 | 192.168.12.130 | www.baidu.com |
| Session2 | 2016-05-30 14:17:00 | 2016-05-30 15:19:38 | /home | /profile | 10 | 192.168.12.140 | www.178.com |
| Session3 | 2016-05-30 12:17:00 | 2016-05-30 15:40:00 | /products | /detail | 6 | 192.168.12.150 | www.78dm.net |
以上就是要进行日志清洗的所有MapReduce程序,因为只是一个简单的演示,方法并没有做很好的抽象。
MapReduce Troubleshooting
指定某个文件夹路径下所有文件作为mapreduce的输入参数的解决方案。
1.hdfs的文件系统中的输入路径支持通配符(glob模式,例如 /clickstream/sessiondata/16-05-31/*),注意它并不是完整的正则表达式
2.使用FileInputFormat.setInputDirRecursive(job,true)方法,然后指定文件夹路径
在分布式环境下如何设置每个用户的SessionID
可以使用UUID,UUID是分布式环境下唯一的元素识别码,它由日期和时间,时钟序列,机器识别码(一般为网卡MAC地址)三部分组成。这样就保证了每个用户的SessionID的唯一性。
HIVE建立数据仓库
使用MapReduce清洗完日志文件后,我们就开始使用Hive去构建对应的数据仓库并使用HiveSql对数据进行分析。而在本系统里,我们将使用星型模型来构建数据仓库的ODS(Operational Data Store)层。下面的命令我们可以通过启动Hive的hiveserver2服务器并使用beeline客户端进行操作,或者直接写脚本去定时调度。
PageViews数据分析
PageViews的事实表和维度表结构
使用HIVE在数据仓库中创建PageViews的贴源数据表:
>> create table pageviews(session string,ip string,requestdate string,requesttime string,visitpage string, staytime string,step string) comment 'this is the table for pageviews' partitioned by(inputDate string) clustered by(session) sorted by(requestdate,requesttime) into 4 buckets row format delimited fields terminated by ' ';
将HDFS中的数据导入到HIVE的PageViews贴源数据表中
>> load data inpath '/clickstream/pageviews' overwrite into table pageviews partition(inputDate='2016-05-17');
如果语句中没有使用"local"关键字标示数据位于本地文件系统,则会去HDFS中加载数据
根据具体的业务分析逻辑创建ODS层的PageViews事实表,并从PageViews的贴源表中导入数据
这里根据请求的页面URL来分组(clustered)是为了方便统计每个页面的PV
>> create table ods_pageviews(session string,ip string,viewtime string,visitpage string, staytime string,step string) partitioned by(inputDate string) clustered by(visitpage) sorted by(viewtime) into 4 buckets row format delimited fields terminated by ' ';
>> insert into table ods_pageviews partition(inputDate='2016-05-17') select pv.session,pv.ip,concat(pv.requestdate,"-",pv.requesttime),pv.visitpage,pv.staytime,pv.step from pageviews as pv where pv.inputDate='2016-05-17';
创建PageViews事实表的时间维度表并从当天的事实表里导入数据
>>create table ods_dim_pageviews_time(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) clustered by(year,month,day) sorted by(time) into 4 buckets row format delimited fields terminated by ' ';
>> insert overwrite table ods_dim_pageviews_time partition(inputDate='2016-05-17') select distinct pv.viewtime, substring(pv.viewtime,0,4),substring(pv.viewtime,6,2),substring(pv.viewtime,9,2),substring(pv.viewtime,12,2),substring(pv.viewtime,15,2),substring(pv.viewtime,18,2) from ods_pageviews as pv;
创建PageViews事实表的URL维度表并从当天的事实表里导入数据
>> create table ods_dim_pageviews_url(visitpage string,host string,path string,query string) partitioned by(inputDate string) clustered by(visitpage) sorted by(visitpage) into 4 buckets row format delimited fields terminated by ' ';
>> insert into table ods_dim_pageviews_url partition(inputDate='2016-05-17') select distinct pv.visitpage,b.host,b.path,b.query from pageviews pv lateral view parse_url_tuple(concat('https://localhost',pv.visitpage),'HOST','PATH','QUERY') b as host,path,query;
查询每天PV总数前20的页面
>> select op.visitpage as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' group by op.visitpage sort by num desc limit 20;
运行结果:
Visits数据分析
页面具体访问记录Visits的事实表和维度表结构
使用HIVE在数据仓库中创建Visits信息的贴源数据表:
>> create table visitsinfo(session string,startdate string,starttime string,enddate string,endtime string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(startdate,starttime) into 4 buckets row format delimited fields terminated by ' ';
将HDFS中的数据导入到HIVE的Visits信息贴源数据表中
>> load data inpath '/clickstream/visitsinfo' overwrite into table visitsinfo partition(inputDate='2016-05-18');
根据具体的业务分析逻辑创建ODS层的Visits事实表,并从visitsinfo的贴源表中导入数据
>> create table ods_visits(session string,entrytime string,leavetime string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(entrytime) into 4 buckets row format delimited fields terminated by ' ';
>> insert into table ods_visits partition(inputDate='2016-05-18') select vi.session,concat(vi.startdate,"-",vi.starttime),concat(vi.enddate,"-",vi.endtime),vi.entrypage,vi.leavepage,vi.viewpagenum,vi.ip,vi.referal from visitsinfo as vi where vi.inputDate='2016-05-18';
创建Visits事实表的时间维度表并从当天的事实表里导入数据
>>create table ods_dim_visits_time(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) clustered by(year,month,day) sorted by(time) into 4 buckets row format delimited fields terminated by ' ';
将“访问时间”和“离开时间”两列的值合并后再放入时间维度表中,减少数据的冗余
>>insert overwrite table ods_dim_visits_time partition(inputDate='2016-05-18') select distinct ov.timeparam, substring(ov.timeparam,0,4),substring(ov.timeparam,6,2),substring(ov.timeparam,9,2),substring(ov.timeparam,12,2),substring(ov.timeparam,15,2),substring(ov.timeparam,18,2) from (select ov1.entrytime as timeparam from ods_visits as ov1 union select ov2.leavetime as timeparam from ods_visits as ov2) as ov;
创建visits事实表的URL维度表并从当天的事实表里导入数据
>> create table ods_dim_visits_url(pageurl string,host string,path string,query string) partitioned by(inputDate string) clustered by(pageurl) sorted by(pageurl) into 4 buckets row format delimited fields terminated by ' ';
将每个session的进入页面和离开页面的URL合并后存入到URL维度表中
>>insert into table ods_dim_visits_url partition(inputDate='2016-05-18') select distinct ov.pageurl,b.host,b.path,b.query from (select ov1.entrypage as pageurl from ods_visits as ov1 union select ov2.leavepage as pageurl from ods_visits as ov2 ) as ov lateral view parse_url_tuple(concat('https://localhost',ov.pageurl),'HOST','PATH','QUERY') b as host,path,query;
将每个session从哪个外站进入当前网站的信息存入到URL维度表中
>>insert into table ods_dim_visits_url partition(inputDate='2016-05-18') select distinct ov.referal,b.host,b.path,b.query from ods_visits as ov lateral view parse_url_tuple(ov.referal,'HOST','PATH','QUERY') b as host,path,query;
统计每个页面的跳出人数(事实上真正有价值的统计应该是统计页面的跳出率,但为了简单示范,作者在这里简化成统计跳出人数)
>> select ov.leavepage as jumpPage, count(*) as jumpNum from ods_visits as ov group by ov.leavepage order by jumpNum desc;
业务页面转换率分析(漏斗模型)
Hive在创建表的时候无法实现某个字段自增长的关键字,得使用自定义函数(user-defined function)UDF来实现相应的功能。在查询的时候可以使用row_number()来显示行数,不过必须要在complete mode下才能使用,所以可以使用row_number() 函数配合开窗函数over(),具体示例如下。 为简单起见,这里我们创建一个临时表,并手动在里面插入要查看的业务页面链接以及该页面的PV总数,通过这几个参数来计算业务页面之间的转换率,也就是所谓的漏斗模型。
假设我们有 “/index” -> “/detail” -> “/createOrder” -> “/confirmOrder” 这一业务页面转化流程
首先我们要创建业务页面的PV的临时信息表,临时表和里面的数据会在session结束的时候清理掉
>> create temporary table transactionpageviews(url string,views int) row format delimited fields terminated by ' ';
先统计业务页面的总PV然后按转换步骤顺序插入每个页面的PV信息到transactionpageviews表中
>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/index' group by opurl.path;
>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/detail' group by opurl.path;
>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/createOrder' group by opurl.path;
>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/confirmOrder' group by opurl.path;
计算业务页面之间的转换率
>> select row_number() over() as rownum,a.url as url, a.views as pageViews,b.views as lastPageViews,a.views/b.views as transferRation from (select row_number() over() as rownum,views,url from transactionpageviews) as a left join (select row_number() over() as rownum,views,url from transactionpageviews) as b on (a.rownum = b.rownum-1 );
Shell脚本+Crontab定时器执行任务调度
执行initialEnv.sh脚本初始化系统环境,为了简单测试,作者只启动了单台服务器,下面的脚本是建立在Hadoop的standalone单节点模式,并且Hive也装在Hadoop服务器上
[mw_shl_code=applescript,true]#!/bin/bash
# initialEnv.sh - starts the services of the single-node demo environment in
# order: HDFS, YARN, MySQL, HiveServer2. The script stops at the first service
# that fails to start.
#
# Each step is checked explicitly with "|| fail". Chaining "if [ 0 = $? ]"
# blocks does not work for this: a skipped "if ... fi" itself returns 0, so the
# test after a failed step would succeed again and later services would start.

export HADOOP_HOME=/home/ymh/apps/hadoop-2.6.4

# Print an error message and abort the script.
fail() {
    echo "initialEnv: $1" >&2
    exit 1
}

#start hdfs
/home/ymh/apps/hadoop-2.6.4/sbin/start-dfs.sh || fail "start-dfs.sh failed"

#start yarn
/home/ymh/apps/hadoop-2.6.4/sbin/start-yarn.sh || fail "start-yarn.sh failed"

#start flume (disabled in this demo)
#nohup ~/apache-flume-1.6.0-bin/bin/flume-ng agent -n agent -c conf -f ~/apache-flume-1.6.0-bin/conf/flume-conf.properties &

#start mysql
service mysqld start || fail "mysqld failed to start"

#start HIVE SERVER
# Plain "nohup" (no leading "$"): "$nohup" would expand an unset variable to
# nothing and hiveserver2 would be started without nohup.
nohup /apps/apache-hive-1.2.1-bin/bin/hiveserver2 &
[/mw_shl_code]
执行dataAnalyseTask.sh脚本,先启动MapReduce程序去清洗当日的日志信息,随后使用Hive去构建当日的ODS数据。需要注意的是,本脚本是建立在ODS层中事实表和维度表已经创建完毕的基础上去执行,所以脚本中不会有创建事实表和维度表的HIVE语句(创建语句见上一个章节的内容),并且为了节省篇幅,只列出了PageViews数据分析的脚本部分。
[mw_shl_code=applescript,true]#!/bin/bash
# dataAnalyseTask.sh - daily batch: cleans today's Flume logs with the three
# MapReduce jobs (logClean, logSession, PageViews) and then loads the result
# into the Hive ODS tables. The fact and dimension tables must already exist.
#
# Every step runs through "step", which aborts the script on the first failure.
# Chaining "if [[ 1 -ne $? ]]" blocks does not do that: a skipped "if ... fi"
# returns 0, so the following test passes again, and any failure code other
# than 1 is treated as success.

CURDATE=$(date +%y-%m-%d)        # directory names on HDFS use a two-digit year
CURDATEHIVE=$(date +%Y-%m-%d)    # Hive partition values use a four-digit year

HADOOP_BIN=/home/ymh/apps/hadoop-2.6.4/bin
CLICKSTREAM_JAR=/export/data/mydata/clickstream.jar

# Run a command; abort the whole script if it fails.
step() {
    "$@" || { echo "dataAnalyseTask: step failed: $*" >&2; exit 1; }
}

# Send one HiveQL statement to HiveServer2 through beeline.
# NOTE(review): this relies on beeline returning a non-zero exit code when the
# statement fails - confirm for the installed Hive version.
run_hive() {
    echo "$1" | /apps/apache-hive-1.2.1-bin/bin/beeline -u jdbc:hive2://localhost:10000
}

# Nothing to do when Flume has not delivered any events for today.
"$HADOOP_BIN"/hdfs dfs -test -d "/flume/events/$CURDATE" || {
    echo "dataAnalyseTask: /flume/events/$CURDATE does not exist" >&2
    exit 1
}

# Clean the logs: raw log -> cleaned log -> session log -> pageviews
step "$HADOOP_BIN"/hadoop jar "$CLICKSTREAM_JAR" com.guludada.clickstream.logClean
step "$HADOOP_BIN"/hadoop jar "$CLICKSTREAM_JAR" com.guludada.clickstream.logSession
step "$HADOOP_BIN"/hadoop jar "$CLICKSTREAM_JAR" com.guludada.clickstream.PageViews

#Load today's data
step "$HADOOP_BIN"/hdfs dfs -chmod 777 "/clickstream/pageviews/$CURDATE/"
step run_hive "load data inpath '/clickstream/pageviews/$CURDATE/' into table pageviews partition(inputDate='$CURDATEHIVE');"

#Create fact table and its dimension tables
step run_hive "insert into table ods_pageviews partition(inputDate='$CURDATEHIVE') select pv.session,pv.ip,concat(pv.requestdate,'-',pv.requesttime) as viewtime,pv.visitpage,pv.staytime,pv.step from pageviews as pv where pv.inputDate='$CURDATEHIVE';"

step run_hive "insert into table ods_dim_pageviews_time partition(inputDate='$CURDATEHIVE') select distinct pv.viewtime, substring(pv.viewtime,0,4),substring(pv.viewtime,6,2),substring(pv.viewtime,9,2),substring(pv.viewtime,12,2),substring(pv.viewtime,15,2),substring(pv.viewtime,18,2) from ods_pageviews as pv;"

step run_hive "insert into table ods_dim_pageviews_url partition(inputDate='$CURDATEHIVE') select distinct pv.visitpage,b.host,b.path,b.query from pageviews pv lateral view parse_url_tuple(concat('https://localhost',pv.visitpage),'HOST','PATH','QUERY') b as host,path,query;"
[/mw_shl_code]
创建crontab文件,指定每天的凌晨01点整执行dataAnalyseTask.sh脚本,该脚本执行“使用MapReduce清理日志文件”和“使用HiveSql构建分析ODS层数据”两项任务,并将用户自定义的crontab文件加入到定时器中
[mw_shl_code=applescript,true]$vi root_crontab_hadoop
$echo "0 1 * * * /myShells/dataAnalyseTask.sh" >> root_crontab_hadoop
$crontab root_crontab_hadoop
[/mw_shl_code]
至此,使用Hadoop进行离线计算的简单架构和示例已经全部阐述完毕,而关于如何使用Sqoop将Hive中的数据导入Mysql中,因为篇幅有限,这里就不展开了。作者刚开始接触分布式离线计算,文章中尚有许多不足的地方,欢迎大家提出宝贵意见并做进一步交流。这篇文章的初衷是作者对自己最近所学知识的一个总结,同时也为了和大家分享所学到的东西,希望对大家有帮助,谢谢阅读!
来源:ymh198816的博客
|
|