分享

关于spark1.5分区的问题

当数据量比较大的时候,通常会增加分区来提高并行度。在市面上一些spark书中(大部分基于spark1.2左右版本),数据读取后重新分区的RDD,如果之后计算会重复用到,需要通过persist缓存到内存或者磁盘中来避免重复计算。
写了一个从hbase中读数据,在分区,之后用count统计行数的例子来验证:
[mw_shl_code=scala,true]  //从hbase读数据,200G,默认分区30
  val hrdd = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])

  //重新分区1000个
  val hrddre = hrdd.repartition(1000)
  
  //对一个RDD进行4次action动作
  hrddre.count()
  hrddre.count()
  hrddre.count()
  hrddre.count()[/mw_shl_code]

理论上来讲,spark是惰性计算的,没有persist缓存时,每次count都会从hbase重新读取,分区,在计算。但是,通过spark web监控可以看到,再进行第二次count时,跳过了读取和分区的stage,直接进行分区shuffle后的stage:
1000-无persist-Hash.PNG

猜想,在第一次count分区shuffle时,磁盘上生成了许多shuffle写文件,在第二次count时,这些文件并没有删除,直接从这些文件fetch了,不用再重新读hbase再分区了?
如果是这样,是不是相当于做了persist(StorageLevel.DISK_ONLY)操作?

已有(1)人评论

跳转到指定楼层
xuanxufeng 发表于 2016-3-9 13:36:18
赞同楼主的观点,不过最好的验证方式还是看看代码
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条