Small files and CombineFileInputFormat

Hadoop handles a small number of large files far better than a large number of small files. The reason is that FileInputFormat generates InputSplits where each split is a whole file or a part of one. If the files are very small (much smaller than an HDFS block) and there are many of them, each map task processes very little input, and every task carries its own startup overhead. Compare a 1 GB file stored as 16 blocks of 64 MB with 10,000 files of roughly 100 KB each: the 10,000 files need 10,000 map tasks, and the job can run tens or even hundreds of times slower than the equivalent job running 16 map tasks over a single file. CombineFileInputFormat mitigates this. It is designed for small files: it packs multiple files into a single split so that each mapper has more data to process. CombineFileInputFormat is an abstract class with no concrete implementation, so using it takes some extra work: you must create a concrete subclass and implement getRecordReader() in the old API, or createRecordReader() in the new API. The example below implements createRecordReader() with the new API.
The FileLineWritable class:
package com.duplicate.self;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * A composite key holding the source file name and the byte offset of a
 * line within that file, so each record still identifies which small
 * file it came from.
 */
public class FileLineWritable implements WritableComparable<FileLineWritable> {

    public long offset;
    public String fileName;

    public void readFields(DataInput in) throws IOException {
        this.offset = in.readLong();
        this.fileName = Text.readString(in);
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(this.offset);
        Text.writeString(out, this.fileName);
    }

    // Order by file name first, then by offset within the file.
    public int compareTo(FileLineWritable that) {
        int cmp = this.fileName.compareTo(that.fileName);
        if (cmp != 0) {
            return cmp;
        }
        return (int) Math.signum((double) (this.offset - that.offset));
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((fileName == null) ? 0 : this.fileName.hashCode());
        // Fold the high 32 bits of the offset into the hash (>>> 32, not >>> 2).
        result = prime * result + (int) (this.offset ^ (this.offset >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || this.getClass() != obj.getClass()) {
            return false;
        }
        FileLineWritable other = (FileLineWritable) obj;
        if (this.fileName == null) {
            if (other.fileName != null) {
                return false;
            }
        } else if (!this.fileName.equals(other.fileName)) {
            return false;
        }
        return this.offset == other.offset;
    }
}
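A quick way to sanity-check that write() and readFields() are symmetric is a round trip through a byte buffer. This standalone snippet (the RoundTrip class name and sample values are illustrative, not part of the original post) should print true:

package com.duplicate.self;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTrip {
    public static void main(String[] args) throws IOException {
        FileLineWritable key = new FileLineWritable();
        key.fileName = "part-00000";  // illustrative sample values
        key.offset = 128L;

        // Serialize exactly as Hadoop would when shuffling the key.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        key.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance and compare.
        FileLineWritable copy = new FileLineWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.equals(key));  // expected: true
    }
}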
The MyRecordReader class:

package com.duplicate.self;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * Reads one file out of a CombineFileSplit, one line at a time.
 * CombineFileRecordReader instantiates one of these per file in the
 * split, passing the file's index within the split to the constructor.
 */
public class MyRecordReader extends RecordReader<FileLineWritable, Text> {

    private long startOffset;  // offset of this file's chunk within the split
    private long end;          // end of this file's chunk
    private long pos;          // current read position
    private FileSystem fs;
    private Path path;
    private FileLineWritable key;
    private Text value;

    private FSDataInputStream fileIn;
    private LineReader reader;

    public MyRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException {
        this.path = split.getPath(index);
        this.fs = this.path.getFileSystem(context.getConfiguration());
        this.startOffset = split.getOffset(index);
        // Use this file's length, not split.getLength(), which is the
        // total length of every file packed into the split.
        this.end = this.startOffset + split.getLength(index);

        fileIn = this.fs.open(this.path);
        fileIn.seek(this.startOffset);  // position at this file's chunk
        reader = new LineReader(fileIn);
        this.pos = this.startOffset;
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();  // also closes the underlying stream
            reader = null;
        }
    }

    @Override
    public FileLineWritable getCurrentKey() throws IOException,
            InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (this.startOffset == this.end) {
            return 0;
        }
        return Math.min(1.0f, (this.pos - this.startOffset) / (float) (this.end - this.startOffset));
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Intentionally empty: all setup happens in the constructor,
        // which CombineFileRecordReader calls with the per-file index.
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new FileLineWritable();
            key.fileName = path.getName();
        }
        key.offset = pos;
        if (value == null) {
            value = new Text();
        }

        int newSize = 0;
        if (pos < end) {
            newSize = reader.readLine(value);  // returns bytes consumed
            pos += newSize;
        }

        if (newSize == 0) {  // end of this file's chunk
            key = null;
            value = null;
            return false;
        }
        return true;
    }
}
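With the reader in place, a mapper receives (FileLineWritable, Text) pairs. As an illustration (the WordsByFileMapper class below is an assumption, not part of the original post), a mapper that tags every line with its source file might look like:

package com.duplicate.self;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (file name, line) so downstream steps can tell which
// small file each record originated from.
public class WordsByFileMapper extends Mapper<FileLineWritable, Text, Text, Text> {

    private final Text outKey = new Text();

    @Override
    protected void map(FileLineWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        outKey.set(key.fileName);
        context.write(outKey, value);
    }
}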
The MyCombineFileInputFormat class:

package com.duplicate.self;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class MyCombineFileInputFormat extends CombineFileInputFormat<FileLineWritable, Text> {

    @Override
    public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader instantiates one MyRecordReader per file
        // in the split, via the (CombineFileSplit, TaskAttemptContext,
        // Integer) constructor.
        return new CombineFileRecordReader<FileLineWritable, Text>(
                (CombineFileSplit) split, context, MyRecordReader.class);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Small files are read whole; never split an individual file.
        return false;
    }
}
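If no maximum split size is set, CombineFileInputFormat decides how many files to pack together from block sizes and node locality on its own. To cap how much data a single mapper receives, the subclass can call the inherited setMaxSplitSize() in its constructor. A minimal sketch, assuming a 64 MB cap (the value is an illustrative assumption; tune it for your cluster):

public MyCombineFileInputFormat() {
    super();
    // Assumed cap: each combined split holds at most 64 MB of input.
    // setMaxSplitSize() is inherited from CombineFileInputFormat.
    setMaxSplitSize(64 * 1024 * 1024);
}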
Job configuration:

import org.apache.hadoop.mapreduce.Job;
// standard hadoop conf
Job job = new Job(getConf());
job.setInputFormatClass(MyCombineFileInputFormat.class);
job.setMapperClass(Mapper.class);  // identity mapper; replace with your own
job.setNumReduceTasks(0);          // map-only job
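For completeness, here is a sketch of a full driver wired up the same way. The SmallFilesDriver class and the argument-supplied paths are assumptions for illustration, and it reuses the hypothetical WordsByFileMapper from above:

package com.duplicate.self;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJarByClass(SmallFilesDriver.class);
        job.setInputFormatClass(MyCombineFileInputFormat.class);
        job.setMapperClass(WordsByFileMapper.class);
        job.setNumReduceTasks(0);  // map-only
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output locations come from the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SmallFilesDriver(), args));
    }
}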
From the group: hadoop技术组