本帖最后由 xioaxu790 于 2014-10-16 19:44 编辑
问题导读
1、你如何理解Spark的应用场景?
2、Spark的 "reduce"和MapReduce有什么区别?
3、为什么Spark比MapReduce更受欢迎呢?
Spark应用源码下载见: Github
Apache的Spark是通用的提供类似Hadoop的MapReduce集群计算框架,拥有强大的抽象处理大型数据集。对于涉及到性能,功能和API种种原因,Spark正在变得比MapReduce更受欢迎。
该源码将教会你如何使用Scala学习如何编写,编译和运行一个Spark简单的应用程序。这个案例是一个单词计数WordCount, 这是最经典的MapReduce应用,在字数统计这个案例中,我们的目标是在在我们的语料库的字母中找出最流行的词汇分布。
1. 读入一个文本文档的输入。
2.统计每个单词出现的次数。
3.筛选少于1百万次的所有单词。
3.对于剩余的结果,统计每个字母的次数。
在MapReduce的,这将需要两个MapReduce工作以及在它们之间持续中间数据到HDFS。相反在Spark中,你可以用更少的代码大约90%行写一个作业。
我们的输入文档是剥离标点符号的一个巨大的文本文件。完整的Scala程序看起来像这样:
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
-
- object SparkWordCount {
- def main(args: Array[String]) {
- val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
- val threshold = args(1).toInt
-
- // split each document into words
- val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))
-
- // count the occurrence of each word
- val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
-
- // filter out words with less than threshold occurrences
- val filtered = wordCounts.filter(_._2 >= threshold)
-
- // count characters
- val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
-
- System.out.println(charCounts.collect().mkString(", "))
- }
- }
复制代码
Spark 使用 "lazy evaluation", 意味着转换只有一个action操作被调用时才会在集群中执行,action操作在这个案例是collect收集,将数据拉到客户端然后saveAsTextFile, 也就是将数据写到一个类似HDFS文件系统中.
值得注意的是,Spark的 "reduce"稍微区别于MapReduce. 在MapReduce一个reduce函数调用接受一个规定Key的所有记录,而在Spark,只接受两个参数,Spark的reduce类似mao的groupBy操作。
详细配置见英文
Spark一个简单案例Spark是一个类似Map-Reduce的集群计算框架,用于快速进行数据分析。
在这个应用中,我们以统计包含"the"字符的行数为案例,.为建立这个应用,我们使用 Spark 0.9.1, Scala 2.10.3 & sbt 0.13.0.
在构建这个应用之前,必须准备:-
1). 下载 Spark 0.9.1.
2). 解压Unzip
3). 到 Spark目录
4) 运行 ./sbt/sbt assembly
为了使用 sbt 成功构建Spark,我们需要sbt 0.13.0 或其以后版本必须首先已经安装就绪。
在构建Spark以后,我们开始建立我们的这个应用案例,下面步骤:
1). 运行 mkdir SimpleSparkProject.
2). 创建一个.sbt 文件,在目录 SimpleSparkProject/simple.sbt
- name := "Simple Project"
-
- version := "1.0"
-
- scalaVersion := "2.10.3"
-
- libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"
-
- resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
复制代码
3). 创建代码文件:SimpleSparkProject/src/main/scala/SimpleApp.scala
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
-
- object SimpleApp {
- def main(args: Array[String]) {
- val logFile = "src/data/sample.txt"
- val sc = new SparkContext("local", "Simple App", "/path/to/spark-0.9.1-incubating",
- List("target/scala-2.10/simple-project_2.10-1.0.jar"))
- val logData = sc.textFile(logFile, 2).cache()
- val numTHEs = logData.filter(line => line.contains("the")).count()
- println("Lines with the: %s".format(numTHEs))
- }
- }
复制代码
4). 然后到SimpleSparkProject 目录
5). 运行 sbt package
6). 运行 sbt run
下载这个演示应用,可以按here.
|