分享

hadoop2.X之HDFS集群管理:ReplicationMonitor

问题导读
1、什么是ReplicationMonitor?
2、ReplicationMonitor在HDFS中的作用是什么?
3、在哪几种情况下会产生无效块?





ReplicationMonitor在HDFS中的工作相当重要,首先不仅会负责为副本不足的数据块选择source 数据节点,选择冗余的target节点,等待DN节点下次心跳将这些工作带回给相应的DN执行块冗余操作,同时也会将各个数据节点上无效的数据块副本加入无效集合,等待下次心跳将这些工作带回给相应的DataNode执行删除无效块操作。

7.2.1.块冗余处理机制

在进行该部分分析前,首先分析下几个集合的作用,因为他们在这些工作中起到至关重要的作用。
  
名称
  
  
作用
  
  
neededReplications
  
  
UnderReplicatedBlocks的一个实例,负责存储需要冗余副本的块集合,副本可能被冗余1份或多份;在数据块报告的时候,检查到块副本数量不够,就会将数据块加入该实例存储;在ReplicationMonitor中,将block及其target加入replicateBlocks队列后,将该块从neededReplications中删除;
  
  
pendingReplications
  
  
PendingReplicationBlocks的一个实例,在ReplicationMonitor中,将block及其target加入replicateBlocks队列后,同时将该块加入pendingReplications维护的一个集合,之所以会引入一个待定状态,因为如果在超时时间(默认5min)范围内,块副本冗余失败,可以重试;当数据块冗余成功,DN节点将该块副本报告上来后,会将该块从pendingReplications维护的集合中删除;
  
  
replicateBlocks
  
  
一个块队列,NN为每个DN节点维护了一个这样的集合,当计算确定数据块副本冗余到某几个DN节点后,会将block及其target加入该replicateBlocks维护的一个队列中,当source节点下次心跳后,会将这些信息返回给source节点,source节点就会将block副本冗余到相应的DN节点(可能几个);
  
  
timedOutItems
  
  
数据块集合,在PendingReplicationMonitor线程内,会定时扫描pendingReplications维护的集合,检查到某块冗余时间已经超时,会将该数据块块加入timedOutItems集合;在ReplicationMonitor中,又会将timedOutItems中的数据块重新加入neededReplications
  
  
priorityQueues
  
  
优先级队列,为一个二维集合,在NN检查到块副本不足的情况下,根据副本数量的等情况,将块加入到相应优先级列表中;
  
  
priorityToReplIdx
  
  
在优先级队列priorityQueues,每个优先级列表维护了一个索引(类似游标),在对priorityQueues每个集合遍历数据的时候,忽略掉priorityToReplIdx前面的元素,从游标处开始遍历提取块信息;
  

清楚了相关集合作用后,来分析下冗余副本不足的情况,我在此就不对源码的每行细节进行分析,仅对流程和重要部分进行说明。当DN节点报告块的时候,NN会判断块的冗余是否足够,如果不足可能有几种情况:副本可能1份或者2份,如果副本仅一份,则需要冗余的优先级就相对更高,则会将该块放入优先级更高的队列,副本已经有2份的块则会放入优先级更低点的队列,编号越低,一共分为5个优先级。

对于冗余工作,根据DN节点数量不同,每次处理的数据块也会不同,比如当前集群有10个DN节点,则默认情况下,ReplicationMonitor一个周期只会从neededReplications中取出20个数据块进行处理,之所以HDFS要这样设计,主要是考虑到如果一次冗余的数据块太多,势必会对DN节点的网络造成比较大的影响,因此根据不同的DN集群规模决定一次冗余的数据块数量。在具体冗余的过程中,首先按照优先级从优先级队列中拿一部分数据(如前面提及的20)块到replicateBlocks中,并且每次记录下读取的优先级队列的偏移量,下次从该偏移指针处真正读取块信息,如下图所示:红色为第一次读取的块,蓝色为第二次读取的块。
1.jpg



整个过程在方法chooseUnderReplicatedBlocks内实现。

