本帖最后由 pig2 于 2015-1-6 14:18 编辑
问题导读
1.http server是如何启动的?
2.页面中的数据是从哪里获取到的?
概要WEB UI和Metrics子系统为外部观察监测Spark内部运行情况提供了必要的窗口,本文将简略的过一下其内部代码实现。
WEB UI
driver application默认会打开4040端口进行http监听,可以看到application相关的详细信息
显示每个stage的详细信息
启动过程本节要讨论的重点是http server是如何启动的,页面中的数据是从哪里获取到的?Spark中用到的http server是jetty, jetty采用java编写,是非常轻巧的servlet engine和http server。能够嵌入到用户程序中执行,不用像tomcat或jboss那样需要自己独立的jvm进程。
SparkUI在SparkContext初始化的时候创建
- // Initialize the Spark UI , registering all
-
- associated listeners
- private [spark] val ui = new SparkUI (this)
- ui.bind ()
复制代码
initialize的主要工作是注册页面处理句柄,WebUI的子类需要实现自己的initialize函数 bind将真正启动jetty server. - def
- bind () {
- assert (! serverInfo .isDefined ,
- " Attempted to bind %
- s more than once!". format ( className ))
- try {
- // 启 动 JettyServer
- serverInfo = Some( startJettyServer (" 0.0.0.0 ",
- port , handlers , conf))
- logInfo (" Started %s at http ://%s:%d". format (
- className , publicHostName , boundPort ))
- } catch {
- case e: Exception =>
- logError (" Failed to bind %s". format ( className )
- , e)
- System .exit (1)
- }
- }
复制代码
在startJettyServer函数中将JettyServer运行起来的关键处理函数是connect - def connect(currentPort: Int): (Server, Int) = {
- val server = new Server(new InetSocketAddress(hostName, currentPort))
- val pool = new QueuedThreadPool
- pool.setDaemon(true)
- server.setThreadPool(pool)
- server.setHandler(collection)
-
- Try {
- server.start()
- } match {
- case s: Success[_] =>
- (server, server.getConnectors.head.getLocalPort)
- case f: Failure[_] =>
- val nextPort = (currentPort + 1) % 65536
- server.stop()
- pool.stop()
- val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
- if (f.toString.contains("Address already in use")) {
- logWarning(s"$msg - $f")
- } else {
- logError(msg, f.exception)
- }
- connect(nextPort)
- }
- }
-
- val (server, boundPort) = connect(port)
- ServerInfo(server, boundPort, collection)
- }
复制代码
数据获取页面中的数据是如何获取的呢,这就要归功于SparkListener了,典型的观察者设计模式。当有与stage及task相关的事件发生时,这些Listener都将收到通知,并进行数据更新。 需要指出的是,数据尽管得以自动更新,但页面并没有,还是需要手工刷新才能得到最新的数据。
上图显示的是SparkUI中注册了哪些SparkListener子类。来看一看这些子类是在什么时候注册进去的, 注意研究一下SparkUI.initialize函 - def initialize() {
- listenerBus.addListener(storageStatusListener)
- val jobProgressTab = new JobProgressTab(this)
- attachTab(jobProgressTab)
- attachTab(new StorageTab(this))
- attachTab(new EnvironmentTab(this))
- attachTab(new ExecutorsTab(this))
- attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
- attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
- attachHandler(
- createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
- if (live) {
- sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
- }
- }
复制代码
举一个实际例子来看看Notifier发送Event的时刻,比如有任务提交的时 resourceOffer->taskStarted->handleBeginEvent - private
- [ scheduler ]
- def handleBeginEvent (task: Task[_
- ], taskInfo : TaskInfo ) {
- listenerBus .post( SparkListenerTaskStart (task.
- stageId , taskInfo ))
- submitWaitingStages ()
- }
复制代码
post其实是向listenerBus的消息队列中添加一个消息,真正将消息发送 出去的时另一个处理线程listenerThread - override
-
- def run (): Unit = Utils.
- logUncaughtExceptions {
- while (true) {
- eventLock . acquire ()
- // Atomically remove and process this event
- LiveListenerBus .this. synchronized {
- val event = eventQueue .poll
- if (event == SparkListenerShutdown ) {
- // Get out of the while loop and shutdown
- the daemon thread
- return
- }
- Option (event). foreach ( postToAll )
- }
- }
- }
复制代码
Option(event).foreach(postToAll)负责将事件通知给各个Observer.postToAll的函数实现如下 - def
- postToAll(event: SparkListenerEvent) {
- event
- match {
- case stageSubmitted: SparkListenerStageSubmitted =>
- foreachListener(_.onStageSubmitted(stageSubmitted))
- case stageCompleted: SparkListenerStageCompleted =>
- foreachListener(_.onStageCompleted(stageCompleted))
- case jobStart: SparkListenerJobStart =>
- foreachListener(_.onJobStart(jobStart))
- case jobEnd: SparkListenerJobEnd =>
- foreachListener(_.onJobEnd(jobEnd))
- case taskStart: SparkListenerTaskStart =>
- foreachListener(_.onTaskStart(taskStart))
- case taskGettingResult: SparkListenerTaskGettingResult =>
- foreachListener(_.onTaskGettingResult(taskGettingResult))
- case taskEnd: SparkListenerTaskEnd =>
- foreachListener(_.onTaskEnd(taskEnd))
- case environmentUpdate: SparkListenerEnvironmentUpdate =>
- foreachListener(_.onEnvironmentUpdate(environmentUpdate))
- case blockManagerAdded: SparkListenerBlockManagerAdded =>
- foreachListener(_.onBlockManagerAdded(blockManagerAdded))
- case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
- foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
- case unpersistRDD: SparkListenerUnpersistRDD =>
- foreachListener(_.onUnpersistRDD(unpersistRDD))
- case applicationStart: SparkListenerApplicationStart =>
- foreachListener(_.onApplicationStart(applicationStart))
- case applicationEnd: SparkListenerApplicationEnd =>
- foreachListener(_.onApplicationEnd(applicationEnd))
- case SparkListenerShutdown =>
- }
- }
复制代码
Metrics在系统设计中,测量模块是不可或缺的组成部分。通过这些测量数据来感知系统的运行情况。
在Spark中,测量模块由MetricsSystem来担任,MetricsSystem中有三个重要的概念,分述如下。 - instance 表示谁在使用metrics system, 目前已知的有master, worker, executor和client driver会创建metrics system用以测量
- source 表示数据源,从哪里获取数据
- sinks 数据目的地,将从source获取的数据发送到哪
Spark目前支持将测量数据保存或发送到如下目的地 - ConsoleSink 输出到console
- CSVSink 定期保存成为CSV文件
- JmxSink 注册到JMX,以通过JMXConsole来查看
- MetricsServlet 在SparkUI中添加MetricsServlet用以查看Task运行时的测量数据
- GraphiteSink 发送给Graphite以对整个系统(不仅仅包括spark)进行监控
下面从MetricsSystem的创建,数据源的添加,数据更新与发送几个方面来跟踪一下源码。
初始化过程MetricsSystem依赖于由codahale提供的第三方库Metrics,可以在metrics.codahale.com找到更为详细的介绍。 以Driver Application为例,driver application首先会初始化SparkContext,在SparkContext的初始化过程中就会创建MetricsSystem,具体调用关系如下。 SparkContext.init->SparkEnv.init->MetricsSystem.createMetricsSystem 注册数据源,继续以SparkContext为例 - private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
- private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
-
- private def initDriverMetrics() {
- SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
- SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
- }
- initDriverMetrics()
复制代码
数据读取数据读取由Sink来完成,在Spark中创建的Sink子类如下图所示
读取最新的数据,以CsvSink为例,最主要的就是创建CsvReporter,启动之后会定期更新最近的数据到console。不同类型的Sink所使用的Reporter是不一样的。 - val reporter: CsvReporter = CsvReporter.forRegistry(registry)
- .formatFor(Locale.US)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .convertRatesTo(TimeUnit.SECONDS)
- .build(new File(pollDir))
-
- override def start() {
- reporter.start(pollPeriod, pollUnit)
- }
复制代码
相关内容
Apache Spark源码走读之1 -- Spark论文阅读笔记
Apache Spark源码走读之2 -- Job的提交与运行
Apache Spark源码走读之3-- Task运行期之函数调用关系分析
Apache Spark源码走读之4 -- DStream实时流数据处理
Apache Spark源码走读之5-- DStream处理的容错性分析
Apache Spark源码走读之6-- 存储子系统分析
Apache Spark源码走读之7 -- Standalone部署方式分析
Apache Spark源码走读之8 -- Spark on Yarn
Apache Spark源码走读之9 -- Spark源码编译
Apache Spark源码走读之10 -- 在YARN上运行SparkPi
Apache Spark源码走读之11 -- sql的解析与执行
Apache Spark源码走读之12 -- Hive on Spark运行环境搭建
Apache Spark源码走读之13 -- hiveql on spark实现详解
Apache Spark源码走读之14 -- Graphx实现剖析
Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析
Apache Spark源码走读之16 -- spark repl实现详解
Apache Spark源码走读之17 -- 如何进行代码跟读
Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码
Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放
Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取
Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析
Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现
Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现
Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现
|