Requirements: 1. Sort two columns of data in ascending order. 2. Sort by the first column first; where first-column values tie, sort by the second column.
Sample data (two tab-separated columns per line):
3	5
299	50
19	1
19	69
2	45
3	500
3	1
400	1
38	3
19	5
Code
Mapper

package com.fish.had.sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, NewK2Writable, LongWritable> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // Each input line holds two tab-separated numbers. Both go into the
        // composite key, so the shuffle sorts on the first column and then
        // the second; the second column also rides along as the value.
        String[] ss = v1.toString().split("\t");
        NewK2Writable k2 = new NewK2Writable(Long.parseLong(ss[0]), Long.parseLong(ss[1]));
        context.write(k2, new LongWritable(Long.parseLong(ss[1])));
    }
}
Reducer

package com.fish.had.sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<NewK2Writable, LongWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(NewK2Writable k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
        // The keys arrive already sorted on both columns, so the reducer
        // just unpacks the composite key back into two output columns.
        // Note: a duplicate (first, second) pair would be emitted only once,
        // since this writes per key group rather than per value.
        context.write(new LongWritable(k2.first), new LongWritable(k2.second));
    }
}
Custom Writable

package com.fish.had.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class NewK2Writable implements WritableComparable<NewK2Writable> {

    long first;
    long second;

    // Hadoop needs a no-arg constructor to instantiate the key
    // before calling readFields() during deserialization.
    public NewK2Writable() {}

    public NewK2Writable(long first, long second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.first);
        out.writeLong(this.second);
    }

    @Override
    public int compareTo(NewK2Writable o) {
        // Long.compare avoids the overflow bug of casting a long
        // difference to int (values far apart would wrap the int range).
        int cmp = Long.compare(this.first, o.first); // first column first
        if (cmp != 0) return cmp;
        return Long.compare(this.second, o.second);  // then the second column
    }
}
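To sanity-check the composite key's ordering without launching a cluster, here is a minimal local sketch (SortOrderDemo is a hypothetical helper, not part of the original job; it only needs the Hadoop client jar on the classpath because NewK2Writable implements WritableComparable). It sorts the sample pairs with compareTo and should print them in the final output order:

package com.fish.had.sort;

import java.util.Arrays;

public class SortOrderDemo {
    public static void main(String[] args) {
        // The ten sample pairs from the input data above.
        NewK2Writable[] keys = {
            new NewK2Writable(3, 5), new NewK2Writable(299, 50),
            new NewK2Writable(19, 1), new NewK2Writable(19, 69),
            new NewK2Writable(2, 45), new NewK2Writable(3, 500),
            new NewK2Writable(3, 1), new NewK2Writable(400, 1),
            new NewK2Writable(38, 3), new NewK2Writable(19, 5)
        };
        Arrays.sort(keys); // uses NewK2Writable.compareTo: first column, then second
        for (NewK2Writable k : keys) {
            System.out.println(k.first + "\t" + k.second);
        }
    }
}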
Main

package com.fish.had.sort;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: SortMain <input path> <output path>");
            System.exit(1);
        }
        // Propagate the job's exit code instead of discarding it.
        System.exit(ToolRunner.run(new SortMain(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        String INPATH = args[0];
        String OUTPATH = args[1];
        Path outPath = new Path(OUTPATH);

        Configuration conf = new Configuration();
        // Resolve the filesystem from the output URI (not the input one,
        // as originally written) and remove a stale output directory so
        // the job does not fail on startup.
        FileSystem fs = FileSystem.get(new URI(OUTPATH), conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, SortMain.class.getSimpleName());
        job.setJarByClass(SortMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(NewK2Writable.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // A single reducer yields one globally sorted output file.
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path(INPATH));
        FileOutputFormat.setOutputPath(job, outPath);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Run
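Assuming the classes above are packaged into a jar (the jar name sort.jar and the HDFS paths /sort/in and /sort/out below are placeholders, not from the original post), the job can be launched with the standard hadoop jar command:

hadoop jar sort.jar com.fish.had.sort.SortMain /sort/in /sort/out

Because the driver deletes an existing output directory itself, the command can be rerun without manually cleaning up /sort/out.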
View the results
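With a single reducer, everything lands in one part file (the path assumes the placeholder /sort/out from the run step). Given the sample data above, the output should be the ten pairs sorted by the first column, then the second:

hadoop fs -cat /sort/out/part-r-00000

2	45
3	1
3	5
3	500
19	1
19	5
19	69
38	3
299	50
400	1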