Questions addressed:
1. How do you download spark-jobserver?
2. How do you start the jobserver?
3. How do you submit a job through the REST API?
Job Server as a REST service
https://github.com/ooyala/spark-jobserver
I. Install and start the jobserver
1. git clone the spark-jobserver repository
2. cd into the directory and run sbt
For sbt installation, see http://www.scala-sbt.org/release ... l-Installation.html
3. Run re-start to start the jobserver on the local machine
- re-start --- -XX:PermSize=512M
Web UI: localhost:8090
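Besides the web UI, the REST endpoints can be used as a quick sanity check that the server is up (assuming the default port 8090):
- curl localhost:8090/jars
- curl localhost:8090/contexts
- curl localhost:8090/jobs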
II. Upload the jar
1. Build and upload the test jar
- sbt job-server-tests/package
- curl --data-binary @job-server-tests/target/job-server-tests-0.3.1.jar localhost:8090/jars/test
- curl localhost:8090/jars/
2. Submit a job
- curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'
-
- # Response
- {
-   "status": "STARTED",
-   "result": {
-     "jobId": "d56eadea-5a73-46f4-803e-418e6f5b990f",
-     "context": "f6a1c9b7-spark.jobserver.WordCountExample"
-   }
- }
3. Check the result
- curl localhost:8090/jobs/d56eadea-5a73-46f4-803e-418e6f5b990f
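Once the job has finished, the response carries the job's return value. Roughly (illustrative only; the WordCountExample bundled in job-server-tests returns a map of word counts, so the shape should be similar to this):
- {
-   "status": "OK",
-   "result": {
-     "a": 2,
-     "b": 2,
-     "c": 1,
-     "see": 1
-   }
- }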
III. Pre-start a context
- curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=4&mem-per-node=512m'
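Jobs can then be routed to the pre-started context by name, which avoids the per-job context startup cost. A sketch reusing the test jar from section II (the context and sync query parameters come from the jobserver REST API):
- curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true'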
(1) Configure contexts in the config file
- ~/app/spark-jobserver/config# vim local.conf.template
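Contexts declared in the config file are created when the jobserver starts. A minimal sketch of such an entry (the context name is hypothetical; key names follow local.conf.template, so verify them against your version):
- # inside the spark { } block of local.conf
- contexts {
-   test-context {
-     num-cpu-cores = 4
-     memory-per-node = 512m
-   }
- }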
(2) Configure via URL parameters
- POST /contexts/my-new-context?num-cpu-cores=10
IV. Deployment
1. Copy config/local.sh.template to <environment>.sh (cfg.sh below) and set the relevant parameters.
- ~/app/spark-jobserver/config# cp local.sh.template ~/app/spark-jobserver-src/bin/config/cfg.sh
- ~/app/spark-jobserver-src/bin# ./server_deploy.sh cfg
cfg.sh
- DEPLOY_HOSTS="192.168.2.215"
-
- APP_USER=root
- APP_GROUP=root
- INSTALL_DIR=/root/app/spark-job-server
- LOG_DIR=/var/log/job-server
- SPARK_HOME=/root/app/spark-1.0.0-bin-hadoop2
2. Run bin/server_deploy.sh to package the server and push it, together with the configuration, to the target directory on the specified server. If it fails with "Error during sbt execution: java.lang.OutOfMemoryError: PermGen space", edit SBT_OPTS in the sbt launcher script to include -XX:MaxPermSize, as shown below:
- ~/app/spark-jobserver-src/bin# cat /bin/sbt
- SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=512m"
- java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"
- ~/app/spark-jobserver-src/bin# vim /bin/sbt
3. Go to the install directory on the server and run server_start.sh. The deploy may be incomplete: copy local.conf.template from the config directory into the server's spark-job-server directory and rename it to local.conf, and copy cfg.sh into the same directory as settings.sh; a sketch of these steps follows.
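A minimal sketch of those fix-up steps on the deploy host, assuming the template and cfg.sh have already been copied there alongside the deployed files (paths follow INSTALL_DIR from cfg.sh):
- # on 192.168.2.215
- cd /root/app/spark-job-server
- cp local.conf.template local.conf
- cp cfg.sh settings.sh
- ./server_start.sh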
V. Create a JobServer project
(1) Create the sbt project: make a new directory named JobServer and create build.sbt with the following contents:
- name := "job server demo"
-
- version := "1.0"
-
- scalaVersion := "2.10.4"
-
- resolvers += "Ooyala Bintray" at "http://dl.bintray.com/ooyala/maven"
-
- libraryDependencies += "ooyala.cnd" % "job-server" % "0.3.1" % "provided"
-
- libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.0.0"
Add the Eclipse plugin to sbt
- C:\Users\Administrator\.sbt\0.13\plugins\plugins.sbt
- # create this file if it does not exist, then add:
- addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")
-
- # then refresh dependencies and generate the Eclipse project files
- sbt update
- sbt eclipse
(2) Extend SparkJob and override validate and runJob. The core word-count logic:
- val str = "a a a b b c"
- val rdd = sc.parallelize(str.split(" ").toSeq)
- val rsList = rdd.map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).collect
- rsList(0)._2
Full code:
- // The test request below uses classPath=com.persia.spark.WordCount,
- // so the object must live in that package.
- package com.persia.spark
-
- import com.typesafe.config.{Config, ConfigFactory}
- import org.apache.spark._
- import org.apache.spark.SparkContext._
- import scala.util.Try
- import spark.jobserver.SparkJob
- import spark.jobserver.SparkJobValidation
- import spark.jobserver.SparkJobValid
- import spark.jobserver.SparkJobInvalid
-
- object WordCount extends SparkJob {
-   // Allows running the job standalone, outside the jobserver.
-   def main(args: Array[String]) {
-     val sc = new SparkContext("local[4]", "WordCountExample")
-     // main bypasses validate, so supply the input here rather than an empty config
-     val config = ConfigFactory.parseString("input.string = a a a b b c")
-     val results = runJob(sc, config)
-     println("Result is " + results)
-   }
-
-   // Reject the job up front if the required config parameter is missing.
-   override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
-     Try(config.getString("input.string"))
-       .map(x => SparkJobValid)
-       .getOrElse(SparkJobInvalid("No input.string config param"))
-   }
-
-   // Count words and return the most frequent one.
-   override def runJob(sc: SparkContext, config: Config): Any = {
-     val dd = sc.parallelize(config.getString("input.string").split(" ").toSeq)
-     val rsList = dd.map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).collect
-     rsList(0)._2
-   }
- }
Package the jar:
- sbt package
-
- d:\scalaWorkplace\JobServer>sbt package
- [info] Loading global plugins from C:\Users\Administrator\.sbt\0.13\plugins
- [info] Set current project to job server demo (in build file:/D:/scalaWorkplace/JobServer/)
- [info] Compiling 1 Scala source to D:\scalaWorkplace\JobServer\target\scala-2.10\classes...
- [info] 'compiler-interface' not yet compiled for Scala 2.10.1. Compiling...
- [info] Compilation completed in 38.446 s
- [info] Packaging D:\scalaWorkplace\JobServer\target\scala-2.10\job-server-demo_2.10-1.0.jar ...
- [info] Done packaging.
- [success] Total time: 45 s, completed 2014-7-12 16:59:06
(3) Upload the jar; a sketch of the request is shown below.
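This mirrors the test-jar upload from section II; the jar path follows the sbt package output above, and the app name matches the appName used in the test request below:
- curl --data-binary @target/scala-2.10/job-server-demo_2.10-1.0.jar localhost:8090/jars/example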
(4) Test
- curl -i -d "input.string=a a a b b c" 'localhost:8090/jobs?appName=example&classPath=com.persia.spark.WordCount'
-
- HTTP/1.1 202 Accepted
- Server: spray-can/1.2.0
- Date: Sat, 12 Jul 2014 09:22:26 GMT
- Content-Type: application/json; charset=UTF-8
- Content-Length: 150
-
- {
- "status": "STARTED",
- "result": {
- "jobId": "b5b2e80f-1992-471f-8a5d-44c08c3a9731",
- "context": "6bd9aa29-com.persia.spark.WordCount"
- }
- }
Check the result:
- curl localhost:8090/jobs/b5b2e80f-1992-471f-8a5d-44c08c3a9731
This request reported a "method not found" error (NoSuchMethodError); fix it by modifying Dependencies.scala in the jobserver source so that the jobserver is built against the same Spark version (1.0.0) as the cluster:
- root@caibosi:~/app/spark-jobserver/project# ls
- Assembly.scala Build.scala Dependencies.scala build.properties plugins.sbt project target
- root@caibosi:~/app/spark-jobserver/project# vim Dependencies.scala
-
-
- lazy val sparkDeps = Seq(
- "org.apache.spark" %% "spark-core" % "1.0.0" exclude("io.netty", "netty-all"),
- // Force netty version. This avoids some Spark netty dependency problem.
- "io.netty" % "netty" % "3.6.6.Final"
- )
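After changing the dependency, the jobserver has to be rebuilt and redeployed for the change to take effect; roughly, reusing the deploy script and cfg environment from section IV:
- cd ~/app/spark-jobserver-src/bin
- ./server_deploy.sh cfg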