分享

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

本帖最后由 pig2 于 2015-1-6 14:18 编辑

问题导读


1.http server是如何启动的?
2.页面中的数据是从哪里获取到的?













概要
WEB UI和Metrics子系统为外部观察监测Spark内部运行情况提供了必要的窗口,本文将简略的过一下其内部代码实现。

WEB UI
先上图感受一下spark webui 假设当前已经在本机运行standalone cluster模式,输入http://127.0.0.1:8080将会看到如下页面
下载.png

  driver application默认会打开4040端口进行http监听,可以看到application相关的详细信息
下载 (1).png
显示每个stage的详细信息
下载 (3).png

启动过程
本节要讨论的重点是http server是如何启动的,页面中的数据是从哪里获取到的?Spark中用到的http server是jetty, jetty采用java编写,是非常轻巧的servlet engine和http server。能够嵌入到用户程序中执行,不用像tomcat或jboss那样需要自己独立的jvm进程。
下载 (2).png
SparkUI在SparkContext初始化的时候创建
  1. // Initialize the Spark UI , registering all
  2. associated listeners
  3. private [spark] val ui = new SparkUI (this)
  4. ui.bind ()
复制代码

initialize的主要工作是注册页面处理句柄,WebUI的子类需要实现自己的initialize函数
bind将真正启动jetty server.
  1. def
  2. bind () {
  3. assert (! serverInfo .isDefined ,
  4. " Attempted to bind %
  5. s more than once!". format ( className ))
  6. try {
  7. // 启 动 JettyServer
  8. serverInfo = Some( startJettyServer (" 0.0.0.0 ",
  9. port , handlers , conf))
  10. logInfo (" Started %s at http ://%s:%d". format (
  11. className , publicHostName , boundPort ))
  12. } catch {
  13. case e: Exception =>
  14. logError (" Failed to bind %s". format ( className )
  15. , e)
  16. System .exit (1)
  17. }
  18. }
复制代码

在startJettyServer函数中将JettyServer运行起来的关键处理函数是connect
  1. def connect(currentPort: Int): (Server, Int) = {
  2.       val server = new Server(new InetSocketAddress(hostName, currentPort))
  3.       val pool = new QueuedThreadPool
  4.       pool.setDaemon(true)
  5.       server.setThreadPool(pool)
  6.       server.setHandler(collection)
  7.       Try {
  8.         server.start()
  9.       } match {
  10.         case s: Success[_] =>
  11.           (server, server.getConnectors.head.getLocalPort)
  12.         case f: Failure[_] =>
  13.           val nextPort = (currentPort + 1) % 65536
  14.           server.stop()
  15.           pool.stop()
  16.           val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
  17.           if (f.toString.contains("Address already in use")) {
  18.             logWarning(s"$msg - $f")
  19.           } else {
  20.             logError(msg, f.exception)
  21.           }
  22.           connect(nextPort)
  23.       }
  24.     }
  25.     val (server, boundPort) = connect(port)
  26.     ServerInfo(server, boundPort, collection)
  27.   }
复制代码

数据获取
页面中的数据是如何获取的呢,这就要归功于SparkListener了,典型的观察者设计模式。当有与stage及task相关的事件发生时,这些Listener都将收到通知,并进行数据更新。
需要指出的是,数据尽管得以自动更新,但页面并没有,还是需要手工刷新才能得到最新的数据。

