分享

spark rdd释放说明

本帖最后由 xuanxufeng 于 2016-5-10 15:41 编辑
问题导读

1.spark.cleaner.ttl为何被弃用?
2.在ContextCleaner,调用哪个函数来清除已经持久化的RDD数据?








spark.cleaner.ttl参数的原意是清除超过这个时间的所有RDD数据,以便腾出空间给后来的RDD使用。周期性清除保证在这个时间之前的元数据会被遗忘,对于那些运行了几小时或者几天的Spark作业(特别是Spark Streaming)设置这个是很有用的。注意:任何内存中的RDD只要过了这个时间就会被清除掉。官方文档是这么介绍的:
Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.

但是有经验的用户会知道,超过了spark.cleaner.ttl时间的RDD不一定是需要被删除的,这些RDD可能正在或者将要被使用,而且基于时间间隔去删除RDD这种设计就不太合理,所以基于这些原因,在Spark 1.4中参数spark.cleaner.ttl被标记为Deprecated,如下:
[mw_shl_code=bash,true]DeprecatedConfig("spark.cleaner.ttl", "1.4",
   "TTL-based metadata cleaning is no longer necessary in recent Spark versions " +
   "and can lead to confusing errors if metadata is deleted for entities that are still" +
   " in use. Except in extremely special circumstances, you should remove this setting" +
   " and rely  on Spark's reference-tracking-based cleanup instead." +
   " See SPARK-7689 for more details.")[/mw_shl_code]


同时社区重新设计了删除RDD的逻辑,使得Spark可以自动地清除已经持久化RDD相关的metadata和数据,以及shuffles和broadcast 相关变量数据,并引入了ContextCleaner类,这个类在SparkContext中被实例化:

[mw_shl_code=bash,true]  private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  }
  cleaner.foreach(_.start())[/mw_shl_code]


在ContextCleaner 中会调用RDD.unpersist()来清除已经持久化的RDD数据:
[mw_shl_code=bash,true]/** Perform RDD cleanup. */
def doCleanupRDD(rddId: Int, blocking: Boolean) {
  try {
    logDebug("Cleaning RDD " + rddId)
    sc.unpersistRDD(rddId, blocking)
    listeners.foreach(_.rddCleaned(rddId))
    logInfo("Cleaned RDD " + rddId)
  } catch {
    case t: Throwable => logError("Error cleaning RDD " + rddId, t)
  }
}[/mw_shl_code]
清除Shuffle和Broadcast相关的数据会分别调用doCleanupShuffle和doCleanupBroadcast函数。根据需要清除数据的类型分别调用:

[mw_shl_code=bash,true]task match {
   case CleanRDD(rddId) =>
         doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
   case CleanShuffle(shuffleId) =>
         doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
   case CleanBroadcast(broadcastId) =>
         doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
}[/mw_shl_code]

  相信加上这些逻辑之后,Spark清除RDD会更加智能,期待吧。


已有(2)人评论

跳转到指定楼层
xuliang123789 发表于 2016-5-11 09:28:38
谢谢楼主,学习一下,辛苦,赞~~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条