Spark性能相关配置参数详解
本帖最后由 Oner 于 2017-11-16 17:18 编辑问题导读:
1. shuffle 相关的配置参数有哪些?
2. Storage相关的配置参数有哪些?
3. 压缩和序列化相关的配置参数有哪些?
4. schedule调度相关的配置参数有哪些?
static/image/hrline/4.gif
Spark性能相关参数配置
http://spark-config.readthedocs.io/en/latest/#
概述
随着Spark的逐渐成熟完善, 越来越多的可配置参数被添加到Spark中来, 在Spark的官方文档http://spark.apache.org/docs/latest/configuration.html 中提供了这些可配置参数中相当大一部分的说明.
但是文档的更新总是落后于代码的开发的, 还有一些配置参数没有来得及被添加到这个文档中, 最重要的是在这个文档中,对于许多的参数也只能简单的介绍它所代表的内容的字面含义, 如果没有一定的实践基础或者对其背后原理的理解, 往往无法真正理解该如何针对具体应用场合进行合理配置。
本文试图通过阐述这其中部分参数的工作原理和配置思路, 和大家一起探讨一下如何根据实际场合对Spark进行配置优化。需要注意的是,理论上,没有绝对正确的配置(否则也就不需要对应的配置参数了,Spark框架内部直接写死就好了),所以请结合自己的实际情况,辩证的看下面的内容。由于本文主要针对和性能相关的一些配置参数进行阐述,所以基本不会覆盖其它和性能没有太多关系的配置参数。
[*]Shuffle 相关(http://spark-config.readthedocs.io/en/latest/shuffle.html)
[*]Storage相关配置参数(http://spark-config.readthedocs.io/en/latest/storage.html)
[*]压缩和序列化相关(http://spark-config.readthedocs.io/en/latest/compress.html)
[*]schedule调度相关(http://spark-config.readthedocs.io/en/latest/scheduler.html)
章节
[*]Shuffle 相关(http://spark-config.readthedocs.io/en/latest/shuffle.html)
[*]spark.shuffle.manager(http://spark-config.readthedocs.io/en/latest/shuffle.html#spark-shuffle-manager)
[*]spark.shuffle.sort.bypassMergeThreshold(http://spark-config.readthedocs.io/en/latest/shuffle.html#spark-shuffle-sort-bypassmergethreshold)
[*]spark.shuffle.consolidateFiles(http://spark-config.readthedocs.io/en/latest/shuffle.html#spark-shuffle-consolidatefiles)
[*]spark.shuffle.spill(http://spark-config.readthedocs.io/en/latest/shuffle.html#spark-shuffle-spill)
[*]spark.shuffle.memoryFraction / spark.shuffle.safetyFraction(http://spark-config.readthedocs.io/en/latest/shuffle.html#spark-shuffle-memoryfraction-spark-shuffle-safetyfraction)
[*]spark.shuffle.spill.compress / spark.shuffle.compress(http://spark-config.readthedocs.io/en/latest/shuffle.html#spark-shuffle-spill-compress-spark-shuffle-compress)
[*]Storage相关配置参数(http://spark-config.readthedocs.io/en/latest/storage.html)
[*]spark.local.dir(http://spark-config.readthedocs.io/en/latest/storage.html#spark-local-dir)
[*]spark.executor.memory(http://spark-config.readthedocs.io/en/latest/storage.html#spark-executor-memory)
[*]spark.storage.memoryFraction(http://spark-config.readthedocs.io/en/latest/storage.html#spark-storage-memoryfraction)
[*]spark.streaming.blockInterval(http://spark-config.readthedocs.io/en/latest/storage.html#spark-streaming-blockinterval)
[*]压缩和序列化相关(http://spark-config.readthedocs.io/en/latest/compress.html)
[*]spark.serializer(http://spark-config.readthedocs.io/en/latest/compress.html#spark-serializer)
[*]spark.rdd.compress(http://spark-config.readthedocs.io/en/latest/compress.html#spark-rdd-compress)
[*]spark.broadcast.compress(http://spark-config.readthedocs.io/en/latest/compress.html#spark-broadcast-compress)
[*]spark.io.compression.codec(http://spark-config.readthedocs.io/en/latest/compress.html#spark-io-compression-codec)
[*]schedule调度相关(http://spark-config.readthedocs.io/en/latest/scheduler.html)
[*]spark.cores.max(http://spark-config.readthedocs.io/en/latest/scheduler.html#spark-cores-max)
[*]spark.task.cpus(http://spark-config.readthedocs.io/en/latest/scheduler.html#spark-task-cpus)
[*]spark.scheduler.mode(http://spark-config.readthedocs.io/en/latest/scheduler.html#spark-scheduler-mode)
[*]spark.locality.wait(http://spark-config.readthedocs.io/en/latest/scheduler.html#spark-locality-wait)
[*]spark.speculation(http://spark-config.readthedocs.io/en/latest/scheduler.html#spark-speculation)
Indices and tables
[*]Index(http://spark-config.readthedocs.io/en/latest/genindex.html)
[*]Module Index(http://spark-config.readthedocs.io/en/latest/py-modindex.html)
[*]Search Page(http://spark-config.readthedocs.io/en/latest/search.html)
Shuffle 相关
Shuffle操作大概是对Spark性能影响最大的步骤之一(因为可能涉及到排序,磁盘IO,网络IO等众多CPU或IO密集的操作),这也是为什么在Spark 1.1的代码中对整个Shuffle框架代码进行了重构,将Shuffle相关读写操作抽象封装到Pluggable的Shuffle Manager中,便于试验和实现不同的Shuffle功能模块。例如为了解决Hash Based的Shuffle Manager在文件读写效率方面的问题而实现的Sort Base的Shuffle Manager。
spark.shuffle.manager
用来配置所使用的Shuffle Manager,目前可选的Shuffle Manager包括默认的org.apache.spark.shuffle.sort.HashShuffleManager(配置参数值为hash)和新的org.apache.spark.shuffle.sort.SortShuffleManager(配置参数值为sort)。
这两个ShuffleManager如何选择呢,首先需要了解他们在实现方式上的区别。
HashShuffleManager,故名思义也就是在Shuffle的过程中写数据时不做排序操作,只是将数据根据Hash的结果,将各个Reduce分区的数据写到各自的磁盘文件中。带来的问题就是如果Reduce分区的数量比较大的话,将会产生大量的磁盘文件。如果文件数量特别巨大,对文件读写的性能会带来比较大的影响,此外由于同时打开的文件句柄数量众多,序列化,以及压缩等操作需要分配的临时内存空间也可能会迅速膨胀到无法接受的地步,对内存的使用和GC带来很大的压力,在Executor内存比较小的情况下尤为突出,例如Spark on Yarn模式。
SortShuffleManager,是1.1版本之后实现的一个试验性(也就是一些功能和接口还在开发演变中)的ShuffleManager,它在写入分区数据的时候,首先会根据实际情况对数据采用不同的方式进行排序操作,底线是至少按照Reduce分区Partition进行排序,这样来至于同一个Map任务Shuffle到不同的Reduce分区中去的所有数据都可以写入到同一个外部磁盘文件中去,用简单的Offset标志不同Reduce分区的数据在这个文件中的偏移量。这样一个Map任务就只需要生成一个shuffle文件,从而避免了上述HashShuffleManager可能遇到的文件数量巨大的问题两者的性能比较,取决于内存,排序,文件操作等因素的综合影响。
对于不需要进行排序的Shuffle操作来说,如repartition等,如果文件数量不是特别巨大,HashShuffleManager面临的内存问题不大,而SortShuffleManager需要额外的根据Partition进行排序,显然HashShuffleManager的效率会更高。
而对于本来就需要在Map端进行排序的Shuffle操作来说,如ReduceByKey等,使用HashShuffleManager虽然在写数据时不排序,但在其它的步骤中仍然需要排序,而SortShuffleManager则可以将写数据和排序两个工作合并在一起执行,因此即使不考虑HashShuffleManager的内存使用问题,SortShuffleManager依旧可能更快。
spark.shuffle.sort.bypassMergeThreshold
这个参数仅适用于SortShuffleManager,如前所述,SortShuffleManager在处理不需要排序的Shuffle操作时,由于排序带来性能的下降。这个参数决定了在这种情况下,当Reduce分区的数量小于多少的时候,在SortShuffleManager内部不使用Merge Sort的方式处理数据,而是与Hash Shuffle类似,直接将分区文件写入单独的文件,不同的是,在最后一步还是会将这些文件合并成一个单独的文件。这样通过去除Sort步骤来加快处理速度,代价是需要并发打开多个文件,所以内存消耗量增加,本质上是相对HashShuffleMananger一个折衷方案。 这个参数的默认值是200个分区,如果内存GC问题严重,可以降低这个值。
spark.shuffle.consolidateFiles
这个配置参数仅适用于HashShuffleMananger的实现,同样是为了解决生成过多文件的问题,采用的方式是在不同批次运行的Map任务之间重用Shuffle输出文件,也就是说合并的是不同批次的Map任务的输出数据,但是每个Map任务所需要的文件还是取决于Reduce分区的数量,因此,它并不减少同时打开的输出文件的数量,因此对内存使用量的减少并没有帮助。只是HashShuffleManager里的一个折中的解决方案。
需要注意的是,这部分的代码实现尽管原理上说很简单,但是涉及到底层具体的文件系统的实现和限制等因素,例如在并发访问等方面,需要处理的细节很多,因此一直存在着这样那样的bug或者问题,导致在例如EXT3上使用时,特定情况下性能反而可能下降,因此从Spark 0.8的代码开始,一直到Spark 1.1的代码为止也还没有被标志为Stable,不是默认采用的方式。此外因为并不减少同时打开的输出文件的数量,因此对性能具体能带来多大的改善也取决于具体的文件数量的情况。所以即使你面临着Shuffle文件数量巨大的问题,这个配置参数是否使用,在什么版本中可以使用,也最好还是实际测试以后再决定。
spark.shuffle.spill
shuffle的过程中,如果涉及到排序,聚合等操作,势必会需要在内存中维护一些数据结构,进而占用额外的内存。如果内存不够用怎么办,那只有两条路可以走,一就是out of memory 出错了,二就是将部分数据临时写到外部存储设备中去,最后再合并到最终的Shuffle输出文件中去。
这里spark.shuffle.spill 决定是否Spill到外部存储设备(默认打开),如果你的内存足够使用,或者数据集足够小,当然也就不需要Spill,毕竟Spill带来了额外的磁盘操作。
spark.shuffle.memoryFraction / spark.shuffle.safetyFraction
在启用Spill的情况下,spark.shuffle.memoryFraction(1.1后默认为0.2)决定了当Shuffle过程中使用的内存达到总内存多少比例的时候开始Spill。
通过spark.shuffle.memoryFraction可以调整Spill的触发条件,即Shuffle占用内存的大小,进而调整Spill的频率和GC的行为。总的来说,如果Spill太过频繁,可以适当增加spark.shuffle.memoryFraction的大小,增加用于Shuffle的内存,减少Spill的次数。当然这样一来为了避免内存溢出,对应的可能需要减少RDD cache占用的内存,即减小spark.storage.memoryFraction的值,这样RDD cache的容量减少,有可能带来性能影响,因此需要综合考虑。
由于Shuffle数据的大小是估算出来的,一来为了降低开销,并不是每增加一个数据项都完整的估算一次,二来估算也会有误差,所以实际暂用的内存可能比估算值要大,这里spark.shuffle.safetyFraction(默认为0.8)用来作为一个保险系数,降低实际Shuffle使用的内存阀值,增加一定的缓冲,降低实际内存占用超过用户配置值的概率。
spark.shuffle.spill.compress / spark.shuffle.compress
这两个配置参数都是用来设置Shuffle过程中是否使用压缩算法对Shuffle数据进行压缩,前者针对Spill的中间数据,后者针对最终的shuffle输出文件,默认都是True
理论上说,spark.shuffle.compress设置为True通常都是合理的,因为如果使用千兆以下的网卡,网络带宽往往最容易成为瓶颈。此外,目前的Spark任务调度实现中,以Shuffle划分Stage,下一个Stage的任务是要等待上一个Stage的任务全部完成以后才能开始执行,所以shuffle数据的传输和CPU计算任务之间通常不会重叠,这样Shuffle数据传输量的大小和所需的时间就直接影响到了整个任务的完成速度。但是压缩也是要消耗大量的CPU资源的,所以打开压缩选项会增加Map任务的执行时间,因此如果在CPU负载的影响远大于磁盘和网络带宽的影响的场合下,也可能将spark.shuffle.compress 设置为False才是最佳的方案
对于spark.shuffle.spill.compress而言,情况类似,但是spill数据不会被发送到网络中,仅仅是临时写入本地磁盘,而且在一个任务中同时需要执行压缩和解压缩两个步骤,所以对CPU负载的影响会更大一些,而磁盘带宽(如果标配12HDD的话)可能往往不会成为Spark应用的主要问题,所以这个参数相对而言,或许更有机会需要设置为False。
总之,Shuffle过程中数据是否应该压缩,取决于CPU/DISK/NETWORK的实际能力和负载,应该综合考虑。
Storage相关配置参数
spark.local.dir
这个看起来很简单,就是Spark用于写中间数据,如RDD Cache,Shuffle,Spill等数据的位置,那么有什么可以注意的呢。
首先,最基本的当然是我们可以配置多个路径(用逗号分隔)到多个磁盘上增加整体IO带宽,这个大家都知道。
其次,目前的实现中,Spark是通过对文件名采用hash算法分布到多个路径下的目录中去,如果你的存储设备有快有慢,比如SSD+HDD混合使用,那么你可以通过在SSD上配置更多的目录路径来增大它被Spark使用的比例,从而更好地利用SSD的IO带宽能力。当然这只是一种变通的方法,终极解决方案还是应该像目前HDFS的实现方向一样,让Spark能够感知具体的存储设备类型,针对性的使用。
需要注意的是,在Spark 1.0 以后,SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN)参数会覆盖这个配置。比如Spark On YARN的时候,Spark Executor的本地路径依赖于Yarn的配置,而不取决于这个参数。
spark.executor.memory
Executor 内存的大小,和性能本身当然并没有直接的关系,但是几乎所有运行时性能相关的内容都或多或少间接和内存大小相关。这个参数最终会被设置到Executor的JVM的heap尺寸上,对应的就是Xmx和Xms的值
理论上Executor 内存当然是多多益善,但是实际受机器配置,以及运行环境,资源共享,JVM GC效率等因素的影响,还是有可能需要为它设置一个合理的大小。 多大算合理,要看实际情况
Executor的内存基本上是Executor内部所有任务共享的,而每个Executor上可以支持的任务的数量取决于Executor所管理的CPU Core资源的多少,因此你需要了解每个任务的数据规模的大小,从而推算出每个Executor大致需要多少内存即可满足基本的需求。
如何知道每个任务所需内存的大小呢,这个很难统一的衡量,因为除了数据集本身的开销,还包括算法所需各种临时内存空间的使用,而根据具体的代码算法等不同,临时内存空间的开销也不同。但是数据集本身的大小,对最终所需内存的大小还是有一定的参考意义的。
通常来说每个分区的数据集在内存中的大小,可能是其在磁盘上源数据大小的若干倍(不考虑源数据压缩,Java对象相对于原始裸数据也还要算上用于管理数据的数据结构的额外开销),需要准确的知道大小的话,可以将RDD cache在内存中,从BlockManager的Log输出可以看到每个Cache分区的大小(其实也是估算出来的,并不完全准确)如: BlockManagerInfo: Added rdd_0_1 on disk on sr438:41134 (size: 495.3 MB)反过来说,如果你的Executor的数量和内存大小受机器物理配置影响相对固定,那么你就需要合理规划每个分区任务的数据规模,例如采用更多的分区,用增加任务数量(进而需要更多的批次来运算所有的任务)的方式来减小每个任务所需处理的数据大小。
spark.storage.memoryFraction
如前面所说spark.executor.memory决定了每个Executor可用内存的大小,而spark.storage.memoryFraction则决定了在这部分内存中有多少可以用于Memory Store管理RDD Cache数据,剩下的内存用来保证任务运行时各种其它内存空间的需要。
spark.executor.memory默认值为0.6,官方文档建议这个比值不要超过JVM Old Gen区域的比值。这也很容易理解,因为RDD Cache数据通常都是长期驻留内存的,理论上也就是说最终会被转移到Old Gen区域(如果该RDD还没有被删除的话),如果这部分数据允许的尺寸太大,势必把Old Gen区域占满,造成频繁的FULL GC。
如何调整这个比值,取决于你的应用对数据的使用模式和数据的规模,粗略的来说,如果频繁发生Full GC,可以考虑降低这个比值,这样RDD Cache可用的内存空间减少(剩下的部分Cache数据就需要通过Disk Store写到磁盘上了),会带来一定的性能损失,但是腾出更多的内存空间用于执行任务,减少Full GC发生的次数,反而可能改善程序运行的整体性能
spark.streaming.blockInterval
这个参数用来设置Spark Streaming里Stream Receiver生成Block的时间间隔,默认为200ms。具体的行为表现是具体的Receiver所接收的数据,每隔这里设定的时间间隔,就从Buffer中生成一个StreamBlock放进队列,等待进一步被存储到BlockManager中供后续计算过程使用。理论上来说,为了每个Streaming Batch 间隔里的数据是均匀的,这个时间间隔当然应该能被Batch的间隔时间长度所整除。总体来说,如果内存大小够用,Streaming的数据来得及处理,这个blockInterval时间间隔的影响不大,当然,如果数据Cache Level是Memory+Ser,即做了序列化处理,那么BlockInterval的大小会影响序列化后数据块的大小,对于Java 的GC的行为会有一些影响。
此外spark.streaming.blockQueueSize决定了在StreamBlock被存储到BlockMananger之前,队列中最多可以容纳多少个StreamBlock。默认为10,因为这个队列Poll的时间间隔是100ms,所以如果CPU不是特别繁忙的话,基本上应该没有问题。
压缩和序列化相关
spark.serializer
默认为org.apache.spark.serializer.JavaSerializer, 可选 org.apache.spark.serializer.KryoSerializer, 实际上只要是org.apache.spark.serializer的子类就可以了,不过如果只是应用,大概你不会自己去实现一个的。
序列化对于spark应用的性能来说,还是有很大影响的,在特定的数据格式的情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,当然放到整个Spark程序中来考量,比重就没有那么大了,但是以Wordcount为例,通常也很容易达到30%以上的性能提升。而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略了。KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持。需要注意的是,这里可配的Serializer针对的对象是Shuffle数据,以及RDD Cache等场合,而Spark Task的序列化是通过spark.closure.serializer来配置,但是目前只支持JavaSerializer,所以等于没法配置啦。
更多Kryo序列化相关优化配置,可以参考 http://spark.apache.org/docs/latest/tuning.html#data-serialization 一节
spark.rdd.compress
这个参数决定了RDD Cache的过程中,RDD数据在序列化之后是否进一步进行压缩再储存到内存或磁盘上。当然是为了进一步减小Cache数据的尺寸,对于Cache在磁盘上而言,绝对大小大概没有太大关系,主要是考虑Disk的IO带宽。而对于Cache在内存中,那主要就是考虑尺寸的影响,是否能够Cache更多的数据,是否能减小Cache数据对GC造成的压力等。
这两者,前者通常不会是主要问题,尤其是在RDD Cache本身的目的就是追求速度,减少重算步骤,用IO换CPU的情况下。而后者,GC问题当然是需要考量的,数据量小,占用空间少,GC的问题大概会减轻,但是是否真的需要走到RDD Cache压缩这一步,或许用其它方式来解决可能更加有效。
所以这个值默认是关闭的,但是如果在磁盘IO的确成为问题或者GC问题真的没有其它更好的解决办法的时候,可以考虑启用RDD压缩。
spark.broadcast.compress
是否对Broadcast的数据进行压缩,默认值为True。
Broadcast机制是用来减少运行每个Task时,所需要发送给TASK的RDD所使用到的相关数据的尺寸,一个Executor只需要在第一个Task启动时,获得一份Broadcast数据,之后的Task都从本地的BlockManager中获取相关数据。在1.1最新版本的代码中,RDD本身也改为以Broadcast的形式发送给Executor(之前的实现RDD本身是随每个任务发送的),因此基本上不太需要显式的决定哪些数据需要broadcast了。
因为Broadcast的数据需要通过网络发送,而在Executor端又需要存储在本地BlockMananger中,加上最新的实现,默认RDD通过Boradcast机制发送,因此大大增加了Broadcast变量的比重,所以通过压缩减小尺寸,来减少网络传输开销和内存占用,通常都是有利于提高整体性能的。
什么情况可能不压缩更好呢,大致上个人觉得同样还是在网络带宽和内存不是问题的时候,如果Driver端CPU资源很成问题(毕竟压缩的动作基本都在Driver端执行),那或许有调整的必要。
spark.io.compression.codec
RDD Cache和Shuffle数据压缩所采用的算法Codec,默认值曾经是使用LZF作为默认Codec,最近因为LZF的内存开销的问题,默认的Codec已经改为Snappy。
LZF和Snappy相比较,前者压缩率比较高(当然要看具体数据内容了,通常要高20%左右),但是除了内存问题以外,CPU代价也大一些(大概也差20%~50%?)
在用于Shuffle数据的场合下,内存方面,应该主要是在使用HashShuffleManager的时候有可能成为问题,因为如果Reduce分区数量巨大,需要同时打开大量的压缩数据流用于写文件,进而在Codec方面需要大量的buffer。但是如果使用SortShuffleManager,由于shuffle文件数量大大减少,不会产生大量的压缩数据流,所以内存开销大概不会成为主要问题。
剩下的就是CPU和压缩率的权衡取舍,和前面一样,取决于CPU/网络/磁盘的能力和负载,个人认为CPU通常更容易成为瓶颈。所以要调整性能,要不不压缩,要不使用Snappy可能性大一些?
对于RDD Cache的场合来说,绝大多数场合都是内存操作或者本地IO,所以CPU负载的问题可能比IO的问题更加突出,这也是为什么 spark.rdd.compress 本身默认为不压缩,如果要压缩,大概也是Snappy合适一些?
schedule调度相关
调度相关的参数设置,大多数内容都很直白,其实无须过多的额外解释,不过基于这些参数的常用性(大概会是你针对自己的集群第一步就会配置的参数),这里多少就其内部机制做一些解释。
spark.cores.max
一个集群最重要的参数之一,当然就是CPU计算资源的数量。spark.cores.max 这个参数决定了在Standalone和Mesos模式下,一个Spark应用程序所能申请的CPU Core的数量。如果你没有并发跑多个Spark应用程序的需求,那么可以不需要设置这个参数,默认会使用spark.deploy.defaultCores的值(而spark.deploy.defaultCores的值默认为Int.Max,也就是不限制的意思)从而应用程序可以使用所有当前可以获得的CPU资源。
针对这个参数需要注意的是,这个参数对Yarn模式不起作用,YARN模式下,资源由Yarn统一调度管理,一个应用启动时所申请的CPU资源的数量由另外两个直接配置Executor的数量和每个Executor中core数量的参数决定。(历史原因造成,不同运行模式下的一些启动参数个人认为还有待进一步整合)此外,在Standalone模式等后台分配CPU资源时,目前的实现中,在spark.cores.max允许的范围内,基本上是优先从每个Worker中申请所能得到的最大数量的CPU core给每个Executor,因此如果人工限制了所申请的Max Core的数量小于Standalone和Mesos模式所管理的CPU数量,可能发生应用只运行在集群中部分节点上的情况(因为部分节点所能提供的最大CPU资源数量已经满足应用的要求),而不是平均分布在集群中。通常这不会是太大的问题,但是如果涉及数据本地性的场合,有可能就会带来一定的必须进行远程数据读取的情况发生。理论上,这个问题可以通过两种途径解决:一是Standalone和Mesos的资源管理模块自动根据节点资源情况,均匀分配和启动Executor,二是和Yarn模式一样,允许用户指定和限制每个Executor的Core的数量。 社区中有一个PR试图走第二种途径来解决类似的问题,不过截至我写下这篇文档为止(2014.8),还没有被Merge。
spark.task.cpus
这个参数在字面上的意思就是分配给每个任务的CPU的数量,默认为1。实际上,这个参数并不能真的控制每个任务实际运行时所使用的CPU的数量,比如你可以通过在任务内部创建额外的工作线程来使用更多的CPU(至少目前为止,将来任务的执行环境是否能通过LXC等技术来控制还不好说)。它所发挥的作用,只是在作业调度时,每分配出一个任务时,对已使用的CPU资源进行计数。也就是说只是理论上用来统计资源的使用情况,便于安排调度。因此,如果你期望通过修改这个参数来加快任务的运行,那还是赶紧换个思路吧。这个参数的意义,个人觉得还是在你真的在任务内部自己通过任何手段,占用了更多的CPU资源时,让调度行为更加准确的一个辅助手段。
spark.scheduler.mode
这个参数决定了单个Spark应用内部调度的时候使用FIFO模式还是Fair模式。是的,你没有看错,这个参数只管理一个Spark应用内部的多个没有依赖关系的Job作业的调度策略。
如果你需要的是多个Spark应用之间的调度策略,那么在Standalone模式下,这取决于每个应用所申请和获得的CPU资源的数量(暂时没有获得资源的应用就Pending在那里了),基本上就是FIFO形式的,谁先申请和获得资源,谁就占用资源直到完成。而在Yarn模式下,则多个Spark应用间的调度策略由Yarn自己的策略配置文件所决定。
那么这个内部的调度逻辑有什么用呢?如果你的Spark应用是通过服务的形式,为多个用户提交作业的话,那么可以通过配置Fair模式相关参数来调整不同用户作业的调度和资源分配优先级。
spark.locality.wait
spark.locality.wait和spark.locality.wait.process,spark.locality.wait.node, spark.locality.wait.rack这几个参数影响了任务分配时的本地性策略的相关细节。
Spark中任务的处理需要考虑所涉及的数据的本地性的场合,基本就两种,一是数据的来源是HadoopRDD; 二是RDD的数据来源来自于RDD Cache(即由CacheManager从BlockManager中读取,或者Streaming数据源RDD)。其它情况下,如果不涉及shuffle操作的RDD,不构成划分Stage和Task的基准,不存在判断Locality本地性的问题,而如果是ShuffleRDD,其本地性始终为No Prefer,因此其实也无所谓Locality。
在理想的情况下,任务当然是分配在可以从本地读取数据的节点上时(同一个JVM内部或同一台物理机器内部)的运行时性能最佳。但是每个任务的执行速度无法准确估计,所以很难在事先获得全局最优的执行策略,当Spark应用得到一个计算资源的时候,如果没有可以满足最佳本地性需求的任务可以运行时,是退而求其次,运行一个本地性条件稍差一点的任务呢,还是继续等待下一个可用的计算资源已期望它能更好的匹配任务的本地性呢?
这几个参数一起决定了Spark任务调度在得到分配任务时,选择暂时不分配任务,而是等待获得满足进程内部/节点内部/机架内部这样的不同层次的本地性资源的最长等待时间。默认都是3000毫秒。
基本上,如果你的任务数量较大和单个任务运行时间比较长的情况下,单个任务是否在数据本地运行,代价区别可能比较显著,如果数据本地性不理想,那么调大这些参数对于性能优化可能会有一定的好处。反之如果等待的代价超过带来的收益,那就不要考虑了。
特别值得注意的是:在处理应用刚启动后提交的第一批任务时,由于当作业调度模块开始工作时,处理任务的Executors可能还没有完全注册完毕,因此一部分的任务会被放置到No Prefer的队列中,这部分任务的优先级仅次于数据本地性满足Process级别的任务,从而被优先分配到非本地节点执行,如果的确没有Executors在对应的节点上运行,或者的确是No Prefer的任务(如shuffleRDD),这样做确实是比较优化的选择,但是这里的实际情况只是这部分Executors还没来得及注册上而已。这种情况下,即使加大本节中这几个参数的数值也没有帮助。针对这个情况,有一些已经完成的和正在进行中的PR通过例如动态调整No Prefer队列,监控节点注册比例等等方式试图来给出更加智能的解决方案。不过,你也可以根据自身集群的启动情况,通过在创建SparkContext之后,主动Sleep几秒的方式来简单的解决这个问题。
spark.speculation
spark.speculation以及spark.speculation.interval, spark.speculation.quantile, spark.speculation.multiplier等参数调整Speculation行为的具体细节,Speculation是在任务调度的时候,如果没有适合当前本地性要求的任务可供运行,将跑得慢的任务在空闲计算资源上再度调度的行为,这些参数调整这些行为的频率和判断指标,默认是不使用Speculation的。
通常来说很难正确的判断是否需要Speculation,能真正发挥Speculation用处的场合,往往是某些节点由于运行环境原因,比如CPU资源由于某种原因被占用,磁盘损坏导致IO缓慢造成任务执行速度异常的情况,当然前提是你的分区任务不存在仅能被执行一次,或者不能同时执行多个拷贝等情况。
Speculation任务参照的指标通常是其它任务的执行时间,而实际的任务可能由于分区数据尺寸不均匀,本来就会有时间差异,加上一定的调度和IO的随机性,所以如果一致性指标定得过严,Speculation可能并不能真的发现问题,反而增加了不必要的任务开销,定得过宽,大概又基本相当于没用。个人觉得,如果你的集群规模比较大,运行环境复杂,的确可能经常发生执行异常,加上数据分区尺寸差异不大,为了程序运行时间的稳定性,那么可以考虑仔细调整这些参数。否则还是考虑如何排除造成任务执行速度异常的因数比较靠铺一些。
当然,我没有实际在很大规模的集群上运行过Spark,所以如果看法有些偏颇,还请有实际经验的XD指正。
来源:http://blog.csdn.net/ZYC88888/article/details/78531462作者:ZhaoYingChao88
页:
[1]