分享

MapReduce中InputFormat详解

howtodown 发表于 2014-3-4 18:26:14 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 10222
在查看原文前,可以查看

InputFormat的数据划分、Split调度、数据读取三个问题的浅析

hadoop CombineFileInputFormat使用


在书写mapreduce中看到job.setInputFormatClass(KeyValueTextInputFormat.class),就这么一行代码,就一个类名,总觉得可有可无,去掉吧,又觉得会少些内容,经过查找,看来不是可有可无的,原来里面文章还不少。


阅读下面内容可以带着下面问题:
1.job.setInputFormatClass(KeyValueTextInputFormat.class);此行代码的作用是什么?
2.InputFormat抽象类有几个方法?
3.通过InputFormat使用Mapreduce除了做到验证作业的正确性,还有什么作用?



已有(2)人评论

跳转到指定楼层
howtodown 发表于 2014-3-4 18:26:31
1. 概述

我们在设置MapReduce输入格式的时候,会调用这样一条语句:
  1. job.setInputFormatClass(KeyValueTextInputFormat.class);
复制代码
这条语句保证了输入文件会按照我们预设的格式被读取。KeyValueTextInputFormat即为我们设定的数据读取格式。

所有的输入格式类都继承自InputFormat,这是一个抽象类。其子类有例如专门用于读取普通文件的FileInputFormat,还有用来读取数据库的DBInputFormat等等。相关类图简单画出如下(推荐新标签中打开图片查看):

o_1345714215_4111.png


2. InputFormat

从InputFormat类图看,InputFormat抽象类仅有两个抽象方法:

  • List<InputSplit> getSplits(), 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。
  • RecordReader<K,V> createRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。

在后面说到InputSplits的时候,会介绍在getSplits()时需要验证输入文件是否可分割、文件存储时分块的大小和文件大小等因素,所以总体来说,通过InputFormat,Mapreduce框架可以做到:

  • 验证作业输入的正确性
  • 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask
  • 提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用

InputFormat抽象类源码也很简单,如下供参考(文章格式考虑,删除了部分注释,添加了部分中文注释):
  1. public abstract class InputFormat<K, V> {
  2. /**
  3. * 仅仅是逻辑分片,并没有物理分片,所以每一个分片类似于这样一个元组 <input-file-path, start, offset>
  4. */
  5. public abstract List<InputSplit> getSplits(JobContext context)
  6. throws IOException, InterruptedException;
  7. /**
  8. * Create a record reader for a given split.
  9. */
  10. public abstract RecordReader<K, V> createRecordReader(InputSplit split,
  11. TaskAttemptContext context) throws IOException,
  12. InterruptedException;
  13. }
复制代码
不同的InputFormat会各自实现不同的文件读取方式以及分片方式,每个输入分片会被单独的map task作为数据源。下面详细介绍输入分片(inputSplit)是什么。

3.InputSplit

Mappers的输入是一个一个的输入分片,称InputSplit。看源码可知,InputSplit也是一个抽象类,它在逻辑上包含了提供给处理这个InputSplit的Mapper的所有K-V对。
  1. public abstract class InputSplit {
  2. /**
  3. * 获取Split的大小,支持根据size对InputSplit排序.
  4. */
  5. public abstract long getLength() throws IOException, InterruptedException;
  6. /**
  7. * 获取存储该分片的数据所在的节点位置.
  8. */
  9. public abstract
  10. String[] getLocations() throws IOException, InterruptedException;
  11. }
复制代码
下面深入看一个InputSplit的子类:FileSplit类
  1. public class FileSplit extends InputSplit implements Writable {
  2. private Path file;
  3. private long start;
  4. private long length;
  5. private String[] hosts;
  6. /**
  7. * Constructs a split with host information
  8. *
  9. * @param file
  10. * the file name
  11. * @param start
  12. * the position of the first byte in the file to process
  13. * @param length
  14. * the number of bytes in the file to process
  15. * @param hosts
  16. * the list of hosts containing the block, possibly null
  17. */
  18. public FileSplit(Path file, long start, long length, String[] hosts) {
  19. this.file = file;
  20. this.start = start;
  21. this.length = length;
  22. this.hosts = hosts;
  23. }
  24. /** The number of bytes in the file to process. */
  25. @Override
  26. public long getLength() {
  27. return length;
  28. }
  29. @Override
  30. public String[] getLocations() throws IOException {
  31. if (this.hosts == null) {
  32. return new String[] {};
  33. } else {
  34. return this.hosts;
  35. }
  36. }
  37. // 略掉部分方法
  38. }
