分享

Hadoop二次开发必备,Hadoop源码分析(十)

本帖最后由 pig2 于 2014-1-16 00:17 编辑

下面轮到FSNamesystem出场了。FSNamesystem.java一共有4573行,而整个namenode目录下所有的Java程序总共也只有16876行,把FSNamesystem搞定了,NameNode也就基本搞定。
FSNamesystem是NameNode实际记录信息的地方,保存在FSNamesystem中的数据有:
l          文件名à数据块列表(存放在FSImage和日志中)
l          合法的数据块列表(上面关系的逆关系)
l          数据块àDataNode(只保存在内存中,根据DataNode发过来的信息动态建立)
l          DataNode上保存的数据块(上面关系的逆关系)
l          最近发送过心跳信息的DataNode(LRU)
我们先来分析FSNamesystem的成员变量。
  private boolean isPermissionEnabled;
是否打开权限检查,可以通过配置项dfs.permissions来设置。

  private UserGroupInformationfsOwner;
本地文件的用户文件属主和文件组,可以通过hadoop.job.ugi设置,如果没有设置,那么将使用启动HDFS的用户(通过whoami获得)和该用户所在的组(通过groups获得)作为值。

  private String supergroup;
对应配置项dfs.permissions.supergroup,应用在defaultPermission中,是系统的超级组。

  private PermissionStatusdefaultPermission;
缺省权限,缺省用户为fsOwner,缺省用户组为supergroup,缺省权限为0777,可以通过dfs.upgrade.permission修改。

  private long capacityTotal, capacityUsed,capacityRemaining;
系统总容量/已使用容量/剩余容量

  private int totalLoad = 0;
系统总连接数,根据DataNode心跳信息跟新。

  private long pendingReplicationBlocksCount, underReplicatedBlocksCount,scheduledReplicationBlocksCount;
分别是成员变量pendingReplications(正在复制的数据块),neededReplications(需要复制的数据块)的大小,scheduledReplicationBlocksCount是当前正在处理的复制工作数目。

  public FSDirectory dir;
指向系统使用的FSDirectory对象。

  BlocksMap blocksMap= new BlocksMap();
保存数据块到INodeDataNode的映射关系
publicCorruptReplicasMap corruptReplicas = newCorruptReplicasMap();
保存损坏(如:校验没通过)的数据块到对应DataNode的关系,CorruptReplicasMap类图如下,类只有一个成员变量,保存Block到一个DatanodeDescriptor的集合的映射和这个映射上的一系列操作:

16.PNG

