分享

spark 广播变量(Broadcast)源码分析

desehawk 2015-2-28 20:55:33 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 55335

问题导读

1.创建broadcast变量如何实现的?
2.如何读取广播变量的值?






概述最近工作上忙死了……广播变量这一块其实早就看过了,一直没有贴出来。
本文基于Spark 1.0源码分析,主要探讨广播变量的初始化、创建、读取以及清除。

类关系BroadcastManager类中包含一个BroadcastFactory对象的引用。大部分操作通过调用BroadcastFactory中的方法来实现。
BroadcastFactory是一个Trait,有两个直接子类TorrentBroadcastFactory、HttpBroadcastFactory。这两个子类实现了对HttpBroadcast、TorrentBroadcast的封装,而后面两个又同时集成了Broadcast抽象类。
图……就不画了

BroadcastManager的初始化SparkContext初始化时会创建SparkEnv对象env,这个过程中会调用BroadcastManager的构造方法返回一个对象作为env的成员变量存在:

  1. val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
复制代码

构造BroadcastManager对象时会调用initialize方法,主要根据配置初始化broadcastFactory成员变量,并调用其initialize方法。
  1. val broadcastFactoryClass =
  2.           conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
  3.         broadcastFactory =
  4.           Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
  5.         // Initialize appropriate BroadcastFactory and BroadcastObject
  6.         broadcastFactory.initialize(isDriver, conf, securityManager)
复制代码
两个工厂类的initialize方法都是对其相应实体类的initialize方法的调用,下面分开两个类来看。

HttpBroadcast的initialize方法
  1.   def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
  2.     synchronized {
  3.       if (!initialized) {
  4.         bufferSize = conf.getInt("spark.buffer.size", 65536)
  5.         compress = conf.getBoolean("spark.broadcast.compress", true)
  6.         securityManager = securityMgr
  7.         if (isDriver) {
  8.           createServer(conf)
  9.           conf.set("spark.httpBroadcast.uri",  serverUri)
  10.         }
  11.         serverUri = conf.get("spark.httpBroadcast.uri")
  12.         cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
  13.         compressionCodec = CompressionCodec.createCodec(conf)
  14.         initialized = true
  15.       }
  16.     }
  17.   }
复制代码

除了一些变量的初始化外,主要做两件事情,一是createServer(只有在Driver端会做),其次是创建一个MetadataCleaner对象。


createServer
  1.   private def createServer(conf: SparkConf) {
  2.     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
  3.     server = new HttpServer(broadcastDir, securityManager)
  4.     server.start()
  5.     serverUri = server.uri
  6.     logInfo("Broadcast server started at " + serverUri)
  7.   }
复制代码
首先创建一个存放广播变量的目录,默认是
  1. conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")).split(',')(0)
复制代码
然后初始化一个HttpServer对象并启动(封装了jetty),启动过程中包括加载资源文件,起端口和线程用来监控请求等。这部分的细节在org.apache.spark.HttpServer类中,此处不做展开。


创建MetadataCleaner对象一个MetadataCleaner对象包装了一个定时计划Timer,每隔一段时间执行一个回调函数,此处传入的回调函数为cleanup:
  1.   private def cleanup(cleanupTime: Long) {
  2.     val iterator = files.internalMap.entrySet().iterator()
  3.     while(iterator.hasNext) {
  4.       val entry = iterator.next()
  5.       val (file, time) = (entry.getKey, entry.getValue)
  6.       if (time < cleanupTime) {
  7.         iterator.remove()
  8.         deleteBroadcastFile(file)
  9.       }
  10.     }
  11.   }
复制代码
即清楚存在吵过一定时长的broadcast文件。在时长未设定(默认情况)时,不清除:
  1. if (delaySeconds > 0) {
  2.     logDebug(
  3.       "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
  4.       "and period of " + periodSeconds + " secs")
  5.     timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
  6.   }
复制代码


TorrentBroadcast的initialize方法
  1.   def initialize(_isDriver: Boolean, conf: SparkConf) {
  2.     TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
  3.     synchronized {
  4.       if (!initialized) {
  5.         initialized = true
  6.       }
  7.     }
  8.   }
复制代码
Torrent在此处没做什么,这也可以看出和Http的区别,Torrent的处理方式就是p2p,去中心化。而Http是中心化服务,需要启动服务来接受请求。

创建broadcast变量调用SparkContext中的 def broadcast[T: ClassTag](value: T): Broadcast[T]方法来初始化一个广播变量,实现如下:
  1. def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  2.     val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  3.     cleaner.foreach(_.registerBroadcastForCleanup(bc))
  4.     bc
  5.   }
