// 在HDFS与Local本地间进行数据文件的copy (copying data files between HDFS and the local file system)
package com.it.filesystem;import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
 * Demonstrates copying data files between HDFS and the local file system.
 *
 * <p>A {@link FileSystem} obtained with {@code FileSystem.get(uri, conf)} for an
 * {@code hdfs://} URI talks to HDFS; {@link LocalFileSystem} is the local file system and can
 * be obtained with {@code FileSystem.getLocal(conf)}.
 *
 * <p>Note: on the local file system (Linux, just as on Windows) files are created with
 * {@link FileOutputStream} and read with {@link FileInputStream}, whereas on HDFS files are
 * created with {@code fileSystem.create(path)} and read with {@code fileSystem.open(path)}.
 *
 * @author Administrator
 */
public class ReaderFile4Hdfs {

    /** Buffer size, in bytes, used for every stream-to-stream copy in this class. */
    private static final int BUFFER_SIZE = 4096;

    /**
     * Entry point. Runs the HDFS-directory merge demo; uncomment one of the other calls to run
     * a different demo instead.
     */
    public static void main(String[] args) throws Exception {
        CopyHFiles2Local();
        //CopyHfile2Local(args);
        //local2Hdfs();
        //seekDemo();
        //write4HdfsFile();
    }

    /**
     * Merges the files under {@code hdfs://hadoop:9000/merg} into the single local file
     * {@code /root/Downloads/file}.
     *
     * @throws IOException if listing, reading or writing fails
     * @throws URISyntaxException if the HDFS location is not a valid URI
     * @throws FileNotFoundException if the source path or local destination cannot be opened
     */
    private static void CopyHFiles2Local() throws IOException,
            URISyntaxException, FileNotFoundException {
        CopyHFiles2Local("hdfs://hadoop:9000/merg", "/root/Downloads/file");
    }

    /**
     * Concatenates the files returned by {@code listStatus(hdfs)} into the single local file
     * {@code local}, in the order the listing returns them.
     *
     * <p>Directory entries are skipped, because a directory cannot be opened as an input
     * stream. The local file is created (or truncated) only after the source has been listed
     * successfully, so a missing source does not clobber an existing local file.
     *
     * @param hdfs  HDFS URI of the directory (or file) to merge
     * @param local local destination path; created or overwritten
     * @throws IOException if listing, reading or writing fails
     * @throws URISyntaxException if {@code hdfs} is not a valid URI
     */
    private static void CopyHFiles2Local(String hdfs, String local) throws IOException,
            URISyntaxException {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(hdfs), conf);
        // List first: if the source is missing we fail before touching the local file.
        FileStatus[] listStatus = fileSystem.listStatus(new Path(hdfs));
        OutputStream out = new FileOutputStream(local);
        try {
            for (FileStatus status : listStatus) {
                // isDir() (rather than isDirectory()) keeps this compatible with Hadoop 1.x.
                if (status.isDir()) {
                    continue;
                }
                FSDataInputStream in = fileSystem.open(status.getPath());
                try {
                    // Every input is appended to the same output stream, which merges the files.
                    IOUtils.copyBytes(in, out, BUFFER_SIZE, false);
                } finally {
                    IOUtils.closeStream(in);
                }
            }
            // Close explicitly so a failure to flush the merged file is reported, not swallowed.
            out.close();
            out = null;
        } finally {
            // Only does anything if an exception occurred above.
            IOUtils.closeStream(out);
        }
    }

    /**
     * Copies a single HDFS file to a local file.
     *
     * <p>Usage (with the matching call in {@link #main} uncommented):
     * {@code hadoop jar file.jar com.it.filesystem.ReaderFile4Hdfs hdfs://hadoop:9000/web.log /root/Downloads/file}
     *
     * @param args {@code args[0]} is the HDFS source URI, {@code args[1]} the local destination
     * @throws IllegalArgumentException if fewer than two arguments are supplied
     * @throws IOException if reading or writing fails
     * @throws FileNotFoundException if the source or destination cannot be opened
     */
    private static void CopyHfile2Local(String[] args) throws IOException,
            FileNotFoundException {
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException(
                    "usage: ReaderFile4Hdfs <hdfs-source-uri> <local-destination-path>");
        }
        String hdfs = args[0];
        String local = args[1];
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(hdfs), config);
        InputStream is = fs.open(new Path(hdfs));
        OutputStream os = null;
        try {
            os = new FileOutputStream(new File(local));
            // close=true: copyBytes closes both streams itself and propagates close() errors.
            IOUtils.copyBytes(is, os, BUFFER_SIZE, true);
        } finally {
            // Cleanup if opening the local file or the copy failed; harmless otherwise.
            IOUtils.closeStream(os);
            IOUtils.closeStream(is);
        }
    }

    /**
     * Uploads {@code /root/install.log.syslog} to {@code hdfs://hadoop:9000/syslog}.
     *
     * @throws IOException if reading or writing fails
     * @throws FileNotFoundException if the local source file cannot be opened
     */
    private static void local2Hdfs() throws IOException, FileNotFoundException {
        local2Hdfs("/root/install.log.syslog", "hdfs://hadoop:9000/syslog");
    }

    /**
     * Uploads the local file {@code path} to the HDFS location {@code des}, printing a
     * {@code "."} line each time HDFS invokes the progress callback.
     *
     * @param path local source file
     * @param des  HDFS destination URI; created or overwritten
     * @throws IOException if reading, writing or committing the HDFS file fails
     * @throws FileNotFoundException if the local source file cannot be opened
     */
    private static void local2Hdfs(String path, String des) throws IOException,
            FileNotFoundException {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(URI.create(des), conf);
        InputStream in = new FileInputStream(path);
        OutputStream out = null;
        try {
            out = fileSystem.create(new Path(des), new Progressable() {
                @Override
                public void progress() {
                    System.out.println(".");
                }
            });
            IOUtils.copyBytes(in, out, BUFFER_SIZE, false);
            // The HDFS output must be closed explicitly: close() is where the final data is
            // written out, and IOUtils.closeStream would silently swallow a failure there.
            out.close();
            out = null;
        } finally {
            IOUtils.closeStream(out);
            IOUtils.closeStream(in);
        }
    }

    /**
     * Prints {@code hdfs://hadoop:9000/http.dat} to standard output twice, using
     * {@code seek(0)} to rewind the HDFS input stream between the two passes.
     *
     * @throws IOException if the file cannot be opened, read or rewound
     * @throws URISyntaxException if the HDFS location is not a valid URI
     */
    private static void seekDemo() throws IOException, URISyntaxException {
        String path = "hdfs://hadoop:9000/http.dat";
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(path), conf);
        FSDataInputStream in = null;
        try {
            in = fileSystem.open(new Path(path));
            IOUtils.copyBytes(in, System.out, conf, false);
            in.seek(0);
            IOUtils.copyBytes(in, System.out, conf, false);
        } finally {
            // Errors now propagate to the caller instead of being silently discarded.
            IOUtils.closeStream(in);
        }
    }

    /**
     * Prints the contents of {@code hdfs://hadoop:9000/http.dat} to standard output.
     * Despite its name, this method only reads from HDFS.
     *
     * @throws IOException if the file cannot be opened or read
     * @throws URISyntaxException if the HDFS location is not a valid URI
     * @throws Exception declared for backward compatibility with existing callers
     */
    private static void write4HdfsFile() throws IOException,
            URISyntaxException, Exception {
        String path = "hdfs://hadoop:9000/http.dat";
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(path), conf);
        InputStream in = null;
        try {
            // Open the file at the given path.
            in = fileSystem.open(new Path(path));
            // Copy the input stream to stdout; false = leave System.out open.
            IOUtils.copyBytes(in, System.out, conf, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
// 页: [1]  (page 1 — pagination residue from the source this file was copied from)