求大神指教一下,为什么有错误
package org.apache.hadoop.examples;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class SecondarySortEX {
public static class IntPair implements WritableComparable<IntPair>{
int first =0;
int second=0;
public void set(int left,int right){
first =left;
second=right;
}
public int getFirst(){
return first;
}
public int getSecond(){
return second;
}
@Override
public void readFields(DataInput in) throws IOException {
first=in.readInt();
second=in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(first);
out.write(second);
}
public int compareTo(IntPair o){
if (first!=o.first)
{
return first<o.first?-1:1;
}
else if(second!=o.second)
{
return second<o.second?-1:1;
}
else
{
return 0;
}
}
@Override
public int hashCode() {
return first*157+second;
}
public boolean equals(Object right){
if(right==null)
return false;
if(this==right)
return true;
if(right instanceof IntPair){
IntPair r=(IntPair)right;
return r.first==first&&r.second==second;
}
else{
return false;
}
}
}
public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
public int getPartition(IntPair key,IntWritable value,int numPartitions)
{
return Math.abs(key.getFirst()*127) % numPartitions;
}
}
public static class GroupingComparator implements RawComparator<IntPair>{
@Override
public int compare(IntPair o1, IntPair o2) {
int first1=o1.getFirst();
int first2=o2.getFirst();
return first1==first2?0:(first1<first2?-1:1);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2,
int s2, int l2) {
return WritableComparator.compareBytes(b1, s1,Integer.SIZE/8, b2, s2, Integer.SIZE/8);
}
}
public static class MapClass extends Mapper<LongWritable,Text,IntPair,IntWritable>{
private final IntPair inkey=new IntPair();
private final IntWritable invalue=new IntWritable();
@Override
protected void map(LongWritable Key, Text Value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr=new StringTokenizer(invalue.toString());
int left=0;
int right=0;
if(itr.hasMoreTokens()){
left=Integer.parseInt(itr.nextToken());
if(itr.hasMoreTokens()){
right=Integer.parseInt(itr.nextToken());
}
inkey.set(left, right);
invalue.set(right);
context.write(inkey, invalue);
}
}
public static class ReduceClass extends Reducer<IntPair,IntWritable,Text,IntWritable>{
private final Text SEPARATOR=new Text("--------------------------------------");
private final Text first=new Text();
@Override
protected void reduce(IntPair key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
context.write(SEPARATOR, null);
first.set(Integer.toString(key.getFirst()));
for(IntWritable value:values){
context.write(first, value);
}
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
System.out.println("Begin");
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("please input at least 2 arguments");
System.exit(2);
}
Job job = new Job(conf, "SecondarySort Score");
job.setJarByClass(SecondarySortEX.class);
job.setMapperClass(MapClass.class);
job.setPartitionerClass(FirstPartitioner.class);
job.setGroupingComparatorClass(GroupingComparator.class);
job.setReducerClass(ReduceClass.class);
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
System.out.println("End");
}
}
}
错误·
|
|