遍历获取当一定量需要冗余的数据块后,开始执行方法computeReplicationWorkForBlocks,在该方法中,会对每个块做如下处理(忽略次要部分):首先会为数据块选择一个source节点,而HDFS是这样从块副本所在的datanode(s)中来选择一个source的:

1、 判断在某DataNode上副本是否损坏,如果损坏,遍历下个DataNode节点;

2、 判断在某DataNode对于的replicateBlocks中,是否已经有大于等于2个数据块了,如果是这样,遍历下一个DataNode节点,之所以这样实现,防止单DataNode流量过高;

3、 如果某DataNode节点已经退役,遍历下一个DataNode节点;

4、 如果某DataNode节点正在退役,将该DataNode作为source,而这也是HDFS设计的目的,这样的节点不会让出现因为读写数据块而网络拥塞,因此也不会让网络那么忙,当然即使是正在退役的节点,replicateBlocks中的块数量也不能超过2个(防止网络拥塞);如果有多个节点正在退役,会选择最后一个退役节点作为该块的source节点;

5、 如果没有正在退役的节点,随意选择一个DN节点作为source节点;

选择了source节点后,会选择冗余的target(s)节点,当然,目前存储了该块副本的DN节点不会再作为target节点,至于选择target节点的数目,根据需要再冗余的块副本数量而定,比如还需要冗余2个副本,则会选择2个target节点,选择的方式,依然采用机架感知相关算法,详情可以关注方法chooseTarget,不过该方法有多种实现。

确定块的target(s)后,将块及其target(s)加入replicateBlocks队列;

最后,从neededReplications中删除该数据块,并递减对于优先级集合的指针偏移量;同时将该块加入pendingReplications集合,至于为什么要这样做,可以看上面对该集合作用的解释。

整个过程可以精简为如下几步:
1.jpg



实际上整个过程,就是对几个集合的操作,下面是几个集合间的操作逻辑:
1.jpg



看了上面2个图,应该基本清楚了块冗余的流程了。接下来看下代码,其调用顺序为:ReplicationMonitor.run()->BlockManager.computeDatanodeWork()->BlockManager. computeReplicationWork()->UnderReplicatedBlocks. chooseUnderReplicatedBlocks()。该部分主要在方法chooseUnderReplicatedBlocks中实现。具体的代码如下:
  1. public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
  2.       int blocksToProcess) {
  3.     // initialize data structure for the return value
  4.     List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);
  5.     for (int i = 0; i < LEVEL; i++) {
  6.       blocksToReplicate.add(new ArrayList<Block>());
  7.     }
  8.     if (size() == 0) { // There are no blocks to collect.
  9.       return blocksToReplicate;
  10.     }
  11.    
  12.     int blockCount = 0;
  13.     //从低优先级向高优先级扫描
  14.     for (int priority = 0; priority < LEVEL; priority++) {
  15.       // Go through检查 all blocks that need replications with current priority.
  16.      //给当前优先级的块加便利器
  17.       BlockIterator neededReplicationsIterator = iterator(priority);
  18.       //通过优先级获取该优先级列表下目前的索引指针,以便遍历的时候从该位置开始遍历数据块
  19.       Integer replIndex = priorityToReplIdx.get(priority);
  20.      
  21.       // skip to the first unprocessed block, which is at replIndex??
  22.       //忽略过在之前已经扫描过的数据块
  23.       for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
  24.         neededReplicationsIterator.next();
  25.       }
  26.      //size():所有UC的数据块量
  27.       blocksToProcess = Math.min(blocksToProcess, size());
  28.      
  29.       if (blockCount == blocksToProcess) {
  30.         break;  // break if already expected blocks are obtained
  31.       }
  32.      
  33.       // Loop through all remaining blocks in the list.
  34.       //获得该优先级内需要冗余的所有的块
  35.       while (blockCount < blocksToProcess
  36.           && neededReplicationsIterator.hasNext()) {
  37.         Block block = neededReplicationsIterator.next();
  38.         blocksToReplicate.get(priority).add(block);
  39.         replIndex++;
  40.         blockCount++;
  41.       }
  42.      
  43.       //如果该优先级没有下一个块且优先级已经最高了
  44.       if (!neededReplicationsIterator.hasNext()
  45.           && neededReplicationsIterator.getPriority() == LEVEL - 1) {
  46.         //重置所有优先级的副本index到0,因为最近没有在块中添加列表
  47.         // reset all priorities replication index to 0 because there is no
  48.         // recently added blocks in any list.
  49.         for (int i = 0; i < LEVEL; i++) {
  50.           priorityToReplIdx.put(i, 0);
  51.         }
  52.         break;
  53.       }
  54.       priorityToReplIdx.put(priority, replIndex);
  55.     }
  56.     return blocksToReplicate;
  57.   }
