cp87597 发表于 2015-8-4 16:16 分区是最后一步 |
cp87597 发表于 2015-8-4 16:13 相当于本地reduce,只不过这个reduce是在map范围内 |
配图中,Combiner不是在Partitioner之前么? |
tntzbzc 发表于 2015-6-16 10:59 同问,“combine在map阶段的初步处理”这是啥意思啊 |
ding123lei 发表于 2015-6-16 10:12 combine在map阶段的初步处理 |
执行combine的时候,首先数据格式应该是<key,Iterable<IntWritable>>,是不是只有在merge结束后才可以执行combine? |
简述 Combiner的作用是把一个map产生的多个<KEY,VALUE>合并成一个新的<KEY,VALUE>,然后再将新<KEY,VALUE>的作为reduce的输入; 在map函数与reduce函数之间多了一个combine函数,目的是为了减少map输出的中间结果,这样减少了reduce复制map输出的数据,减少网络传输负载; 并不是所有情况下都能使用Combiner,Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了。如果可以使用Combiner,一般情况下,和我们的reduce函数是一致的。 什么时候运行Combiner?1、当job设置了Combiner,并且spill的个数到min.num.spill.for.combine(默认是3)的时候,那么combiner就会Merge之前执行; 2、但是有的情况下,Merge开始执行,但spill文件的个数没有达到需求,这个时候Combiner可能会在Merge之后执行; 3、Combiner也有可能不运行,Combiner会考虑当时集群的一个负载情况。如果集群负载量很大,会尽量提早执行完map,空出资源,所以,就不会去执行。 实例代码: [mw_shl_code=java,true]package MyCombiner; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CombinerExp { private final static String INPUT_PATH = "hdfs://master:8020/input"; private final static String OUTPUT_PATH = "hdfs://master:8020/output.txt"; public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private IntWritable one = new IntWritable(1);//1 private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] str = value.toString().split("\\s+"); for (String string : str) { System.out.println(string); word.set(string); context.write(word, one); } } } public static class MyReducer extends Reducer<Text, IntWritable,Text, IntWritable>{ private IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum+=val.get(); } result.set(sum); context.write(key,result); } } public static void main(String[] args) throws Exception { //1、配置 Configuration conf = new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf); if(fileSystem.exists(new Path(OUTPUT_PATH))) { fileSystem.delete(new Path(OUTPUT_PATH),true); } Job job = Job.getInstance(conf, "word count"); //2、打包运行必须执行的方法 job.setJarByClass(CombinerExp.class); //3、输入路径 FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); //4、Map job.setMapperClass(MyMapper.class); //5、Combiner job.setCombinerClass(MyReducer.class); //6、Reducer //job.setReducerClass(MyReducer.class); job.setNumReduceTasks(0);//reduce个数默认是1 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //7、 输出路径 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); //8、提交作业 System.exit(job.waitForCompletion(true) ? 0 : 1); } }[/mw_shl_code] [mw_shl_code=bash,true][root@master liguodong]# hdfs dfs -ls -R /input/ -rw-r--r-- 1 root supergroup 27 2015-06-13 22:15 /input/input1 -rw-r--r-- 1 root supergroup 38 2015-06-13 22:15 /input/input2 当我们只有map和combine而没有reduce时,combine并不会执行。 而输出的结果并没有被求和。 [root@master liguodong]# hdfs dfs -ls -R /output/ -rw-r--r-- 3 liguodong supergroup 0 2015-06-13 22:17 /output/_SUCCESS -rw-r--r-- 3 liguodong supergroup 50 2015-06-13 22:17 /output/part-m-00000 -rw-r--r-- 3 liguodong supergroup 39 2015-06-13 22:17 /output/part-m-00001 [root@master liguodong]# hdfs dfs -cat /output/part-m-00000 hello 1 you 1 hello 1 everyone 1 hello 1 hadoop 1 [root@master liguodong]# hdfs dfs -cat /output/part-m-00001 hello 1 you 1 hello 1 me 1 hi 1 baby 1 当我们把第79行注释取消,将80行注释的时候,将会执行combine函数。 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 32 File System Counters ...... Map-Reduce Framework Map input records=6 Map output records=12 ...... Input split bytes=192 Combine input records=12 Combine output records=9 ...... Reduce input records=9 Reduce output records=7 Spilled Records=18 ...... Virtual memory (bytes) snapshot=0 Total committed heap usage (bytes)=457912320 File Input Format Counters Bytes Read=65 File Output Format Counters Bytes Written=51 [root@master hadoop]# hdfs dfs -ls -R /output/ -rw-r--r-- 3 liguodong supergroup 0 2015-06-13 22:41 /output/_SUCCESS -rw-r--r-- 3 liguodong supergroup 51 2015-06-13 22:41 /output/part-r-00000 [root@master hadoop]# hdfs dfs -cat /output/pa* baby 1 everyone 1 hadoop 1 hello 5 hi 1 me 1 you 2[/mw_shl_code] |