wx_BZ06L1bA 发表于 2018-6-6 12:57:53

关于YARN capacity调度器模式下队列超额资源的释放的疑问

YARN的调度器设置为capacity调度器,设置两个队列a和b,无嵌套的子队列,capacity分别配置为40%和60%,max capacity分别设置为40%和100%,即队列a如果没有用到40%的资源,队列b可以“借用”其空闲的部分,而队列a不能超额使用资源


现在有一个疑问,假设队列a完全空闲,队列b很繁忙(执行一个动态分配资源的spark application,不停地提交规模较大的job),
此时队列b会占用YARN的100%资源,然后向队列a中提交另一个规模较大的spark application,这时候队列b需要将“借用”的资源释放出来,
现在的疑问是这部分资源是以什么策略、在什么时机释放的?

在某些文档资料中看到capacity调度器模式下,队列内部的调度默认是fifo,也支持fair,另一个疑问是怎么将队列内部的调度配置成fair模式,
如果能配置成fair模式,是否支持抢占?

对于这部分的问题,欢迎大家给出宝贵意见,十分感谢!

tntzbzc 发表于 2018-6-6 21:14:09

这个问题问的有深度,明天研究下,给楼主探讨

tntzbzc 发表于 2018-6-7 18:11:47


现在有一个疑问,假设队列a完全空闲,队列b很繁忙(执行一个动态分配资源的spark application,不停地提交规模较大的job),
此时队列b会占用YARN的100%资源,然后向队列a中提交另一个规模较大的spark application,这时候队列b需要将“借用”的资源释放出来,
现在的疑问是这部分资源是以什么策略、在什么时机释放的?

回答:
由于A队列资源正在被别B队列使用,因此调度器必须等待B队列释放资源后,才能将这些资源收回,这通常需要等待一段不确定时间。为了防止等待时间过长,调度器发现等待一段时间后若发现资源并未得到释放,则会强制进行资源回收。
上面其实是策略,那么相信楼主想看代码实现:这里贴出来

/**
   * This method selects and tracks containers to be preempted. If a container
   * is in the target list for more than maxWaitTime it is killed.
   *
   * @param root the root of the CapacityScheduler queue hierarchy
   * @param clusterResources the total amount of resources in the cluster
   */
private void containerBasedPreemptOrKill(CSQueue root,
      Resource clusterResources) {

    // extract a summary of the queues from scheduler

    // ---------------------------------- 第一步 -------------------------------------
    // 从ROOT队列开始递归拷贝子队列信息,此步骤目的是为了生成所有队列快照信息,防止集群
    // 队列信息变化导致资源抢占过程中出现问题。
    TempQueue tRoot;
    synchronized (scheduler) {
      tRoot = cloneQueues(root, clusterResources);
    }

    // compute the ideal distribution of resources among queues
    // updates cloned queues state accordingly

    // ---------------------------------- 第二步 -------------------------------------
    // 将用户设定的最低资源作为最开始的理想最低资源,递归计算各队列实际理想最低资源,作为
    // 第三步进行具体资源抢占的依据。
    tRoot.idealAssigned = tRoot.guaranteed;
    Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
      percentageClusterPreemptionAllowed);
    List<TempQueue> queues =
      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);

    // based on ideal allocation select containers to be preempted from each

    // ---------------------------------- 第三步---------------------------------------
    // 如果叶子队列已使用资源量大于第二步计算出的理想最低资源量,则此队列将根据多出的
    // 资源量选取队列中需被抢占的Container,并根据一定规则选取队列中哪些应用的哪些
    // container需要被抢占。如果在预设时间内Container没有被释放,则强制收回container。
    // queue and each application
    Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
      getContainersToPreempt(queues, clusterResources);

    if (LOG.isDebugEnabled()) {
      logToCSV(queues);
    }

    // if we are in observeOnly mode return before any action is taken
    if (observeOnly) {
      return;
    }

    // preempt (or kill) the selected containers
    for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
         : toPreempt.entrySet()) {
      for (RMContainer container : e.getValue()) {
      // if we tried to preempt this for more than maxWaitTime
      if (preempted.get(container) != null &&
            preempted.get(container) + maxWaitTime < clock.getTime()) {
          // kill it
          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
                ContainerPreemptEventType.KILL_CONTAINER));
          preempted.remove(container);
      } else {
          //otherwise just send preemption events
          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
                ContainerPreemptEventType.PREEMPT_CONTAINER));
          if (preempted.get(container) == null) {
            preempted.put(container, clock.getTime());
          }
      }
      }
    }

    // Keep the preempted list clean
    for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){
      RMContainer id = i.next();
      // garbage collect containers that are irrelevant for preemption
      if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
      i.remove();
      }
    }
}

我们来英文和中文
* This method selects and tracks containers to be preempted. If a container
* is in the target list for more than maxWaitTime it is killed.

container需要被抢占。如果在预设时间内Container没有被释放,则强制收回container。

上面都提到了,maxWaitTime 也就是中文的预设时间,天那,这个预设时间能否配置那?也就是代码里面的maxWaitTime ,是从哪读取到的。他的时间是多少?我们再来看下面代码

https://svn.apache.org/repos/asf/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

我这里截图



也就是下面代码
dispatcher = disp;
    scheduler = (CapacityScheduler) sched;
    maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
    naturalTerminationFactor =
      config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2);
    maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000);
    monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000);
    percentageClusterPreemptionAllowed =
      config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
    observeOnly = config.getBoolean(OBSERVE_ONLY, false);
    rc = scheduler.getResourceCalculator();

源文件

我们看到maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000);
默认为15000,这个默认的时间还未确定,如果有知道的大家可以讨论。
如果是15000ms=15秒
如果是15000s=250分钟
当然具体大家可以试试。
这里又产生一个问题,就是该如何配置这个值,这个值还真难找,但是一个相关的
yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor:

Given a computed preemption target, account for containers naturally expiring and preempt only this percentage of the delta. This determines the rate of geometric convergence into the deadzone (MAX_IGNORED_OVER_CAPACITY). For example, a termination factor of 0.5 will reclaim almost 95% of resources within 5 * #WAIT_TIME_BEFORE_KILL, even absent natural termination. Default value is 0.2

地址:


http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html

截图:


其它就没有了,也就是说可能是系统给我们默认了这个预定的时间。





desehawk 发表于 2018-6-7 18:55:32

楼主想了解更详细,推荐下面文章
推荐这篇文章
Yarn调度之CapacityScheduler源码分析资源抢占
http://www.aboutyun.com/forum.php?mod=viewthread&tid=24628


页: [1]
查看完整版本: 关于YARN capacity调度器模式下队列超额资源的释放的疑问