分享

Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究

rsgg03 2014-1-13 00:38:32 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 11714
直到目前,我们看到的所有Mapreduce作业都输出一组文件。但是,在一些场合下,经常要求我们将输出多组文件或者把一个数据集分为多个数据集更为方便;比如将一个log里面属于不同业务线的日志分开来输出,并交给相关的业务线。

  用过旧API的人应该知道,旧API中有 org.apache.hadoop.mapred.lib.MultipleOutputFormat和org.apache.hadoop.mapred.lib.MultipleOutputs,文档对MultipleOutputFormat的解释(MultipleOutputs 解释在后面)是:

 MultipleOutputFormat allowing to write the output data to different output files.
 MultipleOutputFormat可以将相似的记录输出到相同的数据集。在写每条记录之前,MultipleOutputFormat将调用generateFileNameForKeyValue方法来确定需要写入的文件名。通常,我们都是继承MultipleTextOutputFormat类,来重新实现generateFileNameForKeyValue方法以返回每个输出键/值对的文件名。generateFileNameForKeyValue方法的默认实现如下:

  1. protected String generateFileNameForKeyValue(K key, V value, String name) {
  2.     return name;
  3. }
复制代码
返回默认的name,我们可以在自己的类中重写这个方法,来定义自己的输出路径,比如:

  1. public static class PartitionFormat
  2.             extends MultipleTextOutputFormat<NullWritable, Text> {
  3.         @Override
  4.         protected String generateFileNameForKeyValue(
  5.                 NullWritable key,
  6.                 Text value,
  7.                 String name) {
  8.             String[] split = value.toString().split(",", -1);
  9.             String country = split[4].substring(1, 3);
  10.             return country + "/" + name;
  11.         }
  12. }
复制代码
这样相同country的记录将会输出到同一目录下的name文件中。完整的例子如下:

  1. package com.wyp;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.NullWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapred.*;
  8. import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
  9. import org.apache.hadoop.util.GenericOptionsParser;
  10. import java.io.IOException;
  11. /**
  12. * User: http://www.iteblog.com/
  13. * Date: 13-11-26
  14. * Time: 上午10:02
  15. */
  16. public class OutputTest {
  17.     public static class MapClass extends MapReduceBase
  18.             implements Mapper<LongWritable, Text, NullWritable, Text> {
  19.         @Override
  20.         public void map(LongWritable key, Text value,
  21.                         OutputCollector<NullWritable, Text> output,
  22.                         Reporter reporter) throws IOException {
  23.             output.collect(NullWritable.get(), value);
  24.         }
  25.     }
  26.     public static class PartitionFormat
  27.             extends MultipleTextOutputFormat<NullWritable, Text> {
  28.         //和上面一样,就不写了
  29.     }
  30.     public static void main(String[] args) throws IOException {
  31.         Configuration conf = new Configuration();
  32.         JobConf job = new JobConf(conf, OutputTest.class);
  33.         String[] remainingArgs =
  34.                 new GenericOptionsParser(conf, args).getRemainingArgs();
  35.         if (remainingArgs.length != 2) {
  36.             System.err.println("Error!");
  37.             System.exit(1);
  38.         }
  39.         Path in = new Path(remainingArgs[0]);
  40.         Path out = new Path(remainingArgs[1]);
  41.         FileInputFormat.setInputPaths(job, in);
  42.         FileOutputFormat.setOutputPath(job, out);
  43.         job.setJobName("Output");
  44.         job.setMapperClass(MapClass.class);
  45.         job.setInputFormat(TextInputFormat.class);
  46.         job.setOutputFormat(PartitionFormat.class);
  47.         job.setOutputKeyClass(NullWritable.class);
  48.         job.setOutputValueClass(Text.class);
  49.         job.setNumReduceTasks(0);
  50.         JobClient.runJob(job);
  51.     }
  52. }