下载 (4).png
上图显示的是SparkUI中注册了哪些SparkListener子类。来看一看这些子类是在什么时候注册进去的, 注意研究一下SparkUI.initialize函
  1. def initialize() {
  2.     listenerBus.addListener(storageStatusListener)
  3.     val jobProgressTab = new JobProgressTab(this)
  4.     attachTab(jobProgressTab)
  5.     attachTab(new StorageTab(this))
  6.     attachTab(new EnvironmentTab(this))
  7.     attachTab(new ExecutorsTab(this))
  8.     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
  9.     attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
  10.     attachHandler(
  11.       createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
  12.     if (live) {
  13.       sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
  14.     }
  15.   }
复制代码

举一个实际例子来看看Notifier发送Event的时刻,比如有任务提交的时 resourceOffer->taskStarted->handleBeginEvent
  1. private
  2. [ scheduler ]
  3. def handleBeginEvent (task: Task[_
  4. ], taskInfo : TaskInfo ) {
  5. listenerBus .post( SparkListenerTaskStart (task.
  6. stageId , taskInfo ))
  7. submitWaitingStages ()
  8. }
复制代码
post其实是向listenerBus的消息队列中添加一个消息,真正将消息发送 出去的时另一个处理线程listenerThread
  1. override
  2. def run (): Unit = Utils.
  3. logUncaughtExceptions {
  4. while (true) {
  5. eventLock . acquire ()
  6. // Atomically remove and process this event
  7. LiveListenerBus .this. synchronized {
  8. val event = eventQueue .poll
  9. if (event == SparkListenerShutdown ) {
  10. // Get out of the while loop and shutdown
  11. the daemon thread
  12. return
  13. }
  14. Option (event). foreach ( postToAll )
  15. }
  16. }
  17. }
复制代码

Option(event).foreach(postToAll)负责将事件通知给各个Observer.postToAll的函数实现如下
  1. def
  2. postToAll(event: SparkListenerEvent) {
  3.     event
  4. match {
  5.       case stageSubmitted: SparkListenerStageSubmitted =>
  6.         foreachListener(_.onStageSubmitted(stageSubmitted))
  7.       case stageCompleted: SparkListenerStageCompleted =>
  8.         foreachListener(_.onStageCompleted(stageCompleted))
  9.       case jobStart: SparkListenerJobStart =>
  10.         foreachListener(_.onJobStart(jobStart))
  11.       case jobEnd: SparkListenerJobEnd =>
  12.         foreachListener(_.onJobEnd(jobEnd))
  13.       case taskStart: SparkListenerTaskStart =>
  14.         foreachListener(_.onTaskStart(taskStart))
  15.       case taskGettingResult: SparkListenerTaskGettingResult =>
  16.         foreachListener(_.onTaskGettingResult(taskGettingResult))
  17.       case taskEnd: SparkListenerTaskEnd =>
  18.         foreachListener(_.onTaskEnd(taskEnd))
  19.       case environmentUpdate: SparkListenerEnvironmentUpdate =>
  20.         foreachListener(_.onEnvironmentUpdate(environmentUpdate))
  21.       case blockManagerAdded: SparkListenerBlockManagerAdded =>
  22.         foreachListener(_.onBlockManagerAdded(blockManagerAdded))
  23.       case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
  24.         foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
  25.       case unpersistRDD: SparkListenerUnpersistRDD =>
  26.         foreachListener(_.onUnpersistRDD(unpersistRDD))
  27.       case applicationStart: SparkListenerApplicationStart =>
  28.         foreachListener(_.onApplicationStart(applicationStart))
  29.       case applicationEnd: SparkListenerApplicationEnd =>
  30.         foreachListener(_.onApplicationEnd(applicationEnd))
  31.       case SparkListenerShutdown =>
  32.     }
  33.   }
复制代码

Metrics
在系统设计中,测量模块是不可或缺的组成部分。通过这些测量数据来感知系统的运行情况。

在Spark中,测量模块由MetricsSystem来担任,MetricsSystem中有三个重要的概念,分述如下。
  • instance 表示谁在使用metrics system, 目前已知的有master, worker, executor和client driver会创建metrics system用以测量
  • source 表示数据源,从哪里获取数据
  • sinks 数据目的地,将从source获取的数据发送到哪

Spark目前支持将测量数据保存或发送到如下目的地
  • ConsoleSink 输出到console
  • CSVSink 定期保存成为CSV文件
  • JmxSink 注册到JMX,以通过JMXConsole来查看
  • MetricsServlet 在SparkUI中添加MetricsServlet用以查看Task运行时的测量数据
  • GraphiteSink 发送给Graphite以对整个系统(不仅仅包括spark)进行监控

下面从MetricsSystem的创建,数据源的添加,数据更新与发送几个方面来跟踪一下源码。

初始化过程
MetricsSystem依赖于由codahale提供的第三方库Metrics,可以在metrics.codahale.com找到更为详细的介绍。
以Driver Application为例,driver application首先会初始化SparkContext,在SparkContext的初始化过程中就会创建MetricsSystem,具体调用关系如下。 SparkContext.init->SparkEnv.init->MetricsSystem.createMetricsSystem
注册数据源,继续以SparkContext为例
  1. private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
  2.   private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
  3.   private def initDriverMetrics() {
  4.     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
  5.     SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
  6.   }
  7. initDriverMetrics()
复制代码

数据读取
数据读取由Sink来完成,在Spark中创建的Sink子类如下图所示
下载 (5).png
读取最新的数据,以CsvSink为例,最主要的就是创建CsvReporter,启动之后会定期更新最近的数据到console。不同类型的Sink所使用的Reporter是不一样的。
  1. val reporter: CsvReporter = CsvReporter.forRegistry(registry)
  2.       .formatFor(Locale.US)
  3.       .convertDurationsTo(TimeUnit.MILLISECONDS)
  4.       .convertRatesTo(TimeUnit.SECONDS)
  5.       .build(new File(pollDir))
  6.       
  7.   override def start() {
  8.     reporter.start(pollPeriod, pollUnit)
  9.   }
复制代码

Spark中关于metrics子系统的配置文件详见conf/metrics.properties. 默认的Sink是MetricsServlet,在任务提交执行之后,输入http://127.0.0.1:4040/metrics/json会得到以json格式保存的metrics信息。



相关内容


Apache Spark源码走读之1 -- Spark论文阅读笔记

Apache Spark源码走读之2 -- Job的提交与运行

Apache Spark源码走读之3-- Task运行期之函数调用关系分析

Apache Spark源码走读之4 -- DStream实时流数据处理

Apache Spark源码走读之5-- DStream处理的容错性分析

Apache Spark源码走读之6-- 存储子系统分析

Apache Spark源码走读之7 -- Standalone部署方式分析

Apache Spark源码走读之8 -- Spark on Yarn

Apache Spark源码走读之9 -- Spark源码编译

Apache Spark源码走读之10 -- 在YARN上运行SparkPi

Apache Spark源码走读之11 -- sql的解析与执行

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

Apache Spark源码走读之13 -- hiveql on spark实现详解

Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

Apache Spark源码走读之16 -- spark repl实现详解

Apache Spark源码走读之17 -- 如何进行代码跟读

Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放

Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现
欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(1)人评论

跳转到指定楼层
355815741 发表于 2015-1-5 09:42:29
学习了,谢谢lz分享~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条