分享

MapReduce中的setCombinerClass疑问

Joker 发表于 2014-10-20 12:07:42 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 12968
在编程篇中看到排序的时候,代码如下
  1. package cn.base.mapreduce;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  16. import org.apache.hadoop.util.GenericOptionsParser;
  17. public class Sort {
  18.        
  19.         public static class Map extends Mapper<Object, Text, IntWritable, IntWritable>{
  20.                
  21.                 private static IntWritable iw = new IntWritable();
  22.                
  23.                
  24.                 protected void map(Object key, Text value,Context context)
  25.                                 throws IOException, InterruptedException {
  26.                         // TODO Auto-generated method stub
  27.                         String line = value.toString();
  28.                         iw.set(Integer.parseInt(line));
  29.                         context.write(iw, new IntWritable(1));
  30.                 }
  31.                
  32.         }
  33.        
  34.         public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
  35.                
  36.                 private static IntWritable iw = new IntWritable(1);
  37.                
  38.                
  39.                 protected void reduce(IntWritable key, Iterable<IntWritable> values,Context context)
  40.                                 throws IOException, InterruptedException {
  41.                         // TODO Auto-generated method stub
  42.                        
  43.                         for(IntWritable val : values){
  44.                                 context.write(iw, key);
  45.                                 iw = new IntWritable(iw.get() + 1);
  46.                         }
  47.                 }
  48.                
  49.         }
  50.        
  51.         public static void main(String[] args) throws Exception {
  52.                
  53.                 Configuration conf = new Configuration();
  54.                
  55.                 String[] ioArgs = {"dedup_in","output"};
  56.                
  57.                 String[] otherArgs = new GenericOptionsParser(ioArgs).getRemainingArgs();
  58.                
  59.                 if(otherArgs.length != 2){
  60.                         System.err.println("Usage: Data Deduplication <in> <out>");
  61.                      System.exit(2);
  62.                 }
  63.                
  64.                 Job job = new Job(conf,"Sort");
  65.                
  66.                 job.setJarByClass(Sort.class);
  67.                
  68.                 job.setMapperClass(Map.class);
  69.                 //job.setCombinerClass(Reduce.class);
  70.                 job.setReducerClass(Reduce.class);
  71.                
  72.                
  73.                 job.setInputFormatClass(TextInputFormat.class);
  74.                 job.setOutputKeyClass(IntWritable.class);
  75.                 job.setOutputValueClass(IntWritable.class);
  76.                 job.setOutputFormatClass(TextOutputFormat.class);
  77.                
  78.                 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  79.                 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  80.                
  81.                 System.exit(job.waitForCompletion(true) ? 0 : 1);
  82.         }
  83. }
复制代码

假设我加入了//job.setCombinerClass(Reduce.class);那么就达不到想要的效果,而且数据都错了
请问这个在哪些时候加入,也看了这个job.setCombinerClass(Reduce.class);介绍,但并不是非常明白
谁可以帮助我解释下吗?

已有(4)人评论

跳转到指定楼层
howtodown 发表于 2014-10-20 12:44:12
本帖最后由 howtodown 于 2014-10-20 13:26 编辑
Reduce在map阶段执行称之为combine,而且map的个数一般比reduce个数多的多。

之所以混乱,是因为你在reduce里面不止做了排序,还执行了其它操作,如果只是单独的排序,map的combine能提高效率



回复

使用道具 举报

Joker 发表于 2014-10-20 13:25:34
howtodown 发表于 2014-10-20 12:44
Reduce在map阶段执行称之为combine,而且map的个数一般比reduce个数多的多。

之所以混乱,是因为你在red ...

多谢版主,写了job.setCombinerClass(Reduce.class);可以提高我的效率
回复

使用道具 举报

howtodown 发表于 2014-10-20 13:27:22
Joker 发表于 2014-10-20 13:25
多谢版主,写了job.setCombinerClass(Reduce.class);可以提高我的效率
恩,是的,提高整个程序的运行效率
回复

使用道具 举报

evababy 发表于 2014-11-28 10:46:54
map的value作为输出,又吧reduce的value作为输出,如果中间执行了combiner,造成reduce输入KEY有已经是被合并数据,相当于执行了两次reduce,你说结果能一样么?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条