Guiding questions
1. Which class is needed to customize output file names?
2. What changes are needed in the driver (main) and in the reducer to customize file names?
Sometimes we need to customize output file names based on the data.
For example, suppose we group records by the value of did and produce different output files: all dids whose occurrence count falls in [0, 2) go to file a, those in [2, 4) go to file b, and everything else goes to file c.
The class involved here is MultipleOutputs. The following shows how to implement this.
First, a small optimization: to avoid typing a long command for every run, use the maven exec plugin. The relevant pom.xml configuration is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.freebird</groupId>
  <artifactId>mr1_example2</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>mr1_example2</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>hadoop</executable>
          <arguments>
            <argument>jar</argument>
            <argument>target/mr1_example2-1.0-SNAPSHOT.jar</argument>
            <argument>org.freebird.LogJob</argument>
            <argument>/user/chenshu/share/logs</argument>
            <argument>/user/chenshu/share/output12</argument>
          </arguments>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
With this in place, after each mvn clean package you only need to run mvn exec:exec.
Then add a few lines of code to LogJob.java:
package org.freebird;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.freebird.reducer.LogReducer;
import org.freebird.mapper.LogMapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogJob {

    public static void main(String[] args) throws Exception {
        System.out.println("args[0]:" + args[0]);
        System.out.println("args[1]:" + args[1]);

        Configuration conf = new Configuration();
        Job job = new Job(conf, "sum_did_from_log_file");
        job.setJarByClass(LogJob.class);

        job.setMapperClass(org.freebird.mapper.LogMapper.class);
        job.setReducerClass(org.freebird.reducer.LogReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Register the three named outputs; the key/value types match the job's output types.
        MultipleOutputs.addNamedOutput(job, "a", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "b", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "c", TextOutputFormat.class, Text.class, IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MultipleOutputs.addNamedOutput is called three times to register the named outputs a, b and c. The last two arguments are the output key and output value types; they should match the types passed to job.setOutputKeyClass and job.setOutputValueClass (here Text and IntWritable).
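The mapper itself is not shown in this post. For context, here is a minimal sketch of what org.freebird.mapper.LogMapper might look like, assuming each log line carries the did as its first whitespace-separated field (the parsing below is a placeholder; adapt it to the real log format). It simply emits (did, 1) so that the reducer can sum the occurrence counts:

package org.freebird.mapper;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private Text did = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Placeholder parsing: assume the did is the first whitespace-separated field.
        String[] fields = value.toString().split("\\s+");
        if (fields.length > 0 && !fields[0].isEmpty()) {
            did.set(fields[0]);
            context.write(did, ONE);   // emit (did, 1) so the reducer can sum occurrences
        }
    }
}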
Finally, modify the code of the reducer class:
package org.freebird.reducer;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    private MultipleOutputs<Text, IntWritable> outputs;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer:::setup method");
        outputs = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer:::cleanup method");
        outputs.close();
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer::reduce method");
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        System.out.println("key: " + key.toString() + " sum: " + sum);
        // Route each did to the named output that matches its occurrence count.
        if ((sum >= 0) && (sum < 2)) {
            outputs.write("a", key, result);
        } else if (sum < 4) {
            outputs.write("b", key, result);
        } else {
            outputs.write("c", key, result);
        }
    }
}
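As a side note not covered in the original post, MultipleOutputs.write also has an overload that takes a baseOutputPath relative to the job output directory, and that path may contain subdirectories. If you prefer each group to land in its own subdirectory rather than in top-level files, the branch in reduce() above could be written roughly like this (a sketch; the a/b/c subdirectory names are only an illustration):

// Sketch: write each group under its own subdirectory of the output dir,
// producing e.g. a/part-r-00000 instead of a-r-00000 at the top level.
if ((sum >= 0) && (sum < 2)) {
    outputs.write("a", key, result, "a/part");
} else if (sum < 4) {
    outputs.write("b", key, result, "b/part");
} else {
    outputs.write("c", key, result, "c/part");
}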
Based on the sum computed for each key (did), each record is written to a different file; each named output produces files of the form name-r-nnnnn, one per reduce task. Run the job and look at the results:
[chenshu@hadoopMaster output12]$ ls
a-r-00000 a-r-00004 a-r-00008 a-r-00012 b-r-00001 b-r-00005 b-r-00009 b-r-00013 c-r-00002 c-r-00006 c-r-00010 c-r-00014 part-r-00002 part-r-00006 part-r-00010 part-r-00014
a-r-00001 a-r-00005 a-r-00009 a-r-00013 b-r-00002 b-r-00006 b-r-00010 b-r-00014 c-r-00003 c-r-00007 c-r-00011 _logs part-r-00003 part-r-00007 part-r-00011 _SUCCESS
a-r-00002 a-r-00006 a-r-00010 a-r-00014 b-r-00003 b-r-00007 b-r-00011 c-r-00000 c-r-00004 c-r-00008 c-r-00012 part-r-00000 part-r-00004 part-r-00008 part-r-00012
a-r-00003 a-r-00007 a-r-00011 b-r-00000 b-r-00004 b-r-00008 b-r-00012 c-r-00001 c-r-00005 c-r-00009 c-r-00013 part-r-00001 part-r-00005 part-r-00009 part-r-00013
Open any of the files starting with a, b or c, and the values are indeed grouped as expected:
5371700bc7b2231db03afeb0 6
5371700cc7b2231db03afec0 7
5371701cc7b2231db03aff8d 6
5371709dc7b2231db03b0136 6
537170a0c7b2231db03b01ac 6
537170a6c7b2231db03b01fc 6
537170a8c7b2231db03b0217 6
537170b3c7b2231db03b0268 6
53719aa9c7b2231db03b0721 6
53719ad0c7b2231db03b0731 4
Grouping the device IDs by their sum value with MultipleOutputs succeeded.
MapReduce still generates the default part-r-* files; they can be ignored, since they are all empty (nothing is ever written through the normal context.write path here).
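If the empty part-r-* files are unwanted, one option (not used in the original post) is to register the job's default output format lazily, so that a part file is only created when something is actually written to it. A minimal sketch of the change in LogJob.main, assuming the rest of the driver stays as above:

// Additional import needed in LogJob.java:
// import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

// In main(), make the default output lazy so that empty part-r-* files
// are not created when a reducer only writes through MultipleOutputs:
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);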