xioaxu790 发表于 2014-12-21 19:35:56

hadoop2.X之HDFS集群管理:ReplicationMonitor

问题导读
1、什么是ReplicationMonitor?
2、ReplicationMonitor在HDFS中的作用是什么?
3、在哪几种情况下会产生无效块?

static/image/hrline/4.gif



ReplicationMonitor在HDFS中的工作相当重要,首先不仅会负责为副本不足的数据块选择source 数据节点,选择冗余的target节点,等待DN节点下次心跳将这些工作带回给相应的DN执行块冗余操作,同时也会将各个数据节点上无效的数据块副本加入无效集合,等待下次心跳将这些工作带回给相应的DataNode执行删除无效块操作。

7.2.1.块冗余处理机制

在进行该部分分析前,首先分析下几个集合的作用,因为他们在这些工作中起到至关重要的作用。

名称作用
neededReplications类UnderReplicatedBlocks的一个实例,负责存储需要冗余副本的块集合,副本可能被冗余1份或多份;在数据块报告的时候,检查到块副本数量不够,就会将数据块加入该实例存储;在ReplicationMonitor中,将block及其target加入replicateBlocks队列后,将该块从neededReplications中删除;
pendingReplicationsPendingReplicationBlocks的一个实例,在ReplicationMonitor中,将block及其target加入replicateBlocks队列后,同时将该块加入pendingReplications维护的一个集合,之所以会引入一个待定状态,因为如果在超时时间(默认5min)范围内,块副本冗余失败,可以重试;当数据块冗余成功,DN节点将该块副本报告上来后,会将该块从pendingReplications维护的集合中删除;
replicateBlocks一个块队列,NN为每个DN节点维护了一个这样的集合,当计算确定数据块副本冗余到某几个DN节点后,会将block及其target加入该replicateBlocks维护的一个队列中,当source节点下次心跳后,会将这些信息返回给source节点,source节点就会将block副本冗余到相应的DN节点(可能几个);
timedOutItems数据块集合,在PendingReplicationMonitor线程内,会定时扫描pendingReplications维护的集合,检查到某块冗余时间已经超时,会将该数据块块加入timedOutItems集合;在ReplicationMonitor中,又会将timedOutItems中的数据块重新加入neededReplications;
priorityQueues优先级队列,为一个二维集合,在NN检查到块副本不足的情况下,根据副本数量的等情况,将块加入到相应优先级列表中;
priorityToReplIdx在优先级队列priorityQueues,每个优先级列表维护了一个索引(类似游标),在对priorityQueues每个集合遍历数据的时候,忽略掉priorityToReplIdx前面的元素,从游标处开始遍历提取块信息;

清楚了相关集合作用后,来分析下冗余副本不足的情况,我在此就不对源码的每行细节进行分析,仅对流程和重要部分进行说明。当DN节点报告块的时候,NN会判断块的冗余是否足够,如果不足可能有几种情况:副本可能1份或者2份,如果副本仅一份,则需要冗余的优先级就相对更高,则会将该块放入优先级更高的队列,副本已经有2份的块则会放入优先级更低点的队列,编号越低,一共分为5个优先级。

对于冗余工作,根据DN节点数量不同,每次处理的数据块也会不同,比如当前集群有10个DN节点,则默认情况下,ReplicationMonitor一个周期只会从neededReplications中取出20个数据块进行处理,之所以HDFS要这样设计,主要是考虑到如果一次冗余的数据块太多,势必会对DN节点的网络造成比较大的影响,因此根据不同的DN集群规模决定一次冗余的数据块数量。在具体冗余的过程中,首先按照优先级从优先级队列中拿一部分数据(如前面提及的20)块到replicateBlocks中,并且每次记录下读取的优先级队列的偏移量,下次从该偏移指针处真正读取块信息,如下图所示:红色为第一次读取的块,蓝色为第二次读取的块。




