问题导读
1.在我们调用spark API时,背后发生了什么呢?
2.RDD.id是在初始化时生成的,id代表什么?
3.rdd.collect()调用的效果是什么?
本文通过一个实例,介绍了混用SparkSQL与SparkCore时的任务解析过程。 实例伪码
hc.read.parquet(paths).filter(…).select(…).groupBy(all fileds).count().sort(…).toJavaRDD
.mapPartitionsToPair(…).coalesce().reduceByKey(…).collectAsMap()
SparkUI截图
在最初观察以下UI时,有几个疑问:
- 为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
- 为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
- 为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个
直接解答以上问题,还比较容易。但真正理解spark对dataframe和core的任务解析过程,才能对该问题有一个完整的解答。
Job 6:
Job 7:
解析全览以下列出了从编写代码到形成RDD的过程,并简单给出了Job形式的时间点。图较大,可以点击看原图。 - 白色图标代表coding时的API。
- 灰色代表code背后的逻辑概念,sparkSQL范畴里的DataFrame和LogicalPlan,以及SparkCore里的RDD,这些东西在编码时生成。
- 蓝色是SparkSQL对logicalPlan进行analyze、optimize、plan后生成的物理执行计划。
- 黄色是prepareForExecution阶段,在上一步物理计划基础上,又添加形成的最终物理执行计划。
在我们调用spark API时,背后发生了什么呢?这个问题得分开看。 在SparkCore里,比较简单,可以理解为每个API都在之前RDD的基础上形成新的RDD,如全览图“主Job RDDs”一列下半段所示。 但SparkSQL里,就稍有不同,它的数据抽象是比RDD更高层次的DataFrame,即每个API都在之前DF的基础上生成新的DF。而DF的核心是LogicalPlan,它描述了plan的依赖关系、partition、Distribution等。如全览图“DataFrame”和“LogicalPlan”两列所示。 但不管RDD还是DataFrame,都是lazy的,只有在调用collect、save这样的方法时,才会真正触发执行。 toJavaRDD的效果调用该方法时,会触发dataframe的解析(全览图标注为第1步):
lazy val rdd: RDD[Row] = { // use a local variable to make sure the map closure doesn't capture the whole DataFrame val schema = this.schema queryExecution.executedPlan.execute().mapPartitions { rows => val converter = CatalystTypeConverters.createToScalaConverter(schema) rows.map(converter(_).asInstanceOf[Row]) }}
上面的queryExecution.executedPlan会触发以下一系列动作( 注意,不包含execute()调用 ),完成语法解析、类型适配、优化等任务,最重要的是,会把逻辑计划真正翻译为物理执行计划!在planner.plan()完成后,会生成全览图里execution.SparkPlan蓝色部分;prepareForExecution结束后,会生成execution.SparkPlan黄色部分(全览图标注为第2、3步)。
plan.execute()调用的效果这时会在driver端,递归的触发物理执行计划的doExecute()方法,这些方法一般都是返回对应的RDD。但在这个case里,由于调用了sort方法,生成了RangePartitioning对应的Exchange计划,为了实现排序后数据的均匀分布,spark会生成一个子job,对排序所依赖的RDD进行抽样,也就是说,会额外生成“Sort抽样子Job RDDs”一列,并由以下代码触发job的执行:
[mw_shl_code=bash,true]/*Partitioner.RangePartitioner */
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2.toLong).sum
(numItems, sketched)
}[/mw_shl_code]
该job对应UI上的Job6,而且由于该子job是提前执行的,所以能看到它的job id较小。 该步骤触发子job只是附带效果,真正的目的是完成主job物理计划到RDD的转化,全览图中,主子RDDs其实有很大一部分是重用的。原因是,在ExternalSort之前的Exchange里,childRdd = child.execute(),该rdd既被RangePartitioner使用,也被返回的ShuffledRDD使用。所以是一致的。 更详细地看下job6和7的RDD编号: - #279(含)之前的RDD都是主子job复用的
- 子job的RDD号比主job的小,所以子job确实是先调度的
RDD.id是在初始化时生成的,所以代表着,以上的RDD也按数字顺序生成。
[mw_shl_code=bash,true] protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>[/mw_shl_code]
由于execute都是递归调用的,所以可以保证子child先执行,其对应的RDD先生成。
rdd.collect()调用的效果终于轮到正主来了。它的执行就比较简单了,生成ResultStage,并递归触发所依赖的两个ShuffleStage先执行。 问题解答- 为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
- 因为sort触发了子job,对数据进行抽样,以实现排序后更均匀的分布
- 为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
- stage 6和stage 8的执行任务是一致的,但stage 7和stage 9其实是两码事,具体如下:
- stage 6:hc.read.parquet(paths).filter(…).select(…) + groupBy(all fileds).count()的前半段
- stage 7:groupBy(all fileds).count() 后半段,以及抽样过程,阐述RangePartitioner
- stage 8:被跳过,复用了stage6
- stage 9:groupBy(all fileds).count() 后半段 + sort的前半段
- stage 10:sort(…).toJavaRDD.mapPartitionsToPair(…).coalesce() + reduceByKey(…)的前半段
- 为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个
经验与教训1. 请考虑,如果hc.read.parquet().filter().select().sort().toJavaRDD.mapPartitions会如何呢? 这时同样会生成两个job,且 都是从hdfs读取数据了 ~~ 因为第二个job的sort前面没有shuffle dependency,没有办法复用第一个job的stage了。 2. df.sort与rdd.repartitionAndSort的方法选择上,之前认为sparksql进行了很多数据结构和执行计划方面的优化,sort的性能可能更好。但分析后发现,它还会做一个sample操作,所以哪个性能更高,还真不好说了。至少在我们的场景下,两者性能持平。而鉴于sort上面的小坑,倾向于使用后者。
|