1)数据位置感知
下面是WordCount的业务逻辑代码:
val file = "hdfs://127.0.0.1:9000/file.txt"
val lines = sc.textFile(file)
val words = lines.flatMap(line => line.split("\\s+"))
val wordCount = words.countByValue()
lines是Spark的RDD,它包含了在哪些机器上有file文件的块,信息是从HDFS来的。每文件块映射到RDD上就是一个分区,对的,没看错。如果一个文件块128MB,那么HDFS上一个1GB大小的文件就有8个文件块,由这个文件创建的RDD就会有8个分区。
之前说了,在HDFS上每个文件块默认会有3份,那RDD的分区选择了那一份呢?对滴,根据负载选择服务器负载最低的那一份。负载自动均衡了吧。
Spark要分发计算逻辑,也是分了两部分。
第一部分是代码。为什么spark-submit执行一开始,总是一堆jar包被分发,原因就在这儿。
第二部分是类实例。类在哪儿?作为RDD各API参数的闭包。
val words = lines.flatMap(line => line.split("\\s+"))flatMap的参数 **_.split("\s+")** 是闭包,闭包是引用了外部自由变量的函数,在Scala中是由匿名类实现的。更多信息,请小伙伴们GFSOSO哈。
上面的一行代码中,Spark要分发的实例就是 **_.split("\s+")** 的实例。
val wordCount = words.countByValue()实际上RDD的API countByValue 也有需要分发的闭包实例,只是都在Spark的源码中,让一码给大家整理到明面上来哈。
val wordCount = words .mapPartitions(convertWordsInPartitionToWordCountMap) .reduce(mergeMaps)前面我们提到了RDD的分区,mapPartitions会方法中的逻辑放到RDD的每个分区上执行,注意是远程在Slave上执行的哈。而reduce是在把每个分区的结果拿到Driver后,对结果进行两两合并,最终得到结果。