简述 Combiner的作用是把一个map产生的多个<KEY,VALUE>合并成一个新的<KEY,VALUE>,然后再将新<KEY,VALUE>的作为reduce的输入; 在map函数与reduce函数之间多了一个combine函数,目的是为了减少map输出的中间结果,这样减少了reduce复制map输出的数据,减少网络传输负载; 并不是所有情况下都能使用Combiner,Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了。如果可以使用Combiner,一般情况下,和我们的reduce函数是一致的。 什么时候运行Combiner?1、当job设置了Combiner,并且spill的个数到min.num.spill.for.combine(默认是3)的时候,那么combiner就会Merge之前执行;
2、但是有的情况下,Merge开始执行,但spill文件的个数没有达到需求,这个时候Combiner可能会在Merge之后执行;
3、Combiner也有可能不运行,Combiner会考虑当时集群的一个负载情况。如果集群负载量很大,会尽量提早执行完map,空出资源,所以,就不会去执行。
实例代码:
[mw_shl_code=java,true]package MyCombiner;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CombinerExp {
private final static String INPUT_PATH = "hdfs://master:8020/input";
private final static String OUTPUT_PATH = "hdfs://master:8020/output.txt";
public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private IntWritable one = new IntWritable(1);//1
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] str = value.toString().split("\\s+");
for (String string : str) {
System.out.println(string);
word.set(string);
context.write(word, one);
}
}
}
public static class MyReducer extends Reducer<Text, IntWritable,Text, IntWritable>{
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum+=val.get();
}
result.set(sum);
context.write(key,result);
}
}
public static void main(String[] args) throws Exception {
//1、配置
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
if(fileSystem.exists(new Path(OUTPUT_PATH)))
{
fileSystem.delete(new Path(OUTPUT_PATH),true);
}
Job job = Job.getInstance(conf, "word count");
//2、打包运行必须执行的方法
job.setJarByClass(CombinerExp.class);
//3、输入路径
FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
//4、Map
job.setMapperClass(MyMapper.class);
//5、Combiner
job.setCombinerClass(MyReducer.class);
//6、Reducer
//job.setReducerClass(MyReducer.class);
job.setNumReduceTasks(0);//reduce个数默认是1
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//7、 输出路径
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
//8、提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}[/mw_shl_code]
[mw_shl_code=bash,true][root@master liguodong]# hdfs dfs -ls -R /input/
-rw-r--r-- 1 root supergroup 27 2015-06-13 22:15 /input/input1
-rw-r--r-- 1 root supergroup 38 2015-06-13 22:15 /input/input2
当我们只有map和combine而没有reduce时,combine并不会执行。
而输出的结果并没有被求和。
[root@master liguodong]# hdfs dfs -ls -R /output/
-rw-r--r-- 3 liguodong supergroup 0 2015-06-13 22:17 /output/_SUCCESS
-rw-r--r-- 3 liguodong supergroup 50 2015-06-13 22:17 /output/part-m-00000
-rw-r--r-- 3 liguodong supergroup 39 2015-06-13 22:17 /output/part-m-00001
[root@master liguodong]# hdfs dfs -cat /output/part-m-00000
hello 1
you 1
hello 1
everyone 1
hello 1
hadoop 1
[root@master liguodong]# hdfs dfs -cat /output/part-m-00001
hello 1
you 1
hello 1
me 1
hi 1
baby 1
当我们把第79行注释取消,将80行注释的时候,将会执行combine函数。
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 32
File System Counters
......
Map-Reduce Framework
Map input records=6
Map output records=12
......
Input split bytes=192
Combine input records=12
Combine output records=9
......
Reduce input records=9
Reduce output records=7
Spilled Records=18
......
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=457912320
File Input Format Counters
Bytes Read=65
File Output Format Counters
Bytes Written=51
[root@master hadoop]# hdfs dfs -ls -R /output/
-rw-r--r-- 3 liguodong supergroup 0 2015-06-13 22:41 /output/_SUCCESS
-rw-r--r-- 3 liguodong supergroup 51 2015-06-13 22:41 /output/part-r-00000
[root@master hadoop]# hdfs dfs -cat /output/pa*
baby 1
everyone 1
hadoop 1
hello 5
hi 1
me 1
you 2[/mw_shl_code]
|