howtodown 发表于 2014-9-29 18:42:50

对Map/Reduce分区与分组的实例分析

问题导读:
思考的内容偏多
1.Map/Reduce为什么分区?
2.分组的作用是什么?

static/image/hrline/4.gif



做两文件Join工作的.data.txt和info.txt
data.txt

201001 1003 abc
201002 1005 def
201003 1006 ghi
201004 1003 jkl
201005 1004 mno
201006 1005 pqr
info.txt

1003 kaka
1004 da
1005 jue
1006 zhao

期望输出结果:

1003        201001        abc kaka
1003        201004        jkl kaka
1004        201005        mno da
1005        201002        def jue
1005        201006        pqr jue
1006        201003        ghi zhao
代码部分:(这里分享下,群主给的代码,加上个人所做的试验)


public class JionQurey extends Configured implements Tool {

       
        public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text>{

                @Override
                public void map(LongWritable key, Text value, Context context)
                                throws IOException, InterruptedException {
                        String pathName = ((FileSplit)context.getInputSplit()).getPath().toString();
                        //根据文件名判断处理
                        if(pathName.contains("data.txt")){
                                String[] line =value.toString().split(" ");
                                if(line.length < 3){
                                        //data数据格式second不规范,字段小于3,抛弃数据
                                        return ;
                                }else{
                                        // 数据格式规范,区分标识为1
                                        TextPair tp = new TextPair(new Text(line), new Text("1"));
                                        context.write(tp, new Text(line+" "+line));
                                }
                        }
                        if(pathName.contains("info.txt")){
                                String[] line = value.toString().split(" ");
                                if(line.length < 2){
                                        // data数据格式不规范,字段小于2,抛弃数据
                                        return ;
                                }else{
                                        // 数据格式规范,区分标识为0
                                        TextPair tp = new TextPair(new Text(line), new Text("0"));
                                        context.write(tp, new Text(line));
                                }
                        }
                }
        }
       
        public static class Example_Join_01_Partitionner extends Partitioner<TextPair, Text>{

                @Override
                public int getPartition(TextPair key, Text value, int numParition) {
                        return Math.abs(key.getFirst().hashCode() * 127) % numParition;
                }
               
        }
       
        public static class Example_Join_01_Comparator extends WritableComparator{
               
                public Example_Join_01_Comparator(){
                        super(TextPair.class,true);
                }

                @Override
                public int compare(WritableComparable a, WritableComparable b) {
                        // TODO Auto-generated method stub
                        TextPair t1 =(TextPair) a;
                        TextPair t2 = (TextPair)b;
                        return t1.getFirst().compareTo(t2.getFirst());   //只要是第一个字段相同的就分成为同一组
                }
        }
//        1、map之后的输出会进行一些分区的操作,代码贴出来:
//
//        public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {
//                @Override
//                public int getPartition(TextPair key, Text value, int numParititon) {
//                        return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
//                }
//        }
        public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {

                @Override
                public void reduce(TextPair key, Iterable<Text> values,
                                Context context)
                                throws IOException, InterruptedException {
                        Text pid = key.getFirst();
                        //其实这里已近排序好了第一个值就是info.txt的字段,后面的都是data.txt字段
                        String desc = values.iterator().next().toString();
                        while(values.iterator().hasNext()){
                                context.write(pid, new Text(values.iterator().next().toString()+" "+desc));
                        }
                }
        }
        /**
       * @param args
       * @throws Exception
       */
        public static void main(String[] args) throws Exception {
                int exitCode = ToolRunner.run(new Configuration(), new JionQurey(), args);
                System.exit(exitCode);
        }
       
        public static class TextPair implements WritableComparable<TextPair>{
                private Text first;
                private Text second;
               
