desehawk 发表于 2018-6-7 18:42:04

Yarn调度之CapacityScheduler源码分析资源抢占


问题导读

1.当抢占资源服务启动时,会启动一个线程每隔多长时间调用一次?
2.抢占资源的策略是什么?
3.什么情况下,强制回收资源?



static/image/hrline/line7.png



资源调度器是YARN最核心的组件之一。它的实现是插拔式的,用户可以按照接口规范自定义资源调度器。YARN自带了FIFO、Capacity Scheduler和Fair Scheduler三种常用资源调度器。本文将重点对YARN默认调度器Capacity Scheduler的资源抢占模块的源码做一些分析。

一、前言
Capacity Scheduler资源调度器也采用层级队列组织方式,其ROOT队列可以有多个子队列,而每个子队列又可以有自己的子队列,在队列层级最下面是叶子队列,所有Application皆提交至叶子队列。Capacity Scheduler以队列为单位划分资源,每个队列可设定一定比例的资源最低保证和使用上限。资源调度器总是优先满足最低资源保证,也就是说最低资源保证是每个队列分配资源的依据。
考虑下述极端情况:集群中部分队列提交应用较多,负载较重,设定的最低保证资源不能满足所有Application的请求,导致大部分应用处于pending状态;而与此同时,有部分队列则负责较轻,使用的资源量低于最低资源保证,导致整个集群资源利用率较低。

二、概述
为了提高资源使用率,合理分配集群资源,YARN引入了资源抢占功能,即资源调度器会将负载较轻的队列的资源暂时分配给负载重的队列(即最低资源保证并不是硬资源保证,当队列不需要太多资源时,并不会满足它的最低资源保证,而是暂时将空闲资源分配给其他需要资源的队列),仅当负载较轻队列突然收到新提交的应用程序时,调度器才进一步将本属于该队列的资源还给它。由于此时资源可能正在被别的队列使用,因此调度器必须等待其他队列释放资源后,才能将这些资源“物归原主”,这通常需要等待一段不确定时间。为了防止等待时间过长,调度器发现等待一段时间后若发现资源并未得到释放,则会强制进行资源回收。

三、源码分析
1、createPolicyMonitors
ResourceManager类通过createPolicyMonitors方法创建资源抢占服务:
protected void createPolicyMonitors() {
      // 判断是否需要创建资源抢占服务:
      // a、判断调度器是否实现PreemptableResourceScheduler接口,目前只有Capacity Scheduler
      // 实现了此接口,也就是说只有调度器是Capacity Scheduler才启动此服务
      // b、是否配置yarn.resourcemanager.scheduler.monitor.policies为true
      // 若上述条件皆满足,则将创建资源抢占服务
      if (scheduler instanceof PreemptableResourceScheduler
          && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
      LOG.info("Loading policy monitors");

      // 获取yarn.resourcemanager.scheduler.monitor.policies中配置的实现了
      // SchedulingEditPolicy接口的资源抢占实现类,默认为      
      // ProportionalCapacityPreemptionPolicy,此类实现了具体的资源抢占逻辑。
      List<SchedulingEditPolicy> policies = conf.getInstances(
            YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
            SchedulingEditPolicy.class);
      if (policies.size() > 0) {

          // 在RM中央处理器注册ContainerPreemptEventType事件类型,并设置事件处理类
          // 为RMContainerPreemptEventDispatcher,此类会执行具体的资源抢占动作。
          rmDispatcher.register(ContainerPreemptEventType.class,
            new RMContainerPreemptEventDispatcher(
                  (PreemptableResourceScheduler) scheduler));
          for (SchedulingEditPolicy policy : policies) {
            LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
            // periodically check whether we need to take action to guarantee
            // constraints
            // 此处创建了资源抢占服务类。当此服务启动时,会启动一个线程每隔3秒调用一次
            // ProportionalCapacityPreemptionPolicy类中的editSchedule方法,在此方
            // 法中实现了具体的资源抢占逻辑。
            SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
            addService(mon);
          }
      } else {
          LOG.warn("Policy monitors configured (" +
            YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS +
            ") but none specified (" +
            YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")");
      }
      }
    }
}```

###2、editSchedule
上一节分析中,我们提到资源抢占服务会启动一个线程每隔3秒钟调用ProportionalCapacityPreemptionPolicy类中的editSchedule方法,接下来我们来分析editSchedule方法:

```java
public void editSchedule(){
    // 获取调度器的root队列
    CSQueue root = scheduler.getRootQueue();

    // 获得目前集群资源快照
    Resource clusterResources =
      Resources.clone(scheduler.getClusterResource());
    // 进行具体资源抢占
    containerBasedPreemptOrKill(root, clusterResources);
}





