分享

对Map/Reduce分区与分组的实例分析

howtodown 发表于 2014-9-29 18:42:50 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 8 30921
问题导读:
思考的内容偏多
1.Map/Reduce为什么分区?
2.分组的作用是什么?





做两文件Join工作的.data.txt和info.txt
data.txt

  1. 201001 1003 abc
  2. 201002 1005 def
  3. 201003 1006 ghi
  4. 201004 1003 jkl
  5. 201005 1004 mno
  6. 201006 1005 pqr
复制代码

info.txt

  1. 1003 kaka
  2. 1004 da
  3. 1005 jue
  4. 1006 zhao
复制代码


期望输出结果:



  1. 1003        201001        abc kaka
  2. 1003        201004        jkl kaka
  3. 1004        201005        mno da
  4. 1005        201002        def jue
  5. 1005        201006        pqr jue
  6. 1006        201003        ghi zhao
复制代码
代码部分:(这里分享下,群主给的代码,加上个人所做的试验)


  1. public class JionQurey extends Configured implements Tool {
  2.        
  3.         public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text>{
  4.                 @Override
  5.                 public void map(LongWritable key, Text value, Context context)
  6.                                 throws IOException, InterruptedException {
  7.                         String pathName = ((FileSplit)context.getInputSplit()).getPath().toString();
  8.                         //根据文件名判断处理
  9.                         if(pathName.contains("data.txt")){
  10.                                 String[] line =value.toString().split(" ");
  11.                                 if(line.length < 3){
  12.                                         //data数据格式second不规范,字段小于3,抛弃数据
  13.                                         return ;
  14.                                 }else{
  15.                                         // 数据格式规范,区分标识为1
  16.                                         TextPair tp = new TextPair(new Text(line[1]), new Text("1"));
  17.                                         context.write(tp, new Text(line[0]+" "+line[2]));
  18.                                 }
  19.                         }
  20.                         if(pathName.contains("info.txt")){
  21.                                 String[] line = value.toString().split(" ");
  22.                                 if(line.length < 2){
  23.                                         // data数据格式不规范,字段小于2,抛弃数据
  24.                                         return ;
  25.                                 }else{
  26.                                         // 数据格式规范,区分标识为0
  27.                                         TextPair tp = new TextPair(new Text(line[0]), new Text("0"));
  28.                                         context.write(tp, new Text(line[1]));
  29.                                 }
  30.                         }
  31.                 }
  32.         }
  33.        
  34.         public static class Example_Join_01_Partitionner extends Partitioner<TextPair, Text>{
  35.                 @Override
  36.                 public int getPartition(TextPair key, Text value, int numParition) {
  37.                         return Math.abs(key.getFirst().hashCode() * 127) % numParition;
  38.                 }
  39.                
  40.         }
  41.        
  42.         public static class Example_Join_01_Comparator extends WritableComparator{
  43.                
  44.                 public Example_Join_01_Comparator(){
  45.                         super(TextPair.class,true);
  46.                 }
  47.                 @Override
  48.                 public int compare(WritableComparable a, WritableComparable b) {
  49.                         // TODO Auto-generated method stub
  50.                         TextPair t1 =(TextPair) a;
  51.                         TextPair t2 = (TextPair)b;
  52.                         return t1.getFirst().compareTo(t2.getFirst());   //只要是第一个字段相同的就分成为同一组
  53.                 }
  54.         }
  55. //        1、map之后的输出会进行一些分区的操作,代码贴出来:
  56. //
  57. //        public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {
  58. //                @Override
  59. //                public int getPartition(TextPair key, Text value, int numParititon) {
  60. //                        return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
  61. //                }
  62. //        }
  63.         public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {
  64.                 @Override
  65.                 public void reduce(TextPair key, Iterable<Text> values,
  66.                                 Context context)
  67.                                 throws IOException, InterruptedException {
  68.                         Text pid = key.getFirst();
  69.                         //其实这里已近排序好了第一个值就是info.txt的字段,后面的都是data.txt字段
  70.                         String desc = values.iterator().next().toString();
  71.                         while(values.iterator().hasNext()){
  72.                                 context.write(pid, new Text(values.iterator().next().toString()+" "+desc));
  73.                         }
  74.                 }
  75.         }
  76.         /**
  77.          * @param args
  78.          * @throws Exception
  79.          */
  80.         public static void main(String[] args) throws Exception {
  81.                 int exitCode = ToolRunner.run(new Configuration(), new JionQurey(), args);
  82.                 System.exit(exitCode);
  83.         }
  84.        
  85.         public static class TextPair implements WritableComparable<TextPair>{
  86.                 private Text first;
  87.                 private Text second;
  88.                
  89.                 public Text getFirst() {
  90.                         return first;
  91.                 }
  92.                 public void setFirst(Text first) {
  93.                         this.first = first;
  94.                 }
  95.                 public Text getsecond() {
  96.                         return second;
  97.                 }
  98.                 public void setsecond(Text second) {
  99.                         this.second = second;
  100.                 }
  101.                
  102. //                public TextPair(Text first, Text second) {
  103. //                        this.first = first;
  104. //                        this.second = second;
  105. //                }
  106.                 public void set(Text first, Text second) {
  107.                         this.first = first;
  108.                         this.second = second;
  109.                 }
  110.                 public TextPair() {
  111.                         set(new Text(), new Text());
  112.                 }
  113.                 public TextPair(String first, String second) {
  114.                         set(new Text(first), new Text(second));
  115.                 }
  116.                 public TextPair(Text first, Text second) {
  117.                         set(first, second);
  118.                 }
  119.                 @Override
  120.                 public void readFields(DataInput in) throws IOException {
  121.                         first.readFields(in);
  122.                         second.readFields(in);
  123.                 }
  124.                 @Override
  125.                 public void write(DataOutput out) throws IOException {
  126.                         first.write(out);
  127.                         second.write(out);
  128.                 }
  129.                 @Override
  130.                 public int compareTo(TextPair tp) {
  131.                         int cmp = first.compareTo(tp.first);
  132.                         if(cmp != 0) return cmp;
  133.                         return second.compareTo(tp.second);
  134.                 }
  135.                
  136.         }
  137.         @Override
  138.         public int run(String[] args) throws Exception {
  139.                 for(int i = 0 ;i<args.length;i++ ){
  140.                         System.out.println(args[i]);
  141.                 }
  142.                 Configuration conf = new Configuration();
  143.                 GenericOptionsParser parser = new GenericOptionsParser(conf,args);
  144.                 String[] otherArgs = parser.getRemainingArgs();
  145.                 if(args.length < 3){
  146.                         System.out.println("please enter <in path1><in Path2> <out Path>");
  147.                         System.exit(2);
  148.                 }
  149.                 FileSystem fs =FileSystem.get(URI.create(args[2]), conf);
  150.                 if(fs.exists(new Path(args[2]))){
  151.                         fs.delete(new Path(args[2]),true);
  152.                 }
  153.                
  154.                 Job job = new Job(conf ,"JionQurey");
  155.                 // 设置运行的job
  156.                 job.setJarByClass(JionQurey.class);
  157.                 // 设置Map相关内容
  158.                 job.setMapperClass(Example_Join_01_Mapper.class);
  159.                 // 设置Map的输出
  160.                 job.setMapOutputKeyClass(TextPair.class);
  161.                 job.setMapOutputValueClass(Text.class);
  162.                 // 设置partition
  163.                 job.setPartitionerClass(Example_Join_01_Partitionner.class);
  164.                 // 在分区之后按照指定的条件分组
  165.                 job.setGroupingComparatorClass(Example_Join_01_Comparator.class);
  166.                 // 设置reduce
  167.                 job.setReducerClass(Example_Join_01_Reduce.class);
  168.                 job.setOutputKeyClass(Text.class);
  169.                 job.setOutputValueClass(Text.class);
  170.                 // 设置输入和输出的目录
  171.                 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  172.                 FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
  173.                 FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
  174.                 // 执行,直到结束就退出
  175.                 return job.waitForCompletion(true) ? 0 : 1;
  176.         }
  177. }
