本帖最后由 levycui 于 2019-6-19 19:54 编辑
问题导读:
1、如何理解RPC 网络通信抽象?
2、如何理解Spark 通信架构?
3、如何进行Maste、Worker节点启动?
4、如何理解Driver 和 DriverRunner?
下一篇:大数据技术Spark学习入门2
目录
第1章 Spark 整体概述
1.1 整体概念
1.2 RDD 抽象
1.3 计算抽象
1.4 集群模式
1.5 RPC 网络通信抽象
1.6 启动 Standalone 集群
1.7 核心组件
1.8 核心组件交互流程
1.9 Block 管理
1.10整体应用
第2章 Spark 通信架构
2.1 通信组件概览
2.2 Endpoint 启动过程
2.3 Endpoint Send&Ask 流程
2.4 Endpoint Receive 流程
2.5 Endpoint Inbox 处理流程
2.6 Endpoint 画像
第3章 脚本解析
3.1 start-daemon.sh
3.2 spark-class
3.3 start-master.sh
3.4 start-slaves.sh
3.5 start-all.sh
3.6 spark-submit
第4章 Master 节点启动
4.1 脚本概览
4.2 启动流程
4.3 OnStart 监听事件
4.4 RpcMessage 处理 (receiveAndReply)
4.5 OneWayMessage 处理 (receive)
4.6 Master 对 RpcMessage/OneWayMessage 处理逻辑
第5章 Worker 节点启动
5.1 脚本概览
5.2 启动流程
5.3 OnStart 监听事件
5.4 RpcMessage 处理 (receiveAndReply)
5.5 OneWayMessage 处理 (receive)
第6章 Client 启动流程
6.1 脚本概览
6.2 SparkSubmit 启动流程
6.3 Client 启动流程
6.4 Client 的 OnStart 监听事件
6.5 RpcMessage 处理 (receiveAndReply)
6.6 OneWayMessage 处理(receive)
第7章 Driver 和 DriverRunner
7.1 Master 对 Driver 资源分配
7.2 Worker 运行 DriverRunner
7.3 DriverRunner 创建并运行 DriverWrapper
(大数据技术Spark学习入门2 http://www.aboutyun.com/forum.php?mod=viewthread&tid=27352)
第8章 SparkContext 解析
8.1 SparkContext 解析
8.2 SparkContext 创建过程
8.3 SparkContext 简易结构与交互关系
8.4 Master 对 Application 资源分配
8.5 Worker 创建 Executor
第9章 Job 提交和 Task 的拆分
9.1 整体预览
9.2 Code 转化为初始 RDDs
9.3 RDD 分解为待执行任务集合(TaskSet)
9.4 TaskSet 封装为 TaskSetManager 并提交至 Driver
9.5 Driver 将 TaskSetManager 分解为 TaskDescriptions 并发布任务到 Executor
第10章 Task 执行和回馈
10.1 Task 的执行流程
10.2 Task 的回馈流程
10.3 Task 的迭代流程
10.4 精彩图解
第11章 Spark 的数据存储
11.1 存储子系统概览
11.2 启动过程分析
11.3 通信层
11.4 存储层
11.4.1 Disk Store
11.4.2 Memory Store
11.5 数据写入过程分析
11.5.1 序列化与否
11.6 数据读取过程分析
11.6.1 本地读取
11.6.2 远程读取
11.7 Partition 如何转化为 Block
11.8 partition 和 block 的对应关系
第12章 Spark Shuffle 过程
12.1 MapReduce 的 Shuffle 过程介绍
12.1.1 Spill 过程(刷写过程)
12.1.2 Merge
12.1.3 Copy
12.1.4 Merge Sort1
2.2 HashShuffle 过程介绍
12.3 SortShuffle 过程介绍
12.4 TungstenShuffle 过程介绍
12.5 MapReduce 与 Spark 过程对比
第13章 Spark 内存管理
13.1 堆内和堆外内存规划
13.1.1 堆内内存
13.1.2 堆外内存
13.1.3 内存管理接口
13.2 内存空间分配
13.2.1 静态内存管理
13.2.2 统一内存管理
13.3 存储内存管理
13.3.1 RDD 的持久化机制
13.3.2 RDD 缓存的过程
13.3.3 淘汰和落盘
13.4 执行内存管理
13.4.1 多任务间内存分配
13.4.2 Shuffle 的内存占用
第14章 部署模式解析
14.1 部署模式概述
14.2 standalone 框架
14.2.1 Standalone 模式下任务运行过程
14.2.2 总结
14.3 yarn 集群模式
14.4 mesos 集群模式
14.5 spark 三种部署模式的区别
14.6 异常场景分析
14.6.1 异常分析1:Worker 异常退出
14.6.2 异常分析2:Executor 异常退出
14.6.3 异常分析3:Master 异常退出
第15章 wordcount 程序运行原理窥探
15.1 spark 之 scala 实现 wordcount
15.2 原理窥探
第1章 Spark 整体概述
1.1 整体概念
Apache Spark 是一个开源的通用集群计算系统,它提供了 High-level 编程 API,支持 Scala、Java 和 Python 三种编程语言。Spark 内核使用 Scala 语言编写,通过基于 Scala 的函数式编程特性,在不同的计算层面进行抽象,代码设计非常优秀。
1.2 RDD 抽象
RDD(Resilient Distributed Datasets),弹性分布式数据集,它是对分布式数据集的一种内存抽象,通过受限的共享内存方式来提供容错性,同时这种内存模型使得计算比传统的数据流模型要高效。RDD 具有 5 个重要的特性,如下图所示:
上图展示了 2 个 RDD 进行 JOIN 操作,体现了 RDD 所具备的 5 个主要特性,如下所示:
• 1)一组分区
• 2)计算每一个数据分片的函数
• 3)RDD 上的一组依赖
• 4)可选,对于键值对 RDD,有一个 Partitioner(通常是 HashPartitioner)
• 5)可选,一组 Preferred location 信息(例如,HDFS 文件的 Block 所在 location 信息)
有了上述特性,能够非常好地通过 RDD 来表达分布式数据集,并作为构建 DAG 图的基础:首先抽象一个分布式计算任务的逻辑表示,最终将任务在实际的物理计算环境中进行处理执行。
1.3 计算抽象
在描述 Spark 中的计算抽象,我们首先需要了解如下几个概念:
1)Application
• 用户编写的 Spark 程序,完成一个计算任务的处理。它是由一个 Driver 程序和一组运行于 Spark 集群上的 Executor 组成。
2)Job
• 用户程序中,每次调用 Action 时,逻辑上会生成一个 Job,一个 Job 包含了多个 Stage 。
3)Stage
• Stage 包括两类:ShuffleMapStage 和 ResultStage,如果用户程序中调用了需要进行 Shuffle 计算的 Operator,如 groupByKey 等,就会以 Shuffle 为边界分成 ShuffleMapStage 和 ResultStage。
4)TaskSet
• 基于 Stage 可以直接映射为 TaskSet,一个 TaskSet 封装了一次需要运算的、具有相同处理逻辑的 Task,这些 Task 可以并行计算,粗粒度的调度是以 TaskSet 为单位的。
5)Task
• Task 是在物理节点上运行的基本单位,Task 包含两类:ShuffleMapTask 和 ResultTask,分别对应于 Stage 中 ShuffleMapStage 和 ResultStage 中的一个执行基本单元。
下面,我们看一下,上面这些基本概念之间的关系,如下图所示:
上图,为了简单,每个 Job 假设都很简单,并且只需要进行一次 Shuffle 处理,所以都对应 2 个 Stage。实际应用中,一个 Job 可能包含若干个 Stage,或者是一个相对复杂的 Stage DAG。
在 Standalone 模式下,默认使用的是 FIFO 这种简单的调度策略,在进行调度的过程中,大概流程如下图所示:
从用户提交 Spark 程序,最终生成 TaskSet,而在调度时,通过 TaskSetManager 来管理一个 TaskSet(包含一组可在物理节点上执行的 Task),这里面 TaskSet 必须要按照顺序执行才能保证计算结果的正确性,因为 TaskSet 之间是有序依赖的(上溯到 ShuffleMapStage 和 ResultStage),只有一个 TaskSet 中的所有 Task 都运行完成后,才能调度下一个 TaskSet 中的 Task 去执行。
1.4 集群模式
Spark 集群在设计的时候,并没有在资源管理的设计上对外封闭,而是充分考虑了未来对接一些更强大的资源管理系统,如 YARN、Mesos 等,所以 Spark 架构设计将资源管理单独抽象出一层,通过这种抽象能够构建一种适合企业当前技术栈的插件式资源管理模块,从而为不同的计算场景提供不同的资源分配与调度策略。Spark 集群模式架构,如下图所示:
上图中,Spark集群Cluster Manager目前支持如下三种模式:
1)Standalone 模式
• Standalone 模式是 Spark 内部默认实现的一种集群管理模式,这种模式是通过集群中的 Master 来统一管理资源,而与 Master 进行资源请求协商的是 Driver 内部的 StandaloneSchedulerBackend(实际上是其内部的 StandaloneAppClient 真正与 Master 通信),后面会详细说明。
2)YARN 模式
• YARN 模式下,可以将资源的管理统一交给 YARN 集群的 ResourceManager 去管理,选择这种模式,可以更大限度的适应企业内部已有的技术栈,如果企业内部已经在使用 Hadoop 技术构建大数据处理平台。
3)Mesos 模式
• 随着 Apache Mesos 的不断成熟,一些企业已经在尝试使用 Mesos 构建数据中心的操作系统(DCOS),Spark 构建在 Mesos 之上,能够支持细粒度、粗粒度的资源调度策略(Mesos 的优势),也可以更好地适应企业内部已有技术栈。
• 那么,Spark 中是怎么考虑满足这一重要的设计决策的呢?也就是说,如何能够保证 Spark 非常容易的让第三方资源管理系统轻松地接入进来。我们深入到类设计的层面看一下,如下类图所示:
• 可以看出,Task 调度直接依赖 SchedulerBackend,SchedulerBackend 与实际资源管理模块交互实现资源请求。这里面,CoarseGrainedSchedulerBackend 是 Spark 中与资源调度相关的最重要的抽象,它需要抽象出与 TaskScheduler 通信的逻辑,同时还要能够与各种不同的第三方资源管理系统无缝地交互。实际上,CoarseGrainedSchedulerBackend 内部采用了一种 ResourceOffer 的方式来处理资源请求。
1.5 RPC 网络通信抽象
Spark RPC 层是基于优秀的网络通信框架 Netty 设计开发的,但是 Spark 提供了一种很好地抽象方式,将底层的通信细节屏蔽起来,而且也能够基于此来设计满足扩展性,比如,如果有其他不基于 Netty 的网络通信框架的新的RPC接入需求,可以很好地扩展而不影响上层的设计。RPC 层设计,如下图类图所示:
任何两个 Endpoint 只能通过消息进行通信,可以实现一个 RpcEndpoint 和一个 RpcEndpointRef。想要与 RpcEndpoint 通信,需要获取到该 RpcEndpoint 对应的 RpcEndpointRef 即可,而且管理 RpcEndpoint 和 RpcEndpointRef 创建及其通信的逻辑,统一在 RpcEnv 对象中管理。
1.6 启动 Standalone 集群
Standalone 模式下,Spark 集群采用了简单的 Master-Slave 架构模式,Master 统一管理所有的 Worker,这种模式很常见,我们简单地看下 Spark Standalone 集群启动的基本流程,如下图所示:
可以看到,Spark 集群采用的消息的模式进行通信,也就是 EDA 架构模式,借助于 RPC 层的优雅设计,任何两个 Endpoint 想要通信,发送消息并携带数据即可。上图的流程描述如下所示:
• 1)Master 启动时首先创一个 RpcEnv 对象,负责管理所有通信逻辑。
• 2)Master 通过 RpcEnv 对象创建一个 Endpoint,Master 就是一个 Endpoint,Worker 可以与其进行通信。
• 3)Worker 启动时也是创一个 RpcEnv 对象。
• 4)Worker 通过 RpcEnv 对象创建一个 Endpoint。
• 5)Worker 通过 RpcEnv 对,建立到 Master 的连接,获取到一个 RpcEndpointRef 对象,通过该对象可以与 Master 通信。
• 6)Worker 向 Master 注册,注册内容包括主机名、端口、CPU Core 数量、内存数量。
• 7)Master 接收到 Worker 的注册,将注册信息维护在内存中的 Table 中,其中还包含了一个到 Worker 的 RpcEndpointRef 对象引用。
• 8)Master 回复 Worker 已经接收到注册,告知 Worker 已经注册成功。
• 9)此时如果有用户提交 Spark 程序,Master 需要协调启动 Driver;而 Worker 端收到成功注册响应后,开始周期性向 Master 发送心跳。
1.7 核心组件
集群处理计算任务的运行时(即用户提交了 Spark 程序),最核心的顶层组件就是 Driver 和 Executor,它们内部管理很多重要的组件来协同完成计算任务,核心组件栈如下图所示:
Driver 和 Executor 都是运行时创建的组件,一旦用户程序运行结束,他们都会释放资源,等待下一个用户程序提交到集群而进行后续调度。上图,我们列出了大多数组件,其中 SparkEnv 是一个重量级组件,他们内部包含计算过程中需要的主要组件,而且,Driver 和 Executor 共同需要的组件在 SparkEnv 中也包含了很多。这里,我们不做过多详述,后面交互流程等处会说明大部分组件负责的功能。
1.8 核心组件交互流程
在 Standalone 模式下,Spark 中各个组件之间交互还是比较复杂的,但是对于一个通用的分布式计算系统来说,这些都是非常重要而且比较基础的交互。首先,为了理解组件之间的主要交互流程,我们给出一些基本要点:
• 一个 Application 会启动一个 Driver
• 一个 Driver 负责跟踪管理该 Application 运行过程中所有的资源状态和任务状态
• 一个 Driver 会管理一组 Executor
• 一个 Executor 只执行属于一个 Driver 的 Task
核心组件之间的主要交互流程,如下图所示:
上图中,通过不同颜色或类型的线条,给出了如下 6 个核心的交互流程,我们会详细说明:
橙色:提交用户 Spark 程序
用户提交一个 Spark 程序,主要的流程如下所示:
•1)用户 spark-submit 脚本提交一个 Spark 程序,会创建一个 ClientEndpoint 对象,该对象负责与 Master 通信交互
•2)ClientEndpoint 向 Master 发送一个 RequestSubmitDriver 消息,表示提交用户程序
•3)Master 收到 RequestSubmitDriver 消息,向 ClientEndpoint 回复 SubmitDriverResponse,表示用户程序已经完成注册
•4)ClientEndpoint 向 Master 发送 RequestDriverStatus 消息,请求 Driver 状态
•5)如果当前用户程序对应的 Driver 已经启动,则 ClientEndpoint 直接退出,完成提交用户程序
紫色:启动 Driver 进程
当用户提交用户 Spark 程序后,需要启动 Driver 来处理用户程序的计算逻辑,完成计算任务,这时 Master 协调需要启动一个 Driver,具体流程如下所示:
•1)Maser 内存中维护着用户提交计算的任务 Application,每次内存结构变更都会触发调度,向 Worker 发送 LaunchDriver 请求
•2)Worker 收到 LaunchDriver 消息,会启动一个 DriverRunner 线程去执行 LaunchDriver 的任务
•3)DriverRunner 线程在 Worker 上启动一个新的 JVM 实例,该 JVM 实例内运行一个 Driver 进程,该 Driver 会创建 SparkContext 对象
红色:注册 Application
Dirver 启动以后,它会创建 SparkContext 对象,初始化计算过程中必需的基本组件,并向 Master 注册 Application,流程描述如下:
•1)创建 SparkEnv 对象,创建并管理一些数基本组件
•2)创建 TaskScheduler,负责 Task 调度
•3)创建 StandaloneSchedulerBackend,负责与 ClusterManager 进行资源协商
•4)创建 DriverEndpoint,其它组件可以与 Driver 进行通信
•5)在 StandaloneSchedulerBackend 内部创建一个 StandaloneAppClient,负责处理与 Master 的通信交互
•6)StandaloneAppClient 创建一个 ClientEndpoint,实际负责与 Master 通信
•7)ClientEndpoint 向 Master 发送 RegisterApplication 消息,注册 Application
•8)Master 收到 RegisterApplication 请求后,回复 ClientEndpoint 一个 RegisteredApplication 消息,表示已经注册成功
蓝色:启动 Executor 进程
•1)Master 向 Worker 发送 LaunchExecutor 消息,请求启动 Executor;同时 Master 会向 Driver 发送 ExecutorAdded 消息,表示 Master 已经新增了一个 Executor(此时还未启动)
•2)Worker 收到 LaunchExecutor 消息,会启动一个 ExecutorRunner 线程去执行 LaunchExecutor 的任务
•3)Worker 向 Master 发送 ExecutorStageChanged 消息,通知 Executor 状态已发生变化
•4)Master 向 Driver 发送 ExecutorUpdated 消息,此时 Executor 已经启动
粉色:启动 Task 执行
•1)StandaloneSchedulerBackend 启动一个 DriverEndpoint
•2)DriverEndpoint 启动后,会周期性地检查 Driver 维护的 Executor 的状态,如果有空闲的 Executor 便会调度任务执行
•3)DriverEndpoint 向 TaskScheduler 发送 Resource Offer 请求
•4)如果有可用资源启动 Task,则 DriverEndpoint 向 Executor 发送 LaunchTask 请求
•5)Executor 进程内部的 CoarseGrainedExecutorBackend 调用内部的 Executor 线程的 launchTask 方法启动 Task
•6)Executor 线程内部维护一个线程池,创建一个 TaskRunner 线程并提交到线程池执行
绿色:Task 运行完成
•1)Executor 进程内部的 Executor 线程通知 CoarseGrainedExecutorBackend,Task 运行完成
•2)CoarseGrainedExecutorBackend 向 DriverEndpoint 发送 StatusUpdated 消息,通知 Driver 运行的 Task 状态发生变更
•3)StandaloneSchedulerBackend 调用T askScheduler 的 updateStatus 方法更新 Task 状态
•4)StandaloneSchedulerBackend 继续调用 TaskScheduler 的 resourceOffers 方法,调度其他任务运行
1.9 Block 管理
Block 管理,主要是为 Spark 提供的 Broadcast 机制提供服务支撑的。Spark 中内置采用 TorrentBroadcast 实现,该 Broadcast 变量对应的数据(Task 数据)或数据集(如 RDD),默认会被切分成若干 4M 大小的 Block,Task 运行过程中读取到该 Broadcast 变量,会以 4M 为单位的 Block 为拉取数据的最小单位,最后将所有的 Block 合并成 Broadcast 变量对应的完整数据或数据集。将数据切分成 4M 大小的 Block,Task 从多个 Executor 拉取 Block,可以非常好地均衡网络传输负载,提高整个计算集群的稳定性。
通常,用户程序在编写过程中,会对某个变量进行 Broadcast,该变量称为 Broadcast 变量。在实际物理节点的 Executor 上执行 Task 时,需要读取 Broadcast 变量对应的数据集,那么此时会根据需要拉取 DAG 执行流上游已经生成的数据集。采用 Broadcast 机制,可以有效地降低数据在计算集群环境中传输的开销。具体地,如果一个用户对应的程序中的 Broadcast 变量,对应着一个数据集,它在计算过程中需要拉取对应的数据,如果在同一个物理节点上运行着多个 Task,多个 Task 都需要该数据,有了 Broadcast 机制,只需要拉取一份存储在本地物理机磁盘即可,供多个 Task 计算共享。
另外,用户程序在进行调度过程中,会根据调度策略将 Task 计算逻辑数据(代码)移动到对应的 Worker 节点上,最优情况是对本地数据进行处理,那么代码(序列化格式)也需要在网络上传输,也是通过 Broadcast 机制进行传输,不过这种方式是首先将代码序列化到 Driver 所在 Worker 节点,后续如果 Task 在其他 Worker 中执行,需要读取对应代码的 Broadcast 变量,首先就是从 Driver 上拉取代码数据,接着其他晚一些被调度的 Task 可能直接从其他 Worker 上的 Executor 中拉取代码数据。
我们通过以 Broadcast 变量 taskBinary 为例,说明 Block 是如何管理的,如下图所示:
上图中,Driver 负责管理所有的 Broadcast 变量对应的数据所在的 Executor,即一个 Executor 维护一个 Block 列表。在 Executor 中运行一个 Task 时,执行到对应的 Broadcast 变量 taskBinary,如果本地没有对应的数据,则会向 Driver 请求获取 Broadcast 变量对应的数据,包括一个或多个 Block 所在的 Executor 列表,然后该 Executor 根据 Driver 返回的 Executor 列表,直接通过底层的 BlockTransferService 组件向对应 Executor 请求拉取 Block。Executor 拉取到的 Block 会缓存到本地,同时向 Driver 报告该 Executor 上存在的 Block 信息,以供其他 Executor 执行 Task 时获取 Broadcast 变量对应的数据。
1.10整体应用
用户通过 spark-submit 提交或者运行 spark-shell REPL,集群创建 Driver,Driver 加载 Application,最后 Application 根据用户代码转化为 RDD,RDD 分解为 Tasks,Executor 执行 Task 等系列知识,整体交互蓝图如下:
第2章 Spark 通信架构
Spark作为分布式计算框架,多个节点的设计与相互通信模式是其重要的组成部分。Spark 一开始使用 Akka 作为内部通信部件。在 Spark 1.3 年代,为了解决大块数据(如 Shuffle)的传输问题,Spark 引入了 Netty 通信框架。到了 Spark 1.6,Spark 可以配置使用 Akka 或者 Netty 了,这意味着 Netty 可以完全替代 Akka了。再到 Spark 2,Spark 已经完全抛弃 Akka了,全部使用 Netty 了。
为什么呢?官方的解释是:
•1)很多 Spark 用户也使用 Akka,但是由于 Akka 不同版本之间无法互相通信,这就要求用户必须使用跟 Spark 完全一样的 Akka 版本,导致用户无法升级 Akka。
•2)Spark 的 Akka 配置是针对 Spark 自身来调优的,可能跟用户自己代码中的 Akka 配置冲突。
•3)Spark 用的 Akka 特性很少,这部分特性很容易自己实现。同时,这部分代码量相比 Akka 来说少很多,debug 比较容易。如果遇到什么 bug,也可以自己马上 fix,不需要等 Akka 上游发布新版本。而且,Spark 升级 Akka 本身又因为第一点会强制要求用户升级他们使用的 Akka,对于某些用户来说是不现实的。
SPARK 的通信架构 - Actor 比较,如下图所示:
2.1 通信组件概览
对源码分析,对于设计思路理解如下:
•1)RpcEndpoint:RPC 端点,Spark 针对于每个节点(Client/Master/Worker)都称之一个 Rpc 端点且都实现 RpcEndpoint 接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用 Dispatcher。
•2)RpcEnv:RPC 上下文环境,每个 Rpc 端点运行时依赖的上下文环境称之为 RpcEnv。
•3)Dispatcher:消息分发器,针对于 RPC 端点需要发送消息或者从远程 RPC 接收到的消息,分发至对应的指令收件箱/发件箱。如果指令接收方是自己存入收件箱,如果指令接收方为非自身端点,则放入发件箱。
•4)Inbox:指令消息收件箱,一个本地端点对应一个收件箱,Dispatcher 在每次向 Inbox 存入消息时,都将对应 EndpointData 加入内部待 Receiver Queue中,另外 Dispatcher 创建时会启动一个单独线程进行轮询 Receiver Queue,进行收件箱消息消费。
•5)OutBox:指令消息发件箱,一个远程端点对应一个发件箱,当消息放入 Outbox 后,紧接着将消息通过 TransportClient 发送出去。消息放入发件箱以及发送过程是在同一个线程中进行,这样做的主要原因是远程消息分为 RpcOutboxMessage,OneWayOutboxMessage 两种消息,而针对于需要应答的消息直接发送且需要得到结果进行处理
•6)TransportClient:Netty 通信客户端,根据 OutBox 消息的 receiver 信息,请求对应远程 TransportServer。
•7)TransportServer:Netty 通信服务端,一个 RPC 端点一个 TransportServer,接受远程消息后调用 Dispatcher 分发消息至对应收发件箱。
注意:
TransportClient 与 TransportServer 通信虚线表示两个 RpcEnv 之间的通信,图示没有单独表达式。
一个 Outbox 一个 TransportClient,图示没有单独表达式。
一个 RpcEnv 中存在两个 RpcEndpoint,一个代表本身启动的 RPC 端点,另外一个为 RpcEndpointVerifier。
Spark的通信架构 – 高层视图
Spark 的通信架构 – 类图
2.2 Endpoint 启动过程
启动的流程如下:
Endpoint 启动后,默认会向 Inbox 中添加 OnStart 消息,不同的端点(Master/Worker/Client)消费 OnStart 指令时,进行相关端点的启动额外处理。
Endpoint 启动时,会默认启动 TransportServer,且启动结束后会进行一次同步测试 rpc 可用性(askSync-BoundPortsRequest)。
Dispatcher 作为一个分发器,内部存放了 Inbox,Outbox 的等相关句柄和存放了相关处理状态数据,结构大致如下:
2.3 Endpoint Send&Ask 流程
Endpoint 的消息发送与请求流程,如下:
Endpoint 根据业务需要存入两个维度的消息组合:send/ask 某个消息,receiver 是自身与非自身
•1)OneWayMessage:send + 自身,直接存入收件箱
•2)OneWayOutboxMessage:send + 非自身,存入发件箱并直接发送
•3)RpcMessage:ask + 自身,直接存入收件箱,另外还需要存入 LocalNettyRpcCallContext,需要回调后再返回
•4)RpcOutboxMessage:ask + 非自身,存入发件箱并直接发送,需要回调后再返回
2.4 Endpoint Receive 流程
Endpoint 的消息的接收,流程如下:
上图 ServerBootstrap 为 Netty 启动服务,SocketChanel为Netty 数据通道。
上述包含 TransportSever 启动与消息接收两个流程。
2.5 Endpoint Inbox 处理流程
Spark 在 Endpoint 的设计上核心设计即为 Inbox 与 Outbox,其中 Inbox 核心要点为:
•1)内部的处理流程拆分为多个消息指令(InboxMessage)存放入 Inbox。
•2)当 Dispatcher 启动最后,会启动一个名为【dispatcher-event-loop】的线程扫描 Inbox 待处理 InboxMessage,并调用 Endpoint 根据 InboxMessage 类型做相应处理
•3)当 Dispatcher 启动最后,默认会向 Inbox 存入 OnStart 类型的 InboxMessage,Endpoint 在根据 OnStart 指令做相关的额外启动工作,三端启动后所有的工作都是对 OnStart 指令处理衍生出来的,因此可以说 OnStart 指令是相互通信的源头。
消息指令类型大致如下三类:
•1)OnStart/OnStop
•2)RpcMessage/OneWayMessage
•3)RemoteProcessDisconnected/RemoteProcessConnected/RemoteProcessConnectionError
2.6 Endpoint 画像
第3章 脚本解析
在看源码之前,我们一般会看相关脚本了解其初始化信息以及 Bootstrap 类,Spark 也不例外,而 Spark 中相关的脚本如下:
%SPARK_HOME%/sbin/start-master.sh
%SPARK_HOME%/sbin/start-slaves.sh
%SPARK_HOME%/sbin/start-all.sh
%SPARK_HOME%/bin/spark-submit
启动脚本中对于公共处理部分进行抽取为独立的脚本,如下:
脚本 说明
sbin/spark-config.sh 初始化环境变量 SPARK_CONF_DIR, PYTHONPATH
bin/load-spark-env.sh 初始化环境变量 SPARK_SCALA_VERSION,调用 %SPARK_HOME%
conf/spark-env.sh 加载用户自定义环境变量
3.1 start-daemon.sh
主要完成进程相关基本信息初始化,然后调用 bin/spark-class 进行守护进程启动,该脚本是创建端点的通用脚本,三端各自脚本都会调用 spark-daemon.sh 脚本启动各自进程
详解如下:
1)初始化 SPRK_HOME、SPARK_CONF_DIR、SPARK_IDENT_STRING、SPARK_LOG_DIR 环境变量 (如果不存在)
2)初始化日志并测试日志文件夹读写权限,初始化 PID 目录并校验 PID 信息
3)调用 /bin/spark-class 脚本,/bin/spark-class 见下面
3.2 spark-class
Master 调用举例:
[mw_shl_code=shell,true]bin/spark-class \
--class org.apache.spark.deploy.master.Master \
--host $SPARK_MASTER_HOST \
--port $SPARK_MASTER_PORT \
--webui-port $SPARK_MASTER_WEBUI_PORT $ORIGINAL_ARGS[/mw_shl_code]
1)初始化 RUNNER(java)、SPARK_JARS_DIR (%SPARK_HOME%/jars)、LAUNCH_CLASSPATH 信息
2)调用 ("$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@") 获取最终执行的 shell 语句
3)执行最终的 shell 语句,示例如下:
[mw_shl_code=shell,true]/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g \
-XX:MaxPermSize=256m \
org.apache.spark.deploy.master.Master \
--host hadoop102 \
--port 7077 \
--webui-port 8080[/mw_shl_code]
如果是 Client,那么可能为 r,或者 python 脚本。
3.3 start-master.sh
启动 Master 的脚本,流程如下:
详解如下:
1)用户执行 start-master.sh 脚本,初始化环境变量 SPARK_HOME (如果 PATH 不存在 SPARK_HOME,初始化脚本的上级目录为 SPARK_HOME),调用 spark-config.sh,调用 load-spark-env.sh
2)如果环境变量 SPARK_MASTER_HOST、SPARK_MASTER_PORT、SPARK_MASTER_WEBUI_PORT 不存在,进行初始化 7077,hostname -f,8080
3)调用 spark-daemon.sh 脚本启动 master 进程,如下:
[mw_shl_code=shell,true]spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
--host $SPARK_MASTER_HOST \
--port $SPARK_MASTER_PORT \
--webui-port $SPARK_MASTER_WEBUI_PORT $ORIGINAL_ARGS)[/mw_shl_code]
3.4 start-slaves.sh
启动 Worker 的脚本,流程如下:
详解如下:
1)用户执行 start-slaves.sh 脚本,初始化环境变量 SPARK_HOME,调用 spark-config.sh,调用 load-spark-env.sh,初始化 Master host/port 信息
2)调用 slaves.sh 脚本,读取 conf/slaves 文件并遍历,通过 ssh 连接到对应 slave 节点,启动 ${SPARK_HOME}/sbin/start-slave.sh spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT
3)start-slave.sh 在各个节点中,初始化环境变量 SPARK_HOME,调用 spark-config.sh,调用 load-spark-env.sh,根据 $SPARK_WORKER_INSTANCES 计算 WEBUI_PORT 端口 (worker 端口号依次递增) 并启动 Worker 进程,如下:
[mw_shl_code=shell,true]${SPARK_HOME}/sbin/spark-daemon.sh \
start org.apache.spark.deploy.worker.Worker $WORKER_NUM \
--webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"[/mw_shl_code]
3.5 start-all.sh
属于快捷脚本,内部调用 start-master.sh 与 start-slaves.sh 脚本,并无额外工作。
3.6 spark-submit
任务提交的基本脚本,流程如下:
详解如下:
1)直接调用 spark-class 脚本进行进程创建,示例如下:
[mw_shl_code=shell,true]./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
../examples/jars/spark-examples_2.11-2.1.0.jar 10[/mw_shl_code]
2)如果是 java/scala 任务,那么最终调用 SparkSubmit.scala 进行任务处理,示例如下:
[mw_shl_code=shell,true]/opt/module/jdk1.8.0_144 -cp \
/opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g -XX:MaxPermSize=256m \
org.apache.spark.deploy.SparkSubmit \
--master spark://hadoop102:7077 \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.1.0.jar 10[/mw_shl_code]
第4章 Master 节点启动
Master 作为 Endpoint 的具体实例,下面我们介绍一下 Master 启动以及 OnStart 指令后的相关工作。
4.1 脚本概览
下面是一个举例:
[mw_shl_code=shell,true]/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g \
-XX:MaxPermSize=256m \
org.apache.spark.deploy.master.Master \
--host hadoop102 \
--port 7077 \[/mw_shl_code]
4.2 启动流程
Master 的启动流程如下:
详解如下:
1)SparkConf:加载 key 以 spark. 开头的系统属性 (Utils.getSystemProperties)。
2)MasterArguments:
a) 解析 Master 启动的参数:
--ip -i --host -h --port -p --webui-port --properties-file
b)将 --properties-file (没有配置默认为 conf/spark-defaults.conf) 中以 spark. 开头的配置存入 SparkConf。
3)NettyRpcEnv 中的内部处理遵循 RpcEndpoint 统一处理,这里不再赘述。
4)BoundPortsResponse 返回 rpcEndpointPort、webUIPort、restPort 真实端口。
5)最终守护进程会一直存在等待结束信 awaitTermination。
4.3 OnStart 监听事件
Master 的启动完成后异步执行工作如下:
详解如下:
1)【dispatcher-event-loop】线程扫描到 OnStart 指令后会启动相关 MasterWebUI (默认端口 8080),根据配置选择安装 ResetServer (默认端口 6066)。
2)另外新起【master-forward-message-thread】线程定期检查 Worker 心跳是否超时。
3)如果 Worker 心跳检测超时,那么对 Worker 下的发布的所有任务所属 Driver 进行 ExecutorUpdated 发送,同时自己再重新 LaunchDriver。
4.4 RpcMessage 处理 (receiveAndReply)
4.5 OneWayMessage 处理 (receive)
4.6 Master 对 RpcMessage/OneWayMessage 处理逻辑
这部分对整体 Master 理解作用不是很大且理解比较抽象,可以先读后续内容,回头再考虑看这部分内容,或者不读。
第5章 Worker 节点启动
Worker 作为 Endpoint 的具体实例,下面我们介绍一下 Worker 启动以及 OnStart 指令后的额外工作。
5.1 脚本概览
下面是一个举例:
[mw_shl_code=shell,true]/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g \
-XX:MaxPermSize=256m \
org.apache.spark.deploy.worker.Worker \
--webui-port 8081
spark://hadoop102:7077[/mw_shl_code]
5.2 启动流程
Worker 的启动流程如下:
详解如下:
1)SparkConf:加载 key 以 spark. 开头的系统属性 (Utils.getSystemProperties)。
2)WorkerArguments:
a) 解析 Master 启动的参数:
--ip -i --host -h --port -p --cores -c --memory -m --work-dir --webui-port --properties-file
b) 将 --properties-file (没有配置默认为conf/spark-defaults.conf) 中以 spark. 开头的配置存入 SparkConf。
c) 在没有配置情况下,cores 默认为服务器 CPU 核数。
d) 在没有配置情况下,memory 默认为服务器内存减 1G,如果低于 1G 取 1G。
e) webUiPort 默认为 8081。
3)NettyRpcEnv 中的内部处理遵循 RpcEndpoint 统一处理,这里不再赘述。
4)最终守护进程会一直存在等待结束信 awaitTermination。
5.3 OnStart 监听事件
Worker 的启动完成后异步执行工作如下:
详解如下:
1)【dispatcher-event-loop】线程扫描到 OnStart 指令后会启动相关 WorkerWebUI (默认端口 8081)。
2)Worker 向 Master 发起一次 RegisterWorker 指令。
3)另起【master-forward-message-thread】线程定期执行 ReregisterWithMaster 任务,如果注册成功 (RegisteredWorker) 则跳过,否则再次向 Master 发起 RegisterWorker 指令,直到超过最大次数报错 (默认16次)。
4)Master 如果可以注册,则维护对应的 WorkerInfo 对象并持久化,完成后向 Worker 发起一条 RegisteredWorker 指令,如果 Master 为 standby 状态,则向 Worker 发起一条 MasterInStandby 指令。
5)Worker 接受 RegisteredWorker 后,提交【master-forward-message-thread】线程定期执行 SendHeartbeat 任务,完成后向 Worker 发起一条 WorkerLatestState 指令。
6)Worker 发心跳检测,会触发更新 Master 对应 WorkerInfo 对象,如果 Master 检测到异常,则发起 ReconnectWorker 指令至 Worker,Worker 则再次执行 ReregisterWithMaster 工作。
5.4 RpcMessage 处理 (receiveAndReply)
5.5 OneWayMessage 处理 (receive)
第6章 Client 启动流程
Client 作为 Endpoint 的具体实例,下面我们介绍一下 Client 启动以及 OnStart 指令后的额外工作。
6.1 脚本概览
下面是一个举例:
[mw_shl_code=shell,true]
/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g
-XX:MaxPermSize=256m
org.apache.spark.deploy.SparkSubmit
--master spark://hadoop102:7077
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.1.0.jar 10[/mw_shl_code]
6.2 SparkSubmit 启动流程
SparkSubmit 的启动流程如下:
详解如下:
1)SparkSubmitArguments:
a) 解析 Client 启动的参数
--name --master --class --deploy-mode
--num-executors --executor-cores --total-executor-cores --executor-memory
--driver-memory --driver-cores --driver-class-path --driver-java-options --driver-library-path
--properties-file
--kill --status --supervise --queue
--files --py-files
--archives --jars --packages --exclude-packages --repositories
--conf (解析存入 Map:sparkProperties 中)
--proxy-user --principal --keytab --help --verbose --version --usage-error
b) 合并 --properties-file (没有配置默认为 conf/spark-defaults.conf) 文件配置项 (不在 --conf 中的配置 ) 至 sparkProperties
c) 删除 sparkProperties 中不以 spark. 开头的配置项目
d) 启动参数为空的配置项从 sparkProperties 中合并
e) 根据 action (SUBMIT、KILL、REQUEST_STATUS) 校验各自必需参数是否有值
2)Case Submit:
a) 获取childMainClass
[--deploy-mode] = clent(默认):用户任务启动类 mainClass (--class)
[--deploy-mode] = cluster & [--master] = spark:* & useRest org.apache.spark.deploy.rest.RestSubmissionClient
[--deploy-mode] = cluster & [--master] = spark:* & !useRest org.apache.spark.deploy.Client
[--deploy-mode] = cluster & [--master] = yarn org.apache.spark.deploy.yarn.Client
[--deploy-mode] = cluster & [--master] = mesos:* org.apache.spark.deploy.rest.RestSubmissionClient
b) 获取 childArgs (子运行时对应命令行组装参数)
[--deploy-mode] = cluster & [--master] = spark:* & useRest 包含 primaryResource 与 mainClass
[--deploy-mode] = cluster & [--master] = spark:* & !useRest 包含 --supervise --memory --cores launch childArg, primaryResource, mainClass
[--deploy-mode] = cluster & [--master] = yarn --class --arg --jar/--primary-py-file/--primary-r-file
[--deploy-mode] = cluster & [--master] = mesos:* primaryResource
c) 获取 childClasspath
[--deploy-mode] = clent 读取 --jars 配置,与 primaryResource 信息 (../examples/jars/spark-examples_2.11-2.1.0.jar)
d) 获取 sysProps
将 sparkPropertie 中的所有配置封装成新的 sysProps 对象,另外还增加了一下额外的配置项目
e) 将 childClasspath 通过当前的类加载器加载中
f) 将 sysProps 设置到当前 jvm 环境中
g) 最终反射执行 childMainClass,传参为 childArgs
6.3 Client 启动流程
Client 的启动流程如下:
详解如下:
1)SparkConf:加载 key 以 spark. 开头的系统属性 (Utils.getSystemProperties)。
2)ClientArguments:
a) 解析 Client 启动的参数:
--cores -c --memory -m --supervise -s --verbose -v
launch jarUrl master mainClass
kill master driverId
b) 将 --properties-file (没有配置默认为 conf/spark-defaults.conf) 中以 spark. 开头的配置存入 SparkConf。
c) 在没有配置情况下,cores 默认为 1 核。
d) 在没有配置情况下,memory 默认为 1G。
e) NettyRpcEnv 中的内部处理遵循 RpcEndpoint 统一处理,这里不再赘述。
3)最终守护进程会一直存在等待结束信 awaitTermination。
6.4 Client 的 OnStart 监听事件
Client 的启动完成后异步执行工作如下:
详解如下:
1)如果是发布任务(case launch),Client 创建一个 DriverDescription,并向 Master 发起 RequestSubmitDriver 请求。
a) Command 中的 mainClass 为: org.apache.spark.deploy.worker.DriverWrapper
b) Command 中的 arguments 为: Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass)
2)Master 接受 RequestSubmitDriver 请求后,将 DriverDescription 封装为 一个DriverInfo。
a) startTime 与 submitDate 都为当前时间
b) driverId 格式为:driver-yyyyMMddHHmmss-nextId,nextId 是全局唯一的
3)Master 持久化 DriverInfo,并加入待调度列表中 (waitingDrivers),触发公共资源调度逻辑。
4)Master 公共资源调度结束后,返回 SubmitDriverResponse给Client。
6.5 RpcMessage 处理 (receiveAndReply)
无。
6.6 OneWayMessage 处理(receive)
第7章 Driver 和 DriverRunner
Client 向 Master 发起 RequestSubmitDriver 请求,Master 将 DriverInfo 添加待调度列表中 (waitingDrivers),下面针对于 Driver 进一步梳理。
7.1 Master 对 Driver 资源分配
大致流程如下:
详解如下:
waitingDrivers 与 aliveWorkers 进行资源匹配:
1)在 waitingDrivers 循环内,轮询所有 aliveWorker。
2)如果 aliveWorker 满足当前 waitingDriver 资源要求,给 Worker 发送 LaunchDriver 指令并将 waitingDriver 移除 waitingDrivers,则进行下一次 waitingDriver 的轮询工作。
3)如果轮询完所有 aliveWorker 都不满足 waitingDriver 资源要求,则进行下一次 waitingDriver 的轮询工作。
4)所有发起的轮询开始点都上次轮询结束点的下一个点位开始。
7.2 Worker 运行 DriverRunner
Driver 的启动,流程如下:
详解如下:
1)当 Worker 遇到 LaunchDriver 指令时,创建并启动一个 DriverRunner。
2)DriverRunner 启动一个线程 DriverRunner for [driverId] 处理 Driver 启动工作。
3)DriverRunner for [driverId]:
a) 添加 JVM 钩子,针对于每个 diriverId 创建一个临时目录。
b) 将 DriverDesc.jarUrl 通过 Netty 从 Driver 机器远程拷贝过来。
c) 根据 DriverDesc.command 模板构建本地执行的 command 命令,并启动该 command 对应的 Process 进程。
d) 将 Process 的输出流输出到文件 stdout/stderror,如果 Process 启动失败,进行 1-5 的秒的反复启动工作,直到启动成功,在释放 Worker 节点的 DriverRunner 的资源。
7.3 DriverRunner 创建并运行 DriverWrapper
DriverWrapper 的运行,流程如下:
详解如下:
1)DriverWapper 创建了一个 RpcEndpoint 与 RpcEnv。
2)RpcEndpoint 为 WorkerWatcher,主要目的为监控 Worker 节点是否正常,如果出现异常就直接退出。
3)然后当前的 ClassLoader 加载 userJar,同时执行 userMainClass。
4)执行用户的 main 方法后关闭 workerWatcher。
作者:黑泽君
来源:https://www.cnblogs.com/chenmingjun/p/10803261.html#commentform
最新经典文章,欢迎关注公众号
|
|