hadoop2.X之HDFS集群管理:ReplicationMonitor
问题导读1、什么是ReplicationMonitor?
2、ReplicationMonitor在HDFS中的作用是什么?
3、在哪几种情况下会产生无效块?
static/image/hrline/4.gif
ReplicationMonitor在HDFS中的工作相当重要,首先不仅会负责为副本不足的数据块选择source 数据节点,选择冗余的target节点,等待DN节点下次心跳将这些工作带回给相应的DN执行块冗余操作,同时也会将各个数据节点上无效的数据块副本加入无效集合,等待下次心跳将这些工作带回给相应的DataNode执行删除无效块操作。
7.2.1.块冗余处理机制
在进行该部分分析前,首先分析下几个集合的作用,因为他们在这些工作中起到至关重要的作用。
名称作用
neededReplications类UnderReplicatedBlocks的一个实例,负责存储需要冗余副本的块集合,副本可能被冗余1份或多份;在数据块报告的时候,检查到块副本数量不够,就会将数据块加入该实例存储;在ReplicationMonitor中,将block及其target加入replicateBlocks队列后,将该块从neededReplications中删除;
pendingReplicationsPendingReplicationBlocks的一个实例,在ReplicationMonitor中,将block及其target加入replicateBlocks队列后,同时将该块加入pendingReplications维护的一个集合,之所以会引入一个待定状态,因为如果在超时时间(默认5min)范围内,块副本冗余失败,可以重试;当数据块冗余成功,DN节点将该块副本报告上来后,会将该块从pendingReplications维护的集合中删除;
replicateBlocks一个块队列,NN为每个DN节点维护了一个这样的集合,当计算确定数据块副本冗余到某几个DN节点后,会将block及其target加入该replicateBlocks维护的一个队列中,当source节点下次心跳后,会将这些信息返回给source节点,source节点就会将block副本冗余到相应的DN节点(可能几个);
timedOutItems数据块集合,在PendingReplicationMonitor线程内,会定时扫描pendingReplications维护的集合,检查到某块冗余时间已经超时,会将该数据块块加入timedOutItems集合;在ReplicationMonitor中,又会将timedOutItems中的数据块重新加入neededReplications;
priorityQueues优先级队列,为一个二维集合,在NN检查到块副本不足的情况下,根据副本数量的等情况,将块加入到相应优先级列表中;
priorityToReplIdx在优先级队列priorityQueues,每个优先级列表维护了一个索引(类似游标),在对priorityQueues每个集合遍历数据的时候,忽略掉priorityToReplIdx前面的元素,从游标处开始遍历提取块信息;
清楚了相关集合作用后,来分析下冗余副本不足的情况,我在此就不对源码的每行细节进行分析,仅对流程和重要部分进行说明。当DN节点报告块的时候,NN会判断块的冗余是否足够,如果不足可能有几种情况:副本可能1份或者2份,如果副本仅一份,则需要冗余的优先级就相对更高,则会将该块放入优先级更高的队列,副本已经有2份的块则会放入优先级更低点的队列,编号越低,一共分为5个优先级。
对于冗余工作,根据DN节点数量不同,每次处理的数据块也会不同,比如当前集群有10个DN节点,则默认情况下,ReplicationMonitor一个周期只会从neededReplications中取出20个数据块进行处理,之所以HDFS要这样设计,主要是考虑到如果一次冗余的数据块太多,势必会对DN节点的网络造成比较大的影响,因此根据不同的DN集群规模决定一次冗余的数据块数量。在具体冗余的过程中,首先按照优先级从优先级队列中拿一部分数据(如前面提及的20)块到replicateBlocks中,并且每次记录下读取的优先级队列的偏移量,下次从该偏移指针处真正读取块信息,如下图所示:红色为第一次读取的块,蓝色为第二次读取的块。
整个过程在方法chooseUnderReplicatedBlocks内实现。
遍历获取当一定量需要冗余的数据块后,开始执行方法computeReplicationWorkForBlocks,在该方法中,会对每个块做如下处理(忽略次要部分):首先会为数据块选择一个source节点,而HDFS是这样从块副本所在的datanode(s)中来选择一个source的:
1、 判断在某DataNode上副本是否损坏,如果损坏,遍历下个DataNode节点;
2、 判断在某DataNode对于的replicateBlocks中,是否已经有大于等于2个数据块了,如果是这样,遍历下一个DataNode节点,之所以这样实现,防止单DataNode流量过高;
3、 如果某DataNode节点已经退役,遍历下一个DataNode节点;
4、 如果某DataNode节点正在退役,将该DataNode作为source,而这也是HDFS设计的目的,这样的节点不会让出现因为读写数据块而网络拥塞,因此也不会让网络那么忙,当然即使是正在退役的节点,replicateBlocks中的块数量也不能超过2个(防止网络拥塞);如果有多个节点正在退役,会选择最后一个退役节点作为该块的source节点;
5、 如果没有正在退役的节点,随意选择一个DN节点作为source节点;
选择了source节点后,会选择冗余的target(s)节点,当然,目前存储了该块副本的DN节点不会再作为target节点,至于选择target节点的数目,根据需要再冗余的块副本数量而定,比如还需要冗余2个副本,则会选择2个target节点,选择的方式,依然采用机架感知相关算法,详情可以关注方法chooseTarget,不过该方法有多种实现。
确定块的target(s)后,将块及其target(s)加入replicateBlocks队列;
最后,从neededReplications中删除该数据块,并递减对于优先级集合的指针偏移量;同时将该块加入pendingReplications集合,至于为什么要这样做,可以看上面对该集合作用的解释。
整个过程可以精简为如下几步:
实际上整个过程,就是对几个集合的操作,下面是几个集合间的操作逻辑:
看了上面2个图,应该基本清楚了块冗余的流程了。接下来看下代码,其调用顺序为:ReplicationMonitor.run()->BlockManager.computeDatanodeWork()->BlockManager. computeReplicationWork()->UnderReplicatedBlocks. chooseUnderReplicatedBlocks()。该部分主要在方法chooseUnderReplicatedBlocks中实现。具体的代码如下:
public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
int blocksToProcess) {
// initialize data structure for the return value
List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);
for (int i = 0; i < LEVEL; i++) {
blocksToReplicate.add(new ArrayList<Block>());
}
if (size() == 0) { // There are no blocks to collect.
return blocksToReplicate;
}
int blockCount = 0;
//从低优先级向高优先级扫描
for (int priority = 0; priority < LEVEL; priority++) {
// Go through检查 all blocks that need replications with current priority.
//给当前优先级的块加便利器
BlockIterator neededReplicationsIterator = iterator(priority);
//通过优先级获取该优先级列表下目前的索引指针,以便遍历的时候从该位置开始遍历数据块
Integer replIndex = priorityToReplIdx.get(priority);
// skip to the first unprocessed block, which is at replIndex??
//忽略过在之前已经扫描过的数据块
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
}
//size():所有UC的数据块量
blocksToProcess = Math.min(blocksToProcess, size());
if (blockCount == blocksToProcess) {
break;// break if already expected blocks are obtained
}
// Loop through all remaining blocks in the list.
//获得该优先级内需要冗余的所有的块
while (blockCount < blocksToProcess
&& neededReplicationsIterator.hasNext()) {
Block block = neededReplicationsIterator.next();
blocksToReplicate.get(priority).add(block);
replIndex++;
blockCount++;
}
//如果该优先级没有下一个块且优先级已经最高了
if (!neededReplicationsIterator.hasNext()
&& neededReplicationsIterator.getPriority() == LEVEL - 1) {
//重置所有优先级的副本index到0,因为最近没有在块中添加列表
// reset all priorities replication index to 0 because there is no
// recently added blocks in any list.
for (int i = 0; i < LEVEL; i++) {
priorityToReplIdx.put(i, 0);
}
break;
}
priorityToReplIdx.put(priority, replIndex);
}
return blocksToReplicate;
}
最后会将冗余超时的块重新加入需要冗余的集合中,如上图。
7.2.2.块无效处理机制
HDFS引入无效块的概念,从名字可以看出,这种类型数据块对于集群没有作用,反而如果不删除,会占用磁盘空间。在几种情况下会产生无效块:
1、 执行删除文件的时候,会首先快速地删除元数据,文件所对应的数据块就变得无效;
2、 Client上传数据块到DataNode的时候,可能由于网络原因等,数据块上传一部分后失败,这些块成了无效块;
3、 在HDFS做rebalance操作的时候,会将负载较重的DataNode上部分块文件复制到其他负载较轻的DataNode上,数据块复制成功后,负载较轻的DataNode上的该数据块就成了无效块;
接下来看下无效块的处理,在每个循环处理周期,处理的无效块数量也是有限制的,举例来说吧,如果HDFS集群中有10个DataNode节点,则需要处理的节点数目为4(默认情况下),而每个DataNode节点最多删除的无效块数量为1000(默认),这样就是4000。为什么相对冗余机制来说,无效块处理过程中每次处理的块更多呢?这是应为冗余块会涉及到DataNode节点之间的数据迁移,如果每次处理块太多,会影响集群网络,进而影响用户的读写性能;无效块处理就不一样了,它仅是将这些块从DataNode节点上删除。由于过程相对比较简单,可以直接看代码。其代码调用流程为:ReplicationMonitor.run()->BlockManager. computeDatanodeWork()->BlockManager.computeInvalidateWork()->BlockManager .invalidateWorkForOneNode()->InvalidateBlocks .invalidateWork()->InvalidateBlocks.invalidateWork()。在该流程中,也并不是无效块集合中有多少块,NameNode就一次性让DataNode把相应的无效块删除完,而是每次均删除一部分,这样的话不会造成DataNode因为删除块而带宽,CPU资源等占用过大。在computeInvalidateWork方法内实现如下:
int computeInvalidateWork(int nodesToProcess) {
final List<String> nodes = invalidateBlocks.getStorageIDs();
Collections.shuffle(nodes);
nodesToProcess = Math.min(nodes.size(), nodesToProcess);
int blockCnt = 0;
for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
//对某个节点,获取到本次应该删除该DataNode节点上无效块的数目
blockCnt += invalidateWorkForOneNode(nodes.get(nodeCnt));
}
return blockCnt;
}
在整个调用过程中,invalidateWork才是整个过程实现的核心
private synchronized List<Block> invalidateWork(
final String storageId, final DatanodeDescriptor dn) {
final LightWeightHashSet<Block> set = node2blocks.get(storageId);
if (set == null) {
return null;
}
// # blocks that can be sent in one message is limited
final int limit = datanodeManager.blockInvalidateLimit;
//获取某个节点上一定量的无效块(即可能一次仅删除部分无效块)
final List<Block> toInvalidate = set.pollN(limit);
// If we send everything in this message, remove this node entry
if (set.isEmpty()) {
remove(storageId);
}
dn.addBlocksToBeInvalidated(toInvalidate);
numBlocks -= toInvalidate.size();
return toInvalidate;
}
页:
[1]