复制代码
从源码中可以看出,FileSplit有四个属性:文件路径,分片起始位置,分片长度和存储分片的hosts。用这四项数据,就可以计算出提供给每个Mapper的分片数据。在InputFormat的getSplit()方法中构造分片,分片的四个属性会通过调用FileSplit的Constructor设置。

再看一个InputSplit的子类:CombineFileSplit。源码如下:
  1. public class CombineFileSplit extends InputSplit implements Writable {
  2. private Path[] paths;
  3. private long[] startoffset;
  4. private long[] lengths;
  5. private String[] locations;
  6. private long totLength;
  7. public CombineFileSplit(Path[] files, long[] start, long[] lengths,
  8. String[] locations) {
  9. initSplit(files, start, lengths, locations);
  10. }
  11. private void initSplit(Path[] files, long[] start, long[] lengths,
  12. String[] locations) {
  13. this.startoffset = start;
  14. this.lengths = lengths;
  15. this.paths = files;
  16. this.totLength = 0;
  17. this.locations = locations;
  18. for (long length : lengths) {
  19. totLength += length;
  20. }
  21. }
  22. public long getLength() {
  23. return totLength;
  24. }
  25. /** Returns all the Paths where this input-split resides */
  26. public String[] getLocations() throws IOException {
  27. return locations;
  28. }
  29. //省略了部分构造函数和方法,深入学习请阅读源文件
  30. }
复制代码
上面我们介绍的FileSplit对应的是一个输入文件,也就是说,如果用FileSplit对应的FileInputFormat作为输入格式,那么即使文件特别小,也是作为一个单独的InputSplit来处理,而每一个InputSplit将会由一个独立的Mapper Task来处理。在输入数据是由大量小文件组成的情形下,就会有同样大量的InputSplit,从而需要同样大量的Mapper来处理,大量的Mapper Task创建销毁开销将是巨大的,甚至对集群来说,是灾难性的!

CombineFileSplit是针对小文件的分片,它将一系列小文件封装在一个InputSplit内,这样一个Mapper就可以处理多个小文件。可以有效的降低进程开销。与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和分片数据所在的host列表四个属性,只不过这些属性不再是一个值,而是一个列表。

需要注意的一点是,CombineFileSplit的getLength()方法,返回的是这一系列数据的数据的总长度。

现在,我们已深入的了解了InputSplit的概念,看了其源码,知道了其属性。我们知道数据分片是在InputFormat中实现的,接下来,我们就深入InputFormat的一个子类,FileInputFormat看看分片是如何进行的。


4. FileInputFormat

FileInputFormat中,分片方法代码及详细注释如下,就不再详细解释该方法:
  1. public List<InputSplit> getSplits(JobContext job) throws IOException {
  2. // 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小。
  3. // 由源码可知,这两个值可以通过mapred.min.split.size和mapred.max.split.size来设置
  4. long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  5. long maxSize = getMaxSplitSize(job);
  6. // splits链表用来存储计算得到的输入分片结果
  7. List<InputSplit> splits = new ArrayList<InputSplit>();
  8. // files链表存储由listStatus()获取的输入文件列表,listStatus比较特殊,我们在下面详细研究
  9. List<FileStatus> files = listStatus(job);
  10. for (FileStatus file : files) {
  11. Path path = file.getPath();
  12. FileSystem fs = path.getFileSystem(job.getConfiguration());
  13. long length = file.getLen();
  14. // 获取该文件所有的block信息列表[hostname, offset, length]
  15. BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
  16. length);
  17. // 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割
  18. // 是否分割可以自行重写FileInputFormat的isSplitable来控制
  19. if ((length != 0) && isSplitable(job, path)) {
  20. long blockSize = file.getBlockSize();
  21. // 计算分片大小
  22. // 即 Math.max(minSize, Math.min(maxSize, blockSize));
  23. // 也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize
  24. long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  25. long bytesRemaining = length;
  26. // 循环分片。
  27. // 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,停止分片
  28. while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
  29. int blkIndex = getBlockIndex(blkLocations, length
  30. - bytesRemaining);
  31. splits.add(new FileSplit(path, length - bytesRemaining,
  32. splitSize, blkLocations[blkIndex].getHosts()));
  33. bytesRemaining -= splitSize;
  34. }
  35. // 处理余下的数据
  36. if (bytesRemaining != 0) {
  37. splits.add(new FileSplit(path, length - bytesRemaining,
  38. bytesRemaining,
  39. blkLocations[blkLocations.length - 1].getHosts()));
  40. }
  41. } else if (length != 0) {
  42. // 不可split,整块返回
  43. splits.add(new FileSplit(path, 0, length, blkLocations[0]
  44. .getHosts()));
  45. } else {
  46. // 对于长度为0的文件,创建空Hosts列表,返回
  47. splits.add(new FileSplit(path, 0, length, new String[0]));
  48. }
  49. }
  50. // 设置输入文件数量
  51. job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  52. return splits;
  53. }
