分享

小文件合并成SequenceFile文件

xiaobaiyang 发表于 2015-6-11 23:11:28 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 34457
package com.it.mapreduce;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileDemo {
    private static String[] data={"one,two,buckly my shoe","three,four,shut the door",
            "five,six,pick up sticks","seven,eight,lay the straight"};
   
    public static void main(String[] args) throws Exception {
        
        SmallFile2SequFile(args);  
        
        //SequenceReader(args);
        
        //SequenceWriter(args);
    }

    /**
     * 利用sequenceFile打包多个小文件,MapFile是sequenceFile的排序形式,程序如下:
     * @param args
     * @throws IOException
     */
    private static void SmallFile2SequFile(String[] args) throws IOException {
        
        Configuration conf = new Configuration();  
        FileSystem fs = FileSystem.get(conf);  
        FileStatus[] files = fs.listStatus(new Path(args[0]));  
         
        Text key = new Text();  
        Text value = new Text();  
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(args[1]),key.getClass() , value.getClass());  
        InputStream in = null;  
        byte[] buffer = null;  
         
        for(int i=0;i<files.length;i++){  
            key.set(files[i].getPath().getName());  
            //读取file文件
            in = fs.open(files[i].getPath());
            buffer = new byte[(int) files[i].getLen()];  
            //将读取道德内容填充到数组buffer中
            IOUtils.readFully(in, buffer, 0, buffer.length);  
            value.set(buffer);  
            IOUtils.closeStream(in);  
           // System.out.println(key.toString()+"\n"+value.toString());  
            writer.append(key, value);  
        }     
        IOUtils.closeStream(writer);
    }

    /**
     * SequenceFile 读操作
     * @param args
     * @throws IOException
     * @throws URISyntaxException
     */
    private static void SequenceReader(String[] args) throws IOException,
            URISyntaxException {
        String path=args[0];
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(path), conf);
        Path path2 = new Path(path);
        SequenceFile.Reader reader=null;
        try {
            reader=new SequenceFile.Reader(fileSystem, path2, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            
            long position = reader.getPosition();
            while(reader.next(key, value)){
                //设置同步点
                String syncSeen=reader.syncSeen()?"*":" ";
                System.out.printf("[%s%s]\t%s\t%s\n",position,syncSeen,key,value);
                position=reader.getPosition();
               
            }
        } catch (Exception e) {
        }finally{
            IOUtils.closeStream(reader);
        }
    }

    /**
     * SequenceFile 写操作
     * @param args
     * @throws IOException
     * @throws URISyntaxException
     */
    private static void SequenceWriter(String[] args) throws IOException,
            URISyntaxException {
        //指的是hdfs上的路径
        String path=args[0];
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(path), conf);
        //定义IntWritable key,Text value 两个变量
        IntWritable key = new IntWritable();
        Text value = new Text();
        //定义输出流(SequenceFile)
        SequenceFile.Writer writer=null;
        try {
            //创建输出流writer
            writer = SequenceFile.createWriter(fileSystem, conf, new Path(path), key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100-i);
                value.set(data[i%data.length]);
                System.out.printf("[%s]\t%s\t%s\n",writer.getLength(),key,value);
                writer.append(key, value);
            }
        } catch (Exception e) {
        }finally{
            IOUtils.closeStream(writer);
        }
    }
}


已有(1)人评论

跳转到指定楼层
wangzhenqiang 发表于 2016-1-20 18:42:56
考下来试试
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条