xng2012 发表于 2013-12-12 03:03:08

Hadoop源码以及流程解析

整体结构DN: Configuration,JobClient,JobConfMaster:JobTracker,JobInProgress,TaskInProgressWN:TaskTracker,Task(MapTask、ReduceTask),JvmManager,Child ClientConfiguration从Configuration类的源代码可以看到,定义了如下私有成员变量: private boolean   quietmode = true;// 第一个是boolean型变量quietmode,用于设置加载配置的模式。通过阅读源代码就可以清楚,这个quietmode如果为true,实际上默认就为true,加载配置的模式为快速模式,其实也就是在解析配置文件的过程中,不输出日志信息,就这么简单。private ArrayList defaultResources = new ArrayList();//它是一个列表,该列表中存放的是配置文件的名称private ArrayList<Object> resources = new ArrayList<Object>();//全部资源的配置包括URL、String、Path、InputStreamprivate Set<String> finalParameters = new HashSet<String>();//程序性的 private boolean loadDefaults = true;//是否载入默认资源private static final WeakHashMap<Configuration, Object> REGISTRY = new WeakHashMap<Configuration, Object>();//private Properties properties;//个人程序所需要的所有配置 会以Properties的形式存储private Properties overlay;// 它也是一个Properties变量。它对应于finalResources列表,也就是解析finalResources列表中设置的配置文件,配置项设置到overlay中。这里,overlay比较关键的一点就是,如果overlay不为空属性配置,在创建一个Configuration实例的时候会检查overlay,不空就将其中的配置项加入到properties中private ClassLoader classLoader;//类加载器在这里所有客户端程序中配置的类的信息和其他运行信息,都会保存在这个类里。JobClientJobClient.runJob(job)静态方法会实例化一个JobClient实例,然后用此实例的submitJob(job)方法向 master提交作业。此方法会返回一个RunningJob对象,它用来跟踪作业的状态。作业提交完毕后,JobClient会根据此对象开始轮询作业的进度,直到作业完成。submitJob(job)内部是通过submitJobInternal(job)方法完成实质性的作业提交。submitJobInternal(job)方法首先会向hadoop分布系统文件系统hdfs依次上传三个文件: job.jar, job.split和job.xml。job.xml: 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。job.jar: jar包,里面包含了执行此任务需要的各种类,比如 Mapper,Reducer等实现。job.split: 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。这三个文件在hdfs上的路径由hadoop-default.xml文件中的mapreduce系统路径mapred.system.dir属性 + jobId决定。mapred.system.dir属性默认是/tmp/hadoop-user_name/ mapred/system。写完这三个文件之后, 此方法会通过RPC调用master节点上的JobTracker. submitJob(job)方法,此时作业已经提交完成。关键代码流程解析:jobClient.submit();调用jobClient.submitJobInternal(conf);在这个函数中利用jobId建立提交根路径,jar文件路径,job分割文件的路径,job.xml路径。代码如下:JobID jobId = jobSubmitClient.getNewJobId();//生成jobIdrpc         Path submitJobDir = new Path(getSystemDir(), jobId.toString());//用jobId来建立job任务的提交根路径Path submitJobFile = new Path(submitJobDir, "job.xml");//生成job.xml,这个xml将要记录Configuration中的所有配置信息,这个貌似与我们的NEMR的xml配置功能相似。….configureCommandLineOptions();//其中按job提交路径调用FileSystem在其中建立虚拟路径并,并把要执行的自定义程序打包成jar然后传到FileSystem中已定义的jar文件路径中。…..job.getOutputFormat().checkOutputSpecs(getFs(), job);//从这里去FileSystem中查看输出路径是否已存在,如果已存在则报已存在异常。否则继续执行。…..maps = writeNewSplits(context, submitSplitFile); //这一步的主要目标是将分割好的inputsplit数组信息写到Dfs中,然后把写入的路径添加到Configuration中{…..input=ReflectionUtils.newInstance(job.getInputFormatClass(), job.getJobConf());//获取已定义的inputformat类,如果未定义默认为TextInputFormat。…..splits=input.getSplits(job);//获取分配好的的InputSplit集合。与我们的NEMR一样这里同样需要输入文件的wn地址,以TextInputFormat举例,TextInputFormat需要继承FileInputFormat类,FileInputFormat类中已实现getSplits()方法,在getSplits()方法中需要用到FileSystem,来获取输入文件的主机地址、长度、起点信息。…..DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile, array.length);//这一步的目标是讲分割好的InputSplit信息写入到DFS中,包括头文件、版本号、数组长度……Serializer<T> serializer = factory.getSerializer((Class<T>) array.getClass());//在这里主要目标是将已经分割好的InputSplit信息写到Configuration中。}…..FSDataOutputStream out = FileSystem.create(getFs(), submitJobFile, new FsPermission (JOB_FILE_PERMISSION));try {       job.writeXml(out);} finally {       out.close();}//从这里我们看到上面代码中在fs中建立的job.xml,具体就是对job.xml把Configuration的配置信息写在job.xml里。……JobStatus status = jobSubmitClient.submitJob(jobId); //这里通知JobTracker,发送jobId过去,由JobTracker根据jobId去执行。JobClient里面使用使用RPC机制来构造一个实现 JobSubmissionProtocol接口的JobTracker的代理,然后利用远程发放直接执行JobTracker里的submitJob,与我们的利用Socket通信略有不同。DNJobTrackerJobTracker的地位相当于我们的Master,它负责调度job的每一个子任务task运行于slave上,并监控它们,如果发现有失败的task就重新运行它。JobTracker一直在等待JobClient通过RPC提交作业,而TaskTracker一直通过RPC向 JobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行。如果JobTracker的作业队列不为空, 则TaskTracker发送的心跳将会获得JobTracker给它派发的任务。这是一道pull过程: slave主动向master拉生意。slave节点的TaskTracker接到任务后在其本地发起Task,然后执行任务。启动:有一个main函数,不过这里我们可以先从它着手开始分析。tracker = new JobTracker(conf);   //构造构造函数先获取一堆常量的值,然后清空'systemDir',接着启动RPC服务器。       InetSocketAddress addr = getAddress(conf);      this.localMachine = addr.getHostName();      this.port = addr.getPort();      this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);      this.interTrackerServer.start(); 启动TrackInfoServer:       this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);      this.infoServer = new JobTrackerInfoServer(this, infoPort);      this.infoServer.start();JobInProgressJobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指定数目的TaskInProgress用于监控和调度ReduceTask,缺省为1个ReduceTask。TaskInProgress每个TaskInProgress就代表一个map或一个reduce处理,它将Job里面的Map和Reduce操作进行了封装,不过在Master端生成的TaskInProgress只是初始化了信息但并不调用执行方法。它等待TaskTracker端的RPC调用。 JobTracker.heartbeat();这个方法主要是TaskTracker端远程调用时用到的方法,其主要作用就是分派具体任务,并将该任务分发到TaskTracker端:其关键代码为:1 HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);2 List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();……3 if (tasks == null) {4   tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));5 }6 if (tasks != null) {7   for (Task task : tasks) {8            expireLaunchingTasks.addNewTask(task.getTaskID());         ……            9            actions.add(new LaunchTaskAction(task));10   }11}这里的调度中主要会根据我们的任务的输入文件dfs中的文件存放节点来分配,数据在哪个节点上任务就分配到哪个节点上的TaskTracker中。简单来说这里会根据我们的上面提到的JobInProgress和TaskInPorgress调度生成具体的MapTask和ReuceTask实例,他们均继承自抽象类Task该实例会放入到LaunchTaskAction中,最后获得的任务列表会被返回到TaskTracker端。Task会在下面做简单介绍。代码流程解析:第一步:当JobClient向JobTracker通信时利用RPC调用JobTracker的submitJob()方法时,中会生成一个JobInProgress类的实例,该实例是为了记录当前这个Job任务的执行状况。在构造的同时会调用到FileSystem,把在JobClient端上传的所有任务文件下载到本地的文件系统中的临时目录里。见程序清单 1,这其中包括上传的*.jar文件包、记录配置信息的xml、记录分割信息的文件。第二步:生成的JobInProgress实例会加载到作业队列管理器QueueManager中,通过QueueManager来调度。然后在流程中执行到jobtracker.initJob(),执行jobinProgress.initTask();根据初始化的信息生成每一个map与reduce,每个map与reduce都是TaskInProgress,TaskInProgress会根据不同参数分别创建具体的MapTask或者ReduceTask。生成的map与reduce监控类会被维护在JobInProgress实例中。第三步:首先步骤JobInProgress会创建map的监控对象, 在initTasks()函数里通过调用JobClient的readSplitFile()获得已分解的输入数据的RawSplit列表该列表已经在初始化时下载到本地了,然后根据这个列表创建对应数目的Map执行管理对象TaskInProgress。在这个过程中,还会记录该RawSplit块对应的所有在FILESYSTEM里的blocks所在的DataNode节点的host,这个会在RawSplit创建时通过FileSplit的函数获取,该函数会调用FileSystem的函数获得。其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,缺省只创建1个Reduce任务。监控和调度Reduce任务的也是TaskInProgress类,不过构造方法有所不同。WNTaskTrackerTask的执行实际是由TaskTracker发起的,TaskTracker会定期(缺省为3秒钟,参见MRConstants类中定义的HEARTBEAT_INTERVAL变量)与JobTracker进行一次通信,报告自己Task的执行状态,接收JobTracker的指令等,TaskTracker里面会通过循环的方式查找。TaskTracker$TaskLauncherTaskLauncher是个继承了Thread线程的TaskTracker的内部类,在这里面会维护一个TaskInProgress的链表:private List<TaskInProgress> tasksToLaunch;该列表中的每个TaskInProgress 实例对应一个TaskUnit任务。该类中的run方法才是主体关键之处,他会循环判断是否tasksToLaunch中有新任务要做,有就去从该列表中拿出来然后去调用TaskTracker.startNewTask(TaskInprogress);去开启一个新任务。Task在这里有两个子类,分别是MapTask 和 ReduceTask。 TaskTracker$TaskInProgress 这里的TaskInProgress主要是对每个执行的任务的监控和具体调度。TaskTracker.localizeJob();此函数主要任务是初始化工作目录workDir,再将job jar包从HDFS复制到本地文件系统中,调用RunJar.unJar()将包解压到工作目录。然后创建一个RunningJob并调用addTaskToJob()函数将它添加到runningJobs监控队列中。完成后即调用launchTaskForJob()开始执行Task。 TaskTracker.launchTaskForJob();主要是设置参数,并调用TaskTracker$TaskInProgress.launchTask()来启动已创建的Task任务。 JvmManager任务执行的主体调度。并管理任务队列,维护map与reduce的双重任务队列。这里面会根据取出的任务去生成一个对应的JvmRunner类,JvmRunner继承自Thread,其run函数主体会调用runChild()方法,runChild会执行主体函数。 MapTask.run();initialize(job, getJobID(), reporter, useNewApi);......if (useNewApi) {       runNewMapper(job, split, umbilical, reporter);} else {       runOldMapper(job, split, umbilical, reporter);}这里主要包括两个关键块:一个是initialize这里会根据预设的OutputFormat来格式输出,然后就是从runNewMapper()或runOldMapper()处执行map任务,用runNewMapper举例:从代码清单2中我们看出这里的执行过程,首先我们的程序先生成我们任务的map类,然后生成map任务的数据输入格式类,并根据我们的数据输入格式将我们的这块的数据分割成我们的指定的输入数据就是RecordReader,然后将RecordReader作为输入循环调用map的最终map()方法,也就是我们的客户端的主体map方法。ReduceTask.run();与map的开始过程一样,不再重复了,就是在后面有所不同,首先reduce的数据会在操作前利用Merge函数合并一下,然后生成key、value对遍历对象,然后执行循环执行Reducer.reduce(),结果上传到fs中。 代码流程:启动:TaskTracker的启动的时候会加载所有信息,包括利用RPC获得JobTracker 的RPC变量定义为jobClient,TaskTracker.run()方法会去循环向JobTracker心跳,在里面主要调用TaskTracker. offerService()方法,offerService方法调用JobTracker.transmitHeartBeat()方法,去执行jobClient.heartbeat()(也就是上面我们介绍的JobTracker.heartbeat();)返回心跳信息HeartbeatResponse类,所有map或reduce信息就在类LaunchTaskAction(每个maptask或reducetask对应一个独自的LaunchTaskAction实例,LaunchTaskAction类实现了抽象类LaunchTaskAction)的实例里面。TaskTrackerAction[] actions = heartbeatResponse.getActions();…….if (actions != null) {       for (TaskTrackerAction taskaction: actions) {            if (taskaction instanceof LaunchTaskAction) {                     addToTaskQueue((LaunchTaskAction) taskaction);…….根据获得的taskaction,循环添加到map与reduce维护列表中,如下:private TaskLauncher mapLauncher;private TaskLauncher reduceLauncher;从TaskLauncher的主体中会执行开启新的任务。这个在上面已经对TaskLauncher做了详细介绍。在上面代码中看到的addToTaskQueue ()方法在调用的时候会调用到TaskLauncher. addToTaskQueue(),该方法体内会调用TaskTracker.registerTask();该方法会根据每个taskaction生成一个TaskInProgress的实例,TaskInProgress与DN中的TaskInProgress不同,这里的TaskInProgress是TaskTracker的内部类,它是每个任务运行的主体,生成的实例会被添加到任务列表中去,就是我们在上面介绍过的tasksToLaunch。该类中的主体方法会循环查找任务去执行。如TaskLauncher类介绍,调用到的TaskTracker.startNewTask()方法开启一个新的任务,然后会去执行关键方法TaskTracker. localizeJob(任务)初始化所需要的信息,和工作目录的创建和调用hdfs下载执行的jar包和配置信息xml,完成向监控队列的添加后会去调用launchTaskForJob(),然后launchTaskForJob()调用TaskTracker$TaskInProgress.launchTask()开始执行Task。1 TaskTracker$TaskInProgress.launchTask()调用的分析:2 localizeTask(task);....3 this.runner = task.createRunner(TaskTracker.this, this);4 this.runner.start();通过localizeTask方法根据当前的任务创建工作目录,并把所需要的数据信息下载到本地。如果是Map任务的话就是MapTask.createRunner();方法会去创建MapTaskRunner,如果是Reduce的话就是ReduceTask.createRunner();方法去创建ReduceTaskRunner。TaskRunner是继承Thread的类,在它的run函数主体执行的过程比较复杂,主要的工作就是初始化启动java子进程的一系列环境变量,包括设定工作目录workDir,设置CLASSPATH环境变量等(需要将TaskTracker的环境变量以及job jar的路径合并起来),然后装载job jar包。代码往下走:jvmManager.launchJvm(…);这里是执行的主体,函数体内会判断是map任务还是reduce任务,并把它加载到管理队列里面。从JvmManager中我们看到runChild()方法的调用,方法中会去调用DefaultTaskController中的launchTaskJVM()方法,DefaultTaskController.launchTaskJVM()会根据已经获取的环境变量和jvm运行堆大小设定等参数,利用ProcessBuilder自动生成shell脚本,并运行来设定wn节点的变量信息。在装载过的信息中有个类是Child它自动去调用TaskTracker.getTask()方法去取出当前的task任务,然后调用run()方法在这里同样会用到hdfs从中取出输入数据,然后根据判断是旧api还是新api去执行对应的方法。具体在MapTask.run();和ReduceTask.run();中解释。与job执行相关的接口、类。与以前版本一样,这里没发生变化。程序清单1.   this.localFs = jobtracker.getLocalFileSystem();//hdfs   JobConf default_job_conf = new JobConf(default_conf);   this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR             + "/" + jobid + ".xml");   this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR             + "/" + jobid + ".jar");   Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);   fs = jobtracker.getFileSystem(jobDir);   jobFile = new Path(jobDir, "job.xml");   fs.copyToLocalFile(jobFile, localJobFile);   conf = new JobConf(localJobFile);   this.priority = conf.getJobPriority();   this.status.setJobPriority(this.priority);   this.profile = new JobProfile(conf.getUser(), jobid,             jobFile.toString(), url, conf.getJobName(), conf.getQueueName());   String jarFile = conf.getJar();   if (jarFile != null) {         fs.copyToLocalFile(new Path(jarFile), localJarFile);         conf.setJar(localJarFile.toString());   }代码清单2// make a task context so we can get the classes      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(                job, getTaskID());      // make a mapper      org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>) ReflectionUtils                .newInstance(taskContext.getMapperClass(), job);      // make the input format      org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>) ReflectionUtils                .newInstance(taskContext.getInputFormatClass(), job);      // rebuild the input split      org.apache.hadoop.mapreduce.InputSplit split = null;      DataInputBuffer splitBuffer = new DataInputBuffer();      splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());      SerializationFactory factory = new SerializationFactory(job);      Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit> deserializer = (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>) factory                .getDeserializer(job.getClassByName(splitClass));      deserializer.open(splitBuffer);      split = deserializer.deserialize(null);       org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input = new NewTrackingRecordReader<INKEY, INVALUE>(                inputFormat.createRecordReader(split, taskContext), reporter);       job.setBoolean("mapred.skip.on", isSkipping());      org.apache.hadoop.mapreduce.RecordWriter output = null;      org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext = null;      try {            Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor = org.apache.hadoop.mapreduce.Mapper.Context.class                  .getConstructor(new Class[] {                            org.apache.hadoop.mapreduce.Mapper.class,                            Configuration.class,                            org.apache.hadoop.mapreduce.TaskAttemptID.class,                            org.apache.hadoop.mapreduce.RecordReader.class,                            org.apache.hadoop.mapreduce.RecordWriter.class,                            org.apache.hadoop.mapreduce.OutputCommitter.class,                            org.apache.hadoop.mapreduce.StatusReporter.class,                            org.apache.hadoop.mapreduce.InputSplit.class });             // get an output object            if (job.getNumReduceTasks() == 0) {                output = outputFormat.getRecordWriter(taskContext);            } else {                output = new NewOutputCollector(taskContext, job, umbilical,                        reporter);            }             mapperContext = contextConstructor.newInstance(mapper, job,                  getTaskID(), input, output, committer, reporter, split);             input.initialize(split, mapperContext);            mapper.run(mapperContext);            mapPhase.complete();            setPhase(TaskStatus.Phase.SORT);            statusUpdate(umbilical);            input.close();            output.close(mapperContext);      } catch (NoSuchMethodException e) {            throw new IOException("Can't find Context constructor", e);      } catch (InstantiationException e) {            throw new IOException("Can't create Context", e);      } catch (InvocationTargetException e) {            throw new IOException("Can't invoke Context constructor", e);      } catch (IllegalAccessException e) {            throw new IOException("Can't invoke Context constructor", e);   }
页: [1]
查看完整版本: Hadoop源码以及流程解析