javaanddonet 发表于 2018-3-30 15:44:10

MapReduce 二次排序和全排序

本帖最后由 javaanddonet 于 2018-3-30 16:20 编辑

最近练习MR的二次排序和全排序。
先看输入文件:
aa 99
bb 98
cc 97
dd 96
aa 80
bb 70
cc 60
dd 77
aa 89
bb 91
cc 79
dd 83
aa 93
bb 94
cc 66
dd 86
3 60
4 22
5 10
6 50
3 81
4 75
5 67
6 27
3 19
4 51
5 69
6 33
3 43
4 64
5 46
6 36


我想让结果先按第一列升序排列,然后在这个基础上,在按照第二列降序排列,然后在取每一组的前两名。

我先考虑使用Map方法将所有key相同的值,聚集为<key,<v2,v4,v1,v3>>这种格式,然后在reduce阶段,对map的输出的value,转换为ArrayList<Int>数组,然后是JDK的Collections.sort(arrayList);进行排序。这样可以实现但是如果数据量很大,估计这个性能是个问题。
然后我查阅资料,需要自己定义一个map的输出类型,将输入文件的每一行值,当作一个key,然后把每一行第二列的值,当作一个value来交个map方法处理。Map的输出的KV是这种类型的:<<k1,v1,>, v1>;<<k2,v2,>, v2>。这就要自己去定义一个数据类型为KV对的数据类如下:然后在Map方法中,将这个数据类型提供给Map函数来使用,当作Map函数的输出的key的数据类型。我的问题来了
第一个问题:下面的这个自定义的数据类型中,我已经重写了compareTo方法,我发现这样已经ok了。已经实现类二次排序的功能。那为何我看网上很多人还要设置下面的这两个属性?并且在自己定义的实现类中重写了compare或compareTo方法。我感觉在自定义的数据类型中实现了两个字段的排序,这里不需要再次调用设置了。否则需要在这里进行重新配置实现。

job.setGroupingComparatorClass()//设置自定义分组的实现,当然这里会设置自己的实现类
job.setSortComparatorClass()//设置自定义排序的实现,当然这里会设置自己的实现类

第二个问题:不用设置reduce的实现,map任务完全可以完成二次排序了。问题网上很多人还要设置下面的reduce的实现?
job.setReducerClass(MyReduce.class)


第三个问题:设置分区的时候,我发现一个问题,如果设置了reduce的个数为1,那么即便是自定义了分区的实现,好像也不会调用实现类。因为我在自定义的分区中输出了一句话,真个MR运行完了,我发现没有输出我要输出的内容。而当我将reduce的个数设置2的时候,就会调用定义的分区实现类。也输出了我想输出内容。我能否这么认为:如果setNumReduceTasks设置为1,那么就不用自定义分区实现类了。因为即便是设置了,也不会调用。
job.setPartitionerClass(MyPartitionerClass.class);//设置分区
job.setNumReduceTasks(1)//设置reduce的个数

第四个问题:我想取排序后的每一组的前两名,怎么做?在reduce的迭代循环中,使用循环判定吗?如下代码中添加判断超过2就return不输出了?有没有类似于saprk中take(2)质量的函数?text.set(reduceKeyInPairKeyWritable.getFirst());
                        for(IntWritable valueIntWritable : reduceValueInIterable){
                              context.write(text, valueIntWritable);
                        }



//需要实现数据的序列化与反序列化,这样才能在多个节点之间传输数据!
//自定义组合数据类型,用与map和reduce的输入输出使用。
public class PairKeyWritable implements WritableComparable<PairKeyWritable>{
      private String first;
      private int second;
      public String getFirst() {
                return first;
      }
      public void setFirst(String first) {
                this.first = first;
      }
      public int getSecond() {
                return second;
      }
      public void setSecond(int second) {
                this.second = second;
      }
      public void set(String first, int second){
                this.setFirst(first);
                this.setSecond(second);
      }
      public PairKeyWritable(String first, int second) {
                this.set(first, second);
      }
      
      // 在反序列化时,反射机制需要调用空参构造函数,所以显示定义了一个空参构造函数
      public PairKeyWritable() { }
      
      //用于输出结果以tab键隔开
      @Override
      public String toString() {
//                        return this.getFirst() + "\t" + this.getSecond();//可以自己灵活定义对象toString的输出的方式
//                        return this.getFirst() + "\t";
                return this.getFirst();
      }
      
      
      
      
      //该方法需要重写,因为在根据可以进行分区的时候,会使用到该方法如下所示,如果job设置了分区方法,则需要调用这个方法。
//                /** Use {@link Object#hashCode()} to partition. */
//                  public int getPartition(K key, V value,
//                                          int numReduceTasks) {
//                  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
//                  }
      @Override
      public int hashCode() {
                final int prime = 31;
                int result = 1;
                result = prime * result + ((first == null) ? 0 : first.hashCode());
                result = prime * result + second;
                return result;
      }
      //至于为何要重写hashCode和equals方法,看看这个帖子:hashcode方法的作用
      //https://blog.csdn.net/anmoyyh/article/details/76019777
      @Override
      public boolean equals(Object obj) {
                if (this == obj)
                        return true;
                if (obj == null)
                        return false;
                if (getClass() != obj.getClass())
                        return false;
                PairKeyWritable other = (PairKeyWritable) obj;
                if (first == null) {
                        if (other.first != null)
                              return false;
                } else if (!first.equals(other.first))
                        return false;
                if (second != other.second)
                        return false;
                return true;
      }
      
