分享

Map/Reduce的GroupingComparator的作用是什么?

xng2012 2014-4-10 17:08:20 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 11659
本帖最后由 xng2012 于 2014-4-10 17:09 编辑
这里主要研究Map/Reduce中,整个过程我们已经很了解了,但是还存在一些疑问,这里讲一

GroupingComparator作用是什么?



一、背景

排序对于MR来说是个核心内容,如何做好排序十分的重要,这几天写了一些,总结一下,以供以后读阅。

二、准备

1、hadoop版本是0.20.2

2、输入的数据格式(这个很重要,看清楚格式),名称是secondary.txt:



  1. abc     123
  2. acb     124
  3. cbd     523
  4. abc     234
  5. nbc     563
  6. fds     235
  7. khi     234
  8. cbd     675
  9. fds     971
  10. hka     862
  11. ubd     621
  12. khi     123
  13. fds     321
复制代码


仔细看下,数据文件第一列是字母,第二列是数字,我要做的就是结合这组数据进行一些排序的测试。


3、代码框架,因为接下来的测试改动都是针对部分代码的修改,框架的代码是不会改变的,所以先把主要代码贴在这里。

代码分为2部分:自定义的key和主框架代码。先贴上主框架代码:

MyGrouping.java

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.io.WritableComparator;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.Mapper;
  8. import org.apache.hadoop.mapreduce.Partitioner;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. import org.apache.hadoop.util.GenericOptionsParser;
  13. import com.run.lenged.business.TextPair;
  14. public class MyGrouping {
  15.         /**
  16.          * Map
  17.          *
  18.          * @author Administrator
  19.          */
  20.         public static class MyGroupingMap extends Mapper<LongWritable, Text, TextPair, Text> {
  21.                 protected void map(LongWritable key, Text value,
  22.                                 org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, TextPair, Text>.Context context)
  23.                                 throws java.io.IOException, InterruptedException {
  24.                         String arr[] = value.toString().split("/t");
  25.                         if (arr.length != 2) {
  26.                                 return;
  27.                         }
  28.                         TextPair tp = new TextPair();
  29.                         tp.set(new Text(arr[0]), new Text(arr[1]));
  30.                         context.write(tp, new Text(arr[1]));
  31.                 }
  32.         }
  33.         /**
  34.          * 按照Hashcode值来进行切分
  35.          *
  36.          * @author Administrator
  37.          */
  38.         public static class MyGroupingPartition extends Partitioner<TextPair, Text> {
  39.                 @Override
  40.                 public int getPartition(TextPair key, Text value, int numPartitions) {
  41.                         return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  42.                 }
  43.         }
  44.         /**
  45.          * group进行排序
  46.          *
  47.          * @author Administrator
  48.          */
  49.         @SuppressWarnings("unchecked")
  50.         public static class MyGroupingGroup extends WritableComparator {
  51.                 //代码变动部分
  52.         }
  53.         /**
  54.          * reduce
  55.          *
  56.          * @author Administrator
  57.          */
  58.         public static class MyGroupingReduce extends Reducer<TextPair, Text, Text, Text> {
  59.                 protected void reduce(TextPair key, java.lang.Iterable<Text> value,
  60.                                 org.apache.hadoop.mapreduce.Reducer<TextPair, Text, Text, Text>.Context context)
  61.                                 throws java.io.IOException, InterruptedException {
  62.                         StringBuffer sb = new StringBuffer();
  63.                         while (value.iterator().hasNext()) {
  64.                                 sb.append(value.iterator().next().toString() + "_");
  65.                         }
  66.                         context.write(key.getFirst(), new Text(sb.toString().substring(0, sb.toString().length() - 1)));
  67.                 }
  68.         }
  69.         public static void main(String args[]) throws Exception {
  70.                 Configuration conf = new Configuration();
  71.                 GenericOptionsParser parser = new GenericOptionsParser(conf, args);
  72.                 String[] otherArgs = parser.getRemainingArgs();
  73.                 if (args.length != 2) {
  74.                         System.err.println("Usage: NewlyJoin <inpath> <output>");
  75.                         System.exit(2);
  76.                 }
  77.                 Job job = new Job(conf, "MyGrouping");
  78.                 // 设置运行的job
  79.                 job.setJarByClass(MyGrouping.class);
  80.                 // 设置Map相关内容
  81.                 job.setMapperClass(MyGroupingMap.class);
  82.                 job.setMapOutputKeyClass(TextPair.class);
  83.                 job.setMapOutputValueClass(Text.class);
  84.                 job.setPartitionerClass(MyGroupingPartition.class);
  85.                
  86.                 job.setGroupingComparatorClass(MyGroupingGroup.class);
  87.                
  88.                 // 设置reduce
  89.                 job.setReducerClass(MyGroupingReduce.class);
  90.                 job.setOutputKeyClass(Text.class);
  91.                 job.setOutputValueClass(Text.class);
  92.                 // 设置输入和输出的目录
  93.                 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  94.                 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  95.                 // 执行,直到结束就退出
  96.                 System.exit(job.waitForCompletion(true) ? 0 : 1);
  97.         }
  98. }
