分享

mapreduce如何修改文件名(MultipleOutputs使用)

desehawk 发表于 2014-10-30 13:21:29 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 29138

问题导读
1.修改文件名需要那个类来完成?
2.修改文件名在驱动程序(main)及reduce中,需要做哪些修改?






我们有时候需要根据情况定制我的输出文件名。

比如我要根据did的值分组,产生不同的输出文件。所有did出现次数在[0, 2)的都输出到a文件中,在[2, 4)的输出大b文件,其他输出到c文件。

这里涉及到的输出类是MultipleOutputs类。下面是介绍如何实现。

首先有一个小优化,为了避免每次执行时输入一长串命令,利用maven exec plugin,参考pom.xml配置如下:



  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  2.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">  
  3.   <modelVersion>4.0.0</modelVersion>  
  4.   <groupId>org.freebird</groupId>  
  5.   <artifactId>mr1_example2</artifactId>  
  6.   <packaging>jar</packaging>  
  7.   <version>1.0-SNAPSHOT</version>  
  8.   <name>mr1_example2</name>  
  9.   <url>http://maven.apache.org</url>  
  10.   <dependencies>  
  11.     <dependency>  
  12.       <groupId>org.apache.hadoop</groupId>  
  13.       <artifactId>hadoop-core</artifactId>  
  14.       <version>1.2.1</version>  
  15.     </dependency>  
  16.   </dependencies>  
  17.   <build>  
  18.     <plugins>  
  19.       <plugin>  
  20.         <groupId>org.codehaus.mojo</groupId>  
  21.         <artifactId>exec-maven-plugin</artifactId>  
  22.         <version>1.3.2</version>  
  23.         <executions>  
  24.           <execution>  
  25.             <goals>  
  26.               <goal>exec</goal>  
  27.             </goals>  
  28.           </execution>  
  29.         </executions>  
  30.         <configuration>  
  31.           <executable>hadoop</executable>  
  32.           <arguments>  
  33.             <argument>jar</argument>  
  34.             <argument>target/mr1_example2-1.0-SNAPSHOT.jar</argument>  
  35.             <argument>org.freebird.LogJob</argument>  
  36.             <argument>/user/chenshu/share/logs</argument>  
  37.             <argument>/user/chenshu/share/output12</argument>  
  38.           </arguments>  
  39.         </configuration>  
  40.       </plugin>  
  41.     </plugins>  
  42.   </build>  
  43. </project>
复制代码


这样每次mvn clean package之后,运行mvn exec:exec命令即可。


然后在LogJob.java文件添加几行代码:


  1. package org.freebird;  
  2.   
  3. import org.apache.hadoop.io.IntWritable;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.mapreduce.Job;  
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  9. import org.freebird.reducer.LogReducer;  
  10. import org.freebird.mapper.LogMapper;  
  11. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  12. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;  
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  14.   
  15.   
  16.   
  17. public class LogJob {                                                                                                                                                                                                  
  18.                                                                                                                                                                                                                        
  19.     public static void main(String[] args) throws Exception {                                                                                                                                                         
  20.         System.out.println("args[0]:" + args[0]);                                                                                                                                                                     
  21.         System.out.println("args[1]:" + args[1]);                                                                                                                                                                     
  22.                                                                                                                                                                                                                        
  23.         Configuration conf = new Configuration();                                                                                                                                                                     
  24.         Job job = new Job(conf, "sum_did_from_log_file");                                                                                                                                                              
  25.         job.setJarByClass(LogJob.class);                                                                                                                                                                              
  26.                                                                                                                                                                                                                        
  27.         job.setMapperClass(org.freebird.mapper.LogMapper.class);                                                                                                                                                      
  28.         job.setReducerClass(org.freebird.reducer.LogReducer.class);                                                                                                                                                   
  29.                                                                                                                                                                                                                        
  30.         job.setOutputKeyClass(Text.class);                                                                                                                                                                             
  31.         job.setOutputValueClass(IntWritable.class);                                                                                                                                                                    
  32.                                                                                                                                                                                                                        
  33.         MultipleOutputs.addNamedOutput(job, "a", TextOutputFormat.class, Text.class, IntWritable.class);                                                                                                               
  34.         MultipleOutputs.addNamedOutput(job, "b", TextOutputFormat.class, Text.class, Text.class);                                                                                                                     
  35.         MultipleOutputs.addNamedOutput(job, "c", TextOutputFormat.class, Text.class, Text.class);                                                                                                                     
  36.                                                                                                                                                                                                                        
  37.         FileInputFormat.addInputPath(job, new Path(args[0]));                                                                                                                                                         
  38.         FileOutputFormat.setOutputPath(job, new Path(args[1]));                                                                                                                                                        
  39.                                                                                                                                                                                                                        
  40.         System.exit(job.waitForCompletion(true) ? 0 : 1);                                                                                                                                                              
  41.     }                                                                                                                                                                                                                  
  42. }  