复制代码
实现的大致流程是:
  1. C、Map执行完成之后,输出的中间结果如下:
  2. 1003,0        kaka
  3. 1004,0        da
  4. 1005,0        jue
  5. 1006,0        zhao
  6. 1003,1        201001        abc
  7. 1003,1        201004        jkl
  8. 1004,1        201005        mon
  9. 1005,1        201002        def
  10. 1005,1        201006        pqr
  11. 1006,1        201003        ghi
  12. 分区后:
  13. 同一区:
  14. 1003,0        kaka
  15. 1003,1        201001        abc
  16. 1003,1        201004        jkl
  17. 同一区:
  18. 1004,0        da
  19. 1004,1        201005        mon
  20. 同一区:
  21. 1005,0        jue
  22. 1005,1        201002        def
  23. 1005,1        201006        pqr
  24. 同一区:
  25. 1006,0        zhao
  26. 1006,1        201003        ghi
  27. 分组操作就是把在相同分区的数据按照指定的规则进行分组的操作,就以上来看,
  28. 是按照复合key的第一个字段做分组原则,达到忽略复合key的第二个字段值的目的,从而让数据能够迭代在一个reduce中。输出后结果如下:
  29. 分组后:
  30. 同一组:
  31. 1003,0        kaka
  32. 1003,0        201001        abc
  33. 1003,0        201004        jkl
  34. 同一组:
  35. 1004,0        da
  36. 1004,0        201005        mon
  37. 同一组:
  38. 1005,0        jue
  39. 1005,0        201002        def
  40. 1005,0        201006        pqr
  41. 同一组:
  42. 1006,0        zhao
  43. 1006,0        201003        ghi
  44. 看上去比我的所做的好多了,且不需要用这么多List来保存数据.
  45. 这样并不算完了,我们还没真正体验到Partitioner和WritableComparator到底实现了怎么一个功能.
  46. 于是我就先去到Partitioner,看看文件中出现的是什么..
  47. 1003        201001 abc kaka
  48. 1003        201004 jkl kaka
  49. 1004        201005 mno da
  50. 1005        201002 def jue
  51. 1005        201006 pqr jue
  52. 1006        201003 ghi zhao
  53. 咦,这不就是我们想要的结果吗?那我们为什么还要分组呢?
  54. 我是这样理解的,我们在分组的代码中是通过:
  55. t1.getFirst().compareTo(t2.getFirst());
  56. 它把Map中的结果作为一个分区,然后直接通过WritableComparator忽略了第二个字段,分组。
  57. 然后,我又把代码恢复,去掉WritableComparator
  58. 看到的结果想象,于是我又该了下,把Reduce直接做打印操作。
  59. public void reduce(TextPair key, Iterable<Text> values,
  60.                                 Context context)
  61.                                 throws IOException, InterruptedException {
  62.                         Text pid = key.getFirst();
  63.                         //其实这里已近设计好了第一个字段就是info.txt的字段,后面的都是data.txt字段
  64. //                        String desc = values.iterator().next().toString();
  65.                         while(values.iterator().hasNext()){
  66.                                 context.write(pid, new Text(values.iterator().next().toString()+" "));
  67.                         }
  68.                 }
  69. 显示结果:
  70. 1003        kaka
  71. 1003        201001 abc
  72. 1003        201004 jkl
  73. 1004        da
  74. 1004        201005 mno
  75. 1005        jue
  76. 1005        201002 def
  77. 1005        201006 pqr
  78. 1006        zhao
  79. 1006        201003 ghi
  80. 肯定有人会认为,这样一个结果不就和上面的一样了么,把屏蔽的去掉就行了,其实不然,像1003是分了两组的.
  81. 继续修改下代码:
  82. public void reduce(TextPair key, Iterable<Text> values,
  83.                                 Context context)
  84.                                 throws IOException, InterruptedException {
  85.                         Text pid = key.getFirst();
  86.                         //其实这里已近设计好了第一个字段就是info.txt的字段,后面的都是data.txt字段
  87. //                        String desc = values.iterator().next().toString();
  88.                         while(values.iterator().hasNext()){
  89.                                 context.write(pid, new Text(values.iterator().next().toString()+" "));
  90.                         }
  91.                         context.write(new Text("Group"), new Text("+++++++++++++++++++++++++++++++"));
  92.                 }
  93. 输出的结果是:
  94. 1003        kaka
  95. Group        +++++++++++++++++++++++++++++++
  96. 1003        201001 abc
  97. 1003        201004 jkl
  98. Group        +++++++++++++++++++++++++++++++
  99. 1004        da
  100. Group        +++++++++++++++++++++++++++++++
  101. 1004        201005 mno
  102. Group        +++++++++++++++++++++++++++++++
  103. 1005        jue
  104. Group        +++++++++++++++++++++++++++++++
  105. 1005        201002 def
  106. 1005        201006 pqr
  107. Group        +++++++++++++++++++++++++++++++
  108. 1006        zhao
  109. Group        +++++++++++++++++++++++++++++++
  110. 1006        201003 ghi
  111. Group        +++++++++++++++++++++++++++++++
  112. 可以看出他们的并同一个Reduce中输出的结果...分成了不同的组.
  113. 做了这么多,还是由些疑虑,为什么只用分组就能实现了我们想要的结果了呢?难道真是做为了一个分区来进行分组的吗?
  114. 那我们平常不重写Partitioner和WritableComparator的时候,怎么能够更具key来分开呢?还望早日有高手解答我的不明之处..
  115. 次日,同时又做了下hadoop利用Partitioner分类输出到不同的文件夹中的例子,我自是稍作修改,发现更加奇怪了,我这里只是显示一个文件夹,
  116.                而在别人写的资料上显示的却是多个文件夹,如果通过一个Partitioner要分为多个文件夹的话,那岂不是上面的例题要分为很多个文件夹么..
  117. 于是我就带着问题去找导师,导师给我的解释是,分文件夹的是老版本的例题.Map中都是用output.collect();现在我直接继承的是Mapper,用的是新版本的
  118. 新版本不允许分为多个文件夹了....自能有一个文件夹。那WritableComparable是怎么一回事啊.直接告诉我主要功能是进行一种排序.
  119. 这些天一直不忘想这个问题,今天又得到点感悟...来解释下前面为什么会出现不要partitioner还是返回这样的结果:
  120. 首先,我们需要知道Map/Reduce的工作流程要有所清楚:Map ------> partitioner(分区) ------> comparato
  121. 来看下那个没有重写partitioner的代码.看到它的key传递的是个对象.在明确一下就是一个静态的对象.
  122. 着就表明了不管我是怎么new了个对象,他们的地址都是一样的.然后根据key来分区(默认),所以他们就会被分为同一个区.
  123. 至于接下来的分组操作就好理解了...