      //注意:读和写的时候,字段的顺序要一致,否则会出现读写字段混乱的问题。
      @Override
      public void write(DataOutput out) throws IOException {
                out.writeUTF(this.first);
                out.writeInt(this.second);
      }
      
      //注意:读和写的时候,字段的顺序要一致,否则会出现读写字段混乱的问题。
      @Override
      public void readFields(DataInput in) throws IOException {
                this.first = in.readUTF();
                this.second = in.readInt();
      }
      
      //比较排序,先按照第一个字段排序,如果第一个自动排序完成后,再按照第二个字段排序。
      @Override
      public int compareTo(PairKeyWritable o) {
                PairKeyWritable pairKeyWritable = (PairKeyWritable) o;
                int firstCompareResult = this.getFirst().compareTo(pairKeyWritable.getFirst());
               
                //如果比较结果大于0,返回1;如果小于0,返回-1;等于0,返回0。
                //如果直接返回第一个字段比较结果,而不是用下面的if...else分别返回,那么最终的结果只有第一个字段是排序的,第二个字段是无序的。
//                        return firstCompareResult > 0 ? 1 : firstCompareResult < 0 ? -1 : 0;
               
                if(firstCompareResult != 0){//如果比较结果不等0,表示两个值不相等,那么直接返回该比较结果,否则使用第二个字段进行比较。
                        return firstCompareResult > 0 ? 1 : -1;//升序
//                              return firstCompareResult > 0 ? -1 : 1;//降序
                }else{
                        int secondCompareResult = this.getSecond() - pairKeyWritable.getSecond();//如果比较结果大于0,返回1; 如果小于0,返回-1; 如果等于0,返回0,表示两个要比较的值相等。
//                              return secondCompareResult > 0 ? 1 : secondCompareResult < 0 ? -1 : 0;//升序
                        return secondCompareResult > 0 ? -1 : secondCompareResult < 0 ? 1 : 0;//降序
                }

      }
}



Map函数:
public static class MyMap extends Mapper<LongWritable, Text, PairKeyWritable, IntWritable>{

                @Override
                protected void setup(Context context)
                              throws IOException, InterruptedException {
                        // TODO Auto-generated method stub
                }
               
                PairKeyWritable pairKeyWritable = new PairKeyWritable();
                String first = null;
                int second = 0;
                String[] lineStringArray = null;
                String line = null;
                // MapReduce框架每读一行数据就调用一次map方法
                @Override
                public void map(LongWritable mapKeyInLongWritable, Text mapValueInText, Context context) throws IOException, InterruptedException {
                        System.out.println("MyMap.........");
                        line = mapValueInText.toString();
                        lineStringArray = line.split(" ");
                        first = lineStringArray;
                        second = Integer.parseInt(lineStringArray);
                        pairKeyWritable.set(first, second);
                        context.write(pairKeyWritable, new IntWritable(second));
                }
               
                @Override
                protected void cleanup(Context context)
                              throws IOException, InterruptedException {
                        // TODO Auto-generated method stub
                }
      }

Reduce函数:
public static class MyReduce extends Reducer<PairKeyWritable, IntWritable, Text, IntWritable>{
               
                @Override
                protected void setup(Reducer<PairKeyWritable, IntWritable, Text, IntWritable>.Context context)
                              throws IOException, InterruptedException {
                        // TODO Auto-generated method stub
                }
                private Text text = new Text();
                @Override
                protected void reduce(PairKeyWritable reduceKeyInPairKeyWritable, Iterable<IntWritable> reduceValueInIterable, Context context) throws IOException, InterruptedException {
                        System.out.println("MyReduce====================");
                        //下面的for循环也可以正常显示已结果,这个更简洁。
//                        context.write(new Text(reduceKeyInPairKeyWritable.getFirst()), new IntWritable(reduceKeyInPairKeyWritable.getSecond()));
                        
                        text.set(reduceKeyInPairKeyWritable.getFirst());
                        for(IntWritable valueIntWritable : reduceValueInIterable){
                              context.write(text, valueIntWritable);
                        }
                        
                }
               
                @Override
                protected void cleanup(Reducer<PairKeyWritable, IntWritable, Text, IntWritable>.Context context)
                              throws IOException, InterruptedException {
                        // TODO Auto-generated method stub
                        super.cleanup(context);
                }
      }