复制代码



MultipleOutputs.addNamedOutput 函数被调用了三次,设置了文件名为a,b和c,最后两个参数分别是output key和output value类型,应该和job.setOutputKeyClass以及job.setOutputValueClass保持一致。


最后修改reducer类的代码:


  1. public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {  
  2.   
  3.     private IntWritable result = new IntWritable();  
  4.   
  5.     private MultipleOutputs outputs;  
  6.   
  7.     @Override  
  8.     public void setup(Context context) throws IOException, InterruptedException {  
  9.         System.out.println("enter LogReducer:::setup method");  
  10.         outputs = new MultipleOutputs(context);  
  11.     }  
  12.   
  13.     @Override  
  14.     public void cleanup(Context context) throws IOException, InterruptedException {  
  15.         System.out.println("enter LogReducer:::cleanup method");  
  16.         outputs.close();  
  17.     }  
  18.   
  19.     public void reduce(Text key, Iterable<IntWritable> values,  
  20.                        Context context) throws IOException, InterruptedException {  
  21.         System.out.println("enter LogReducer::reduce method");  
  22.         int sum = 0;  
  23.         for (IntWritable val : values) {  
  24.             sum += val.get();  
  25.         }  
  26.         result.set(sum);  
  27.         System.out.println("key: " + key.toString() + " sum: " + sum);                                                                                               
  28.         if ((sum < 2) && (sum >= 0)) {  
  29.             outputs.write("a", key, sum);  
  30.         } else if (sum < 4) {  
  31.             outputs.write("b", key, sum);  
  32.         } else {  
  33.             outputs.write("c", key, sum);  
  34.         }  
  35.     }  
  36. }  
复制代码

根据相同key(did)sum的结果大小,写入到不同的文件中。运行后观察一下结果:
  1. [chenshu@hadoopMaster output12]$ ls  
  2. a-r-00000  a-r-00004  a-r-00008  a-r-00012  b-r-00001  b-r-00005  b-r-00009  b-r-00013  c-r-00002  c-r-00006  c-r-00010  c-r-00014     part-r-00002  part-r-00006  part-r-00010  part-r-00014  
  3. a-r-00001  a-r-00005  a-r-00009  a-r-00013  b-r-00002  b-r-00006  b-r-00010  b-r-00014  c-r-00003  c-r-00007  c-r-00011  _logs         part-r-00003  part-r-00007  part-r-00011  _SUCCESS  
  4. a-r-00002  a-r-00006  a-r-00010  a-r-00014  b-r-00003  b-r-00007  b-r-00011  c-r-00000  c-r-00004  c-r-00008  c-r-00012  part-r-00000  part-r-00004  part-r-00008  part-r-00012  
  5. a-r-00003  a-r-00007  a-r-00011  b-r-00000  b-r-00004  b-r-00008  b-r-00012  c-r-00001  c-r-00005  c-r-00009  c-r-00013  part-r-00001  part-r-00005  part-r-00009  part-r-00013
复制代码

打开任意的a,b和c开头的文件,查看值果然是如此
  1. 5371700bc7b2231db03afeb0        6  
  2. 5371700cc7b2231db03afec0        7  
  3. 5371701cc7b2231db03aff8d        6  
  4. 5371709dc7b2231db03b0136        6  
  5. 537170a0c7b2231db03b01ac        6  
  6. 537170a6c7b2231db03b01fc        6  
  7. 537170a8c7b2231db03b0217        6  
  8. 537170b3c7b2231db03b0268        6  
  9. 53719aa9c7b2231db03b0721        6  
  10. 53719ad0c7b2231db03b0731        4  
复制代码

使用MultipleOutputs根据sum值对设备ID进行分组成功了。
MapReduce仍然会默认生成part....文件,不用理会,都是空文件。








已有(2)人评论

跳转到指定楼层
monkey131499 发表于 2015-1-6 10:59:01
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条