本帖最后由 pig2 于 2014-1-16 00:37 编辑
介绍完org.apache.hadoop.io以后,我们开始来分析org.apache.hadoop.rpc。RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。当我们讨论HDFS的,通信可能发生在:
- Client-NameNode之间,其中NameNode是服务器
- Client-DataNode之间,其中DataNode是服务器
- DataNode-NameNode之间,其中NameNode是服务器
- DataNode-DateNode之间,其中某一个DateNode是服务器,另一个是客户端
如果我们考虑Hadoop的Map/Reduce以后,这些系统间的通信就更复杂了。为了解决这些客户机/服务器之间的通信,Hadoop引入了一个RPC框架。该RPC框架利用的Java的反射能力,避免了某些RPC解决方案中需要根据某种接口语言(如CORBA的IDL)生成存根和框架的问题。但是,该RPC框架要求调用的参数和返回结果必须是Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组。同时,接口方法应该只抛出IOException异常。
既然是RPC,当然就有客户端和服务器,当然,org.apache.hadoop.rpc也就有了类Client和类Server。但是类Server是一个抽象类,类RPC封装了Server,利用反射,把某个对象的方法开放出来,变成RPC中的服务器。
下图是org.apache.hadoop.rpc的类图。
既然是RPC,自然就有客户端和服务器,当然,org.apache.hadoop.rpc也就有了类Client和类Server。在这里我们来仔细考察org.apache.hadoop.rpc.Client。下面的图包含了org.apache.hadoop.rpc.Client中的关键类和关键方法。
由于Client可能和多个Server通信,典型的一次HDFS读,需要和NameNode打交道,也需要和某个/某些DataNode通信。这就意味着某一个Client需要维护多个连接。同时,为了减少不必要的连接,现在Client的做法是拿ConnectionId(图中最右侧)来做为Connection的ID。ConnectionId包括一个InetSocketAddress(IP地址+端口号或主机名+端口号)对象和一个用户信息对象。这就是说,同一个用户到同一个InetSocketAddress的通信将共享同一个连接。
连接被封装在类Client.Connection中,所有的RPC调用,都是通过Connection,进行通信。一个RPC调用,自然有输入参数,输出参数和可能的异常,同时,为了区分在同一个Connection上的不同调用,每个调用都有唯一的id。调用是否结束也需要一个标记,所有的这些都体现在对象Client.Call中。Connection对象通过一个Hash表,维护在这个连接上的所有Call:Java代码
- private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
复制代码
一个RPC调用通过addCall,把请求加到Connection里。为了能够在这个框架上传输Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组,我们一般把Call需要的参数打包成为ObjectWritable对象。Client.Connection会通过socket连接服务器,连接成功后回校验客户端/服务器的版本号(Client.ConnectionwriteHeader()方法),校验成功后就可以通过Writable对象来进行请求的发送/应答了。
注意,每个Client.Connection会起一个线程,不断去读取socket,并将收到的结果解包,找出对应的Call,设置Call并通知结果已经获取。Call使用Obejct的wait和notify,把RPC上的异步消息交互转成同步调用。还有一点需要注意,一个Client会有多个Client.Connection,这是一个很自然的结果。聊完了Client聊Server,按惯例,先把类图贴出来。
需要注意的是,这里的Server类是个抽象类,唯一抽象的地方,就是Java代码- public abstract Writable call(Writable param, long receiveTime) throws IOException;
复制代码
这表明,Server提供了一个架子,Server的具体功能,需要具体类来完成。而具体类,当然就是实现call方法。我们先来分析Server.Call,和Client.Call类似,Server.Call包含了一次请求,其中,id和param的含义和Client.Call是一致的。不同点在后面三个属性,connection是该Call来自的连接,当然,当请求处理结束时,相应的结果会通过相同的connection,发送给客户端。属性timestamp是请求到达的时间戳,如果请求很长时间没被处理,对应的连接会被关闭,客户端也就知道出错了。最后的response是请求处理的结果,可能是一个Writable的串行化结果,也可能一个异常的串行化结果。
Server.Connection维护了一个来之客户端的socket连接。它处理版本校验,读取请求并把请求发送到请求处理线程,接收处理结果并把结果发送给客户端。
Hadoop的Server采用了Java的NIO,这样的话就不需要为每一个socket连接建立一个线程,读取socket上的数据。在Server中,只需要一个线程,就可以accept新的连接请求和读取socket上的数据,这个线程,就是上面图里的Listener。
请求处理线程一般有多个,它们都是Server.Handle类的实例。它们的run方法循环地取出一个Server.Call,调用Server.call方法,搜集结果并串行化,然后将结果放入Responder队列中。
对于处理完的请求,需要将结果写回去,同样,利用NIO,只需要一个线程,相关的逻辑在Responder里。
-------------------------------------------------------------------------------------------------------------------------------------------------
(注:本节需要用到一些Java反射的背景)有了Client和Server,很自然就能RPC啦。下面轮到RPC.java啦。一般来说,分布式对象一般都会要求根据接口生成存根和框架。如CORBA,可以通过IDL,生成存根和框架。但是,在org.apache.hadoop.rpc,我们就不需要这样的步骤了。上类图。
为了分析Invoker,我们需要介绍一些Java反射实现Dynamic Proxy的背景。
Dynamic Proxy是由两个class实现的:java.lang.reflect.Proxy 和java.lang.reflect.InvocationHandler,后者是一个接口。所谓DynamicProxy是这样一种class:它是在运行时生成的class,在生成它时你必须提供一组interface给它,然后该class就宣称它实现了这些interface。
这个Dynamic Proxy其实就是一个典型的Proxy模式,它不会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。这个handler,在Hadoop的RPC中,就是Invoker对象。
我们可以简单地理解:就是你可以通过一个接口来生成一个类,这个类上的所有方法调用,都会传递到你生成类时传递的InvocationHandler实现中。
在Hadoop的RPC中,Invoker实现了InvocationHandler的invoke方法(invoke方法也是InvocationHandler的唯一方法)。Invoker会把所有跟这次调用相关的调用方法名,参数类型列表,参数列表打包,然后利用前面我们分析过的Client,通过socket传递到服务器端。就是说,你在proxy类上的任何调用,都通过Client发送到远方的服务器上。
Invoker使用Invocation。Invocation封装了一个远程调用的所有相关信息,它的主要属性有:methodName,调用方法名,parameterClasses,调用方法参数的类型列表和parameters,调用方法参数。注意,它实现了Writable接口,可以串行化。
RPC.Server实现了org.apache.hadoop.ipc.Server,你可以把一个对象,通过RPC,升级成为一个服务器。服务器接收到的请求(通过Invocation),解串行化以后,就变成了方法名,方法参数列表和参数列表。利用Java反射,我们就可以调用对应的对象的方法。调用的结果再通过socket,返回给客户端,客户端把结果解包后,就可以返回给Dynamic Proxy的使用者了。
------------------------------------------------------------------------------------------------------------------------------------------------
一个典型的HDFS系统包括一个NameNode和多个DataNode。NameNode维护名字空间;而DataNode存储数据块。
DataNode负责存储数据,一个数据块在多个DataNode中有备份;而一个DataNode对于一个块最多只包含一个备份。所以我们可以简单地认为DataNode上存了数据块ID和数据块内容,以及他们的映射关系。
一个HDFS集群可能包含上千DataNode节点,这些DataNode定时和NameNode通信,接受NameNode的指令。为了减轻NameNode的负担,NameNode上并不永久保存那个DataNode上有那些数据块的信息,而是通过DataNode启动时的上报,来更新NameNode上的映射表。
DataNode和NameNode建立连接以后,就会不断地和NameNode保持心跳。心跳的返回其还也包含了NameNode对DataNode的一些命令,如删除数据库或者是把数据块复制到另一个DataNode。应该注意的是:NameNode不会发起到DataNode的请求,在这个通信过程中,它们是严格的客户端/服务器架构。
DataNode当然也作为服务器接受来自客户端的访问,处理数据块读/写请求。DataNode之间还会相互通信,执行数据块复制任务,同时,在客户端做写操作的时候,DataNode需要相互配合,保证写操作的一致性。
下面我们就来具体分析一下DataNode的实现。DataNode的实现包括两部分,一部分是对本地数据块的管理,另一部分,就是和其他的实体打交道。我们先来看本地数据块管理部分。
安装Hadoop的时候,我们会指定对应的数据块存放目录,当我们检查数据块存放目录目录时,我们回发现下面有个叫dfs的目录,所有的数据就存放在dfs/data里面。
其中有两个文件,storage里存的东西是一些出错信息,貌似是版本不对…云云。in_use.lock是一个空文件,它的作用是如果需要对整个系统做排斥操作,应用应该获取它上面的一个锁。
接下来是3个目录,current存的是当前有效的数据块,detach存的是快照(snapshot,目前没有实现),tmp保存的是一些操作需要的临时数据块。
但我们进入current目录以后,就会发现有一系列的数据块文件和数据块元数据文件。同时还有一些子目录,它们的名字是subdir0到subdir63,子目录下也有数据块文件和数据块元数据。这是因为HDFS限定了每个目录存放数据块文件的数量,多了以后会创建子目录来保存。
数据块文件显然保存了HDFS中的数据,数据块最大可以到64M。每个数据块文件都会有对应的数据块元数据文件。里面存放的是数据块的校验信息。下面是数据块文件名和它的元数据文件名的例子:
blk_3148782637964391313
blk_3148782637964391313_242812.meta
上面的例子中,3148782637964391313是数据块的ID号,242812是数据块的版本号,用于一致性检查。
在current目录下还有下面几个文件:VERSION,保存了一些文件系统的元信息。
dncp_block_verification.log.curr和dncp_block_verification.log.prev,它记录了一些DataNode对文件系定时统做一致性检查需要的信息。
下一篇
|