package com.it.mapreduce;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * Demonstrates three Hadoop {@link SequenceFile} operations: packing a
 * directory of small files into one SequenceFile, reading a SequenceFile
 * record-by-record, and writing one from in-memory sample data.
 * Exactly one demo runs per invocation; switch by (un)commenting in main.
 */
public class SequenceFileDemo {
    // Sample records for sequenceWriter(); cycled via i % data.length.
    private static String[] data = {"one,two,buckly my shoe", "three,four,shut the door",
            "five,six,pick up sticks", "seven,eight,lay the straight"};

    public static void main(String[] args) throws Exception {
        smallFile2SequFile(args);
        //sequenceReader(args);
        //sequenceWriter(args);
    }

    /**
     * Packs many small files into a single SequenceFile (a MapFile is the
     * sorted form of a SequenceFile). Each record's key is the file name
     * and its value is the file's entire content.
     *
     * @param args args[0] = input directory, args[1] = output SequenceFile path
     * @throws IOException on any HDFS list/read/write failure
     */
    private static void smallFile2SequFile(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] files = fs.listStatus(new Path(args[0]));
        Text key = new Text();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, new Path(args[1]),
                    key.getClass(), value.getClass());
            for (FileStatus file : files) {
                key.set(file.getPath().getName());
                // Buffer sized to the whole file; assumes each file fits in an int-sized array.
                byte[] buffer = new byte[(int) file.getLen()];
                InputStream in = fs.open(file.getPath());
                try {
                    // Fill the buffer with the complete file content.
                    IOUtils.readFully(in, buffer, 0, buffer.length);
                } finally {
                    // Close even when readFully throws (original leaked the stream here).
                    IOUtils.closeStream(in);
                }
                value.set(buffer);
                writer.append(key, value);
            }
        } finally {
            // Close the writer on every path (original leaked it on exception).
            IOUtils.closeStream(writer);
        }
    }

    /**
     * Reads a SequenceFile and prints each record as "[position syncMarker] key value".
     * Key/value types are instantiated reflectively from the file's own metadata.
     *
     * @param args args[0] = SequenceFile path (HDFS URI)
     * @throws IOException        on read failure (no longer silently swallowed)
     * @throws URISyntaxException if args[0] is not a valid URI
     */
    private static void sequenceReader(String[] args) throws IOException,
            URISyntaxException {
        String path = args[0];
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(path), conf);
        Path path2 = new Path(path);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fileSystem, path2, conf);
            // Create key/value instances of whatever classes the file was written with.
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                // Flag records that immediately follow a sync point with "*".
                String syncSeen = reader.syncSeen() ? "*" : " ";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition();
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }

    /**
     * Writes 100 (IntWritable, Text) records to a SequenceFile, printing the
     * writer's byte position before each append. Keys count down 100..1;
     * values cycle through the sample {@code data} array.
     *
     * @param args args[0] = destination SequenceFile path (HDFS URI)
     * @throws IOException        on write failure (no longer silently swallowed)
     * @throws URISyntaxException if args[0] is not a valid URI
     */
    private static void sequenceWriter(String[] args) throws IOException,
            URISyntaxException {
        // Path refers to a location on HDFS.
        String path = args[0];
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(path), conf);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fileSystem, conf, new Path(path),
                    key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(data[i % data.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}