Hadoop-Hdfs Storage源码
问题导读
1.Storage物理文件夹和文件与类的对应关系是什么?
2.StorageInfo完成了什么功能?
3.哪个方法实现了把this的属性写到dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION?
static/image/hrline/4.gif
一.物理文件夹和文件与类的对应关系
http://dl.iteye.com/upload/picture/pic/125835/c7946489-aebb-337e-bc5e-721568a9fb8d.jpg
[*] Storage:存储信息文件的集合。由一系列StorageDirectory组成,这些StorageDirectory的VERSION相同。Storage对应着Hdfs的dfs.data.dir或dfs.name.dir目录,默认为%hadoop_home%/dfs/data或%hadoop_home%/dfs/name。DataNode可以配置多个dfs.data.dir,如dfs/data,dfs/data2,此时就有两个Sorage。
[*]StorageDirectory:一个存储信息文件。代表Storage下涉及到升级方面的文件,它们共享一个in_use.lock(图上未标出这层关系)。
[*]StorageInfo:存储信息文件的父类。
二.类图http://dl.iteye.com/upload/picture/pic/125923/97495aeb-4f22-359b-98e7-9a9b50f62b50.jpg
三.StorageInfo
public class StorageInfo {
//Hadoop版本号,如果Hadoop调整文件结构布局,版本号就会修改,这样可以保证文件结构和应用一致
//参考FSConstants.LAYOUT_VERSION=-32的定义:
// Version is reflected in the dfs image and edit log files. Version代表image和editlos存储文件的结构
// Version is reflected in the data storage file. Version代表data存储文件的结构
// Versions are negative. Versions是负数。
// Decrement LAYOUT_VERSION to define a new version.
public int layoutVersion;
//NameNode节点的namespaceID在它format是生成,每格式化一次,就会产生一个新的namespaceID
//DataNode节点的每一个StorageDirectory的namespaceID必须与NameNode节点的namespaceID保持一致
public int namespaceID;
//FsImage format时赋值
public longcTime; // creation timestamp
}
//参考FsImage.format()
//public void format() throws IOException {
// this.layoutVersion = FSConstants.LAYOUT_VERSION;
// this.namespaceID = newNamespaceID();
// this.cTime = 0L;
// this.checkpointTime = FSNamesystem.now();
// ...
// }
//}
四.StorageDirectory
//一个Storage包含一系列StorageDirectory,它们共享一个in_use.lock
public class StorageDirectory {
File root; //对应dfs.data.dir或dfs.name.dir目录
FileLock lock; //对应dfs.data.dir或dfs.name.dir目录下in_use.lock
StorageDirType dirType; // storage dir type
//读取dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION
//read里调用getFields,读取dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION到this
public void read() throws IOException {
read(getVersionFile());
}
//write里调用setFields,把this的属性写到dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION
public void write() throws IOException {
corruptPreUpgradeStorage(root);
write(getVersionFile());
}
//读取Storage下任何一个StorageDirectory
public File get*Dir() {
return new File(root,...);
}
//During startup Hadoop servers (name-node and data-nodes) read their local
* storage information from them.
//根据启动参数判断系统处于哪个状态
public StorageState analyzeStorage(StartupOption startOpt) throws IOException {
//见图状态分析
}
//Complete or recover storage state from previously failed transition.
public void doRecover(StorageState curState) throws IOException {
//见图恢复操作
}
//lock this storage
public void lock() throws IOException {
this.lock = tryLock();
}
//unlock this storage
public void unlock() throws IOException {
this.lock.release();
lock.channel().close();
lock = null;
}
}
五.Storage
public abstract class Storage extends StorageInfo {
private static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION= "VERSION";
public static final String STORAGE_DIR_CURRENT = "current";
private static final String STORAGE_DIR_PREVIOUS= "previous";
private static final String STORAGE_TMP_REMOVED = "removed.tmp";
private static final String STORAGE_TMP_PREVIOUS= "previous.tmp";
private static final String STORAGE_TMP_FINALIZED = "finalized.tmp";
private static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
private static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
public enum StorageState {
NON_EXISTENT,
NOT_FORMATTED,
COMPLETE_UPGRADE,
RECOVER_UPGRADE,
COMPLETE_FINALIZE,
COMPLETE_ROLLBACK,
RECOVER_ROLLBACK,
COMPLETE_CHECKPOINT,
RECOVER_CHECKPOINT,
NORMAL;
}
//StorageDirectory下的文件类型
public interface StorageDirType {
public StorageDirType getStorageDirType();
public boolean isOfType(StorageDirType type);
}
//节点类型
private NodeType storageType; // Type of the node using this storage
protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
//迭代Storage包含的所有StorageDirectory
public Iterator<StorageDirectory> dirIterator(StorageDirType dirType) {
return new DirIterator(dirType);
}
protected Storage(NodeType type) {
super();
this.storageType = type;
}
//StorageDirectory.read()调用此方法,读取dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION到this
//因为所有的StorageDirectory具有相同的VERSION,所以可以把任意一个StorageDirectory的VERSION写到this的这四个属性
//write同read
protected void getFields(Properties props,StorageDirectory sd ) throws IOException {
String sv, st, sid, sct;
sv = props.getProperty("layoutVersion");
st = props.getProperty("storageType");
sid = props.getProperty("namespaceID");
sct = props.getProperty("cTime");
layoutVersion = rv;
storageType = rt;
namespaceID = rid;
cTime = rct;
}
//StorageDirectory.write()调用此方法,把this的属性写到dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION
protected void setFields(Properties props, StorageDirectory sd ) throws IOException {
props.setProperty("layoutVersion", String.valueOf(layoutVersion));
props.setProperty("storageType", storageType.toString());
props.setProperty("namespaceID", String.valueOf(namespaceID));
props.setProperty("cTime", String.valueOf(cTime));
}
}
六.VERSION例子
#Sun May 12 10:25:01 CST 2013
namespaceID=1378739863
storageID=DS-1718846927-192.168.1.164-50010-1368305080745
cTime=0
storageType=DATA_NODE
layoutVersion=-32
#Mon May 13 03:26:48 CST 2013
namespaceID=1378739863
cTime=0
storageType=NAME_NODE
layoutVersion=-32
七.最核心的方法StorageDirectory.analyzeStorage和doRecover
[*]这两个方法就是对整个Storage状态分析和恢复操作。
[*]下文会结合DataNode启动流程把整个串起来。本文先了解这两个方法,当然你必须先了解Hadoop的系统状态。
[*]说明:第二张图少了一步。lastcheckpoint.tmp存在之前,首先分析当前StorageDirectory的root是否存在或是否可写或是否文件夹,不满足以上任何一点返回状态StorageState.NON_EXISTENT;满足后才是判断lastcheckpoint.tmp存在。
楼主你好,我想问下,DataNode在把数据块写到dfs.data.dir时,是调用了上面的类的方法吗? gwgyk 发表于 2014-11-21 12:53
楼主你好,我想问下,DataNode在把数据块写到dfs.data.dir时,是调用了上面的类的方法吗?
hadoop只有namenode和datanode,namenode是不用户存储数据的,只有datanode才能存储数据,应该没有错的
页:
[1]