本帖最后由 breaking 于 2016-3-2 09:01 编辑
问题导读:
应用程序启动时,SparkEnv 会调用 createExecutorEnv 方法创建 ExecutorEnv 并且创建 Executor 上的 BlockManager,调用 createDriverEnv 方法创建 SparkEnv 并创建 BlockManagerMaster。每个节点都有一个 BlockManager,其中有一个是 Driver(master),其余的都是 slave。slave 上的 block 有任何变化的时候,需要发送 updateBlockInfo 事件来更新 master 上的 block 信息,这是典型的中心化设计;master 和 slave 之间的通信通过 BlockManagerMasterEndpoint 来进行。每个 Executor 中的 BlockManager 实例化的时候都会向 Driver 中的 BlockManagerMaster 注册,BlockManagerMaster 在接收 BlockManager 注册的时候会为其创建 BlockManagerInfo 来进行元数据管理。
总结:BlockManager 跟 Executor 是一一对应关系,由 Driver 上的 BlockManagerMaster 来管理所有的 BlockManager,master 跟 slave 直接用 netty 来通信
1, 在Application启动的时候会在SparkEnv中注册BlockManagerMaster以及MapOutputTracker,
private def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
private def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option = None): SparkEnv = {
assert(conf.contains(""), " is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val hostname = conf.get("")
val port = conf.get("spark.driver.port").toInt
isDriver = true,
isLocal = isLocal,
numUsableCores = numCores,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
第三步调用create(SparkEnv )
private def create(
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean,
numUsableCores: Int,
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option = None): SparkEnv = {
// Listener bus is only used on the driver
if (isDriver) {
assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
val securityManager = new SecurityManager(conf)
val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
// for incoming connections.
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass(className: String): T = {
val cls = Utils.classForName(className)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
try {
cls.getConstructor(classOf, java.lang.Boolean.TYPE)
.newInstance(conf, new java.lang.Boolean(isDriver))
} catch {
case _: NoSuchMethodException =>
try {
} catch {
case _: NoSuchMethodException =>
// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf(propertyName: String, defaultClassName: String): T = {
instantiateClass(conf.get(propertyName, defaultClassName))
val serializer = instantiateClassFromConf(
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
val closureSerializer = instantiateClassFromConf(
"spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
new MapOutputTrackerWorker(conf)
// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf, conf))
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" ->"org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" ->"org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" ->"org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass(shuffleMgrClass)
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
val cacheManager = new CacheManager(blockManager)
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.N
conf.set("", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
备注:此方法中创建BlockManagerMaster,如果是创建Executor上的 BlockManager就把创建的BlockManager注册给Driver上的BlockManagerMaster
a) BlockManagerMaster:对整个集群的 Block数据进行管理的;
b) MapOutputTrackerMaster:跟踪所有的Mapper的输出的;
2, BlockManagerMasterEndpoint本身是一个消息体,会负责通过远程消息通信的方式去管理所有节点的BlockManager
BlockManagerMasterEndpoint 中的 receiveAndReply 方法接收 Executor 上的 BlockManager 发来的消息;Executor 上的 BlockManager 通过 BlockManagerSlaveEndpoint 跟 BlockManagerMasterEndpoint 来通信(消息通过各种 case class 定义,不用 case object,因为 case object 是全局共享的)
1) RegisterBlockManager:Executor 上的 BlockManager 在 initialize 方法中向 Driver 中的 BlockManagerMaster 注册当前节点的 BlockManager;BlockManagerMasterEndpoint 收到消息后会为注册的 BlockManager 创建 BlockManagerInfo 来保存 BlockManager 的元信息,包括 BlockManager 的 id、内存大小、创建时间、slaveEndpoint(用来给 BlockManager 发消息的)
2) UpdateBlockInfo :更新BlockManager的元信息 例如Driver 广播时,shuffleMapTask完成等都会更新Block信息
3) GetLocations:获取Block信息,例如ShuffledRDD 计算时会向 mapOutputTracker 要上一个Stage数据
3, 每启动一个ExecutorBackend都会实例化BlockManager并通过远程通信的方式注册给BlockManagerMaster;实质上是Executor中的BlockManager在启动的时候注册给了Driver上的BlockManagerMasterEndpoint;
4, MemoryStore是BlockManager中专门负责内存数据存储和读写的类;
private val memoryStore = new MemoryStore(this, memoryManager)
BlockManager 创建时会实例化 MemoryStore 和 DiskStore
5, DiskStore是BlockManager中专门负责基于磁盘的数据存储和读写的类
private val diskStore = new DiskStore(this, diskBlockManager)
6, DiskBlockManager:管理Logical Block与Disk上的Physical Block之间的映射关系并负责磁盘的文件的创建、读写等
val diskBlockManager = new DiskBlockManager(this, conf)
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
subDirs(dirId)(subDirId) = newDir
new File(subDir, filename)
创建Shuffle block
/** Produces a unique block id and File suitable for storing shuffled intermediate results. */
def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
var blockId = new TempShuffleBlockId(UUID.randomUUID())
while (getFile(blockId).exists()) {
blockId = new TempShuffleBlockId(UUID.randomUUID())
(blockId, getFile(blockId))
HashShuffleWriter 会把shuffle的中间结果写磁盘,
Write a bunch of records to this task's output
shuffle.writers(bucketId).write(elem._1, elem._2)
最终调用DiskBlockObjectWriter 将序列化的结果写到磁盘
1, 首先通过MemoryStore来存储广播变量
sc.textFile 方法会调用hadoopFile
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
def hadoopFile(
path: String,
inputFormatClass: Class],
keyClass: Class,
valueClass: Class,
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
2, 在Driver中是通过BlockManagerInfo来管理集群中每个ExecutorBackend中的BlockManager中的元数据信息的。
BlockManagerMasterEndpoint 的 register 方法
blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)}
Executor 上的 BlockManager 注册时会创建 BlockManagerInfo,记录了 BlockManager 注册的时间、最大内存、slaveEndpoint 这些元信息
3, 当改变了具体的ExecutorBackend上的Block信息后就必须发消息给Driver中的BlockManagerMaster来更新相应的BlockManagerInfo
override def run(): Unit = {
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
startGCTime = computeTotalGcTime()
try {
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize](taskBytes, Thread.currentThread.getContextClassLoader)
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
if (killed) {
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw new TaskKilledException
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
var threwException = true
val value = try {
val res =
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
} finally {
for (m <- task.metrics) {
(taskStart - deserializeStartTime) + task.executorDeserializeTime)
// We need to subtract's deserialization time to avoid double-counting
m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
m.setResultSerializationTime(afterSerialization - beforeSerialization)
// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
if (maxResultSize >0 && resultSize >maxResultSize) {
logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
s"(${Utils.bytesToString(resultSize)}>${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult(TaskResultBlockId(taskId), resultSize))
} else if (resultSize >= maxRpcMessageSize) {
val blockId = TaskResultBlockId(taskId)
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult(blockId, resultSize))
} else {
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER);BlockManager 会调用 reportBlockStatus 方法来更新 BlockManagerInfo 的信息
4, 当执行第二个Stage的时候,第二个Stage会向Driver中的MapOutputTrackerMasterEndpoint发消息请求上一个Stage中相应的输出,此时MapOutputTrackerMaster会把上一个Stage的输出数据的元数据信息发送给当前请求的Stage;
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
我们继续看read方法 ShuffleReader 的实现类:BlockStoreShuffleReader的read方法:
override def read(): Iterator] = {
val blockFetcherItr = new ShuffleBlockFetcherIterator(
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId)
// Synchronize on the returned array because, on the driver, it gets mutated in place
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
这里调用了getStatuses(MapOutputTracker类) 方法:
val fetchedBytes = askTracker](GetMapOutputStatuses(shuffleId))
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
MapOutputTrackerMasterEndpoint 收到
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = context.senderAddress.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
val serializedSize = mapOutputStatuses.length
if (serializedSize >maxRpcMessageSize) {
val msg = s"Map output statuses were $serializedSize bytes which " +
s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
/* For SPARK-1244 we'll opt for just logging an error and then sending it to the sender.
* A bigger refactoring (SPARK-1239) will ultimately remove this entire code path. */
val exception = new SparkException(msg)
logError(msg, exception)
} else {
16/02/14 20:47:15 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to Worker3:45965
16/02/14 20:47:15 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 218 bytes
16/02/14 20:47:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on Worker4:46458 (size: 22.6 KB, free: 2.7 GB)
16/02/14 20:47:15 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to Worker4:57697
图一 BlockManager执行流程图
图二 BlockManager执行流程图
sparkimf..... 加油。。。。 很赞 ! 看到瓦力的作品~继续保持