复制代码




TextPair.java




  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.io.WritableComparable;
  6. public class TextPair implements WritableComparable<TextPair> {
  7.         private Text first;
  8.         private Text second;
  9.         public TextPair() {
  10.                 set(new Text(), new Text());
  11.         }
  12.         public void set(Text first, Text second) {
  13.                 this.first = first;
  14.                 this.second = second;
  15.         }
  16.         public Text getFirst() {
  17.                 return first;
  18.         }
  19.         public Text getSecond() {
  20.                 return second;
  21.         }
  22.         @Override
  23.         public void readFields(DataInput in) throws IOException {
  24.                 first.readFields(in);
  25.                 second.readFields(in);
  26.         }
  27.         @Override
  28.         public void write(DataOutput out) throws IOException {
  29.                 first.write(out);
  30.                 second.write(out);
  31.         }
  32.         @Override
  33.         public int compareTo(TextPair o) {
  34.                 int cmp = first.compareTo(o.first);
  35.                 if (cmp != 0) {
  36.                         return cmp;
  37.                 } else {
  38.                         return second.compareTo(o.second);
  39.                 }
  40.         }
  41. }
复制代码


三、测试前提

1、首先提一个需求,我们结合需求来测试,然后再扩散开。
需求内容是:如果第一列值相同,第二列值叠加,并对第二列值进行升序排序。最后输出的时候,按照第一列值的升序排序输出。




2、需求实现。
根据上面的需求,我们可以分析一下:
需要对第一个字段和第二个字段都进行排序,那么单纯的利用MR框架对key迭代输出,value累加是不行的。因为value是没有进行排序。
所以我们需要做一些改动,定义key为符合组建。TextPair.java类就是自定义的key。
一般来说如果要对key和value同时做排序,那么,自定义的组合key的格式第一个值是第一个字段,第二个值就是第二个字段。




3、那么我们就定义一个job.setGroupingComparatorClass(MyGroupingGroup.class);代码如下:



  1. public static class MyGroupingGroup extends WritableComparator {
  2.                 public int compare(WritableComparable a, WritableComparable b) {
  3.                         return mip1.getFirst().compareTo(mip2.getFirst());
  4.         }
  5.                 protected MyGroupingGroup() {
  6.                         super(TextPair.class, true);
  7.                 }
  8.                 @Override
  9.                         TextPair mip1 = (TextPair) a;
  10.                         TextPair mip2 = (TextPair) b;
  11.                 }
复制代码

只对输出的复合组建第一项值进行排序。输出的结果如下:


  1. abc 123_234
  2. cbd 523_675
  3. khi 123_234
  4. ubd 621
  5. nbc 563
  6. acb 124
  7. fds 235_321_971
  8. hka 862
复制代码

4、查看结果,我们可以看出,基本满足了上面的需求。那么接下来,我们就将做个测试,来实现一下MR的排序功能。

四、Group按第二个字段值进行排序测试

1、修改一下group的排序方式,针对第二个值进行合并排序,代码如下:

  1. public static class MyGroupingGroup extends WritableComparator {
  2.                 protected MyGroupingGroup() {
  3.                         super(TextPair.class, true);
  4.                 }
  5.                 @Override
  6.                 public int compare(WritableComparable a, WritableComparable b) {
  7.                         TextPair mip1 = (TextPair) a;
  8.                         TextPair mip2 = (TextPair) b;
  9.                         return mip1.getSecond().compareTo(mip2.getSecond());
  10.                         //return mip1.getFirst().compareTo(mip2.getFirst());
  11.                 }
  12.         }
复制代码

2、reduce的输出稍微改下,将第2个字段也输出,方便查看,代码如下:


  1. context.write(key.getFirst(), new Text(sb.toString().substring(0, sb.toString().length() - 1)));
复制代码

reduce输出的结果:


  1. abc_123        123
  2. abc_234        234
  3. acb_124        124
  4. cbd_523        523
  5. cbd_675        675
  6. fds_235        235
  7. fds_321        321
  8. fds_971        971
  9. hka_862        862
  10. khi_123        123
  11. khi_234        234
  12. nbc_563        563
  13. ubd_621        621
复制代码




3、看到结果,第一反应就是没有按照我的要求,按第二个值进行排序操作。

其实不是,这个结果确实是进行了group的排序,只是说遇到没有符合合并结果数据。所以,看起来没有进行排序。

在这里有个概念,就是group到底是在什么时候做的排序,原文是这样写的:


Job.setGroupingComparatorClass(Class<? extends RawComparator> cls)  
Define the comparator that controls which keys are grouped together
for a single call to Reducer.reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)

我尝试翻译了一下(英文水平实在是有限,不对的地方还望各位指出):

在一个reduce的调用过程中,定义一个comparator,对分组在一起的key进行排序。

通过上面这句话就可以理解,为什么khi_123 123和abc_123 123没有叠加在一起。








欢迎加入about云群371358502、39327136,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条