pig2 发表于 2014-7-12 19:39:07

Spark1.0.0 编程模型


问题导读:


1.spark应用程序包含哪两个部分?
2.driver部分的作用是什么?
3.executor部分的作用是什么?
4.executor对于处理的数据分为几种?

static/image/hrline/4.gif





Spark Application可以在集群中并行运行,其关键是抽象出RDD的概念,也使得Spark Application的开发变得简单明了。下图浓缩了Spark的编程模型。


   
1:Spark应用程序的结构
      Spark应用程序可分两部分:driver部分和executor部分初始化SparkContext和主体程序

A:driver部分
      driver部分主要是对SparkContext进行配置、初始化以及关闭。初始化SparkContext是为了构建Spark应用程序的运行环境,在初始化SparkContext,要先导入一些Spark的类和隐式转换;

在executor部分运行完毕后,需要将SparkContext关闭。driver部分的基本代码框架如下:


package week2

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object WordCount1 {
def main(args: Array) {
    if (args.length == 0) {
      System.err.println("Usage: bin/spark-submit --class week2.WordCount1 WordCount.jar <file1> ")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("WordCount1")
    val sc = new SparkContext(conf)

    。。。//executor部分

    sc.stop()
}
}
要注意的是,Spark1.0.0由于采用了spark-submit统一的应用程序提交工具,代码上有所改变:

[*]不需要在代码里将应用程序本身通过addJars上传给资源管理器
[*]增加了history server,需要在代码末尾关闭SparkContext,才能将完整的运行信息发布到history server。


B:executor部分
      Spark应用程序的executor部分是对数据的处理,数据分三种:

[*]原生数据,包含输入的数据和输出的数据

[*]对于输入原生数据,Spark目前提供了两种:

[*]scala集合数据集,如Array(1,2,3,4,5),Spark使用parallelize方法转换成RDD。
[*]hadoop数据集,Spark支持存储在hadoop上的文件和hadoop支持的其他文件系统,如本地文件、HBase、SequenceFile和Hadoop的输入格式。例如Spark使用txtFile方法可以将本地文件或HDFS文件转换成RDD。
[*]对于输出数据,Spark除了支持以上两种数据,还支持scala标量

[*]生成Scala标量数据,如count(返回RDD中元素的个数)、reduce、fold/aggregate;返回几个标量,如take(返回前几个元素)。
[*]生成Scala集合数据集,如collect(把RDD中的所有元素倒入 Scala集合类型)、lookup(查找对应key的所有值)。
[*]生成hadoop数据集,如saveAsTextFile、saveAsSequenceFile
[*]RDD,Spark进行并行运算的基本单位。RDD提供了四种算子:

[*]输入算子,将原生数据转换成RDD,如parallelize、txtFile等
[*]转换算子,最主要的算子,是Spark生成DAG图的对象,转换算子并不立即执行,在触发行动算子后再提交给driver处理,生成DAG图 -->Stage --> Task--> Worker执行。按转化算子在DAG图中作用,可以分成两种:

[*]窄依赖算子

[*]输入输出一对一的算子,且结果RDD的分区结构不变,主要是map、flatMap;
[*]输入输出一对一,但结果RDD的分区结构发生了变化,如union、coalesce;
[*]从输入中选择部分元素的算子,如filter、distinct、subtract、sample。
[*]宽依赖算子,宽依赖会涉及shuffle类,在DAG图解析时以此为边界产生Stage,如图所示。

[*]对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey;
[*]对两个RDD基于key进行join和重组,如join、cogroup。
[*]缓存算子,对于要多次使用的RDD,可以缓冲加快运行速度,对重要数据可以采用多备份缓存。
[*]行动算子,将运算结果RDD转换成原生数据,如count、reduce、collect、saveAsTextFile等。
[*]共享变量,在Spark运行时,一个函数传递给RDD内的patition操作时,该函数所用到的变量在每个运算节点上都复制并维护了一份,并且各个节点之间不会相互影响。但是在Spark Application中,可能需要共享一些变量,提供Task或驱动程序使用。Spark提供了两种共享变量:

[*]广播变量,可以缓存到各个节点的共享变量,通常为只读,使用方法:



scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
scala> broadcastVar.value

[*]累计器,只支持加法操作的变量,可以实现计数器和变量求和。用户可以调用SparkContext.accumulator(v)创建一个初始值为v的累加器,而运行在集群上的Task可以使用“+=”操作,但这些任务却不能读取;只有驱动程序才能获取累加器的值。使用方法:
cala> val accum = sc.accumulator(0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
2:例程示范
      下面拿一个简单的例子WorCount来示例:



3:Spark应用程序的多语言编程
      Spark提供了Scala、Python、Java开发API。用户可以根据自己的喜好选择相应的编程语言和工具。建议使用Scala和IntelliJ IDEA开发。




页: [1]
查看完整版本: Spark1.0.0 编程模型