分享

一个经典的MapReduce模板代码,倒排索引(ReverseIndex)

问题导读
1、使用MapReduce如何统计某个号码被哪些号码呼叫了?
2、解决思想是什么?




问题描述:统计某个号码被哪些号码呼叫了
输入文件如下:
  1. 13588888888 112
  2. 13678987879 13509098987
  3. 18987655436 110
  4. 2543789    112
  5. 15699807656 110
  6. 011-678987 112
复制代码

说明:每一行为一条电话通话记录,左边的号码(记为a)打给右边的号码(记为b号码),中间用空格隔开

要求:
将以上文件以如下格式输出:
  1. 110        18987655436|15699807656
  2. 112        13588888888|011-678987
  3. 13509098987        13678987879
复制代码

说明:左边为被呼叫的号码b,右边为呼叫b的号码a以"|"分割


解决思想很简单:Map中将a,b号码分割key为b,value为a写入context
Reduce中将values以"|"迭代分割

不多说(十二点了....),下面代码可在装有mapreduce插件的Eclipse中执行,也可先编译class文件,在打成jar包运行(代码中有提示,不过这对于初学者有些困难)不会的同学可以在下面评论,或加我QQ:1106373297,备注:hadoop学习

很经典的一段代码,哈哈
控制台输出结果:
  1. ........
  2. /07/09 08:42:04 INFO mapred.JobClient:     Combine input records=0
  3. 14/07/09 08:42:04 INFO mapred.JobClient:     Reduce input records=5
  4. 14/07/09 08:42:04 INFO mapred.JobClient:     Reduce input groups=3
  5. 14/07/09 08:42:04 INFO mapred.JobClient:     Combine output records=0
  6. 14/07/09 08:42:04 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
  7. 14/07/09 08:42:04 INFO mapred.JobClient:     Reduce output records=5
  8. 14/07/09 08:42:04 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
  9. 14/07/09 08:42:04 INFO mapred.JobClient:     Map output records=5
  10. 任务名称:ReverseIndex
  11. 任务成功:是
  12. 输入行数:6
  13. 输出行数:5
  14. 跳过的行:1
  15. 任务开始:2014-07-09 08:41:55
  16. 任务结束:2014-07-09 08:42:04
  17. 任务耗时:0.15313333 分钟
复制代码


下面是其代码实现:
  1. import java.io.IOException;
  2. import java.text.DateFormat;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.conf.Configured;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.*;
  9. import org.apache.hadoop.mapreduce.*;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  13. import org.apache.hadoop.util.Tool;
  14. import org.apache.hadoop.util.ToolRunner;
  15. public class ReverseIndex extends Configured implements Tool {
  16. enum Counter {
  17. LINESKIP, // 出错的行
  18. }
  19. public static class Map extends Mapper<LongWritable,Text,Text,Text> {
  20. public void map(LongWritable key, Text value, Context context)
  21. throws IOException, InterruptedException {
  22. String line = value.toString(); // 读取源数据
  23. try{
  24. // 数据处理
  25. String[] lineSplit = line.split(" ");
  26. String anum = lineSplit[0];
  27. String bnum = lineSplit[1];
  28. context.write(new Text(bnum), new Text(anum)); // 输出
  29. } catch (java.lang.ArrayIndexOutOfBoundsException e) {
  30. context.getCounter(Counter.LINESKIP).increment(1); // 出错hang计数器+1
  31. return;
  32. }
  33. }
  34. }
  35. public static class Reduce extends Reducer {
  36. public void reduce(Text key, Iterable<Text> values, Context context)
  37. throws IOException, InterruptedException {
  38. String valueString;
  39. String out = "";
  40. for (Text value : values) {
  41. valueString = value.toString();
  42. out += valueString + "|";
  43. System.out.println("Ruduce:key="+key+"  value="+value);
  44. }
  45. context.write(key, new Text(out));
  46. }
  47. }
  48. @Override
  49. public int run(String[] args) throws Exception {
  50. Configuration conf = this.getConf();
  51. Job job = new Job(conf, "ReverseIndex"); // 任务名
  52. job.setJarByClass(ReverseIndex.class); // 指定Class
  53. FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径
  54. FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径
  55. job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码
  56. job.setReducerClass(ReverseIndex.Reduce.class); // 调用上面Reduce类作为Reduce任务代码
  57. job.setOutputFormatClass(TextOutputFormat.class);
  58. job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式
  59. job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式
  60. job.waitForCompletion(true);
  61. // 输出任务完成情况
  62. System.out.println("任务名称:" + job.getJobName());
  63. System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否"));
  64. System.out.println("输入行数:"
  65. + job.getCounters()
  66. .findCounter("org.apache.hadoop.mapred.Task$Counter",
  67. "MAP_INPUT_RECORDS").getValue());
  68. System.out.println("输出行数:"
  69. + job.getCounters()
  70. .findCounter("org.apache.hadoop.mapred.Task$Counter",
  71. "MAP_OUTPUT_RECORDS").getValue());
  72. System.out.println("跳过的行:"
  73. + job.getCounters().findCounter(Counter.LINESKIP).getValue());
  74. return job.isSuccessful() ? 0 : 1;
  75. }
  76. public static void main(String[] args) throws Exception {
  77. // 判断参数个数是否正确
  78. // 如果无参数运行则显示以作程序说明
  79. if (args.length != 2) {
  80. System.err.println("");
  81. System.err.println("Usage: ReverseIndex < input path > < output path > ");
  82. System.err
  83. .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out");
  84. System.exit(-1);
  85. }
  86. // 记录开始时间
  87. DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  88. Date start = new Date();
  89. // 运行任务
  90. int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args);
  91. // 输出任务耗时
  92. Date end = new Date();
  93. float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
  94. System.out.println("任务开始:" + formatter.format(start));
  95. System.out.println("任务结束:" + formatter.format(end));
  96. System.out.println("任务耗时:" + String.valueOf(time) + " 分钟");
  97. System.exit(res);
  98. }
  99. }
复制代码




已有(2)人评论

跳转到指定楼层
sunny62520 发表于 2014-8-2 16:10:03
运行成功了, 但是结果没对噢,只是a和b位置互换了,啥情况,我看代码也没啥发现
回复

使用道具 举报

zhao_yi_bing 发表于 2014-10-21 16:53:45
看视频时就是这个例子, 运行很顺利
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条