1.如何设置Spark程序的并行度?
2.并行度和分区数的关系?
3.如何正确设置RDD分区数?4.分区数设置的最佳实践?
并行度指的就是RDD的分区数,由于一个分区对应一个task,并行度也是一个stage中的task数,这些task被并行地处理。我们知道,RDD是以partition即分区的形式散落在集群上,每个分区都包含了一部分待处理的数据,Spark程序运行时,会为每个待处理的分区创建一个task,且默认情况下每个task占用一个cpu core来处理。
Spark有一套自己自动推导出默认的分区数的机制。当我们在程序中通过操作算子如textFile等读取外部数据源以获得Input RDD时,spark会自动地根据外部数据源的大小推导出一个合适的默认的分区数,如HDFS文件的每个block就对应一个分区;在对RDD进行map类不涉及shuffle的操作的时候,由于分区数具有遗传性,新产生的RDD的分区数由parent RDD中最大的分区数决定;在对RDD进行reduce类涉及shuffle操作的算子时(如groupByKey,reduceByKey等各种reduce操作算子),由于分区数具有遗传性,新产生的RDD的分区数也由parent RDD中最大的分区数决定。如果是在spark-shell交互式命令终端下,可以通过方法rdd.partitions.size来获得某个RDD的分区数,而在spark1.6.0以后的版本中,也可以通过rdd.getNumPartitions()来获得某个RDD的分区数。
并行度对性能的影响有两方面,当并行度不够大时会存在资源的闲置与浪费,比如一个应用程序分配到了1000个core,但是一个stage里只有30个task,这时就可以提高并行度以提升硬件利用率;而当并行度太大时,task常常几微妙就执行完毕,或task读写的数据量很小,这种情况下,task频繁地开辟与销毁的不必要的开销则太大,我们就需要调小并行度。
由于Spark自动推导出来的默认的分区数很多时候是不理想的,我们必须人为地加以控制,来改变并行度。Spark提供了四种改变并行度的方式。
第一种,在使用读取外部数据源的textFile类算子时,我们可以通过可选的参数minPartitions来显示指定最小的分区数。
第二种,针对已经存在的RDD,可以通过方法repartition()或coalesce()来改变并行度。repartition()和coalesce()的区别在于,前者会产生shuffle,而后者默认不会产生shuffle。事实上,当有大量小任务(任务处理的数据量小且耗时短)时,比如某个rdd在filter操作后,由于过滤掉了大量数据,每个分区都只剩下了很少量的数据,这时我们常用coalesce()来合并分区,调小并行度,减少不必要的任务开辟与销毁的消耗;而当任务耗时长且处理的数据量大时,如果计算只发生在部分executor上,我们常用repartition()来重新分区,提高并行度,开辟更多的并行计算的任务来完成计算。
第三种,在对RDD进行reduce类涉及shuffle操作的算子时,这些算子大都可以接受一个显示指定的参数来确定新产生的RDD的分区数,我们可以显示地指定这类参数来改变shuffle后新产生的RDD的分区数而不是采用系统推导出的默认的分区数,
第四种,我们也可以配置参数spark.default.parallelism来设置默认的并行度。该参数其实指定的就是在对RDD进行reduce类涉及shuffle操作的算子时,如果没有对这些算子显示指定参数来确定新产生的RDD的分区数时,这类reduce类涉及shuffle操作的算子产生的新的rdd的paritition数量。该参数也指定了parallelize 等没有parent RDDs类操作的算子所产生的新的RDD的分区数。
最佳实践是,并行度设置为集群的总的cpu cores的个数的2~3倍,比如Executor的总CPU core数量为400个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源;每个分区的大小在128MB左右。
需要说明的是,通过以上方式确定了任务的并行度,就确定了理论上能够并行执行的任务的数量,而实际执行时真正并发执行的任务的数量还要受到应用分配到的实际资源数量的限制,要想改变应用程序获得的资源数目,这就涉及到资源参数的调优。
欢迎关注笔者微信公众号“三角兽”,了解更多数学、算法、大数据干货文章。
|