Hadoop 如何提交Job
本帖最后由 howtodown 于 2014-1-23 19:51 编辑servlet 能否提交job,可以的。只要是能运行程序的地方能识别代码的地方都可以提交job。无论是server,还是client。关键是谁来干这件事。那么剩下的就是该怎么提交job。下面给大家分享一下。
首先创建一个JobClient对象,此对象在构造函数中会根据JobConf对象去连接JobTracker。
JobClient与JobTracker通信是通过jobSubmitClient操作的,jobSubmitClient是JobSubmissionProtocol类型的动态代理类,是通过如下方法产生的:
private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
Configuration conf) throws IOException {
return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID, addr,
UserGroupInformation.getCurrentUser(), conf,
NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
}
getProxy方法的关键是Invoker类,Invoker类实现了InvocationHandler接口,主要有两个成员变量,remoteId是Client.ConnectionId类型,保存连接地址和用户的ticket,客户端连接服务器由<remoteAddress,protocol,ticket>唯一标识。从这里我们也可以看到一些配置属性值,默认的rpcTimeout是0,
ipc.client.connection.maxidletime客户端连接的最大空闲时间是10s,
ipc.client.connect.max.retries客户端同服务器建立连接时的最大重试次数是10,
ipc.client.tcpnodelay是否开启Nagle算法(对TCP/IP进行拥塞控制),如果开启,会减少延迟,但是会增加小数据报,默认是false。client是Client类,用于IPC通信。client会通过ClientCache类来缓存,如果缓存中没有,会新建一个Client,否则原client计数加1。Invoker类主要的方法是invoke方法,invoke方法的功能是调用client的方法然后返回结果。动态代理类代理的对象是Client对象。
submitJobInternal方法是真正用来提交job的,具体步骤如下:
1、初始化staging目录,staging目录根目录是由mapreduce.jobtracker.staging.root.dir配置的,默认是/tmp/Hadoop/mapred/staging,具体到某个用户的staging目录是$ROOT/userName/.staging。
2、从JobTracker那里取得新的job id,job id从1开始递增。
3、获得提交job的目录submitJobDir=用户的staging目录/jobid,并且将这个目录设置成mapreduce.job.dir的值。
4、copyAndConfigureFiles拷贝和初始化文件,首先从配置属性mapred.submit.replication取得replication值,默认为10。然后判断submitJobDir目录是否存在,如果存在拋异常;否则创建submitJobDir目录;取得job的分布式缓存文件路径=submitJobDir/files;取得job的分布式缓存存档路径=submitJobDir/archives;取得job的分布式缓存libjars路径=submitJobDir/libjars;如果命令行参数有tmpfiles,则将这些文件拷贝到分布式缓存文件路径下,同时将这个路径加入到分布式缓存中;如果命令行参数有tmpjars,则将这些文件拷贝到分布式缓存libjars路径下,同时将这个路径加入到分布式缓存中;如果命令行参数有tmparchives,则将这些文件拷贝到分布式缓存存档路径下,同时将这个路径加入到分布式缓存中;根据mapred.jar属性取得jar包的路径,如果没有指定job的名字,那么将使用jar包的名字作为job名字;取得job jar的存储路径=submitJobDir/job.jar;将用户指定的jar包拷贝到job jar的存储路径;设置工作目录,默认是配置属性mapred.working.dir指定的值。
5、取得job配置文件的路径submitJobFile=submitJobDir/job.xml;设置
mapreduce.job.submithostaddress为本机ip地址,设置
mapreduce.job.submithost为本机主机名。
6、为job创建输入分区,这是由writeSplits方法完成的。以old api为例,首先调用InputFormat的getSplits方法得到一个InputSplit分区数组,FileInputFormat类的getSplits方法实现过程如下:
通过listStatus方法取得输入文件路径列表,过滤掉_和.开头的路径以及根据设置的mapred.input.pathFilter.class过滤;
在JobConf中设置mapreduce.input.num.files为输入文件数;
计算出所有输入文件的总大小totalSize,目标分区大小goalSize=totalSize/numSplits(由mapred.map.tasks配置,默认为1),最小分区大小minSize=mapred.min.split.size配置和1之间的较大值,对于每一个输入文件,如果这个文件的长度不等于0并且是可切分的,计算分区大小splitSize=Math.max(minSize,Math.min(goalSize,blockSize)),blockSize为HDFS存储文件的块大小,对于每一个分区大小,计算对其贡献最大的主机数组(根据机架以及块的字节大小确定),然后将这个分区加入到分区列表;然后根据分区大长度从大到小对分区列表进行排序;然后将分区列表写入到分区文件,分区文件名=submitJobDir/job.split,分区文件的存储格式:SPL字节信息,分区版本号,{InputSplit类名,InputSplit类信息}+;SplitMetaInfo数组记录每个分区信息在文件中的偏移,主机信息和长度;将分区Meta信息SplitMetaInfo数组写入到文件submitJobDir/job.splitmetainfo。
7、JobConf设置mapred.map.tasks为分区数。
8、根据mapred.job.queue.name获得job提交的队列的名字,默认是default,然后根据这个队列名字获得访问控制列表。
9、将重新配置过的JobConf写入到submitJobDir/job.xml文件。
10、将jobid,submitJobDir信息传给JobTracker正式提交job,并通过NetworkedJob对象跟踪job的状态。
monitorAndPrintJob方法监控job的运行并且实时打印job的状态。
页:
[1]