                public Text getFirst() {
                        return first;
                }
                public void setFirst(Text first) {
                        this.first = first;
                }
                public Text getsecond() {
                        return second;
                }
                public void setsecond(Text second) {
                        this.second = second;
                }
               
//                public TextPair(Text first, Text second) {
//                        this.first = first;
//                        this.second = second;
//                }
                public void set(Text first, Text second) {
                        this.first = first;
                        this.second = second;
                }
                public TextPair() {
                        set(new Text(), new Text());
                }

                public TextPair(String first, String second) {
                        set(new Text(first), new Text(second));
                }

                public TextPair(Text first, Text second) {
                        set(first, second);
                }

                @Override
                public void readFields(DataInput in) throws IOException {
                        first.readFields(in);
                        second.readFields(in);
                }

                @Override
                public void write(DataOutput out) throws IOException {
                        first.write(out);
                        second.write(out);
                }

                @Override
                public int compareTo(TextPair tp) {
                        int cmp = first.compareTo(tp.first);
                        if(cmp != 0) return cmp;
                        return second.compareTo(tp.second);
                }
               
        }

        @Override
        public int run(String[] args) throws Exception {
                for(int i = 0 ;i<args.length;i++ ){
                        System.out.println(args);
                }
                Configuration conf = new Configuration();
                GenericOptionsParser parser = new GenericOptionsParser(conf,args);
                String[] otherArgs = parser.getRemainingArgs();
                if(args.length < 3){
                        System.out.println("please enter <in path1><in Path2> <out Path>");
                        System.exit(2);
                }
                FileSystem fs =FileSystem.get(URI.create(args), conf);
                if(fs.exists(new Path(args))){
                        fs.delete(new Path(args),true);
                }
               
                Job job = new Job(conf ,"JionQurey");
                // 设置运行的job
                job.setJarByClass(JionQurey.class);
                // 设置Map相关内容
                job.setMapperClass(Example_Join_01_Mapper.class);
                // 设置Map的输出
                job.setMapOutputKeyClass(TextPair.class);
                job.setMapOutputValueClass(Text.class);
                // 设置partition
                job.setPartitionerClass(Example_Join_01_Partitionner.class);
                // 在分区之后按照指定的条件分组
                job.setGroupingComparatorClass(Example_Join_01_Comparator.class);
                // 设置reduce
                job.setReducerClass(Example_Join_01_Reduce.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                // 设置输入和输出的目录
                FileInputFormat.addInputPath(job, new Path(otherArgs));
                FileInputFormat.addInputPath(job, new Path(otherArgs));
                FileOutputFormat.setOutputPath(job, new Path(otherArgs));
                // 执行,直到结束就退出
                return job.waitForCompletion(true) ? 0 : 1;
        }

}
实现的大致流程是:
C、Map执行完成之后,输出的中间结果如下:

1003,0        kaka
1004,0        da
1005,0        jue
1006,0        zhao
1003,1        201001        abc
1003,1        201004        jkl
1004,1        201005        mon
1005,1        201002        def
1005,1        201006        pqr
1006,1        201003        ghi


分区后:

同一区:
1003,0        kaka
1003,1        201001        abc
1003,1        201004        jkl

同一区:
1004,0        da
1004,1        201005        mon

同一区:
1005,0        jue
1005,1        201002        def
1005,1        201006        pqr

同一区:
1006,0        zhao
1006,1        201003        ghi

分组操作就是把在相同分区的数据按照指定的规则进行分组的操作,就以上来看,

是按照复合key的第一个字段做分组原则,达到忽略复合key的第二个字段值的目的,从而让数据能够迭代在一个reduce中。输出后结果如下:
分组后:

同一组:
1003,0        kaka
1003,0        201001        abc
1003,0        201004        jkl

同一组:
1004,0        da
1004,0        201005        mon

同一组:
1005,0        jue
1005,0        201002        def
1005,0        201006        pqr

同一组:
1006,0        zhao
1006,0        201003        ghi

看上去比我的所做的好多了,且不需要用这么多List来保存数据.

这样并不算完了,我们还没真正体验到Partitioner和WritableComparator到底实现了怎么一个功能.

于是我就先去到Partitioner,看看文件中出现的是什么..

1003        201001 abc kaka
1003        201004 jkl kaka
1004        201005 mno da
1005        201002 def jue
1005        201006 pqr jue
1006        201003 ghi zhao

咦,这不就是我们想要的结果吗?那我们为什么还要分组呢?

我是这样理解的,我们在分组的代码中是通过:

t1.getFirst().compareTo(t2.getFirst());

它把Map中的结果作为一个分区,然后直接通过WritableComparator忽略了第二个字段,分组。

然后,我又把代码恢复,去掉WritableComparator

看到的结果想象,于是我又该了下,把Reduce直接做打印操作。

public void reduce(TextPair key, Iterable<Text> values,
                                Context context)
                                throws IOException, InterruptedException {
                        Text pid = key.getFirst();
                        //其实这里已近设计好了第一个字段就是info.txt的字段,后面的都是data.txt字段
//                        String desc = values.iterator().next().toString();
                        while(values.iterator().hasNext()){
                                context.write(pid, new Text(values.iterator().next().toString()+" "));
                        }
                }

显示结果:

1003        kaka
1003        201001 abc
1003        201004 jkl
1004        da
1004        201005 mno
1005        jue
1005        201002 def
1005        201006 pqr
1006        zhao
1006        201003 ghi

肯定有人会认为,这样一个结果不就和上面的一样了么,把屏蔽的去掉就行了,其实不然,像1003是分了两组的.

继续修改下代码:

public void reduce(TextPair key, Iterable<Text> values,
                                Context context)
                                throws IOException, InterruptedException {
                        Text pid = key.getFirst();
                        //其实这里已近设计好了第一个字段就是info.txt的字段,后面的都是data.txt字段
//                        String desc = values.iterator().next().toString();
                        while(values.iterator().hasNext()){
                                context.write(pid, new Text(values.iterator().next().toString()+" "));
                        }
                        context.write(new Text("Group"), new Text("+++++++++++++++++++++++++++++++"));
                }

输出的结果是:

1003        kaka
Group        +++++++++++++++++++++++++++++++
1003        201001 abc
1003        201004 jkl
Group        +++++++++++++++++++++++++++++++
1004        da
Group        +++++++++++++++++++++++++++++++
1004        201005 mno
Group        +++++++++++++++++++++++++++++++
1005        jue
Group        +++++++++++++++++++++++++++++++
1005        201002 def
1005        201006 pqr
Group        +++++++++++++++++++++++++++++++
1006        zhao
Group        +++++++++++++++++++++++++++++++
1006        201003 ghi
Group        +++++++++++++++++++++++++++++++

可以看出他们的并同一个Reduce中输出的结果...分成了不同的组.

做了这么多,还是由些疑虑,为什么只用分组就能实现了我们想要的结果了呢?难道真是做为了一个分区来进行分组的吗?

那我们平常不重写Partitioner和WritableComparator的时候,怎么能够更具key来分开呢?还望早日有高手解答我的不明之处..

次日,同时又做了下hadoop利用Partitioner分类输出到不同的文件夹中的例子,我自是稍作修改,发现更加奇怪了,我这里只是显示一个文件夹,

