Guiding questions
1. How can MapReduce be used to find which numbers called a given phone number?
2. What is the idea behind the solution?
Problem: for each phone number, find which numbers called it.
The input file is as follows:
```text
13588888888 112
13678987879 13509098987
18987655436 110
2543789 112
15699807656 110
011-678987 112
```
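Note: each line is one call record. The number on the left (call it a) called the number on the right (call it b), and the two are separated by a space.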
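Here is a minimal sketch of parsing one record in the format just described. The hypothetical helper below is not part of the original job. It splits a line into caller a and callee b and rejects any line that does not contain exactly two fields. It splits on any run of whitespace, which is an assumption and more lenient than splitting on a single space.

```java
import java.util.Optional;

/**
 * Hypothetical helper (not part of the original job) that parses one call
 * record "a b": a is the caller, b is the number that was called.
 */
public class CallRecordParser {

    /** Returns {caller, callee}, or empty if the line is malformed. */
    static Optional<String[]> parse(String line) {
        // Assumption: any run of whitespace separates the fields, so a tab also parses.
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 2) {
            return Optional.empty(); // a MapReduce job would count this line as skipped
        }
        return Optional.of(parts);
    }

    public static void main(String[] args) {
        parse("13678987879 13509098987")
                .ifPresent(p -> System.out.println("caller=" + p[0] + ", callee=" + p[1]));
        System.out.println(parse("").isPresent()); // a blank line is rejected
    }
}
```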
Requirement:
Produce output from the file above in the following format:
```text
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
```
Note: the left column is the called number b. The right column lists the numbers a that called b, separated by "|".
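The idea is simple. In the map phase, split each line into a and b and write b as the key and a as the value to the context.
In the reduce phase, iterate over the values for each key and join them with "|".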
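The map, shuffle and reduce steps just described can be simulated in plain Java without a cluster. This is only an illustration; the class and method names are hypothetical.

- A `TreeMap` stands in for Hadoop's sort-and-group step.
- `String.join` joins the callers without a trailing "|".
- It parses all six sample lines as printed, so 2543789 is listed under 112 here. The sample output above omits it, and the job run whose console output appears below reported one skipped line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hadoop-free sketch of the map -> shuffle -> reduce flow. TreeMap only
 * imitates the framework's sort-and-group step; it is not how Hadoop
 * implements the shuffle.
 */
public class ReverseIndexLocal {

    static Map<String, String> run(List<String> lines) {
        // "Map" + "shuffle": emit (b, a) and group the a values by key b.
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 2) {
                continue; // malformed line: skipped, like the LINESKIP counter case
            }
            String a = parts[0];
            String b = parts[1];
            grouped.computeIfAbsent(b, k -> new ArrayList<>()).add(a);
        }
        // "Reduce": join each key's callers with "|" (no trailing separator).
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            result.put(e.getKey(), String.join("|", e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = List.of(
                "13588888888 112",
                "13678987879 13509098987",
                "18987655436 110",
                "2543789 112",
                "15699807656 110",
                "011-678987 112");
        run(input).forEach((b, callers) -> System.out.println(b + "\t" + callers));
    }
}
```

Running `main` prints one line per called number, for example `110	18987655436|15699807656`. On a real cluster the order of values within a group is not guaranteed, so the callers may be joined in a different order.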
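Without further ado (it's already midnight...): the code below can be run in Eclipse with the MapReduce plugin installed. You can also compile it to class files, package them into a jar and run that; the program prints a usage hint, though this route can be hard for beginners. If you get stuck, leave a comment below or add me on QQ: 1106373297 with the note "hadoop学习" (Hadoop learning).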
A classic piece of code, haha.
Console output:
```text
........
/07/09 08:42:04 INFO mapred.JobClient: Combine input records=0
14/07/09 08:42:04 INFO mapred.JobClient: Reduce input records=5
14/07/09 08:42:04 INFO mapred.JobClient: Reduce input groups=3
14/07/09 08:42:04 INFO mapred.JobClient: Combine output records=0
14/07/09 08:42:04 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
14/07/09 08:42:04 INFO mapred.JobClient: Reduce output records=5
14/07/09 08:42:04 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
14/07/09 08:42:04 INFO mapred.JobClient: Map output records=5
Job name: ReverseIndex
Job succeeded: yes
Input lines: 6
Output lines: 5
Skipped lines: 1
Job started: 2014-07-09 08:41:55
Job ended: 2014-07-09 08:42:04
Elapsed time: 0.15313333 minutes
```
Here is the implementation:
```java
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReverseIndex extends Configured implements Tool {

    enum Counter {
        LINESKIP, // lines that could not be parsed
    }

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString(); // read one input record
            // Split "a b" into caller a and callee b
            String[] lineSplit = line.split(" ");
            if (lineSplit.length < 2) {
                context.getCounter(Counter.LINESKIP).increment(1); // bad line: increment the skip counter
                return;
            }
            String anum = lineSplit[0];
            String bnum = lineSplit[1];
            context.write(new Text(bnum), new Text(anum)); // emit key = b, value = a
        }
    }

    // The type arguments matter: with a raw "extends Reducer", reduce() below would
    // only overload the framework method and Hadoop's identity reduce would run instead.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Join all callers of this number with "|" (no trailing separator)
            StringBuilder out = new StringBuilder();
            for (Text value : values) {
                if (out.length() > 0) {
                    out.append('|');
                }
                out.append(value.toString());
            }
            context.write(key, new Text(out.toString()));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = new Job(conf, "ReverseIndex"); // job name
        job.setJarByClass(ReverseIndex.class);   // locate the jar containing this class

        FileInputFormat.addInputPath(job, new Path(args[0]));   // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path

        job.setMapperClass(Map.class);       // use the Map class above for map tasks
        job.setReducerClass(Reduce.class);   // use the Reduce class above for reduce tasks

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);   // output key type
        job.setOutputValueClass(Text.class); // output value type

        boolean success = job.waitForCompletion(true);

        // Print a job summary
        System.out.println("Job name: " + job.getJobName());
        System.out.println("Job succeeded: " + (success ? "yes" : "no"));
        // Built-in counter group name as used by the Hadoop 1.x run shown above
        System.out.println("Input lines: "
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_INPUT_RECORDS").getValue());
        System.out.println("Output lines: "
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("Skipped lines: "
                + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Check the argument count; when run without arguments, print usage
        if (args.length != 2) {
            System.err.println("");
            System.err.println("Usage: ReverseIndex <input path> <output path>");
            System.err.println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out");
            System.exit(-1);
        }
        // Record the start time
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();
        // Run the job
        int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args);

        // Print the elapsed time
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("Job started: " + formatter.format(start));
        System.out.println("Job ended: " + formatter.format(end));
        System.out.println("Elapsed time: " + time + " minutes");

        System.exit(res);
    }
}
```
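Declaring the reducer as `extends Reducer` with no type arguments is a subtle trap. Against a raw supertype, `reduce(Text, Iterable<Text>, Context)` does not override Hadoop's `reduce`; it is only an overload. Hadoop's default `reduce` therefore runs instead, and it writes every value through unchanged. The console output above is consistent with this: Reduce input groups=3 but Reduce output records=5, whereas a reducer that joined values would emit one record per group. Declaring `Reducer<Text, Text, Text, Text>` and marking the method `@Override` avoids the problem, and the annotation turns the mistake into a compile error.

The plain-Java sketch below reproduces the effect without Hadoop. All class names are hypothetical stand-ins, not Hadoop APIs.

```java
import java.util.List;

/**
 * Plain-Java illustration (hypothetical classes, no Hadoop): against a raw
 * supertype, a method with narrower parameter types overloads instead of
 * overriding, so calls made through the base type still reach the base method.
 */
public class RawTypeOverloadDemo {

    // Stand-in for Hadoop's Reducer: the default reduce() is an identity pass-through.
    static class BaseReducer<K, V> {
        String reduce(K key, Iterable<V> values) {
            return "identity";
        }
    }

    // Like "class Reduce extends Reducer": raw supertype, so this is a new overload.
    // Adding @Override here would fail to compile, which exposes the bug.
    @SuppressWarnings("rawtypes")
    static class RawReducer extends BaseReducer {
        String reduce(String key, Iterable<String> values) {
            return "custom";
        }
    }

    // Like "extends Reducer<Text, Text, Text, Text>": this really overrides.
    static class TypedReducer extends BaseReducer<String, String> {
        @Override
        String reduce(String key, Iterable<String> values) {
            return "custom";
        }
    }

    // Mimics the framework, which calls reduce() only through the base type.
    static String callLikeFramework(BaseReducer<String, String> reducer) {
        return reducer.reduce("112", List.of("13588888888", "011-678987"));
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        System.out.println(callLikeFramework(new RawReducer()));   // identity
        System.out.println(callLikeFramework(new TypedReducer())); // custom
    }
}
```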