复制代码
即调用broadcastManager的newBroadcast方法:
  1.   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
  2.     broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  3.   }
复制代码
再调用工厂类的newBroadcast方法,此处返回的是一个Broadcast对象。


HttpBroadcastFactory的newBroadcast
  1.   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
  2.     new HttpBroadcast[T](value_, isLocal, id)
复制代码
即创建一个新的HttpBroadcast对象并返回。
构造对象时主要做两件事情:
  1. HttpBroadcast.synchronized {
  2.     SparkEnv.get.blockManager.putSingle(
  3.       blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
  4.   }
  5.   if (!isLocal) {
  6.     HttpBroadcast.write(id, value_)
  7.   }
复制代码
1.将变量id和值放入blockManager,但并不通知master
2.调用伴生对象的write方法
  1. def write(id: Long, value: Any) {
  2.     val file = getFile(id)
  3.     val out: OutputStream = {
  4.       if (compress) {
  5.         compressionCodec.compressedOutputStream(new FileOutputStream(file))
  6.       } else {
  7.         new BufferedOutputStream(new FileOutputStream(file), bufferSize)
  8.       }
  9.     }
  10.     val ser = SparkEnv.get.serializer.newInstance()
  11.     val serOut = ser.serializeStream(out)
  12.     serOut.writeObject(value)
  13.     serOut.close()
  14.     files += file
  15.   }
复制代码
write方法将对象值按照指定的压缩、序列化写入指定的文件。这个文件所在的目录即是HttpServer的资源目录,文件名和id的对应关系为:

  1. case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  2.   def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
  3. }
复制代码

TorrentBroadcastFactory的newBroadcast方法
  1.   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
  2.     new TorrentBroadcast[T](value_, isLocal, id)
