分享

Map 端辅助排序分享

lzw 2013-11-29 23:12:06 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 7250
本帖最后由 lzw 于 2013-11-30 17:13 编辑

排序文件名:content.txt,文件数据分为两列,第一列做为分组,第二列做为排序。B,10
A,15
C,80
B,60
A,99
C,35
A,19
B,23
A,1
C,12
C,1

下面是实现的代码:
  1. public class SortByT {
  2.         public static class Map extends Mapper<Object,Text,Text,Text>{
  3.                 public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
  4.                         context.write(value, value); //  已value作为map输出key和value
  5.                 }
  6.         }
  7.         
  8.         public static class Reduce extends Reducer<Text,Text,NullWritable,Text>{
  9.                 public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
  10.                         for(Text tx : values){
  11.                                 context.write(null, tx); // reduce输出已排好序value值,key输出为null
  12.                         }
  13.                 }
  14.         }
  15.         
  16.         public static class MyPartitioner extends Partitioner<Text,Text>{
  17.                 @Override
  18.                 public int getPartition(Text key, Text arg1, int numPartitioner) {
  19.                         String ky = key.toString();
  20.                         String[] k = ky.split(",");
  21.                         return Math.abs(k[0].hashCode()*127) % numPartitioner; // 已key中'_'之前值作为分组
  22.                 }
  23.         }
  24.         
  25.         public static class KeyComparator implements RawComparator<Text>{
  26.                
  27.                 public int compare(){
  28.                         return 0;
  29.                 }
  30.                 public int compare(Text o1, Text o2) {
  31.                         // 已key中'_'之后值排序输出
  32.                         String[] c1 = o1.toString().split(",");
  33.                         String[] c2 = o2.toString().split(",");
  34.                         
  35.                         int f = Integer.parseInt(c1[1]);
  36.                         int s = Integer.parseInt(c2[1]);
  37.                         
  38.                         if(f > s){
  39.                                 return 1;
  40.                         }else{
  41.                                 return -1;
  42.                         }
  43.                 }
  44.                 public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
  45.                                 int arg4, int arg5) {
  46.                         return WritableComparator.compareBytes(arg0,arg1,arg2,arg3,arg4,arg5);
  47.                 }
  48.         }
  49.         
  50.         
  51.         public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
  52.                 Configuration conf = new Configuration();
  53.                 Job job = new Job(conf,"Sort");
  54.                 job.setJarByClass(SortByTime.class);
  55.                 job.setMapperClass(Map.class);
  56.                 job.setReducerClass(Reduce.class);
  57.                 job.setPartitionerClass(MyPartitioner.class);
  58.                 job.setGroupingComparatorClass(KeyComparator.class);
  59.                 job.setNumReduceTasks(2);
  60.                 job.setOutputKeyClass(Text.class);
  61.                 job.setOutputValueClass(Text.class);
  62.                
  63.                 FileInputFormat.addInputPath(job, new Path(args[0]));
  64.                 FileOutputFormat.setOutputPath(job, new Path(args[1]));
  65.                
  66.                 System.exit(job.waitForCompletion(true)?0:1);
  67.         }
  68. }
复制代码
输出结果:
B,10
B,23
B,60
A,1
A,15
A,19
A,99
C,1
C,12
C,35
C,80

加入qq群(号码:39327136),讨论云技术,获取最新资讯资源等
欢迎加入about云群9037177932227315139327136 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条