               而在别人写的资料上显示的却是多个文件夹,如果通过一个Partitioner要分为多个文件夹的话,那岂不是上面的例题要分为很多个文件夹么..

于是我就带着问题去找导师,导师给我的解释是,分文件夹的是老版本的例题.Map中都是用output.collect();现在我直接继承的是Mapper,用的是新版本的

新版本不允许分为多个文件夹了....自能有一个文件夹。那WritableComparable是怎么一回事啊.直接告诉我主要功能是进行一种排序.

这些天一直不忘想这个问题,今天又得到点感悟...来解释下前面为什么会出现不要partitioner还是返回这样的结果:

首先,我们需要知道Map/Reduce的工作流程要有所清楚:Map ------> partitioner(分区) ------> comparato

来看下那个没有重写partitioner的代码.看到它的key传递的是个对象.在明确一下就是一个静态的对象.

着就表明了不管我是怎么new了个对象,他们的地址都是一样的.然后根据key来分区(默认),所以他们就会被分为同一个区.

至于接下来的分组操作就好理解了...










hb1984 发表于 2014-9-30 22:47:47

谢谢楼主的分享

maizhu 发表于 2014-10-3 22:56:28

好东西必须转

爱动的蜗牛 发表于 2015-11-15 16:44:18

群主,你是怎么获得分组后的中间结果的呀?

爱动的蜗牛 发表于 2015-11-15 16:48:18

还有,分组后kye的第二个字段怎么都是零啊?

liuzhixin137 发表于 2016-4-19 09:49:59

学习,学习

javaanddonet 发表于 2018-3-31 13:22:18


[*]分区后:
[*]
[*]同一区:
[*]1003,0      kaka
[*]1003,1      201001      abc
[*]1003,1      201004      jkl
[*]
[*]同一区:
[*]1004,0      da
[*]1004,1      201005      mon
[*]
[*]同一区:
[*]1005,0      jue
[*]1005,1      201002      def
[*]1005,1      201006      pqr
[*]
[*]同一区:
[*]1006,0      zhao
[*]1006,1      201003      ghi
这里我有两个疑问:
第一个:你虽然设置了分区的实现类,但是你没有设置reduce的个数,此时默认是1。此时你的分区实现类会执行吗?我个人认为,你的分区实现是不会被执行的。如果你的reduce个数是1。请不吝赐教。个人的观点而已。如果不正确请指正。
第二个:如果执行力你的分区实现类,那么分区结果一定是你上面这样的分区结果吗?刚好每一个分区,只有一个key值?感觉你上面的示例结果更像一个分组后的结构,而不是分区后的结果。我个人认为,每一个分区应该包含N个key值,但是一个key值,不可能在两个分区分布,只能属于一个分区。所以对你的分区后的结果感到怀疑。

mangohello 发表于 2018-4-18 17:55:47

这样做会产生数据倾斜吧

mangohello 发表于 2018-4-18 17:59:38

这是另一种做法
public class TestMapJoin
{
        static class TestMapJoinMap extends Mapper<LongWritable,Text,Text,NullWritable>
        {
                HashMap<String,String> info = new HashMap<String,String>();
                Text k = new Text();
                NullWritable v = NullWritable.get();
                protected void setup(Context context) throws IOException
                {
                        FileInputStream fs = new FileInputStream("C:/testmpjoin/info.txt");
                        InputStreamReader rn = new InputStreamReader(fs);
                        BufferedReader br = new BufferedReader(rn);
                       
                        String line = null;
                        while((line = br.readLine()) != null)
                        {
                                String[] fields = line.split(" ");
                                info.put(fields, fields);
                        }
                }
               
                protected void map(LongWritable key,Text values,Context context) throws IOException, InterruptedException
                {
                        String lines = values.toString();
                        String[] fields = lines.split(" ");
                        String value = info.get(fields);
                        k.set(fields+"\t"+fields+"\t"+fields+"\t"+value);
                        context.write(k, v);
                }
        }
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
        {
                Configuration conf = new Configuration();
               
                Job job = Job.getInstance();
               
                job.setJarByClass(TestMapJoin.class);
               
                job.setMapperClass(TestMapJoinMap.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(NullWritable.class);
               
                FileInputFormat.setInputPaths(job, new Path("C:/testinput"));
                FileOutputFormat.setOutputPath(job, new Path("C:/testoutput"));
               
                boolean res = job.waitForCompletion(true);
               
                System.exit(res?1:0);
               
               
        }
}
页: [1]
查看完整版本: 对Map/Reduce分区与分组的实例分析