众所周知,JobTracker节点使用配置的任务调度器TaskScheduler来为某一个具体的TaskTracker节点分配任务,同时这个任务调度器只能决定给该TaskTracker节点分配哪一个Job或者那些Job的任务以及分配多少个任务,但是它却不能决定给当前的TaskTracker节点分配一个Job的具体的哪一个任务。
另外,针对一个具体的TaskTracker节点而言,任何一个作业都可以判断它的那些Map任务相对于该TaskTracker节点来说属于本地任务,那些Map任务是属于非本地任务的,当然,对于Reduce任务来说,是没有本地任务与非本地任务这一说法的。
因此,具体来讲就是,当任务调度器决定为一个TaskTracker节点分配一个Job的本地任务时,它会调用该JobInProgress对象的obtainNewLocalMapTask()方法,分配一个非本地任务时,它会调用对应的obtainNewNonLocalMapTask()方法,那么以这个TaskTracker节点在集群中的物理位置为参考,这个Job可能有多个本地任务和多个非本地任务,至于为该TaskTracker节点分配哪一个本地或者非本地任务就由JobInProgress来决定了;当任务调度器为TaskTracker节点分配一个Job的Reduce任务时,就会调用该Job对应的JobInProgress对象的obtainNewReduceTask()方法。至于JobInProgress对象究竟是如何分配一个本地或非本地Map任务、Reduce任务的,那将是本文接下来要详细讲述的重点了。
1.分配作业的Map任务
作业的Map任务之所以有本地和非本地之分,主要是因为该Map任务的输入数据和执行该Map任务的TaskTracker节点在集群中的位置有可能同。本地与非本地Map任务是相对于执行或将要执行该任务的TaskTracker节点来说的,当任务调度器决定为一个TaskTracker节点分配某一个Job的一个本地Map任务时,它(JobInProgress)会查找这个Job中的那些Map任务的输入数据合该TaskTracker节点在同一台PC或机架上,那么这些Map任务对于TaskTracker节点来说就是本地任务了。这里要值得一提的是,在作业初始化的时候,就为每一个Map任务做了一个本地化的预分配工作,即根据Map任务的输入数据的物理位置,将该Map任务挂载到对应的物理节点上,该过程的源代码为:
private Map<Node, List<TaskInProgress>> createCache(JobClient.RawSplit[] splits, int maxLevel) {
Map<Node, List<TaskInProgress>> cache = new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
for (int i = 0; i < splits.length; i++) {
String[] splitLocations = splits[i].getLocations();//获取该数据切片坐在的物理位置(多个副本)
if (splitLocations.length == 0) {
nonLocalMaps.add(maps[i]);
continue;
}
//针对每一个副本的物理位置
for(String host: splitLocations) {
//解析副本在集群中的哪一个节点上
Node node = jobtracker.resolveAndAddToTopology(host);
LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
for (int j = 0; j < maxLevel; j++) {
List<TaskInProgress> hostMaps = cache.get(node);
if (hostMaps == null) {
hostMaps = new ArrayList<TaskInProgress>();
cache.put(node, hostMaps);
hostMaps.add(maps[i]);//将Map任务挂载到该节点上
}
//去重,避免一个节点挂载了两个相同的Map任务
if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
hostMaps.add(maps[i]);
}
node = node.getParent();//获取节点的父节点(由于maxLevel的值是2,所以父节点就是rack节点)
}
}
}
return cache;
} 复制代码
通过这样的一个预处理过程,最终Node与Map任务之间的映射关系被保存在它的一个属性nonRunningMapCache中了。当JobInProgress为一个TaskTracker节点分配一个本地Map任务时,它可以只需要解析该TaskTracker节点在集群中的哪一个节点node上,根据该node就可以从nonRunningMapCache中获取一个Map任务,该Map任务相对于当前这个TaskTracker来说就是本地任务了;当JobInProgress为一个TaskTracker节点分配一个非本地Map任务时,它可以获取集群中所有的rack节点(除它自己所在的rack外),通过这些rack节点node,就可以从nonRunningMapCache中获取一个Map任务,该Map任务相对于当前这个TaskTracker来说就是非本地任务了。
根据上面的源代码可以看出,所谓的本地任务之分就是由maxLevel来确定的,即Map任务的输入数据与TaskTracker节点在集群中的物理距离,在目前的版本中(Hadoop-0.20.2.0),maxLevel的默认值是2,也可由JobTracker节点的配置文件来设置,对应的配置项为:mapred.task.cache.levels。另外,从上面的源代码可以看出,这个预处理过程也明确地定义了非本地Map任务,即map操作的输入数据的位置为null的Map任务,这并不代表说该Map任务没有输入数据。因为,Hadoop为用户提供了自定义数据切片的API(用户自己实现InputSplit),这里的RawSplit并没有直接保存map操作所需的输入数据的位置信息,而是对真正的InputSplit进行了封装,这告诉我们两个很重要的情况,
一:用户在自定义Map任务的InputSplit时,应考虑这个Map任务是否可以作为某些TaskTracker节点的本地任务(比如某一个Map任务的输入数据在跨越多个节点,那么这个Map任务永远也不可能是本地任务);
二:Map任务的InputSplit实现可以为map操作带入少量的输入数据(例如,某一个Map任务需要两个输入数据,一个数据很大,另一个数据很小,只有几百或上千Bytes,那么,用户就可以自定义一个InputSplit来保存这个小数据,很明显,用HDFS保存这样小的数据根本不划算)。这个非本地Map任务保存在nonLocalMaps属性中。
1.1 分配本地Map任务
JobInProgress给某一个TaskTracker节点分配一个本地Map任务的操作比较的简单,不过,这其中有一个异常情况,就是当这个TaskTracker节点无法被解析成集群中的一个Node时,那么,本次的本地Map任务分配会被当做一次分配非本地Map任务来操作。这个过程的源代码如下:
public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,int clusterSize, int numUniqueHosts) throws IOException {
if (!tasksInited.get()) {
return null;
}
//为当前的计算节点获取一个本地map任务
int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, status.mapProgress());
if (target == -1) {
return null;
}
Task result = maps[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
}
return result;
}
复制代码
/**
* 为当前的计算节点从作业的map任务集中选取一个合适的任务;
* 参数maxCacheLevel决定了当前分配的是本地任务还是非本地任务
*/
private synchronized int findNewMapTask(final TaskTrackerStatus tts, final int clusterSize, final int numUniqueHosts, final int maxCacheLevel, final double avgProgress) {
...
Node node = jobtracker.getNode(tts.getHost()); //根据当前计算节点的主机/IP来获取其在集群拓扑结构中对应的位置节点
//
// I) Non-running TIP :
// 1. check from local node to the root [bottom up cache lookup]
// i.e if the cache is available and the host has been resolved
// (node!=null)
if (node != null) {
Node key = node; //当前待分配的map任务的输入数据所在的节点
int level = 0;
// maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
// called to schedule any task (local, rack-local, off-switch or speculative)
// tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if findNewMapTask is
// (i.e. -1) if findNewMapTask is to only schedule off-switch/speculative
// tasks
int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
for (level = 0;level < maxLevelToSchedule; ++level) {
List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key); //获取节点key上还未分配的map任务
if (cacheForLevel != null) {
tip = findTaskFromList(cacheForLevel, tts, numUniqueHosts,level == 0); //从一个map任务集中为当前的计算节点找到一个合适的任务
if (tip != null) {
// Add to running cache
scheduleMap(tip);
// remove the cache if its empty
if (cacheForLevel.size() == 0) {
nonRunningMapCache.remove(key);
}
return tip.getIdWithinJob();
}
}
key = key.getParent();
}
// Check if we need to only schedule a local task (node-local/rack-local)
if (level == maxCacheLevel) {
return -1;
}
}
...
} 复制代码
还有一个值得注意的问题就是,如果该TaskTracker节点所在的Node上有Map任务时,当从该Node上分配挂载的本地任务时,如果以前发生过该TaskTracker节点执行某一Map任务失败了的情况,则应将该Map任务从Node上删除,同时,对于无法执行或正在执行的Map任务也应该从Node上删除,对应的源码为:
private synchronized TaskInProgress findTaskFromList(Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus, int numUniqueHosts, boolean removeFailedTip) {
Iterator<TaskInProgress> iter = tips.iterator();
while (iter.hasNext()) {
TaskInProgress tip = iter.next();
// Select a tip if
// 1. runnable : still needs to be run and is not completed
// 2. ~running : no other node is running it
// 3. earlier attempt failed : has not failed on this host
// and has failed on all the other hosts
// A TIP is removed from the list if
// (1) this tip is scheduled
// (2) if the passed list is a level 0 (host) cache
// (3) when the TIP is non-schedulable (running, killed, complete)
if (tip.isRunnable() && !tip.isRunning()) {
// check if the tip has failed on this host
if (!tip.hasFailedOnMachine(ttStatus.getHost()) || tip.getNumberOfFailedMachines() >= numUniqueHosts) {
// check if the tip has failed on all the nodes
iter.remove();
return tip;
}
else if (removeFailedTip) {
// the case where we want to remove a failed tip from the host cache
// point#3 in the TIP removal logic above
iter.remove();
}
} else {
// see point#3 in the comment above for TIP removal logic
iter.remove();
}
}
return null;
}
复制代码
1.2 分配非本地Map任务
JobInProgress为某一个TaskTracker节点分配一个非本地Map任务相对于分配一个本地任务来说要复杂的多,它首先会先从nonRunningMapCache中选择一个非本地任务,如果没有找到再从nonLocalMaps中选择一个任务,如果还没有找到,则判断这个作业是否设置了hasSpeculativeMaps,如果没有设置,则不再为该TaskTracker节点分配非本地Map任务了;如果设置了,则从正在被其它TaskTracker节点执行的本地或非本地Map任务中选一个,不过这是有优先顺序的,首先从正在运行的runningMapCache中寻找一个本地Map任务,如果没有找到再从runningMapCache中寻找一个非本地Map任务,最后再从nonLocalRunningMaps中寻找一个非本地Map任务,此时还没有找到的话,就不再为该TaskTracker节点分配Map任务了。这个过程的源代码如下:
public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts, int clusterSize, int numUniqueHosts)
throws IOException {
if (!tasksInited.get()) {
return null;
}
int target = findNewMapTask(tts, clusterSize, numUniqueHosts, NON_LOCAL_CACHE_LEVEL, status.mapProgress());
if (target == -1) {
return null;
}
Task result = maps[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
}
return result;
}
private synchronized int findNewMapTask(final TaskTrackerStatus tts, final int clusterSize, final int numUniqueHosts, final int maxCacheLevel, final double avgProgress) {
...
Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
// get the node parent at max level
Node nodeParentAtMaxLevel = (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
for (Node parent : nodesAtMaxLevel) {
// skip the parent that has already been scanned
if (parent == nodeParentAtMaxLevel) {
continue;
}
List<TaskInProgress> cache = nonRunningMapCache.get(parent);
if (cache != null) {
tip = findTaskFromList(cache, tts, numUniqueHosts, false);
if (tip != null) {
// Add to the running cache
scheduleMap(tip);
// remove the cache if empty
if (cache.size() == 0) {
nonRunningMapCache.remove(parent);
}
LOG.info("Choosing a non-local task " + tip.getTIPId());
return tip.getIdWithinJob();
}
}
}
// 3. Search non-local tips for a new task
tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
if (tip != null) {
// Add to the running list
scheduleMap(tip);
LOG.info("Choosing a non-local task " + tip.getTIPId());
return tip.getIdWithinJob();
}
// II) Running TIP :
if (hasSpeculativeMaps) {
long currentTime = System.currentTimeMillis();
// 1. Check bottom up for speculative tasks from the running cache
if (node != null) {
Node key = node;
for (int level = 0; level < maxLevel; ++level) {
Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
if (cacheForLevel != null) {
tip = findSpeculativeTask(cacheForLevel, tts, avgProgress, currentTime, level == 0);
if (tip != null) {
if (cacheForLevel.size() == 0) {
runningMapCache.remove(key);
}
return tip.getIdWithinJob();
}
}
key = key.getParent();
}
}
// 2. Check breadth-wise for speculative tasks
for (Node parent : nodesAtMaxLevel) {
// ignore the parent which is already scanned
if (parent == nodeParentAtMaxLevel) {
continue;
}
Set<TaskInProgress> cache = runningMapCache.get(parent);
if (cache != null) {
tip = findSpeculativeTask(cache, tts, avgProgress, currentTime, false);
if (tip != null) {
// remove empty cache entries
if (cache.size() == 0) {
runningMapCache.remove(parent);
}
LOG.info("Choosing a non-local task " + tip.getTIPId() + " for speculation");
return tip.getIdWithinJob();
}
}
}
// 3. Check non-local tips for speculation
tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress, currentTime, false);
if (tip != null) {
LOG.info("Choosing a non-local task " + tip.getTIPId() + " for speculation");
return tip.getIdWithinJob();
}
}
return -1;
}
复制代码
2. 分配作业的Reduce任务
由于 Reduce任务的输入数据来源于该作业所有的Map任务的输出,而执行Map任务的TaskTracker节点将map的输出保存在自己本地,所以Reduce任务的输入数据在绝大多数情况下不可能都在某一个TaskTracker节点上,因此对于任何一个TaskTracker节点来说没有本地和非本地的Reduce任务之分。JobInProgress为某一个TaskTracker节点分配一个Reduce任务的操作就相当的简单了,这个过程类似于分配非本地Map任务。
它首先直接从nonRunningReduces中寻找一个任务,如果没有找到则在看这个作业设置了hasSpeculativeReduces没有,若没有则不分配了;若设置了,则从runningReduces中寻找一个正在被其它TaskTracker节点执行的Reduce任务分配给该TaskTracker节点。该过程对应的源代码如下:
public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts, int clusterSize, int numUniqueHosts) throws IOException {
if (status.getRunState() != JobStatus.RUNNING) {
return null;
}
// Ensure we have sufficient map outputs ready to shuffle before
// scheduling reduces
if (!scheduleReduces()) {
return null;
}
int target = findNewReduceTask(tts, clusterSize, numUniqueHosts, status.reduceProgress());
if (target == -1) {
return null;
}
Task result = reduces[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
}
return result;
}
private synchronized int findNewReduceTask(TaskTrackerStatus tts, int clusterSize, int numUniqueHosts, double avgProgress) {
if (numReduceTasks == 0) {
return -1;
}
String taskTracker = tts.getTrackerName();
TaskInProgress tip = null;
// Update the last-known clusterSize
this.clusterSize = clusterSize;
if (!shouldRunOnTaskTracker(taskTracker)) {
return -1;
}
long outSize = resourceEstimator.getEstimatedReduceInputSize();
long availSpace = tts.getResourceStatus().getAvailableSpace();
if(availSpace < outSize) {
LOG.warn("No local disk space for reduce task. TaskTracker[" + taskTracker + "] has " + availSpace + " bytes free; but we expect reduce input to take " + outSize);
return -1; //see if a different TIP might work better.
}
// 1. check for a never-executed reduce tip
// reducers don't have a cache and so pass -1 to explicitly call that out
tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
if (tip != null) {
scheduleReduce(tip);
return tip.getIdWithinJob();
}
// 2. check for a reduce tip to be speculated
if (hasSpeculativeReduces) {
tip = findSpeculativeTask(runningReduces, tts, avgProgress, System.currentTimeMillis(), false);
if (tip != null) {
scheduleReduce(tip);
return tip.getIdWithinJob();
}
}
return -1;
}
private synchronized TaskInProgress findSpeculativeTask(Collection<TaskInProgress> list, TaskTrackerStatus ttStatus, double avgProgress, long currentTime, boolean shouldRemove) {
Iterator<TaskInProgress> iter = list.iterator();
while (iter.hasNext()) {
TaskInProgress tip = iter.next();
// should never be true! (since we delete completed/failed tasks)
if (!tip.isRunning()) {
iter.remove();
continue;
}
//当前TaskTracker节点没有运行该任务
if (!tip.hasRunOnMachine(ttStatus.getHost(), ttStatus.getTrackerName())) {
if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
// In case of shared list we don't remove it. Since the TIP failed
// on this tracker can be scheduled on some other tracker.
if (shouldRemove) {
iter.remove(); //this tracker is never going to run it again
}
return tip;
}
} else {
// Check if this tip can be removed from the list.
// If the list is shared then we should not remove.
if (shouldRemove) {
// This tracker will never speculate this tip
iter.remove();
}
}
}
return null;
}
复制代码
在目前的Hadoop版本设计中,作业中任务的调度细节被封装到了JobInProgress中,使得作业调度器TaskScheduler可完全控制的调度粒度限制在Job级,同时JobInProgress为上层的TaskScheduler实现的任务调度提供API,这样做就大大地降低了用户自行设计TaskScheduler的门槛,即可以很容易的根据自己的应用场景集中在作业级别上实现合适的调度策略。