3、containerBasedPreemptOrKill

editSchedule方法很简单,逻辑都被封装到containerBasedPreemptOrKill方法中,我们将用较大篇幅来介绍此方法:

/**
   * This method selects and tracks containers to be preempted. If a container
   * is in the target list for more than maxWaitTime it is killed.
   *
   * @param root the root of the CapacityScheduler queue hierarchy
   * @param clusterResources the total amount of resources in the cluster
   */
private void containerBasedPreemptOrKill(CSQueue root,
      Resource clusterResources) {

    // extract a summary of the queues from scheduler

    // ---------------------------------- 第一步 -------------------------------------
    // 从ROOT队列开始递归拷贝子队列信息,此步骤目的是为了生成所有队列快照信息,防止集群
    // 队列信息变化导致资源抢占过程中出现问题。
    TempQueue tRoot;
    synchronized (scheduler) {
      tRoot = cloneQueues(root, clusterResources);
    }

    // compute the ideal distribution of resources among queues
    // updates cloned queues state accordingly

    // ---------------------------------- 第二步 -------------------------------------
    // 将用户设定的最低资源作为最开始的理想最低资源,递归计算各队列实际理想最低资源,作为
    // 第三步进行具体资源抢占的依据。
    tRoot.idealAssigned = tRoot.guaranteed;
    Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
      percentageClusterPreemptionAllowed);
    List<TempQueue> queues =
      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);

    // based on ideal allocation select containers to be preempted from each

    // ---------------------------------- 第三步---------------------------------------
    // 如果叶子队列已使用资源量大于第二步计算出的理想最低资源量,则此队列将根据多出的
    // 资源量选取队列中需被抢占的Container,并根据一定规则选取队列中哪些应用的哪些
    // container需要被抢占。如果在预设时间内Container没有被释放,则强制收回container。
    // queue and each application
    Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
      getContainersToPreempt(queues, clusterResources);

    if (LOG.isDebugEnabled()) {
      logToCSV(queues);
    }

    // if we are in observeOnly mode return before any action is taken
    if (observeOnly) {
      return;
    }

    // preempt (or kill) the selected containers
    for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
         : toPreempt.entrySet()) {
      for (RMContainer container : e.getValue()) {
      // if we tried to preempt this for more than maxWaitTime
      if (preempted.get(container) != null &&
            preempted.get(container) + maxWaitTime < clock.getTime()) {
          // kill it
          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
                ContainerPreemptEventType.KILL_CONTAINER));
          preempted.remove(container);
      } else {
          //otherwise just send preemption events
          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
                ContainerPreemptEventType.PREEMPT_CONTAINER));
          if (preempted.get(container) == null) {
            preempted.put(container, clock.getTime());
          }
      }
      }
    }

    // Keep the preempted list clean
    for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){
      RMContainer id = i.next();
      // garbage collect containers that are irrelevant for preemption
      if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
      i.remove();
      }
    }
}