复制代码
在getSplits()方法中,我们提到了一个方法,listStatus(),我们先来看一下这个方法:
  1. protected List<FileStatus> listStatus(JobContext job) throws IOException {
  2. // 省略部分代码...
  3. List<PathFilter> filters = new ArrayList<PathFilter>();
  4. filters.add(hiddenFileFilter);
  5. PathFilter jobFilter = getInputPathFilter(job);
  6. if (jobFilter != null) {
  7. filters.add(jobFilter);
  8. }
  9. // 创建了一个MultiPathFilter,其内部包含了两个PathFilter
  10. // 一个为过滤隐藏文件的Filter,一个为用户自定义Filter(如果制定了)
  11. PathFilter inputFilter = new MultiPathFilter(filters);
  12. for (int i = 0; i < dirs.length; ++i) {
  13. Path p = dirs[i];
  14. FileSystem fs = p.getFileSystem(job.getConfiguration());
  15. FileStatus[] matches = fs.globStatus(p, inputFilter);
  16. if (matches == null) {
  17. errors.add(new IOException("Input path does not exist: " + p));
  18. } else if (matches.length == 0) {
  19. errors.add(new IOException("Input Pattern " + p
  20. + " matches 0 files"));
  21. } else {
  22. for (FileStatus globStat : matches) {
  23. if (globStat.isDir()) {
  24. for (FileStatus stat : fs.listStatus(
  25. globStat.getPath(), inputFilter)) {
  26. result.add(stat);
  27. }
  28. } else {
  29. result.add(globStat);
  30. }
  31. }
  32. }
  33. }
  34. // 省略部分代码
  35. }
  36. NLineInputFormat是一个很有意思的FileInputFormat的子类,有时间可以了解一下。
复制代码
5. PathFilter

PathFilter文件筛选器接口,使用它我们可以控制哪些文件要作为输入,哪些不作为输入。PathFilter有一个accept(Path)方法,当接收的Path要被包含进来,就返回true,否则返回false。可以通过设置mapred.input.pathFilter.class来设置用户自定义的PathFilter。
  1. public interface PathFilter {
  2. /**
  3. * Tests whether or not the specified abstract pathname should be
  4. * included in a pathname list.
  5. *
  6. * @param path The abstract pathname to be tested
  7. * @return <code>true</code> if and only if <code>pathname</code>
  8. * should be included
  9. */
  10. boolean accept(Path path);
  11. }
复制代码
FileInputFormat类有hiddenFileFilter属性:
  1. private static final PathFilter hiddenFileFilter = new PathFilter() {
  2. public boolean accept(Path p) {
  3. String name = p.getName();
  4. return !name.startsWith("_") && !name.startsWith(".");
  5. }
  6. };
复制代码
hiddenFileFilter过滤掉隐藏文件。

FileInputFormat类还有一个内部类:
  1. private static class MultiPathFilter implements PathFilter {
  2. private List<PathFilter> filters;
  3. public MultiPathFilter(List<PathFilter> filters) {
  4. this.filters = filters;
  5. }
  6. public boolean accept(Path path) {
  7. for (PathFilter filter : filters) {
  8. if (!filter.accept(path)) {
  9. return false;
  10. }
  11. }
  12. return true;
  13. }
  14. }
复制代码
MultiPathFilter类类似于一个PathFilter代理,其内部有一个PathFilter list属性,只有符合其内部所有filter的路径,才被作为输入。在FileInputFormat类中,它被listStatus()方法调用,而listStatus()又被getSplits()方法调用来获取输入文件,也即实现了在获取输入分片前进行文件过滤。