分区函数:问题?如果reduce个数设置1,自定义的分区不会调用吗?
public static class MyPartitionerClass extends Partitioner<PairKeyWritable, IntWritable>{
                @Override
                public int getPartition(PairKeyWritable mapKeyOut, IntWritable mapValueOut, int numPartitions) {
                        System.out.println("MyPartitionerClass====================");
                        System.out.println("numPartitions="+numPartitions);
                        //根据key的hash code进行分区。如果很大数据量需要考虑抽样分区,然后才能避免出现数据倾斜和访问热点的问题
                        System.out.println("mapKeyOut.getFirst()="+mapKeyOut.getFirst());
                        System.out.println("mapKeyOut.getFirst().hashCode()="+mapKeyOut.getFirst().hashCode());
                        //与reduce job的个数进相除然后取余数。
                        int partitionResult = (mapKeyOut.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
                        System.out.println("partitionResult="+partitionResult);
                        System.out.println("");
                        return partitionResult;
                }
      }


分组函数:这里有一个问题,见里面的注释。我在里面将Object转换一下不可以吗?
public static class MyGroupingComparatorClass extends WritableComparator{
               
                //为何要这个方法?//必须要调用父类的构造器
                protected MyGroupingComparatorClass() {
                        super(PairKeyWritable.class, true);//注册comparator
                }
               
//                public int compare(Object o1, Object o2) {
                //为何上面的方法不行?必须要指定为:WritableComparable
                @Override
                public int compare(WritableComparable o1, WritableComparable o2) {
                        System.out.println("MyGroupingComparatorClass&&&&&&&&&&&&&&&&&&&&&&&&");
                        PairKeyWritable doubleSortBean_1 = (PairKeyWritable) o1;
                        PairKeyWritable doubleSortBean_2 = (PairKeyWritable) o2;
                        int minus = doubleSortBean_1.getFirst().compareTo(doubleSortBean_2.getFirst());
                return minus;
                }
      }


排序函数:
public static class MySortComparatorClass extends WritableComparator{
               
                //为何要这个方法?//必须要调用父类的构造器
                protected MySortComparatorClass() {
                        super(PairKeyWritable.class, true);//注册comparator
                }
               
//                public int compare(Object o1, Object o2) {
                //为何上面的方法不行?必须要指定为:WritableComparable
                @Override
                public int compare(WritableComparable o1, WritableComparable o2) {
                        System.out.println("MySortComparatorClass###########################");
                        PairKeyWritable doubleSortBean_1 = (PairKeyWritable) o1;
                        PairKeyWritable doubleSortBean_2 = (PairKeyWritable) o2;
                        int firstComapreResult = doubleSortBean_1.getFirst().compareTo(doubleSortBean_2.getFirst());
                        //分组内部进行排序,按照第二个字段进行排序,首先要保证是同一个组内,同一个组的标识就是第一个字段相同
                if (firstComapreResult != 0){
                  return firstComapreResult > 0 ? 1 : -1;
                } else {
                  int secondComapreResult = doubleSortBean_1.getSecond() - doubleSortBean_2.getSecond();
                  return secondComapreResult > 0 ? -1 : secondComapreResult < 0 ? 1 : 0;
                }
                }
      }





接下来是全排序的问题。我在使用InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);进行全排序的时候,为何看不到随机抽样的文件在HDFS上面,全局排序的设置如下:
// 设置partition file全路径到conf
      Path path = new Path(args);//随机抽样文件存放位置
      TotalOrderPartitioner.setPartitionFile(configration, path);
      // partitioner class设置成TotalOrderPartitioner
      job.setPartitionerClass(TotalOrderPartitioner.class);
      // RandomSampler第一个参数表示key会被选中的概率,第二个参数是一个选取samples数,第三个参数是最大读取input splits数
      RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
      // 写partition file到mapreduce.totalorderpartitioner.path
      InputSampler.writePartitionFile(job, sampler);

使用RandomSampler来做全局排序的时候,有一点比较恶心:要求Map的输入key和输出key的数据类型必须一致?这个太恶心了。我们还得自己重新定义InputFormat实现类。因为我的map输出的key是自定义的数据类型,而Map的输入key是使用的LongWritable类型的。除了自定义InputForMat实现类,还有其他的办法吗?



javaanddonet 发表于 2018-3-31 16:28:40

有大神指导一下吗?

fly2015 发表于 2018-4-2 14:45:22

非要用MR实现吗???

javaanddonet 发表于 2018-4-2 17:14:20

fly2015 发表于 2018-4-2 14:45
非要用MR实现吗???

不一定是要用MR。我只不过想深入研究一下。
我这两天研究了,发现这个MR 能写死人的。
转为hive里面的表,然后用HQL很简单了。
再说,好像没有公司用MR了吧。
页: [1]
查看完整版本: MapReduce 二次排序和全排序