Questions this post addresses:
1. How do you deploy Spark on YARN?
2. How do you develop for Spark in Scala?
Download
Although Spark already provides prebuilt packages for Hadoop 1/2 and CDH 3/4/5, building from source makes it easier to upgrade against a specific version, so this guide compiles and installs from source.
Build
The build uses Maven. First, increase the memory available to Maven:
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
Next, because Spark 1.0.2 and Hadoop 2.3.0 depend on different versions of Protocol Buffers, HDFS files cannot be read correctly unless the versions match, so pom.xml needs to be updated accordingly:
<!--<protobuf.version>2.4.1</protobuf.version>-->
<protobuf.version>2.5.0</protobuf.version>
Then build against the target Hadoop version:
mvn -Pyarn -Dhadoop.version=2.3.0-cdh5.1.0 -DskipTests clean package
Deploy
Install following the same layout conventions as the Hadoop installation:
cp -r spark-1.0.2 /usr/local/cloud/src/
cd /usr/local/cloud/
ln -sf /usr/local/cloud/src/spark-1.0.2 spark
Configuration
Edit $SPARK_HOME/conf/spark-env.sh:
export JAVA_HOME=/usr/java/default
export HADOOP_HOME=/usr/local/cloud/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_HOME=/usr/local/cloud/spark
export SPARK_LOCAL_DIRS=/data/spark
export SPARK_LOG_DIR=/data/logs/spark
export SPARK_PID_DIR=/data/pids/spark
export SPARK_LIBRARY_PATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$HADOOP_HOME/lib/native

# If LZO compression is enabled, put the LZO jar on the classpath here, or pass it
# at launch time with --driver-class-path instead.
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_HOME/share/hadoop/common/hadoop-lzo-0.4.20-SNAPSHOT.jar

# To avoid adding load to the NameNode (which runs in a VM), a DataNode is used as the Spark master here.
export SPARK_MASTER_IP=dn2.hadoop.website.com
Cluster nodes:
dn1.hadoop.website.com
dn2.hadoop.website.com
nn1.hadoop.website.com
Testing
Start the Spark cluster:
cd $SPARK_HOME
sbin/start-all.sh
The following processes should now be running:
[hadoop@dn2 spark]$ jps
32097 Worker
31931 Master
Local test
Run the SparkPi example locally; the output should contain "Pi is roughly 3.13718".
HDFS test
# Upload a file to HDFS
hadoop fs -put README.md

# Start spark-shell
bin/spark-shell

# Load the file from HDFS
scala> val textFile = sc.textFile("hdfs://nn1.hadoop.website.com:9000/user/hadoop/README.md")
textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

# Count the lines
scala> textFile.count()
res0: Long = 127

# Print the first line
scala> textFile.first()
res1: String = # Apache Spark

# Run a word count
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at reduceByKey at <console>:14

# Print the counts
scala> wordCounts.collect()
res2: Array[(String, Int)] = Array((means,1), (under,2), (this,4), (Because,1), (Python,2), (agree,1), (cluster.,1), (its,1), (YARN,,3), (have,2), (pre-built,1), (MRv1,,1), (locally.,1), (locally,2), (changed,1), (several,1), (only,1), (sc.parallelize(1,1), (This,2), (basic,1), (first,1), (requests,1), (documentation,1), (Configuration,1), (MapReduce,2), (without,1), (setting,1), ("yarn-client",1), ([params]`.,1), (any,2), (application,1), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (version,3), (file,1), (documentation,,1), (test,1), (MASTER,1), (entry,1), (example,3), (are,2), (systems.,1), (params,1), (scala>,1), (<artifactId>hadoop-client</artifactId>,1), (refer,1), (configure,1), (Interactive,2), (artifact,1), (can,7), (file's,1), (build,3), (when,2), (2.0.X,,1), (Apac...
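As an optional follow-up (not from the original post), the counts computed above can be sorted to find the most frequent words and then written back to HDFS; the output path wordcount_out below is a hypothetical example.

# Swap to (count, word), sort by count descending, and take the five most frequent words
scala> val top5 = wordCounts.map { case (word, count) => (count, word) }.sortByKey(false).take(5)

# Persist the full counts back to HDFS (hypothetical output path)
scala> wordCounts.saveAsTextFile("hdfs://nn1.hadoop.website.com:9000/user/hadoop/wordcount_out")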
YARN test
bin/spark-submit \
  --master yarn \
  --class org.apache.spark.examples.SparkPi \
  examples/target/scala-2.10/spark-examples-1.0.2-hadoop2.3.0-cdh5.1.0.jar
Once submitted, the running application can be seen in the YARN web UI (http://nn1.hadoop.website.com:8088/cluster).
When it finishes, the output contains the result "Pi is roughly 3.14558".
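For context on where "Pi is roughly ..." comes from: SparkPi estimates Pi by Monte Carlo sampling. A simplified sketch of the same idea (not the exact source of org.apache.spark.examples.SparkPi), which can be pasted into spark-shell:

// Sample n random points in the square [-1, 1] x [-1, 1]; the fraction that falls
// inside the unit circle approximates Pi/4, so the estimate is 4 * hits / n.
val n = 100000
val hits = sc.parallelize(1 to n).map { _ =>
  val x = math.random * 2 - 1
  val y = math.random * 2 - 1
  if (x * x + y * y <= 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * hits / n)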
Cluster test
bin/spark-submit --master spark://dn2.hadoop.website.com:7077 --class org.apache.spark.examples.SparkPi examples/target/scala-2.10/spark-examples-1.0.2-hadoop2.3.0-cdh5.1.0.jar
When it finishes, the output contains the result "Pi is roughly 3.14164".
Development
Eclipse
For Eclipse, use the Scala IDE plugin, which can be installed from the Eclipse Marketplace.
The Marketplace version is built against the latest Scala, however; if you need a specific version (for example, Spark 1.0.2 is built against Scala 2.10.3), install the matching Scala IDE manually via Install New Software. Builds are available for Scala 2.11.x, 2.10.x, and 2.9.x.
Create a new Scala project via File -> New -> Scala Project, then add the compiled Spark assembly jar ($SPARK_HOME/assembly/target/scala-2.10/spark-assembly-1.0.2-hadoop2.3.0-cdh5.1.0.jar) to the build path.
IDEA
Install the Scala plugin and create a Scala project; then, as with Eclipse, add the assembly jar to the project dependencies.
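In either IDE, a minimal sketch (not from the original post) can confirm that the assembly jar is wired up correctly; the object name ClasspathCheck is arbitrary.

// Hypothetical smoke test: if the Spark assembly jar is on the project's build path,
// this compiles, and when run it prints the configured application name.
import org.apache.spark.SparkConf

object ClasspathCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ClasspathCheck")
    println(conf.get("spark.app.name"))
  }
}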
WordCount
package org.matrix.spark.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SparkTest {
  def main(args: Array[String]) {
    if (args.length != 2) {
      println("Usage: <input> <output>")
      return
    }

    val sparkConf = new SparkConf().setAppName("WordCount")

    val sc = new SparkContext(sparkConf)
    val textFile = sc.textFile(args(0))
    val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.saveAsTextFile(args(1))
  }
}
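For debugging from the IDE without spark-submit, one option (not part of the original code) is to hard-code a local master on the SparkConf; local[2] below is an arbitrary choice of two worker threads.

// Hypothetical local-mode variant for IDE debugging only, using the same
// org.apache.spark.SparkConf import as above. Remove setMaster before submitting
// with spark-submit, where --master is supplied on the command line instead.
val sparkConf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[2]")  // run with two local worker threads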
Run
Package the project into a jar, copy it to the cluster, and test with the following commands. Note that spark-submit options such as --master must come before the application jar; anything placed after the jar is passed to the application as arguments.

# Run on YARN
$SPARK_HOME/bin/spark-submit --master yarn --class org.matrix.spark.test.SparkTest /home/hadoop/spark_test.jar README.md spark_test
# Run on the standalone cluster
$SPARK_HOME/bin/spark-submit --master spark://dn2.hadoop.website.com:7077 --class org.matrix.spark.test.SparkTest /home/hadoop/spark_test.jar README.md spark_test
When the job finishes, the results can be found in the spark_test output directory on HDFS.
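To verify the result (an illustrative example, not from the original post), the output directory can be read back in spark-shell; spark_test is the relative output path used in the commands above, which resolves against the submitting user's HDFS home directory as long as the shell picks up the Hadoop configuration.

scala> val result = sc.textFile("spark_test")
scala> result.take(10).foreach(println)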
Source: http://matrix-lisp.github.io/blog/2014/08/21/spark-on-yarn/