分享

Hadoop-Hdfs Storage源码


问题导读


1.Storage物理文件夹和文件与类的对应关系是什么?
2.StorageInfo完成了什么功能?
3.哪个方法实现了把this的属性写到dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION?









一.物理文件夹和文件与类的对应关系




  • Storage:存储信息文件的集合。由一系列StorageDirectory组成,这些StorageDirectory的VERSION相同。Storage对应着Hdfs的dfs.data.dir或dfs.name.dir目录,默认为%hadoop_home%/dfs/data或%hadoop_home%/dfs/name。DataNode可以配置多个dfs.data.dir,如dfs/data,dfs/data2,此时就有两个Sorage。
  • StorageDirectory:一个存储信息文件。代表Storage下涉及到升级方面的文件,它们共享一个in_use.lock(图上未标出这层关系)。
  • StorageInfo:存储信息文件的父类。

二.类图


三.StorageInfo

  1. public class StorageInfo {  
  2.     //Hadoop版本号,如果Hadoop调整文件结构布局,版本号就会修改,这样可以保证文件结构和应用一致  
  3.     //参考FSConstants.LAYOUT_VERSION=-32的定义:  
  4.     // Version is reflected in the dfs image and edit log files.        Version代表image和editlos存储文件的结构  
  5.     // Version is reflected in the data storage file.       Version代表data存储文件的结构  
  6.     // Versions are negative.       Versions是负数。  
  7.     // Decrement LAYOUT_VERSION to define a new version.  
  8.     public int   layoutVersion;   
  9.     //NameNode节点的namespaceID在它format是生成,每格式化一次,就会产生一个新的namespaceID  
  10.     //DataNode节点的每一个StorageDirectory的namespaceID必须与NameNode节点的namespaceID保持一致  
  11.     public int   namespaceID;   
  12.     //FsImage format时赋值  
  13.     public long  cTime;          // creation timestamp  
  14. }  
  15.   
  16. //参考FsImage.format()  
  17. //public void format() throws IOException {  
  18. //    this.layoutVersion = FSConstants.LAYOUT_VERSION;  
  19. //    this.namespaceID = newNamespaceID();  
  20. //    this.cTime = 0L;  
  21. //    this.checkpointTime = FSNamesystem.now();  
  22. //     ...  
  23. //    }  
  24. //  }  
复制代码





四.StorageDirectory

  1. //一个Storage包含一系列StorageDirectory,它们共享一个in_use.lock  
  2.   public class StorageDirectory {  
  3.     File              root;         //对应dfs.data.dir或dfs.name.dir目录  
  4.     FileLock          lock; //对应dfs.data.dir或dfs.name.dir目录下in_use.lock  
  5.     StorageDirType dirType; // storage dir type  
  6.       
  7.       
  8.     //读取dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION  
  9.     //read里调用getFields,读取dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION到this  
  10.     public void read() throws IOException {  
  11.       read(getVersionFile());  
  12.     }  
  13.       
  14.     //write里调用setFields,把this的属性写到dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION  
  15.     public void write() throws IOException {  
  16.       corruptPreUpgradeStorage(root);  
  17.       write(getVersionFile());  
  18.     }  
  19.   
  20.     //读取Storage下任何一个StorageDirectory  
  21.     public File get*Dir() {  
  22.       return new File(root,  ...);  
  23.     }  
  24.   
  25.   
  26.     //During startup Hadoop servers (name-node and data-nodes) read their local   
  27. * storage information from them.  
  28.     //根据启动参数判断系统处于哪个状态  
  29.     public StorageState analyzeStorage(StartupOption startOpt) throws IOException {  
  30.         //见图状态分析  
  31.     }  
  32.   
  33.     //Complete or recover storage state from previously failed transition.  
  34.     public void doRecover(StorageState curState) throws IOException {  
  35.         //见图恢复操作  
  36.     }  
  37.   
  38.     //lock this storage   
  39.     public void lock() throws IOException {  
  40.       this.lock = tryLock();  
  41.     }  
  42.   
  43.     //unlock this storage  
  44.     public void unlock() throws IOException {  
  45.       this.lock.release();  
  46.       lock.channel().close();  
  47.       lock = null;  
  48.     }  
  49.   }  
复制代码

  

