分享

Spark Core源码分析: Spark任务模型

丫丫 2015-7-24 18:11:26 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 16436
本帖最后由 丫丫 于 2015-7-24 18:35 编辑
问题导读

1.Task是如何实现的?
2.重要外部类有哪些?
3.进程模型与线程模型有哪些优劣势?








概述
一个Spark的Job分为多个stage,最后一个stage会包括一个或多个ResultTask,前面的stages会包括一个或多个ShuffleMapTasks。
ResultTask执行并将结果返回给driver application。
ShuffleMapTask将task的output根据task的partition分离到多个buckets里。一个ShuffleMapTask对应一个ShuffleDependency的partition,而总partition数同并行度、reduce数目是一致的。


Task
Task的代码在scheduler package下。
抽象类Task构造参数如下:


[mw_shl_code=java,true]private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable[/mw_shl_code]  
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable
Task对应一个stageId和partitionId。
提供runTask()接口、kill()接口等。
提供killed变量、TaskMetrics变量、TaskContext变量等。

除了上述基本接口和变量,Task的伴生对象提供了序列化和反序列化应用依赖的jar包的方法。原因是Task需要保证工作节点具备本次Task需要的其他依赖,注册到SparkContext下,所以提供了把依赖转成流写入写出的方法。


Task的两种实现
ShuffleMapTask
ShuffleMapTask构造参数如下,



[mw_shl_code=java,true]private[spark] class ShuffleMapTask(      stageId: Int,      var rdd: RDD[_],      var dep: ShuffleDependency[_,_],      _partitionId: Int,      @transient private var locs: Seq[TaskLocation])    extends Task[MapStatus](stageId, _partitionId)  [/mw_shl_code]

private[spark] class ShuffleMapTask(    stageId: Int,    var rdd: RDD[_],    var dep: ShuffleDependency[_,_],    _partitionId: Int,    @transient private var locs: Seq[TaskLocation])  extends Task[MapStatus](stageId, _partitionId)RDD partitioner对应的是ShuffleDependency。

ShuffleMapTask复写了MapStatus向外读写的方法,因为向外读写的内容包括:stageId,rdd,dep,partitionId,epoch和split(某个partition)。对于其中的stageId,rdd,dep有统一的序列化和反序列化操作并会cache在内存里,再放到ObjectOutput里写出去。序列化操作使用的是Gzip,序列化信息会维护在serializedInfoCache = newHashMap[Int, Array[Byte]]。这部分需要序列化并保存的原因是:stageId,rdd,dep真正代表了本次Shuffle Task的信息,为了减轻master节点负担,把这部分序列化结果cache了起来。


Stage执行逻辑
主要步骤如下:



[mw_shl_code=java,true]val ser = Serializer.getSerializer(dep.serializer)  shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)  [/mw_shl_code]
val ser = Serializer.getSerializer(dep.serializer)shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)这一步是初始化一个ShuffleWriterGroup,Group里面是一个BlockObjectWriter数组。




[mw_shl_code=java,true]for (elem <- rdd.iterator(split, context)) {  val pair = elem.asInstanceOf[Product2[Any, Any]]  val bucketId = dep.partitioner.getPartition(pair._1)   shuffle.writers(bucketId).write(pair)  }  [/mw_shl_code]
for (elem <- rdd.iterator(split, context)) {val pair = elem.asInstanceOf[Product2[Any, Any]]  val bucketId = dep.partitioner.getPartition(pair._1)  shuffle.writers(bucketId).write(pair)}这一步是为每个Writer对应一个bucket,调用每个BlockObjectWriter的write()方法写数据






[mw_shl_code=java,true]var totalBytes = 0L  var totalTime = 0L  val compressedSizes: Array[Byte] =   shuffle.writers.map { writer: BlockObjectWriter =>  writer.commit()  writer.close()  val size = writer.fileSegment().length  totalBytes += size  totalTime += writer.timeWriting()  MapOutputTracker.compressSize(size)  }  [/mw_shl_code]
var totalBytes = 0Lvar totalTime = 0Lval compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>    writer.commit()    writer.close()val size = writer.fileSegment().length    totalBytes += sizetotalTime += writer.timeWriting()MapOutputTracker.compressSize(size)}这一步是执行writer.commit(),并得到结果file segment大小,对总大小压缩




