代码:
package ri.zhi.fen.xi;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 巨型網站日志系统分析,提取KPI数据
* */
public class BaseStationDataPreprocess extends Configured implements Tool {
// Counters recording why input records were skipped:
// OUTOFTIMESKIP = record's date/hour is outside the requested range,
// TIMESKIP = timestamp could not be parsed, LINESKIP = line malformed in some other way.
enum Counter {
OUTOFTIMESKIP, TIMESKIP, LINESKIP
};
@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    // Pass the job parameters to the map/reduce tasks via the configuration.
    conf.set("date", args[2]);
    conf.set("timepoint", args[3]);
    // FIX: job-name typo ("Preprecess" -> "Preprocess").
    Job job = new Job(conf, "BaseStationDataPreprocess");
    job.setJarByClass(BaseStationDataPreprocess.class);
    // Input path
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // Output path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Use the Map class defined below for the map tasks.
    job.setMapperClass(Map.class);
    // FIX: was Reducer.class (the framework base class), which would have run
    // an identity reduce instead of our Reduce implementation.
    job.setReducerClass(Reduce.class);
    // FIX: the mapper emits Text/Text while the reducer emits NullWritable/Text,
    // so map output types must be declared separately from the final output types
    // (a single setOutputKeyClass(Text.class) made the reducer's NullWritable key
    // fail type checking at runtime).
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    // Submit the job and wait for completion.
    job.waitForCompletion(true);
    return job.isSuccessful() ? 0 : 1;
}
/**
* 读取一行数据,以“IMSI+时间段”作为key发射出去
*
* */
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
String data;
String[] timepoint;
boolean dataSource;
//初始化,每个Mapper开始的时候执行一次
public void setup(Context context) throws IOException {
//提取参数,从context获取文件名区分数据来源和字段
this.data = context.getConfiguration().get("date");
this.timepoint = context.getConfiguration().get("timpoint").split("-");
//提取文件名-打开输入的文件
FileSplit fs = (FileSplit) context.getInputSplit();
//获取文件名
String filename = fs.getPath().getName();
//通过文件名判断数据来源与哪个文件
if (filename.startsWith("POS"))
dataSource = true;
else if (filename.startsWith("NET"))
dataSource = false;
else
throw new IOException("File Name should starts with POS or NET");
}
//Map任务,Map函数,对每一行输入数据执行一次。
/**
* Map任务
* 读取基站数据
* 找出数据所对应的时间段
* 以IMSI和时间段作为Key
* CGI和时间作为Value
*
* */
public void map(LongWritable key, Text value, Context context) throws IOException {
//读取数据一行
String line = value.toString();
TableLine tableLine = new TableLine();
//读取行
try {
//自定义TableLine类提取字段
tableLine.set(line, dataSource, this.data, timepoint);
} catch (LineException e) {
// TODO Auto-generated catch block
if (e.getFlag() == -1)
//接收到错误的时间记录,然后相应的counter+1
context.getCounter(Counter.OUTOFTIMESKIP).increment(1);
else
//格式不对,解析不了,然后相应的counter+1
context.getCounter(Counter.TIMESKIP).increment(1);
return;
} catch (Exception e) {
//读取失败,直接跳过整行
context.getCounter(Counter.LINESKIP).increment(1);
return;
}
try {
context.write(tableLine.outKey(), tableLine.outValue());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//解析每一行数据,提取需要的部分
public class TableLine {
private String imsi, position, time, timeFlag;
private Date day;
private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void set(String line, boolean source, String date, String[] timepint) throws LineException {
String[] lineSpline = line.split("\t");
if (source) {
this.imsi = lineSpline[0];
this.position = lineSpline[3];
this.time = lineSpline[4];
} else {
this.imsi = lineSpline[0];
this.position = lineSpline[2];
this.time = lineSpline[3];
}
//检查时间是否与输入的相同
if (!this.time.startsWith(date))
//不同的话,flag是-1
throw new LineException("", -1);
try {
this.day = this.formatter.parse(this.time);
} catch (ParseException e) {
throw new LineException("", 0);
}
//判断时间是否在指定的时间段内
int i = 0, n = timepoint.length;
//hour大于最大的时间点
int hour = Integer.valueOf(this.time.split(" ")[1].split(":")[0]);
while (i < n && Integer.valueOf(timepoint) <= hour)
i++;
if (i < n) {
if (i == 0)
//判断是否在时间段之前,然后输出时间段
this.timeFlag = ("00-" + timepint);
else
this.timeFlag = (timepoint[i - 1] + "-" + timepoint);
} else
//不是在指定的时间段里面
throw new LineException("", -1);
}
public Text outKey() {
return new Text(this.imsi + "|" + this.timeFlag);
}
public Text outValue() {
long t = (day.getTime() / 1000L);
//用时间的偏移量作为输出时间-把时间转化成UNIX格式
return new Text(this.position + "|" + String.valueOf(t));
}
}
public class LineException extends Exception {
/**
*
*/
private static final long serialVersionUID = 3655169408843271282L;
int flag;
public LineException(String msg, int flag) {
super(msg);
this.flag = flag;
}
public int getFlag() {
return flag;
}
}
}
/**
* 统计同一个IMSI在同一时间段,在不同CGI停留的时长
* */
public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
private String date;
private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void setup(Context context) {
this.date=context.getConfiguration().get("date");
}
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException {
//取出用户,以|分割
String imsi = key.toString().split("\\|")[0];
//取出时间段
String timeFlag = key.toString().split("\\|")[1];
//用一个Treemap记录时间(自然排序)---使用TreeMap可以让数据按照时间排序
TreeMap<Long, String> uploads = new TreeMap<Long, String>();
String valueString;
for (Text value : values) {
valueString = value.toString();
try {
//时间+地点
uploads.put(Long.valueOf(valueString.split("\\|")[1]), valueString.split("\\|")[0]);
} catch (NumberFormatException e) {
context.getCounter(Counter.TIMESKIP).increment(1);
continue;
}
}
try {
//组合出来最后的时间
Date tmp = this.formatter.parse(this.date + " " + timeFlag.split("-")[1] + ":00:00");
//自己设定一个最后的时间OFF
uploads.put((tmp.getTime() / 1000L), "OFF");
//需要键值对表示,并不关心顺序的
HashMap<String, Float> locs = getStayTime(uploads);
for (Entry<String, Float> entry : locs.entrySet()) {
StringBuilder builder = new StringBuilder();
builder.append(imsi).append("|");
builder.append(entry.getKey()).append("|");
builder.append(timeFlag).append("|");
builder.append(entry.getValue());
context.write(NullWritable.get(), new Text(builder.toString()));
}
} catch (Exception e) {
e.getMessage();
}
}
//自定义函数,用于汇总停留时间--用后一个时间减去前一个时间,如果间隔超过60分钟就认定关机
public HashMap<String, Float> getStayTime(TreeMap<Long, String> uploads) {
Entry<Long, String> upload, nextUpload;
HashMap<String, Float> locs = new HashMap<String, Float>();
Iterator<Entry<Long, String>> it = uploads.entrySet().iterator();
upload = it.next();
while (it.hasNext()) {
nextUpload = it.next();
float diff = (float) (nextUpload.getKey() - upload.getKey()) / 60.0f;
if (diff <= 60.0) {
if (locs.containsKey(upload.getValue()))
locs.put(upload.getValue(), locs.get(upload.getValue()) + diff);
else
locs.put(upload.getValue(), diff);
}
upload = nextUpload;
}
return locs;
}
}
/**
 * Entry point: validates the argument count and runs the job via ToolRunner.
 * Usage: BaseStationDataPreprocess &lt;input&gt; &lt;output&gt; &lt;date&gt; &lt;timepoints&gt;
 */
public static void main(String[] args) throws Exception {
    // Check the argument count before touching the cluster.
    if (args.length != 4) {
        // FIX: the usage messages were empty strings, and there was a stray
        // double semicolon after System.exit.
        System.err.println("Usage: BaseStationDataPreprocess <input path> <output path> <date> <timepoints>");
        System.err.println("Example: hadoop jar HadoopTest01.jar ri.zhi.fen.xi.BaseStationDataPreprocess /in /out 2013-09-12 07-09-17-24");
        System.exit(-1);
    }
    // Delegate to run() through ToolRunner so generic Hadoop options are parsed.
    int res = ToolRunner.run(new Configuration(), new BaseStationDataPreprocess(), args);
    System.exit(res);
}
}
运行代码:
hadoop jar ./HadoopTest01.jar ri.zhi.fen.xi.BaseStationDataPreprocess /user/hadoop/file/input /user/hadoop/file/output 2013-09-12 07-09-17-24