fish_tx 发表于 2015-10-22 17:15:05

hadoop(大数据)对两列数据进行排序

需求说明:1、对两列数据进行升序排列2、先按第一列排序,再对第二列进行排序
案例数据3      5299      5019      119      692      453      5003      1400      138      319      5


程序

Mapperpackage com.fish.had.sort;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
public class SortMapper extends Mapper<LongWritable, Text, NewK2Writable, LongWritable>{
      protected NewK2Writable k2 = null;
      @Override      protected void map(                        LongWritable k1,                        Text v1,                        Mapper<LongWritable, Text, NewK2Writable, LongWritable>.Context context)                        throws IOException, InterruptedException {
                String[] ss = v1.toString().split("\t");                k2 = new NewK2Writable(Long.parseLong(ss), Long.parseLong(ss));                context.write(k2, new LongWritable(Long.parseLong(ss)));      }}
Reducerpackage com.fish.had.sort;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.mapreduce.Reducer;
public class SortReducer extends Reducer<NewK2Writable, LongWritable, LongWritable, LongWritable>{
      protected void reduce(                        NewK2Writable k2,                        Iterable<LongWritable> v2s,                        Reducer<NewK2Writable, LongWritable, LongWritable, LongWritable>.Context context)                        throws IOException, InterruptedException {                context.write(new LongWritable(k2.first), new LongWritable(k2.second));      }}

自定义Writablepackage com.fish.had.sort;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class NewK2Writable implements WritableComparable<NewK2Writable>{
      long first;      long second;
      public NewK2Writable(){}
      public NewK2Writable(long first, long second){                this.first = first;                this.second = second;      }
      public void readFields(DataInput in) throws IOException {                this.first = in.readLong();                this.second = in.readLong();      }
      public void write(DataOutput out) throws IOException {                out.writeLong(this.first);                out.writeLong(this.second);      }
      @Override      public int compareTo(NewK2Writable o) {                long minus = this.first - o.first;                if(minus != 0)                        return (int) minus;                return (int) (this.second - o.second);      }
}


Mainpackage com.fish.had.sort;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;
public class SortMain extends Configured implements Tool{
      public static void main(String[] args) throws Exception {                if(args.length != 2){                        System.err.print("----> parameter is two, not null");                        System.exit(1);                }                ToolRunner.run(new SortMain(), args);      }
      @SuppressWarnings("deprecation")      public int run(String[] args) throws Exception {                String INPATH = args;                String OUTPATH = args;                Path outPath = new Path(OUTPATH);
                Configuration conf = new Configuration();                FileSystem fs = FileSystem.get(new URI(INPATH), conf);                if(fs.exists(outPath)) fs.delete(outPath, true);
                Job job = new Job(conf, SortMain.class.getSimpleName());                job.setJarByClass(SortMain.class);
                job.setInputFormatClass(TextInputFormat.class);                job.setOutputFormatClass(TextOutputFormat.class);
                job.setMapperClass(SortMapper.class);                job.setReducerClass(SortReducer.class);
                job.setMapOutputKeyClass(NewK2Writable.class);                job.setMapOutputValueClass(LongWritable.class);
                job.setOutputKeyClass(LongWritable.class);                job.setOutputValueClass(LongWritable.class);
                job.setPartitionerClass(HashPartitioner.class);                job.setNumReduceTasks(1);
                FileInputFormat.setInputPaths(job, new Path(INPATH));                FileOutputFormat.setOutputPath(job, outPath);
                job.waitForCompletion(true);                return 0;      }
}

运行

查看结果

wangzhenqiang 发表于 2015-10-26 11:21:48

学习学习,整点文字说明就好了
页: [1]
查看完整版本: hadoop(大数据)对两列数据进行排序