分享

求高手ChainMapReduce完整例子

bob007 发表于 2013-10-26 15:08:50 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 5382
目前做的东西需要用到chainmapreduce,想找相关例子看看,求高手帮助啊

已有(1)人评论

跳转到指定楼层
nextuser 发表于 2013-10-26 15:08:50
下面个这个例子没啥实际意思,但是很好的演示了ChainMapper的作用。
源文件
100        tom        90
101        mary        85
102        kate        60
map00的结果,过滤掉100的记录
101        mary        85
102        kate        60
map01的结果,过滤掉101的记录
102        kate        60
reduce结果
102        kate        60
package org.myorg;
import java.io.IOException;
import java.util.*;
import java.lang.String;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*;
public class WordCount
{
    public static class Map00 extends MapReduceBase implements Mapper
    {
        public void map(Text key, Text value, OutputCollector output, Reporter reporter) throws IOException
        {
            Text ft = new Text(“100″);
            if(!key.equals(ft))
            {
                output.collect(key, value);
            }
        }
    }
    public static class Map01 extends MapReduceBase implements Mapper
    {
        public void map(Text key, Text value, OutputCollector output, Reporter reporter) throws IOException
        {
            Text ft = new Text(“101″);
            if(!key.equals(ft))
            {
                output.collect(key, value);
            }
        }
    }
    public static class Reduce extends MapReduceBase implements Reducer
    {
        public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
        {
            while(values.hasNext())
            {
                output.collect(key, values.next());
            }
        }
    }
    public static void main(String[] args) throws Exception
    {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName(“wordcount00″);
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        ChainMapper cm = new ChainMapper();
        JobConf mapAConf = new JobConf(false);
        cm.addMapper(conf, Map00.class, Text.class, Text.class, Text.class, Text.class, true, mapAConf);
        JobConf mapBConf = new JobConf(false);
        cm.addMapper(conf, Map01.class, Text.class, Text.class, Text.class, Text.class, true, mapBConf);
        conf.setReducerClass(Reduce.class);
        conf00.setOutputKeyClass(Text.class);
        conf00.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条