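I have a question. Suppose queue a is completely idle and queue b is very busy (it is running a Spark application with dynamic resource allocation that keeps submitting large jobs),
so queue b occupies 100% of YARN's resources. Then another large Spark application is submitted to queue a, and queue b has to give back the resources it "borrowed".
My question is: by what policy, and at what point, are those resources released?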
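Answer:
Because queue a's resources are in use by queue b, the scheduler cannot hand them back to a until b releases them, and that can take an unpredictable amount of time. To bound the wait, the preemption policy first asks b's applications to give specific containers back; if a container still has not been released after a fixed wait (maxWaitTime), the policy forcibly kills it.
That is the policy. Presumably you also want to see the implementation, so here is the code from ProportionalCapacityPreemptionPolicy: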
[mw_shl_code=java,true]/**
 * This method selects and tracks containers to be preempted. If a container
 * is in the target list for more than maxWaitTime it is killed.
 *
 * @param root the root of the CapacityScheduler queue hierarchy
 * @param clusterResources the total amount of resources in the cluster
 */
private void containerBasedPreemptOrKill(CSQueue root,
    Resource clusterResources) {

  // extract a summary of the queues from scheduler
  // ------------------------------- Step 1 -------------------------------
  // Recursively copy the queue information, starting from ROOT, to build a
  // snapshot of all queues, so that queue changes in the live cluster cannot
  // disturb the preemption pass below.
  TempQueue tRoot;
  synchronized (scheduler) {
    tRoot = cloneQueues(root, clusterResources);
  }

  // compute the ideal distribution of resources among queues
  // updates cloned queues state accordingly
  // ------------------------------- Step 2 -------------------------------
  // Start from the user-configured guaranteed capacity as the initial ideal
  // allocation, then recursively compute each queue's actual ideal
  // allocation. Step 3 decides what to preempt based on this result.
  tRoot.idealAssigned = tRoot.guaranteed;
  Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
      percentageClusterPreemptionAllowed);
  List<TempQueue> queues =
      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);

  // based on ideal allocation select containers to be preempted from each
  // queue and each application
  // ------------------------------- Step 3 -------------------------------
  // If a leaf queue uses more than the ideal allocation computed in Step 2,
  // the excess determines how much to preempt from it, and a set of rules
  // picks which containers of which applications to preempt. A container
  // that is not released within the preset time (maxWaitTime) is forcibly
  // reclaimed.
  Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
      getContainersToPreempt(queues, clusterResources);

  if (LOG.isDebugEnabled()) {
    logToCSV(queues);
  }

  // if we are in observeOnly mode return before any action is taken
  if (observeOnly) {
    return;
  }

  // preempt (or kill) the selected containers
  for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
       : toPreempt.entrySet()) {
    for (RMContainer container : e.getValue()) {
      // if we tried to preempt this for more than maxWaitTime
      if (preempted.get(container) != null &&
          preempted.get(container) + maxWaitTime < clock.getTime()) {
        // kill it
        dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
            ContainerPreemptEventType.KILL_CONTAINER));
        preempted.remove(container);
      } else {
        //otherwise just send preemption events
        dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
            ContainerPreemptEventType.PREEMPT_CONTAINER));
        if (preempted.get(container) == null) {
          preempted.put(container, clock.getTime());
        }
      }
    }
  }

  // Keep the preempted list clean
  for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){
    RMContainer id = i.next();
    // garbage collect containers that are irrelevant for preemption
    if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
      i.remove();
    }
  }
}
[/mw_shl_code]
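To make Steps 2 and 3 concrete for the scenario in the question, here is a deliberately simplified sketch in plain Java. It assumes a single resource dimension (memory in MB), two flat queues each guaranteed 50% of a hypothetical 100 GB cluster, both demanding more than their guarantee, and the default per-round cap of 10% (`percentageClusterPreemptionAllowed`, read in the init code further down). The class and method names are hypothetical. The real policy works on `Resource` objects over a queue hierarchy, redistributes unused guarantees, and also applies `maxIgnoredOverCapacity` and `naturalTerminationFactor`, all of which this sketch ignores.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of Steps 2 and 3 (one resource dimension, flat queues). */
public class PreemptionAmountSketch {

    /**
     * Ideal allocation when every queue wants more than its guarantee:
     * each queue simply gets its guaranteed share (simplification).
     */
    public static long idealAssigned(long clusterMb, double guaranteedFraction) {
        return Math.round(clusterMb * guaranteedFraction);
    }

    /** Amount to reclaim from a queue in one round: excess over ideal, capped per round. */
    public static long toPreemptThisRound(long usedMb, long idealMb,
                                          long clusterMb, double perRoundFraction) {
        long excess = Math.max(0L, usedMb - idealMb);
        long cap = Math.round(clusterMb * perRoundFraction);
        return Math.min(excess, cap);
    }

    public static void main(String[] args) {
        long clusterMb = 100L * 1024;   // hypothetical 100 GB cluster
        double perRound = 0.1;          // default TOTAL_PREEMPTION_PER_ROUND
        Map<String, Long> used = new LinkedHashMap<>();
        used.put("a", 0L);              // queue a was idle
        used.put("b", clusterMb);       // queue b borrowed the whole cluster
        for (Map.Entry<String, Long> e : used.entrySet()) {
            long ideal = idealAssigned(clusterMb, 0.5);
            long take = toPreemptThisRound(e.getValue(), ideal, clusterMb, perRound);
            System.out.println("queue " + e.getKey() + ": ideal=" + ideal
                + " MB, preempt this round=" + take + " MB");
        }
    }
}
```

Here queue b is 51200 MB over its ideal share, but a single round only targets 10240 MB of it. Containers that have been asked for but not yet released still count as used, so later rounds keep selecting them until they are given back or killed.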
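Let's compare the original English Javadoc with the Chinese annotation on Step 3:
* This method selects and tracks containers to be preempted. If a container
* is in the target list for more than maxWaitTime it is killed.
"...which containers need to be preempted. If a container is not released within the preset time, it is forcibly reclaimed."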
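Both mention it: maxWaitTime is the "preset time" in the annotation. So can this preset time be configured? In other words, where does maxWaitTime in the code get its value, and how long is it? Let's look at the following code: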
https://svn.apache.org/repos/asf/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
The relevant excerpt, from the policy's init method, is:
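[mw_shl_code=java,true]dispatcher = disp;
scheduler = (CapacityScheduler) sched;
// yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity
maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
// yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor
naturalTerminationFactor =
    config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2);
// yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill (ms, default 15 s)
maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000);
// yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval (ms, default 3 s)
monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000);
// yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round (default 10% of cluster)
percentageClusterPreemptionAllowed =
    config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
// yarn.resourcemanager.monitor.capacity.preemption.observe_only (if true, only log decisions)
observeOnly = config.getBoolean(OBSERVE_ONLY, false);
rc = scheduler.getResourceCalculator();[/mw_shl_code]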
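So maxWaitTime is configurable (via `WAIT_TIME_BEFORE_KILL`) and defaults to 15000 ms, while the policy itself runs every `monitoringInterval` (3000 ms by default). The sketch below replays the `preempted` bookkeeping from `containerBasedPreemptOrKill` with those defaults to show when the KILL actually happens. It is a hypothetical, idealized model: it assumes rounds fire exactly every interval starting at t=0, and that the same container is selected every round because its application never gives it back. The class name and container id are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical replay of the preempt-then-kill bookkeeping, using default timings. */
public class PreemptKillTimeline {

    static final long MONITORING_INTERVAL_MS = 3000;  // default monitoring_interval
    static final long MAX_WAIT_TIME_MS = 15000;       // default max_wait_before_kill

    /** Returns the simulated time (ms) of the round that sends KILL_CONTAINER. */
    public static long firstKillTime(long maxWaitTime, long interval) {
        Map<String, Long> preempted = new HashMap<>();  // container -> first PREEMPT time
        String container = "container_01";             // hypothetical container id
        for (long now = 0; ; now += interval) {
            Long firstAsked = preempted.get(container);
            // same strict comparison as the original: first request + maxWaitTime < now
            if (firstAsked != null && firstAsked + maxWaitTime < now) {
                System.out.println("t=" + now + " ms: KILL_CONTAINER");
                preempted.remove(container);
                return now;
            }
            // otherwise just ask again, recording only the first request time
            System.out.println("t=" + now + " ms: PREEMPT_CONTAINER");
            if (firstAsked == null) {
                preempted.put(container, now);
            }
        }
    }

    public static void main(String[] args) {
        long killAt = firstKillTime(MAX_WAIT_TIME_MS, MONITORING_INTERVAL_MS);
        System.out.println("killed after " + killAt + " ms");
    }
}
```

In this model the container receives PREEMPT_CONTAINER at 0, 3, 6, 9, 12 and 15 s, and KILL_CONTAINER at 18 s: the first round after the 15 s wait has strictly elapsed. An application that honors the preemption request earlier simply releases the container and is never killed.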