[mw_shl_code=java,true]val shuffleMetrics = new ShuffleWriteMetrics  shuffleMetrics.shuffleBytesWritten = totalBytes  shuffleMetrics.shuffleWriteTime = totalTime  metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)    success = true  new MapStatus(blockManager.blockManagerId, compressedSizes)  [/mw_shl_code]
val shuffleMetrics = new ShuffleWriteMetricsshuffleMetrics.shuffleBytesWritten = totalBytesshuffleMetrics.shuffleWriteTime = totalTimemetrics.get.shuffleWriteMetrics = Some(shuffleMetrics)success = truenew MapStatus(blockManager.blockManagerId, compressedSizes)这一步是记录metrcis信息,最后返回一个MapStatus类,里面是本地ShuffleMapTask结果的相关信息。

最后会release writers,让对应的shuffle文件得到记录和重用(ShuffleBlockManager管理这些file,这些file是Shuffle Task中一组Writer写的对象)。
主要把下图看懂。


重要类介绍涉及到的重要外部类,帮助理解。


ShuffleBlockManager
整体梳理:
ShuffleState维护了两个ShuffleFileGroup的ConcurrentLinkedQueue,以记录目前shuffle的state。
ShuffleState记录了一次shuffle操作的文件组状态,在ShuffleBlockManager内用Map为每个shuffleId维护了一个ShuffleState。
每个shuffleId通过forMapTask()方法得到一组writer,即ShuflleWriterGroup。这组里的writers共享一个shuffleId和mapId,但是每个对应不同的bucketId和file。在为writer分配FileGroup的时候,会从shuffleId对应的shuffle state里先取unusedFileGroup,如果不存在,则在HDFS上新建File。
对于HDFS上的目标file,writer是可以append写的。在新建file的时候,是根据shuffleId和bucket number和一个递增的fileId来创建新的文件的。



ShuffleFileGroup的重用files和记录mapId,index,offset这块似懂非懂。

重要方法:

[mw_shl_code=java,true]def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup {} }  [/mw_shl_code]

def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup {} }
该方法被一个ShuffleMapTask调用,传入了这次shuffle操作的id,mapId是partitionId。Buckects数目等于分区数目。该方法返回的ShuffleWriterGroup里面是一组DiskBlockObjectWriter,每一个writer都属于这一次shuffle操作,所以他们有共同的shuffleId,mapId,但是他们对应了不同的bucket,并且各自对应一个file。

在shuffle run里的调用和参数传入:

[mw_shl_code=java,true]val ser = Serializer.getSerializer(dep.serializer)  
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)  [/mw_shl_code]

val ser = Serializer.getSerializer(dep.serializer)shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)shuffleId是由ShuffleDependency获得的全局唯一id,代表本次shuffle任务id
mapId等于partitionId
Bucket数目等于分区数目

产生writers:
Writer类型是DiskBlockObjectWriter,数目等于buckets数目。bufferSize的设置:


[mw_shl_code=java,true]conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 [/mw_shl_code]

conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024blockId产生自:


[mw_shl_code=java,true]blockId = ShuffleBlockId(shuffleId, mapId, bucketId)  [/mw_shl_code]

blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
在生成writer的时候调用的是BlockManager的getDiskWriter方法,ShuffleBlockManager初始化的时候绑定BlockManager。


[mw_shl_code=java,true]private[spark] class DiskBlockObjectWriter(  
blockId: BlockId,  
file: File,  
serializer: Serializer,  
bufferSize: Int,  
compressStream: OutputStream => OutputStream,  
syncWrites: Boolean)  
extends BlockObjectWriter(blockId)  [/mw_shl_code]

