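Guiding questions
1. What are the advantages and disadvantages of SequenceFile?
2. How does SequenceFile read and write files with compression based on CompressionType?
3. What is the on-disk layout of a SequenceFile? Show the key fields in code.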
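Hadoop's serialized file format, SequenceFile, can be used to solve the problem of large numbers of small files (here "small file" loosely means any file smaller than the HDFS block size). SequenceFile is a binary file format supported by the Hadoop API: it serializes <key,value> pairs directly into the file. Small files are usually merged this way, with the file name as the key and the file content as the value, all serialized into one large file.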
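To make the "file name as key, file content as value" idea concrete, here is a minimal stdlib-only Java sketch. It does not use Hadoop: the class `SmallFilePacker` and its simple length-prefixed layout are hypothetical, chosen only to show how many small files collapse into one large byte stream that can be read back record by record.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration, not Hadoop code: many small "files" merged into one
// byte stream as <file name, file content> records.
class SmallFilePacker {

    // Each record: int name length, UTF-8 file name, int content length, file content.
    static byte[] pack(Map<String, byte[]> files) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Map.Entry<String, byte[]> e : files.entrySet()) {
                byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
                out.writeInt(name.length);
                out.write(name);
                out.writeInt(e.getValue().length);
                out.write(e.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    // Reads the records back in the order they were written.
    static Map<String, byte[]> unpack(byte[] packed) {
        Map<String, byte[]> files = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(packed))) {
            while (in.available() > 0) {
                byte[] name = new byte[in.readInt()];
                in.readFully(name);
                byte[] content = new byte[in.readInt()];
                in.readFully(content);
                files.put(new String(name, StandardCharsets.UTF_8), content);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return files;
    }

    public static void main(String[] args) {
        Map<String, byte[]> files = new LinkedHashMap<>();
        files.put("a.txt", "hello".getBytes(StandardCharsets.UTF_8));
        files.put("b.txt", "world".getBytes(StandardCharsets.UTF_8));
        byte[] packed = pack(files);
        System.out.println("packed bytes: " + packed.length); // 36
        unpack(packed).forEach((name, content) ->
                System.out.println(name + " -> " + new String(content, StandardCharsets.UTF_8)));
    }
}
```

The real SequenceFile adds a header, sync markers and optional compression on top of this basic idea; the layout above only illustrates the merging step.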
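Hadoop Archive (HAR) is another archive format that packs small files efficiently into HDFS blocks; for details, see: Hadoop Archive
However, a SequenceFile cannot be appended to, so it suits workloads that write a large batch of small files in one go.
SequenceFile compression is controlled by CompressionType; here is the source: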
/**
 * The compression type used to compress key/value pairs in the
 * {@link SequenceFile}.
 * @see SequenceFile.Writer
 */
public static enum CompressionType {
  /** Do not compress records. */
  NONE,   // no compression
  /** Compress values only, each separately. */
  RECORD, // compress only the values, each one separately
  /** Compress sequences of records together in blocks. */
  BLOCK   // compress the keys/values of many records together in blocks
}
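The enum alone does not show how the choice is recorded in a file. In the header written by writeFileHeader() (quoted later in this article) it is stored as two booleans, isCompressed and isBlockCompressed. The stdlib-only sketch below redeclares the enum locally (it is not the Hadoop class) and shows that mapping in both directions; `CompressionFlags` and `fromFlags` are hypothetical names.

```java
// Sketch: how each CompressionType maps onto the two header booleans
// (isCompressed, isBlockCompressed), and back again.
// The enum is a local copy, not org.apache.hadoop.io.SequenceFile.CompressionType.
class CompressionFlags {
    enum CompressionType { NONE, RECORD, BLOCK }

    static boolean isCompressed(CompressionType t) {
        return t != CompressionType.NONE;
    }

    static boolean isBlockCompressed(CompressionType t) {
        return t == CompressionType.BLOCK;
    }

    // Inverse mapping, as a reader would apply after parsing the two booleans.
    static CompressionType fromFlags(boolean compressed, boolean blockCompressed) {
        if (!compressed) {
            return CompressionType.NONE;
        }
        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
    }

    public static void main(String[] args) {
        for (CompressionType t : CompressionType.values()) {
            System.out.printf("%s -> compressed=%b, block=%b%n",
                    t, isCompressed(t), isBlockCompressed(t));
        }
    }
}
```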
SequenceFile read/write example:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;

/**
 * @version 1.0
 * @author Fish
 */
public class SequenceFileWriteDemo {
  private static final String[] DATA = { "fish1", "fish2", "fish3", "fish4" };

  public static void main(String[] args) throws IOException {
    /*
     * Write the SequenceFile
     */
    String uri = "/test/fish/seq.txt";
    Configuration conf = new Configuration();
    Path path = new Path(uri);
    IntWritable key = new IntWritable();
    Text value = new Text();
    Writer writer = null;
    try {
      /*
       * CompressionType.NONE   no compression
       * CompressionType.RECORD compress only the values
       * CompressionType.BLOCK  compress the keys/values of many records together in blocks
       */
      writer = SequenceFile.createWriter(conf, Writer.file(path), Writer.keyClass(key.getClass()),
          Writer.valueClass(value.getClass()), Writer.compression(CompressionType.BLOCK));

      for (int i = 0; i < DATA.length; i++) {
        value.set(DATA[i]); // Text.set takes a single String, not the whole array
        key.set(i);
        // getLength() returns the current length of the output file
        System.out.printf("[%s]\t%s\t%s%n", writer.getLength(), key, value);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }

    /*
     * Read the SequenceFile
     */
    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(conf, Reader.file(path));
      IntWritable key1 = new IntWritable();
      Text value1 = new Text();
      while (reader.next(key1, value1)) {
        System.out.println(key1 + "----" + value1);
      }
    } finally {
      IOUtils.closeStream(reader); // close the reader stream
    }

    /*
     * Sorting (fs and comparator are not defined in this demo)
     */
    // SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, comparator, IntWritable.class, Text.class, conf);
  }
}
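Running the program above several times never appends data: each run re-creates the file, which always contains exactly four records. The reason lies in the constructor of the SequenceFile.Writer class:
out = fs.create(p, true, bufferSize, replication, blockSize, progress);
The second argument, true, is the overwrite flag: an existing file with the same name is overwritten every time, whereas with false, creating a file that already exists would throw an exception. This design is probably related to HDFS's write-once, read-many model, which discourages appending to existing files, so the constructor hard-codes true.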
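The overwrite flag behaves much like the open options of the JDK's local-file API. The sketch below is only an analogy on the local file system: it uses java.nio.file.Files rather than Hadoop's FileSystem, and the helper `create(path, overwrite, content)` is a hypothetical name. With overwrite set to true the file is truncated and rewritten, which matches the demo ending up with a fresh file on every run; with false, StandardOpenOption.CREATE_NEW makes a second create of the same path fail.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Local-file analogy (hypothetical helper, not Hadoop API) for fs.create(p, overwrite, ...).
class OverwriteDemo {

    static void create(Path p, boolean overwrite, String content) throws IOException {
        // overwrite=true: create or truncate; overwrite=false: fail if the file exists.
        StandardOpenOption[] opts = overwrite
                ? new StandardOpenOption[] {StandardOpenOption.CREATE,
                        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE}
                : new StandardOpenOption[] {StandardOpenOption.CREATE_NEW,
                        StandardOpenOption.WRITE};
        try (OutputStream out = Files.newOutputStream(p, opts)) {
            out.write(content.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("seq", ".bin");
        create(p, true, "first run");
        create(p, true, "second run"); // replaces the content, does not append
        System.out.println(new String(Files.readAllBytes(p), StandardCharsets.UTF_8));
        try {
            create(p, false, "third run");
        } catch (FileAlreadyExistsException e) {
            System.out.println("overwrite=false: " + e.getClass().getSimpleName());
        }
        Files.delete(p);
    }
}
```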
Layout of a SequenceFile:
I. Header
Source that writes the header:
/** Write and flush the file header. */
private void writeFileHeader()
  throws IOException {
  out.write(VERSION);                          // magic "SEQ" plus version byte
  Text.writeString(out, keyClass.getName());   // key class name
  Text.writeString(out, valClass.getName());   // value class name

  out.writeBoolean(this.isCompressed());       // compressed at all?
  out.writeBoolean(this.isBlockCompressed());  // compressed with CompressionType.BLOCK?

  if (this.isCompressed()) {
    Text.writeString(out, (codec.getClass()).getName()); // compression codec class name
  }
  this.metadata.write(out);                    // write the metadata
  out.write(sync);                             // write the sync bytes
  out.flush();                                 // flush header
}
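The header's byte layout can be reproduced with the JDK alone. The sketch below is an approximation under stated assumptions, not Hadoop's implementation: the version byte is assumed to be 6 (the value of VERSION_WITH_METADATA), strings are written like Text.writeString (a VInt byte length followed by UTF-8 bytes), and the metadata is written as an int entry count followed by key/value strings in sorted key order. `HeaderSketch` and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only sketch of the bytes produced by writeFileHeader().
class HeaderSketch {
    static final byte VERSION_WITH_METADATA = 6; // assumed value

    // Same encoding as Hadoop's WritableUtils.writeVInt/writeVLong.
    static void writeVInt(DataOutput out, long i) throws IOException {
        if (i >= -112 && i <= 127) {
            out.writeByte((byte) i);
            return;
        }
        int len = -112;
        if (i < 0) {
            i ^= -1L; // one's complement
            len = -120;
        }
        for (long tmp = i; tmp != 0; tmp >>= 8) {
            len--;
        }
        out.writeByte((byte) len);
        len = (len < -120) ? -(len + 120) : -(len + 112);
        for (int idx = len; idx != 0; idx--) {
            out.writeByte((byte) (i >> ((idx - 1) * 8)));
        }
    }

    // Like Text.writeString: VInt byte length, then the UTF-8 bytes.
    static void writeString(DataOutput out, String s) throws IOException {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        writeVInt(out, b.length);
        out.write(b);
    }

    // codecClass == null means CompressionType.NONE.
    static byte[] header(String keyClass, String valClass, String codecClass,
                         boolean blockCompressed, Map<String, String> metadata, byte[] sync) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.write(new byte[] {'S', 'E', 'Q', VERSION_WITH_METADATA}); // magic + version
            writeString(out, keyClass);                      // key class name
            writeString(out, valClass);                      // value class name
            boolean compressed = codecClass != null;
            out.writeBoolean(compressed);                    // isCompressed()
            out.writeBoolean(compressed && blockCompressed); // isBlockCompressed()
            if (compressed) {
                writeString(out, codecClass);                // codec class name
            }
            Map<String, String> sorted = new TreeMap<>(metadata);
            out.writeInt(sorted.size());                     // metadata entry count
            for (Map.Entry<String, String> e : sorted.entrySet()) {
                writeString(out, e.getKey());
                writeString(out, e.getValue());
            }
            out.write(sync);                                 // 16 sync bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] h = header("org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text",
                null, false, new TreeMap<>(), new byte[16]);
        System.out.println("header bytes: " + h.length); // 85
    }
}
```

For the demo's IntWritable/Text pair without compression, that comes to 4 + 33 + 26 + 2 + 4 + 16 = 85 bytes under these assumptions.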
Version (the magic bytes "SEQ" followed by the version number):
private static byte[] VERSION = new byte[] {
  (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
};
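Because every SequenceFile starts with these four bytes, a quick format check only needs to look at the beginning of the file. This is a hypothetical helper for illustration (`MagicCheck.version`), not a Hadoop API; it simply returns whatever version byte follows "SEQ".

```java
import java.nio.charset.StandardCharsets;

// Sketch: recognizing a SequenceFile from its first four bytes ('S', 'E', 'Q', version).
class MagicCheck {
    // Returns the version byte, or -1 if the data does not start with "SEQ".
    static int version(byte[] head) {
        if (head.length < 4 || head[0] != 'S' || head[1] != 'E' || head[2] != 'Q') {
            return -1;
        }
        return head[3];
    }

    public static void main(String[] args) {
        byte[] head = {'S', 'E', 'Q', 6};
        System.out.println(version(head));                                       // 6
        System.out.println(version("abcd".getBytes(StandardCharsets.US_ASCII))); // -1
    }
}
```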
How the sync marker is generated:
byte[] sync;                       // 16 random bytes
{
  try {
    MessageDigest digester = MessageDigest.getInstance("MD5");
    long time = Time.now();
    digester.update((new UID()+"@"+time).getBytes());
    sync = digester.digest();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
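The same recipe runs with the JDK alone: MD5 always yields 16 bytes, and mixing a fresh java.rmi.server.UID with the current time makes each writer's marker practically unique. In this sketch System.currentTimeMillis() stands in for Hadoop's Time.now(), and `SyncMarker.newSync` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.rmi.server.UID;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Stdlib-only re-creation of the sync-marker recipe: MD5 over "<new UID()>@<time in ms>".
class SyncMarker {
    static byte[] newSync() {
        try {
            MessageDigest digester = MessageDigest.getInstance("MD5");
            long time = System.currentTimeMillis(); // stands in for Time.now()
            digester.update((new UID() + "@" + time).getBytes(StandardCharsets.UTF_8));
            return digester.digest(); // 16 bytes
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] sync = newSync();
        StringBuilder hex = new StringBuilder();
        for (byte b : sync) {
            hex.append(String.format("%02x", b));
        }
        System.out.println(sync.length + " bytes: " + hex);
    }
}
```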
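II. Record
Writer comes in three variants, matching the CompressionType values NONE, RECORD and BLOCK: Writer itself, RecordCompressWriter and BlockCompressWriter. Each is described below (read them alongside the figure above):
1. NONE: uncompressed SequenceFile
Each record directly stores the record length, the key length, the key bytes and the value bytes.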
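The uncompressed record layout can be written and parsed with DataOutputStream/DataInputStream. This sketch assumes the key and value have already been serialized (for example, IntWritable(1) as four big-endian bytes and the Text "fish1" as a VInt length 5 followed by its bytes); `NoneRecord` is a hypothetical name and sync entries are not handled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Sketch of one uncompressed (NONE) record:
// int recordLength (key bytes + value bytes), int keyLength, key bytes, value bytes.
class NoneRecord {
    static byte[] write(byte[] key, byte[] value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(key.length + value.length); // total record length
            out.writeInt(key.length);                // key portion length
            out.write(key);                          // key bytes
            out.write(value);                        // value bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Returns {key, value}. (In a real file a record length of -1 marks a
    // sync entry instead; that case is not handled here.)
    static byte[][] read(byte[] record) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            int recordLength = in.readInt();
            int keyLength = in.readInt();
            byte[] key = new byte[keyLength];
            in.readFully(key);
            byte[] value = new byte[recordLength - keyLength];
            in.readFully(value);
            return new byte[][] {key, value};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] key = {0, 0, 0, 1};                  // serialized IntWritable(1)
        byte[] value = {5, 'f', 'i', 's', 'h', '1'}; // serialized Text "fish1"
        byte[] record = write(key, value);
        System.out.println(record.length);          // 18
        System.out.println(Arrays.toString(read(record)[1]));
    }
}
```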
2. BlockCompressWriter
/** Append a key/value pair. */
@Override
@SuppressWarnings("unchecked")
public synchronized void append(Object key, Object val)
    throws IOException {
  if (key.getClass() != keyClass)
    throw new IOException("wrong key class: "+key+" is not "+keyClass);
  if (val.getClass() != valClass)
    throw new IOException("wrong value class: "+val+" is not "+valClass);

  // Save key/value into respective buffers
  int oldKeyLength = keyBuffer.getLength();
  keySerializer.serialize(key);
  int keyLength = keyBuffer.getLength() - oldKeyLength;
  if (keyLength < 0)
    throw new IOException("negative length keys not allowed: " + key);
  WritableUtils.writeVInt(keyLenBuffer, keyLength); // each call appends this key's length to keyLenBuffer

  int oldValLength = valBuffer.getLength();
  uncompressedValSerializer.serialize(val);
  int valLength = valBuffer.getLength() - oldValLength;
  WritableUtils.writeVInt(valLenBuffer, valLength); // each call appends this value's length to valLenBuffer
  // Added another key/value pair
  ++noBufferedRecords;

  // Compress and flush?
  int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
  if (currentBlockSize >= compressionBlockSize) {
    // compressionBlockSize = conf.getInt("io.seqfile.compress.blocksize", 1000000);
    // once the buffered keys and values reach 1000000 bytes (the default), sync() writes out the block
    sync();
  }
}
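The lengths collected in keyLenBuffer and valLenBuffer are VInts. Below is a stdlib-only re-implementation of that encoding, written to follow Hadoop's WritableUtils.writeVLong/readVLong: values in [-112, 127] take one byte; anything else gets a prefix byte that encodes the sign and the number of following bytes, then the big-endian magnitude. `VIntCodec` is a hypothetical name, not a Hadoop class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Stdlib-only re-implementation of the VInt/VLong format of WritableUtils.
class VIntCodec {
    static void writeVLong(DataOutput out, long i) throws IOException {
        if (i >= -112 && i <= 127) {
            out.writeByte((byte) i); // small values fit in one byte
            return;
        }
        int len = -112;
        if (i < 0) {
            i ^= -1L; // one's complement
            len = -120;
        }
        for (long tmp = i; tmp != 0; tmp >>= 8) {
            len--;
        }
        out.writeByte((byte) len); // prefix: sign and byte count
        len = (len < -120) ? -(len + 120) : -(len + 112);
        for (int idx = len; idx != 0; idx--) {
            out.writeByte((byte) (i >> ((idx - 1) * 8))); // big-endian magnitude
        }
    }

    static long readVLong(DataInput in) throws IOException {
        byte first = in.readByte();
        if (first >= -112) {
            return first; // single-byte value
        }
        boolean negative = first < -120;
        int size = negative ? -(first + 120) : -(first + 112); // bytes that follow
        long i = 0;
        for (int idx = 0; idx < size; idx++) {
            i = (i << 8) | (in.readByte() & 0xFF);
        }
        return negative ? (i ^ -1L) : i;
    }

    static byte[] encode(long v) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            writeVLong(new DataOutputStream(bytes), v);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static long decode(byte[] b) {
        try {
            return readVLong(new DataInputStream(new ByteArrayInputStream(b)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (long v : new long[] {0, 127, 128, -113, 1000000}) {
            System.out.println(v + " -> " + Arrays.toString(encode(v)));
        }
    }
}
```

Short keys and values therefore cost only one byte of length information each in the length buffers.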
Once the buffered data reaches compressionBlockSize, append() calls sync(). Here is the source of sync() (compare it with the figure above); it writes each of the data items drawn in the figure:
/** Compress and flush contents to dfs */
@Override
public synchronized void sync() throws IOException {
  if (noBufferedRecords > 0) {
    super.sync(); // sync entry: SYNC_ESCAPE (-1) followed by the 16 sync bytes

    // No. of records
    WritableUtils.writeVInt(out, noBufferedRecords);

    // Write 'keys' and lengths
    writeBuffer(keyLenBuffer);
    writeBuffer(keyBuffer);

    // Write 'values' and lengths
    writeBuffer(valLenBuffer);
    writeBuffer(valBuffer);

    // Flush the file-stream
    out.flush();

    // Reset internal states
    keyLenBuffer.reset();
    keyBuffer.reset();
    valLenBuffer.reset();
    valBuffer.reset();
    noBufferedRecords = 0;
  }

}
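The block that sync() emits can be modeled with the JDK alone. In this sketch the hypothetical `BlockSketch.writeBlock` fills the four buffers the way append() does, then writes a sync entry (int -1 escape plus the 16 sync bytes), the record count as a VInt, and each buffer compressed and prefixed by its compressed size as a VInt. Two assumptions are labeled here: that writeBuffer() prefixes each compressed buffer with its size as a VInt, and that java.util.zip.Deflater stands in for the configured codec.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.zip.Deflater;

// Sketch of one BLOCK-compressed block (Deflater assumed as the codec).
class BlockSketch {
    // Same encoding as WritableUtils.writeVInt.
    static void writeVInt(DataOutput out, long i) throws IOException {
        if (i >= -112 && i <= 127) { out.writeByte((byte) i); return; }
        int len = -112;
        if (i < 0) { i ^= -1L; len = -120; }
        for (long tmp = i; tmp != 0; tmp >>= 8) { len--; }
        out.writeByte((byte) len);
        len = (len < -120) ? -(len + 120) : -(len + 112);
        for (int idx = len; idx != 0; idx--) { out.writeByte((byte) (i >> ((idx - 1) * 8))); }
    }

    static byte[] compress(byte[] data) {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] writeBlock(byte[] sync, List<byte[]> keys, List<byte[]> values) {
        try {
            // Buffers filled by append(): VInt lengths and raw serialized bytes.
            ByteArrayOutputStream keyLenBuffer = new ByteArrayOutputStream();
            ByteArrayOutputStream keyBuffer = new ByteArrayOutputStream();
            ByteArrayOutputStream valLenBuffer = new ByteArrayOutputStream();
            ByteArrayOutputStream valBuffer = new ByteArrayOutputStream();
            DataOutputStream keyLens = new DataOutputStream(keyLenBuffer);
            DataOutputStream valLens = new DataOutputStream(valLenBuffer);
            for (int r = 0; r < keys.size(); r++) {
                writeVInt(keyLens, keys.get(r).length);
                keyBuffer.write(keys.get(r));
                writeVInt(valLens, values.get(r).length);
                valBuffer.write(values.get(r));
            }
            // What sync() writes for the block.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(-1);            // SYNC_ESCAPE
            out.write(sync);             // 16-byte sync marker
            writeVInt(out, keys.size()); // number of records
            for (ByteArrayOutputStream buffer :
                    Arrays.asList(keyLenBuffer, keyBuffer, valLenBuffer, valBuffer)) {
                byte[] compressed = compress(buffer.toByteArray());
                writeVInt(out, compressed.length); // compressed size
                out.write(compressed);             // compressed bytes
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<byte[]> keys = Arrays.asList(new byte[] {0, 0, 0, 0}, new byte[] {0, 0, 0, 1});
        List<byte[]> values = Arrays.asList("fish1".getBytes(StandardCharsets.US_ASCII),
                "fish2".getBytes(StandardCharsets.US_ASCII));
        System.out.println("block bytes: " + writeBlock(new byte[16], keys, values).length);
    }
}
```

Grouping all keys together and all values together, rather than interleaving them, tends to give the compressor more similar data to work with, which is the point of BLOCK compression.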
3. RecordCompressWriter
/** Append a key/value pair. */
@Override
@SuppressWarnings("unchecked")
public synchronized void append(Object key, Object val)
    throws IOException {
  if (key.getClass() != keyClass)
    throw new IOException("wrong key class: "+key.getClass().getName()
                          +" is not "+keyClass);
  if (val.getClass() != valClass)
    throw new IOException("wrong value class: "+val.getClass().getName()
                          +" is not "+valClass);

  buffer.reset();

  // Append the 'key'
  keySerializer.serialize(key);
  int keyLength = buffer.getLength();
  if (keyLength < 0)
    throw new IOException("negative length keys not allowed: " + key);

  // Compress 'value' and append it
  deflateFilter.resetState();
  compressedValSerializer.serialize(val);
  deflateOut.flush();
  deflateFilter.finish();

  // Write the record out
  checkAndWriteSync();                                // sync
  out.writeInt(buffer.getLength());                   // total record length
  out.writeInt(keyLength);                            // key portion length
  out.write(buffer.getData(), 0, buffer.getLength()); // data
}
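In RECORD mode only the value is compressed; the key stays readable as-is, and the record length covers the key plus the compressed value. The sketch below reproduces that layout with java.util.zip Deflater/Inflater standing in for the configured codec (an assumption); `RecordCompressedRecord` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Sketch of one RECORD-compressed record: int recordLength (key + compressed value),
// int keyLength, key bytes (uncompressed), compressed value bytes.
class RecordCompressedRecord {
    static byte[] deflate(byte[] data) {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] inflate(byte[] data, int off, int len) {
        Inflater inflater = new Inflater();
        inflater.setInput(data, off, len);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && inflater.needsInput()) {
                    throw new IllegalArgumentException("truncated compressed value");
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException(e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    static byte[] write(byte[] key, byte[] value) {
        byte[] compressedValue = deflate(value);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(key.length + compressedValue.length); // total record length
            out.writeInt(key.length);                          // key portion length
            out.write(key);                                    // key, uncompressed
            out.write(compressedValue);                        // value, compressed
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decompresses the value part of a record produced by write().
    static byte[] readValue(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        int recordLength = buf.getInt(0);
        int keyLength = buf.getInt(4);
        return inflate(record, 8 + keyLength, recordLength - keyLength);
    }

    public static void main(String[] args) {
        byte[] key = {0, 0, 0, 1};
        byte[] value = "fish1 fish1 fish1 fish1 fish1".getBytes(StandardCharsets.UTF_8);
        byte[] record = write(key, value);
        System.out.println("record bytes: " + record.length);
        System.out.println(new String(readValue(record), StandardCharsets.UTF_8));
    }
}
```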
Writing the sync marker:
synchronized void checkAndWriteSync() throws IOException {
  if (sync != null &&
      out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
    sync();
  }
}
Definition of SYNC_INTERVAL:
private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

/** The number of bytes between sync points.*/
public static final int SYNC_INTERVAL = 100*SYNC_SIZE;
SYNC_SIZE is 4 + 16 = 20 bytes, so SYNC_INTERVAL = 100 * 20 = 2000: before each record, checkAndWriteSync() writes a sync entry if at least 2000 bytes have been written since the previous one.
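The interval arithmetic and the checkAndWriteSync() rule can be simulated without any I/O. This sketch makes two simplifying assumptions, labeled in the code: the start position counts as the last sync, and every record occupies the same number of bytes (length fields included). `SyncInterval.countSyncs` is a hypothetical name.

```java
// Sketch of SYNC_INTERVAL and the checkAndWriteSync() rule: before a record is
// written, emit a sync entry if at least SYNC_INTERVAL bytes were written since
// the previous one. Simplifications: position 0 counts as the last sync, and all
// records have the same size.
class SyncInterval {
    static final int SYNC_HASH_SIZE = 16;             // number of bytes in hash
    static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;  // escape int + hash = 20
    static final int SYNC_INTERVAL = 100 * SYNC_SIZE; // 2000 bytes

    // Number of sync entries written while appending `records` records of
    // `recordBytes` bytes each.
    static int countSyncs(int records, int recordBytes) {
        long pos = 0;
        long lastSyncPos = 0;
        int syncs = 0;
        for (int r = 0; r < records; r++) {
            if (pos >= lastSyncPos + SYNC_INTERVAL) { // checkAndWriteSync()
                pos += SYNC_SIZE;                     // int -1 escape + 16-byte marker
                lastSyncPos = pos;                    // lastSyncPos = out.getPos()
                syncs++;
            }
            pos += recordBytes;                       // the record itself
        }
        return syncs;
    }

    public static void main(String[] args) {
        System.out.println(SYNC_INTERVAL);        // 2000
        System.out.println(countSyncs(100, 100)); // 4
    }
}
```

With 100-byte records, a sync entry lands before every 20th record, so 100 records produce 4 of them.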
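Summary:
Record: stores the key/value data in SequenceFile's common format; keys and values are both variable-length binary data. The record length field holds the total number of bytes of the key and the value.
Sync: used mainly for scanning and recovering data, so that a Reader that starts at an arbitrary position can find the next record boundary instead of getting lost.
Header: stores the file identifier SEQ with its version byte, the key and value class names, the compression information (the two compression flags and, if compressed, the codec class), the metadata, and the sync marker.
metadata: user-defined Text key/value pairs (SequenceFile.Metadata) stored in the header, where file-level information such as creation time, owner or permissions can be recorded. Together with the file identifier, the sync marker and the description of the data format (including compression), it makes up the information the file header carries.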
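The recovery role of Sync can be illustrated with a simple scan: starting from any offset, look for the sync entry (the int -1 escape followed by the file's 16-byte marker) and resume reading right after it. This is a simplified, hypothetical helper (`SyncScanner.nextRecordStart`), not Hadoop's Reader.sync() implementation.

```java
// Sketch: resynchronizing after an arbitrary offset by scanning for the sync
// entry (four 0xFF bytes, i.e. int -1, followed by the 16-byte sync marker).
class SyncScanner {
    // Returns the offset just after the next sync entry at or after `from`, or -1.
    static int nextRecordStart(byte[] data, int from, byte[] sync) {
        int entry = 4 + sync.length;
        for (int p = Math.max(from, 0); p + entry <= data.length; p++) {
            boolean match = true;
            for (int i = 0; i < 4 && match; i++) {
                match = data[p + i] == (byte) 0xFF; // SYNC_ESCAPE
            }
            for (int i = 0; i < sync.length && match; i++) {
                match = data[p + 4 + i] == sync[i]; // sync marker
            }
            if (match) {
                return p + entry;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] sync = new byte[16];
        for (int i = 0; i < 16; i++) {
            sync[i] = (byte) (i + 1);
        }
        byte[] data = new byte[35];           // 10 bytes of record data, sync entry, 5 more bytes
        for (int i = 0; i < 4; i++) {
            data[10 + i] = (byte) 0xFF;
        }
        System.arraycopy(sync, 0, data, 14, 16);
        System.out.println(nextRecordStart(data, 0, sync));  // 30
        System.out.println(nextRecordStart(data, 11, sync)); // -1
    }
}
```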