howtodown 发表于 2014-11-21 12:37:11

Hadoop-Hdfs Storage源码


问题导读


1.Storage物理文件夹和文件与类的对应关系是什么?
2.StorageInfo完成了什么功能?
3.哪个方法实现了把this的属性写到dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION?

static/image/hrline/4.gif







一.物理文件夹和文件与类的对应关系

http://dl.iteye.com/upload/picture/pic/125835/c7946489-aebb-337e-bc5e-721568a9fb8d.jpg


[*] Storage:存储信息文件的集合。由一系列StorageDirectory组成,这些StorageDirectory的VERSION相同。Storage对应着Hdfs的dfs.data.dir或dfs.name.dir目录,默认为%hadoop_home%/dfs/data或%hadoop_home%/dfs/name。DataNode可以配置多个dfs.data.dir,如dfs/data,dfs/data2,此时就有两个Sorage。
[*]StorageDirectory:一个存储信息文件。代表Storage下涉及到升级方面的文件,它们共享一个in_use.lock(图上未标出这层关系)。
[*]StorageInfo:存储信息文件的父类。

二.类图http://dl.iteye.com/upload/picture/pic/125923/97495aeb-4f22-359b-98e7-9a9b50f62b50.jpg

三.StorageInfo
public class StorageInfo {
    //Hadoop版本号,如果Hadoop调整文件结构布局,版本号就会修改,这样可以保证文件结构和应用一致
    //参考FSConstants.LAYOUT_VERSION=-32的定义:
    // Version is reflected in the dfs image and edit log files.      Version代表image和editlos存储文件的结构
    // Version is reflected in the data storage file.       Version代表data存储文件的结构
    // Versions are negative.       Versions是负数。
    // Decrement LAYOUT_VERSION to define a new version.
    public int   layoutVersion;   
    //NameNode节点的namespaceID在它format是生成,每格式化一次,就会产生一个新的namespaceID
    //DataNode节点的每一个StorageDirectory的namespaceID必须与NameNode节点的namespaceID保持一致
    public int   namespaceID;   
    //FsImage format时赋值
    public longcTime;          // creation timestamp
}

//参考FsImage.format()
//public void format() throws IOException {
//    this.layoutVersion = FSConstants.LAYOUT_VERSION;
//    this.namespaceID = newNamespaceID();
//    this.cTime = 0L;
//    this.checkpointTime = FSNamesystem.now();
//   ...
//    }
//}




四.StorageDirectory
//一个Storage包含一系列StorageDirectory,它们共享一个in_use.lock
public class StorageDirectory {
    File            root;         //对应dfs.data.dir或dfs.name.dir目录
    FileLock          lock; //对应dfs.data.dir或dfs.name.dir目录下in_use.lock
    StorageDirType dirType; // storage dir type
      
      
    //读取dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION
    //read里调用getFields,读取dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION到this
    public void read() throws IOException {
      read(getVersionFile());
    }
      
    //write里调用setFields,把this的属性写到dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION
    public void write() throws IOException {
      corruptPreUpgradeStorage(root);
      write(getVersionFile());
    }

    //读取Storage下任何一个StorageDirectory
    public File get*Dir() {
      return new File(root,...);
    }


    //During startup Hadoop servers (name-node and data-nodes) read their local   
* storage information from them.
    //根据启动参数判断系统处于哪个状态
    public StorageState analyzeStorage(StartupOption startOpt) throws IOException {
      //见图状态分析
    }

    //Complete or recover storage state from previously failed transition.
    public void doRecover(StorageState curState) throws IOException {
      //见图恢复操作
    }

    //lock this storage   
    public void lock() throws IOException {
      this.lock = tryLock();
    }

    //unlock this storage
    public void unlock() throws IOException {
      this.lock.release();
      lock.channel().close();
      lock = null;
    }
}


