The following example is not particularly useful in itself, but it demonstrates nicely what ChainMapper does.
The input file:
100 tom 90
101 mary 85
102 kate 60
Output of Map00, which filters out the record with key 100:
101 mary 85
102 kate 60
Output of Map01, which filters out the record with key 101:
102 kate 60
Output of the reducer:
102 kate 60
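
One thing to watch out for: KeyValueTextInputFormat splits each input line into key and value at the first separator character, which defaults to a tab, so for the filters below to see keys such as "100" the input columns must be tab-separated. A minimal sketch, assuming the old mapred API's key.value.separator.in.input.line property, of switching the separator to a space instead:

    // Only needed if the input columns are space-separated rather than
    // tab-separated (the default for KeyValueTextInputFormat).
    conf.set("key.value.separator.in.input.line", " ");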
package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*;

public class WordCount {

    // First mapper in the chain: drops the record whose key is "100".
    public static class Map00 extends MapReduceBase
            implements Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            Text ft = new Text("100");
            if (!key.equals(ft)) {
                output.collect(key, value);
            }
        }
    }

    // Second mapper in the chain: consumes Map00's output and
    // drops the record whose key is "101".
    public static class Map01 extends MapReduceBase
            implements Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            Text ft = new Text("101");
            if (!key.equals(ft)) {
                output.collect(key, value);
            }
        }
    }

    // Identity reducer: writes out every record that survived the chain.
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            while (values.hasNext()) {
                output.collect(key, values.next());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount00");
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // addMapper is a static method; each call appends one mapper to
        // the chain and registers ChainMapper as the job's mapper class.
        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, Map00.class, Text.class, Text.class,
                Text.class, Text.class, true, mapAConf);
        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, Map01.class, Text.class, Text.class,
                Text.class, Text.class, true, mapBConf);

        conf.setReducerClass(Reduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
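
ChainMapper can only chain mappers that run before the reduce phase. Its companion class, ChainReducer, sets the reducer for the chain and can append further mappers after it. A minimal sketch of how the driver above could use it in place of conf.setReducerClass (Map02 here is a hypothetical extra filter, not part of the example):

    // Sketch: set the reducer through ChainReducer so that additional
    // mappers can be chained after the reduce step.
    JobConf reduceConf = new JobConf(false);
    ChainReducer.setReducer(conf, Reduce.class, Text.class, Text.class,
            Text.class, Text.class, true, reduceConf);
    // A hypothetical post-reduce mapper could then be appended:
    // ChainReducer.addMapper(conf, Map02.class, Text.class, Text.class,
    //         Text.class, Text.class, true, new JobConf(false));

Either way, the job is packaged into a jar and launched with the hadoop jar command as usual.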