/**
 * Range partitioner for integer keys: splits the key space into
 * {@code numPartitions} contiguous, equally wide ranges and sends each key to
 * the partition whose range contains it, so lower-numbered partitions receive
 * smaller keys.
 */
public static class Partition extends Partitioner<IntWritable, IntWritable> {

    /**
     * Upper key value the ranges are sized for (value kept from the original
     * code). NOTE(review): keys above this are still handled, they are clamped
     * into the last partition — confirm this matches the real input range.
     */
    private static final int MAX_KEY = 65223;

    /**
     * Maps a key to the index of the range it falls in.
     *
     * @param key           the integer key being partitioned
     * @param value         the associated value (not used for partitioning)
     * @param numPartitions total number of partitions
     * @return a partition index in {@code [0, numPartitions - 1]}
     */
    @Override
    public int getPartition(IntWritable key, IntWritable value,
            int numPartitions) {
        // Width of one range; the +1 guarantees bound * numPartitions > MAX_KEY,
        // so every key in 0..MAX_KEY lands in a valid range.
        int bound = MAX_KEY / numPartitions + 1;
        int partition = key.get() / bound;

        // Clamp out-of-range keys instead of mis-routing them: negative keys go
        // to the first partition, keys beyond the sized range go to the last.
        // (The previous loop never returned the last partition and could
        // return -1 for small negative keys.)
        return Math.max(0, Math.min(partition, numPartitions - 1));
    }
}
// How to wire it in: once the job is configured as shown below, the custom partition function takes effect.
/**
 * Configures and runs the "sort" MapReduce job: registers the mapper, reducer
 * and custom partitioner, sets the output key/value types and the input and
 * output paths, then blocks until the job finishes.
 *
 * <p>If the job does not complete successfully the JVM exits with status 1,
 * so callers such as shell scripts can detect the failure.
 *
 * @param args command-line arguments (unused; paths are hard-coded)
 * @throws IOException            propagated from job setup or submission
 * @throws InterruptedException   propagated from waiting on the job
 * @throws ClassNotFoundException propagated from job submission
 */
public static void main(String[] args) throws IOException,
        InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "sort");
    job.setJarByClass(Sort.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    // Route keys through the custom range partitioner.
    // NOTE(review): the number of reduce tasks is not set here; the
    // partitioner only spreads keys when the job runs with more than one
    // reducer — confirm that is configured elsewhere.
    job.setPartitionerClass(Partition.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, "/home/asheng/hadoop/in");
    FileOutputFormat.setOutputPath(job, new Path("/home/asheng/hadoop/out"));

    // waitForCompletion reports success as a boolean; previously the result
    // was discarded and a failed job still ended with a success exit status.
    if (!job.waitForCompletion(true)) {
        System.exit(1);
    }
}
}