问题导读
1、Spark如何快速上手?
2、如何运行你的Spark 应用程序?
3、学习Spark,你有怎样的见解?
快速上手
本节课程提供一个使用 Spark 的快速介绍,首先我们使用 Spark 的交互式 shell(用 Python 或 Scala) 介绍它的 API。当演示如何在 Java, Scala 和 Python 写独立的程序时,看编程指南里完整的参考。
依照这个指南,首先从Spark 网站下载一个 Spark 发行包。因为我们不会使用 HDFS,你可以下载任何 Hadoop 版本的包。
Spark 的 shell 作为一个强大的交互式数据分析工具,提供了一个简单的方式来学习 API。它可以使用 Scala(在 Java 虚拟机上运行现有的 Java 库的一个很好方式) 或 Python。在 Spark 目录里使用下面的方式开始运行:
复制代码
Spark 最主要的抽象是叫Resilient Distributed Dataset(RDD) 的弹性分布式集合。RDDs 可以使用 Hadoop InputFormats(例如 HDFS 文件)创建,也可以从其他的 RDDs 转换。让我们在 Spark 源代码目录从 README 文本文件中创建一个新的 RDD。
- scala> val textFile = sc.textFile("README.md")
- textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
复制代码
RDD 的 actions从 RDD 中返回值,transformations可以转换成一个新 RDD 并返回它的引用。让我们开始使用几个操作:
- scala> textFile.count() // RDD 的数据条数
- res0: Long = 126
-
- scala> textFile.first() // RDD 的第一行数据
- res1: String = # Apache Spark
复制代码
现在让我们使用一个 transformation,我们将使用 filter 在这个文件里返回一个包含子数据集的新 RDD。
- scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
- linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
复制代码
我们可以把 actions 和 transformations 链接在一起:
- scala> textFile.filter(line => line.contains("Spark")).count() // 有多少行包括 "Spark"?
- res3: Long = 15
复制代码
更多 RDD 操作
RDD actions 和 transformations 能被用在更多的复杂计算中。比方说,我们想要找到一行中最多的单词数量:
- scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
- res4: Long = 15
复制代码
首先将行映射成一个整型数值产生一个新 RDD。 在这个新的 RDD 上调用 reduce 找到行中最大的个数。map 和 reduce 的参数是 Scala 的函数串(闭包),并且可以使用任何语言特性或者 Scala/Java 类库。例如,我们可以很方便地调用其他的函数声明。 我们使用 Math.max() 函数让代码更容易理解:
- scala> import java.lang.Math
- import java.lang.Math
-
- scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
- res5: Int = 15
复制代码
Hadoop 流行的一个通用的数据流模式是 MapReduce。Spark 能很容易地实现 MapReduce:
- scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
- wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
复制代码
这里,我们结合flatMap, map和 reduceByKey 来计算文件里每个单词出现的数量,它的结果是包含一组(String, Int) 键值对的 RDD。我们可以使用 [collect] 操作在我们的 shell 中收集单词的数量:
- scala> wordCounts.collect()
- res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
复制代码
缓存
Spark 支持把数据集拉到集群内的内存缓存中。当要重复访问时这是非常有用的,例如当我们在一个小的热(hot)数据集中查询,或者运行一个像网页搜索排序这样的重复算法。作为一个简单的例子,让我们把linesWithSpark 数据集标记在缓存中:
- scala> linesWithSpark.cache()
- res7: spark.RDD[String] = spark.FilteredRDD@17e51082
-
- scala> linesWithSpark.count()
- res8: Long = 15
-
- scala> linesWithSpark.count()
- res9: Long = 15
复制代码
缓存 100 行的文本文件来研究 Spark 这看起来很傻。真正让人感兴趣的部分是我们可以在非常大型的数据集中使用同样的函数,甚至在 10 个或者 100 个节点中交叉计算。你同样可以使用 bin/spark-shell 连接到一个 cluster 来替换掉编程指南中的方法进行交互操作。
独立应用程序
现在假设我们想要使用 Spark API 写一个独立的应用程序。我们将通过使用 Scala(用 SBT),Java(用 Maven) 和 Python 写一个简单的应用程序来学习。
我们用 Scala 创建一个非常简单的 Spark 应用程序。如此简单,事实上它的名字叫 SimpleApp.scala:
- /* SimpleApp.scala */
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
-
- object SimpleApp {
- def main(args: Array[String]) {
- val logFile = "YOUR_SPARK_HOME/README.md" // 应该是你系统上的某些文件
- val conf = new SparkConf().setAppName("Simple Application")
- val sc = new SparkContext(conf)
- val logData = sc.textFile(logFile, 2).cache()
- val numAs = logData.filter(line => line.contains("a")).count()
- val numBs = logData.filter(line => line.contains("b")).count()
- println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
- }
- }
复制代码
这个程序仅仅是在 Spark README 中计算行里面包含 'a' 和包含 'b' 的次数。你需要注意将 YOUR_SPARK_HOME替换成你已经安装 Spark 的路径。不像之前的 Spark Shell 例子,这里初始化了自己的 SparkContext,我们把 SparkContext 初始化作为程序的一部分。
我们通过 SparkContext 的构造函数参入SparkConf 对象,这个对象包含了一些关于我们程序的信息。
我们的程序依赖于 Spark API,所以我们需要包含一个 sbt 文件文件,simple.sbt 解释了 Spark 是一个依赖。这个文件还要补充 Spark 依赖于一个 repository:
- name := "Simple Project"
-
- version := "1.0"
-
- scalaVersion := "2.10.4"
-
- libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"
复制代码
要让 sbt 正确工作,我们需要把 SimpleApp.scala 和 simple.sbt 按照标准的文件目录结构布局。上面的做好之后,我们可以把程序的代码创建成一个 JAR 包。然后使用 spark-submit 来运行我们的程序。
- # Your directory layout should look like this
- $ find .
- .
- ./simple.sbt
- ./src
- ./src/main
- ./src/main/scala
- ./src/main/scala/SimpleApp.scala
-
- # Package a jar containing your application
- $ sbt package
- ...
- [info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
-
- # Use spark-submit to run your application
- $ YOUR_SPARK_HOME/bin/spark-submit \
- --class "SimpleApp" \
- --master local[4] \
- target/scala-2.10/simple-project_2.10-1.0.jar
- ...
- Lines with a: 46, Lines with b: 23
复制代码
开始翻滚吧!
祝贺你成功运行你的第一个 Spark 应用程序!
- 要深入了解 API,可以从Spark编程指南开始,或者从其他的组件开始,例如:Spark Streaming。
- 要让程序运行在集群(cluster)上,前往部署概论。
- 最后,Spark 在 examples 文件目录里包含了Scala, Java和 Python的几个简单的例子,你可以直接运行它们:
- # For Scala and Java, use run-example:
- ./bin/run-example SparkPi
-
- # For Python examples, use spark-submit directly:
- ./bin/spark-submit examples/src/main/python/pi.py
复制代码
本文转载自:Spark Shell
|