复制代码
将上面的程序打包成jar文件(具体怎么打包,就不说),并在
Hadoop
2.2.0上面运行(测试数据请在这里下载:http://pan.baidu.com/s/1td8xN):

  1. /home/q/hadoop-2.2.0/bin/hadoop jar                      \
  2.       /export1/tmp/wyp/OutputText.jar com.wyp.OutputTest \
  3.       /home/wyp/apat63_99.txt                            \
  4.       /home/wyp/out
复制代码
运行完程序之后,可以去/home/wyp/out目录看下运行结果:

  1. [wyp@l-datalog5.data.cn1 ~]$ /home/q/hadoop-2.2.0/bin/hadoop fs         \
  2.                                      -ls /home/wyp/out
  3. .............................这里省略了很多...................................
  4. drwxr-xr-x   - wyp  supergroup     0 2013-11-26 14:25 /home/wyp/out/VE
  5. drwxr-xr-x   - wyp  supergroup     0 2013-11-26 14:25 /home/wyp/out/VG
  6. drwxr-xr-x   - wyp  supergroup     0 2013-11-26 14:25 /home/wyp/out/VN
  7. drwxr-xr-x   - wyp  supergroup     0 2013-11-26 14:25 /home/wyp/out/VU
  8. drwxr-xr-x   - wyp  supergroup     0 2013-11-26 14:25 /home/wyp/out/YE
  9. .............................这里省略了很多...................................
  10. -rw-r--r--   3 wyp  supergroup     0 2013-11-26 14:25 /home/wyp/out/_SUCCESS
  11. [wyp@l-datalog5.data.cn1 ~]$ /home/q/hadoop-2.2.0/bin/hadoop fs        \
  12.                                     -ls /home/wyp/out/VN
  13. Found 2 items
  14. -rw-r--r-- 3 wyp supergroup  148 2013-11-26 14:25 /home/wyp/out/VN/part-00000
  15. -rw-r--r-- 3 wyp supergroup  566 2013-11-26 14:25 /home/wyp/out/VN/part-00001
  16. [wyp@l-datalog5.data.cn1 ~]$ /home/q/hadoop-2.2.0/bin/hadoop fs        \
  17.                                    -cat /home/wyp/out/VN/part-00001
  18. 3430490,1969,3350,1965,"VN","",597185,6,,73,4,43,,0,,,,,,,,,
  19. 3630470,1971,4379,1970,"VN","",,1,,244,5,55,,4,,0.375,,22.5,,,,,
  20. 3654325,1972,4477,1969,"VN","",,1,,554,1,14,,0,,,,,,,,,
  21. 3665081,1972,4526,1970,"VN","",,1,,373,6,66,,1,,0,,3,,,,,
  22. 3772710,1973,5072,1972,"VN","",,1,,4,6,65,,1,,0,,8,,,,,
  23. 3821853,1974,5296,1971,"VN","",,1,,33,6,69,,1,,0,,23,,,,,
  24. 3824277,1974,5310,1970,"VN","",347650,3,,562,1,14,,2,,0.5,,9,,,,0,0
  25. 3918104,1975,5793,1972,"VN","",,1,2,4,6,65,5,0,0.4,,0,,18.2,,,,
复制代码
从上面的结果可以看出,所有country相同的结果都输出到同一个文件夹下面了。MultipleOutputFormat对完全控制文件名和目录名很方便。大家也看到了上面的程序是基于行的split,如果我们要基于列的split,MultipleOutputFormat就无能为力了。这时MultipleOutputs就用上场了。MultipleOutputs在很早的版本就存在,那么我们先看看官方文档是怎么解释MultipleOutputs的:

 MultipleOutputs creates multiple OutputCollectors. Each OutputCollector can have its own OutputFormat and types for the key/value pair. Your MapReduce program will decide what to output to each OutputCollector.

欢迎大家如about云官方群371358502,更新咨询,更新资源,随时关注

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条