至此,我们已经利用PathFilter过滤了文件,利用FileInputFormat 的getSplits方法,计算出了Mapreduce的Map的InputSplit。作业的输入分片有了,而这些分片,是怎么被Map读取的呢?

这由InputFormat中的另一个方法createRecordReader()来负责。FileInputFormat没有对于这个方法的实现,而是交给子类自行去实现它。

6. RecordReader

RecordReader将读入到Map的数据拆分成<key, value>对。RecordReader也是一个抽象类,下面我们通过源码看一下,RecordReader主要做哪些工作:
  1. public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
  2. /**
  3. * 由一个InputSplit初始化
  4. */
  5. public abstract void initialize(InputSplit split, TaskAttemptContext context)
  6. throws IOException, InterruptedException;
  7. /**
  8. * 顾名思义,读取分片下一个<key, value>对
  9. */
  10. public abstract boolean nextKeyValue() throws IOException,
  11. InterruptedException;
  12. /**
  13. * Get the current key
  14. */
  15. public abstract KEYIN getCurrentKey() throws IOException,
  16. InterruptedException;
  17. /**
  18. * Get the current value.
  19. */
  20. public abstract VALUEIN getCurrentValue() throws IOException,
  21. InterruptedException;
  22. /**
  23. * 跟踪读取分片的进度
  24. */
  25. public abstract float getProgress() throws IOException,
  26. InterruptedException;
  27. /**
  28. * Close the record reader.
  29. */
  30. public abstract void close() throws IOException;
  31. }
复制代码
从源码可以看出,一个RecordReader主要来完成这几项功能。接下来,通过一个具体的RecordReader实现类,来详细了解一下各功能的具体操作。
  1. public class LineRecordReader extends RecordReader<LongWritable, Text> {
  2. private CompressionCodecFactory compressionCodecs = null;
  3. private long start;
  4. private long pos;
  5. private long end;
  6. private LineReader in;
  7. private int maxLineLength;
  8. private LongWritable key = null;
  9. private Text value = null;
  10. // initialize函数即对LineRecordReader的一个初始化
  11. // 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片经过压缩的情况等
  12. public void initialize(InputSplit genericSplit, TaskAttemptContext context)
  13. throws IOException {
  14. FileSplit split = (FileSplit) genericSplit;
  15. Configuration job = context.getConfiguration();
  16. this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
  17. Integer.MAX_VALUE);
  18. start = split.getStart();
  19. end = start + split.getLength();
  20. final Path file = split.getPath();
  21. compressionCodecs = new CompressionCodecFactory(job);
  22. final CompressionCodec codec = compressionCodecs.getCodec(file);
  23. // 打开文件,并定位到分片读取的起始位置
  24. FileSystem fs = file.getFileSystem(job);
  25. FSDataInputStream fileIn = fs.open(split.getPath());
  26. boolean skipFirstLine = false;
  27. if (codec != null) {
  28. // 文件是压缩文件的话,直接打开文件
  29. in = new LineReader(codec.createInputStream(fileIn), job);
  30. end = Long.MAX_VALUE;
  31. } else {
  32. //
  33. if (start != 0) {
  34. skipFirstLine = true;
  35. --start;
  36. // 定位到偏移位置,下次读取就会从便宜位置开始
  37. fileIn.seek(start);
  38. }
  39. in = new LineReader(fileIn, job);
  40. }
  41. if (skipFirstLine) { // skip first line and re-establish "start".
  42. start += in.readLine(new Text(), 0,
  43. (int) Math.min((long) Integer.MAX_VALUE, end - start));
  44. }
  45. this.pos = start;
  46. }
  47. public boolean nextKeyValue() throws IOException {
  48. if (key == null) {
  49. key = new LongWritable();
  50. }
  51. key.set(pos);// key即为偏移量
  52. if (value == null) {
  53. value = new Text();
  54. }
  55. int newSize = 0;
  56. while (pos < end) {
  57. newSize = in.readLine(value, maxLineLength,
  58. Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
  59. maxLineLength));
  60. // 读取的数据长度为0,则说明已读完
  61. if (newSize == 0) {
  62. break;
  63. }
  64. pos += newSize;
  65. // 读取的数据长度小于最大行长度,也说明已读取完毕
  66. if (newSize < maxLineLength) {
  67. break;
  68. }
  69. // 执行到此处,说明该行数据没读完,继续读入
  70. }
  71. if (newSize == 0) {
  72. key = null;
  73. value = null;
  74. return false;
  75. } else {
  76. return true;
  77. }
  78. }
  79. // 省略了部分方法
  80. }
