package com.duplicate;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

- public class OdsEmp {
- public static class Map extends Mapper<Object, Text, Text, Text>{
-
- private Text k = new Text();
- private Text vs = new Text();
-
- public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
- String v = value.toString();
- String[] token = v.split(",");
- k.set(token[0] + "," + token[1]); //使用 i_date + "," + v_emp_id组合key值
-
- vs.set(token[3] + "," + token[4]); // 使用 i_amr1 + "," + i_amr2组合value值
- context.write(k, vs);
- }
- }
-
- public static class Reduce extends Reducer<Text,Text,NullWritable,Text>{
-
- public void reduce(Text key,Iterable<Text> values, Context context)throws IOException,InterruptedException{
- FloatWritable i_amr1 = new FloatWritable();
- FloatWritable i_amr2 = new FloatWritable();
-
- // 进行同一组的sum
- for(Text tx : values){
- String[] split = tx.toString().split(",");
- i_amr1.set( i_amr1.get() + Float.parseFloat(split[0]));
- i_amr2.set(i_amr2.get() + Float.parseFloat(split[1]));
- }// End for
-
- Text value = new Text();
- value.set(key + "," + i_amr1 + "," + i_amr2); // 以i_date,v_emp_id,v_proj_id,i_amr1,i_amr2输出
- context.write(null, value);
- }
- }
-
- /**
- * @param args
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- // TODO Auto-generated method stub
- Configuration conf = new Configuration();
- Job job = new Job(conf,"OdsEmp");
- job.setJarByClass(OdsEmp.class);
- job.setMapperClass(Map.class);
- job.setReducerClass(Reduce.class);
-
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- System.exit(job.waitForCompletion(true)?0:1);
-
- }
-
- }
The code above implements the group-and-sum step. To produce sorted output, feed this job's output into a second MapReduce job that performs the sort.