分享

MapReduce不执行Reduce

Joker 发表于 2014-10-19 14:43:58 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 5 24243

  1. package cn.basemapreduce;
  2. import java.io.IOException;
  3. import java.util.StringTokenizer;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.Reducer.Context;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  17. import org.apache.hadoop.util.GenericOptionsParser;
  18. public class Filter3 {
  19.         public static class Map extends
  20.                         Mapper<LongWritable, Text, Text, LongWritable> {
  21.                 @Override
  22.                 protected void map(LongWritable key, Text value,
  23.                                 org.apache.hadoop.mapreduce.Mapper.Context context)
  24.                                 throws IOException, InterruptedException {
  25.                         // TODO Auto-generated method stub
  26.                         String line = value.toString();
  27.                         StringTokenizer tokenizer = new StringTokenizer(line, "\n");
  28.                         while (tokenizer.hasMoreElements()) {
  29.                                 StringTokenizer st = new StringTokenizer(tokenizer.nextToken());
  30.                                 String score = st.nextToken();
  31.                                 String name = st.nextToken();
  32.                                
  33.                                 if(name.equals("赵六")){
  34.                                        
  35.                                         int tscore = Integer.parseInt(score);
  36.                                         Text tname = new Text(name);
  37.                                        
  38.                                         context.write(name, new LongWritable(tscore));
  39.                                 }
  40.                                
  41.                         }
  42.                 }
  43.         }
  44.        
  45.        
  46.         public static class Reduce extends Reducer<Text, LongWritable, Text, Text>{
  47.                
  48.                
  49.                 protected void reduce(Text key, Iterable<LongWritable> values,Context context)
  50.                                 throws IOException, InterruptedException {
  51.                         // TODO Auto-generated method stub
  52.                         context.write(key, new Text(""));
  53.                 }
  54.                
  55.         }
  56.        
  57.         public static void main(String[] args) throws Exception {
  58.                
  59.                 Configuration conf = new Configuration();
  60.                 // conf.set("mapred.job.tracker", "192.168.1.2:9001");
  61.                 String[] ioArgs = new String[] { "mkdir", "score_out" };
  62.                 String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
  63.                                 .getRemainingArgs();
  64.                 if (otherArgs.length != 2) {
  65.                         System.err.println("Usage: Score Average <in> <out>");
  66.                         System.exit(2);
  67.                 }
  68.                 Job job = new Job(conf, "Filter3");
  69.                 job.setJarByClass(Filter3.class);
  70.                 // 设置Map、Combine和Reduce处理类
  71.                 job.setMapperClass(Map.class);
  72.                 //它负责对中间过程的输出进行本地的聚集,这会有助于降低从Mapper到 Reducer数据传输量
  73.                 job.setCombinerClass(Reduce.class);
  74.                 job.setReducerClass(Reduce.class);
  75.                 // 设置输出类型
  76.                 job.setOutputKeyClass(Text.class);
  77.                 job.setOutputValueClass(Text.class);
  78.                 // 将输入的数据集分割成小数据块splites,提供一个RecordReder的实现
  79.                 job.setInputFormatClass(TextInputFormat.class);
  80.                 // 提供一个RecordWriter的实现,负责数据输出
  81.                 job.setOutputFormatClass(TextOutputFormat.class);
  82.                 // 设置输入和输出目录
  83.                 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  84.                 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  85.                 System.exit(job.waitForCompletion(true) ? 0 : 1);
  86.         }
  87. }
复制代码
自己写的时候Reduce总是不会被执行,不知道哪里有问题?
大家帮忙看下



已有(5)人评论

跳转到指定楼层
desehawk 发表于 2014-10-19 15:49:10
hadoop1还是hadoop2,是在集群环境,还是eclipse环境。

  protected void reduce(Text key, Iterable<LongWritable> values,Context context)
                                throws IOException, InterruptedException {
                        // TODO Auto-generated method stub
                        context.write(key, new Text(""));
                }



