什么时候运行Combiner?

查看数: 38448 | 评论数: 7 | 收藏 0
关灯 | 提示:支持键盘翻页<-左 右->
    组图打开中,请稍候......
发布时间: 2015-6-15 19:41

正文摘要:

什么时候运行Combiner? 1、当job设置了Combiner,并且spill的个数到min.num.spill.for.combine(默认是3)的时候,那么combiner就会Merge之前执行; 2、但是有的情况下,Merge开始执行,但spill文件的个数没有达到 ...

回复

tntzbzc 发表于 2015-8-11 12:19:55
cp87597 发表于 2015-8-4 16:16
配图中,Combiner不是在Partitioner之前么?

分区是最后一步
tntzbzc 发表于 2015-8-11 12:19:17
cp87597 发表于 2015-8-4 16:13
同问,“combine在map阶段的初步处理”这是啥意思啊

相当于本地reduce,只不过这个reduce是在map范围内
cp87597 发表于 2015-8-4 16:16:53
配图中,Combiner不是在Partitioner之前么?
cp87597 发表于 2015-8-4 16:13:31
tntzbzc 发表于 2015-6-16 10:59
combine在map阶段的初步处理

同问,“combine在map阶段的初步处理”这是啥意思啊
tntzbzc 发表于 2015-6-16 10:59:49
ding123lei 发表于 2015-6-16 10:12
执行combine的时候,首先数据格式应该是,是不是只有在merge结束后才可以执行combine?

combine在map阶段的初步处理
ding123lei 发表于 2015-6-16 10:12:08
执行combine的时候,首先数据格式应该是<key,Iterable<IntWritable>>,是不是只有在merge结束后才可以执行combine?
tntzbzc 发表于 2015-6-15 19:43:17
简述
Combiner的作用是把一个map产生的多个<KEY,VALUE>合并成一个新的<KEY,VALUE>,然后再将新<KEY,VALUE>的作为reduce的输入;
在map函数与reduce函数之间多了一个combine函数,目的是为了减少map输出的中间结果,这样减少了reduce复制map输出的数据,减少网络传输负载;
并不是所有情况下都能使用Combiner,Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了。如果可以使用Combiner,一般情况下,和我们的reduce函数是一致的。
什么时候运行Combiner?
1、当job设置了Combiner,并且spill的个数到min.num.spill.for.combine(默认是3)的时候,那么combiner就会Merge之前执行;
2、但是有的情况下,Merge开始执行,但spill文件的个数没有达到需求,这个时候Combiner可能会在Merge之后执行;
3、Combiner也有可能不运行,Combiner会考虑当时集群的一个负载情况。如果集群负载量很大,会尽量提早执行完map,空出资源,所以,就不会去执行。

实例代码:
[mw_shl_code=java,true]package MyCombiner;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class CombinerExp {
    private final static String INPUT_PATH = "hdfs://master:8020/input";
    private final static String OUTPUT_PATH = "hdfs://master:8020/output.txt";
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        private IntWritable one = new IntWritable(1);//1
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] str = value.toString().split("\\s+");

            for (String string : str) {
                System.out.println(string);
                word.set(string);
                context.write(word, one);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable,Text, IntWritable>{
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable val : values) {
                sum+=val.get();
            }
            result.set(sum);
            context.write(key,result);
        }   
    }

    public static void main(String[] args) throws Exception {
        //1、配置  
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUTPUT_PATH)))
        {
            fileSystem.delete(new Path(OUTPUT_PATH),true);
        }
        Job job = Job.getInstance(conf, "word count");

        //2、打包运行必须执行的方法
        job.setJarByClass(CombinerExp.class);

        //3、输入路径
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));  
        //4、Map
        job.setMapperClass(MyMapper.class);

        //5、Combiner
        job.setCombinerClass(MyReducer.class);

        //6、Reducer
        //job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(0);//reduce个数默认是1

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //7、 输出路径
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        //8、提交作业
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }  
}[/mw_shl_code]

[mw_shl_code=bash,true][root@master liguodong]# hdfs dfs -ls -R /input/
-rw-r--r--   1 root supergroup         27 2015-06-13 22:15 /input/input1
-rw-r--r--   1 root supergroup         38 2015-06-13 22:15 /input/input2

当我们只有map和combine而没有reduce时,combine并不会执行。
而输出的结果并没有被求和。
[root@master liguodong]# hdfs dfs -ls -R /output/
-rw-r--r--   3 liguodong supergroup          0 2015-06-13 22:17 /output/_SUCCESS
-rw-r--r--   3 liguodong supergroup         50 2015-06-13 22:17 /output/part-m-00000
-rw-r--r--   3 liguodong supergroup         39 2015-06-13 22:17 /output/part-m-00001


[root@master liguodong]# hdfs dfs -cat /output/part-m-00000
hello   1
you     1
hello   1
everyone        1
hello   1
hadoop  1
[root@master liguodong]# hdfs dfs -cat /output/part-m-00001
hello   1
you     1
hello   1
me      1
hi      1
baby    1

当我们把第79行注释取消,将80行注释的时候,将会执行combine函数。
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 32
    File System Counters
        ......
    Map-Reduce Framework
        Map input records=6
        Map output records=12
        ......
        Input split bytes=192
        Combine input records=12
        Combine output records=9
        ......
        Reduce input records=9
        Reduce output records=7
        Spilled Records=18
        ......
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=457912320
    File Input Format Counters
        Bytes Read=65
    File Output Format Counters
        Bytes Written=51

[root@master hadoop]# hdfs dfs -ls -R /output/
-rw-r--r--   3 liguodong supergroup          0 2015-06-13 22:41 /output/_SUCCESS
-rw-r--r--   3 liguodong supergroup         51 2015-06-13 22:41 /output/part-r-00000

[root@master hadoop]# hdfs dfs -cat /output/pa*
baby    1
everyone        1
hadoop  1
hello   5
hi      1
me      1
you     2[/mw_shl_code]

关闭

推荐上一条 /2 下一条