以下是本人研究源代码成果, 此文僅献给我和我的小伙伴们,不足之处,欢迎斧正-------------------------------------------------致谢道格等人!
注:hadoop版本0.20.2,有童鞋表示看代码头晕,所以本文采用纯文字描述,哥还特意为你们把字体调调颜色噢 ^ o ^
大家都知道,hadoop是最优秀的大数据处理框架之一,而本文研究的DFSClient是 hadoop内部实现中,较为核心的部分
DFSClient,顾名思义,分布式文件系统的客户端
hadoop的客户端包括:shell命令,java接口,pig 等。。
DFSClient在分布式文件系统中扮演的角色:
现实场景如下:我的数据文件分散存储在集群中的很多台机器上,现在我需要获取它,以使用shell命令操作为例
我输入获取数据的指令后,DFSClient通过RPC机制和NameNode(集群中的主机器)通信并获得文件的元数据信息(数据的存放位置,校验和,等等一些关键的信息),然后DFSClient再与DataNode通信(集群中的其它机器)通过数据I/O流来获取到我要的文件信息,
它在分布式文件系统内部实现中起到双方通信的作用,是用户与文件系统通信的桥梁,由此可见,掌握它,驾驭它,显得尤为迫切!
首先一起讨论下关于DFSClient的体系结构:
它位于org.apache.hadoop.hdfs包下:体系图如下
DFSClient
|-------LeaseChecker implements Runnable
|-------DNAddrPair
|-------BlockReader extends FSInputChecker ( FSInputChecker extends FSInputStream )
|-------DFSInputStream
|-------DFSDataInputStream extends DataInputStream implements Seekable(支持流中随机存储), PositionedReadable(定位读取)
|-------DFSOutputStream extends FSOutputSummer implements Syncable (FSOutputSummer extends OutputStream)
|--------Packet
|--------DataStreamer extends Daemon (extends Thread)
|--------ResponseProcessor extends Thread
LeaseChecker介绍:实现了Runnable接口
在HDFS中可能有多个客户端在同一时刻进行文件的写入操作,有时会出现多个客户端并发的写入一个文件的情况,所以采取一些措施来控制并发写入情况的发送,一般情况下会采用互斥锁的方法来进行控制,使得每一时刻只有一个获得锁的客户端才能执行,写入操作。但是互斥锁的机制在分布式系统中会有很多问题
问题一:每次执行写入时,客户端都需要向NameNode申请互斥锁,从而造成网络开销的增大
问题二:当某个客户端获得锁之后和NameNode失去了联系,此时会造成互斥锁无法释放,使得其他的客户端的操作会被终止
解决方案:HDFS使用Lease租约来解决互斥锁的问题
过程:当DFSClient需要对一个文件执行写入操作时,他首先需要向NameNode申请一个租约(有时间限制),在时间期限内客户端可以 对租约所管理的文件执行写入。一个文件只能被一个租约锁管理,所以只能有一个客户端对文件执行写入操作,在租约的有效时间 内,DFSClient客户端会一直持有写文件的权限,而不需要再向NameNode询问是否有写文件的权限。当客户端一直工作时,它会在 租约过期后向NameNode申请续约,入股在租约的有效期间内,客户端发生了异常,和NameNode失去了联系,当租约期满后, NameNode会发现发生异常的客户端,此时NameNode会将新的租约赋给其它正常的客户端,当发生异常的客户端已经写入了一部 分数据时,HDFS为了分辨出这些无用的数据,会在客户端每次写入数据时增加版本号信息,异常的客户端的写入的数据的版本号 会很低,从而可以被安全删除掉。
LeaseChecker作用: 在DFSClient中有个LeaseChecker线程,该线程会周期性的检查租约是否过期,在租约快过期的时候会对租约进行续约,此外,在namenode包中有个LeaseManager租约管理器,该管理器会不断的检查它所管理的lease是否过期,如果lease已经过期,会将其删除
DNAddrPair介绍:封装了定位到的DataNode信息和DataNode所对应的IP信息
FSInputChecker 介绍:抽象类FSInputChecker继承自FSInputStream,加入了HDFS所需要的校验功能,hadoop会生成与原生文件所对应的校验和文件,并在读写文件的时候对文件进行校验,以确保数据的准确性
BlockReader介绍:BlockReader 继承自 FSInputChecker 继承自 FSInputStream,校验功能是在readChecksumChunk方法中实现,而readChecksumChunk私有方法是被read1私有方法内部调用,而且所有的read方法的都是通过间接地调用read1方法来实现对数据进行读取并做校验和验证的
DFSInputStream介绍:继承自FSInputStream,该类会创建到DataNode的Socket连接,然后使用Socket来读取DataNode上的数据信息
DFSDataInputStream介绍:继承自DataInputStream,DFSDataInputStream的功能都依靠包装的DFSInputStream来完成
DFSOutputStream介绍:继承自DFSOutputStream
Packet介绍:数据包,DFSOutputStream的内部类,DFSClient是通过一个个Packet来向DataNode写入数据的,一个Packet由多个数据chunk组成,每个chunk对应着一个校验和,当写入足够的chunk之后,packet会被添加到dataQueue中
DataStreamer 介绍: DataStreamer是真正写入数据的进程,在发送Packet之前,它会首先从Namenode中获得一个blockid和Block的位置信息,然后它会循环地从dataQueue中取得一个Packet,然后将该Packet真正写入到与DataNode所建立的socket中, 当将属于一个Block的所有Packet都发送给DataNode,并且返回了与每个Packet所对应的响应信息之后,DataStream 会关闭当前的数据Block
ResponseProcessor 介绍:响应处理器ResponseProcessor
至此,DFSClient讨论完毕
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
DFSClient构造器群:
public DFSClient(Configuration conf) throws IOException {
this(NameNode.getAddress(conf), conf);
}
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf ) throws IOException {
this(nameNodeAddr, conf, null);
}
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
this(nameNodeAddr, null, conf, stats);
}
DFSClient为用户提供了简单,一致的标准访问接口下面,但其内部实现较为复杂,本人陪同大家一起一起去探索这个神奇的国度
我们需要构建一个DFSClient对象:
DFSClient提供了4种形式的构造器,构造方法的主要任务有两个:
a,读入配置项并初始化一些成员变量
b,建立和名字节点的IPC连接
详细过程分析:a过程被初始化对象如下:
1,配置对象configuration,
2,收集文件系统统计信息的对象,
3,socket连接的过期时间,
4,写入的数据包的大小
5,通过socket向dataNode写入数据的超期时间
6,创建socket连接的工厂类
7, 用户组信息
8, 最大块获取失败次数
9,客户端的名称(如果该任务是Map-reduces任务,则使用任务ID作为客户端名称)
9, 默认的块大小(64M),默认的块副本数
构造器代码如下:
/**
* Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.
* Exactly one of nameNodeAddr or rpcNamenode must be null.
*/
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
this.conf = conf;
this.stats = stats;
this.socketTimeout = conf.getInt("dfs.socket.timeout",
HdfsConstants.READ_TIMEOUT);
this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
HdfsConstants.WRITE_TIMEOUT);
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
try {
this.ugi = UnixUserGroupInformation.login(conf, true);
} catch (LoginException e) {
throw (IOException)(new IOException().initCause(e));
}
String taskId = conf.get("mapred.task.id");
if (taskId != null) {
//如果是MapReduce任务,则客户端名称为任务Id号,否则取随机号
this.clientName = "DFSClient_" + taskId;
} else {
this.clientName = "DFSClient_" + r.nextInt();
}
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt("dfs.replication", 3);
if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
//非常关键的一步
通过RetryProxy的create方法来创建NameNode的RPC客户端ClientProtocol
this.namenode = createNamenode(this.rpcNamenode);
} else if (nameNodeAddr == null && rpcNamenode != null) {
//This case is used for testing.
this.namenode = this.rpcNamenode = rpcNamenode;
} else {
throw new IllegalArgumentException(
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
+ "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
}
}
b过程 :
通过调用私有方法createNamenode建立与名字节点的连接,方法内部通过RetryProxy的create方法来创建NameNode的RPC客户端ClientProtocol
至此,DFSClient初始化完成
注:对于文件系统,本文的讨论中一直区分两种情况,namenode的远程方法不在本文讨论范围内
a,文件和目录相关事务(都使用远程接口客户端namenode,调用其同名远程方法)
b,数据块读写
|