可以看到containerBasedPreemptOrKill方法大概分为三个步骤,下面我们详细分析每个步骤:步骤一:递归拷贝队列信息,返回TempQueue对象。我们先来分析返回对象TempQueue的构造方法,以此来洞悉TempQueue类的结构:
TempQueue(String queueName, Resource current, Resource pending,
      Resource guaranteed, Resource maxCapacity) {
      this.queueName = queueName; // 队列名称
      this.current = current; // 队列已经使用的资源量
      this.pending = pending; // 队列尚需资源量
      this.guaranteed = guaranteed; // 最低资源保证
      this.maxCapacity = maxCapacity; // 队列的最大资源使用量
      this.idealAssigned = Resource.newInstance(0, 0); // 理想的资源分配量
      this.actuallyPreempted = Resource.newInstance(0, 0); // 实际抢占资源量
      this.toBePreempted = Resource.newInstance(0, 0); // 计划抢占资源量
      this.normalizedGuarantee = Float.NaN; // 标准的最低资源占比,队列是否需要资源
      // 抢占的依据
      this.children = new ArrayList<TempQueue>(); //队列的子队列
    }


下面我们分析cloneQueues方法:
/**
   * This method walks a tree of CSQueue and clones the portion of the state
   * relevant for preemption in TempQueue(s). It also maintains a pointer to
   * the leaves. Finally it aggregates pending resources in each queue and rolls
   * it up to higher levels.
   *
   * @param root the root of the CapacityScheduler queue hierarchy
   * @param clusterResources the total amount of resources in the cluster
   * @return the root of the cloned queue hierarchy
   */
private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
    TempQueue ret;
    synchronized (root) {
      // 获取队列名称
      String queueName = root.getQueueName();
      // 获取队列已使用容量占比
      float absUsed = root.getAbsoluteUsedCapacity();
      // 获取用户设定的队列绝对容量(最低资源保证)占比
      float absCap = root.getAbsoluteCapacity();
      // 获取队列绝对最大容量占比
      float absMaxCap = root.getAbsoluteMaximumCapacity();

      // 计算队列目前已使用资源量
      Resource current = Resources.multiply(clusterResources, absUsed);
      // 计算队列最低资源保证
      Resource guaranteed = Resources.multiply(clusterResources, absCap);
      // 计算队列最大容量
      Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
      if (root instanceof LeafQueue) { //如果是叶子队列,递归方法返回
      LeafQueue l = (LeafQueue) root;
      Resource pending = l.getTotalResourcePending();
      ret = new TempQueue(queueName, current, pending, guaranteed,
            maxCapacity);

      ret.setLeafQueue(l);
      } else {
      Resource pending = Resource.newInstance(0, 0);
      ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
            maxCapacity);
      for (CSQueue c : root.getChildQueues()) {
          // 子队列递归调用cloneQueues方法,并将创建的TempQueue子队列添加到子队
          // 列列表中
          ret.addChild(cloneQueues(c, clusterResources));
      }
      }
    }
    return ret;
}



步骤二:递归计算各队列实际理想最低资源,作为第三步进行具体资源抢占的依据。这部分逻辑主要在recursivelyComputeIdealAssignment方法中实现,我们来分析此方法的具体实现:
/**
   * This method recursively computes the ideal assignment of resources to each
   * level of the hierarchy. This ensures that leafs that are over-capacity but
   * with parents within capacity will not be preempted. Preemptions are allowed
   * within each subtree according to local over/under capacity.
   *
   * @param root the root of the cloned queue hierachy
   * @param totalPreemptionAllowed maximum amount of preemption allowed
   * @return a list of leaf queues updated with preemption targets
   */
private List<TempQueue> recursivelyComputeIdealAssignment(
      TempQueue root, Resource totalPreemptionAllowed) {
    List<TempQueue> leafs = new ArrayList<TempQueue>();
    if (root.getChildren() != null &&
      root.getChildren().size() > 0) {
      
      // 计算理想的最低资源分配
      computeIdealResourceDistribution(rc, root.getChildren(),
          totalPreemptionAllowed, root.idealAssigned);
      // compute recursively for lower levels and build list of leafs
      for(TempQueue t : root.getChildren()) {
      leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed));
      }
    } else {
      // we are in a leaf nothing to do, just return yourself
      return Collections.singletonList(root);
    }
    return leafs;
}


