关于YARN capacity调度器模式下队列超额资源的释放的疑问
YARN的调度器设置为capacity调度器,设置两个队列a和b,无嵌套的子队列,capacity分别配置为40%和60%,max capacity分别设置为40%和100%,即队列a如果没有用到40%的资源,队列b可以“借用”其空闲的部分,而队列a不能超额使用资源现在有一个疑问,假设队列a完全空闲,队列b很繁忙(执行一个动态分配资源的spark application,不停地提交规模较大的job),
此时队列b会占用YARN的100%资源,然后向队列a中提交另一个规模较大的spark application,这时候队列b需要将“借用”的资源释放出来,
现在的疑问是这部分资源是以什么策略、在什么时机释放的?
在某些文档资料中看到capacity调度器模式下,队列内部的调度默认是fifo,也支持fair,另一个疑问是怎么将队列内部的调度配置成fair模式,
如果能配置成fair模式,是否支持抢占?
对于这部分的问题,欢迎大家给出宝贵意见,十分感谢!
这个问题问的有深度,明天研究下,给楼主探讨
现在有一个疑问,假设队列a完全空闲,队列b很繁忙(执行一个动态分配资源的spark application,不停地提交规模较大的job),
此时队列b会占用YARN的100%资源,然后向队列a中提交另一个规模较大的spark application,这时候队列b需要将“借用”的资源释放出来,
现在的疑问是这部分资源是以什么策略、在什么时机释放的?
回答:
由于A队列资源正在被别B队列使用,因此调度器必须等待B队列释放资源后,才能将这些资源收回,这通常需要等待一段不确定时间。为了防止等待时间过长,调度器发现等待一段时间后若发现资源并未得到释放,则会强制进行资源回收。
上面其实是策略,那么相信楼主想看代码实现:这里贴出来
/**
* This method selects and tracks containers to be preempted. If a container
* is in the target list for more than maxWaitTime it is killed.
*
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
*/
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// extract a summary of the queues from scheduler
// ---------------------------------- 第一步 -------------------------------------
// 从ROOT队列开始递归拷贝子队列信息,此步骤目的是为了生成所有队列快照信息,防止集群
// 队列信息变化导致资源抢占过程中出现问题。
TempQueue tRoot;
synchronized (scheduler) {
tRoot = cloneQueues(root, clusterResources);
}
// compute the ideal distribution of resources among queues
// updates cloned queues state accordingly
// ---------------------------------- 第二步 -------------------------------------
// 将用户设定的最低资源作为最开始的理想最低资源,递归计算各队列实际理想最低资源,作为
// 第三步进行具体资源抢占的依据。
tRoot.idealAssigned = tRoot.guaranteed;
Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
percentageClusterPreemptionAllowed);
List<TempQueue> queues =
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
// based on ideal allocation select containers to be preempted from each
// ---------------------------------- 第三步---------------------------------------
// 如果叶子队列已使用资源量大于第二步计算出的理想最低资源量,则此队列将根据多出的
// 资源量选取队列中需被抢占的Container,并根据一定规则选取队列中哪些应用的哪些
// container需要被抢占。如果在预设时间内Container没有被释放,则强制收回container。
// queue and each application
Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
getContainersToPreempt(queues, clusterResources);
if (LOG.isDebugEnabled()) {
logToCSV(queues);
}
// if we are in observeOnly mode return before any action is taken
if (observeOnly) {
return;
}
// preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
: toPreempt.entrySet()) {
for (RMContainer container : e.getValue()) {
// if we tried to preempt this for more than maxWaitTime
if (preempted.get(container) != null &&
preempted.get(container) + maxWaitTime < clock.getTime()) {
// kill it
dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
ContainerPreemptEventType.KILL_CONTAINER));
preempted.remove(container);
} else {
//otherwise just send preemption events
dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
ContainerPreemptEventType.PREEMPT_CONTAINER));
if (preempted.get(container) == null) {
preempted.put(container, clock.getTime());
}
}
}
}
// Keep the preempted list clean
for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){
RMContainer id = i.next();
// garbage collect containers that are irrelevant for preemption
if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
i.remove();
}
}
}
我们来英文和中文
* This method selects and tracks containers to be preempted. If a container
* is in the target list for more than maxWaitTime it is killed.
container需要被抢占。如果在预设时间内Container没有被释放,则强制收回container。
上面都提到了,maxWaitTime 也就是中文的预设时间,天那,这个预设时间能否配置那?也就是代码里面的maxWaitTime ,是从哪读取到的。他的时间是多少?我们再来看下面代码
https://svn.apache.org/repos/asf/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
我这里截图
也就是下面代码
dispatcher = disp;
scheduler = (CapacityScheduler) sched;
maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
naturalTerminationFactor =
config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2);
maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000);
monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000);
percentageClusterPreemptionAllowed =
config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
observeOnly = config.getBoolean(OBSERVE_ONLY, false);
rc = scheduler.getResourceCalculator();
源文件
我们看到maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000);
默认为15000,这个默认的时间还未确定,如果有知道的大家可以讨论。
如果是15000ms=15秒
如果是15000s=250分钟
当然具体大家可以试试。
这里又产生一个问题,就是该如何配置这个值,这个值还真难找,但是一个相关的
yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor:
Given a computed preemption target, account for containers naturally expiring and preempt only this percentage of the delta. This determines the rate of geometric convergence into the deadzone (MAX_IGNORED_OVER_CAPACITY). For example, a termination factor of 0.5 will reclaim almost 95% of resources within 5 * #WAIT_TIME_BEFORE_KILL, even absent natural termination. Default value is 0.2
地址:
http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
截图:
其它就没有了,也就是说可能是系统给我们默认了这个预定的时间。
楼主想了解更详细,推荐下面文章
推荐这篇文章
Yarn调度之CapacityScheduler源码分析资源抢占
http://www.aboutyun.com/forum.php?mod=viewthread&tid=24628
页:
[1]