整个过程在方法chooseUnderReplicatedBlocks内实现。

遍历获取当一定量需要冗余的数据块后,开始执行方法computeReplicationWorkForBlocks,在该方法中,会对每个块做如下处理(忽略次要部分):首先会为数据块选择一个source节点,而HDFS是这样从块副本所在的datanode(s)中来选择一个source的:

1、 判断在某DataNode上副本是否损坏,如果损坏,遍历下个DataNode节点;

2、 判断在某DataNode对于的replicateBlocks中,是否已经有大于等于2个数据块了,如果是这样,遍历下一个DataNode节点,之所以这样实现,防止单DataNode流量过高;

3、 如果某DataNode节点已经退役,遍历下一个DataNode节点;

4、 如果某DataNode节点正在退役,将该DataNode作为source,而这也是HDFS设计的目的,这样的节点不会让出现因为读写数据块而网络拥塞,因此也不会让网络那么忙,当然即使是正在退役的节点,replicateBlocks中的块数量也不能超过2个(防止网络拥塞);如果有多个节点正在退役,会选择最后一个退役节点作为该块的source节点;

5、 如果没有正在退役的节点,随意选择一个DN节点作为source节点;

选择了source节点后,会选择冗余的target(s)节点,当然,目前存储了该块副本的DN节点不会再作为target节点,至于选择target节点的数目,根据需要再冗余的块副本数量而定,比如还需要冗余2个副本,则会选择2个target节点,选择的方式,依然采用机架感知相关算法,详情可以关注方法chooseTarget,不过该方法有多种实现。

确定块的target(s)后,将块及其target(s)加入replicateBlocks队列;

最后,从neededReplications中删除该数据块,并递减对于优先级集合的指针偏移量;同时将该块加入pendingReplications集合,至于为什么要这样做,可以看上面对该集合作用的解释。

整个过程可以精简为如下几步:




实际上整个过程,就是对几个集合的操作,下面是几个集合间的操作逻辑:




看了上面2个图,应该基本清楚了块冗余的流程了。接下来看下代码,其调用顺序为:ReplicationMonitor.run()->BlockManager.computeDatanodeWork()->BlockManager. computeReplicationWork()->UnderReplicatedBlocks. chooseUnderReplicatedBlocks()。该部分主要在方法chooseUnderReplicatedBlocks中实现。具体的代码如下:
public synchronized List<List<Block>> chooseUnderReplicatedBlocks(

      int blocksToProcess) {

    // initialize data structure for the return value

    List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);

    for (int i = 0; i < LEVEL; i++) {

      blocksToReplicate.add(new ArrayList<Block>());

    }



    if (size() == 0) { // There are no blocks to collect.

      return blocksToReplicate;

    }

   

    int blockCount = 0;

    //从低优先级向高优先级扫描

    for (int priority = 0; priority < LEVEL; priority++) {

      // Go through检查 all blocks that need replications with current priority.

   //给当前优先级的块加便利器

      BlockIterator neededReplicationsIterator = iterator(priority);

      //通过优先级获取该优先级列表下目前的索引指针,以便遍历的时候从该位置开始遍历数据块

      Integer replIndex = priorityToReplIdx.get(priority);

   

      // skip to the first unprocessed block, which is at replIndex??

      //忽略过在之前已经扫描过的数据块

      for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {

      neededReplicationsIterator.next();

      }

   //size():所有UC的数据块量

      blocksToProcess = Math.min(blocksToProcess, size());

   

      if (blockCount == blocksToProcess) {

      break;// break if already expected blocks are obtained

      }

   

      // Loop through all remaining blocks in the list.

      //获得该优先级内需要冗余的所有的块

      while (blockCount < blocksToProcess

          && neededReplicationsIterator.hasNext()) {

      Block block = neededReplicationsIterator.next();

      blocksToReplicate.get(priority).add(block);

      replIndex++;

      blockCount++;

      }

   

      //如果该优先级没有下一个块且优先级已经最高了

      if (!neededReplicationsIterator.hasNext()

          && neededReplicationsIterator.getPriority() == LEVEL - 1) {

      //重置所有优先级的副本index到0,因为最近没有在块中添加列表

      // reset all priorities replication index to 0 because there is no

      // recently added blocks in any list.

      for (int i = 0; i < LEVEL; i++) {

          priorityToReplIdx.put(i, 0);

      }

      break;

      }

      priorityToReplIdx.put(priority, replIndex);

    }

    return blocksToReplicate;

}