我们可以看到recursivelyComputeIdealAssignment也是个递归调用方法,computeIdealResourceDistribution是其主要方法:
/**
   * This method computes (for a single level in the tree, passed as a {@code
   * List<TempQueue>}) the ideal assignment of resources. This is done
   * recursively to allocate capacity fairly across all queues with pending
   * demands. It terminates when no resources are left to assign, or when all
   * demand is satisfied.
   *
   * @param rc resource calculator
   * @param queues a list of cloned queues to be assigned capacity to (this is
   * an out param)
   * @param totalPreemptionAllowed total amount of preemption we allow
   * @param tot_guarant the amount of capacity assigned to this pool of queues
   */
private void computeIdealResourceDistribution(ResourceCalculator rc,
      List<TempQueue> queues, Resource totalPreemptionAllowed, Resource tot_guarant) {

    // qAlloc tracks currently active queues (will decrease progressively as
    // demand is met)
    List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
    // 所有子队列最低资源保证之和,所谓资源抢占只是重新分配最低资源保证
    // unassigned tracks how much resources are still to assign, initialized
    // with the total capacity for this set of queues
    Resource unassigned = Resources.clone(tot_guarant);

    // group queues based on whether they have non-zero guaranteed capacity
    // 设置了最低资源保证的队列
    Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
    // 未设置最低资源保证的队列
    Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
   
    for (TempQueue q : qAlloc) {
      if (Resources
          .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
      nonZeroGuarQueues.add(q);
      } else {
      zeroGuarQueues.add(q);
      }
    }
   
    // 重新计算并设置各子队列理想最低资源,优先计算设置了最低资源保证的队列。
    // 我们先跳过这个方法,在下面将有详细分析。
    // first compute the allocation as a fixpoint based on guaranteed capacity
    computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
      false);
   
    // if any capacity is left unassigned, distributed among zero-guarantee
    // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
    if (!zeroGuarQueues.isEmpty()
      && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
      computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
          true);
    }
   
    // based on ideal assignment computed above and current assignment we derive
    // how much preemption is required overall
    Resource totPreemptionNeeded = Resource.newInstance(0, 0);
    for (TempQueue t:queues) {
      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
      Resources.addTo(totPreemptionNeeded,
            Resources.subtract(t.current, t.idealAssigned));
      }
    }
   
    // 用户可以配置每次资源抢占最多可以抢占的阈值。超过此阈值,我们将按比例缩小需要
    // 抢占的资源量。
    // if we need to preempt more than is allowed, compute a factor (0<f<1)
    // that is used to scale down how much we ask back from each queue
    float scalingFactor = 1.0F;
    if (Resources.greaterThan(rc, tot_guarant,
          totPreemptionNeeded, totalPreemptionAllowed)) {
       scalingFactor = Resources.divide(rc, tot_guarant,
         totalPreemptionAllowed, totPreemptionNeeded);
    }
   
    // 设置每个队列将要抢占的资源量,即toBePreempted。
    // assign to each queue the amount of actual preemption based on local
    // information of ideal preemption and scaling factor
    for (TempQueue t : queues) {
      t.assignPreemption(scalingFactor, rc, tot_guarant);
    }
    if (LOG.isDebugEnabled()) {
      long time = clock.getTime();
      for (TempQueue t : queues) {
      LOG.debug(time + ": " + t);
      }
    }
}



再来看看computeFixpointAllocation方法:


/**
   * Given a set of queues compute the fix-point distribution of unassigned
   * resources among them. As pending request of a queue are exhausted, the
   * queue is removed from the set and remaining capacity redistributed among
   * remaining queues. The distribution is weighted based on guaranteed
   * capacity, unless asked to ignoreGuarantee, in which case resources are
   * distributed uniformly.
   */
