分享

在HDFS与Local本地间进行数据文件的copy

xiaobaiyang 发表于 2015-6-11 23:09:13 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 20578
package com.it.filesystem;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

/**
* filesystem 是hdfs 文件系统,LocalFilesystem 是本地文件系统,
* LocalFileSystem local = FileSystem.getLocal(conf);
* 对filesystem 进行读写操作 文件在本地与在hdsf文件系统之间进行copy
* 注意:在本地文件系统上(linux)创建文件,与在windows上创建文件一样,用FileOutputStream 创建文件,
*      本地读取文件用FileInputStream读取
* 而在hdfs文件系统上创建文件,用filesystem.create(path)进行创建,用filesystem.open(path)读取文件
* @author Administrator
*/
public class ReaderFile4Hdfs {
    public static void main(String[] args) throws Exception {
        
        CopyHFiles2Local();
        
        //CopyHfile2Local(args);
        //local2Hdfs();
        //seekDemo();
        //write4HdfsFile();
    }

    /**
     * 将hdfs上的文件合并后copy到本地文件系统上
     * @throws IOException
     * @throws URISyntaxException
     * @throws FileNotFoundException
     */
    private static void CopyHFiles2Local() throws IOException,
            URISyntaxException, FileNotFoundException {
        String hdfs="hdfs://hadoop:9000/merg";
        String local="/root/Downloads/file";
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(hdfs), conf);
        
        FileOutputStream out = new FileOutputStream(local);
        //获取路径hdfs下的文件集合
        FileStatus[] listStatus = fileSystem.listStatus(new Path(hdfs));
        for (FileStatus fs : listStatus) {
            //fs.getPath()  获取文件路径     读取文件
            FSDataInputStream in = fileSystem.open(fs.getPath());
            
            //将输入流输出到同一输出流中,进行文件的合并
            IOUtils.copyBytes(in, out, 4096, false);
            in.close();
        }
        out.close();
    }

    /**
     * 将hdfs上的文件copy到本地文件中(单个文件)
     * hadoop jar file.jar com.it.filesystem.ReaderFile4Hdfs hdfs://hadoop:9000/web.log /root/Downloads/file
     * @param args
     * @throws IOException
     * @throws FileNotFoundException
     */
    private static void CopyHfile2Local(String[] args) throws IOException,
            FileNotFoundException {
        String hdfs=args[0];
        String local=args[1];
        // String file="hdfs://hadoop:9000/web.log";//hdfs文件 地址

        Configuration config=new Configuration();
        FileSystem fs=FileSystem.get(URI.create(hdfs),config);//构建FileSystem
        InputStream is=fs.open(new Path(hdfs));//读取文件
        IOUtils.copyBytes(is, new FileOutputStream(new File(local)),4096, true);
        //保存到本地  最后 关闭输入输出流
        IOUtils.closeStream(is);
    }

    /**
     * 将本地文件读取到hdfs上
     * @throws IOException
     * @throws FileNotFoundException
     */
    private static void local2Hdfs() throws IOException, FileNotFoundException {
        String path="/root/install.log.syslog";
        String des="hdfs://hadoop:9000/syslog";
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(URI.create(des), conf);
        
        InputStream in = new FileInputStream(path);
        
        //FSDataOutputStream out = fileSystem.create(new Path(des));
        ////process表示记录是否已经被处理过
        OutputStream out=fileSystem.create(new Path(des), new Progressable() {
            @Override
            public void progress() {
                System.out.println(".");
            }
        });
        
        IOUtils.copyBytes(in, out, 4096, false);
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
    }

    private static void seekDemo() throws IOException, URISyntaxException {
        String path="hdfs://hadoop:9000/http.dat";
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(path), conf);
        FSDataInputStream in=null;
        try {
            in = fileSystem.open(new Path(path));
            IOUtils.copyBytes(in, System.out, conf, false);
            in.seek(0);
            IOUtils.copyBytes(in, System.out, conf, false);
        } catch (Exception e) {
            
        } finally{
            IOUtils.closeStream(in);
        }
    }

    //读取hdfs 上的文件内容
    private static void write4HdfsFile() throws IOException,
            URISyntaxException, Exception {
        String path="hdfs://hadoop:9000/http.dat";
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(path), conf);
        
        InputStream in=null;
        try {
            //读取指定路径文件
            in = fileSystem.open(new Path(path));
            //将输入流in中的内容写到输出流out中
            IOUtils.copyBytes(in, System.out, conf, false);
        } catch (Exception e) {
            throw new Exception(e);
        }finally{
            IOUtils.closeStream(in);
        }
    }
}


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条