最后会将冗余超时的块重新加入需要冗余的集合中,如上图。

7.2.2.块无效处理机制

HDFS引入无效块的概念,从名字可以看出,这种类型数据块对于集群没有作用,反而如果不删除,会占用磁盘空间。在几种情况下会产生无效块:

1、 执行删除文件的时候,会首先快速地删除元数据,文件所对应的数据块就变得无效;

2、 Client上传数据块到DataNode的时候,可能由于网络原因等,数据块上传一部分后失败,这些块成了无效块;

3、 在HDFS做rebalance操作的时候,会将负载较重的DataNode上部分块文件复制到其他负载较轻的DataNode上,数据块复制成功后,负载较轻的DataNode上的该数据块就成了无效块;

接下来看下无效块的处理,在每个循环处理周期,处理的无效块数量也是有限制的,举例来说吧,如果HDFS集群中有10个DataNode节点,则需要处理的节点数目为4(默认情况下),而每个DataNode节点最多删除的无效块数量为1000(默认),这样就是4000。为什么相对冗余机制来说,无效块处理过程中每次处理的块更多呢?这是应为冗余块会涉及到DataNode节点之间的数据迁移,如果每次处理块太多,会影响集群网络,进而影响用户的读写性能;无效块处理就不一样了,它仅是将这些块从DataNode节点上删除。由于过程相对比较简单,可以直接看代码。其代码调用流程为:ReplicationMonitor.run()->BlockManager. computeDatanodeWork()->BlockManager.computeInvalidateWork()->BlockManager .invalidateWorkForOneNode()->InvalidateBlocks .invalidateWork()->InvalidateBlocks.invalidateWork()。在该流程中,也并不是无效块集合中有多少块,NameNode就一次性让DataNode把相应的无效块删除完,而是每次均删除一部分,这样的话不会造成DataNode因为删除块而带宽,CPU资源等占用过大。在computeInvalidateWork方法内实现如下:
int computeInvalidateWork(int nodesToProcess) {

    final List<String> nodes = invalidateBlocks.getStorageIDs();

    Collections.shuffle(nodes);

    nodesToProcess = Math.min(nodes.size(), nodesToProcess);

    int blockCnt = 0;

    for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {

      //对某个节点,获取到本次应该删除该DataNode节点上无效块的数目

      blockCnt += invalidateWorkForOneNode(nodes.get(nodeCnt));

    }

    return blockCnt;

}



在整个调用过程中,invalidateWork才是整个过程实现的核心
private synchronized List<Block> invalidateWork(

      final String storageId, final DatanodeDescriptor dn) {

    final LightWeightHashSet<Block> set = node2blocks.get(storageId);

    if (set == null) {

      return null;

    }

    // # blocks that can be sent in one message is limited

    final int limit = datanodeManager.blockInvalidateLimit;

    //获取某个节点上一定量的无效块(即可能一次仅删除部分无效块)

    final List<Block> toInvalidate = set.pollN(limit);

    // If we send everything in this message, remove this node entry

    if (set.isEmpty()) {

      remove(storageId);

    }

    dn.addBlocksToBeInvalidated(toInvalidate);

    numBlocks -= toInvalidate.size();

    return toInvalidate;

}



页: [1]
查看完整版本: hadoop2.X之HDFS集群管理:ReplicationMonitor