xiaobaiyang 发表于 2015-6-11 23:11:28

小文件合并成SequenceFile文件

package com.it.mapreduce;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileDemo {
    private static String[] data={"one,two,buckly my shoe","three,four,shut the door",
            "five,six,pick up sticks","seven,eight,lay the straight"};
   
    public static void main(String[] args) throws Exception {
      
      SmallFile2SequFile(args);
      
      //SequenceReader(args);
      
      //SequenceWriter(args);
    }

    /**
   * 利用sequenceFile打包多个小文件,MapFile是sequenceFile的排序形式,程序如下:
   * @param args
   * @throws IOException
   */
    private static void SmallFile2SequFile(String[] args) throws IOException {
      
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      FileStatus[] files = fs.listStatus(new Path(args));
         
      Text key = new Text();
      Text value = new Text();
      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(args),key.getClass() , value.getClass());
      InputStream in = null;
      byte[] buffer = null;
         
      for(int i=0;i<files.length;i++){
            key.set(files.getPath().getName());
            //读取file文件
            in = fs.open(files.getPath());
            buffer = new byte[(int) files.getLen()];
            //将读取道德内容填充到数组buffer中
            IOUtils.readFully(in, buffer, 0, buffer.length);
            value.set(buffer);
            IOUtils.closeStream(in);
         // System.out.println(key.toString()+"\n"+value.toString());
            writer.append(key, value);
      }   
      IOUtils.closeStream(writer);
    }

    /**
   * SequenceFile 读操作
   * @param args
   * @throws IOException
   * @throws URISyntaxException
   */
    private static void SequenceReader(String[] args) throws IOException,
            URISyntaxException {
      String path=args;
      Configuration conf = new Configuration();
      FileSystem fileSystem = FileSystem.get(new URI(path), conf);
      Path path2 = new Path(path);
      SequenceFile.Reader reader=null;
      try {
            reader=new SequenceFile.Reader(fileSystem, path2, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            
            long position = reader.getPosition();
            while(reader.next(key, value)){
                //设置同步点
                String syncSeen=reader.syncSeen()?"*":" ";
                System.out.printf("[%s%s]\t%s\t%s\n",position,syncSeen,key,value);
                position=reader.getPosition();
               
            }
      } catch (Exception e) {
      }finally{
            IOUtils.closeStream(reader);
      }
    }

    /**
   * SequenceFile 写操作
   * @param args
   * @throws IOException
   * @throws URISyntaxException
   */
    private static void SequenceWriter(String[] args) throws IOException,
            URISyntaxException {
      //指的是hdfs上的路径
      String path=args;
      Configuration conf = new Configuration();
      FileSystem fileSystem = FileSystem.get(new URI(path), conf);
      //定义IntWritable key,Text value 两个变量
      IntWritable key = new IntWritable();
      Text value = new Text();
      //定义输出流(SequenceFile)
      SequenceFile.Writer writer=null;
      try {
            //创建输出流writer
            writer = SequenceFile.createWriter(fileSystem, conf, new Path(path), key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100-i);
                value.set(data);
                System.out.printf("[%s]\t%s\t%s\n",writer.getLength(),key,value);
                writer.append(key, value);
            }
      } catch (Exception e) {
      }finally{
            IOUtils.closeStream(writer);
      }
    }
}


wangzhenqiang 发表于 2016-1-20 18:42:56

考下来试试
页: [1]
查看完整版本: 小文件合并成SequenceFile文件