复制代码
数据从InputSplit分片中读出已经解决,但是RecordReader是如何被Mapreduce框架利用的呢?我们先看一下Mapper类

7. Mapper
  1. public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  2. public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  3. public Context(Configuration conf, TaskAttemptID taskid,
  4. RecordReader<KEYIN, VALUEIN> reader,
  5. RecordWriter<KEYOUT, VALUEOUT> writer,
  6. OutputCommitter committer, StatusReporter reporter,
  7. InputSplit split) throws IOException, InterruptedException {
  8. super(conf, taskid, reader, writer, committer, reporter, split);
  9. }
  10. }
  11. /**
  12. * 预处理,仅在map task启动时运行一次
  13. */
  14. protected void setup(Context context) throws IOException,
  15. InterruptedException {
  16. }
  17. /**
  18. * 对于InputSplit中的每一对<key, value>都会运行一次
  19. */
  20. @SuppressWarnings("unchecked")
  21. protected void map(KEYIN key, VALUEIN value, Context context)
  22. throws IOException, InterruptedException {
  23. context.write((KEYOUT) key, (VALUEOUT) value);
  24. }
  25. /**
  26. * 扫尾工作,比如关闭流等
  27. */
  28. protected void cleanup(Context context) throws IOException,
  29. InterruptedException {
  30. }
  31. /**
  32. * map task的驱动器
  33. */
  34. public void run(Context context) throws IOException, InterruptedException {
  35. setup(context);
  36. while (context.nextKeyValue()) {
  37. map(context.getCurrentKey(), context.getCurrentValue(), context);
  38. }
  39. cleanup(context);
  40. }
  41. }
复制代码
重点看一下Mapper.class中的run()方法,它相当于map task的驱动。

  • run()方法首先调用setup()进行初始操作
  • 然后循环对每个从context.nextKeyValue()获取的“K-V对”调用map()函数进行处理
  • 最后调用cleanup()做最后的处理

事实上,content.nextKeyValue()就是使用了相应的RecordReader来获取“K-V对”。Mapper.class中的Context类,它继承自MapContext类,使用一个RecordReader进行构造。下面我们再看这个MapContext。
  1. public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
  2. TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  3. private RecordReader<KEYIN, VALUEIN> reader;
  4. private InputSplit split;
  5. public MapContext(Configuration conf, TaskAttemptID taskid,
  6. RecordReader<KEYIN, VALUEIN> reader,
  7. RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer,
  8. StatusReporter reporter, InputSplit split) {
  9. super(conf, taskid, writer, committer, reporter);
  10. this.reader = reader;
  11. this.split = split;
  12. }
  13. /**
  14. * Get the input split for this map.
  15. */
  16. public InputSplit getInputSplit() {
  17. return split;
  18. }
  19. @Override
  20. public KEYIN getCurrentKey() throws IOException, InterruptedException {
  21. return reader.getCurrentKey();
  22. }
  23. @Override
  24. public VALUEIN getCurrentValue() throws IOException, InterruptedException {
  25. return reader.getCurrentValue();
  26. }
  27. @Override
  28. public boolean nextKeyValue() throws IOException, InterruptedException {
  29. return reader.nextKeyValue();
  30. }
  31. }
复制代码
从MapContent类中的方法可见,content.getCurrentKey(),content.getCurrentValue()以及nextKeyValue(),其实都是对RecordReader方法的封装,即MapContext是直接使用传入的RecordReader来对InputSplit进行“K-V对”读取的。

至此,我们已经清楚的知道Mapreduce的输入文件是如何被过滤、读取、分片、读出“K-V对”,然后交给Mapper类来处理的。


回复

使用道具 举报

hyj 发表于 2014-3-4 18:30:42
摘取一些个人认为比较好的:




InputFormat抽象类仅有两个抽象方法:

  • List<InputSplit> getSplits(), 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。
  • RecordReader<K,V> createRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。

在后面说到InputSplits的时候,会介绍在getSplits()时需要验证输入文件是否可分割、文件存储时分块的大小和文件大小等因素,所以总体来说,通过InputFormat,Mapreduce框架可以做到:

  • 验证作业输入的正确性
  • 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask
  • 提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条