资源调度器是YARN最核心的组件之一。它的实现是插拔式的,用户可以按照接口规范自定义资源调度器。YARN自带了FIFO、Capacity Scheduler和Fair Scheduler三种常用资源调度器。本文将重点对YARN默认调度器Capacity Scheduler的资源抢占模块的源码做一些分析。
Capacity Scheduler资源调度器也采用层级队列组织方式,其ROOT队列可以有多个子队列,而每个子队列又可以有自己的子队列,在队列层级最下面是叶子队列,所有Application皆提交至叶子队列。Capacity Scheduler以队列为单位划分资源,每个队列可设定一定比例的资源最低保证和使用上限。资源调度器总是优先满足最低资源保证,也就是说最低资源保证是每个队列分配资源的依据。
// Creates the scheduling monitors that drive resource preemption, one per
// configured SchedulingEditPolicy.
// NOTE(review): this listing is an abridged excerpt; several argument lists and
// closing braces are elided, so it does not compile as shown.
protected void createPolicyMonitors() {
// Decide whether the preemption service has to be created:
// a. the scheduler must implement PreemptableResourceScheduler (the author
// notes that, at the time of writing, only the Capacity Scheduler does, so
// the service is only started for that scheduler);
// b. the boolean switch YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS must be
// true. NOTE(review): the original note named
// yarn.resourcemanager.scheduler.monitor.policies here, but that key holds
// the policy class list; the switch is presumably
// yarn.resourcemanager.scheduler.monitor.enable - confirm against
// YarnConfiguration.
// The preemption service is created only when both conditions hold.
if (scheduler instanceof PreemptableResourceScheduler
&& conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
LOG.info("Loading policy monitors");
// Instantiate the SchedulingEditPolicy implementations named in the
// configuration (per the author: the classes listed under
// yarn.resourcemanager.scheduler.monitor.policies, defaulting to
// ProportionalCapacityPreemptionPolicy, which holds the preemption logic).
List<SchedulingEditPolicy> policies = conf.getInstances(
if (policies.size() > 0) {
// Register the ContainerPreemptEventType event type with the RM central
// dispatcher, handled by RMContainerPreemptEventDispatcher, which carries out
// the actual preemption actions (the register call itself is elided here).
new RMContainerPreemptEventDispatcher(
(PreemptableResourceScheduler) scheduler));
for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
// periodically check whether we need to take action to guarantee
// constraints
// One SchedulingMonitor is created per policy. Per the author, once started
// it runs a thread that calls the policy's editSchedule method periodically
// (every 3 seconds, presumably the default interval - not visible here);
// editSchedule contains the actual preemption logic.
SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
} else {
// Monitors were enabled but no policy class could be loaded.
LOG.warn("Policy monitors configured (" +
") but none specified (" +
// Runs one preemption pass: reads the root queue and the cluster resources
// from the scheduler and hands them to containerBasedPreemptOrKill.
// NOTE(review): abridged excerpt; the right-hand side of clusterResources and
// the closing brace are elided.
public void editSchedule(){
// Fetch the scheduler's root queue.
CSQueue root = scheduler.getRootQueue();
// Take a snapshot of the current cluster resources.
Resource clusterResources =
// Perform the actual preemption.
containerBasedPreemptOrKill(root, clusterResources);
* This method selects and tracks containers to be preempted. If a container
* is in the target list for more than maxWaitTime it is killed.
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// NOTE(review): abridged excerpt; event arguments, several statement bodies
// and the closing braces are elided.
// extract a summary of the queues from scheduler
// ---------------------------------- Step 1 -------------------------------------
// Starting from the ROOT queue, recursively copy the queue information into
// TempQueue objects. The goal is to work on a snapshot of all queues so that
// changes to the live queues cannot disturb the preemption computation. The
// copy is taken while holding the scheduler's monitor.
TempQueue tRoot;
synchronized (scheduler) {
tRoot = cloneQueues(root, clusterResources);
// compute the ideal distribution of resources among queues
// updates cloned queues state accordingly
// ---------------------------------- Step 2 -------------------------------------
// Seed the root's ideal assignment with its configured guarantee, then
// recursively compute the ideal assignment of every queue. The result is the
// basis for the container selection in step 3.
tRoot.idealAssigned = tRoot.guaranteed;
// Upper bound on what may be preempted in this pass, as a fraction of the
// cluster resources (the factor is elided in this excerpt).
Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
List<TempQueue> queues =
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
// based on ideal allocation select containers to be preempted from each
// queue and each application
// ---------------------------------- Step 3 ---------------------------------------
// If a leaf queue uses more than the ideal amount computed in step 2, the
// surplus determines how much must be preempted from it, and rules applied in
// getContainersToPreempt pick which containers of which applications are
// affected. A container that is not released within the configured wait time
// is reclaimed forcibly.
Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
getContainersToPreempt(queues, clusterResources);
if (LOG.isDebugEnabled()) {
// if we are in observeOnly mode return before any action is taken
if (observeOnly) {
// preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
: toPreempt.entrySet()) {
for (RMContainer container : e.getValue()) {
// if we tried to preempt this for more than maxWaitTime
if (preempted.get(container) != null &&
preempted.get(container) + maxWaitTime < clock.getTime()) {
// kill it
dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
} else {
//otherwise just send preemption events
dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
// Remember when this container was first selected, so the maxWaitTime check
// above can fire on a later pass.
if (preempted.get(container) == null) {
preempted.put(container, clock.getTime());
// Keep the preempted list clean
for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){
RMContainer id = i.next();
// garbage collect containers that are irrelevant for preemption
// (entries first recorded more than 2 * maxWaitTime ago)
if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
// Builds the snapshot of one queue used by the preemption computation. The
// derived fields start at zero (or NaN for normalizedGuarantee) and are filled
// in by later steps.
// NOTE(review): abridged excerpt; the closing brace is elided.
TempQueue(String queueName, Resource current, Resource pending,
Resource guaranteed, Resource maxCapacity) {
this.queueName = queueName; // queue name
this.current = current; // resources the queue currently uses
this.pending = pending; // resources the queue still asks for
this.guaranteed = guaranteed; // guaranteed (minimum) resources
this.maxCapacity = maxCapacity; // maximum resources the queue may use
this.idealAssigned = Resource.newInstance(0, 0); // ideal resource assignment
this.actuallyPreempted = Resource.newInstance(0, 0); // resources actually preempted
this.toBePreempted = Resource.newInstance(0, 0); // resources planned to be preempted
this.normalizedGuarantee = Float.NaN; // normalized share of the guarantee among the
// active sibling queues; used as the weight when distributing resources
this.children = new ArrayList<TempQueue>(); // child queues
* This method walks a tree of CSQueue and clones the portion of the state
* relevant for preemption in TempQueue(s). It also maintains a pointer to
* the leaves. Finally it aggregates pending resources in each queue and rolls
* it up to higher levels.
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
* @return the root of the cloned queue hierarchy
private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
// NOTE(review): abridged excerpt; trailing constructor arguments, any lines
// between them and the closing braces are elided.
TempQueue ret;
// Hold the queue's monitor while its state is copied.
synchronized (root) {
// Queue name.
String queueName = root.getQueueName();
// Fraction of the cluster the queue currently uses.
float absUsed = root.getAbsoluteUsedCapacity();
// Configured absolute capacity of the queue (its guaranteed share).
float absCap = root.getAbsoluteCapacity();
// Configured absolute maximum capacity of the queue.
float absMaxCap = root.getAbsoluteMaximumCapacity();
// Resources the queue currently uses.
Resource current = Resources.multiply(clusterResources, absUsed);
// Guaranteed resources of the queue.
Resource guaranteed = Resources.multiply(clusterResources, absCap);
// Maximum resources of the queue.
Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
if (root instanceof LeafQueue) { // leaf queue: the recursion bottoms out here
LeafQueue l = (LeafQueue) root;
// Only leaf queues report pending demand directly.
Resource pending = l.getTotalResourcePending();
ret = new TempQueue(queueName, current, pending, guaranteed,
} else {
// Parent queue: starts with zero pending of its own.
Resource pending = Resource.newInstance(0, 0);
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
for (CSQueue c : root.getChildQueues()) {
// Clone each child recursively and attach the resulting TempQueue to this
// queue's list of children.
ret.addChild(cloneQueues(c, clusterResources));
return ret;
* This method recursively computes the ideal assignment of resources to each
* level of the hierarchy. This ensures that leafs that are over-capacity but
* with parents within capacity will not be preempted. Preemptions are allowed
* within each subtree according to local over/under capacity.
* @param root the root of the cloned queue hierarchy
* @param totalPreemptionAllowed maximum amount of preemption allowed
* @return a list of leaf queues updated with preemption targets
private List<TempQueue> recursivelyComputeIdealAssignment(
TempQueue root, Resource totalPreemptionAllowed) {
// NOTE(review): abridged excerpt; closing braces are elided.
// Collects the leaf queues found below this queue.
List<TempQueue> leafs = new ArrayList<TempQueue>();
if (root.getChildren() != null &&
root.getChildren().size() > 0) {
// Compute the ideal assignment for the direct children, distributing this
// queue's own idealAssigned among them.
computeIdealResourceDistribution(rc, root.getChildren(),
totalPreemptionAllowed, root.idealAssigned);
// compute recursively for lower levels and build list of leafs
for(TempQueue t : root.getChildren()) {
leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed));
} else {
// we are in a leaf nothing to do, just return yourself
return Collections.singletonList(root);
return leafs;
* This method computes (for a single level in the tree, passed as a {@code
* List<TempQueue>}) the ideal assignment of resources. This is done
* recursively to allocate capacity fairly across all queues with pending
* demands. It terminates when no resources are left to assign, or when all
* demand is satisfied.
* @param rc resource calculator
* @param queues a list of cloned queues to be assigned capacity to (this is
* an out param)
* @param totalPreemptionAllowed total amount of preemption we allow
* @param tot_guarant the amount of capacity assigned to this pool of queues
private void computeIdealResourceDistribution(ResourceCalculator rc,
List<TempQueue> queues, Resource totalPreemptionAllowed, Resource tot_guarant) {
// NOTE(review): abridged excerpt; the bodies of several branches, some
// trailing arguments and the closing braces are elided.
// qAlloc tracks currently active queues (will decrease progressively as
// demand is met)
List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
// unassigned starts as the capacity handed to this group of sibling queues;
// in this sense preemption is just a redistribution of that capacity.
// unassigned tracks how much resources are still to assign, initialized
// with the total capacity for this set of queues
Resource unassigned = Resources.clone(tot_guarant);
// group queues based on whether they have non-zero guaranteed capacity
// Queues with a non-zero guarantee.
Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
// Queues with a zero guarantee.
Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
for (TempQueue q : qAlloc) {
if (Resources
.greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
} else {
// Recompute and set the ideal assignment of each child queue, serving the
// queues that have a non-zero guarantee first.
// first compute the allocation as a fixpoint based on guaranteed capacity
computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
// if any capacity is left unassigned, distributed among zero-guarantee
// queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
if (!zeroGuarQueues.isEmpty()
&& Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
// based on ideal assignment computed above and current assignment we derive
// how much preemption is required overall
Resource totPreemptionNeeded = Resource.newInstance(0, 0);
for (TempQueue t:queues) {
// Only queues using more than their ideal assignment contribute.
if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
Resources.subtract(t.current, t.idealAssigned));
// The amount that may be preempted in one pass is capped by
// totalPreemptionAllowed; above that cap the requested amounts are scaled
// down proportionally.
// if we need to preempt more than is allowed, compute a factor (0<f<1)
// that is used to scale down how much we ask back from each queue
float scalingFactor = 1.0F;
if (Resources.greaterThan(rc, tot_guarant,
totPreemptionNeeded, totalPreemptionAllowed)) {
scalingFactor = Resources.divide(rc, tot_guarant,
totalPreemptionAllowed, totPreemptionNeeded);
// Set the amount each queue is going to have preempted (presumably its
// toBePreempted field - assignPreemption is not shown here).
// assign to each queue the amount of actual preemption based on local
// information of ideal preemption and scaling factor
for (TempQueue t : queues) {
t.assignPreemption(scalingFactor, rc, tot_guarant);
if (LOG.isDebugEnabled()) {
long time = clock.getTime();
for (TempQueue t : queues) {
LOG.debug(time + ": " + t);
* Given a set of queues compute the fix-point distribution of unassigned
* resources among them. As pending request of a queue are exhausted, the
* queue is removed from the set and remaining capacity redistributed among
* remaining queues. The distribution is weighted based on guaranteed
* capacity, unless asked to ignoreGuarantee, in which case resources are
* distributed uniformly.
private void computeFixpointAllocation(ResourceCalculator rc,
Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
boolean ignoreGuarantee) {
// NOTE(review): abridged excerpt; the body of the removal check and the
// closing braces are elided.
//assign all cluster resources until no more demand, or no resources are left
// Keep distributing while there are still active queues and something is
// left to assign.
while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
unassigned, Resources.none())) {
// Total handed out during this round.
Resource wQassigned = Resource.newInstance(0, 0);
// we compute normalizedGuarantees capacity based on currently active
// queues
// Recompute normalizedGuarantee (the distribution weight) of every active
// queue. Per the author, resetCapacity (not shown here) does one of:
// a. if ignoreGuarantee is true, set it to 1 / (number of active queues),
// i.e. split the resources evenly;
// b. otherwise set it to the queue's configured guarantee divided by the sum
// of the configured guarantees of all active queues.
resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
// offer for each queue their capacity first and in following invocations
// their share of over-capacity
for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
TempQueue sub = i.next();
// This queue's share of what is still unassigned:
// unassigned * normalizedGuarantee. unassigned is initialised by the caller
// from the capacity given to this level of the hierarchy.
Resource wQavail =
Resources.multiply(unassigned, sub.normalizedGuarantee);
// Offer the share to the queue; offer returns the part the queue did not
// take.
Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
// Amount actually accepted by the queue.
Resource wQdone = Resources.subtract(wQavail, wQidle);
// if the queue returned a value > 0 it means it is fully satisfied
// and it is removed from the list of active queues qAlloc
// When the condition below holds the queue needs nothing more and can leave
// the active set; the following rounds keep serving the remaining queues
// until all demand is met or nothing is left to assign.
// NOTE(review): as written, the condition is true when wQdone is not greater
// than zero, i.e. the queue accepted nothing in this round - confirm this
// matches the intent of the comment above.
// Note: the accepted amount may be negative; see the discussion of offer.
if (!Resources.greaterThan(rc, tot_guarant,
wQdone, Resources.none())) {
Resources.addTo(wQassigned, wQdone);
// Deduct what was handed out in this round.
Resources.subtractFrom(unassigned, wQassigned);
// Offers avail to this queue: adds the accepted part to idealAssigned and
// returns the part that was not accepted.
// NOTE(review): abridged excerpt; the closing brace is elided.
Resource offer(Resource avail, ResourceCalculator rc,
Resource clusterResource) {
// remain = avail - min(avail, (max - assigned), (current + pending - assigned))
// The accepted amount is the minimum of three values: the offered amount,
// maxCapacity - idealAssigned, and current + pending - idealAssigned.
// The author reads two things into this formula:
// a. the assignment can never push the queue above its maximum capacity;
// b. for a lightly loaded queue, current + pending - idealAssigned may be
// negative, i.e. the queue uses less than its configured guarantee (pending
// is likely 0). idealAssigned then shrinks and remain grows: the queue has
// no use for that capacity right now, so other queues may use it. If the
// queue later receives a burst of applications, then with fixed cluster
// resources its idealAssigned rises on the next pass, which necessarily
// lowers the idealAssigned of some other queues. Step 3 preempts based on
// the difference between a queue's used resources and its idealAssigned,
// so queues whose idealAssigned dropped will probably have to give part of
// their resources back.
Resource accepted =
Resources.min(rc, clusterResource,
Resources.subtract(maxCapacity, idealAssigned),
Resources.min(rc, clusterResource, avail, Resources.subtract(
Resources.add(current, pending), idealAssigned)));
// Whatever was not accepted goes back to the caller.
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
步骤三: 如果叶子队列已使用资源量大于第二步计算出的理想最低资源量,则此队列将根据多出的资源量选取队列中需被抢占的Container,并根据一定规则选取队列中哪些应用的哪些container需要被抢占。如果在预设时间内Container没有被释放,则强制收回container。此步骤最主要的方法是getContainersToPreempt,我们来详细分析此方法:
* Based on a resource preemption target, drop reservations of containers and
* if necessary select containers for preemption from applications in each
* over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
* account for containers that will naturally complete.
* @param queues set of leaf queues to preempt from
* @param clusterResource total amount of cluster resources
* @return a map of ApplicationAttemptId to set of containers to preempt
private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
List<TempQueue> queues, Resource clusterResource) {
// NOTE(review): abridged excerpt; several statements, trailing arguments and
// the closing braces are elided.
Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
new HashMap<ApplicationAttemptId,Set<RMContainer>>();
// AM containers skipped by the first selection pass; handed to
// preemptAMContainers below.
List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
for (TempQueue qT : queues) {
// we act only if we are violating balance by more than
// maxIgnoredOverCapacity
// Preempt from this queue only if its current usage exceeds
// guaranteed * (1 + maxIgnoredOverCapacity).
if (Resources.greaterThan(rc, clusterResource, qT.current,
Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
// we introduce a dampening factor naturalTerminationFactor that
// accounts for natural termination of containers
// The amount to obtain is toBePreempted scaled by naturalTerminationFactor.
Resource resToObtain =
Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
Resource skippedAMSize = Resource.newInstance(0, 0);
// lock the leafqueue while we scan applications and unreserve
synchronized (qT.leafQueue) {
NavigableSet<FiCaSchedulerApp> ns =
(NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
// Walk the leaf queue's applications in descending order; per the author
// this means the most recently submitted applications are preempted first.
Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
// Record the amount this queue is actually asked to give back.
qT.actuallyPreempted = Resources.clone(resToObtain);
// Scan the applications of the queue and collect eligible containers.
while (desc.hasNext()) {
FiCaSchedulerApp fc = desc.next();
// Stop as soon as the amount to obtain has been covered.
if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
Resources.none())) {
// Select containers inside the application. Per the author, preemptFrom
// (not shown here) works as follows:
// a. reserved containers, if any, are released first;
// b. the application's containers are sorted by priority and the
// low-priority ones are chosen for preemption.
preemptFrom(fc, clusterResource, resToObtain,
skippedAMContainerlist, skippedAMSize));
// NOTE(review): the original note said that the surplus Application Master
// containers are released (killing the whole application) when the queue
// holds more applications than the configured maximum. The visible code and
// the upstream comment below describe something different: AM containers are
// only considered when more resources still have to be obtained from this
// queue, and up to maxAMCapacityForThisQueue of AM resources is kept.
Resource maxAMCapacityForThisQueue = Resources.multiply(
// Can try preempting AMContainers (still saving atmost
// maxAMCapacityForThisQueue AMResource's) if more resources are
// required to be preempted from this Queue.
preemptAMContainers(clusterResource, preemptMap,
skippedAMContainerlist, resToObtain, skippedAMSize,
return preemptMap;
1 . 若A队列负载较低,已使用资源低于用户设定的最低资源保证,集群会将A队列暂时用不到的这部分空闲资源分配给负载较高的B队列使用。
2 . 若A队列随后突然提交一批应用,集群会从B队列中抢占部分资源以满足A队列。