Hadoop Learning: A Detailed Source-Code Walkthrough of the Map-Reduce Execution Process


Guiding questions

1. What preparation does a client do to submit a job?
2. Which two parts make up the work of the JobTracker's main function?
3. What work does the TaskTracker do?
4. In which process do the map tasks and reduce tasks actually run?






1. The Client
The Map-Reduce process begins with a client submitting a job.
Submission goes through the static function JobClient.runJob(JobConf):

public static RunningJob runJob(JobConf job) throws IOException {
  // First create a JobClient object
  JobClient jc = new JobClient(job);
  ……
  // Call submitJob to submit the job
  running = jc.submitJob(job);
  JobID jobId = running.getID();
  ……
  while (true) {
    // Loop, repeatedly fetching the job's status and printing it to the
    // client console
  }
  return running;
}
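
For context, a minimal driver that ends up in runJob might look like this (the job name and the WordCountMapper/WordCountReducer classes are hypothetical; minimal versions of both appear in section 6):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountDriver.class);
    conf.setJobName("wordcount");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(WordCountMapper.class);    // hypothetical, see section 6.1
    conf.setReducerClass(WordCountReducer.class);  // hypothetical, see section 6.2
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf); // blocks here, polling status as shown above
  }
}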



JobClient's submitJob function is implemented as follows:
public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                InvalidJobConfException, IOException {
  // Get an id for the new job from the JobTracker
  JobID jobId = jobSubmitClient.getNewJobId();
  // Prepare to write everything the job needs to run into HDFS:
  //   the jar containing the job's code goes into job.jar
  //   the input split information goes into job.split
  //   the job's configuration goes into job.xml
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  // Upload any jars given with the -libjars command-line option to HDFS
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  ……
  // Compute the input splits according to the input format; the default
  // split type is FileSplit
  InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());

  // Create an output stream and write the input split information into
  // the job.split file
  FSDataOutputStream out = FileSystem.create(fs,
      submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
  try {
    // job.split contains: a split file header, the split file version,
    // the number of splits, and then each input split in turn.
    // For each input split it writes: the split class name (FileSplit by
    // default), the split's size, the split's contents (for a FileSplit,
    // the file name and this split's start offset within the file), and
    // the split's locations (i.e. which DataNodes hold it).
    writeSplitsFile(splits, out);
  } finally {
    out.close();
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  // The number of map tasks is set to the number of splits
  job.setNumMapTasks(splits.length);
  // Write the job's configuration into the job.xml file
  out = FileSystem.create(fs, submitJobFile,
      new FsPermission(JOB_FILE_PERMISSION));
  try {
    job.writeXml(out);
  } finally {
    out.close();
  }
  // Finally call the JobTracker to actually submit the job
  JobStatus status = jobSubmitClient.submitJob(jobId);
  ……
}
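
The getSplits call above, with the default FileInputFormat, chooses split boundaries roughly like this (a simplified paraphrase of the old API, not the verbatim source; hostsHolding stands in for the block-location lookup the real code performs through the FileSystem):

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = job.getLong("mapred.min.split.size", 1);
for (FileStatus file : files) {
  // A split is at least minSize and at most one HDFS block, unless
  // goalSize (total input / requested number of maps) is smaller
  long splitSize = Math.max(minSize,
      Math.min(goalSize, file.getBlockSize()));
  long bytesRemaining = file.getLen();
  while (bytesRemaining > 0) {
    long start = file.getLen() - bytesRemaining;
    long length = Math.min(splitSize, bytesRemaining);
    splits.add(new FileSplit(file.getPath(), start, length,
        hostsHolding(file, start))); // hypothetical helper: replica hosts
    bytesRemaining -= length;
  }
}

This is why job.setNumMapTasks above is effectively only a hint: the actual number of map tasks is whatever getSplits returns.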







2. JobTracker

JobTracker runs as a separate JVM. Its main function mainly does two things:
  • call the static function startTracker(new JobConf()) to create a JobTracker object;
  • call JobTracker.offerService() to start serving.

The JobTracker constructor creates a taskScheduler member to schedule jobs; the default is JobQueueTaskScheduler, which schedules jobs FIFO.
offerService then calls taskScheduler.start(), which registers two listeners with the JobTracker (that is, with the taskScheduler's taskTrackerManager):


  • jobQueueJobInProgressListener, a JobQueueJobInProgressListener that monitors the run state of jobs;
  • eagerTaskInitializationListener, an EagerTaskInitializationListener that initializes jobs.

EagerTaskInitializationListener owns a thread, JobInitThread, that keeps taking JobInProgress objects off jobInitQueue and calls each one's initTasks function to initialize the job; a sketch of that loop follows.
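
In outline (a simplified sketch of the listener's init thread, not the exact source):

// Simplified sketch of the init thread inside
// EagerTaskInitializationListener; jobAdded(job), shown below, appends to
// jobInitQueue and notify()s this thread.
class JobInitThread implements Runnable {
  public void run() {
    JobInProgress job;
    while (true) {
      try {
        synchronized (jobInitQueue) {
          while (jobInitQueue.isEmpty()) {
            jobInitQueue.wait();          // woken by jobAdded()
          }
          job = jobInitQueue.remove(0);   // FIFO order
        }
        job.initTasks();                  // walked through below
      } catch (Throwable t) {
        // the real code logs the error and keeps the thread alive
      }
    }
  }
}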

In the previous section the client called JobTracker.submitJob. That function first creates a JobInProgress object and then calls addJob, which contains the following logic:
synchronized (jobs) {
  synchronized (taskScheduler) {
    jobs.put(job.getProfile().getJobID(), job);
    // Call jobAdded on each of the JobTracker's listeners
    for (JobInProgressListener listener : jobInProgressListeners) {
      listener.jobAdded(job);
    }
  }
}






EagerTaskInitializationListener's jobAdded simply adds the JobInProgress object to jobInitQueue, which triggers the job's initialization; JobInProgress's initTasks function does the work:

public synchronized void initTasks() throws IOException {
  ……
  // Read the job.split file from HDFS to recreate the input splits
  String jobFile = profile.getJobFile();
  Path sysDir = new Path(this.jobtracker.getSystemDir());
  FileSystem fs = sysDir.getFileSystem(conf);
  DataInputStream splitFile =
    fs.open(new Path(conf.get("mapred.job.split.file")));
  JobClient.RawSplit[] splits;
  try {
    splits = JobClient.readSplitFile(splitFile);
  } finally {
    splitFile.close();
  }
  // The number of map tasks equals the number of input splits
  numMapTasks = splits.length;
  // Create one TaskInProgress per map task to handle one input split
  maps = new TaskInProgress[numMapTasks];
  for (int i = 0; i < numMapTasks; ++i) {
    inputLength += splits[i].getDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i);
  }
  // Put the map tasks into nonRunningMapCache, a Map<Node,
  // List<TaskInProgress>>: a map task will preferentially be assigned to
  // a Node holding its input split. nonRunningMapCache is consulted when
  // the JobTracker assigns map tasks to TaskTrackers.
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // Create the reduce tasks
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this);
    // Put each reduce task into nonRunningReduces, consulted when the
    // JobTracker assigns reduce tasks to TaskTrackers.
    nonRunningReduces.add(reduces[i]);
  }

  // Create two cleanup tasks, one to clean up the map side and one the
  // reduce side.
  cleanup = new TaskInProgress[2];
  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
                                  jobtracker, conf, this, numMapTasks);
  cleanup[0].setJobCleanupTask();
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                  numReduceTasks, jobtracker, conf, this);
  cleanup[1].setJobCleanupTask();
  // Create two setup tasks, one to set up the map side and one the
  // reduce side.
  setup = new TaskInProgress[2];
  setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
                                jobtracker, conf, this, numMapTasks + 1);
  setup[0].setJobSetupTask();
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                numReduceTasks + 1, jobtracker, conf, this);
  setup[1].setJobSetupTask();
  tasksInited.set(true); // initialization finished
  ……
}
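
createCache, paraphrased, builds that Map<Node, List<TaskInProgress>> by walking each split's replica hosts up the network topology (a simplified sketch; the real code also avoids duplicate entries at the higher levels):

Map<Node, List<TaskInProgress>> cache =
    new IdentityHashMap<Node, List<TaskInProgress>>();
for (int i = 0; i < splits.length; i++) {
  for (String host : splits[i].getLocations()) {
    Node node = jobtracker.resolveAndAddToTopology(host);
    // Register this map task at every ancestor up to maxLevel:
    // the host itself, its rack, and so on
    for (int level = 0; node != null && level < maxLevel; ++level) {
      List<TaskInProgress> hostMaps = cache.get(node);
      if (hostMaps == null) {
        hostMaps = new ArrayList<TaskInProgress>();
        cache.put(node, hostMaps);
      }
      hostMaps.add(maps[i]);
      node = node.getParent();
    }
  }
}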





3. TaskTracker

TaskTracker also runs as a separate JVM. Its main function mainly calls new TaskTracker(conf).run(), whose run function in turn mainly calls:

State offerService() throws Exception {
  long lastHeartbeat = 0;
  // The TaskTracker process runs for as long as the daemon is up
  while (running && !shuttingDown) {
    ……
    long now = System.currentTimeMillis();
    // Send a heartbeat to the JobTracker at regular intervals
    long waitTime = heartbeatInterval - (now - lastHeartbeat);
    if (waitTime > 0) {
      synchronized (finishedCount) {
        if (finishedCount[0] == 0) {
          finishedCount.wait(waitTime);
        }
        finishedCount[0] = 0;
      }
    }
    ……
    // Send the heartbeat to the JobTracker and get the response
    HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
    ……
    // The response tells this TaskTracker what to do next
    TaskTrackerAction[] actions = heartbeatResponse.getActions();
    ……
    if (actions != null) {
      for (TaskTrackerAction action : actions) {
        if (action instanceof LaunchTaskAction) {
          // A new task to run: add the action to the task queue
          addToTaskQueue((LaunchTaskAction) action);
        } else if (action instanceof CommitTaskAction) {
          CommitTaskAction commitAction = (CommitTaskAction) action;
          if (!commitResponses.contains(commitAction.getTaskID())) {
            commitResponses.add(commitAction.getTaskID());
          }
        } else {
          tasksToCleanup.put(action);
        }
      }
    }
  }
  return State.NORMAL;
}


The main logic of transmitHeartBeat is as follows:
private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  // Every so often, include counter statistics for the JobTracker in the
  // heartbeat
  boolean sendCounters;
  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
    sendCounters = true;
    previousUpdate = now;
  } else {
    sendCounters = false;
  }
  ……
  // Report this TaskTracker's current status to the JobTracker
  if (status == null) {
    synchronized (this) {
      status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                     httpPort,
                                     cloneAndResetRunningTaskStatuses(
                                       sendCounters),
                                     failures,
                                     maxCurrentMapTasks,
                                     maxCurrentReduceTasks);
    }
  }
  ……
  // Ask the JobTracker to assign a new task when either of the following
  // holds (and the tracker is accepting new tasks):
  //   the number of map tasks currently running on this TaskTracker is
  //   below its map-slot limit, or
  //   the number of running reduce tasks is below its reduce-slot limit
  boolean askForNewTask;
  long localMinSpaceStart;
  synchronized (this) {
    askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
                     status.countReduceTasks() < maxCurrentReduceTasks) &&
                    acceptNewTasks;
    localMinSpaceStart = minSpaceStart;
  }
  ……
  // Send the heartbeat to the JobTracker; this is an RPC call
  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
      justStarted, askForNewTask,
      heartbeatResponseId);
  ……
  return heartbeatResponse;
}









4. JobTracker

When the heartbeat RPC arrives at the JobTracker, its heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) function is invoked:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
    boolean initialContact, boolean acceptNewTasks, short responseId)
    throws IOException {
  ……
  String trackerName = status.getTrackerName();
  ……
  short newResponseId = (short) (responseId + 1);
  ……
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  // If the TaskTracker asked the JobTracker for a task to run
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      // Setup and cleanup tasks have the highest priority
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null) {
        // Let the task scheduler assign tasks
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          // Put the tasks into the actions list returned to the
          // TaskTracker
          expireLaunchingTasks.addNewTask(task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
  ……
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(
      actions.toArray(new TaskTrackerAction[actions.size()]));
  ……
  return response;
}






The default task scheduler is JobQueueTaskScheduler, whose assignTasks looks like this:
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();
  // Compute the remaining map and reduce load across all running jobs
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }
  // Compute the average load each TaskTracker should carry: the
  // remaining load divided by the number of TaskTrackers, capped by this
  // tracker's slot limits
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
        (int) Math.ceil((double) remainingMapLoad /
                        numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
        (int) Math.ceil((double) remainingReduceLoad
                        / numTaskTrackers));
  }
  ……

  // Maps take priority over reduces: if the TaskTracker is running fewer
  // map tasks than the average load, assign it a map task
  if (numMaps < maxMapLoad) {
    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  // Only after the map tasks, try to assign a reduce task
  if (numReduces < maxReduceLoad) {
    int totalNeededReduces = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }
        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  return null;
}


From the code above we can see that JobInProgress.obtainNewMapTask is what assigns a map task: it mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache according to the Node the TaskTracker runs on. JobInProgress.obtainNewReduceTask assigns a reduce task: it mainly calls findNewReduceTask, which takes a TaskInProgress from nonRunningReduces. The map-side locality lookup is sketched below.
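
A simplified sketch of the locality walk inside findNewMapTask (the real function returns an index into maps and also consults caches of running and failed tasks):

// Start at the Node this TaskTracker runs on and widen the search one
// topology level at a time: host, then rack, and so on up to maxLevel.
Node node = jobtracker.getNode(tts.getHost());
for (int level = 0; node != null && level < maxLevel; ++level) {
  List<TaskInProgress> cached = nonRunningMapCache.get(node);
  if (cached != null) {
    for (TaskInProgress tip : cached) {
      if (!tip.isRunning() && !tip.isComplete()) {
        return tip; // node-local at level 0, rack-local at level 1, ...
      }
    }
  }
  node = node.getParent();
}
return null; // caller falls back to a non-local task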




5. TaskTracker

After the heartbeat is sent to the JobTracker, the response carries the assigned tasks as LaunchTaskActions. addToTaskQueue puts each one on a queue: a map task goes to mapLauncher and a reduce task to reduceLauncher (both of type TaskLauncher):
private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}





TaskLauncher is a thread. Its run function takes a TaskInProgress off the queue filled above and calls startNewTask(TaskInProgress tip) to launch the task, which in turn mainly calls localizeJob(TaskInProgress tip):
private void localizeJob(TaskInProgress tip) throws IOException {
  // The first thing to do is copy the job's files from HDFS to the
  // TaskTracker's local file system: job.split, job.xml, and job.jar
  Path localJarFile = null;
  Task t = tip.getTask();
  JobID jobId = t.getJobID();
  Path jobFile = new Path(t.getJobFile());
  ……
  Path localJobFile = lDirAlloc.getLocalPathForWrite(
      getLocalJobDir(jobId.toString())
      + Path.SEPARATOR + "job.xml",
      jobFileSize, fConf);
  RunningJob rjob = addTaskToJob(jobId, tip);
  synchronized (rjob) {
    if (!rjob.localized) {
      FileSystem localFs = FileSystem.getLocal(fConf);
      Path jobDir = localJobFile.getParent();
      ……
      // Copy job.xml to the local file system
      systemFS.copyToLocalFile(jobFile, localJobFile);
      JobConf localJobConf = new JobConf(localJobFile);
      Path workDir = lDirAlloc.getLocalPathForWrite(
          (getLocalJobDir(jobId.toString())
           + Path.SEPARATOR + "work"), fConf);
      if (!localFs.mkdirs(workDir)) {
        throw new IOException("Mkdirs failed to create "
                              + workDir.toString());
      }
      System.setProperty("job.local.dir", workDir.toString());
      localJobConf.set("job.local.dir", workDir.toString());
      // Copy the jar file to the local FS and unjar it
      String jarFile = localJobConf.getJar();
      long jarFileSize = -1;
      if (jarFile != null) {
        Path jarFilePath = new Path(jarFile);
        localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
            getLocalJobDir(jobId.toString())
            + Path.SEPARATOR + "jars",
            5 * jarFileSize, fConf), "job.jar");
        if (!localFs.mkdirs(localJarFile.getParent())) {
          throw new IOException("Mkdirs failed to create jars directory ");
        }
        // Copy job.jar to the local file system
        systemFS.copyToLocalFile(jarFilePath, localJarFile);
        localJobConf.setJar(localJarFile.toString());
        // Write the job's configuration out as a local job.xml
        OutputStream out = localFs.create(localJobFile);
        try {
          localJobConf.writeXml(out);
        } finally {
          out.close();
        }
        // Unpack job.jar
        RunJar.unJar(new File(localJarFile.toString()),
                     new File(localJarFile.getParent().toString()));
      }
      rjob.localized = true;
      rjob.jobConf = localJobConf;
    }
  }
  // Actually launch the task
  launchTaskForJob(tip, new JobConf(rjob.jobConf));
}








Once everything the task needs has been copied to the local machine, launchTaskForJob is called, which in turn calls TaskInProgress's launchTask function:

public synchronized void launchTask() throws IOException {
  ……
  // Create the task's working directory and localize the task
  localizeTask(task);
  if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
    this.taskStatus.setRunState(TaskStatus.State.RUNNING);
  }
  // Create and start a TaskRunner: a MapTaskRunner for a MapTask, a
  // ReduceTaskRunner for a ReduceTask
  this.runner = task.createRunner(TaskTracker.this, this);
  this.runner.start();
  this.taskStatus.setStartTime(System.currentTimeMillis());
}

TaskRunner is a thread; its run function is as follows:

public final void run() {
  ……
  TaskAttemptID taskid = t.getTaskID();
  LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  File jobCacheDir = null;
  if (conf.getJar() != null) {
    jobCacheDir = new File(
        new Path(conf.getJar()).getParent().toString());
  }
  File workDir = new File(lDirAlloc.getLocalPathToRead(
      TaskTracker.getLocalTaskDir(
          t.getJobID().toString(),
          t.getTaskID().toString(),
          t.isTaskCleanupTask())
      + Path.SEPARATOR + MRConstants.WORKDIR,
      conf).toString());
  FileSystem fileSystem;
  Path localPath;
  ……
  // Assemble the classpath
  String baseDir;
  String sep = System.getProperty("path.separator");
  StringBuffer classPath = new StringBuffer();
  // Start with the same classpath as the parent process
  classPath.append(System.getProperty("java.class.path"));
  classPath.append(sep);
  if (!workDir.mkdirs()) {
    if (!workDir.isDirectory()) {
      LOG.fatal("Mkdirs failed to create " + workDir.toString());
    }
  }
  String jar = conf.getJar();
  if (jar != null) {
    // If a job jar exists, add its lib/, classes/, and the jar directory
    // itself to the classpath
    File[] libs = new File(jobCacheDir, "lib").listFiles();
    if (libs != null) {
      for (int i = 0; i < libs.length; i++) {
        classPath.append(sep); // add libs from the jar to the classpath
        classPath.append(libs[i]);
      }
    }
    classPath.append(sep);
    classPath.append(new File(jobCacheDir, "classes"));
    classPath.append(sep);
    classPath.append(jobCacheDir);
  }
  ……
  classPath.append(sep);
  classPath.append(workDir);
  // Assemble the java command line and its arguments
  Vector<String> vargs = new Vector<String>(8);
  File jvm =
      new File(new File(System.getProperty("java.home"), "bin"), "java");
  vargs.add(jvm.toString());
  String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
  javaOpts = javaOpts.replace("@taskid@", taskid.toString());
  String[] javaOptsSplit = javaOpts.split(" ");
  String libraryPath = System.getProperty("java.library.path");
  if (libraryPath == null) {
    libraryPath = workDir.getAbsolutePath();
  } else {
    libraryPath += sep + workDir;
  }
  boolean hasUserLDPath = false;
  for (int i = 0; i < javaOptsSplit.length; i++) {
    if (javaOptsSplit[i].startsWith("-Djava.library.path=")) {
      javaOptsSplit[i] += sep + libraryPath;
      hasUserLDPath = true;
      break;
    }
  }
  if (!hasUserLDPath) {
    vargs.add("-Djava.library.path=" + libraryPath);
  }
  for (int i = 0; i < javaOptsSplit.length; i++) {
    vargs.add(javaOptsSplit[i]);
  }
  // Add the child process's temporary directory
  String tmp = conf.get("mapred.child.tmp", "./tmp");
  Path tmpDir = new Path(tmp);
  if (!tmpDir.isAbsolute()) {
    tmpDir = new Path(workDir.toString(), tmp);
  }
  FileSystem localFs = FileSystem.getLocal(conf);
  if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
    throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  }
  vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
  // Add the classpath
  vargs.add("-classpath");
  vargs.add(classPath.toString());
  // Log directory and task-log settings
  long logSize = TaskLog.getTaskLogLength(conf);
  vargs.add("-Dhadoop.log.dir=" +
      new File(System.getProperty("hadoop.log.dir")
      ).getAbsolutePath());
  vargs.add("-Dhadoop.root.logger=INFO,TLA");
  vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
  vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
  // The main class of the child process that runs map tasks and reduce
  // tasks is Child
  vargs.add(Child.class.getName()); // main of Child
  ……
  // Launch the child process
  jvmManager.launchJvm(this,
      jvmManager.constructJvmEnv(setup, vargs, stdout, stderr, logSize,
          workDir, env, pidFile, conf));
}
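
Putting the pieces together, the assembled child command line ends up with roughly this shape (all paths, sizes, and IDs here are illustrative, and the trailing Child arguments are elided, as they are in the source above):

$JAVA_HOME/bin/java -Xmx200m \
    -Djava.library.path=/tmp/mapred/local/.../work \
    -Djava.io.tmpdir=/tmp/mapred/local/.../work/tmp \
    -classpath <parent classpath>:<job.jar libs and classes>:<work dir> \
    -Dhadoop.log.dir=/var/log/hadoop \
    -Dhadoop.root.logger=INFO,TLA \
    -Dhadoop.tasklog.taskid=attempt_201411050000_0001_m_000000_0 \
    -Dhadoop.tasklog.totalLogFileSize=10485760 \
    org.apache.hadoop.mapred.Child ...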










6. Child

The actual map tasks and reduce tasks run in the Child process. The main logic of Child's main function is:
while (true) {
  // Fetch a JvmTask object from the TaskTracker over the network
  JvmTask myTask = umbilical.getTask(jvmId);
  ……
  idleLoopCount = 0;
  task = myTask.getTask();
  taskid = task.getTaskID();
  isCleanup = task.isTaskCleanupTask();
  JobConf job = new JobConf(task.getJobFile());
  TaskRunner.setupWorkDir(job);
  numTasksToExecute = job.getNumTasksToExecutePerJvm();
  task.setConf(job);
  defaultConf.addResource(new Path(task.getJobFile()));
  ……
  // Run the task
  task.run(job, umbilical);             // run the task
  if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
    break;
  }
}


6.1 MapTask
If the task is a MapTask, its run function is as follows:
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException {
  // Used to communicate with the TaskTracker and report progress
  final Reporter reporter = getReporter(umbilical);
  startCommunicationThread(umbilical);
  initialize(job, reporter);
  ……
  // The map task's output collector
  int numReduceTasks = conf.getNumReduceTasks();
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  // Read the input split and, from its information, create a
  // RecordReader to read the data
  instantiatedSplit = (InputSplit)
      ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(split.getBytes(), 0, split.getLength());
  instantiatedSplit.readFields(splitBuffer);
  if (instantiatedSplit instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) instantiatedSplit;
    job.set("map.input.file", fileSplit.getPath().toString());
    job.setLong("map.input.start", fileSplit.getStart());
    job.setLong("map.input.length", fileSplit.getLength());
  }
  RecordReader rawIn = // open input
      job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  RecordReader in = isSkipping() ?
      new SkippingRecordReader(rawIn, getCounters(), umbilical) :
      new TrackedRecordReader(rawIn, getCounters());
  job.setBoolean("mapred.skip.on", isSkipping());
  // For a map task, create a MapRunnable; the default is MapRunner
  MapRunnable runner =
      ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  try {
    // MapRunner's run function reads records from the RecordReader one
    // by one and calls the Mapper's map function on each
    runner.run(in, collector, reporter);
    collector.flush();
  } finally {
    in.close(); // close input
    collector.close();
  }
  done(umbilical);
}










MapRunner's run function reads records from the RecordReader one at a time and calls the Mapper's map function on each:
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
    throws IOException {
  try {
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
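
For reference, the mapper driven by this loop implements the old org.apache.hadoop.mapred.Mapper interface. A minimal hypothetical example (the WordCountMapper assumed in the section 1 driver):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // One (word, 1) pair per token; each collect() call lands in
    // MapOutputBuffer.collect(), shown below
    for (String token : value.toString().split("\\s+")) {
      word.set(token);
      output.collect(word, ONE);
    }
  }
}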











All the output is collected into a MapOutputBuffer, whose collect function is as follows:
public synchronized void collect(K key, V value)
    throws IOException {
  reporter.progress();
  ……
  // As this shows, the buffer is a ring data structure
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  spillLock.lock();
  try {
    boolean kvfull;
    do {
      // The ring is full when the next free slot would run into the
      // start position
      kvfull = kvnext == kvstart;
      // Check whether the ring has reached the threshold at which the
      // buffer should be written to disk
      final boolean kvsoftlimit = ((kvnext > kvend)
          ? kvnext - kvend > softRecordLimit
          : kvend - kvnext <= kvoffsets.length - softRecordLimit);
      // If the threshold is reached, start writing the buffer to disk as
      // a spill file. startSpill mainly notifies the background
      // SpillThread, whose run() calls sortAndSpill() to sort, combine,
      // and write to disk.
      if (kvstart == kvend && kvsoftlimit) {
        startSpill();
      }
      // If the buffer is completely full, wait until the spill finishes
      if (kvfull) {
        while (kvstart != kvend) {
          reporter.progress();
          spillDone.await();
        }
      }
    } while (kvfull);
  } finally {
    spillLock.unlock();
  }
  try {
    // If the buffer is not full, serialize the key and value into it
    int keystart = bufindex;
    keySerializer.serialize(key);
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();
    // Call the configured partitioner to get the partition id for this
    // key and value
    final int partition = partitioner.getPartition(key, value, partitions);
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(valend >= keystart
        ? valend - keystart
        : (bufvoid - keystart) + valend);
    // Record the partition id and the offsets of the key and value
    // within the buffer in the index arrays
    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;
    kvindices[ind + PARTITION] = partition;
    kvindices[ind + KEYSTART] = keystart;
    kvindices[ind + VALSTART] = valstart;
    kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value);
    mapOutputRecordCounter.increment(1);
    return;
  }
}
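
The partitioner called above defaults to HashPartitioner, which is essentially:

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
  public void configure(JobConf job) {}

  public int getPartition(K2 key, V2 value, int numReduceTasks) {
    // Mask off the sign bit so the modulo result is never negative;
    // each partition id corresponds to one reduce task
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}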






The format of the in-memory buffer is shown below (see also the analyses by other Hadoop authors: http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx and http://caibinbupt.javaeye.com/):

[Figure: the three in-memory arrays, kvoffsets (the offset index), kvindices (partition/keystart/valstart triples), and kvbuffer (the serialized key/value bytes)]

kvoffsets exists so that the records can be sorted before they are spilled to disk: the sorter permutes this small index array instead of moving the serialized key/value bytes, as the sketch below shows.
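
MapOutputBuffer implements the IndexedSortable interface over kvoffsets; its compare and swap are roughly the following (a close paraphrase, lightly simplified):

public int compare(int i, int j) {
  final int ii = kvoffsets[i % kvoffsets.length];
  final int ij = kvoffsets[j % kvoffsets.length];
  // Primary sort key: the partition id
  if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
    return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
  }
  // Secondary sort key: the serialized key bytes, compared in place
  // inside kvbuffer without deserializing
  return comparator.compare(kvbuffer,
      kvindices[ii + KEYSTART],
      kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
      kvbuffer,
      kvindices[ij + KEYSTART],
      kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
}

public void swap(int i, int j) {
  i %= kvoffsets.length;
  j %= kvoffsets.length;
  // Only the two int offsets move; the records themselves stay put
  int tmp = kvoffsets[i];
  kvoffsets[i] = kvoffsets[j];
  kvoffsets[j] = tmp;
}
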
As noted above, the function that spills the in-memory buffer to an on-disk spill file is sortAndSpill:
private void sortAndSpill() throws IOException {
  ……
  FSDataOutputStream out = null;
  FSDataOutputStream indexOut = null;
  IFileOutputStream indexChecksumOut = null;
  // Create the spill file on disk
  Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                  numSpills, size);
  out = rfs.create(filename);
  ……
  final int endPosition = (kvend > kvstart)
    ? kvend
    : kvoffsets.length + kvend;
  // Sort the buffered records, primarily by partition
  sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  int spindex = kvstart;
  InMemValBytes value = new InMemValBytes();
  // Write the records to the file one partition at a time
  for (int i = 0; i < partitions; ++i) {
    IFile.Writer<K, V> writer = null;
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
    // If there is no combiner, write the records straight to the file
    if (null == combinerClass) {
      ……
      writer.append(key, value);
      ++spindex;
    } else {
      ……
      // If there is a combiner, run it first: combineAndSpill calls
      // combiner.reduce(…) before writing to the file
      combineAndSpill(kvIter, combineInputCounter);
    }
  }
  ……
}
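
The combinerClass consulted above comes straight from the job configuration; a user enables it with, for example:

// Typically the job's Reducer class doubles as the combiner:
conf.setCombinerClass(WordCountReducer.class); // hypothetical class

Reusing the Reducer this way is only safe when the reduce function is associative and commutative, as a sum is, because the combiner may run zero, one, or several times per key.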









When the map phase ends, MapOutputBuffer's flush function is called. It too calls sortAndSpill to write whatever remains in the buffer to disk, and then calls mergeParts to merge the multiple spill files on disk:
private void mergeParts() throws IOException {
  ……
  // For each partition
  for (int parts = 0; parts < partitions; parts++) {
    // Create the segments to be merged
    List<Segment<K, V>> segmentList =
      new ArrayList<Segment<K, V>>(numSpills);
    TaskAttemptID mapId = getTaskID();
    // Collect this partition's segment from each spill file in turn
    for (int i = 0; i < numSpills; i++) {
      final IndexRecord indexRecord =
        getIndexInformation(mapId, i, parts);
      long segmentOffset = indexRecord.startOffset;
      long segmentLength = indexRecord.partLength;
      Segment<K, V> s =
        new Segment<K, V>(job, rfs, filename[i], segmentOffset,
                          segmentLength, codec, true);
      segmentList.add(i, s);
    }
    // Merge the segments belonging to the same partition
    RawKeyValueIterator kvIter =
      Merger.merge(job, rfs,
                   keyClass, valClass,
                   segmentList, job.getInt("io.sort.factor", 100),
                   new Path(getTaskID().toString()),
                   job.getOutputKeyComparator(), reporter);
    // Write the merged segment to the final output file
    long segmentStart = finalOut.getPos();
    Writer<K, V> writer =
        new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
    if (null == combinerClass || numSpills < minSpillsForCombine) {
      Merger.writeFile(kvIter, writer, reporter, job);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(kvIter, combineInputCounter);
    }
    ……
  }
}







6.2 ReduceTask
ReduceTask's run function is as follows:
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  job.setBoolean("mapred.skip.on", isSkipping());
  // A reduce task has three phases: copy, sort, reduce
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase  = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  startCommunicationThread(umbilical);
  final Reporter reporter = getReporter(umbilical);
  initialize(job, reporter);
  // Copy phase: ReduceCopier's fetchOutputs function fetches the map
  // outputs. It creates several MapOutputCopier threads, whose
  // copyOutput does the actual copying.
  boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  if (!isLocal) {
    reduceCopier = new ReduceCopier(umbilical, job);
    if (!reduceCopier.fetchOutputs()) {
        ……
    }
  }
  copyPhase.complete();
  // Sort phase: merge the fetched map outputs until the number of files
  // drops below io.sort.factor, then return an Iterator over the
  // key-value pairs
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  RawKeyValueIterator rIter = isLocal
    ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
        !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
        new Path(getTaskID().toString()), job.getOutputKeyComparator(),
        reporter)
    : reduceCopier.createKVIterator(job, rfs, reporter);
  mapOutputFilesOnDisk.clear();
  sortPhase.complete();
  // Reduce phase
  setPhase(TaskStatus.Phase.REDUCE);
  ……
  Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
  Class keyClass = job.getMapOutputKeyClass();
  Class valClass = job.getMapOutputValueClass();
  ReduceValuesIterator values = isSkipping() ?
      new SkippingReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter, umbilical) :
      new ReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter);
  // Read out each key and its value list in turn and call the Reducer's
  // reduce function
  while (values.more()) {
    reduceInputKeyCounter.increment(1);
    reducer.reduce(values.getKey(), values, collector, reporter);
    values.nextKey();
    values.informReduceProgress();
  }
  reducer.close();
  out.close(reporter);
  done(umbilical);
}
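
For completeness, the reducer driven by this loop implements the old org.apache.hadoop.mapred.Reducer interface. A minimal hypothetical example (the WordCountReducer assumed earlier), which sums the counts for each key:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException {
    // `values` iterates the merged, sorted map outputs for one key
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}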















