regan 发表于 2019-11-27 10:09:22

Spark性能优化(三):序列化与压缩

static/image/hrline/2.gif
1.Spark中默认的序列化方式?2.配置kryo序列化?
3.合理的设置序列化参数?
static/image/hrline/2.gif

      序列化有时是shuffle和cache的瓶颈,合理地设置序列化,不但能提高IO性能(包括网络IO和磁盘IO),还能减少内存的使用。
        Spark默认的序列化器是org.apache.spark.serializer.JavaSerializer,也就是使用ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化,但是这个默认的序列化器的性能和空间表现都比较差。Spark同时支持使用Kryo序列化器org.apache.spark.serializer.KryoSerializer, 该序列化器更快,压缩率也更高。官方介绍,Kryo序列化机制比Java序列化机制,性能高了10倍左右,当然放到整个Spark程序中来考量,比重就没有那么大了.
        Spark之所以默认没有使用Kryo作为序列化器,是因为Kryo并不支持所有可序列化的类型,且要求最好要注册所有需要进行序列化的自定义类型,这对于开发者略显麻烦。但我们推荐在所有网络IO密集型应用中使用Kryo。事实上,Spark对大多数常用的scala类都自动包含了Kryo序列化库。
        在Spark中,以下地方涉及到了序列化:在算子的函数中使用到外部变量时,该变量会被序列化后通过网络传输到task中;使用自定义的类型作为RDD的泛型类型时,所有自定义类型对象,都会进行序列化;使用需序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组;Spark task需要被序列化后从driver发送到executor上。这里涉及到序列化的四个地方,前面三个使用的序列化器,都可以通过设置spark.serializer来使用Kryo序列化其以提高性能;而Spark Task的序列化是通过spark.closure.serializer来配置的,但是目前它只支持JavaSerializer。
        使用Kryo序列化器时,我们只要设置序列化器为Kryo,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等)。事实上,如果没有注册自定义类型到Kryo,Kryo仍然可以工作,但需要对每个该类型的对象都存储完整的类名,当有百万条甚至更多的序列化的记录时,这会额外占用更多的空间,所以推荐注册自定义的类到Kryo,我们也可以通过设置spark.kryo.registrationRequired参数为true来强制注册,这样当Kryo遇到没有注册的类时,会抛出错误。
1.        // 创建SparkConf对象。
2.        val conf = new SparkConf()
3.        // 设置序列化器为KryoSerializer。
4.        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
5.        // 设置需注册自定义类型到Kryo
6.        conf.set("spark.kryo.registrationRequired", "true")
7.        // 注册要序列化的自定义类型。
8.        conf.registerKryoClasses(Array(classOf,classOf))
        在程序运行过程中如果遇到了错误NotSerializableException,就说明程序代码中使用了没有实现JAVA的Serializable序列化接口的类,需要修改相应的类使其实现该接口。当由于程序中使用了很多类而不好判断究竟是哪个类引起该问题时,可以通过在spark-submit提交程序时,在-driver-java-options和-executor-java-options中指定“-Dsun.io.serialization.extended DebugInfo=true”来协助判断究竟是哪个类引起的该问题。
        压缩和解压缩会消耗cpu,但由于它能减小数据体积,使存储和传输更高效,所以在大数据分布式计算框架下很有用。Spark中压缩相关参数如下:spark.rdd.compress:这个参数决定了RDD数据在序列化之后是否进一步进行压缩后再储存到内存或磁盘上,这个值默认是不压缩,但是如果在磁盘IO的确成为问题或者GC问题真的没有其他更好的解决办法的时候,可以考虑启用RDD压缩;spark.broadcast.compress: 这个参数决定了是否对Broadcast的数据进行压缩,默认值为True,因为Broadcast的数据需要通过网络发送,而在Executor端又需要存储在本地BlockMananger中,所以通过压缩从而减小体积,来减少网络传输开销和内存占用,通常都是有利于提高程序整体性能的;spark.io.compression.codec: 这个参数决定了用来压缩内部数据,比如RDD 分区,广播变量,Shuffle输出的数据等,所采用的压缩器,有三种选择:lz4, lzf 和snappy,默认的是Snappy,但和Snappy相比较,lzf压缩率比较高,故在有大量shuffle的情况下,使用lzf可以提高shuffle性能进而提高程序整体效率。

static/image/hrline/2.gif
欢迎关注笔者微信公众号“三角兽”,了解更多数学、算法、大数据干货文章。




页: [1]
查看完整版本: Spark性能优化(三):序列化与压缩