本帖最后由 lzw 于 2013-11-30 17:13 编辑
排序文件名:content.txt。文件数据分为两列(以逗号分隔):第一列作为分组,第二列作为排序。示例数据如下:
B,10
A,15
C,80
B,60
A,99
C,35
A,19
B,23
A,1
C,12
C,1
下面是实现的代码:
- public class SortByT {
- public static class Map extends Mapper<Object,Text,Text,Text>{
- public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
- context.write(value, value); // 已value作为map输出key和value
- }
- }
-
- public static class Reduce extends Reducer<Text,Text,NullWritable,Text>{
- public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
- for(Text tx : values){
- context.write(null, tx); // reduce输出已排好序value值,key输出为null
- }
- }
- }
-
- public static class MyPartitioner extends Partitioner<Text,Text>{
-
- @Override
- public int getPartition(Text key, Text arg1, int numPartitioner) {
- String ky = key.toString();
- String[] k = ky.split(",");
- return Math.abs(k[0].hashCode()*127) % numPartitioner; // 已key中'_'之前值作为分组
- }
- }
-
- public static class KeyComparator implements RawComparator<Text>{
-
- public int compare(){
- return 0;
- }
-
- public int compare(Text o1, Text o2) {
- // 已key中'_'之后值排序输出
- String[] c1 = o1.toString().split(",");
- String[] c2 = o2.toString().split(",");
-
- int f = Integer.parseInt(c1[1]);
- int s = Integer.parseInt(c2[1]);
-
- if(f > s){
- return 1;
- }else{
- return -1;
- }
- }
-
- public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
- int arg4, int arg5) {
- return WritableComparator.compareBytes(arg0,arg1,arg2,arg3,arg4,arg5);
- }
- }
-
-
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
- Configuration conf = new Configuration();
- Job job = new Job(conf,"Sort");
- job.setJarByClass(SortByTime.class);
- job.setMapperClass(Map.class);
- job.setReducerClass(Reduce.class);
- job.setPartitionerClass(MyPartitioner.class);
- job.setGroupingComparatorClass(KeyComparator.class);
- job.setNumReduceTasks(2);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
- System.exit(job.waitForCompletion(true)?0:1);
- }
- }
复制代码
输出结果:
B,10
B,23
B,60
A,1
A,15
A,19
A,99
C,1
C,12
C,35
C,80
加入qq群(号码:39327136),讨论云技术,获取最新资讯资源等 |