Map<String, DatanodeDescriptor> datanodeMap= newTreeMap<String, DatanodeDescriptor>();
保存了StorageID à DatanodeDescriptor的映射,用于保证DataNode使用的Storage的一致性。   privateMap<String, Collection<Block>> recentInvalidateSets
保存了每个DataNode上无效但还存在的数据块(StorageIDà ArrayList<Block>)。  Map<String,Collection<Block>> recentInvalidateSets
保存了每个DataNode上有效,但需要删除的数据块(StorageIDà TreeSet<Block>),这种情况可能发生在一个DataNode故障后恢复后,上面的数据块在系统中副本数太多,需要删除一些数据块。  HttpServer infoServer;  int infoPort; Date startTime;
用于内部信息传输的HTTP请求服务器(Servlet的容器)。现在有/fsck,/getimage,/listPaths/*,/data/*和/fileChecksum/*,我们后面还会继续讨论。   ArrayList<DatanodeDescriptor>heartbeats;
所有目前活着的DataNode,线程HeartbeatMonitor会定期检查。private UnderReplicatedBlocks neededReplications
需要进行复制的数据块。UnderReplicatedBlocks的类图如下,它其实是一个数组,数组的下标是优先级(0的优先级最高,如果数据块只有一个副本,它的优先级是0),数组的内容是一个Block集合。UnderReplicatedBlocks提供一些方法,对Block进行增加,修改,查找和删除。 17.PNG

privatePendingReplicationBlocks pendingReplications;
保存正在复制的数据块的相关信息。PendingReplicationBlocks的类图如下:
18.PNG

其中,pendingReplications保存了所有正在进行复制的数据块,使用Map是需要一些附加的信息PendingBlockInfo。这些信息包括时间戳,用于检测是否已经超时,和现在进行复制的数目numReplicasInProgress。timedOutItems是超时的复制项,超时的复制项在FSNamesystem的processPendingReplications方法中被删除,并从新复制。timerThread是用于检测复制超时的线程的句柄,对应的线程是PendingReplicationMonitor的一个实例,它的run方法每隔一段会检查是否有超时的复制项,如果有,将该数据块加到timedOutItems中。Timeout是run方法的检查间隔,defaultRecheckInterval是缺省值。PendingReplicationBlocks和PendingBlockInfo的方法都很简单。   public LeaseManager leaseManager = new LeaseManager(this);
租约管理器。
-------------------------------------------------------------------------------------------------------
继续对FSNamesystem进行分析。

Daemon hbthread = null;   // HeartbeatMonitor thread
  public Daemon lmthread = null;   // LeaseMonitor thread
Daemon smmthread = null;  // SafeModeMonitor thread
public Daemonreplthread = null;  // Replication thread
NameNode上的线程,分别对应DataNode心跳检查,租约检查,安全模式检查和数据块复制,我们会在后面介绍这些线程对应的功能。

  volatile boolean fsRunning = true;
  long systemStart = 0;
系统运行标志和系统启动时间。

接下来是一堆系统的参数,比方说系统每个DataNode节点允许的最大数据块数,心跳检查间隔时间等… …
//  The maximum number ofreplicates we should allow for a single block
  private int maxReplication;
//  How many outgoing replicationstreams a given node should have at one time
  private int maxReplicationStreams;
  //MIN_REPLICATION is how many copies we need in place or else we disallow thewrite
  private int minReplication;
  //Default replication
  private int defaultReplication;
  //heartbeatRecheckInterval is how often namenode checks for expired datanodes
  private long heartbeatRecheckInterval;
  //heartbeatExpireInterval is how long namenode waits for datanode to report
  //heartbeat
  private long heartbeatExpireInterval;
//replicationRecheckInterval is how often namenode checks for newreplication work
  private long replicationRecheckInterval;
//decommissionRecheckInterval is how often namenode checks if a node hasfinished decommission
  private long decommissionRecheckInterval;
  //default block size of a file
  private long defaultBlockSize = 0;

  private intreplIndex = 0;
neededReplications配合,记录下一个进行复制的数据块位置。
public staticFSNamesystem fsNamesystemObject;
哈哈,不用介绍了,还是static的。
  privateString localMachine;
  private int port;
本机名字和RPC端口。
privateSafeModeInfo safeMode;  // safe modeinformation
记录安全模式的相关信息。
安全模式是这样一种状态,系统处于这个状态时,不接受任何对名字空间的修改,同时也不会对数据块进行复制或删除数据块。NameNode启动的时候会自动进入安全模式,同时也可以手工进入(不会自动离开)。系统启动以后,DataNode会报告目前它拥有的数据块的信息,当系统接收到的Block信息到达一定门槛,同时每个Block都有dfs.replication.min个副本后,系统等待一段时间后就离开安全模式。这个门槛定义的参数包括:
l          dfs.safemode.threshold.pct:接受到的Block的比例,缺省为95%,就是说,必须DataNode报告的数据块数目占总数的95%,才到达门槛;
l          dfs.replication.min:缺省为1,即每个副本都存在系统中;
l          dfs.replication.min:等待时间,缺省为0,单位秒。
SafeModeInfo的类图如下:
19.PNG



thresholdextensionsafeReplication保存的是上面说的3个参数。Reached等于-1表明安全模式是关闭的,0表示安全模式打开但是系统还没达到thresholdblockTotal是计算threshold时的分母,blockSafe是分子,lastStatusReport用于控制写日志的间隔。
SafeModeInfo(Configuration conf)使用配置文件的参数,是NameNode正常启动时使用的构造函数,SafeModeInfo()中,this.threshold = 1.5f使得系统用于处于安全模式。
enter()使系统进入安全模式,leave()会使系统离开安全模式,canLeave()用于检查是否能离开安全模式而needEnter(),则判断是否应该进入安全模式。checkMode()检查系统状态,如果必要,则进入安全模式。其他的方法都比价简单,大多为对成员变量的访问。

讨论完类SafeModeInfo,我们来分析一下SafeModeMonitor,它用于定期检查系统是否能够离开安全模式(smmthread就是它的一个实例)。系统离开安全模式后,smmthread会被重新赋值为null


--------------------------------------------------------------------------------------------------------
privateHost2NodesMap host2DataNodeMap = newHost2NodesMap();
保存了主机名(String)到DatanodeDescriptor数组的映射(Host2NodesMap唯一的成员变量为HashMap<String,DatanodeDescriptor[]> map,它的方法都是对这个map进行操作)。 NetworkTopology clusterMap = newNetworkTopology();
  private DNSToSwitchMappingdnsToSwitchMapping;
定义了HDFS的网络拓扑,网络拓扑对应选择数据块副本的位置很重要。如在一个层次型的网络中,接到同一个交换机的两个节点间的网络速度,会比跨越多个交换机的两个节点间的速度快,但是,如果某交换机故障,那么它对接到它上面的两个节点会同时有影响,但跨越多个交换机的两个节点,这种影响会小得多。下面是NetworkTopology相关的类图: 20.PNG
    Hadoop实现了一个树状的拓扑结构抽象,其中,Node接口,定义了网络节点的一些方法,NodeBase是Node的一个实现,提供了叶子节点的一些方法(明显它没有子节点),而InnerNode则实现了树的内部节点,如果我们考虑一个网络部署的话,那么叶子节点是服务器,而InnerNode则是服务器所在的机架或交换机或路由器。Node提供了对网络位置信息(采用类似文件树的方式),节点名称和Node所在的树的深度的方法。NodeBase提供了一个简单的实现。InnerNode是NetworkTopology的内部类,对比NodeBase,它的clildren保存了所有的子节点,这样的话,就可以构造一个拓扑树。这棵树的叶子可能是服务器,也可能是机架,内部则是机架或者是路由器等设备,InnerNode提供了一系列的方法区分处理这些信息。

    NetworkTopology的add方法和remove用于在拓扑结构中加入节点和删除节点,同时也给出一些get*方法,用于获取一些对象内部的信息,如getDistance,可以获取两个节点的距离,而isOnSameRack可以判断两个节点是否处于同一个机架。chooseRandom有两个实现,用于在一定范围内(另一个还有一个排除选项)随机选取一个节点。chooseRandom在选择数据块副本位置的时候调用。DNSToSwitchMapping配合上面NetworkTopology,用于确定某一个节点的网络位置信息,它的唯一方法,可以通过一系列机器的名字找出它们对应的网络位置信息。目前有支持两种方法,一是通过命令行方式,将节点名作为输入,输出为网络位置信息(RawScriptBasedMapping执行命令CachedDNSToSwitchMapping缓存结果),还有一种就是利用配置参数hadoop.configured.node.mapping静态配置(StaticMapping)。
    ReplicationTargetChooser replicator;用于为数据块备份选择目标,例如,用户写文件时,需要选择一些DataNode,作为数据块的存放位置,这时候就利用它来选择目标地址。chooseTarget是ReplicationTargetChooser中最重要的方法,它通过内部的一个NetworkTopology对象,计算出一个DatanodeDescriptor数组,该数组就是选定的DataNode,同时,顺序就是最佳的数据流顺序(还记得我们讨论DataXceiver些数据的那个图吗?)。
  private HostsFileReaderhostsReader;

保存了系统中允许/不允许连接到NameNode的机器列表。

  private Daemon dnthread = null;

线程句柄,该线程用于检测DataNode上的Decommission进程。例如,某节点被列入到不允许连接到NameNode的机器列表中(HostsFileReader),那么,该节点会进入Decommission状态,它上面的数据块会被复制到其它节点,复制结束后机器进入DatanodeInfo.AdminStates.DECOMMISSIONED,这台机器就可以从HDFS中撤掉。

private long maxFsObjects = 0;          // maximum number of fs objects

系统能拥有的INode最大数(配置项dfs.max.objects,0为无限制)。

private final GenerationStampgenerationStamp = newGenerationStamp();
系统的时间戳生产器。
private int blockInvalidateLimit =FSConstants.BLOCK_INVALIDATE_CHUNK;

发送给DataNode删除数据块消息中,能包含的最大数据块数。比方说,如果某DataNode上有250个Block需要被删除,而这个参数是100,那么一共会有3条删除数据块消息消息,前面两条包含了100个数据块,最后一条是50个。

private long accessTimePrecision = 0;用于控制文件的access时间的精度,也就是说,小于这个精度的两次对文件访问,后面的那次就不做记录了。
-------------------------------------------------------------------------------------------

我们接下来分析NameNode.java的成员变量,然后两个类综合起来,分析它提供的接口,并配合说明接口上请求对应的处理流程。
前面已经介绍过了,NameNode实现了接口ClientProtocol,DatanodeProtocol和NamenodeProtocol,分别提供给客户端/DataNode/从NameNode访问。由于NameNode的大部分功能在类FSNamesystem中实现,那么NameNode.java的成员变量就很少了。
  public FSNamesystem namesystem;
指向FSNamesystem对象。

  private Server server;
NameNodeRPC服务器实例。

  private Thread emptier;
处理回收站的线程句柄。

  private int handlerCount = 2;
还记得我们分析RPC的服务器时提到的服务器请求处理线程(Server.Handle)吗?这个参数给出了server中服务器请求处理线程的数目,对应配置参数为dfs.namenode.handler.count

  private boolean supportAppends = true;
是否支持append操作,对应配置参数为dfs.support.append
  privateInetSocketAddress nameNodeAddress = null;
NameNode的地址,包括IP地址和监听端口。
下面我们来看NameNode的启动过程。main方法是系统的入口,它会调用createNameNode创建NameNode实例。createNameNode分析命令行参数,如果是FORMAT或FINALIZE,调用对应的方法后退出,如果是其他的参数,将创建NameNode对象。NameNode的构造函数会调用initialize,初始化NameNode的成员变量,包括创建RPC服务器,初始化FSNamesystem,启动RPC服务器和回收站线程。
FSNamesystem的构造函数会调用initialize方法,去初始化上面我们分析过的一堆成员变量。几个重要的步骤包括加载FSImage,设置系统为安全模式,启动各个工作线程和HTTP服务器。系统的一些参数是在setConfigurationParameters中初始化的,其中一些值的计算比较麻烦,而且也可能被其它部分的code引用的,就独立出来了,如getNamespaceDirs和getNamespaceEditsDirs。initialize对应的是close方法,很简单,主要是停止initialize中启动的线程。
对应于initialize方法,NameNode也提供了对应的stop方法,用于初始化时出错系统能正确地退出。
NameNode的format和finalize操作,都是先构造FSNamesystem,然后利用FSNamesystem的FSImage提供的对应方法完成的。我们在分析FSImage.java时,已经了解了这部分的功能。
下一篇

上一篇

欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条