分享

MapReduce执行过程分析

本帖最后由 javaanddonet 于 2018-4-19 18:26 编辑

1. MapReduce输入的时候,会对HDFS上面的文件进行split,切分的原则是什么样子的?假如HDFS默认文件存储块大小是64MB。如果一个129MB的文件存在HDFS上面,会有三个文件块,分别是64MB/64MB/1MB。那么此时进行切分的时候,是split为几分?

2. 切分后,是否每一个切分文件对应一个map任务?

3. 基于1和2两个问题,是否可以认为:每一个文件块对应一个split的文件,并且对应一个map任务?

4. 每一个MapReduce任务对应几个map,由什么决定的某一个MR任务有几个map任务?

5. 如果一个MR job有5个map,那么这5个map是每一个结算节点上一个map任务吗?还是可以多个map在同一个计算节点上?还是根据hadoop的移动计算的原理而来,数据块文件在哪里,map任务就在哪里?

6. 在每一个map任务中,都对应一个缓冲区,如果缓冲区大于80%,那么既要将缓冲区的数据,spill溢写到本地磁盘中。我知道这个本地磁盘不是指HDFS,那么请问这个本地磁盘是在哪里?需要在何处配置吗?

7. 基于6的基础上,每一个map任务,当它的缓冲区的数据大于80%的时候就溢写到本地磁盘,如果数据量比较大,那么这个map任务有可能会发生很多次的溢写,那么最后这个map任务运行结束后,会生成N多个溢写文件。这些溢写文件会最后合并,是合并为一个溢出写文件?还是多个?如果多个是由什么决定的?需要在哪里配置吗?

本帖被以下淘专辑推荐:

已有(12)人评论

跳转到指定楼层
hello2018 发表于 2018-4-19 22:41:43
上面问题,问的都非常的细致。研究研究,分多次回答,楼主有问题和看法也可以交流。

Hadoop 2.x默认的block大小是128MB,Hadoop 1.x默认的block大小是64MB,可以在hdfs-site.xml中设置dfs.block.size,注意单位是byte。

分片大小范围可以在mapred-site.xml中设置,mapred.min.split.size mapred.max.split.size,minSplitSize大小默认为1B,maxSplitSize大小默认为Long.MAX_VALUE = 9223372036854775807。【额外说明这里由于版本更新hadoop2.x后面版本及hadoop3.x mapred.min.split.size更改为mapreduce.input.fileinputformat.split.minsize     maxSplitSize更改为: mapreduce.job.split.metainfo.maxsize          】

那么分片到底是多大呢?


minSize=max{minSplitSize,mapred.min.split.size}

maxSize=mapred.max.split.size

splitSize=max{minSize,min{maxSize,blockSize}}

从上面我们看出分片原则:

split分片其实是综合得来的,并不是固定的,而且他和blocksize是有关系的。

首先还是自己的配置,也就是你配置的分片大小。然后最后得出结果。比如你的


我们再来看一下源码

1.jpg

所以在我们没有设置分片的范围的时候,分片大小是由block块大小决定的,和它的大小一样。比如把一个258MB的文件上传到HDFS上,假设block块大小是128MB,那么它就会被分成三个block块,与之对应产生三个split,所以最终会产生三个map task。我又发现了另一个问题,第三个block块里存的文件大小只有2MB,而它的block块大小是128MB,那它实际占用Linux file system的多大空间?

答案是实际的文件大小,而非一个块的大小。

后面在研究下新版本。继续作答



回复

使用道具 举报

javaanddonet 发表于 2018-4-20 09:36:58
hello2018 发表于 2018-4-19 22:41
上面问题,问的都非常的细致。研究研究,分多次回答,楼主有问题和看法也可以交流。

Hadoop 2.x默认的bl ...

多谢指导,看来,看源码很有必要。
回复

使用道具 举报

hello2018 发表于 2018-4-20 20:57:16
javaanddonet 发表于 2018-4-20 09:36
多谢指导,看来,看源码很有必要。

这里更加简洁的总结:
Split的最大值为:max_split

Split的最小值为:min_split

Block的大小:block

切分规则:max(min_split,min(max_split,block)),主要是为了减少网络带宽。
咱们套用上面的公式:
min_split=1b
max_split=Long.MAX_VALUE = 9223372036854775807

max(1,min(9223372036854775807,64M))
这里显然split为64M
既然为64M,那么64MB/64MB/1MB。那么此时进行切分的时候,是split为几分?
应该分为3份。(由于fileinputmat类中,有个split_slop参数,主要作用判断文件剩余字节是否超过1.1的范围,如超过按新的split处理,如不超过,split夸块处理读取,节省资源。也就是有2分,非3份。由公众号粉丝:久久儿提供,感谢)


回复

使用道具 举报