private[spark] class DiskBlockObjectWriter(    blockId: BlockId,    file: File,    serializer: Serializer,    bufferSize: Int,    compressStream: OutputStream => OutputStream,    syncWrites: Boolean)  extends BlockObjectWriter(blockId)
ShuffleFileGroup:私有内部类,对应了一组shuffle files,每个file对应一个reducer。一个Mapper会分到一个ShuffleFileGroup,把mapper的结果写到这组File里去。

MapStatus
注意到ShuffleMapTask的类型是MapStatus类。MapStatus类是ShuffleMapTask要返回给scheduler的执行结果,包括两个东西:


[mw_shl_code=java,true]class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])  [/mw_shl_code]

class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
前者是run这次task的block manager地址(BlockManagerId是一个类,保存了executorId,host, port, nettyPort),后者是output大小,该值会传给接下来的reduce任务。该size是被MapOutputTracker压缩过的。


MapStatus类提供了两个方法如下,ShuffleMapTask进行了复写。


  • [mw_shl_code=java,true]def writeExternal(out: ObjectOutput) {
      location.writeExternal(out)
      out.writeInt(compressedSizes.length)
      out.write(compressedSizes)
    }
    def readExternal(in: ObjectInput) {
      location = BlockManagerId(in)
      compressedSizes = new Array[Byte](in.readInt())
      in.readFully(compressedSizes)
    }[/mw_shl_code]

  def writeExternal(out: ObjectOutput) {    location.writeExternal(out)    out.writeInt(compressedSizes.length)    out.write(compressedSizes)  }  def readExternal(in: ObjectInput) {    location = BlockManagerId(in)    compressedSizes = new Array[Byte](in.readInt())    in.readFully(compressedSizes)  }

BlockManagerId

BlockManagerId类构造依赖executorId, host, port, nettyPort这些信息。伴生对象维护了一个blockManagerIdCache ,实现为ConcurrentHashMap[BlockManagerId,BlockManagerId]() 。
比如MapStatus的readExternal方法把ObjectInput传入BlockManagerId构造函数的时候,BlockManagerId的apply()方法就会根据ObjectInput取出executorId, host, port,nettyPort信息,把这个BlockManagerIdobj维护到blockManagerIdCache


ResultTask
构造参数

[mw_shl_code=java,true]private[spark] class ResultTask[T, U](
stageId: Int,
var rdd: RDD[T],
var func: (TaskContext, Iterator[T]) => U,
_partitionId: Int,
@transient locs: Seq[TaskLocation],
var outputId: Int)
extends Task[U](stageId, _partitionId) with Externalizable {[/mw_shl_code]

private[spark] class ResultTask[T, U](    stageId: Int,    var rdd: RDD[T],    var func: (TaskContext, Iterator[T]) => U,    _partitionId: Int,    @transient locs: Seq[TaskLocation],    var outputId: Int)  extends Task[U](stageId, _partitionId) with Externalizable {ResultTask比较简单,runTask方法调用的是rdd的迭代器:
[mw_shl_code=java,true]override def runTask(context: TaskContext): U = {
metrics = Some(context.taskMetrics)
try {
func(context, rdd.iterator(split, context))
  } finally {
context.executeOnCompleteCallbacks(
  }
}[/mw_shl_code]

  override def runTask(context: TaskContext): U = {    metrics = Some(context.taskMetrics)    try {      func(context, rdd.iterator(split, context))    } finally {      context.executeOnCompleteCallbacks()    }  }

进程模型 vs. 线程模型
Spark同节点上的任务以多线程的方式运行在一个JVM进程中。

优点:
启动任务快
共享内存,适合内存密集型任务
Executor所占资源可重复利用

缺点:
同节点上的所有任务运行在一个进程中,会出现严重的资源争用,难以细粒度控制每个任务的占用资源。MapReduce为Map Task和Reduce Task设置不同资源,细粒度控制任务占用资源量。

MapReduce的每个Task都是一个JVM进程,都要经历:资源申请->运行任务->释放资源的过程

每个节点可以有一个或多个Executor,Executor配有一定数量slots,Executor内可以跑多个Result Task和ShuffleMap Task。
在共享内存方面,broadcast的变量会在每个executor里存一份,这个executor内的任务可以共享。









没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条