一个经典的MapReduce模板代码,倒排索引(ReverseIndex)
问题导读1、使用MapReduce如何统计某个号码被哪些号码呼叫了?
2、解决思想是什么?
static/image/hrline/4.gif
问题描述:统计某个号码被哪些号码呼叫了
输入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789 112
15699807656 110
011-678987 112
说明:每一行为一条电话通话记录,左边的号码(记为a)打给右边的号码(记为b号码),中间用空格隔开
要求:
将以上文件以如下格式输出:
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
说明:左边为被呼叫的号码b,右边为呼叫b的号码a以"|"分割
解决思想很简单:Map中将a,b号码分割key为b,value为a写入context
Reduce中将values以"|"迭代分割
不多说(十二点了....),下面代码可在装有mapreduce插件的Eclipse中执行,也可先编译class文件,在打成jar包运行(代码中有提示,不过这对于初学者有些困难)不会的同学可以在下面评论,或加我QQ:1106373297,备注:hadoop学习
很经典的一段代码,哈哈
控制台输出结果:
........
/07/09 08:42:04 INFO mapred.JobClient: Combine input records=0
14/07/09 08:42:04 INFO mapred.JobClient: Reduce input records=5
14/07/09 08:42:04 INFO mapred.JobClient: Reduce input groups=3
14/07/09 08:42:04 INFO mapred.JobClient: Combine output records=0
14/07/09 08:42:04 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
14/07/09 08:42:04 INFO mapred.JobClient: Reduce output records=5
14/07/09 08:42:04 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
14/07/09 08:42:04 INFO mapred.JobClient: Map output records=5
任务名称:ReverseIndex
任务成功:是
输入行数:6
输出行数:5
跳过的行:1
任务开始:2014-07-09 08:41:55
任务结束:2014-07-09 08:42:04
任务耗时:0.15313333 分钟
下面是其代码实现:
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ReverseIndex extends Configured implements Tool {
enum Counter {
LINESKIP, // 出错的行
}
public static class Map extends Mapper<LongWritable,Text,Text,Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString(); // 读取源数据
try{
// 数据处理
String[] lineSplit = line.split(" ");
String anum = lineSplit;
String bnum = lineSplit;
context.write(new Text(bnum), new Text(anum)); // 输出
} catch (java.lang.ArrayIndexOutOfBoundsException e) {
context.getCounter(Counter.LINESKIP).increment(1); // 出错hang计数器+1
return;
}
}
}
public static class Reduce extends Reducer {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String valueString;
String out = "";
for (Text value : values) {
valueString = value.toString();
out += valueString + "|";
System.out.println("Ruduce:key="+key+"value="+value);
}
context.write(key, new Text(out));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = new Job(conf, "ReverseIndex"); // 任务名
job.setJarByClass(ReverseIndex.class); // 指定Class
FileInputFormat.addInputPath(job, new Path(args)); // 输入路径
FileOutputFormat.setOutputPath(job, new Path(args)); // 输出路径
job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码
job.setReducerClass(ReverseIndex.Reduce.class); // 调用上面Reduce类作为Reduce任务代码
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式
job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式
job.waitForCompletion(true);
// 输出任务完成情况
System.out.println("任务名称:" + job.getJobName());
System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否"));
System.out.println("输入行数:"
+ job.getCounters()
.findCounter("org.apache.hadoop.mapred.Task$Counter",
"MAP_INPUT_RECORDS").getValue());
System.out.println("输出行数:"
+ job.getCounters()
.findCounter("org.apache.hadoop.mapred.Task$Counter",
"MAP_OUTPUT_RECORDS").getValue());
System.out.println("跳过的行:"
+ job.getCounters().findCounter(Counter.LINESKIP).getValue());
return job.isSuccessful() ? 0 : 1;
}
public static void main(String[] args) throws Exception {
// 判断参数个数是否正确
// 如果无参数运行则显示以作程序说明
if (args.length != 2) {
System.err.println("");
System.err.println("Usage: ReverseIndex < input path > < output path > ");
System.err
.println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out");
System.exit(-1);
}
// 记录开始时间
DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date start = new Date();
// 运行任务
int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args);
// 输出任务耗时
Date end = new Date();
float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
System.out.println("任务开始:" + formatter.format(start));
System.out.println("任务结束:" + formatter.format(end));
System.out.println("任务耗时:" + String.valueOf(time) + " 分钟");
System.exit(res);
}
}
运行成功了, 但是结果没对噢,只是a和b位置互换了,啥情况,我看代码也没啥发现 看视频时就是这个例子, 运行很顺利
页:
[1]