分享

spark缓存cache()详解

pig2 2017-3-1 16:13:41 发表于 介绍解说 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 40697
本帖最后由 pig2 于 2017-3-2 15:52 编辑
问题导读

1.你认为缓存的作用是什么?
2.cache与persist的区别是什么?
3.spark如何实现缓存?






缓存的作用,无论是在传统程序,还是分布式程序,缓存的作用主要针对频繁操作的数据,下次操作的时候直接读取。spark亦是。

Spark 支持把数据集拉到集群内的内存缓存中。当要重复访问时这是非常有用的



cache:定义
用MEMORY_ONLY储存级别对RDD进行缓存,其内部实现是调用persist()函数的。官方文档定义:

Persist this RDD with the default storage level (`MEMORY_ONLY`).


函数原型
[mw_shl_code=scala,true]def cache() : this.type  
[/mw_shl_code]


实例
[mw_shl_code=scala,true]scala> var data = sc.parallelize(List(1,2,3,4))  
data: org.apache.spark.rdd.RDD[Int] =  
  ParallelCollectionRDD[44] at parallelize at <console>:12  
  
scala> data.getStorageLevel  
res65: org.apache.spark.storage.StorageLevel =   
  StorageLevel(false, false, false, false, 1)  
  
scala> data.cache  
res66: org.apache.spark.rdd.RDD[Int] =   
  ParallelCollectionRDD[44] at parallelize at <console>:12  
  
scala> data.getStorageLevel  
res67: org.apache.spark.storage.StorageLevel =   
  StorageLevel(false, true, false, true, 1)  [/mw_shl_code]


我们先是定义了一个RDD,然后通过getStorageLevel函数得到该RDD的默认存储级别,这里是NONE。然后我们调用cache函数,将RDD的存储级别改成了MEMORY_ONLY(看StorageLevel的第二个参数)。关于StorageLevel的其他的几种存储级别介绍请参照下面StorageLevel类进行了解。

cache和persist的区别


cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间。


基于Spark 1.4.1 的源码,可以看到

[mw_shl_code=scala,true]/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()[/mw_shl_code]
说明是cache()调用了persist(), 想要知道二者的不同还需要看一下persist函数:
[mw_shl_code=scala,true]/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)[/mw_shl_code]

可以看到persist()内部调用了persist(StorageLevel.MEMORY_ONLY),继续深入:
[mw_shl_code=scala,true]/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet..
*/
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}[/mw_shl_code]


可以看出来persist有一个 StorageLevel 类型的参数,这个表示的是RDD的缓存级别。

至此便可得出cache和persist的区别了:cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。

RDD的缓存级别

顺便看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码:

[mw_shl_code=scala,true]object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}[/mw_shl_code]
可以看到这里列出了12种缓存级别,但这些有什么区别呢?可以看到每个缓存级别后面都跟了一个StorageLevel的构造函数,里面包含了4个或5个参数,如下
[mw_shl_code=scala,true]val MEMORY_ONLY = new StorageLevel(false, true, false, true)
[/mw_shl_code]
查看其构造函数
[mw_shl_code=scala,true]class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}[/mw_shl_code]

可以看到StorageLevel类的主构造器包含了5个参数:
useDisk:使用硬盘(外存)
useMemory:使用内存
useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
replication:备份数(在多个节点上备份)

理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)

另外还注意到有一种特殊的缓存级别


[mw_shl_code=scala,true]val OFF_HEAP = new StorageLevel(false, false, true, false)
[/mw_shl_code]

使用了堆外内存,StorageLevel 类的源码中有一段代码可以看出这个的特殊性,它不能和其它几个参数共存。
[mw_shl_code=scala,true]if (useOffHeap) {
  require(!useDisk, "Off-heap storage level does not support using disk")
  require(!useMemory, "Off-heap storage level does not support using heap memory")
  require(!deserialized, "Off-heap storage level does not support deserialized storage")
  require(replication == 1, "Off-heap storage level does not support multiple replication")
}[/mw_shl_code]

参考:
housir的专栏
LW_ICE

已有(3)人评论

跳转到指定楼层
ABCDer丶Bi 发表于 2017-10-26 15:57:54
通过unpersist来释放缓存
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条