分享

Apache Spark源码走读之17 -- 如何进行代码跟读

本帖最后由 pig2 于 2015-1-6 14:16 编辑

问题导读

1.Spark基于Akka来进行消息交互,那如何知道谁是接收方呢?
2.对代码作了修改之后,如果并不想提交代码,那该如何将最新的内容同步到本地呢?










概要
今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读。众所周知,Spark使用scala进行开发,由于scala有众多的语法糖,很多时候代码跟着跟着就觉着线索跟丢掉了,另外Spark基于Akka来进行消息交互,那如何知道谁是接收方呢?

new Throwable().printStackTrace
代码跟读的时候,经常会借助于日志,针对日志中输出的每一句,我们都很想知道它们的调用者是谁。但有时苦于对spark系统的了解程度不深,或者对scala认识不够,一时半会之内无法找到答案,那么有没有什么简便的办法呢?
我的办法就是在日志出现的地方加入下面一句话
  1. new Throwable().printStackTrace()
复制代码

现在举一个实际的例子来说明问题。
比如我们在启动spark-shell之后,输入一句非常简单的sc.textFile("README.md"),会输出下述的log
  1. 14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489
  2. 14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)
  3. 14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took  78 ms
  4. 14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took  79 ms
  5. res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at :13
复制代码


那我很想知道是第二句日志所在的tryToPut函数是被谁调用的该怎么办?
办法就是打开MemoryStore.scala,找到下述语句
  1. logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
  2.           blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
复制代码

在这句话之上,添加如下语句
  1. new Throwable().printStackTrace()
复制代码
然后,重新进行源码编译
  1. sbt/sbt assembly
复制代码

再次打开spark-shell,执行sc.textFile("README.md"),就可以得到如下输出,从中可以清楚知道tryToPut的调用者是谁
  1. 14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489
  2. 14/07/05 19:53:27 WARN MemoryStore: just show the calltrace by entering some modified code
  3. java.lang.Throwable
  4.   at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:182)
  5.   at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76)
  6.   at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)
  7.   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:699)
  8.   at org.apache.spark.storage.BlockManager.put(BlockManager.scala:570)
  9.   at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:821)
  10.   at org.apache.spark.broadcast.HttpBroadcast.(HttpBroadcast.scala:52)
  11.   at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)
  12.   at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29)
  13.   at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
  14.   at org.apache.spark.SparkContext.broadcast(SparkContext.scala:787)
  15.   at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:556)
  16.   at org.apache.spark.SparkContext.textFile(SparkContext.scala:468)
  17.   at $line5.$read$iwC$iwC$iwC$iwC.(:13)
  18.   at $line5.$read$iwC$iwC$iwC.(:18)
  19.   at $line5.$read$iwC$iwC.(:20)
  20.   at $line5.$read$iwC.(:22)
  21.   at $line5.$read.(:24)
  22.   at $line5.$read$.(:28)
  23.   at $line5.$read$.()
  24.   at $line5.$eval$.(:7)
  25.   at $line5.$eval$.()
  26.   at $line5.$eval.$print()
  27.   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  28.   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  29.   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  30.   at java.lang.reflect.Method.invoke(Method.java:483)
  31.   at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
  32.   at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
  33.   at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
  34.   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
  35.   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
  36.   at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
  37.   at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
  38.   at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
  39.   at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
  40.   at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
  41.   at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
  42.   at org.apache.spark.repl.SparkILoop$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
  43.   at org.apache.spark.repl.SparkILoop$anonfun$process$1.apply(SparkILoop.scala:884)
  44.   at org.apache.spark.repl.SparkILoop$anonfun$process$1.apply(SparkILoop.scala:884)
  45.   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  46.   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
  47.   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
  48.   at org.apache.spark.repl.Main$.main(Main.scala:31)
  49.   at org.apache.spark.repl.Main.main(Main.scala)
  50.   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  51.   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  52.   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  53.   at java.lang.reflect.Method.invoke(Method.java:483)
  54.   at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
  55.   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
  56.   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  57. 14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)
  58. 14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took  78 ms
  59. 14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took  79 ms
  60. res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at :13
复制代码

git同步
对代码作了修改之后,如果并不想提交代码,那该如何将最新的内容同步到本地呢?
  1. git reset --hard
  2. git pull origin master
复制代码


Akka消息跟踪
追踪消息的接收者是谁,相对来说比较容易,只要使用好grep就可以了,当然前提是要对actor model有一点点了解。
还是举个实例吧,我们知道CoarseGrainedSchedulerBackend会发送LaunchTask消息出来,那么谁是接收方呢?只需要执行以下脚本即可。
  1. grep LaunchTask -r core/src/main
复制代码

从如下的输出中,可以清楚看出CoarseGrainedExecutorBackend是LaunchTask的接收方,接收到该函数之后的业务处理,只需要去看看接收方的receive函数即可。
  1. core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:    case LaunchTask(data) =>
  2. core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:        logError("Received LaunchTask command but executor was null")
  3. core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
  4. core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
复制代码

小结
今天的内容相对简单,没有技术含量,自己做个记述,免得时间久了,不记得。


相关内容


Apache Spark源码走读之1 -- Spark论文阅读笔记

Apache Spark源码走读之2 -- Job的提交与运行

Apache Spark源码走读之3-- Task运行期之函数调用关系分析

Apache Spark源码走读之4 -- DStream实时流数据处理

Apache Spark源码走读之5-- DStream处理的容错性分析

Apache Spark源码走读之6-- 存储子系统分析

Apache Spark源码走读之7 -- Standalone部署方式分析

Apache Spark源码走读之8 -- Spark on Yarn

Apache Spark源码走读之9 -- Spark源码编译

Apache Spark源码走读之10 -- 在YARN上运行SparkPi

Apache Spark源码走读之11 -- sql的解析与执行

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

Apache Spark源码走读之13 -- hiveql on spark实现详解

Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

Apache Spark源码走读之16 -- spark repl实现详解


Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放

Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现










欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条