分享

Spark性能调优(四):广播大变量

       

1.广播为什么能提升性能?2.什么样的情景下需要广播?
3.广播可能会遇到哪些问题?


        有时在开发过程中,会遇到需要在算子函数中使用外部大变量的场景(比如100M的大集合),那么此时就可以考虑使用Spark的广播(Broadcast)功能来提升性能。
        Spark使用了shared-nothing架构,数据以分区的形式散落在各个节点上,每个节点都有自己的cpu, 内存和存储资源。tasks并没有共享的全局内存区域,driver和tasks通过通信来共享数据。比如,当一个RDD的算子所使用的函数中引用了一个来自driver的变量时,spark会将该变量的一个副本随一个task一起发送给executors,然后每个task得到一个该变量的副本,并以只读的形式访问它,任何对改变量的修改都是本地的,并不会返回给driver。这个发送动作在每个stage的开始都会发生一次。
        这种默认的行为在driver和tasks共享较大的变量比如静态查询表,且job有多个stage时并不高效,比如该静态查询表大小是100MB且该Job有10个stage,则spark会将这100MB的数据发送给每个executor十次。大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。这显然并不高效,Spark为此推出了广播变量。
        Spark的广播变量只会发送给executor节点一次,且发送后广播变量会以非序列化的形式保存在executor的内存中,而不会为每个task都发送一份。Spark还会使用高效的广播算法(Http或Torrent方式)来分发变量,所以网络通信的开销并不大。熟悉源码的朋友都知道,在Spark的HadoopRDD中,就采用了广播来进行Hadoop的 JobConf的传输以提高效率。广播后的变量,会保证每个Executor的内存中,只驻留一份该变量的副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。
        需要说明的是,由于每个stage的tasks公用的数据在stage开始时都会向每个executors节点发送一次,发送后这些公用数据会以序列化的形式缓存在executors,然后各个任务在运行时反序列化这些公用的数据以获得一个只读的副本供自己使用,这就意味着我们显式创建广播变量的模式,只有在job有多个stage,且多个stage的tasks需要访问来自driver的相同的数据时,或者需要以非序列化的形式缓存数据的时候,才真正有意义。
事实上,Spark会在master上打印出每个任务序列化后的大小,通常来讲,大于20KB的任务就可以考虑是否可以通过广播机制进行优化。
        我们可以通过在一个只读变量v上调用SparkContext.broadcast(v)来创建广播变量,广播变量本质上是围绕着变量v的封装。广播后可以通过value方法访问这个广播变量的值。在创建了广播变量之后,在集群上的所有函数中应该使用它来替代使用v,这样v就不会不止一次地在节点之间传输了。另外,为了确保所有的节点获得相同的变量,变量v在被广播之后就不应该再修改。
        另外一个对广播变量的常用的使用场景是,当对两个表进行join操作时,为了避免shuffle,我们常常将较小的表广播到executors上。


欢迎关注笔者微信公众号“三角兽”,了解更多数学、算法、大数据干货文章。
qrcode_for_gh_1f93dbc1c492_258 (1).jpg



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条