五.Storage

  1. public abstract class Storage extends StorageInfo {  
  2.    
  3.   private   static final String STORAGE_FILE_LOCK     = "in_use.lock";  
  4.   protected static final String STORAGE_FILE_VERSION  = "VERSION";  
  5.   public static final String STORAGE_DIR_CURRENT   = "current";  
  6.   private   static final String STORAGE_DIR_PREVIOUS  = "previous";  
  7.   private   static final String STORAGE_TMP_REMOVED   = "removed.tmp";  
  8.   private   static final String STORAGE_TMP_PREVIOUS  = "previous.tmp";  
  9.   private   static final String STORAGE_TMP_FINALIZED = "finalized.tmp";  
  10.   private   static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";  
  11.   private   static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";  
  12.    
  13.   public enum StorageState {  
  14.     NON_EXISTENT,  
  15.     NOT_FORMATTED,  
  16.     COMPLETE_UPGRADE,  
  17.     RECOVER_UPGRADE,  
  18.     COMPLETE_FINALIZE,  
  19.     COMPLETE_ROLLBACK,  
  20.     RECOVER_ROLLBACK,  
  21.     COMPLETE_CHECKPOINT,  
  22.     RECOVER_CHECKPOINT,  
  23.     NORMAL;  
  24.   }  
  25.    
  26.   //StorageDirectory下的文件类型  
  27.   public interface StorageDirType {  
  28.     public StorageDirType getStorageDirType();  
  29.     public boolean isOfType(StorageDirType type);  
  30.   }  
  31.    
  32.   //节点类型  
  33.   private NodeType storageType;    // Type of the node using this storage   
  34.   protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();  
  35.    
  36.   //迭代Storage包含的所有StorageDirectory  
  37.   public Iterator<StorageDirectory> dirIterator(StorageDirType dirType) {  
  38.     return new DirIterator(dirType);  
  39.   }  
  40.    
  41.   protected Storage(NodeType type) {  
  42.     super();  
  43.     this.storageType = type;  
  44.   }  
  45.    
  46.   //StorageDirectory.read()调用此方法,读取dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION到this  
  47.   //因为所有的StorageDirectory具有相同的VERSION,所以可以把任意一个StorageDirectory的VERSION写到this的这四个属性  
  48.   //write同read  
  49.   protected void getFields(Properties props,  StorageDirectory sd ) throws IOException {  
  50.     String sv, st, sid, sct;  
  51.     sv = props.getProperty("layoutVersion");  
  52.     st = props.getProperty("storageType");  
  53.     sid = props.getProperty("namespaceID");  
  54.     sct = props.getProperty("cTime");  
  55.     layoutVersion = rv;  
  56.     storageType = rt;  
  57.     namespaceID = rid;  
  58.     cTime = rct;  
  59.   }  
  60.    
  61.   //StorageDirectory.write()调用此方法,把this的属性写到dfs.data.dir/current/VERSION或dfs.name.dir/current/VERSION  
  62.   protected void setFields(Properties props, StorageDirectory sd ) throws IOException {  
  63.     props.setProperty("layoutVersion", String.valueOf(layoutVersion));  
  64.     props.setProperty("storageType", storageType.toString());  
  65.     props.setProperty("namespaceID", String.valueOf(namespaceID));  
  66.     props.setProperty("cTime", String.valueOf(cTime));  
  67.   }  
  68.   
  69. }  
复制代码





六.VERSION例子

  1. #Sun May 12 10:25:01 CST 2013  
  2. namespaceID=1378739863  
  3. storageID=DS-1718846927-192.168.1.164-50010-1368305080745  
  4. cTime=0  
  5. storageType=DATA_NODE  
  6. layoutVersion=-32  
  7.   
  8. #Mon May 13 03:26:48 CST 2013  
  9. namespaceID=1378739863  
  10. cTime=0  
  11. storageType=NAME_NODE  
  12. layoutVersion=-32  
复制代码




七.最核心的方法StorageDirectory.analyzeStorage和doRecover
  • 这两个方法就是对整个Storage状态分析和恢复操作。
  • 下文会结合DataNode启动流程把整个串起来。本文先了解这两个方法,当然你必须先了解Hadoop的系统状态。 3.jpg


4.gif

  • 说明:第二张图少了一步。lastcheckpoint.tmp存在之前,首先分析当前StorageDirectory的root是否存在或是否可写或是否文件夹,不满足以上任何一点返回状态StorageState.NON_EXISTENT;满足后才是判断lastcheckpoint.tmp存在。



已有(2)人评论

跳转到指定楼层
gwgyk 发表于 2014-11-21 12:53:05
楼主你好,我想问下,DataNode在把数据块写到dfs.data.dir时,是调用了上面的类的方法吗?
回复

使用道具 举报

sstutu 发表于 2014-11-21 13:34:55
gwgyk 发表于 2014-11-21 12:53
楼主你好,我想问下,DataNode在把数据块写到dfs.data.dir时,是调用了上面的类的方法吗?
hadoop只有namenode和datanode,namenode是不用户存储数据的,只有datanode才能存储数据,应该没有错的
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条