gefieder 发表于 2013-12-2 15:40:10

Hadoop源代码分析

Hadoop源代码分析于泓烈200921060171一、 引言一个分布式系统基础架构,有Apache基金会开发。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力高速运算和存储。简单地说来,Hadoop是一个可以更容易开发和运行处理大规模数据的软件平台。Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有着高容错性(fault-tolerent)的特点,并且设计用来部署在低廉的(low-cost)硬件上。而且它提供高传输率(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求(requirements)这样可以流的形式访问(streaming access)文件系统中的数据。下面列举hadoop主要的一些特点:(1)扩容能力(Scalable):能可靠地(reliably)存储和处理千兆字节(PB)数据。(2)成本低(Economical):可以通过普通机器组成的服务器群来分发以及处理数据。这些服务器群总计可达数千个节点。(3)高效率(Efficient):通过分发数据,hadoop可以在数据所在的节点上并行地(parallel)处理它们,这使得处理非常的快速。(4)可靠性(Reliable):hadoop能自动地维护数据的多份复制,并且在任务失败后能自动地重新部署(redeploy)计算任务。二、 NameNode和DataNode介绍一个典型的HDFS系统包括一个NameNode和多个DataNode。NameNode维护名字空间;而DataNode存储数据块。DataNode负责存储数据,一个数据块在多个DataNode中有备份;而一个DataNode对于一个块最多只包含一个备份。所以我们可以简单地认为DataNode上存了数据块ID和数据块内容,以及他们的映射关系。一个HDFS集群可能包含上千DataNode节点,这些DataNode定时和NameNode通信,接受NameNode的指令。为了减轻NameNode的负担,NameNode上并不永久保存那个DataNode上有那些数据块的信息,而是通过DataNode启动时的上报,来更新NameNode上的映射表。DataNode和NameNode建立连接以后,就会不断地和NameNode保持心跳。心跳的返回其还也包含了NameNode对DataNode的一些命令,如删除数据库或者是把数据块复制到另一个DataNode。应该注意的是:NameNode不会发起到DataNode的请求,在这个通信过程中,它们是严格的客户端/服务器架构。2.1 DataNodeDataNode当然也作为服务器接受来自客户端的访问,处理数据块读/写请求。DataNode之间还会相互通信,执行数据块复制任务,同时,在客户端做写操作的时候,DataNode需要相互配合,保证写操作的一致性。下面我们就来具体分析一下DataNode的实现。DataNode的实现包括两部分,一部分是对本地数据块的管理,另一部分,就是和其他的实体打交道。我们先来看本地数据块管理部分。安装Hadoop的时候,我们会指定对应的数据块存放目录,当我们检查数据块存放目录目录时,我们回发现下面有个叫dfs的目录,所有的数据就存放在dfs/data里面。其中有两个文件,storage里存的东西是一些出错信息,貌似是版本不对。in_use.lock是一个空文件,它的作用是如果需要对整个系统做排斥操作,应用应该获取它上面的一个锁。接下来是3个目录,current存的是当前有效的数据块,detach存的是快照(snapshot,目前没有实现),tmp保存的是一些操作需要的临时数据块。但我们进入current目录以后,就会发现有一系列的数据块文件和数据块元数据文件。同时还有一些子目录,它们的名字是subdir0到subdir63,子目录下也有数据块文件和数据块元数据。这是因为HDFS限定了每个目录存放数据块文件的数量,多了以后会创建子目录来保存。数据块文件显然保存了HDFS中的数据,数据块最大可以到64M。每个数据块文件都会有对应的数据块元数据文件。里面存放的是数据块的校验信息。下面是数据块文件名和它的元数据文件名的例子:blk_3148782637964391313
blk_3148782637964391313_242812.meta上面的例子中,3148782637964391313是数据块的ID号,242812是数据块的版本号,用于一致性检查。在current目录下还有下面几个文件:VERSION,保存了一些文件系统的元信息。 dncp_block_verification.log.curr和dncp_block_verification.log.prev,它记录了一些DataNode对文件系定时统做一致性检查需要的信息。 在继续分析DataNode之前,我们有必要看一下系统的工作状态。启动HDFS的时候,我们可以选择以下启动参数:FORMAT("-format"):格式化系统 REGULAR("-regular"):正常启动 UPGRADE("-upgrade"):升级 ROLLBACK("-rollback"):回滚 FINALIZE("-finalize"):提交 IMPORT("-importCheckpoint"):从Checkpoint恢复。 作为一个大型的分布式系统,Hadoop内部实现了一套升级机制。upgrade参数就是为了这个目的而存在的,当然,升级可能成功,也可能失败。如果失败了,那就用rollback进行回滚;如果过了一段时间,系统运行正常,那就可以通过finalize,正式提交这次升级(跟数据库有点像啊)。importCheckpoint选项用于NameNode发生故障后,从某个检查点恢复。有了上面的描述,我们得到下面左边的状态图1:                                       

          图1升级/回滚/提交状态图大家应该注意到,上面的升级/回滚/提交都不可能一下就搞定,就是说,系统故障时,它可能处于上面右边状态中的某一个。特别是分布式的各个节点上,甚至可能出现某些节点已经升级成功,但有些节点可能处于中间状态的情况,所以Hadoop采用类似于数据库事务的升级机制也就不是很奇怪。大家先理解一下上面的状态图,它是下面我们要介绍DataNode存储的基础。我们来看一下升级/回滚/提交时的DataNode上会发生什么(在类DataStorage中实现)。前面我们提到过VERSION文件,它保存了一些文件系统的元信息,这个文件在系统升级时,会发生对应的变化。升级时,NameNode会将新的版本号,通过DataNode的登录应答返回。DataNode收到以后,会将当前的数据块文件目录改名,从current改名为previous.tmp,建立一个snapshot,然后重建current目录。重建包括重建VERSION文件,重建对应的子目录,然后建立数据块文件和数据块元数据文件到previous.tmp的硬连接。建立硬连接意味着在系统中只保留一份数据块文件和数据块元数据文件,current和previous.tmp中的相应文件,在存储中,只保留一份。当所有的这些工作完成以后,会在current里写入新的VERSION文件,并将previous.tmp目录改名为previous,完成升级。了解了升级的过程以后,回滚就相对简单。因为说有的旧版本信息都保存在previous目录里。回滚首先将current目录改名为removed.tmp,然后将previous目录改名为current,最后删除removed.tmp目录。提交的过程,就是将上面的previous目录改名为finalized.tmp,然后启动一个线程,将该目录删除。下图2给出了上面的过程:                                                   图2 过程图需要注意的是,HDFS的升级,往往只是支持从某一个特点的老版本升级到当前版本。回滚时能够恢复到的版本,也是previous中记录的版本。2.1.1Storage相关类文字分析完DataNode存储在文件上的数据以后,我们来看一下运行时对应的数据结构。从大到小,Hadoop中最大的结构是Storage,最小的结构,在DataNode上是block。类Storage保存了和存储相关的信息,它继承了StorageInfo,应用于DataNode的DataStorage,则继承了Storage,总体类图3如下:                                           图3总体类图StorageInfo包含了3个字段,分别是layoutVersion:版本号,如果Hadoop调整文件结构布局,版本号就会修改,这样可以保证文件结构和应用一致。namespaceID是Storage的ID,cTime,creation time。和StorageInfo相比,Storage就是个大家伙了。Storage可以包含多个根(参考配置项dfs.data.dir的说明),这些根通过Storage的内部类StorageDirectory来表示。StorageDirectory中最重要的方法是analyzeStorage,它将根据系统启动时的参数和我们上面提到的一些判断条件,返回系统现在的状态。StorageDirectory可能处于以下的某一个状态(与系统的工作状态一定的对应):NON_EXISTENT:指定的目录不存在;
NOT_FORMATTED:指定的目录存在但未被格式化;
COMPLETE_UPGRADE:previous.tmp存在,current也存在
RECOVER_UPGRADE:previous.tmp存在,current不存在

COMPLETE_FINALIZE:finalized.tmp存在,current也存在
COMPLETE_ROLLBACK:removed.tmp存在,current也存在,previous不存在
RECOVER_ROLLBACK:removed.tmp存在,current不存在,previous存在
COMPLETE_CHECKPOINT:lastcheckpoint.tmp存在,current也存在
RECOVER_CHECKPOINT:lastcheckpoint.tmp存在,current不存在
NORMAL:普通工作模式。StorageDirectory处于某些状态是通过发生对应状态改变需要的工作文件夹和正常工作的current夹来进行判断。状态改变需要的工作文件夹包括:previous:用于升级后保存以前版本的文件previous.tmp:用于升级过程中保存以前版本的文件removed.tmp:用于回滚过程中保存文件finalized.tmp:用于提交过程中保存文件lastcheckpoint.tmp:应用于从NameNode中,导入一个检查点previous.checkpoint:应用于从NameNode中,结束导入一个检查点有了这些状态,就可以对系统进行恢复(通过方法doRecover)。恢复的动作如下(结合上面的状态转移图):COMPLETE_UPGRADE:mv previous.tmp -> previous
RECOVER_UPGRADE:mv previous.tmp -> current
COMPLETE_FINALIZE:rm finalized.tmp
COMPLETE_ROLLBACK:rm removed.tmp
RECOVER_ROLLBACK:mv removed.tmp -> current
COMPLETE_CHECKPOINT:mv lastcheckpoint.tmp -> previous.checkpoint
RECOVER_CHECKPOINT:mv lastcheckpoint.tmp -> current我们以RECOVER_UPGRADE为例,分析一下。根据升级的过程,1. current->previous.tmp2. 重建current3. previous.tmp->previous当我们发现previous.tmp存在,current不存在,我们知道只需要将previous.tmp改为current,就能恢复到未升级时的状态。StorageDirectory还管理着文件系统的元信息,就是我们上面提过StorageInfo信息,当然,StorageDirectory还保存每个具体用途自己的信息。这些信息,其实都存储在VERSION文件中,StorageDirectory中的read/write方法,就是用于对这个文件进行读/写。下面是某一个DataNode的VERSION文件的例子:   配置文件代码 :1. #Fri Nov 14 10:27:35 CST 20082. namespaceID=19509979683. storageID=DS-697414267-127.0.0.1-50010-12266296550264. cTime=05. storageType=DATA_NODE   6. layoutVersion=-16对StorageDirectory的排他操作需要锁,还记得我们在分析系统目录时提到的in_use.lock文件吗?它就是用来给整个系统加/解锁用的。StorageDirectory提供了对应的lock和unlock方法。分析完StorageDirectory以后,Storage类就很简单了。基本上都是对一系列StorageDirectory的操作,同时Storage提供一些辅助方法。DataStorage是Storage的子类,专门应用于DataNode。上面我们对DataNode的升级/回滚/提交过程,就是对DataStorage的doUpgrade/doRollback/doFinalize分析得到的。DataStorage提供了format方法,用于创建DataNode上的Storage,同时,利用StorageDirectory,DataStorage管理存储系统的状态。2.1.2FSDataset相关的类分析完Storage相关的类以后,我们来看下一个大家伙,FSDataset相关的类。上面介绍Storage时,我们并没有涉及到数据块Block的操作,所有和数据块相关的操作,都在FSDataset相关的类中进行处理。下面是类图:                                 图4FSDataset类Block是对一个数据块的抽象,通过前面的讨论我们知道一个Block对应着两个文件,其中一个存数据,一个存校验信息,如下:blk_3148782637964391313
blk_3148782637964391313_242812.meta上面的信息中,blockId是3148782637964391313,242812是数据块的版本号,当然,系统还会保存数据块的大小,在类中是属性numBytes。Block提供了一系列的方法来操作对象的属性。DatanodeBlockInfo存放的是Block在文件系统上的信息。它保存了Block存放的卷(FSVolume),文件名和detach状态。这里有必要解释一下detach状态:我们前面分析过,系统在升级时会创建一个snapshot,snapshot的文件和current里的数据块文件和数据块元文件是通过硬链接,指向了相同的内容。当我们需要改变current里的文件时,如果不进行detach操作,那么,修改的内容就会影响snapshot里的文件,这时,我们需要将对应的硬链接解除掉。方法很简单,就是在临时文件夹里,复制文件,然后将临时文件改名成为current里的对应文件,这样的话,current里的文件和snapshot里的文件就detach了。这样的技术,也叫copy-on-write,是一种有效提高系统性能的方法。DatanodeBlockInfo中的detachBlock,能够对Block对应的数据文件和元数据文件进行detach操作。介绍完类Block和DatanodeBlockInfo后,我们来看FSVolumeSet,FSVolume和FSDir。我们知道在一个DataNode上可以指定多个Storage来存储数据块,由于HDFS规定了一个目录能存放Block的数目,所以一个Storage上存在多个目录。对应的,FSDataset中用FSVolume来对应一个Storage,FSDir对应一个目录,所有的FSVolume由FSVolumeSet管理,FSDataset中通过一个FSVolumeSet对象,就可以管理它的所有存储空间。FSDir对应着HDFS中的一个目录,目录里存放着数据块文件和它的元文件。FSDir的一个重要的操作,就是在添加一个Block时,根据需要有时会扩展目录结构,上面提过,一个Storage上存在多个目录,所有的目录,都对应着一个FSDir,目录的关系,也由FSDir保存。FSDir的getBlockInfo方法分析目录下的所有数据块文件信息,生成Block对象,存放到一个集合中。getVolumeMap方法能,则会建立Block和DatanodeBlockInfo的关系。以上两个方法,用于系统启动时搜集所有的数据块信息,便于后面快速访问。FSVolume对应着是某一个Storage。数据块文件,detach文件和临时文件都是通过FSVolume来管理的,这个其实很自然,在同一个存储系统上移动文件,往往只需要修改文件存储信息,不需要搬数据。FSVolume有一个recoverDetachedBlocks的方法,用于恢复detach文件。和Storage的状态管理一样,detach文件有可能在复制文件时系统崩溃,需要对detach的操作进行回复。FSVolume还会启动一个线程,不断更新FSVolume所在文件系统的剩余容量。创建Block的时候,系统会根据各个FSVolume的容量,来确认Block的存放位置。FSVolumeSet就不讨论了,它管理着所有的FSVolume。HDFS中,对一个chunk的写会使文件处于活跃状态,FSDataset中引入了类ActiveFile。ActiveFile对象保存了一个文件,和操作这个文件的线程。注意,线程有可能有多个。ActiveFile的构造函数会自动地把当前线程加入其中。有了上面的基础,我们可以开始分析FSDataset。FSDataset实现了接口FSDatasetInterface。FSDatasetInterface是DataNode对底层存储的抽象。下面给出了FSDataset的关键成员变量:FSVolumeSet volumes;
   private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private HashMap<Block,DatanodeBlockInfo> volumeMap = null;其中,volumes就是FSDataset使用的所有Storage,ongoingCreates是Block到ActiveFile的映射,也就是说,说有正在创建的Block,都会记录在ongoingCreates里。下面我们讨论FSDataset中的方法。public long getMetaDataLength(Block b) throws IOException;
得到一个block的元数据长度。通过block的ID,找对应的元数据文件,返回文件长度。public MetaDataInputStream getMetaDataInputStream(Block b) throws IOException;
得到一个block的元数据输入流。通过block的ID,找对应的元数据文件,在上面打开输入流。下面对于类似的简单方法,我们就不再仔细讨论了。public boolean metaFileExists(Block b) throws IOException;
判断block的元数据的元数据文件是否存在。简单方法。public long getLength(Block b) throws IOException;
block的长度。简单方法。public Block getStoredBlock(long blkid) throws IOException;
通过Block的ID,找到对应的Block。简单方法。public InputStream getBlockInputStream(Block b) throws IOException;
public InputStream getBlockInputStream(Block b, long seekOffset) throws IOException;
得到Block数据的输入流。简单方法。public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff) throws IOException;
得到Block的临时输入流。注意,临时输入流是指对应的文件处于tmp目录中。新创建块时,块数据应该写在tmp目录中,直到写操作成功,文件才会被移动到current目录中,如果失败,就不会影响current目录了。简单方法。public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
得到一个block的输出流。BlockWriteStreams既包含了数据输出流,也包含了元数据(校验文件)输出流,这是一个相当复杂的方法。参数isRecovery说明这次写是不是对以前失败的写的一次恢复操作。我们先看正常的写操作流程:首先,如果输入的block是个正常的数据块,或当前的block已经有线程在写,writeToBlock会抛出一个异常。否则,将创建相应的临时数据文件和临时元数据文件,并把相关信息,创建一个ActiveFile对象,记录到ongoingCreates中,并创建返回的BlockWriteStreams。前面我们已经提过,建立新的ActiveFile时,当前线程会自动保存在ActiveFile的threads中。我们以blk_3148782637964391313为例,当DataNode需要为Block ID为3148782637964391313创建写流时,DataNode创建文件tmp/blk_3148782637964391313做为临时数据文件,对应的meta文件是tmp/blk_3148782637964391313_XXXXXX.meta。其中XXXXXX是版本号。isRecovery为true时,表明我们需要从某一次不成功的写中恢复,流程相对于正常流程复杂。如果不成功的写是由于提交(参考finalizeBlock方法)后的确认信息没有收到,先创建一个detached文件(备份)。接着,writeToBlock检查是否有还有对文件写的线程,如果有,则通过线程的interrupt方法,强制结束线程。这就是说,如果有线程还在写对应的文件块,该线程将被终止。同时,从ongoingCreates中移除对应的信息。接下来将根据临时文件是否存在,创建/复用临时数据文件和临时数据元文件。后续操作就和正常流程一样,根据相关信息,创建一个ActiveFile对象,记录到ongoingCreates中……由于这块涉及了一些HDFS写文件时的策略,以后我们还会继续讨论这个话题。public void updateBlock(Block oldblock, Block newblock) throws IOException;
更新一个block。这也是一个相当复杂的方法。updateBlock的最外层是一个死循环,循环的结束条件,是没有任何和这个数据块相关的写线程。每次循环,updateBlock都会去调用一个叫tryUpdateBlock的内部方法。tryUpdateBlock发现已经没有线程在写这个块,就会跟新和这个数据块相关的信息,包括元文件和内存中的映射表volumeMap。如果tryUpdateBlock发现还有活跃的线程和该块关联,那么,updateBlock会试图结束该线程,并等在join上等待。public void finalizeBlock(Block b) throws IOException;
提交(或叫:结束finalize)通过writeToBlock打开的block,这意味着写过程没有出错,可以正式把Block从tmp文件夹放到current文件夹。在FSDataset中,finalizeBlock将从ongoingCreates中删除对应的block,同时将block对应的DatanodeBlockInfo,放入volumeMap中。我们还是以blk_3148782637964391313为例,当DataNode提交Block ID为3148782637964391313数据块文件时,DataNode将把tmp/blk_3148782637964391313移到current下某一个目录,以subdir12为例,这是tmp/blk_3148782637964391313将会挪到current/subdir12/blk_3148782637964391313。对应的meta文件也在目录current/subdir12下。public void unfinalizeBlock(Block b) throws IOException;
取消通过writeToBlock打开的block,与finalizeBlock方法作用相反。简单方法。public boolean isValidBlock(Block b);
该Block是否有效。简单方法。public void invalidate(Block invalidBlks[]) throws IOException;
使block变为无效。简单方法。public void validateBlockMetadata(Block b) throws IOException;
检查block的有效性。简单方法。2.1.3DataNote上的动态行为通过上面的一系列介绍,我们知道了DataNode工作时的文件结构和文件结构在内存中的对应对象。下面我们可以来开始分析DataNode上的动态行为。首先我们来分析DataXceiverServer和DataXceiver。DataNode上数据块的接受/发送并没有采用我们前面介绍的RPC机制,原因很简单,RPC是一个命令式的接口,而DataNode处理数据部分,往往是一种流式机制。DataXceiverServer和DataXceiver就是这个机制的实现。其中,DataXceiver还依赖于两个辅助类:BlockSender和BlockReceiver。下面是类图5:                                                     图5 类图(为了简单起见,BlockSender和BlockReceiver的成员变量没有进入UML模型中)DataXceiverServer很简单,它打开一个端口,然后每接收到一个连接,就创建一个DataXceiver,服务于该连接,并记录该连接的socket,对应的实现在DataXceiverServer的run方法里。当系统关闭时,DataXceiverServer将关闭监听的socket和所有DataXceiver的socket,这样就导致了DataXceiver出错并结束线程。DataXceiver才是真正干活的地方,目前,DataXceiver支持的操作总共有六条,分别是:OP_WRITE_BLOCK (80):写数据块OP_READ_BLOCK (81):读数据块OP_READ_METADATA (82):读数据块元文件OP_REPLACE_BLOCK (83):替换一个数据块OP_COPY_BLOCK (84):拷贝一个数据块OP_BLOCK_CHECKSUM (85):读数据块检验码DataXceiver首先读取客户端的版本号并检验,然后再读取一个字节的操作码,并转入相关的子程序进行处理。我们先看一下读数据块的过程吧。首先看输入,下图6是读数据块时,客户端发送过来的信息:          图 6客户端发送来的信息包括了要读取的Block的ID,时间戳,开始偏移和读取的长度,最后是客户端的名字(貌似只是在写日志的时候用到了)。根据上面的信息,我们可以创建一个BlockSender,如果BlockSender没有出错,返回客户端一个正确指示后,否则,返回错误码。成功创建BlockSender以后,就可以开始通过BlockSender.sendBlock发送数据。下面我们就来分析BlockSender。BlockSender的构造函数看似很复杂,其实就是根据需求(特别是在处理checksum上,因为checksum是基于块的),打开相应的数据流。close()用于释放各种资源,如已经打开的数据流。sendBlock用于发送数据,数据发送包括应答头和后续的数据包。应答头如下(包含DataXceiver中发送的成功标识):
                  图7应答头信息然后后面的数据就组织成数据包来发送,包结构如下图8: 图8数据包结构各个字段含义:packetLen:包长度,包括包头