hello2018 发表于 2018-4-20 21:01:40
第二个问题,这个比较简单了

1.png

如上图每一个split,都对应一个map任务

第三个问题:基于1和2两个问题,是否可以认为:每一个文件块对应一个split的文件,并且对应一个map任务?
我们同样看上图,一个文件有很多split,对应多个map任务
同样第四个问题:每一个MapReduce任务对应几个map,由什么决定的某一个MR任务有几个map任务?我们还是看上图,map任务的个数由分片来决定,也就是split决定。当然也可以通过配置JobConf.setNumMapTasks(n),但是如果小于split,不会生效的。大于split才会生效



回复

使用道具 举报

qcbb001 发表于 2018-4-22 14:55:35
这里针对第五个问题:
首先需要了解yarn整个的流程:
步骤1 用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。

步骤2 ResourceManager为该应用程序分配第一个Container,并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。

步骤3 ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。

步骤4 ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。

步骤5 一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。

步骤6 NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。

步骤7 各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。

     在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。

步骤8 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。


##################

我们可以从步骤3和步骤4得知,任务的分配,是由ResourceManager来决定的,由ApplicationMaster申请的。ResourceManager并不是根据舒俱来分配任务,而是根据整体的资源来分配。那么它分配的原则是什么?

首先我们需要知道有三种调度策略FIFO、CapacityScheduler、FairScheduler,每个调度策略又是不同的。因为任务分配也是不同的。但是有一个共同特性:计算本地性

本地性有3个级别:NODE_LOCAL、RACK_LOCAL、OFF_SWITCH,分别代表同节点、同机架、跨机架。计算效率会依次递减。

因为HDFS的多副本,任务应该尽量在选择block所在的机器上执行,可以减少网络传输的消耗。如果开启了Short-Circuit Read特性,还可以直接读本地文件,提高效率。

scheduler能做的只是尽量满足NODE_LOCAL,尽量避免OFF_SWITCH。计算本地性更多的要AM端配合,当AM拿到资源后,优先分配给NODE_LOCAL的任务。

更多参考
http://jxy.me/2015/04/30/yarn-resource-scheduler/




回复

使用道具 举报

hello2018 发表于 2018-4-22 15:39:46
6.在每一个map任务中,都对应一个缓冲区,如果缓冲区大于80%,那么既要将缓冲区的数据,spill溢写到本地磁盘中。我知道这个本地磁盘不是指HDFS,那么请问这个本地磁盘是在哪里?需要在何处配置吗?
这个其实是shuffle阶段,map输出中间结果,而这个结果需要输入到reduce。这个80%是中间结果的时候,先放到缓存,然后整个缓冲区有个溢写的比例spill.percent(可以通过属性Io.sort.spill.percent配置【新版本hadoop3.1已经更新为:mapreduce.map.sort.spill.percent】),这个比例默认是0.8,
1.png
地址:
http://hadoop.apache.org/docs/r3 ... /mapred-default.xml

也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。溢写到哪里?楼主认为是本地磁盘??个人认为这个是一般来讲是写到hdfs文件。在哪配置?如上图,需要在mapred-site.xml文件中配置
回复

使用道具 举报

walykyy 发表于 2018-4-22 15:53:00
好好学习,天天向上
回复

使用道具 举报

hello2018 发表于 2018-4-22 16:08:06
7. 基于6的基础上,每一个map任务,当它的缓冲区的数据大于80%的时候就溢写到本地磁盘,如果数据量比较大,那么这个map任务有可能会发生很多次的溢写,那么最后这个map任务运行结束后,会生成N多个溢写文件。这些溢写文件会最后合并,是合并为一个溢出写文件?还是多个?如果多个是由什么决定的?需要在哪里配置吗?这些溢写文件最终会合并为一个文件。这个过程就叫做Merge。Merge是怎样的?如前面的例子,“aaa”从某个map task读取过来时值是5,从另外一个map 读取时值是8,因为它们有相同的key,所以得merge成group。什么是group。对于“aaa”就是像这样的:{“aaa”, [5, 8, 2, …]},数组中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。请注意,因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果client设置过Combiner,也会使用Combiner来合并相同的key。
回复

使用道具 举报

javaanddonet 发表于 2018-4-22 17:30:31
hello2018 发表于 2018-4-22 15:39
6.在每一个map任务中,都对应一个缓冲区,如果缓冲区大于80%,那么既要将缓冲区的数据,spill溢写到本地磁 ...

我原先也是认为是溢写到HDFS磁盘上,但是我看网上很多资料强调说是写到“本地磁盘”,我就迷惑了,本地磁盘是什么意思?难道不是写在HDFS上?所以才有这个疑问。看来我是多虑了。及时溢写到HDFS磁盘上了。多谢解答。
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条