分享

Spark在内存有限的情况下 如何处理 T 级别的数据?

xuanxufeng 发表于 2016-5-10 15:40:53 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 6483
本帖最后由 xuanxufeng 于 2016-5-10 15:43 编辑

首先需要解开的一个误区是,对于Spark这类内存计算系统,并不是说要处理多大规模的数据就需要多大规模的内存。Spark相对Hadoop MR有大幅性能提升的一个前提就是大量大数据作业同一时刻需要加载进内存的数据只是整体数据的一个子集,且大部分情况下可以完全放入内存,正如Shark(Spark上的Hive兼容的data warehouse)论文1.1节所述:
In fact, one study [1] analyzed the access patterns in the Hive warehouses at Facebook and discovered that for the vast majority (96%) of jobs, the entire inputs could fit into a fraction of the cluster’s total memory.

[1] G. Ananthanarayanan, A. Ghodsi, S. Shenker, and I. Stoica. Disk-locality in datacenter computing considered irrelevant. In HotOS ’11, 2011.

至于数据子集仍然无法放入集群物理内存的情况,Spark仍然可以妥善处理,下文还会详述。

在Spark内部,单个executor进程内RDD的分片数据是用Iterator流式访问的,Iterator的hasNext方法和next方法是由RDD lineage上各个transformation携带的闭包函数复合而成的。该复合Iterator每访问一个元素,就对该元素应用相应的复合函数,得到的结果再流式地落地(对于shuffle stage是落地到本地文件系统留待后续stage访问,对于result stage是落地到HDFS或送回driver端等等,视选用的action而定)。如果用户没有要求Spark cache该RDD的结果,那么这个过程占用的内存是很小的,一个元素处理完毕后就落地或扔掉了(概念上如此,实现上有buffer),并不会长久地占用内存。只有在用户要求Spark cache该RDD,且storage level要求在内存中cache时,Iterator计算出的结果才会被保留,通过cache manager放入内存池。

简单起见,暂不考虑带shuffle的多stage情况和流水线优化。这里拿最经典的log处理的例子来具体说明一下(取出所有以ERROR开头的日志行,按空格分隔并取第2列):
[mw_shl_code=scala,true]val lines = spark.textFile("hdfs://<input>")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split(" ")(1))
messages.saveAsTextFile("hdfs://<output>")[/mw_shl_code]
按传统单机immutable FP的观点来看,上述代码运行起来好像是:
把HDFS上的日志文件全部拉入内存形成一个巨大的字符串数组,
Filter一遍再生成一个略小的新的字符串数组,
再map一遍又生成另一个字符串数组。
真这么玩儿的话Spark早就不用混了……

如前所述,Spark在运行时动态构造了一个复合Iterator。就上述示例来说,构造出来的Iterator的逻辑概念上大致长这样:
[mw_shl_code=scala,true]new Iterator[String] {
  private var head: String = _
  private var headDefined: Boolean = false

  def hasNext: Boolean = headDefined || {
    do {
      try head = readOneLineFromHDFS(...)     // (1) read from HDFS
      catch {
        case _: EOFException => return false
      }
    } while (!head.startsWith("ERROR"))       // (2) filter closure
    true
  }

  def next: String = if (hasNext) {
    headDefined = false
    head.split(" ")(1)                        // (3) map closure
  } else {
    throw new NoSuchElementException("...")
  }
}[/mw_shl_code]
上面这段代码是我按照Spark中FilteredRDD、MappedRDD的定义和Scala Iterator的filter、map方法的框架写的伪码,并且省略了从cache或checkpoint中读取现成结果的逻辑。1、2、3三处便是RDD lineage DAG中相应逻辑嵌入复合出的Iterator的大致方式。每种RDD变换嵌入复合Iterator的具体方式是由不同的RDD以及Scala Iterator的相关方法定义的。可以看到,用这个Iterator访问整个数据集,空间复杂度是O(1)。可见,Spark RDD的immutable语义并不会造成大数据内存计算任务的庞大内存开销。

然后来看加cache的情况。我们假设errors这个RDD比较有用,除了拿出空格分隔的第二列以外,可能在同一个application中我们还会再频繁用它干别的事情,于是选择将它cache住:
[mw_shl_code=scala,true]val lines = spark.textFile("hdfs://<input>")
val errors = lines.filter(_.startsWith("ERROR")).cache()  // <-- !!!
val messages = errors.map(_.split(" ")(1))
messages.saveAsTextFile("hdfs://<output>")[/mw_shl_code]
加了cache之后有什么变化呢?实际上相当于在上述复合Iterator伪码的(2)处,将filter出来的文本行逐一追加到了内存中的一个ArrayBuffer[String]里存起来形成一个block,然后通过cache manager扔进受block manager管理的内存池。注意这里仅仅cache了filter出来的结果,HDFS读出的原始数据没有被cache,对errors做map操作后得到的messages RDD也没有被cache。这样一来,后续任务复用errors这个RDD时,直接从内存中取就好,就不用重新计算了。





已有(2)人评论

跳转到指定楼层
xuliang123789 发表于 2016-5-11 09:00:02
谢谢楼主,学习一下,辛苦,赞~~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条