private void computeFixpointAllocation(ResourceCalculator rc,
      Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
      boolean ignoreGuarantee) {
    //assign all cluster resources until no more demand, or no resources are left
    // 如果需分配的队列不为空,且待分配资源大于0,则一直进行资源分配。
   while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
          unassigned, Resources.none())) {
      Resource wQassigned = Resource.newInstance(0, 0);
      
      // we compute normalizedGuarantees capacity based on currently active
      // queues
      // 根据当前所有队列,重新设置每个队列的normalizedGuarantee,即重新设置每个队列
      // 的资源分配比例。分以下两种情况:
      // a. 如果ignoreGuarantee为true,那么简单地将normalizedGuarantee设置为所有队
      // 列数量的倒数,即平分所有资源;
      // b. 否则将normalizedGuarantee设置为此队列的用户配置最低资源量占所有队列的用
      // 户配置的最低资源分配量之和的比例。
      resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
      
      // offer for each queue their capacity first and in following invocations
      // their share of over-capacity
      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
      TempQueue sub = i.next();
      // unassigned * normalizedGuarantee,计算此队列的最低资源分配量。其中unassigned
      // 为此队列所在队列层级所有队列的用户配置的最低资源分配量之和。
      Resource wQavail =
          Resources.multiply(unassigned, sub.normalizedGuarantee);

      // 将资源分配给队列,并返回剩下的资源。此方法较为关键,下面会对此方法进行详细分析。
      Resource wQidle = sub.offer(wQavail, rc, tot_guarant);

      // 已经分配给队列的资源
      Resource wQdone = Resources.subtract(wQavail, wQidle);
      // if the queue returned a value > 0 it means it is fully satisfied
      // and it is removed from the list of active queues qAlloc
      // 如果满足此条件,说明已经满足此队列所有资源要求,可以将其移除待分配队列。接
      // 下来的循环会继续为其他未满足资源要求的队列进行资源分配,直到满足所有队列资
      // 源需求或资源已经分配完。
      // 注意:已分配给队列的资源可能是负值,这块逻辑比较让人产生疑惑,具体请看我们
      // 在offer方法中的分析。
      if (!Resources.greaterThan(rc, tot_guarant,
            wQdone, Resources.none())) {
          i.remove();
      }
      Resources.addTo(wQassigned, wQdone);
      }
      Resources.subtractFrom(unassigned, wQassigned);
    }
}

下面是对offer方法的分析,它是TempQueue类的方法,主要用来设置各队列idealAssigned,即资源重分配过后,每个队列的理想最低资源分配量:
Resource offer(Resource avail, ResourceCalculator rc,
      Resource clusterResource) {
      // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
      
      // 队列接受资源的计算方法:提供的资源,队列最大资源-已分配资源,当前已使用资源
      // +未满足的资源-已分配的资源 三者中的最小值。此计算方法有以下两层隐含意思:
      // a. 分配资源量不能超过队列最大资源。即队列使用资源绝对不能超过设置的最大资源;
      // b. 考虑到当队列负载较低时,current + pending - assigned 可能为负值。即目
      // 前使用的资源(pending很可能为0)低于用户设定的最低资源量,此时此队列的
      // idealAssigned反而会减少,而remain可能反而会增大。也就是说这部分资源队列暂
      // 时用不到,可提供给集群其他队列使用。若某一时刻此队列突然提交一批应用,那么下
      // 次进行资源抢占的时,在集群资源固定的前提下,此队列的idealAssigned必然会增加,
      // 这必然导致某些队列的idealAssigned降低。在第三步中我们将根据队列已使用的资源
      // 量与idealAssigned之差来进行资源抢占。这意味着idealAssigned降低的队列很可能
      // 需要让抢占部分资源,给其他队列使用。
      Resource accepted =
          Resources.min(rc, clusterResource,
            Resources.subtract(maxCapacity, idealAssigned),
          Resources.min(rc, clusterResource, avail, Resources.subtract(
            Resources.add(current, pending), idealAssigned)));
      Resource remain = Resources.subtract(avail, accepted);
      Resources.addTo(idealAssigned, accepted);
      return remain;
    }



步骤三: 如果叶子队列已使用资源量大于第二步计算出的理想最低资源量,则此队列将根据多出的资源量选取队列中需被抢占的Container,并根据一定规则选取队列中哪些应用的哪些container需要被抢占。如果在预设时间内Container没有被释放,则强制收回container。此步骤最主要方法是getContainersToPreempt,我们来详细分析此方法:
/**
   * Based a resource preemption target drop reservations of containers and
   * if necessary select containers for preemption from applications in each
   * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
   * account for containers that will naturally complete.
   *
   * @param queues set of leaf queues to preempt from
   * @param clusterResource total amount of cluster resources
   * @return a map of applciationID to set of containers to preempt
   */