上面红字部分替换为下面:

  1. ctx.write(key, new LongWritable(1));
复制代码



回复

使用道具 举报

Joker 发表于 2014-10-19 15:53:24
desehawk 发表于 2014-10-19 15:49
hadoop1还是hadoop2,是在集群环境,还是eclipse环境。

  protected void reduce(Text key, Iterable va ...

Hadoop2.2我是在eclipse环境下

我Reduce定义的是Text,改成IntWritable报错不行
是不是我要先改下类型
回复

使用道具 举报

howtodown 发表于 2014-10-19 16:04:58
Joker 发表于 2014-10-19 15:53
Hadoop2.2我是在eclipse环境下

我Reduce定义的是Text,改成IntWritable报错不行
hadoop2.2已经是新api了,感觉你用的还是旧 api.试试下面程序,直接复制你的程序,然后准备数据运行。
  1. package mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.output.*;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
  14. public class mapreduce {
  15. static final String INPUT_PATH = "hdfs://aboutyun:9000/yy1";
  16. static final String OUT_PATH = "hdfs://aboutyun:9000/Output";
  17. public static void main(String[] args) throws Exception {
  18. //主类
  19. Configuration conf = new Configuration();
  20. final Job job = new Job(conf, mapreduce.class.getSimpleName());
  21. job.setJarByClass(mapreduce.class);
  22. // 寻找输入
  23. FileInputFormat.setInputPaths(job, INPUT_PATH);
  24. // 1.2对输入数据进行格式化处理的类
  25. job.setInputFormatClass(TextInputFormat.class);
  26. job.setMapperClass(MyMapper.class);
  27. // 1.2指定map输出类型<key,value>类型
  28. job.setMapOutputKeyClass(Text.class);
  29. job.setMapOutputValueClass(LongWritable.class);
  30. // 1.3指定分区
  31. job.setPartitionerClass(HashPartitioner.class);
  32. job.setNumReduceTasks(1);
  33. // 1.4排序分组省略,使用默认
  34. // 1.5规约省略,使用默认
  35. job.setReducerClass(MyReduce.class);
  36. job.setOutputKeyClass(Text.class);
  37. job.setOutputValueClass(LongWritable.class);
  38. // 指定输出路径
  39. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  40. // 指定输出的格式或则类
  41. job.setOutputFormatClass(TextOutputFormat.class);
  42. // 把作业提交给jobtracer
  43. job.waitForCompletion(true);
  44. }
  45. //map类
  46. static class MyMapper extends
  47. Mapper<LongWritable, Text, Text, LongWritable> {
  48. protected void map(LongWritable key, Text value, Context context)
  49. throws IOException, InterruptedException {
  50. final String[] splited = value.toString().split("\t");
  51. for (String word : splited) {
  52. context.write(new Text(word), new LongWritable(1L));
  53. }
  54. }
  55. }
  56. //reduce类
  57. static class MyReduce extends
  58. Reducer<Text, LongWritable, Text, LongWritable> {
  59. @Override
  60. protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
  61. Context ctx) throws java.io.IOException, InterruptedException {
  62. long times = 0L;
  63. for (LongWritable count : v2s) {
  64. times += count.get();
  65. ctx.write(k2, new LongWritable(times));
  66. }
  67. }
  68. }
  69. }
复制代码

更多参考:
新手指导,该如何在开发环境中,创建mapreduce程序




回复

使用道具 举报

Joker 发表于 2014-10-19 16:20:21
howtodown 发表于 2014-10-19 16:04
hadoop2.2已经是新api了,感觉你用的还是旧 api.试试下面程序,直接复制你的程序,然后准备数据运行。

...

多谢,前些天看新手到工作中的编程篇,然后改变下写的
回复

使用道具 举报

evababy 发表于 2014-11-28 10:51:42
ctx.write(key, new LongWritable(1));job.setOutputValueClass(LongWritable.class);

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条