Spark 高级分析:第十二章第1-3节 深入Spark
本帖最后由 feilong 于 2019-2-22 08:14 编辑问题导读
1.Spark驱动器和执行器分布有什么用,二者有什么关系?
2.Spark如何序列化,用了什么技术?
3.什么是累加器,如何使用?
Spark 高级分析:第十一章第3节 用Thunder对神经元类型进行分类
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26714
第1节Spark执行模型
在转换、操作和RDD层次上理解Spark对于编写Spark程序至关重要。理解Spark的底层执行模型对于编写好的Spark程序至关重要,因为它可以理解程序的性能特征、调试错误和速度慢,以及解释用户界面。
Spark应用程序由一个驱动程序进程(在Spark Shell的例子中)和一组分散在集群节点上的执行程序进程组成,驱动程序进程是用户正在交互的进程。驱动者负责需要完成作业的高级控制流程。执行者进程负责以任务的形式执行这项工作,并存储用户选择缓存的任何数据。驱动程序和执行器通常都会在应用程序运行的整个时间内保持不动。一个执行器有许多用于运行任务的槽,并且在它的整个生命周期中将并发地运行许多个槽。
在执行模型的顶部是作业。在Spark应用程序中调用一个操作会触发一个Spark作业的启动来完成它。为了确定此作业的外观,Spark检查了操作所依赖的RDD图,并制定了一个执行计划,该计划从计算最远的RDD开始,最终得到生成操作结果所需的RDD。执行计划包括将作业的转换组装到各个阶段。阶段对应于所有执行相同代码的任务集合,每个任务都在数据的不同分区上执行。每个阶段包含一个可以在不改变完整数据的情况下完成的转换序列。
什么决定了数据是否需要无序排列?对于由所谓的窄转换(如map)返回的RDD,计算单个分区所需的数据驻留在父RDD中的单个分区中。每个对象只依赖于父对象中的单个对象。但是,spark还支持具有广泛依赖性的转换,如groupbykey和reducebykey。在这些中,计算单个分区所需的数据可能驻留在父RDD的许多分区中。具有相同键的所有元组必须以相同分区结束。为了满足这些操作,spark必须执行一个shuffle,它在集群周围传输数据,并用一组新的分区产生一个新的阶段。
例如,下面的代码将在一个阶段中执行,因为这三个操作的输出都不依赖于来自不同分区的数据,而不是它们的输入。
sc.textFile("someFile.txt").
map(mapFunc).
flatMap(flatMapFunc).
filter(filterFunc).
count()
下面的代码将分为三个阶段,它查找每个字符在文本文件中出现1000次以上的所有单词中出现的次数。reduceByKey操作会导致阶段边界,因为计算它们的输出需要按键重新划分数据。
val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).
reduceByKey(_ + _)
charCounts.collect()
在每个阶段边界,数据由父阶段的任务写入磁盘,然后由子阶段的任务通过网络获取。因此,阶段边界可能很昂贵,并且应尽可能避免。父阶段中的数据分区数可能与子阶段中的分区数不同。可能触发阶段边界的转换通常接受一个numPartitions参数,该参数决定要在子阶段中将数据拆分成多少个分区。正如reducer的数量是优化mapreduce作业的一个重要参数,在阶段边界上调整分区的数量通常会影响或破坏应用程序的性能。选择太少的分区会导致在每个任务被强制处理太多数据时的缓慢性。任务完成所需的时间通常随分配给它的数据的大小呈非线性增加,因为当聚合操作的数据不适合内存时,聚合操作必须溢出到磁盘上。另一方面,当按目标分区对记录进行排序时,大量分区会导致父端任务的开销增加,并且与在子端调度和启动每个任务相关联的开销也会增加。
第2节序列化
作为一个分布式系统,SPark经常需要序列化它所运行的原始Java对象。当数据以序列化格式缓存、通过网络传输以进行无序处理时,Spark需要RDD内容的字节流表示。Spark接受一个可插入的序列化程序来定义这个序列化和反序列化。默认情况下,Spark使用Java对象序列化,它可以序列化实现可区分接口的任何Java对象。几乎总是,Spark应该配置为使用kryo序列化。kryo定义了一种更紧凑的格式,可以更快地序列化和反序列化。“catch”是指,为了获得这种效率,kryo需要预先注册在应用程序中定义的任何自定义类。kryo仍然可以在不注册类的情况下工作,但是序列化将占用更多的空间和时间,因为必须在每个记录之前写出类名。打开kryo并在代码中注册类,如下所示:
val conf = new SparkConf().setAppName("MyApp")
conf.registerKryoClasses(
Array(classOf, classOf))
类也可以通过配置注册到kryo。使用SparkShell时,这是唯一的方法。类似下面的内容可以放在SparkDefaults.conf:
spark.kryo.classesToRegister=org.myorg.MyCustomClass1,org.myorg.MyCustomClass2
spark.serializer=org.apache.spark.serializer.KryoSerializer
像graphx和mllib这样的spark库可能有自己的一组自定义类,使用实用方法将它们全部注册:
GraphXUtils.registerKryoClasses(conf)
第3节累加器
累加器是一个Spark结构,允许在作业运行时“在一侧”收集一些统计信息。在每个任务中执行的代码可以添加到累加器中,并且驱动程序可以访问其值。累加器在计算作业遇到的坏记录数或计算优化过程阶段中的汇总错误等情况下很有用。
例如,spark mllib的k-means集群实现使用累加器来实现后者。算法的每次迭代都从一组簇中心开始,将数据集中的每个点分配到其最近的中心,然后使用分配来计算一组新的簇中心。算法试图优化的聚类的代价是从每个点到其最近的聚类中心的距离之和。为了知道算法何时应该终止,在将点分配给它们的集群之后计算这个开销是很有用的。
var prevCost = Double.MaxValue
var cost = 0.0
var clusterCenters = initialCenters(k)
while (prevCost - cost > THRESHOLD) {
val costAccum = sc.accumulator(0, "Cost")
clusterCenters = dataset.map {
// Find the closest center to the point and the distance from
// that center
val (newCenter, distance) = closestCenterAndDistance(_,
clusterCenters)
costAccum += distance
(newCenter, _)
}.aggregate( /* average the points assigned to each center */ )
prevCost = cost
cost = costAccum.value
}
上面的示例将累加器的加法函数定义为整数加法,但是累加器还可以支持其他关联函数,如集合联合。
任务只在第一次运行时对累加器起作用。例如,如果任务成功完成,但其输出丢失,需要重新运行,则不会再次增加累加器。
从某种意义上说,累加器是一种优化,可以缓存RDD,并在其上运行一个单独的操作来计算相同的结果。通过避免缓存数据和避免执行其他作业,累加器可以更有效地实现这一点。
最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
感谢分享
页:
[1]