代码可见 : org.apache.hadoop.mapred.TaskTracker.TaskLauncher.run() 。
Mapper 和 Reducer 都是单独的进程,但是它们与 slots 的关系是这样的:
- org.apache.hadoop.mapred.TaskTracker.TaskLauncher.run() {
- ... ...
- //got a free slot. launch the task
- startNewTask(tip);
- ... ...
- }
复制代码
这里的 slots 有点类似 “令牌” 的感觉:申请资源,先获得令牌;释放资源,交还令牌。
> mapper 和 reducer 都是单独的进程?好像有点不对,是单独的线程吧?
是单独的进程。
启动Mapper/Reducer的总的调用路径是:- org.apache.hadoop.mapred.TaskTracker.TaskLauncher.run()
- ->
- org.apache.hadoop.mapred.TaskTracker.startNewTask()
- ->
- org.apache.hadoop.mapred.TaskTracker.launchTaskForJob()
- ->
- org.apache.hadoop.mapred.TaskTracker.TaskInProgress.launchTask()
- ->
- org.apache.hadoop.mapred.Task.createRunner() // 抽象方法,具体实现在子类 MapTask 和 ReduceTask 中
- |-> org.apache.hadoop.mapred.MapTask.createRunner() // 创建 MapTaskRunner 类实例
- |-> org.apache.hadoop.mapred.ReduceTask.createRunner() // 创建 ReduceTaskRunner 类实例
复制代码
最终,跟踪到了 MapTaskRunner 和 ReduceTaskRunner 这两个类。
至此,我们看看它们的父类 org.apache.hadoop.mapred.TaskRunner ,以下是类的说明:- /** Base class that runs a task in a separate process. Tasks are run in a
- * separate process in order to isolate the map/reduce system code from bugs in
- * user supplied map and reduce functions.
- */
复制代码
TaskRunner 虽然 extends Thread (看起来是个线程),但是真正启动Mapper和Reduce进程的代码在函数 TaskRunner.run() 中:- public final void run() {
- ... ...
- launchJvmAndWait(setup, vargs, stdout, stderr, logSize, workDir, env);
- ... ...
- }
复制代码
其调用了 TaskRunner.launchJvmAndWait() 方法(在此之前还有些创建文件夹、设置配置参数和环境变量等准备性的操作):- void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,
- File stderr, long logSize, File workDir, Map<String, String> env)
- throws InterruptedException {
- jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
- stderr, logSize, workDir, env, conf));
- synchronized (lock) {
- while (!done) {
- lock.wait();
- }
- }
- }
复制代码
上面代码主要是 launch 一个 java虚拟机进程。这也是Hadoop启动代价很高的原因,因为launch虚拟机是比较耗资源的;于是又提供了Task JVM Reuse机制。
单独起进程的原因也说得很清楚,就是: isolate the map/reduce system code from bugs in user supplied map and reduce functions。其实就是,通过使用不同的进程空间,进行隔离,防止用户提供的代码中有bug死掉后,造成 TaskTracker 所在进程也死掉(这个死掉了,效果就跟阿凡达里面的发光树被毁了一样)。
Hadoop-0.20.2源码中的实现基本也是差不多的。