复制代码


最后会将冗余超时的块重新加入需要冗余的集合中,如上图。

7.2.2.块无效处理机制

HDFS引入无效块的概念,从名字可以看出,这种类型数据块对于集群没有作用,反而如果不删除,会占用磁盘空间。在几种情况下会产生无效块:

1、 执行删除文件的时候,会首先快速地删除元数据,文件所对应的数据块就变得无效;

2、 Client上传数据块到DataNode的时候,可能由于网络原因等,数据块上传一部分后失败,这些块成了无效块;

3、 在HDFS做rebalance操作的时候,会将负载较重的DataNode上部分块文件复制到其他负载较轻的DataNode上,数据块复制成功后,负载较轻的DataNode上的该数据块就成了无效块;

接下来看下无效块的处理,在每个循环处理周期,处理的无效块数量也是有限制的,举例来说吧,如果HDFS集群中有10个DataNode节点,则需要处理的节点数目为4(默认情况下),而每个DataNode节点最多删除的无效块数量为1000(默认),这样就是4000。为什么相对冗余机制来说,无效块处理过程中每次处理的块更多呢?这是应为冗余块会涉及到DataNode节点之间的数据迁移,如果每次处理块太多,会影响集群网络,进而影响用户的读写性能;无效块处理就不一样了,它仅是将这些块从DataNode节点上删除。由于过程相对比较简单,可以直接看代码。其代码调用流程为:ReplicationMonitor.run()->BlockManager. computeDatanodeWork()->BlockManager.computeInvalidateWork()->BlockManager .invalidateWorkForOneNode()->InvalidateBlocks .invalidateWork()->InvalidateBlocks.invalidateWork()。在该流程中,也并不是无效块集合中有多少块,NameNode就一次性让DataNode把相应的无效块删除完,而是每次均删除一部分,这样的话不会造成DataNode因为删除块而带宽,CPU资源等占用过大。在computeInvalidateWork方法内实现如下:
  1. int computeInvalidateWork(int nodesToProcess) {
  2.     final List<String> nodes = invalidateBlocks.getStorageIDs();
  3.     Collections.shuffle(nodes);
  4.     nodesToProcess = Math.min(nodes.size(), nodesToProcess);
  5.     int blockCnt = 0;
  6.     for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
  7.         //对某个节点,获取到本次应该删除该DataNode节点上无效块的数目
  8.       blockCnt += invalidateWorkForOneNode(nodes.get(nodeCnt));
  9.     }
  10.     return blockCnt;
  11.   }
复制代码




在整个调用过程中,invalidateWork才是整个过程实现的核心
  1. private synchronized List<Block> invalidateWork(
  2.       final String storageId, final DatanodeDescriptor dn) {
  3.     final LightWeightHashSet<Block> set = node2blocks.get(storageId);
  4.     if (set == null) {
  5.       return null;
  6.     }
  7.     // # blocks that can be sent in one message is limited
  8.     final int limit = datanodeManager.blockInvalidateLimit;
  9.     //获取某个节点上一定量的无效块(即可能一次仅删除部分无效块)
  10.     final List<Block> toInvalidate = set.pollN(limit);
  11.     // If we send everything in this message, remove this node entry
  12.     if (set.isEmpty()) {
  13.       remove(storageId);
  14.     }
  15.     dn.addBlocksToBeInvalidated(toInvalidate);
  16.     numBlocks -= toInvalidate.size();
  17.     return toInvalidate;
  18.   }
复制代码




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条