复制代码











已有(8)人评论

跳转到指定楼层
hb1984 发表于 2014-9-30 22:47:47
谢谢楼主的分享
回复

使用道具 举报

maizhu 发表于 2014-10-3 22:56:28
好东西必须转
回复

使用道具 举报

爱动的蜗牛 发表于 2015-11-15 16:44:18
群主,你是怎么获得分组后的中间结果的呀?
回复

使用道具 举报

爱动的蜗牛 发表于 2015-11-15 16:48:18
还有,分组后kye的第二个字段怎么都是零啊?
回复

使用道具 举报

javaanddonet 发表于 2018-3-31 13:22:18
  • 分区后:
  • 同一区:
  • 1003,0        kaka
  • 1003,1        201001        abc
  • 1003,1        201004        jkl
  • 同一区:
  • 1004,0        da
  • 1004,1        201005        mon
  • 同一区:
  • 1005,0        jue
  • 1005,1        201002        def
  • 1005,1        201006        pqr
  • 同一区:
  • 1006,0        zhao
  • 1006,1        201003        ghi
这里我有两个疑问:
第一个:你虽然设置了分区的实现类,但是你没有设置reduce的个数,此时默认是1。此时你的分区实现类会执行吗?我个人认为,你的分区实现是不会被执行的。如果你的reduce个数是1。请不吝赐教。个人的观点而已。如果不正确请指正。
第二个:如果执行力你的分区实现类,那么分区结果一定是你上面这样的分区结果吗?刚好每一个分区,只有一个key值?感觉你上面的示例结果更像一个分组后的结构,而不是分区后的结果。我个人认为,每一个分区应该包含N个key值,但是一个key值,不可能在两个分区分布,只能属于一个分区。所以对你的分区后的结果感到怀疑。

