package cn.basemapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Filter3 {

    public static class Map extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();

            StringTokenizer tokenizer = new StringTokenizer(line, "\n");

            while (tokenizer.hasMoreTokens()) {

                // Each record is whitespace-separated: score first, then name.
                StringTokenizer st = new StringTokenizer(tokenizer.nextToken());

                String score = st.nextToken();
                String name = st.nextToken();

                if (name.equals("赵六")) {

                    int tscore = Integer.parseInt(score);
                    Text tname = new Text(name);

                    // The key must be written as a Text, not a raw String,
                    // or the map-side serialization fails at runtime.
                    context.write(tname, new LongWritable(tscore));
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Emit only the matching name; the value column is left empty.
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        // conf.set("mapred.job.tracker", "192.168.1.2:9001");

        String[] ioArgs = new String[] { "mkdir", "score_out" };

        String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
                .getRemainingArgs();

        if (otherArgs.length != 2) {
            System.err.println("Usage: Filter3 <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "Filter3");

        job.setJarByClass(Filter3.class);

        // Set the Mapper, Combiner and Reducer classes.
        job.setMapperClass(Map.class);
        // The combiner aggregates intermediate output locally, which helps
        // cut the volume of data shipped from the Mapper to the Reducer.
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // Set the output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Splits the input data set into blocks (splits) and provides a
        // RecordReader implementation.
        job.setInputFormatClass(TextInputFormat.class);

        // Provides a RecordWriter implementation that writes the output.
        job.setOutputFormatClass(TextOutputFormat.class);

        // Set the input and output directories.
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
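
For reference, the mapper splits each line on whitespace and reads a score followed by a name, so the input file is assumed to look something like this (sample values made up for illustration):

    95 张三
    88 李四
    90 赵六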
When I run this myself, Reduce never gets executed, and I can't figure out where the problem is.
Could anyone take a look?
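
One likely explanation, judging from the driver alone (a diagnosis of the posted code, not something I have run against your data): either of two settings makes the job fail while the map output is being written, so the reduce phase is never reached.

1. The map output types (Text key, LongWritable value) differ from the job output types (Text, Text), but only setOutputKeyClass/setOutputValueClass are called. Hadoop then assumes the map also emits Text values and rejects the LongWritable ones with a type-mismatch error; setMapOutputKeyClass and setMapOutputValueClass have to be declared explicitly.

2. A combiner runs between map and reduce and must consume and produce the map output types. Reduce emits <Text, Text>, not <Text, LongWritable>, so it cannot be reused as the combiner here.

A minimal sketch of the corrected driver section, assuming the map output stays <Text, LongWritable> and the final output stays <Text, Text>:

    Job job = new Job(conf, "Filter3");
    job.setJarByClass(Filter3.class);

    job.setMapperClass(Map.class);
    // No combiner: Reduce emits <Text, Text>, which does not match the
    // map output value class (LongWritable), so it cannot double as one.
    job.setReducerClass(Reduce.class);

    // The map output types differ from the final output types, so they
    // have to be declared explicitly; otherwise the map-side collector
    // expects Text values and fails on the LongWritable ones.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    // Final (reducer) output types.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);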