xiaobaiyang 发表于 2015-6-11 23:09:13

在HDFS与Local本地间进行数据文件的copy

package com.it.filesystem;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

/**
* filesystem 是hdfs 文件系统,LocalFilesystem 是本地文件系统,
* LocalFileSystem local = FileSystem.getLocal(conf);
* 对filesystem 进行读写操作 文件在本地与在hdsf文件系统之间进行copy
* 注意:在本地文件系统上(linux)创建文件,与在windows上创建文件一样,用FileOutputStream 创建文件,
*      本地读取文件用FileInputStream读取
* 而在hdfs文件系统上创建文件,用filesystem.create(path)进行创建,用filesystem.open(path)读取文件
* @author Administrator
*/
public class ReaderFile4Hdfs {
    public static void main(String[] args) throws Exception {
      
      CopyHFiles2Local();
      
      //CopyHfile2Local(args);
      //local2Hdfs();
      //seekDemo();
      //write4HdfsFile();
    }

    /**
   * 将hdfs上的文件合并后copy到本地文件系统上
   * @throws IOException
   * @throws URISyntaxException
   * @throws FileNotFoundException
   */
    private static void CopyHFiles2Local() throws IOException,
            URISyntaxException, FileNotFoundException {
      String hdfs="hdfs://hadoop:9000/merg";
      String local="/root/Downloads/file";
      Configuration conf = new Configuration();
      FileSystem fileSystem = FileSystem.get(new URI(hdfs), conf);
      
      FileOutputStream out = new FileOutputStream(local);
      //获取路径hdfs下的文件集合
      FileStatus[] listStatus = fileSystem.listStatus(new Path(hdfs));
      for (FileStatus fs : listStatus) {
            //fs.getPath()获取文件路径   读取文件
            FSDataInputStream in = fileSystem.open(fs.getPath());
            
            //将输入流输出到同一输出流中,进行文件的合并
            IOUtils.copyBytes(in, out, 4096, false);
            in.close();
      }
      out.close();
    }

    /**
   * 将hdfs上的文件copy到本地文件中(单个文件)
   * hadoop jar file.jar com.it.filesystem.ReaderFile4Hdfs hdfs://hadoop:9000/web.log /root/Downloads/file
   * @param args
   * @throws IOException
   * @throws FileNotFoundException
   */
    private static void CopyHfile2Local(String[] args) throws IOException,
            FileNotFoundException {
      String hdfs=args;
      String local=args;
      // String file="hdfs://hadoop:9000/web.log";//hdfs文件 地址

      Configuration config=new Configuration();
      FileSystem fs=FileSystem.get(URI.create(hdfs),config);//构建FileSystem
      InputStream is=fs.open(new Path(hdfs));//读取文件
      IOUtils.copyBytes(is, new FileOutputStream(new File(local)),4096, true);
      //保存到本地最后 关闭输入输出流
      IOUtils.closeStream(is);
    }

    /**
   * 将本地文件读取到hdfs上
   * @throws IOException
   * @throws FileNotFoundException
   */
    private static void local2Hdfs() throws IOException, FileNotFoundException {
      String path="/root/install.log.syslog";
      String des="hdfs://hadoop:9000/syslog";
      Configuration conf = new Configuration();
      FileSystem fileSystem = FileSystem.get(URI.create(des), conf);
      
      InputStream in = new FileInputStream(path);
      
      //FSDataOutputStream out = fileSystem.create(new Path(des));
      ////process表示记录是否已经被处理过
      OutputStream out=fileSystem.create(new Path(des), new Progressable() {
            @Override
            public void progress() {
                System.out.println(".");
            }
      });
      
      IOUtils.copyBytes(in, out, 4096, false);
      IOUtils.closeStream(out);
      IOUtils.closeStream(in);
    }

    private static void seekDemo() throws IOException, URISyntaxException {
      String path="hdfs://hadoop:9000/http.dat";
      Configuration conf = new Configuration();
      FileSystem fileSystem = FileSystem.get(new URI(path), conf);
      FSDataInputStream in=null;
      try {
            in = fileSystem.open(new Path(path));
            IOUtils.copyBytes(in, System.out, conf, false);
            in.seek(0);
            IOUtils.copyBytes(in, System.out, conf, false);
      } catch (Exception e) {
            
      } finally{
            IOUtils.closeStream(in);
      }
    }

    //读取hdfs 上的文件内容
    private static void write4HdfsFile() throws IOException,
            URISyntaxException, Exception {
      String path="hdfs://hadoop:9000/http.dat";
      Configuration conf = new Configuration();
      FileSystem fileSystem = FileSystem.get(new URI(path), conf);
      
      InputStream in=null;
      try {
            //读取指定路径文件
            in = fileSystem.open(new Path(path));
            //将输入流in中的内容写到输出流out中
            IOUtils.copyBytes(in, System.out, conf, false);
      } catch (Exception e) {
            throw new Exception(e);
      }finally{
            IOUtils.closeStream(in);
      }
    }
}


页: [1]
查看完整版本: 在HDFS与Local本地间进行数据文件的copy