Hadoop (big data): sorting two columns of data
Requirements:
1. Sort both columns in ascending order.
2. Sort by the first column first; when first-column values are equal, sort by the second column.

Case data (as posted; each actual input line contains two tab-separated numbers):
3 5299 5019 119 692 453 5003 1400 138 319 5
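For illustration only, with hypothetical rows rather than the case data above, the required ordering over four input pairs looks like this:

    input:          sorted output:
    3   2           1   4
    1   5           1   5
    3   1           3   1
    1   4           3   2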
Program

The trick is that MapReduce sorts map output by key during the shuffle. So the mapper packs both columns into a custom composite key (NewK2Writable, defined below) whose compareTo compares the first column and falls back to the second; the framework then produces exactly the required order.
Mapper

package com.fish.had.sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, NewK2Writable, LongWritable> {

    protected NewK2Writable k2 = null;

    @Override
    protected void map(LongWritable k1, Text v1,
            Mapper<LongWritable, Text, NewK2Writable, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // Each input line holds two tab-separated numbers.
        String[] ss = v1.toString().split("\t");
        // Put both columns into the composite key so the shuffle sorts on both.
        k2 = new NewK2Writable(Long.parseLong(ss[0]), Long.parseLong(ss[1]));
        context.write(k2, new LongWritable(Long.parseLong(ss[1])));
    }
}
Reducer

package com.fish.had.sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<NewK2Writable, LongWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(NewK2Writable k2, Iterable<LongWritable> v2s,
            Reducer<NewK2Writable, LongWritable, LongWritable, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // The composite key arrives fully sorted and already carries both
        // columns, so the values in v2s are not needed; just unpack the key.
        context.write(new LongWritable(k2.first), new LongWritable(k2.second));
    }
}
Custom Writable

package com.fish.had.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class NewK2Writable implements WritableComparable<NewK2Writable> {

    long first;
    long second;

    // Hadoop instantiates keys reflectively, so a no-arg constructor is required.
    public NewK2Writable() {
    }

    public NewK2Writable(long first, long second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.first);
        out.writeLong(this.second);
    }

    @Override
    public int compareTo(NewK2Writable o) {
        // Compare by the first column, falling back to the second on ties.
        // Long.compare avoids the overflow/truncation bug of casting a long
        // difference down to int.
        int cmp = Long.compare(this.first, o.first);
        if (cmp != 0) {
            return cmp;
        }
        return Long.compare(this.second, o.second);
    }
}
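As a quick sanity check of the ordering, here is a minimal sketch (not part of the original post; CompareToDemo is a hypothetical class placed in the same package so it can read the package-private fields) that exercises compareTo with Arrays.sort:

package com.fish.had.sort;

import java.util.Arrays;

public class CompareToDemo {
    public static void main(String[] args) {
        NewK2Writable[] keys = {
                new NewK2Writable(3, 2),
                new NewK2Writable(1, 5),
                new NewK2Writable(3, 1)
        };
        // Arrays.sort uses compareTo: first column ascending, then second.
        Arrays.sort(keys);
        for (NewK2Writable k : keys) {
            System.out.println(k.first + "\t" + k.second);
        }
        // Expected output:
        // 1    5
        // 3    1
        // 3    2
    }
}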
Main

package com.fish.had.sort;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("----> two parameters required: <input path> <output path>");
            System.exit(1);
        }
        ToolRunner.run(new SortMain(), args);
    }

    @SuppressWarnings("deprecation")
    public int run(String[] args) throws Exception {
        String INPATH = args[0];
        String OUTPATH = args[1];
        Path outPath = new Path(OUTPATH);

        Configuration conf = new Configuration();
        // Delete any existing output directory so reruns do not fail.
        FileSystem fs = FileSystem.get(new URI(INPATH), conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        Job job = new Job(conf, SortMain.class.getSimpleName());
        job.setJarByClass(SortMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        // Map output: composite key plus the second column as the value.
        job.setMapOutputKeyClass(NewK2Writable.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Final output: the two columns as plain LongWritables.
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // A single reducer gives one globally sorted output file; with one
        // reducer, the default hashCode of NewK2Writable is harmless.
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path(INPATH));
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);
        return 0;
    }
}
Run
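A typical way to launch the job, assuming the classes above are packaged into a jar (the jar name and HDFS paths here are placeholders, not from the original post):

    hadoop jar sort.jar com.fish.had.sort.SortMain /input/sortdata /output/sort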
View the results
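With a single reducer, the sorted pairs end up in one part file under the output directory, which can be inspected with the standard HDFS cat command (output path as in the placeholder above):

    hadoop fs -cat /output/sort/part-r-00000

Each line should contain the two columns, ordered by the first column and, within equal first values, by the second.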
Good material for learning; it would be nice to add some written explanation.