Refer to the code below:
```java
    // Excerpt: numMaps, maxMapLoad, numReduces, maxReduceLoad, jobQueue, taskTracker,
    // numTaskTrackers and taskTrackerManager are set earlier in the method.

    // Maps take priority over reduces: if the number of map tasks running on this
    // TaskTracker is below the average workload (maxMapLoad), assign it a map task.
    if (numMaps < maxMapLoad) {
        int totalNeededMaps = 0;
        synchronized (jobQueue) {
            for (JobInProgress job : jobQueue) {
                // Only jobs in the RUNNING state are considered.
                if (job.getStatus().getRunState() != JobStatus.RUNNING) {
                    continue;
                }
                Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
                    taskTrackerManager.getNumberOfUniqueHosts());
                if (t != null) {
                    // At most one task is handed out per call.
                    return Collections.singletonList(t);
                }
                // ……
            }
        }
    }
    // Once map tasks have been considered, assign reduce tasks.
    if (numReduces < maxReduceLoad) {
        int totalNeededReduces = 0;
        synchronized (jobQueue) {
            for (JobInProgress job : jobQueue) {
                // Skip jobs that are not running or that have no reduce tasks.
                if (job.getStatus().getRunState() != JobStatus.RUNNING ||
                    job.numReduceTasks == 0) {
                    continue;
                }
                Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
                    taskTrackerManager.getNumberOfUniqueHosts());
                if (t != null) {
                    return Collections.singletonList(t);
                }
                // ……
            }
        }
    }
    // Nothing to assign to this TaskTracker.
    return null;
}
```
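To make the control flow of the excerpt easier to follow, here is a minimal, self-contained sketch that reproduces only the map-first ordering. `Job`, `assignTasks` and `maxLoad` are hypothetical names, not Hadoop classes. The load limit is also an assumption: we compute it as the cluster's remaining load divided evenly over all TaskTrackers (rounded up) and capped by the tracker's own slot count, which is one way to read the "average workload" in the comment above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the map-first policy; not Hadoop's API.
class MapFirstSchedulerSketch {

    static final class Job {
        final String name;
        final boolean running;
        int pendingMaps;
        int pendingReduces;

        Job(String name, boolean running, int pendingMaps, int pendingReduces) {
            this.name = name;
            this.running = running;
            this.pendingMaps = pendingMaps;
            this.pendingReduces = pendingReduces;
        }
    }

    // Assumed per-tracker cap: remaining load spread evenly over all trackers
    // (rounded up), but never more than the tracker's own slots.
    static int maxLoad(int trackerSlots, int remainingLoad, int numTaskTrackers) {
        if (numTaskTrackers <= 0) {
            return 0;
        }
        return Math.min(trackerSlots,
            (int) Math.ceil((double) remainingLoad / numTaskTrackers));
    }

    // Returns a one-element list or null, mirroring the excerpt's control flow.
    static List<String> assignTasks(List<Job> jobQueue, int numMaps, int maxMapLoad,
                                    int numReduces, int maxReduceLoad) {
        // Maps first, but only while this tracker runs fewer maps than its share.
        if (numMaps < maxMapLoad) {
            for (Job job : jobQueue) {
                if (!job.running || job.pendingMaps == 0) {
                    continue;
                }
                job.pendingMaps--;
                return Collections.singletonList(job.name + ":map");
            }
        }
        // Reached only when no map task was handed out.
        if (numReduces < maxReduceLoad) {
            for (Job job : jobQueue) {
                if (!job.running || job.pendingReduces == 0) {
                    continue;
                }
                job.pendingReduces--;
                return Collections.singletonList(job.name + ":reduce");
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Job> queue = new ArrayList<>();
        queue.add(new Job("B", false, 5, 5)); // not running: always skipped
        queue.add(new Job("A", true, 1, 2));
        int maxMapLoad = maxLoad(2, 1, 2);    // min(2, ceil(1 / 2)) = 1
        int maxReduceLoad = maxLoad(2, 2, 2); // min(2, ceil(2 / 2)) = 1
        System.out.println(assignTasks(queue, 0, maxMapLoad, 0, maxReduceLoad)); // [A:map]
        // The tracker now runs one map, so it is at its map cap and gets a reduce.
        System.out.println(assignTasks(queue, 1, maxMapLoad, 0, maxReduceLoad)); // [A:reduce]
    }
}
```

Returning a single task per call keeps each decision cheap and lets the load limits be re-evaluated before the next assignment.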
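From the scheduler code excerpted above, we can see that JobInProgress.obtainNewMapTask assigns map tasks. It mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache based on the Node where the TaskTracker is located. JobInProgress.obtainNewReduceTask assigns reduce tasks; it mainly calls findNewReduceTask, which looks up a TaskInProgress in nonRunningReduces.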
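The two lookups differ mainly in data locality. The sketch below is a simplified model assuming a two-level host/rack topology; the field and method names echo the prose, but the class is hypothetical and the real implementation handles more cases than shown here. Map tasks are indexed under every node that holds their input, so a TaskTracker is served node-local work first, then rack-local work, then any remaining cached task, and finally tasks with no location. Reduce tasks have no input locality and are taken from a plain list.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified locality cache; not Hadoop's JobInProgress.
class LocalityCacheSketch {
    // Assumed two-level topology: host -> rack (e.g. "/r1").
    private final Map<String, String> rackOf = new HashMap<>();
    // Pending map tasks indexed by every node (host or rack) holding their input.
    private final Map<String, List<String>> nonRunningMapCache = new HashMap<>();
    // Pending map tasks with no location information.
    private final List<String> nonLocalMaps = new LinkedList<>();
    // Pending reduce tasks: no locality, so a plain list.
    private final List<String> nonRunningReduces = new LinkedList<>();

    void addHost(String host, String rack) {
        rackOf.put(host, rack);
    }

    // Index a map task under each host holding its split and under that host's rack.
    void addMapTask(String task, List<String> splitHosts) {
        if (splitHosts.isEmpty()) {
            nonLocalMaps.add(task);
            return;
        }
        for (String host : splitHosts) {
            addToCache(host, task);
            String rack = rackOf.get(host);
            if (rack != null) {
                addToCache(rack, task);
            }
        }
    }

    void addReduceTask(String task) {
        nonRunningReduces.add(task);
    }

    // Node-local first, then rack-local, then any cached task, then non-local tasks.
    String findNewMapTask(String trackerHost) {
        String[] levels = {trackerHost, rackOf.get(trackerHost)};
        for (String node : levels) {
            List<String> candidates = node == null ? null : nonRunningMapCache.get(node);
            if (candidates != null && !candidates.isEmpty()) {
                return take(candidates.get(0));
            }
        }
        for (List<String> candidates : nonRunningMapCache.values()) {
            if (!candidates.isEmpty()) {
                return take(candidates.get(0));
            }
        }
        return nonLocalMaps.isEmpty() ? null : nonLocalMaps.remove(0);
    }

    // Reduces are taken in order, regardless of which node asks.
    String findNewReduceTask() {
        return nonRunningReduces.isEmpty() ? null : nonRunningReduces.remove(0);
    }

    private void addToCache(String node, String task) {
        List<String> list = nonRunningMapCache.computeIfAbsent(node, k -> new LinkedList<>());
        if (!list.contains(task)) {
            list.add(task);
        }
    }

    // A scheduled task must disappear from every node list it was indexed under.
    private String take(String task) {
        for (List<String> list : nonRunningMapCache.values()) {
            list.remove(task);
        }
        return task;
    }

    public static void main(String[] args) {
        LocalityCacheSketch s = new LocalityCacheSketch();
        s.addHost("h1", "/r1");
        s.addHost("h2", "/r1");
        s.addHost("h3", "/r2");
        s.addMapTask("m1", List.of("h1"));
        s.addMapTask("m2", List.of("h3"));
        s.addMapTask("m3", List.of());
        System.out.println(s.findNewMapTask("h2")); // m1 (rack-local via /r1)
        System.out.println(s.findNewMapTask("h1")); // m2 (h1 and /r1 are now empty)
        System.out.println(s.findNewMapTask("h1")); // m3 (no location)
    }
}
```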
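So obtainNewMapTask is what assigns the task, and getNumberOfUniqueHosts only supplies one of its arguments. To pin down what that argument means, you would need to share its implementation code so we can discuss it together.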
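Until the actual implementation is posted, here is one plausible reading, offered as an assumption rather than a confirmed answer: the value counts distinct host names among registered TaskTrackers (several TaskTrackers may share one machine), and the scheduler could use it to decide whether a task that already failed on this host may be retried there. `UniqueHostsSketch`, `addTracker`, `removeTracker` and `mayRunOn` are hypothetical names, and the retry rule is our guess at the intent, to be checked against the real source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a "unique hosts" count; not Hadoop's JobTracker.
class UniqueHostsSketch {
    // Host name -> number of TaskTrackers currently registered on that host.
    private final Map<String, Integer> trackersPerHost = new HashMap<>();

    void addTracker(String host) {
        trackersPerHost.merge(host, 1, Integer::sum);
    }

    void removeTracker(String host) {
        // Decrement; returning null drops the host once its last tracker is gone.
        trackersPerHost.computeIfPresent(host, (h, n) -> n > 1 ? n - 1 : null);
    }

    // Distinct hosts, not TaskTrackers.
    int getNumberOfUniqueHosts() {
        return trackersPerHost.size();
    }

    // Assumed retry rule: a task that failed on this host may run here again only
    // once it has failed on at least numUniqueHosts machines (no other host is left).
    static boolean mayRunOn(String host, Set<String> failedHosts, int numUniqueHosts) {
        return !failedHosts.contains(host) || failedHosts.size() >= numUniqueHosts;
    }

    public static void main(String[] args) {
        UniqueHostsSketch jt = new UniqueHostsSketch();
        jt.addTracker("h1");
        jt.addTracker("h1"); // a second TaskTracker on the same host
        jt.addTracker("h2");
        int hosts = jt.getNumberOfUniqueHosts();
        System.out.println(hosts);                                      // 2
        System.out.println(mayRunOn("h1", Set.of("h1"), hosts));       // false
        System.out.println(mayRunOn("h1", Set.of("h1", "h2"), hosts)); // true
    }
}
```

Counting hosts rather than TaskTrackers matters under this reading: two trackers on one machine share its disks and environment, so a failure there says little about whether a retry on the same machine would succeed.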