SparkSQL+SparkCore任务解析
问题导读1.在我们调用spark API时,背后发生了什么呢?
2.RDD.id是在初始化时生成的,id代表什么?
3.rdd.collect()调用的效果是什么?
static/image/hrline/4.gif
本文通过一个实例,介绍了混用SparkSQL与SparkCore时的任务解析过程。实例伪码
hc.read.parquet(paths).filter(…).select(…).groupBy(all fileds).count().sort(…).toJavaRDD
.mapPartitionsToPair(…).coalesce().reduceByKey(…).collectAsMap()
SparkUI截图
在最初观察以下UI时,有几个疑问:
[*]为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
[*]为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
[*]为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个
直接解答以上问题,还比较容易。但真正理解spark对dataframe和core的任务解析过程,才能对该问题有一个完整的解答。
Job 6:
Job 7:
解析全览以下列出了从编写代码到形成RDD的过程,并简单给出了Job形式的时间点。图较大,可以点击看原图。
[*]白色图标代表coding时的API。
[*]灰色代表code背后的逻辑概念,sparkSQL范畴里的DataFrame和LogicalPlan,以及SparkCore里的RDD,这些东西在编码时生成。
[*]蓝色是SparkSQL对logicalPlan进行analyze、optimize、plan后生成的物理执行计划。
[*]黄色是prepareForExecution阶段,在上一步物理计划基础上,又添加形成的最终物理执行计划。
在我们调用spark API时,背后发生了什么呢?这个问题得分开看。在SparkCore里,比较简单,可以理解为每个API都在之前RDD的基础上形成新的RDD,如全览图“主Job RDDs”一列下半段所示。但SparkSQL里,就稍有不同,它的数据抽象是比RDD更高层次的DataFrame,即每个API都在之前DF的基础上生成新的DF。而DF的核心是LogicalPlan,它描述了plan的依赖关系、partition、Distribution等。如全览图“DataFrame”和“LogicalPlan”两列所示。但不管RDD还是DataFrame,都是lazy的,只有在调用collect、save这样的方法时,才会真正触发执行。toJavaRDD的效果调用该方法时,会触发dataframe的解析(全览图标注为第1步):
lazy val rdd: RDD = {// use a local variable to make sure the map closure doesn't capture the whole DataFrameval schema = this.schemaqueryExecution.executedPlan.execute().mapPartitions { rows => val converter = CatalystTypeConverters.createToScalaConverter(schema) rows.map(converter(_).asInstanceOf)}}
上面的queryExecution.executedPlan会触发以下一系列动作( 注意,不包含execute()调用 ),完成语法解析、类型适配、优化等任务,最重要的是,会把逻辑计划真正翻译为物理执行计划!在planner.plan()完成后,会生成全览图里execution.SparkPlan蓝色部分;prepareForExecution结束后,会生成execution.SparkPlan黄色部分(全览图标注为第2、3步)。
plan.execute()调用的效果这时会在driver端,递归的触发物理执行计划的doExecute()方法,这些方法一般都是返回对应的RDD。但在这个case里,由于调用了sort方法,生成了RangePartitioning对应的Exchange计划,为了实现排序后数据的均匀分布,spark会生成一个子job,对排序所依赖的RDD进行抽样,也就是说,会额外生成“Sort抽样子Job RDDs”一列,并由以下代码触发job的执行:
/*Partitioner.RangePartitioner */
def sketch(
rdd: RDD,
sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array)]) = {
val shift = rdd.id
// val classTagK = classTag // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2.toLong).sum
(numItems, sketched)
}
该job对应UI上的Job6,而且由于该子job是提前执行的,所以能看到它的job id较小。该步骤触发子job只是附带效果,真正的目的是完成主job物理计划到RDD的转化,全览图中,主子RDDs其实有很大一部分是重用的。原因是,在ExternalSort之前的Exchange里,childRdd = child.execute(),该rdd既被RangePartitioner使用,也被返回的ShuffledRDD使用。所以是一致的。更详细地看下job6和7的RDD编号:
[*]#279(含)之前的RDD都是主子job复用的
[*]子job的RDD号比主job的小,所以子job确实是先调度的
RDD.id是在初始化时生成的,所以代表着,以上的RDD也按数字顺序生成。
protected override def doExecute(): RDD = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
由于execute都是递归调用的,所以可以保证子child先执行,其对应的RDD先生成。
rdd.collect()调用的效果终于轮到正主来了。它的执行就比较简单了,生成ResultStage,并递归触发所依赖的两个ShuffleStage先执行。问题解答
[*]为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
[*]因为sort触发了子job,对数据进行抽样,以实现排序后更均匀的分布
[*]为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
[*]stage 6和stage 8的执行任务是一致的,但stage 7和stage 9其实是两码事,具体如下:
[*]stage 6:hc.read.parquet(paths).filter(…).select(…)+ groupBy(all fileds).count()的前半段
[*]stage 7:groupBy(all fileds).count() 后半段,以及抽样过程,阐述RangePartitioner
[*]stage 8:被跳过,复用了stage6
[*]stage 9:groupBy(all fileds).count() 后半段 + sort的前半段
[*]stage 10:sort(…).toJavaRDD.mapPartitionsToPair(…).coalesce() + reduceByKey(…)的前半段
[*]为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个
[*]解答与上面一样
经验与教训1. 请考虑,如果hc.read.parquet().filter().select().sort().toJavaRDD.mapPartitions会如何呢?这时同样会生成两个job,且 都是从hdfs读取数据了 ~~ 因为第二个job的sort前面没有shuffle dependency,没有办法复用第一个job的stage了。2. df.sort与rdd.repartitionAndSort的方法选择上,之前认为sparksql进行了很多数据结构和执行计划方面的优化,sort的性能可能更好。但分析后发现,它还会做一个sample操作,所以哪个性能更高,还真不好说了。至少在我们的场景下,两者性能持平。而鉴于sort上面的小坑,倾向于使用后者。
不错不错 这个比较有用! {:2_25:} 版主写的很好 但是还是第四张图看不清楚啊,能不能传到百度云一下,发个地址让我们下一下。 ~~~~~~~~~~~~~
页:
[1]