复制代码
同样是创建一个TorrentBroadcast对象,并返回。

  1.   TorrentBroadcast.synchronized {
  2.     SparkEnv.get.blockManager.putSingle(
  3.       broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
  4.   }
  5.   if (!isLocal) {
  6.     sendBroadcast()
  7.   }
复制代码
做两件事情,第一步和Http一样,第二步:
  1.   def sendBroadcast() {
  2.     val tInfo = TorrentBroadcast.blockifyObject(value_)
  3.     totalBlocks = tInfo.totalBlocks
  4.     totalBytes = tInfo.totalBytes
  5.     hasBlocks = tInfo.totalBlocks
  6.     // Store meta-info
  7.     val metaId = BroadcastBlockId(id, "meta")
  8.     val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
  9.     TorrentBroadcast.synchronized {
  10.       SparkEnv.get.blockManager.putSingle(
  11.         metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
  12.     }
  13.     // Store individual pieces
  14.     for (i <- 0 until totalBlocks) {
  15.       val pieceId = BroadcastBlockId(id, "piece" + i)
  16.       TorrentBroadcast.synchronized {
  17.         SparkEnv.get.blockManager.putSingle(
  18.           pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
  19.       }
  20.     }
  21.   }
复制代码
可以看出,先将元数据信息缓存到blockManager,再将块信息缓存过去。开头可以看到有一个分块动作,是调用伴生对象的blockifyObject方法:
  1. def blockifyObject[T](obj: T): TorrentInfo
复制代码
此方法将对象obj分块(默认块大小为4M),返回一个TorrentInfo对象,第一个参数为一个TorrentBlock对象(包含blockID和block字节数组)、块数量以及obj的字节流总长度。
元数据信息中的blockId为广播变量id+后缀,value为总块数和总字节数。
数据信息是分块缓存,每块的id为广播变量id加后缀及块变好,数据位一个TorrentBlock对象


读取广播变量的值通过调用bc.value来取得广播变量的值,其主要实现在反序列化方法readObject中

HttpBroadcast的反序列化
  1. 此方法将对象obj分块(默认块大小为4M),返回一个TorrentInfo对象,第一个参数为一个TorrentBlock对象(包含blockID和block字节数组)、块数量以及obj的字节流总长度。
  2. 元数据信息中的blockId为广播变量id+后缀,value为总块数和总字节数。
  3. 数据信息是分块缓存,每块的id为广播变量id加后缀及块变好,数据位一个TorrentBlock对象
  4. 读取广播变量的值
  5. 通过调用bc.value来取得广播变量的值,其主要实现在反序列化方法readObject中
  6. HttpBroadcast的反序列化
复制代码
首先查看blockManager中是否已有,如有则直接取值,否则调用伴生对象的read方法进行读取:
  1. def read[T: ClassTag](id: Long): T = {
  2.     logDebug("broadcast read server: " +  serverUri + " id: broadcast-" + id)
  3.     val url = serverUri + "/" + BroadcastBlockId(id).name
  4.     var uc: URLConnection = null
  5.     if (securityManager.isAuthenticationEnabled()) {
  6.       logDebug("broadcast security enabled")
  7.       val newuri = Utils.constructURIForAuthentication(new URI(url), securityManager)
  8.       uc = newuri.toURL.openConnection()
  9.       uc.setAllowUserInteraction(false)
  10.     } else {
  11.       logDebug("broadcast not using security")
  12.       uc = new URL(url).openConnection()
  13.     }
  14.     val in = {
  15.       uc.setReadTimeout(httpReadTimeout)
  16.       val inputStream = uc.getInputStream
  17.       if (compress) {
  18.         compressionCodec.compressedInputStream(inputStream)
  19.       } else {
  20.         new BufferedInputStream(inputStream, bufferSize)
  21.       }
  22.     }
  23.     val ser = SparkEnv.get.serializer.newInstance()
  24.     val serIn = ser.deserializeStream(in)
  25.     val obj = serIn.readObject[T]()
  26.     serIn.close()
  27.     obj
  28.   }
复制代码
使用serverUri和block id对应的文件名直接开启一个HttpConnection将中心服务器上相应的数据取过来,使用配置的压缩和序列化机制进行解压和反序列化。
这里可以看到,所有需要用到广播变量值的executor都需要去driver上pull广播变量的内容。
取到值后,缓存到blockManager中,以便下次使用。


TorrentBroadcast的反序列化
  1. private def readObject(in: ObjectInputStream) {
  2.     in.defaultReadObject()
  3.     TorrentBroadcast.synchronized {
  4.       SparkEnv.get.blockManager.getSingle(broadcastId) match {
  5.         case Some(x) =>
  6.           value_ = x.asInstanceOf[T]
  7.         case None =>
  8.           val start = System.nanoTime
  9.           logInfo("Started reading broadcast variable " + id)
  10.           // Initialize @transient variables that will receive garbage values from the master.
  11.           resetWorkerVariables()
  12.           if (receiveBroadcast()) {
  13.             value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
  14.             /* Store the merged copy in cache so that the next worker doesn't need to rebuild it.
  15.              * This creates a trade-off between memory usage and latency. Storing copy doubles
  16.              * the memory footprint; not storing doubles deserialization cost. Also,
  17.              * this does not need to be reported to BlockManagerMaster since other executors
  18.              * does not need to access this block (they only need to fetch the chunks,
  19.              * which are reported).
  20.              */
  21.             SparkEnv.get.blockManager.putSingle(
  22.               broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
  23.             // Remove arrayOfBlocks from memory once value_ is on local cache
  24.             resetWorkerVariables()
  25.           } else {
  26.             logError("Reading broadcast variable " + id + " failed")
  27.           }
  28.           val time = (System.nanoTime - start) / 1e9
  29.           logInfo("Reading broadcast variable " + id + " took " + time + " s")
  30.       }
  31.     }
  32.   }
复制代码
和Http一样,都是先查看blockManager中是否已经缓存,若没有,则调用receiveBroadcast方法:
  1. def receiveBroadcast(): Boolean = {
  2.     // Receive meta-info about the size of broadcast data,
  3.     // the number of chunks it is divided into, etc.
  4.     val metaId = BroadcastBlockId(id, "meta")
  5.     var attemptId = 10
  6.     while (attemptId > 0 && totalBlocks == -1) {
  7.       TorrentBroadcast.synchronized {
  8.         SparkEnv.get.blockManager.getSingle(metaId) match {
  9.           case Some(x) =>
  10.             val tInfo = x.asInstanceOf[TorrentInfo]
  11.             totalBlocks = tInfo.totalBlocks
  12.             totalBytes = tInfo.totalBytes
  13.             arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
  14.             hasBlocks = 0
  15.           case None =>
  16.             Thread.sleep(500)
  17.         }
  18.       }
  19.       attemptId -= 1
  20.     }
  21.     if (totalBlocks == -1) {
  22.       return false
  23.     }
  24.     /*
  25.      * Fetch actual chunks of data. Note that all these chunks are stored in
  26.      * the BlockManager and reported to the master, so that other executors
  27.      * can find out and pull the chunks from this executor.
  28.      */
  29.     val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
  30.     for (pid <- recvOrder) {
  31.       val pieceId = BroadcastBlockId(id, "piece" + pid)
  32.       TorrentBroadcast.synchronized {
  33.         SparkEnv.get.blockManager.getSingle(pieceId) match {
  34.           case Some(x) =>
  35.             arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
  36.             hasBlocks += 1
  37.             SparkEnv.get.blockManager.putSingle(
  38.               pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
  39.           case None =>
  40.             throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
  41.         }
  42.       }
  43.     }
  44.     hasBlocks == totalBlocks
  45.   }
复制代码
和写数据一样,同样是分成两个部分,首先取元数据信息,再根据元数据信息读取实际的block信息。注意这里都是从blockManager中读取的,这里贴出blockManager.getSingle的分析。
调用栈中最后到BlockManager.doGetRemote方法,中间有一条语句:
  1. val locations = Random.shuffle(master.getLocations(blockId))
复制代码
即将存有这个block的节点信息随机打乱,然后使用:

  1. val data = BlockManagerWorker.syncGetBlock(
  2.         GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
复制代码
来获取。
从这里可以看出,Torrent方法首先将广播变量数据分块,并存到BlockManager中;每个节点需要读取广播变量时,是分块读取,对每一块都读取其位置信息,然后随机选一个存有此块数据的节点进行get;每个节点读取后会将包含的快信息报告给BlockManagerMaster,这样本地节点也成为了这个广播网络中的一个peer。
与Http方式形成鲜明对比,这是一个去中心化的网络,只需要保持一个tracker即可,这就是p2p的思想。

广播变量的清除
广播变量被创建时,紧接着有这样一句代码:

  1. cleaner.foreach(_.registerBroadcastForCleanup(bc))
复制代码
cleaner是一个ContextCleaner对象,会将刚刚创建的广播变量注册到其中,调用栈为:
  1.   def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
  2.     registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
  3.   }
复制代码
  1.   private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
  2.     referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
  3.   }
复制代码

等出现广播变量被弱引用时(关于弱引用,可以参考:http://blog.csdn.net/lyfi01/article/details/6415726),则会执行
  1. cleaner.foreach(_.start())
复制代码

start方法中会调用keepCleaning方法,会遍历注册的清理任务(包括RDD、shuffle和broadcast),依次进行清理:
  1. private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
  2.     while (!stopped) {
  3.       try {
  4.         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
  5.           .map(_.asInstanceOf[CleanupTaskWeakReference])
  6.         reference.map(_.task).foreach { task =>
  7.           logDebug("Got cleaning task " + task)
  8.           referenceBuffer -= reference.get
  9.           task match {
  10.             case CleanRDD(rddId) =>
  11.               doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
  12.             case CleanShuffle(shuffleId) =>
  13.               doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
  14.             case CleanBroadcast(broadcastId) =>
  15.               doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
  16.           }
  17.         }
  18.       } catch {
  19.         case e: Exception => logError("Error in cleaning thread", e)
  20.       }
  21.     }
  22.   }
复制代码
doCleanupBroadcast调用以下语句:
  1. broadcastManager.unbroadcast(broadcastId, true, blocking)
复制代码

然后是:
  1.   def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
  2.     broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
  3.   }
复制代码
每个工厂类调用其对应实体类的伴生对象的unbroadcast方法。

HttpBroadcast中的变量清除
  1. def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
  2.     SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
  3.     if (removeFromDriver) {
  4.       val file = getFile(id)
  5.       files.remove(file)
  6.       deleteBroadcastFile(file)
  7.     }
  8.   }
复制代码
1是删除blockManager中的缓存,2是删除本地持久化的文件


TorrentBroadcast中的变量清除
  1. def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {  
  2.   SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)  
  3. }  
复制代码



小结Broadcast可以使用在executor端多次使用某个数据的场景(比如说字典),Http和Torrent两种方式对应传统的CS访问方式和P2P访问方式,当广播变量较大或者使用较频繁时,采用后者可以减少driver端的压力。
BlockManager在此处充当P2P中的tracker角色,没有展开描述,后续会开专题讲这个部分。




出处:http://blog.csdn.net/asongoficeandfire/article/details/37584643


















已有(1)人评论

跳转到指定楼层
马一博 发表于 2016-8-12 17:05:31
在1.6中,TorrentBroadcast没有了initialize方法,只有TorrentBroadcastFactory有一个 initialize方法,但是什么也不做:
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条