offset:偏移量
seqno:包序列号
tail:是否是最后一个包
len:数据长度
checksum:检验数据
data:数据块数据需要注意的,在写数据前,BlockSender会校验数据,保证数据包中的checksum和数据的一致性。同时,如果数据出错,将会有ChecksumException抛出。数据传输结束的标志,是一个packetLen长度为0的包。客户端可以返回一个两字节的应答OP_STATUS_CHECKSUM_OK(5)继续DataXceiver分析,下一块硬骨头是写数据块。HDFS的写数据操作,比读数据复杂N多倍。读数据的时候,只需要在多个数据块文件的选一个读,就可以了;但是,写数据需要同时写到多个数据块文件上,这就比较复杂了。HDFS实现了了Google写文件时的机制,如下图9:                      图9 写数据的流程数据流从客户端开始,流经一系列的节点,到达最后一个DataNode。图中的所有DataNode只需要写一次硬盘,DataNode1和DataNode2会将从socket上接受到的数据,直接写到到下个节点的socket上。我们来看一下写数据块的请求图10。                  图10写数据块的请求首先是客户端的版本号和一个字节的操作码,接下来是我们熟悉的blockId和generationStamp。参数pipelineSize是整个数据流链的长度,以上面为例,pipelineSize=3。isRecovery指示这次写是否是一次恢复操作,还记得我们在讨论FSDataset.writeToBlock时的那个参数吗?isRecovery来自客户端。client是客户端的名字,就是发起请求的节点名,需要特别注意的是,如果是从NameNode来的复制请求,client为空。hasSrcDataNode是一个标志位,如果被设置,表明源节点是个DataNode,接下来读取的数据就是DataNode的信息。numTargets是目标节点的数目,包括当前节点,以上面的图为例,DataNode1上这个参数值为3,到了DataNode3,就只有1了。targets包含了目标节点的相关信息,根据这些信息,就可以创建到它们上面的socket连接。targets后跟着的是校验头。writeBlock最开始是处理上面提到的消息包,然后创建一个BlockReceiver。接下来就是创建一堆用于读写的流,如下图(图中除了in外,都是在writeBlock中创建,这个图还不涉及在BlockReceiver对本地文件读写的流): 在进行实际的数据写之前,上面的这些流会被建立起来(也就是说,DataNode1到DataNode3都可写以后,才开始处理写数据)。如果其中某一个点出错了,那么,出错的节点名会通过mirrorIn发送回来,一直沿着这条链,传播到客户端。如果一切正常,那么,BlockReceiver.receiveBlock就开始干活了。BlockReceiver的构造函数会创建写数据块和校验数据的输出流。剩下的就交给receiveBlock这个大家伙了。首先receiveBlock会再启动一个线程(一般来说,BlockReceiver就跑在它自己的线程上),用于处理应答(内部类PacketResponder定义了该线程),然后就不断调用receivePacket读数据。数据是以分块的形式传送,格式和读Block的时候是一样的。如下图(很奇怪,为啥不抽象为类): 注意:如果当前DataNode处于数据流的中间,该数据包会发送到下一个节点。接下来的处理,就是处理数据和校验,并分别写到数据块文件和数据块元数据文件。如果出错,抛出的异常会导致receiveBlock关闭相关的输出流,并终止传输。注意,数据校验出错还会上报到NameNode上。PacketResponder用于处理应答。也就是上面讲的mirrorIn和replyOut。PacketResponder里有一个队列ackQueue,receivePacket每收到一个包,都会往队列里添加一项。PacketResponder的run方法,根据工作的DataNode所处的位置,行为不一样。最后一个DataNode由于没有后续节点,PacketResponder的ackQueue每收到一项,表明对应的数据块已经处理完毕,那么就可以发送成功应答。如果该应答是最后一个包的,PacketResponder会关闭相关的输出流,并提交(前面讲FSDataset时后我们讨论过的finalizeBlock方法)。如果DataNode有后续节点,那么,它必须等到后续节点的成功应答,才可以发送应答到它前面的节点。PacketResponder的run方法还引入了心跳机制,用于检测连接是否还存在。注意:所有改变DataNode的操作,需要把信息更新到NameNode上,这是通过DataNode.notifyNamenodeReceivedBlock方法,然后通过DataNode统一发送到NameNode上DataXceiver支持的的6条操作,我们已经分析完最重要的两条。剩下的分别是:OP_READ_METADATA (82):读数据块元文件OP_REPLACE_BLOCK (83):替换一个数据块OP_COPY_BLOCK (84):拷贝一个数据块OP_BLOCK_CHECKSUM (85):读数据块检验码我们逐个讨论。读数据块元文件的请求如图(操作码82): 应答很简单,应答码(如OP_STATUS_SUCCESS),文件长度(int),数据。拷贝数据块和替换数据块是一对相对应操作。替换数据块的请求如图(操作码83)。这个比起上面的读数据块元文件请求,有点复杂。替换一个数据块是系统平衡操作的一部分,用于接收一个数据块。它和普通的数据块写的差别是,它只发生在两个节点上,一个写,一个读,而不需要建立数据链。我们可以比较一下它们在创建BlockReceiver对象时的差别:Java代码 blockReceiver = new BlockReceiver(block, proxyReply,         proxySock.getRemoteSocketAddress().toString(),       proxySock.getLocalSocketAddress().toString(),       false, "", null, datanode);//OP_REPLACE_BLOCK   blockReceiver = new BlockReceiver(block, in,          s.getRemoteSocketAddress().toString(),         s.getLocalSocketAddress().toString(),         isRecovery, client, srcDataNode, datanode);//OP_WRITE_BLOCKblockReceiver = new BlockReceiver(block, proxyReply,          proxySock.getRemoteSocketAddress().toString(),          proxySock.getLocalSocketAddress().toString(),          false, "", null, datanode);//OP_REPLACE_BLOCKblockReceiver = new BlockReceiver(block, in,           s.getRemoteSocketAddress().toString(),          s.getLocalSocketAddress().toString(),          isRecovery, client, srcDataNode, datanode);//OP_WRITE_BLOCK首先,proxyReply和in不一样,这是因为发起请求的节点和提供数据的节点并不是同一个。写数据块发起请求方也提供数据,替换数据块请求方不提供数据,而是提供了一个数据源(proxySource参数),由replaceBlock发起一个拷贝数据块的请求,建立数据源。对于拷贝数据块操作,isRecovery=false,client=””, srcDataNode=null。注意,我们在分析BlockReceiver是,讨论过client=””的情况,就是应用于这种场景。在创建BlockReceiver对象前,需要利用下面介绍的拷贝数据块的请求建立到数据源的socket连接并发送拷贝数据块请求。然后通过BlockReceiver.receiveBlock接收数据。任务成功后将结果通知notifyNamenodeReceivedBlock。拷贝数据块的请求如图(操作码84)。和读数据块操作请求类似,但是读取的是整个数据块,所以少了很多参数。读数据块检验码的请求如图(操作码85)。它能够读取某个数据块的检验和的MD5结果,实现的方法很简单。2.2NameDote 相比于DataNode,NameNode比较复杂。系统中只有一个NameNode,作为系统文件目录的管理者和“inode表”(熟悉UNIX的同学们应该了解inode)。为了高可用性,系统中还存在着从NameNode。先前我们分析DataNode的时候,关注的是数据块。NameNode作为HDFS中文件目录和文件分配的管理者,它保存的最重要信息,就是下面两个映射:文件名à数据块数据块àDataNode列表其中,文件名à数据块保存在磁盘上(持久化);但NameNode上不保存数据块àDataNode列表,该列表是通过DataNode上报建立起来的。下图包含了NameNode和DataNode往外暴露的接口,其中,DataNode实现了InterDatanodeProtocol和ClientDatanodeProtocol,剩下的,由NameNode实现。 ClientProtocol提供给客户端,用于访问NameNode。它包含了文件角度上的HDFS功能。和GFS一样,HDFS不提供POSIX形式的接口,而是使用了一个私有接口。一般来说,程序员通过org.apache.hadoop.fs.FileSystem来和HDFS打交道,不需要直接使用该接口。DatanodeProtocol:用于DataNode向NameNode通信,我们已经在DataNode的分析过程中,了解部分接口,包括:register,用于DataNode注册;sendHeartbeat/blockReport/blockReceived,用于DataNode的offerService方法中;errorReport我们没有讨论,它用于向NameNode报告一个错误的Block,用于BlockReceiver和DataBlockScanner;nextGenerationStamp和commitBlockSynchronization用于lease管理,我们在后面讨论到lease时,会统一说明。NamenodeProtocol用于从NameNode到NameNode的通信。下图补充了接口里使用的数据的关系。
我们先分析INode*.java,类INode*抽象了文件层次结构。如果我们对文件系统进行面向对象的抽象,一定会得到和下面一样类似的结构图(类INode*): INode是一个抽象类,它的两个字类,分别对应着目录(INodeDirectory)和文件(INodeFile)。INodeDirectoryWithQuota,如它的名字隐含的,是带了容量限制的目录。INodeFileUnderConstruction,抽象了正在构造的文件,当我们需要在HDFS中创建文件的时候,由于创建过程比较长,目录系统会维护对应的信息。INode中的成员变量有:name,目录/文件名;modificationTime和accessTime是最后的修改时间和访问时间;parent指向了父目录;permission是访问权限。HDFS采用了和UNIX/Linux类似的访问控制机制。系统维护了一个类似于UNIX系统的组表(group)和用户表(user),并给每一个组和用户一个ID,permission在INode中是long型,它同时包含了组和用户信息。INode中存在大量的get和set方法,当然是对上面提到的属性的操作。导出属性,比较重要的有:collectSubtreeBlocksAndClear,用于收集这个INode所有后继中的Block;computeContentSummary用于递归计算INode包含的一些相关信息,如文件数,目录数,占用磁盘空间。INodeDirectory是HDFS管理的目录的抽象,它最重要的成员变量是:private List<INode> children;就是这个目录下的所有目录/文件集合。INodeDirectory也是有大量的get和set方法,都很简单。INodeDirectoryWithQuota进一步加强了INodeDirectory,限制了INodeDirectory可以使用的空间(包括NameSpace和磁盘空间)。INodeFile是HDFS中的文件,最重要的成员变量是:protected BlockInfo blocks[] = null;这是这个文件对应的Block列表,BlockInfo增强了Block类。INodeFileUnderConstruction保存了正在构造的文件的一些信息,包括clientName,这是目前拥有租约的节点名(创建文件时,只有一个节点拥有租约,其他节点配合这个节点工作)。clientMachine是构造该文件的客户端名称,如果构造请求由DataNode发起,clientNode会保持相应的信息,targets保存了配合构造文件的所有节点。上面描述了INode*类的关系。下面我们顺便考察一下一些NameNode上的数据类。BlocksMap保存了Block和它在NameNode上一些相关的信息。其核心是一个map:Map<Block, BlockInfo>。BlockInfo扩展了Block,保存了该Block归属的INodeFile和DatanodeDescriptor,同时还包括了它的前继和后继Block。有了BlocksMap,就可以通过Block找对应的文件和这个Block存放的DataNode的相关信息。接下来我们来分析类Datanode*。DatanodeInfo和DatanodeID都定义在包org.apache.hadoop.hdfs.protocol。DatanodeDescriptor是DatanodeInfo的子类,包含了NameNode需要的附加信息。DatanodeID只包含了一些配置信息,DatanodeInfo增加了一些动态信息,DatanodeDescriptor更进一步,包含了DataNode上一些Block的动态信息。DatanodeDescriptor包含了内部类BlockTargetPair,它保存Block和对应DatanodeDescriptor的关联,BlockQueue是BlockTargetPair队列。DatanodeDescriptor包含了两个BlockQueue,分别记录了该DataNode上正在复制(replicateBlocks)和Lease恢复(recoverBlocks)的Block。同时还有一个Block集合,保存的是该DataNode上已经失效的Block。DatanodeDescriptor提供一系列方法,用于操作上面保存的队列和集合。也提供get*Command方法,用于生成发送到DataNode的命令。当NameNode收到DataNode对现在管理的Block状态的汇报是,会调用reportDiff,找出和现在NameNode上的信息差别,以供后续处理用。readFieldsFromFSEditLog方法用于从日志中恢复DatanodeDescriptor。前面我们提过关系:文件名à数据块持久化在磁盘上,所有对目录树的更新和文件名à数据块关系的修改,都必须能够持久化。为了保证每次修改不需要从新保存整个结构,HDFS使用操作日志,保存更新。现在我们可以得到NameNode需要存储在Disk上的信息了,包括:$ ls -R name
name:
currentimagein_use.lock
name/current:
editsfsimagefstimeVERSION
name/image:
fsimagein_use.lock的功能和DataNode的一致。fsimage保存的是文件系统的目录树,edits则是文件树上的操作日志,fstime是上一次新打开一个操作日志的时间(long型)。image/fsimage是一个保护文件,防止0.13以前的版本启动(0.13以前版本将fsimage存放在name/image目录下,如果用0.13版本启动,显然在读fsimage会出错J)。我们可以开始讨论FSImage了,类FSImage如下图:
分析FSImage,不免要跟DataStorage去做比较(上图也保留了类DataStorage)。前面我们已经分析过DataStorage的状态变化,包括升级/回滚/提交,FSImage也有类似的升级/回滚/提交动作,而且这部分的行为和DataStorage是比较一致,如下状态转移图。图中update方法和DataStorage的差别比较大,是因为处理数据库和处理文件系统名字空间不一样,其他的地方都比较一致。FSImage也能够管理多个Storage,而且还能够区分Storage为IMAGE(目录结构)/EDITS(日志)/IMAGE_AND_EDITS(前面两种的组合)。 我们可以看到,FSImage和DataStorage都有recoverTransitionRead方法。FSImage的recoverTransitionRead方法主要步骤是检查系统一致性(analyzeStorage)并尝试恢复,初始化新的storage,然后根据启动NameNode的参数,做升级/回滚等操作。FSImage需要支持参数-importCheckpoint,该参数用于在某一个checkpoint目录里加载HDFS的目录信息,并更新到当前系统,该参数的主要功能在方法doImportCheckpoint中。该方法很简单,通过读取配置的checkpoint目录来加载fsimage文件和日志文件,然后利用saveFSImage(下面讨论)保存到当前的工作目录,完成导入。loadFSImage(File curFile)用于在fsimage中读入NameNode持久化的信息,是FSImage中最重要的方法之一,该文件的结构如下: 最开始是版本号(注意,各版本文件布局不一样,文中分析的样本是0.17的),然后是命名空间的ID号,文件个数和最高文件版本号(就是说,下一次产生文件版本号的初始值)。接下来就是文件的信息啦,首先是文件名,然后是该文件的副本数,接下来是修改时间/访问时间,数据块大小,数据块数目。数据块数目如果大于0,表明这是个文件,那么接下来就是numBlocks个数据块(浅蓝),如果数据块数目等于0,那该条目是目录,接下来是应用于该目录的quota。最后是访问控制的一些信息。文件信息一共有numFiles个,接下来是处于构造状态的文件的信息。(有些版本可能还会保留DataNode的信息,但0.17已经不保存这样的信息啦)。loadFSImage(File curFile)的对应方法是saveFSImage(File newFile),FSImage中还有一系列的方法(大概7,8个)用于配合这两个方法工作,我们就不再深入讨论了。loadFSEdits(StorageDirectory sd)用于加载日志文件,并把日志文件记录的内容应用到NameNode,loadFSEdits只是简单地调用FSEditLog中对应的方法。loadFSImage()和saveFSImage()是另外一对重要的方法。loadFSImage()会在所有的Storage中,读取最新的NameNode持久化信息,并应用相应的日志,当loadFSImage()调用返回以后,内存中的目录树就是最新的。loadFSImage()会返回一个标记,如果Storage中有任何和内存中最终目录树中不一致的Image(最常见的情况是日志文件不为空,那么,内存中的Image应该是Storage的Image加上日志,当然还有其它情况),那么,该标记为true。saveFSImage()的功能正好相反,它将内存中的目录树持久化,很自然,目录树持久化后就可以把日志清空。saveFSImage()会创建edits.new,并把当前内存中的目录树持久化到fsimage.ckpt(fsimage现在还存在),然后重新打开日志文件edits和edits.new,这会导致日志文件edits和edits.new被清空。最后,saveFSImage()调用rollFSImage()方法。rollFSImage()上来就把所有的edits.new都改为edits(经过了方法saveFSImage,它们都已经为空),然后再把fsimage.ckpt改为fsimage。如下图: 为了防止误调用rollFSImage(),系统引入了状态CheckpointStates.UPLOAD_DONE。有了上面的状态转移图,我们就很好理解方法recover Interrupted Checkpoint了。图中存在另一条路径,应用于GetImageServlet中。GetImageServlet是和从NameNode进行文件通信的接口,这个场景留到我们分析从NameNode时再进行分析。最后我们分析一下和检查点相关的一个类,rollFSImage()会返回这个类的一个实例。CheckpointSignature用于标识一个日志的检查点,它是StorageInfo的子类,同时实现了WritableComparable接口,出了StorageInfo的信息,它还包括了两个属性:editsTime和checkpointTime。editsTime是日志的最后修改时间,checkpointTime是日志建立时间。在和从NameNode节点的通信中,需要用CheckpointSignature,来保证从NameNode获得的日志是最新的。我们来分析FSEditLog.java,该类提供了NameNode操作日志和日志文件的相关方法,相关类图如下: 首先是FSEditLog依赖的输入/输出流。输入流基本上没有新添加功能;输出流在打开的时候,会写入日志的版本号(最前面的4字节),同时,每次将内存刷到硬盘时,会为日志尾部写入一个特殊的标识(OP_INVALID)。FSEditLog有打开/关闭的方法,它们都是很简单的方法,就是关闭的时候,要等待所有正在写日志的操作都完成写以后,才能关闭。processIOError用于处理IO出错,一般这会导致对于的Storage的日志文件被关闭(还记得loadFSImage要找出最后写的日志文件吧,这也是提高系统可靠性的一个方法),如果系统再也找不到可用的日志文件,NameNode将会退出。loadFSEdits是个大家伙,它读取日志文件,并把日志应用到内存中的目录结构中。这家伙大是因为它需要处理所有类型的日志记录,其实就一大case语句。logEdit的作用和loadFSEdits相反,它向日志文件中写入日志记录。我们来分析一下什么操作需要写log,还有就是需要log那些参数:logOpenFile(OP_ADD):申请leasepath(路径)/replication(副本数,文本形式)/modificationTime(修改时间,文本形式)/accessTime(访问时间,文本形式)/preferredBlockSize(块大小,文本形式)/BlockInfo[](增强的数据块信息,数组)/permissionStatus(访问控制信息)/clientName(客户名)/clientMachine(客户机器名)logCloseFile(OP_CLOSE):归还leasepath/replication/modificationTime/accessTime/preferredBlockSize/BlockInfo[]/permissionStatuslogMkDir(OP_MKDIR):创建目录path/modificationTime/accessTime/permissionStatuslogRename(OP_RENAME):改文件名src(原文件名)/dst(新文件名)/timestamp(时间戳)logSetReplication(OP_SET_REPLICATION):更改副本数src/replicationlogSetQuota(OP_SET_QUOTA):设置空间额度path/nsQuota(文件空间额度)/dsQuota(磁盘空间额度)logSetPermissions(OP_SET_PERMISSIONS):设置文件权限位src/permissionStatuslogSetOwner(OP_SET_OWNER):设置文件组和主src/username(所有者)/groupname(所在组)logDelete(OP_DELETE):删除文件src/timestamplogGenerationStamp(OP_SET_GENSTAMP):文件版本序列号genstamp(序列号)logTimes(OP_TIMES):更改文件更新/访问时间src/modificationTime/accessTime通过上面的分析,我们应该清楚日志文件里记录了那些信息。rollEditLog()我们在前面已经提到过(配合saveFSImage和rollFSImage),它用于关闭edits,打开日志到edits.new。purgeEditLog()的作用正好相反,它删除老的edits文件,然后把edits.new改名为edits。这也是Hadoop在做更新修改时经常采用的策略。我们开始对租约Lease进行分析,下面是类图。Lease可以认为是一个文件写锁,当客户端需要写文件的时候,它需要申请一个Lease,NameNode负责记录那个文件上有Lease,Lease的客户是谁,超时时间(分布式处理的一种常用技术)等,所有这些工作由下面3个类完成。至于租约过期NameNode需要采取什么动作,并不是这部分code要完成的功能。 LeaseManager(左)管理着系统中的所有Lease(右),同时,LeaseManager有一个线程Monitor,用于检查是否有Lease到期。一个租约由一个holder(客户端名),lastUpdate(上次更新时间)和paths(该客户端操作的文件集合)构成。了解了这些属性,相关的方法就很好理解了。LeaseManager的方法也就很好理解,就是对Lease进行操作。注意,LeaseManager的addLease并没有检查文件上是否已经有Lease,这个是由LeaseManager的调用者来保证的,这使LeaseManager跟简单。内部类Monitor通过对Lease的最后跟新时间来检测Lease是否过期,如果过期,简单调用FSNamesystem的internalReleaseLease方法。这部分的代码比我想象的简单,主要是大部分的一致性逻辑都存在于LeaseManager的使用者。在开始分析FSNamesystem.java这个4.5k多行的庞然大物之前,我们继续来扫除外围的障碍。下面是关于访问控制的一些类: Hadoop文件保护采用的UNIX的机制,文件用户分文件属主、文件组和其他用户,权限读,写和执行(FsAction中抽象了所有组合)。我们先分析包org.apache.hadoop.fs.permission的几个类吧。FsAction抽象了操作权限,FsPermission记录了某文件/路径的允许情况,分文件属主、文件组和其他用户,同时提供了一系列的转换方法,applyUMask用于去掉某些权限,如某些操作需要去掉文件的写权限,那么可以通过该方法,生成对应的去掉写权限的FsPermission对象。PermissionStatus用于描述一个文件的文件属主、文件组和它的FsPermission。INode在保存PermissionStatus时,用了不同的方法,它用一个long变量,和SerialNumberManager配合,保存了PermissionStatus的所有信息。SerialNumberManager保存了文件主和文件主号,用户组和用户组号的对应关系。注意,在持久化信息FSImage中,不保存文件主号和用户组号,它们只是SerialNumberManager分配的,只保存在内存的信息。通过SerialNumberManager得到某文件主的文件主号时,如果找不到文件主号,会往对应关系中添加一条记录。INode的long变量作为一个位串,分组保存了FsPermission(MODE),文件主号(USER)和用户组号(GROUP)。PermissionChecker用于权限检查。三、 结论Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop为应用程序透明的提供了一组稳定/可靠的接口和数据运动。在Hadoop中实现了Google的MapReduce算法,它能够把应用程序分割成许多很小的工作单元,每个单元可以在任何集群节点上执行或重复执行。此外,Hadoop还提供一个分布式文件系统用来在各个计算节点上存储数据,并提供了对数据读写的高吞吐率。由于应用了map/reduce和分布式文件系统使得Hadoop框架具有高容错性,它会自动处理失败节点。已经在具有600个节点的集群测试过Hadoop框架

http://www.aboutyun.com/static/image/common/logo.png(http://www.aboutyun.com)
欢迎加入about云官方群39327136
页: [1]
查看完整版本: Hadoop源代码分析