以下是设计思路二的代码:
package normalizedVersion1;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import readDataFromHdfs.readDataFromHdfs;
-
- public class normalizedVersion1 {
-
- static float max;
- static float min;
-
- public static class findMaxMinMapper
- extends Mapper<Object, Text, Text, IntWritable>{
-
- public void map(Object key, Text value, Context context
- ) throws IOException, InterruptedException {
- StringTokenizer itr = new StringTokenizer(value.toString());
-
- String headStr=itr.nextToken().toString();
-
- float headFloat=Float.parseFloat(headStr);
-
- context.write(new Text("flag"), new IntWritable((int) headFloat));
-
- }
-
- }
-
- public static class findMaxMinReducer
- extends Reducer<Text,IntWritable,Text,IntWritable> {
- private IntWritable result = new IntWritable(1);
-
- public void reduce(Text key, Iterable<IntWritable> values,
- Context context
- ) throws IOException, InterruptedException {
- int minTemp=65533,maxTemp=0;
- for (IntWritable val : values) {
- if(val.get()<(int)minTemp)
- minTemp=val.get();
- if(val.get()>(int)maxTemp)
- maxTemp=val.get();
- }
- context.write(new Text(minTemp+"\t"+maxTemp), result);
- }
- }
-
- public static class normalizedMapper extends Mapper<Object, Text, Text, IntWritable>{
- public void map(Object key, Text value, Context context
- ) throws IOException, InterruptedException {
-
- StringTokenizer itr = new StringTokenizer(value.toString());
-
- String headStr=itr.nextToken().toString();
-
- float headFloat=Float.parseFloat(headStr);
-
- headFloat=(headFloat-min)/(max-min);
- String headReplice= headFloat+" ";
- String valueStr=value.toString();
- String changeValue=valueStr.replaceFirst(headStr, headReplice);
- context.write(new Text(changeValue), new IntWritable(1));
- }
-
- @Override
- protected void setup(
- Mapper<Object, Text, Text, IntWritable>.Context context)
- throws IOException, InterruptedException {
-
- readDataFromHdfs rdfh=new readDataFromHdfs();
- String minMaxStr=rdfh.getMinMaxStr();
- String [] strs=minMaxStr.split("\t");
- min=Float.parseFloat(strs[0]);
- max=Float.parseFloat(strs[1]);
- super.setup(context);
- }
- }
-
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf, "findMinMax");
- job.setJarByClass(normalizedVersion1.class);
- job.setMapperClass(findMaxMinMapper.class);
- job.setReducerClass(findMaxMinReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- if(!job.waitForCompletion(true))
- System.exit(1);
-
- Job job1 = Job.getInstance(conf, "normalize");
- job1.setJarByClass(normalizedVersion1.class);
- job1.setMapperClass(normalizedMapper.class);
- job1.setOutputKeyClass(Text.class);
- job1.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job1, new Path(args[0]));
- FileOutputFormat.setOutputPath(job1, new Path(args[2]));
- System.exit(job1.waitForCompletion(true) ? 0 : 1);
-
- }
- }