五.Storage
public abstract class Storage extends StorageInfo {
   
private   static final String STORAGE_FILE_LOCK   = "in_use.lock";
protected static final String STORAGE_FILE_VERSION= "VERSION";
public static final String STORAGE_DIR_CURRENT   = "current";
private   static final String STORAGE_DIR_PREVIOUS= "previous";
private   static final String STORAGE_TMP_REMOVED   = "removed.tmp";
private   static final String STORAGE_TMP_PREVIOUS= "previous.tmp";
private   static final String STORAGE_TMP_FINALIZED = "finalized.tmp";
private   static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
private   static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
   
public enum StorageState {
    NON_EXISTENT,
    NOT_FORMATTED,
    COMPLETE_UPGRADE,
    RECOVER_UPGRADE,
    COMPLETE_FINALIZE,
    COMPLETE_ROLLBACK,
    RECOVER_ROLLBACK,
    COMPLETE_CHECKPOINT,
    RECOVER_CHECKPOINT,
    NORMAL;
}
   
//StorageDirectory下的文件类型
public interface StorageDirType {
    public StorageDirType getStorageDirType();
    public boolean isOfType(StorageDirType type);
}
   
//节点类型
private NodeType storageType;    // Type of the node using this storage   
protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
   
//迭代Storage包含的所有StorageDirectory
public Iterator<StorageDirectory> dirIterator(StorageDirType dirType) {
    return new DirIterator(dirType);
}
   
protected Storage(NodeType type) {
    super();
    this.storageType = type;
}
   
//StorageDirectory.read()调用此方法,读取dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION到this
//因为所有的StorageDirectory具有相同的VERSION,所以可以把任意一个StorageDirectory的VERSION写到this的这四个属性
//write同read
protected void getFields(Properties props,StorageDirectory sd ) throws IOException {
    String sv, st, sid, sct;
    sv = props.getProperty("layoutVersion");
    st = props.getProperty("storageType");
    sid = props.getProperty("namespaceID");
    sct = props.getProperty("cTime");
    layoutVersion = rv;
    storageType = rt;
    namespaceID = rid;
    cTime = rct;
}
   
//StorageDirectory.write()调用此方法,把this的属性写到dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION
protected void setFields(Properties props, StorageDirectory sd ) throws IOException {
    props.setProperty("layoutVersion", String.valueOf(layoutVersion));
    props.setProperty("storageType", storageType.toString());
    props.setProperty("namespaceID", String.valueOf(namespaceID));
    props.setProperty("cTime", String.valueOf(cTime));
}

}




六.VERSION例子
#Sun May 12 10:25:01 CST 2013
namespaceID=1378739863
storageID=DS-1718846927-192.168.1.164-50010-1368305080745
cTime=0
storageType=DATA_NODE
layoutVersion=-32

#Mon May 13 03:26:48 CST 2013
namespaceID=1378739863
cTime=0
storageType=NAME_NODE
layoutVersion=-32



七.最核心的方法StorageDirectory.analyzeStorage和doRecover
[*]这两个方法就是对整个Storage状态分析和恢复操作。
[*]下文会结合DataNode启动流程把整个串起来。本文先了解这两个方法,当然你必须先了解Hadoop的系统状态。





[*]说明:第二张图少了一步。lastcheckpoint.tmp存在之前,首先分析当前StorageDirectory的root是否存在或是否可写或是否文件夹,不满足以上任何一点返回状态StorageState.NON_EXISTENT;满足后才是判断lastcheckpoint.tmp存在。



gwgyk 发表于 2014-11-21 12:53:05

楼主你好,我想问下,DataNode在把数据块写到dfs.data.dir时,是调用了上面的类的方法吗?

sstutu 发表于 2014-11-21 13:34:55

gwgyk 发表于 2014-11-21 12:53
楼主你好,我想问下,DataNode在把数据块写到dfs.data.dir时,是调用了上面的类的方法吗?

hadoop只有namenode和datanode,namenode是不用户存储数据的,只有datanode才能存储数据,应该没有错的
页: [1]
查看完整版本: Hadoop-Hdfs Storage源码