回复

使用道具 举报

mangohello 发表于 2018-4-18 17:55:47
这样做会产生数据倾斜吧
回复

使用道具 举报

mangohello 发表于 2018-4-18 17:59:38
这是另一种做法
public class TestMapJoin
{
        static class TestMapJoinMap extends Mapper<LongWritable,Text,Text,NullWritable>
        {
                HashMap<String,String> info = new HashMap<String,String>();
                Text k = new Text();
                NullWritable v = NullWritable.get();
                protected void setup(Context context) throws IOException
                {
                        FileInputStream fs = new FileInputStream("C:/testmpjoin/info.txt");
                        InputStreamReader rn = new InputStreamReader(fs);
                        BufferedReader br = new BufferedReader(rn);
                       
                        String line = null;
                        while((line = br.readLine()) != null)
                        {
                                String[] fields = line.split(" ");
                                info.put(fields[0], fields[1]);
                        }
                }
               
                protected void map(LongWritable key,Text values,Context context) throws IOException, InterruptedException
                {
                        String lines = values.toString();
                        String[] fields = lines.split(" ");
                        String value = info.get(fields[1]);
                        k.set(fields[1]+"\t"+fields[0]+"\t"+fields[2]+"\t"+value);
                        context.write(k, v);
                }
        }
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
        {
                Configuration conf = new Configuration();
               
                Job job = Job.getInstance();
               
                job.setJarByClass(TestMapJoin.class);
               
                job.setMapperClass(TestMapJoinMap.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(NullWritable.class);
               
                FileInputFormat.setInputPaths(job, new Path("C:/testinput"));
                FileOutputFormat.setOutputPath(job, new Path("C:/testoutput"));
               
                boolean res = job.waitForCompletion(true);
               
                System.exit(res?1:0);
               
               
        }
}
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条