lzw 发表于 2013-11-29 23:12:06

Map 端辅助排序分享

本帖最后由 lzw 于 2013-11-30 17:13 编辑

排序文件名:content.txt,文件数据分为两列,第一列做为分组,第二列做为排序。B,10
A,15
C,80
B,60
A,99
C,35
A,19
B,23
A,1
C,12
C,1

下面是实现的代码:public class SortByT {
      public static class Map extends Mapper<Object,Text,Text,Text>{
                public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
                        context.write(value, value); //已value作为map输出key和value
                }
      }
      
      public static class Reduce extends Reducer<Text,Text,NullWritable,Text>{
                public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
                        for(Text tx : values){
                              context.write(null, tx); // reduce输出已排好序value值,key输出为null
                        }
                }
      }
      
      public static class MyPartitioner extends Partitioner<Text,Text>{

                @Override
                public int getPartition(Text key, Text arg1, int numPartitioner) {
                        String ky = key.toString();
                        String[] k = ky.split(",");
                        return Math.abs(k.hashCode()*127) % numPartitioner; // 已key中'_'之前值作为分组
                }
      }
      
      public static class KeyComparator implements RawComparator<Text>{
               
                public int compare(){
                        return 0;
                }

                public int compare(Text o1, Text o2) {
                        // 已key中'_'之后值排序输出
                        String[] c1 = o1.toString().split(",");
                        String[] c2 = o2.toString().split(",");
                        
                        int f = Integer.parseInt(c1);
                        int s = Integer.parseInt(c2);
                        
                        if(f > s){
                              return 1;
                        }else{
                              return -1;
                        }
                }

                public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
                              int arg4, int arg5) {
                        return WritableComparator.compareBytes(arg0,arg1,arg2,arg3,arg4,arg5);
                }
      }
      
      
      public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
                Configuration conf = new Configuration();
                Job job = new Job(conf,"Sort");
                job.setJarByClass(SortByTime.class);
                job.setMapperClass(Map.class);
                job.setReducerClass(Reduce.class);
                job.setPartitionerClass(MyPartitioner.class);
                job.setGroupingComparatorClass(KeyComparator.class);
                job.setNumReduceTasks(2);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
               
                FileInputFormat.addInputPath(job, new Path(args));
                FileOutputFormat.setOutputPath(job, new Path(args));
               
                System.exit(job.waitForCompletion(true)?0:1);
      }
}输出结果:
B,10
B,23
B,60
A,1
A,15
A,19
A,99
C,1
C,12
C,35
C,80

加入qq群(号码:39327136)http://pub.idqqimg.com/wpa/images/group.png,讨论云技术,获取最新资讯资源等
页: [1]
查看完整版本: Map 端辅助排序分享