private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
      List<TempQueue> queues, Resource clusterResource) {

    Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
      new HashMap<ApplicationAttemptId,Set<RMContainer>>();
    List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();

    for (TempQueue qT : queues) {
      // we act only if we are violating balance by more than
      // maxIgnoredOverCapacity
      // 如果队列当前使用资源量大于用户配置的最低资源保证*(1 + maxIgnoredOverCapacity),
      // 则允许对此队列进行资源抢占。
      if (Resources.greaterThan(rc, clusterResource, qT.current,
          Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
      // we introduce a dampening factor naturalTerminationFactor that
      // accounts for natural termination of containers
      // 实际抢占资源需乘以用户配置的naturalTerminationFactor参数。
      Resource resToObtain =
          Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
      Resource skippedAMSize = Resource.newInstance(0, 0);

      // lock the leafqueue while we scan applications and unreserve
      synchronized (qT.leafQueue) {
          NavigableSet<FiCaSchedulerApp> ns =
            (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
          // 将leafqueue中Application倒序排列,即最近提交的Application在最先进行
          // 资源抢占。
          Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
          // 设置队列的实际资源抢占量。
          qT.actuallyPreempted = Resources.clone(resToObtain);
          // 遍历队列内所有应用,获取满足条件的Container。
          while (desc.hasNext()) {
            FiCaSchedulerApp fc = desc.next();
            
            // 如果满足了需抢占资源量,则退出循环。
            if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
                Resources.none())) {
            break;
            }
            
            // 在Application内选取Container,具体逻辑如下:
            // a. 如果有Reserved Container,则优先释放Reserved Container;
            // b. 将应用内所有Container按优先级排序,选择优先级低的Container进行抢占。
            preemptMap.put(
                fc.getApplicationAttemptId(),
                preemptFrom(fc, clusterResource, resToObtain,
                  skippedAMContainerlist, skippedAMSize));
          }

          // 若队列Application个数大于设定的最大Application数量,则释放掉多出的
          // Application Master所在的Container,杀掉整个应用。
          Resource maxAMCapacityForThisQueue = Resources.multiply(
            Resources.multiply(clusterResource,
                  qT.leafQueue.getAbsoluteCapacity()),
            qT.leafQueue.getMaxAMResourcePerQueuePercent());

          // Can try preempting AMContainers (still saving atmost
          // maxAMCapacityForThisQueue AMResource's) if more resources are
          // required to be preempted from this Queue.
          preemptAMContainers(clusterResource, preemptMap,
            skippedAMContainerlist, resToObtain, skippedAMSize,
            maxAMCapacityForThisQueue);
      }
      }
    }
    return preemptMap;
}


四、总结
资源抢占有两部分含义:

1 . 若A队列负载较低,低于用户设定的最低资源,集群将这部分资源进行抢占,分配给负载较高的B队列。
2 . 若A队列随后突然提交一批应用,集群会从B队列中抢占部分资源以满足A队列。
所谓资源抢占其本质上是根据叶子队列的繁忙程度来动态调整每个叶子队列的理想最低资源量(不同于用户为每个队列设置的最低资源量,用户设定的最低资源量是不变的,而理想最低资源量是资源抢占模块根据队列实际需求动态分配的)。在每一次资源抢占过程中,在集群资源总量固定的前提下,部分队列的理想最低资源量可能会增大,这必然导致部分队列的理想最低资源量减小。若队列理想最低资源量减少,且已使用资源大于用户配置的队列最低资源量*1.1,资源抢占模块会对这部分资源进行抢占,调度器回收这部分资源后将对之重新分配。



作者:鹿先森vv
链接:https://www.jianshu.com/p/814eed0d5d7a
來源:简书

页: [1]
查看完整版本: Yarn调度之CapacityScheduler源码分析资源抢占