当数据量比较大的时候,通常会增加分区来提高并行度。在市面上一些spark书中(大部分基于spark1.2左右版本),数据读取后重新分区的RDD,如果之后计算会重复用到,需要通过persist缓存到内存或者磁盘中来避免重复计算。
写了一个从hbase中读数据,在分区,之后用count统计行数的例子来验证:
[mw_shl_code=scala,true] //从hbase读数据,200G,默认分区30
val hrdd = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
//重新分区1000个
val hrddre = hrdd.repartition(1000)
//对一个RDD进行4次action动作
hrddre.count()
hrddre.count()
hrddre.count()
hrddre.count()[/mw_shl_code]
理论上来讲,spark是惰性计算的,没有persist缓存时,每次count都会从hbase重新读取,分区,在计算。但是,通过spark web监控可以看到,再进行第二次count时,跳过了读取和分区的stage,直接进行分区shuffle后的stage:
猜想,在第一次count分区shuffle时,磁盘上生成了许多shuffle写文件,在第二次count时,这些文件并没有删除,直接从这些文件fetch了,不用再重新读hbase再分区了?
如果是这样,是不是相当于做了persist(StorageLevel.DISK_ONLY)操作?
|
|