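This post was last edited by regan on 2016-10-12 16:18.
1. How does the R process on the driver communicate with the JVM?
2. How does the R worker process on an executor communicate with the JVM?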
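In Spark 2.x, SparkSession replaces SparkContext as the entry point. The file sparkR.R defines the sparkR.session function. Its main job is to initialize and return a SparkSession object, and that object holds a reference to the SparkContext. So you can think of SparkR as a thin R layer that mirrors Spark's Scala API. But remember, this is R code! Scheduling still happens in the JVM. Rather than reinvent the wheel, SparkR reuses Spark's existing scheduler, which means the R process has to communicate with the JVM process. So how does the driver-side R process talk to the JVM? First, let's look at a small SparkR program written in R:

```r
# Load the package, similar to import in Java/Scala
library(SparkR)
# R is a scripting language; if you have written JavaScript, R syntax will feel familiar.
# Create a SparkSession with sparkR.session from the SparkR package (sc holds a SparkSession).
# Spark settings go in sparkConfig; sparkEnvir belonged to the old 1.x sparkR.init API.
sc <- sparkR.session(master = "local[1]",
                     sparkConfig = list(spark.executor.memory = "1g",
                                        spark.cores.max = "10"))
# Call read.text (defined in SQLContext.R) to read a local text file
lines <- read.text("file:///home/spark/test")
# Print the first rows
head(lines)
```

Save the file as test.R and submit it with ./spark-submit test.R.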
The output looks like this:

```
16/10/11 17:22:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/10/11 17:22:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
value
1 1 2 3 4 5
2 6 7 8 9 10
3 11 12 13 14 15
4 16 17 18 19 20
5 21 22 23 24 25
6 26 27 28 29 30
```
As you can see, head printed the first six rows of the file. Isn't writing this as an R script more concise than Java or Scala? SparkR really is good news for statisticians! But I digress; back to the question of how the driver-side R process talks to the JVM. When you submit an R script with spark-submit, the SparkSubmit class sees that the primary resource is an R file and runs RRunner as the main class. RRunner starts RBackend in a separate thread and then launches the R process, which runs the test.R script above; the backend's port is handed to R through the EXISTING_SPARKR_BACKEND_PORT environment variable. Here is the definition of sparkR.session:

```r
sparkR.session <- function(
  master = "",                           # master URL, same as in Spark
  appName = "SparkR",                    # application name, defaults to SparkR
  sparkHome = Sys.getenv("SPARK_HOME"),  # read the SPARK_HOME environment variable
  sparkConfig = list(),                  # Spark configuration as a named list, empty by default
  sparkJars = "",                        # extra Spark jars to depend on
  sparkPackages = "",                    # extra packages to depend on
  enableHiveSupport = TRUE,              # whether to enable Hive support
  ...) {                                 # ... accepts any number of additional named arguments
  # Convert the config list into an environment (used as a map) stored in sparkConfigMap
  sparkConfigMap <- convertNamedListToEnv(sparkConfig)
  namedParams <- list(...)
  if (length(namedParams) > 0) {
    paramMap <- convertNamedListToEnv(namedParams)
    # Override for certain named parameters
    if (exists("spark.master", envir = paramMap)) {
      master <- paramMap[["spark.master"]]
    }
    if (exists("spark.app.name", envir = paramMap)) {
      appName <- paramMap[["spark.app.name"]]
    }
    overrideEnvs(sparkConfigMap, paramMap)
  }
  # do not download if it is run in the sparkR shell
  if (!nzchar(master) || is_master_local(master)) {
    if (!is_sparkR_shell()) {
      if (is.na(file.info(sparkHome)$isdir)) {
        msg <- paste0("Spark not found in SPARK_HOME: ",
                      sparkHome,
                      " .\nTo search in the cache directory. ",
                      "Installation will start if not found.")
        message(msg)
        packageLocalDir <- install.spark()
        sparkHome <- packageLocalDir
      } else {
        msg <- paste0("Spark package is found in SPARK_HOME: ", sparkHome)
        message(msg)
      }
    }
  }
  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
    sparkExecutorEnvMap <- new.env()
    # NOTE: a SparkContext must exist before the SparkSession is created.
    # sparkR.sparkContext creates the JavaSparkContext and, along the way,
    # establishes the connection to the JVM.
    sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap, sparkExecutorEnvMap,
                        sparkJars, sparkPackages)
    stopifnot(exists(".sparkRjsc", envir = .sparkREnv))
  }
  # If the .sparkREnv environment already holds a .sparkRsession, reuse it
  if (exists(".sparkRsession", envir = .sparkREnv)) {
    sparkSession <- get(".sparkRsession", envir = .sparkREnv)
    # Apply config to Spark Context and Spark Session if already there
    # Cannot change enableHiveSupport
    callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                "setSparkContextSessionConf",
                sparkSession,
                sparkConfigMap)
  } else {
    # Otherwise use callJStatic to invoke SQLUtils.getOrCreateSparkSession on the JVM side
    jsc <- get(".sparkRjsc", envir = .sparkREnv)
    sparkSession <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                                "getOrCreateSparkSession",
                                jsc,
                                sparkConfigMap,
                                enableHiveSupport)
    assign(".sparkRsession", sparkSession, envir = .sparkREnv)
  }
  sparkSession
}
```

In the code above I marked where R connects to the JVM: it happens inside sparkR.sparkContext, which contains the following code (it's a bit long, but don't panic!):
```r
sparkR.sparkContext <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvirMap = new.env(),
  sparkExecutorEnvMap = new.env(),
  sparkJars = "",
  sparkPackages = "") {
  # If the .sparkREnv environment already holds .sparkRjsc, fetch it with get() and return it
  if (exists(".sparkRjsc", envir = .sparkREnv)) {
    cat(paste("Re-using existing Spark Context.",
              "Call sparkR.session.stop() or restart R to create a new Spark Context\n"))
    return(get(".sparkRjsc", envir = .sparkREnv))
  }
  # Process the Spark jar files
  jars <- processSparkJars(sparkJars)
  # Process the package dependencies
  packages <- processSparkPackages(sparkPackages)
  # Read EXISTING_SPARKR_BACKEND_PORT from the environment (empty string if unset)
  existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
  # For an R script submitted with spark-submit, RRunner sets this variable to the
  # port of the RBackend it already started, so it is non-empty
  if (existingPort != "") {
    if (length(packages) != 0) {
      warning(paste("sparkPackages has no effect when using spark-submit or sparkR shell",
                    " please use the --packages commandline instead", sep = ","))
    }
    # Reuse the existing port as backendPort
    backendPort <- existingPort
  } else {
    # Otherwise (e.g. in the interactive sparkR shell) R must launch the JVM itself.
    # Generate a temp file path with prefix backend_port; the JVM will write its ports there
    path <- tempfile(pattern = "backend_port")
    # SPARKR_SUBMIT_ARGS carries the spark-submit arguments; it defaults to "sparkr-shell"
    submitOps <- getClientModeSparkSubmitOpts(
      Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
      sparkEnvirMap)
    # Run the spark-submit script to start the JVM. So even in the interactive REPL,
    # SparkR ends up calling spark-submit, and that is what starts the JVM
    launchBackend(
      args = path,
      sparkHome = sparkHome,
      jars = jars,
      sparkSubmitOpts = submitOps,
      packages = packages)
    # wait atmost 100 seconds for JVM to launch
    # (25 sleeps growing by 1.25x add up to about 105 s, so the "10 seconds"
    # in the error message below understates the actual timeout)
    wait <- 0.1
    for (i in 1:25) {
      Sys.sleep(wait)
      if (file.exists(path)) {
        break
      }
      wait <- wait * 1.25
    }
    if (!file.exists(path)) {
      stop("JVM is not ready after 10 seconds")
    }
    f <- file(path, open = "rb")
    backendPort <- readInt(f)
    monitorPort <- readInt(f)
    rLibPath <- readString(f)
    close(f)
    file.remove(path)
    if (length(backendPort) == 0 || backendPort == 0 ||
        length(monitorPort) == 0 || monitorPort == 0 ||
        length(rLibPath) != 1) {
      stop("JVM failed to launch")
    }
    # NOTE: this is where R first connects to the JVM! socketConnection() opens a client
    # connection to the server side, RBackend, which in this branch was started by the
    # spark-submit call above. .monitorConn is a separate monitoring connection that lets
    # the backend notice when the R process exits
    assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
    assign(".backendLaunched", 1, envir = .sparkREnv)
    if (rLibPath != "") {
      assign(".libPath", rLibPath, envir = .sparkREnv)
      .libPaths(c(rLibPath, .libPaths()))
    }
  }
  # Then, in both branches, call connectBackend to open the main connection
  # to the JVM backend running on localhost
  .sparkREnv$backendPort <- backendPort
  tryCatch({
    connectBackend("localhost", backendPort)
  },
  error = function(err) {
    stop("Failed to connect JVM\n")
  })
  if (nchar(sparkHome) != 0) {
    sparkHome <- suppressWarnings(normalizePath(sparkHome))
  }
  if (is.null(sparkExecutorEnvMap$LD_LIBRARY_PATH)) {
    sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
      paste0("$LD_LIBRARY_PATH:", Sys.getenv("LD_LIBRARY_PATH"))
  }
  # Classpath separator is ";" on Windows
  # URI needs four /// as from http://stackoverflow.com/a/18522792
  if (.Platform$OS.type == "unix") {
    uriSep <- "//"
  } else {
    uriSep <- "////"
  }
  localJarPaths <- lapply(jars,
                          function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
  # Set the start time to identify jobjs
  # Seconds resolution is good enough for this purpose, so use ints
  assign(".scStartTime", as.integer(Sys.time()), envir = .sparkREnv)
  # Once connected, use callJStatic to invoke createSparkContext in
  # org.apache.spark.api.r.RRDD, which creates a JavaSparkContext; store the
  # returned reference in .sparkREnv as .sparkRjsc
  assign(
    ".sparkRjsc",
    callJStatic(
      "org.apache.spark.api.r.RRDD",
      "createSparkContext",
      master,
      appName,
      as.character(sparkHome),
      localJarPaths,
      sparkEnvirMap,
      sparkExecutorEnvMap),
    envir = .sparkREnv
  )
  sc <- get(".sparkRjsc", envir = .sparkREnv)
  # Register a finalizer to sleep 1 seconds on R exit to make RStudio happy
  reg.finalizer(.sparkREnv, function(x) { Sys.sleep(1) }, onexit = TRUE)
  # Return the R-side reference to the JavaSparkContext
  sc
}
```
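The launch handshake in the else branch of sparkR.sparkContext boils down to two steps: poll for a file with a growing sleep, then read two ports and a library path out of it. The base-R sketch below mimics both steps so they can be run without Spark. It assumes the file holds two big-endian 32-bit integers followed by a length-prefixed UTF-8 string, which is how the readInt/readString calls appear to consume it. write_port_file(), wait_for_file(), read_port_file() and max_total_wait() are hypothetical helpers, not SparkR functions, and write_port_file() only plays the JVM's role for the demo.

```r
# Hypothetical helpers illustrating the port-file handshake in base R.
# Assumed layout (all big-endian): int32 backendPort, int32 monitorPort,
# then an int32 byte length followed by that many UTF-8 bytes for rLibPath.

# Stand-in for the JVM side: write under a temporary name, then rename,
# so a polling reader never sees a half-written file.
write_port_file <- function(path, backend_port, monitor_port, r_lib_path) {
  tmp <- paste0(path, ".tmp")
  con <- file(tmp, open = "wb")
  writeBin(as.integer(c(backend_port, monitor_port)), con, size = 4, endian = "big")
  bytes <- charToRaw(enc2utf8(r_lib_path))
  writeBin(length(bytes), con, size = 4, endian = "big")
  writeBin(bytes, con)
  close(con)
  file.rename(tmp, path)
  invisible(path)
}

# Same schedule as sparkR.sparkContext: sleep, check, grow the sleep by 1.25x.
wait_for_file <- function(path, wait = 0.1, factor = 1.25, tries = 25) {
  for (i in seq_len(tries)) {
    Sys.sleep(wait)
    if (file.exists(path)) return(TRUE)
    wait <- wait * factor
  }
  FALSE
}

# Worst-case total sleep of that schedule (a geometric series).
max_total_wait <- function(wait = 0.1, factor = 1.25, tries = 25) {
  sum(wait * factor^(seq_len(tries) - 1))
}

# Read the two ports and the library path back, in the same order.
read_port_file <- function(path) {
  con <- file(path, open = "rb")
  on.exit(close(con))
  ports <- readBin(con, integer(), n = 2, size = 4, endian = "big")
  len <- readBin(con, integer(), n = 1, size = 4, endian = "big")
  lib <- rawToChar(readBin(con, raw(), n = len))
  Encoding(lib) <- "UTF-8"
  list(backendPort = ports[1], monitorPort = ports[2], rLibPath = lib)
}

path <- tempfile(pattern = "backend_port")
write_port_file(path, 12345L, 12346L, "/tmp/Rlib")
if (!wait_for_file(path)) stop("port file never appeared")
info <- read_port_file(path)
invisible(file.remove(path))
str(info)
round(max_total_wait(), 1)  # 105.5
```

Growing the sleep geometrically keeps the first checks fast while bounding the number of checks. With these parameters the worst case is about 105 seconds, which agrees with the "at most 100 seconds" comment in the source far better than with the "10 seconds" in its error message.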
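Both sparkR.session and sparkR.sparkContext rely on the same idiom: look for a handle in the package-private environment .sparkREnv, reuse it if it is there, otherwise create it and assign() it, so repeated calls share one JavaSparkContext and one SparkSession. Below is a minimal base-R sketch of that get-or-create pattern. The names .demoEnv, get_or_create() and make_context() are hypothetical, and the cached object is a plain list rather than a reference to a JVM object.

```r
# Hypothetical stand-in for SparkR's private .sparkREnv.
.demoEnv <- new.env()

# Return the object cached under `name`, creating and caching it on first use.
# inherits = FALSE keeps the lookup inside envir instead of its parent environments.
get_or_create <- function(name, create, envir = .demoEnv) {
  if (exists(name, envir = envir, inherits = FALSE)) {
    return(get(name, envir = envir, inherits = FALSE))
  }
  obj <- create()
  assign(name, obj, envir = envir)
  obj
}

# Count how many times the (hypothetical) expensive constructor runs.
calls <- 0
make_context <- function() {
  calls <<- calls + 1
  list(id = calls)
}

a <- get_or_create(".ctx", make_context)
b <- get_or_create(".ctx", make_context)
identical(a, b)  # TRUE
calls            # 1
```

The source's exists() calls use the default inherits = TRUE; the sketch passes inherits = FALSE so a same-named variable in a parent environment cannot be mistaken for the cached handle. Forgetting the cached object is just rm(list = ".ctx", envir = .demoEnv).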
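Summary: R's strength lies in its rich statistical functionality and its huge ecosystem of third-party packages. R itself cannot do distributed computing, and the R interpreter is single-threaded. SparkR combines Spark's distributed scalability with R's statistics. Communication between R and the JVM does add overhead, but SparkR has brought new energy to statisticians and the R community. Running on Spark, SparkR can spread parallel computation across thousands of machines and process petabytes of data, and its machine learning and data mining libraries make modeling easier while helping companies get results faster.
So tinkering with R and Spark in your spare time is well worth it!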