- public static RunningJob runJob(JobConf job) throws IOException { // Submits the job through a JobClient, loops reporting its status, and returns the RunningJob handle.
- // First create a JobClient object
- JobClient jc = new JobClient(job);
- ……
- // Call submitJob to submit the job
- running = jc.submitJob(job);
- JobID jobId = running.getID();
- ……
- while (true) {
- // Inside the loop, repeatedly fetch the job's status and print it to the client console
- }
- return running;
- }
- 其中JobClient的submitJob函数实现如下:
- public RunningJob submitJob(JobConf job) throws FileNotFoundException,
- InvalidJobConfException, IOException { // Writes the job's split file and configuration under the submit directory, then submits the job id.
- // Obtain the id for this job from the JobTracker
- JobID jobId = jobSubmitClient.getNewJobId();
- // Prepare to write everything the job needs at run time into HDFS:
- // the jar containing the job's program is packaged as job.jar
- // the input split information the job will process is written to job.split
- // the job's configuration entries are gathered into job.xml
- Path submitJobDir = new Path(getSystemDir(), jobId.toString());
- Path submitJarFile = new Path(submitJobDir, "job.jar");
- Path submitSplitFile = new Path(submitJobDir, "job.split");
- // Upload the jars given with the -libjars command-line option to HDFS
- configureCommandLineOptions(job, submitJobDir, submitJarFile);
- Path submitJobFile = new Path(submitJobDir, "job.xml");
- ……
- // Obtain the input splits through the input format; the default split type is FileSplit
- InputSplit[] splits =
- job.getInputFormat().getSplits(job, job.getNumMapTasks());
- // Create an output stream and write the input split information to the job.split file
- FSDataOutputStream out = FileSystem.create(fs,
- submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
- try {
- // job.split contains: the split file header, the split file version, the number of splits, and then each input split in turn.
- // For each input split it records: the split class name (FileSplit by default), the split size, the split contents (for a FileSplit: the file name and the split's start offset within that file), and the split's location information (i.e. which DataNodes hold it).
- writeSplitsFile(splits, out);
- } finally {
- out.close();
- }
- job.set("mapred.job.split.file", submitSplitFile.toString());
- // Set the number of map tasks from the number of splits
- job.setNumMapTasks(splits.length);
- // Write the job's configuration into the job.xml file
- out = FileSystem.create(fs, submitJobFile,
- new FsPermission(JOB_FILE_PERMISSION));
- try {
- job.writeXml(out);
- } finally {
- out.close();
- }
- // Actually submit the job by calling the JobTracker
- JobStatus status = jobSubmitClient.submitJob(jobId);
- ……
- }
调用静态函数startTracker(new JobConf())创建一个JobTracker对象
JobQueueJobInProgressListener jobQueueJobInProgressListener用于监控job的运行状态
EagerTaskInitializationListener eagerTaskInitializationListener用于对Job进行初始化
- synchronized (jobs) { // Registers the job and notifies the listeners while holding the jobs lock and then the taskScheduler lock.
- synchronized (taskScheduler) {
- jobs.put(job.getProfile().getJobID(), job);
- // Call jobAdded on every listener registered with the JobTracker
- for (JobInProgressListener listener : jobInProgressListeners) {
- listener.jobAdded(job);
- }
- }
- }
- public synchronized void initTasks() throws IOException { // Builds the map, reduce, cleanup and setup TaskInProgress objects for this job from its split file.
- ……
- // Read the job.split file from HDFS to build the input splits
- String jobFile = profile.getJobFile();
- Path sysDir = new Path(this.jobtracker.getSystemDir());
- FileSystem fs = sysDir.getFileSystem(conf);
- DataInputStream splitFile =
- fs.open(new Path(conf.get("mapred.job.split.file")));
- JobClient.RawSplit[] splits;
- try {
- splits = JobClient.readSplitFile(splitFile);
- } finally {
- splitFile.close();
- }
- // The number of map tasks equals the number of input splits
- numMapTasks = splits.length;
- // Create one TaskInProgress per map task, each handling one input split
- maps = new TaskInProgress[numMapTasks];
- for(int i=0; i < numMapTasks; ++i) {
- inputLength += splits[i].getDataLength();
- maps[i] = new TaskInProgress(jobId, jobFile,
- splits[i],
- jobtracker, conf, this, i);
- }
- // Map tasks go into nonRunningMapCache, a Map<Node, List<TaskInProgress>>, so that a map task will be assigned to the Node holding its input split. nonRunningMapCache is used when the JobTracker hands map tasks to TaskTrackers.
- if (numMapTasks > 0) {
- nonRunningMapCache = createCache(splits, maxLevel);
- }
- // Create the reduce tasks
- this.reduces = new TaskInProgress[numReduceTasks];
- for (int i = 0; i < numReduceTasks; i++) {
- reduces[i] = new TaskInProgress(jobId, jobFile,
- numMapTasks, i,
- jobtracker, conf, this);
- // Reduce tasks go into nonRunningReduces, which is used when the JobTracker hands reduce tasks to TaskTrackers.
- nonRunningReduces.add(reduces[i]);
- }
- // Create two cleanup tasks: one for cleaning up map, one for cleaning up reduce.
- cleanup = new TaskInProgress[2];
- cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], // NOTE(review): splits[0] assumes at least one split exists — confirm for jobs with no input
- jobtracker, conf, this, numMapTasks);
- cleanup[0].setJobCleanupTask();
- cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
- numReduceTasks, jobtracker, conf, this);
- cleanup[1].setJobCleanupTask();
- // Create two setup tasks: one for initializing map, one for initializing reduce.
- setup = new TaskInProgress[2];
- setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
- jobtracker, conf, this, numMapTasks + 1 );
- setup[0].setJobSetupTask();
- setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
- numReduceTasks + 1, jobtracker, conf, this);
- setup[1].setJobSetupTask();
- tasksInited.set(true);// Initialization is complete
- ……
- }
TaskTracker也是作为一个单独的JVM来运行的,在其main函数中,主要是调用了new TaskTracker(conf).run(),其中run函数主要调用了:
当JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean initialContact, boolean acceptNewTasks, short responseId)函数被调用:
- public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
- boolean initialContact, boolean acceptNewTasks, short responseId)
- throws IOException { // Builds the heartbeat response, attaching a LaunchTaskAction for every task handed to the tracker.
- ……
- String trackerName = status.getTrackerName();
- ……
- short newResponseId = (short)(responseId + 1);
- ……
- HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
- List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
- // If the TaskTracker is asking the JobTracker for a task to run
- if (acceptNewTasks) {
- TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
- if (taskTrackerStatus == null) {
- LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
- } else {
- // Setup and cleanup tasks have the highest priority
- List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
- if (tasks == null ) {
- // No setup/cleanup task available: let the task scheduler assign tasks
- tasks = taskScheduler.assignTasks(taskTrackerStatus);
- }
- if (tasks != null) {
- for (Task task : tasks) {
- // Put the task into the actions list that is returned to the TaskTracker
- expireLaunchingTasks.addNewTask(task.getTaskID());
- actions.add(new LaunchTaskAction(task));
- }
- }
- }
- }
- ……
- int nextInterval = getNextHeartbeatInterval();
- response.setHeartbeatInterval(nextInterval);
- response.setActions(
- actions.toArray(new TaskTrackerAction[actions.size()]));
- ……
- return response;
- }
从上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找TaskInProgress。JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。
在向JobTracker发送heartbeat后,返回的response中有分配好的任务LaunchTaskAction,将其加入队列,调用addToTaskQueue,如果是map task则放入mapLauncher(类型为TaskLauncher),如果是reduce task则放入reduceLauncher(类型为TaskLauncher):
- private void addToTaskQueue(LaunchTaskAction action) {
- // Route the launch action to the launcher that matches the task kind.
- boolean isReduce = !action.getTask().isMapTask();
- if (isReduce) {
- reduceLauncher.addToTaskQueue(action);
- return;
- }
- mapLauncher.addToTaskQueue(action);
- }
TaskLauncher是一个线程,其run函数从上面放入的queue中取出一个TaskInProgress,然后调用startNewTask(TaskInProgress tip)来启动一个task,其又主要调用了localizeJob(TaskInProgress tip):
- private void localizeJob(TaskInProgress tip) throws IOException { // Localizes the job's files once per job (guarded by rjob.localized), then launches the task.
- // The first thing to do is copy the task's files from HDFS to the TaskTracker's local file system: job.split, job.xml and job.jar
- Path localJarFile = null;
- Task t = tip.getTask();
- JobID jobId = t.getJobID();
- Path jobFile = new Path(t.getJobFile());
- ……
- Path localJobFile = lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "job.xml",
- jobFileSize, fConf);
- RunningJob rjob = addTaskToJob(jobId, tip);
- synchronized (rjob) {
- if (!rjob.localized) {
- FileSystem localFs = FileSystem.getLocal(fConf);
- Path jobDir = localJobFile.getParent();
- ……
- // Copy the job file to the local file system; the destination is the local job.xml path built above
- systemFS.copyToLocalFile(jobFile, localJobFile);
- JobConf localJobConf = new JobConf(localJobFile);
- Path workDir = lDirAlloc.getLocalPathForWrite(
- (getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "work"), fConf);
- if (!localFs.mkdirs(workDir)) {
- throw new IOException("Mkdirs failed to create "
- + workDir.toString());
- }
- System.setProperty("job.local.dir", workDir.toString());
- localJobConf.set("job.local.dir", workDir.toString());
- // copy Jar file to the local FS and unjar it.
- String jarFile = localJobConf.getJar();
- long jarFileSize = -1;
- if (jarFile != null) {
- Path jarFilePath = new Path(jarFile);
- localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "jars",
- 5 * jarFileSize, fConf), "job.jar"); // NOTE(review): jarFileSize is still -1 here in this excerpt — the size lookup was presumably left out; confirm
- if (!localFs.mkdirs(localJarFile.getParent())) {
- throw new IOException("Mkdirs failed to create jars directory ");
- }
- // Copy job.jar to the local file system
- systemFS.copyToLocalFile(jarFilePath, localJarFile);
- localJobConf.setJar(localJarFile.toString());
- // Write the job's configuration out as job.xml
- OutputStream out = localFs.create(localJobFile);
- try {
- localJobConf.writeXml(out);
- } finally {
- out.close();
- }
- // Unpack job.jar
- RunJar.unJar(new File(localJarFile.toString()),
- new File(localJarFile.getParent().toString()));
- }
- rjob.localized = true;
- rjob.jobConf = localJobConf;
- }
- }
- // Actually launch this task
- launchTaskForJob(tip, new JobConf(rjob.jobConf));
- }
- public synchronized void launchTask() throws IOException { // Localizes the task, marks it RUNNING if it was UNASSIGNED, and starts its runner.
- ……
- // Create the directory the task runs in
- localizeTask(task);
- if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
- this.taskStatus.setRunState(TaskStatus.State.RUNNING);
- }
- // Create and start the TaskRunner: a MapTaskRunner for a MapTask, a ReduceTaskRunner for a ReduceTask
- this.runner = task.createRunner(TaskTracker.this, this);
- this.runner.start();
- this.taskStatus.setStartTime(System.currentTimeMillis());
- }
- public final void run() { // Assembles the child JVM command line (classpath, JVM options, tmp and log settings) and launches it through jvmManager.
- ……
- TaskAttemptID taskid = t.getTaskID();
- LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
- File jobCacheDir = null;
- if (conf.getJar() != null) {
- jobCacheDir = new File(
- new Path(conf.getJar()).getParent().toString());
- }
- File workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(
- t.getJobID().toString(),
- t.getTaskID().toString(),
- t.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- conf). toString());
- FileSystem fileSystem;
- Path localPath;
- ……
- // Build the classpath
- String baseDir;
- String sep = System.getProperty("path.separator");
- StringBuffer classPath = new StringBuffer();
- // start with same classpath as parent process
- classPath.append(System.getProperty("java.class.path"));
- classPath.append(sep);
- if (!workDir.mkdirs()) {
- if (!workDir.isDirectory()) {
- LOG.fatal("Mkdirs failed to create " + workDir.toString());
- }
- }
- String jar = conf.getJar();
- if (jar != null) {
- // if a job jar is configured, add the files under its lib directory, its classes directory and the job cache directory itself
- File[] libs = new File(jobCacheDir, "lib").listFiles();
- if (libs != null) {
- for (int i = 0; i < libs.length; i++) {
- classPath.append(sep); // add libs from jar to classpath
- classPath.append(libs[i]);
- }
- }
- classPath.append(sep);
- classPath.append(new File(jobCacheDir, "classes"));
- classPath.append(sep);
- classPath.append(jobCacheDir);
- }
- ……
- classPath.append(sep);
- classPath.append(workDir);
- // Build the java command line and its arguments
- Vector<String> vargs = new Vector<String>(8);
- File jvm =
- new File(new File(System.getProperty("java.home"), "bin"), "java");
- vargs.add(jvm.toString());
- String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
- javaOpts = javaOpts.replace("@taskid@", taskid.toString());
- String [] javaOptsSplit = javaOpts.split(" ");
- String libraryPath = System.getProperty("java.library.path");
- if (libraryPath == null) {
- libraryPath = workDir.getAbsolutePath();
- } else {
- libraryPath += sep + workDir;
- }
- boolean hasUserLDPath = false;
- for(int i=0; i<javaOptsSplit.length ;i++) {
- if(javaOptsSplit[i].startsWith("-Djava.library.path=")) { // user supplied a library path: append ours to it instead of adding a second option
- javaOptsSplit[i] += sep + libraryPath;
- hasUserLDPath = true;
- break;
- }
- }
- if(!hasUserLDPath) {
- vargs.add("-Djava.library.path=" + libraryPath);
- }
- for (int i = 0; i < javaOptsSplit.length; i++) {
- vargs.add(javaOptsSplit[i]);
- }
- // Set up the temporary directory for the Child process
- String tmp = conf.get("mapred.child.tmp", "./tmp");
- Path tmpDir = new Path(tmp);
- if (!tmpDir.isAbsolute()) {
- tmpDir = new Path(workDir.toString(), tmp);
- }
- FileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
- }
- vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
- // Add classpath.
- vargs.add("-classpath");
- vargs.add(classPath.toString());
- // Log directory
- long logSize = TaskLog.getTaskLogLength(conf);
- vargs.add("-Dhadoop.log.dir=" +
- new File(System.getProperty("hadoop.log.dir")
- ).getAbsolutePath());
- vargs.add("-Dhadoop.root.logger=INFO,TLA");
- vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
- vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
- // The main class of the child process that runs map tasks and reduce tasks is Child
- vargs.add(Child.class.getName()); // main of Child
- ……
- // Launch the child process
- jvmManager.launchJvm(this,
- jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
- workDir, env, pidFile, conf));
- }
真正的map task和reduce task都是在Child进程中运行的,Child的main函数的主要逻辑如下:
- while (true) { // Main loop: fetch a task, load its job configuration, run it.
- // Obtain a JvmTask object from the TaskTracker over the network
- JvmTask myTask = umbilical.getTask(jvmId);
- ……
- idleLoopCount = 0;
- task = myTask.getTask();
- taskid = task.getTaskID();
- isCleanup = task.isTaskCleanupTask();
- JobConf job = new JobConf(task.getJobFile());
- TaskRunner.setupWorkDir(job);
- numTasksToExecute = job.getNumTasksToExecutePerJvm();
- task.setConf(job);
- defaultConf.addResource(new Path(task.getJobFile()));
- ……
- // Run the task
- task.run(job, umbilical); // run the task
- if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) { // leave the loop once this JVM has run its configured number of tasks
- break;
- }
- }
- public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
- throws IOException { // Runs one map task: picks a collector, deserializes the split, opens a reader, and drives the MapRunnable.
- // Used to communicate with the TaskTracker and report how the task is running
- final Reporter reporter = getReporter(umbilical);
- startCommunicationThread(umbilical);
- initialize(job, reporter);
- ……
- // Output of the map task
- int numReduceTasks = conf.getNumReduceTasks();
- MapOutputCollector collector = null;
- if (numReduceTasks > 0) {
- collector = new MapOutputBuffer(umbilical, job, reporter);
- } else {
- collector = new DirectMapOutputCollector(umbilical, job, reporter);
- }
- // Read the input split and, based on its information, create a RecordReader to read the data
- instantiatedSplit = (InputSplit)
- ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
- DataInputBuffer splitBuffer = new DataInputBuffer();
- splitBuffer.reset(split.getBytes(), 0, split.getLength());
- instantiatedSplit.readFields(splitBuffer);
- if (instantiatedSplit instanceof FileSplit) {
- FileSplit fileSplit = (FileSplit) instantiatedSplit;
- job.set("map.input.file", fileSplit.getPath().toString());
- job.setLong("map.input.start", fileSplit.getStart());
- job.setLong("map.input.length", fileSplit.getLength());
- }
- RecordReader rawIn = // open input
- job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
- RecordReader in = isSkipping() ?
- new SkippingRecordReader(rawIn, getCounters(), umbilical) :
- new TrackedRecordReader(rawIn, getCounters());
- job.setBoolean("mapred.skip.on", isSkipping());
- // For a map task, create a MapRunnable; the default is MapRunner
- MapRunnable runner =
- ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
- try {
- // MapRunner's run function reads the records from the RecordReader one by one and calls the Mapper's map function on each.
- runner.run(in, collector, reporter);
- collector.flush();
- } finally {
- in.close(); // close input
- collector.close();
- }
- done(umbilical);
- }
- public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
- Reporter reporter)
- throws IOException {
- // Feeds every record read from input to mapper.map(); the mapper is closed whether or not the loop fails.
- try {
- // the key and value instances are created once and reused for every record
- K1 key = input.createKey();
- V1 value = input.createValue();
- while (input.next(key, value)) {
- mapper.map(key, value, output, reporter);
- if(incrProcCount) {
- // count this record as processed by the mapper
- reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
- SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
- }
- }
- } finally {
- mapper.close();
- }
- }
- private void sortAndSpill() throws IOException { // Sorts the buffered records and writes them, partition by partition, to a new spill file.
- ……
- FSDataOutputStream out = null;
- FSDataOutputStream indexOut = null;
- IFileOutputStream indexChecksumOut = null;
- // Create the spill file on disk
- Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
- out = rfs.create(filename);
- ……
- final int endPosition = (kvend > kvstart)
- ? kvend
- : kvoffsets.length + kvend;
- // Sort the data in the buffer in partition order
- sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
- int spindex = kvstart;
- InMemValBytes value = new InMemValBytes();
- // Write the partitions to the file one at a time
- for (int i = 0; i < partitions; ++i) {
- IFile.Writer<K, V> writer = null;
- long segmentStart = out.getPos();
- writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
- // If there is no combiner, write directly to the file
- if (null == combinerClass) {
- ……
- writer.append(key, value);
- ++spindex;
- }
- else {
- ……
- // If there is a combiner, combine first: combiner.reduce(…) is called before the data is written to the file
- combineAndSpill(kvIter, combineInputCounter);
- }
- }
- ……
- }
- private void mergeParts() throws IOException { // Merges, for each partition, the segments from all spill files and writes the result to finalOut.
- ……
- // For each partition
- for (int parts = 0; parts < partitions; parts++){
- //create the segments to be merged
- List<Segment<K, V>> segmentList =
- new ArrayList<Segment<K, V>>(numSpills);
- TaskAttemptID mapId = getTaskID();
- // Collect, from each spill file in turn, the segment belonging to the current partition
- for(int i = 0; i < numSpills; i++) {
- final IndexRecord indexRecord =
- getIndexInformation(mapId, i, parts);
- long segmentOffset = indexRecord.startOffset;
- long segmentLength = indexRecord.partLength;
- Segment<K, V> s =
- new Segment<K, V>(job, rfs, filename[i], segmentOffset,
- segmentLength, codec, true);
- segmentList.add(i, s);
- }
- // Merge the segments that belong to the same partition
- RawKeyValueIterator kvIter =
- Merger.merge(job, rfs,
- keyClass, valClass,
- segmentList, job.getInt("io.sort.factor", 100),
- new Path(getTaskID().toString()),
- job.getOutputKeyComparator(), reporter);
- // Write the merged segments to the file
- long segmentStart = finalOut.getPos();
- Writer<K, V> writer =
- new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
- if (null == combinerClass || numSpills < minSpillsForCombine) { // no combiner, or too few spills to combine: write the merged records as they are
- Merger.writeFile(kvIter, writer, reporter, job);
- } else {
- combineCollector.setWriter(writer);
- combineAndSpill(kvIter, combineInputCounter);
- }
- ……
- }
- }
- public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
- throws IOException { // Runs one reduce task through its copy, sort and reduce phases.
- job.setBoolean("mapred.skip.on", isSkipping());
- // A reduce consists of three phases: copy, sort and reduce
- if (isMapOrReduce()) {
- copyPhase = getProgress().addPhase("copy");
- sortPhase = getProgress().addPhase("sort");
- reducePhase = getProgress().addPhase("reduce");
- }
- startCommunicationThread(umbilical);
- final Reporter reporter = getReporter(umbilical);
- initialize(job, reporter);
- // Copy phase: mainly uses ReduceCopier's fetchOutputs function to obtain the map outputs. Several MapOutputCopier threads are created, and their copyOutput does the copying.
- boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
- if (!isLocal) {
- reduceCopier = new ReduceCopier(umbilical, job);
- if (!reduceCopier.fetchOutputs()) {
- ……
- }
- }
- copyPhase.complete(); // NOTE(review): copyPhase is only assigned above when isMapOrReduce() is true — confirm other task kinds do not reach this line
- // Sort phase: merge the fetched map outputs until the number of files is below io.sort.factor, returning an Iterator for accessing the key-values
- setPhase(TaskStatus.Phase.SORT);
- statusUpdate(umbilical);
- final FileSystem rfs = FileSystem.getLocal(job).getRaw();
- RawKeyValueIterator rIter = isLocal
- ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
- !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
- new Path(getTaskID().toString()), job.getOutputKeyComparator(),
- reporter)
- : reduceCopier.createKVIterator(job, rfs, reporter);
- mapOutputFilesOnDisk.clear();
- sortPhase.complete();
- // Reduce phase
- setPhase(TaskStatus.Phase.REDUCE);
- ……
- Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
- Class keyClass = job.getMapOutputKeyClass();
- Class valClass = job.getMapOutputValueClass();
- ReduceValuesIterator values = isSkipping() ?
- new SkippingReduceValuesIterator(rIter,
- job.getOutputValueGroupingComparator(), keyClass, valClass,
- job, reporter, umbilical) :
- new ReduceValuesIterator(rIter,
- job.getOutputValueGroupingComparator(), keyClass, valClass,
- job, reporter);
- // Read the key / value-list groups one by one and call the Reducer's reduce function on each
- while (values.more()) {
- reduceInputKeyCounter.increment(1);
- reducer.reduce(values.getKey(), values, collector, reporter);
- values.nextKey();
- values.informReduceProgress();
- }
- reducer.close();
- out.close(reporter);
- done(umbilical);
- }