分享

Spark Streaming中空RDD的处理

levycui 2016-7-26 18:11:15 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 23757
问题导读:
1、用什么方式判断空RDD?
2、Spark Streaming与Kafka如何处理空RDD?




在Spark Streaming中,job不断的产生,有时候会产生一些空RDD,而基于这些空RDD生成的job大多数情况下是没必要提交到集群执行的。执行没有结果的job,就是浪费计算资源,数据库连接资源,产生空文件等。

这里介绍两种判断空RDD的方式,第一种是以Receiver接收数据时产生的BlockRDD或WriteAheadLogBackedBlockRDD,所有以Receiver方式接收数据都会产生BlockRDD或WriteAheadLogBackedBlockRDD,第二种是以Direct Kafka方式接收数据产生的KafkaRDD。

第一种情况
以Receiver方式接收数据,计算wordCount为例来说明空RDD如何处理,代码如下
[mw_shl_code=applescript,true]object ReceiverWordCount {

def main(args: Array[String]) {

val conf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[3]")
conf.set("spark.testing.memory", "2147480000")
val ssc = new StreamingContext(conf, Seconds(10))

val lines = ssc.socketTextStream("10.10.63.106", 8589, StorageLevel.MEMORY_AND_DISK_SER)

val words= lines.flatMap(_.split(""))
val wordCounts= words.map(x => (x,1)).reduceByKey((num1:Int,num2:Int)=>num1+num2,2)
wordCounts.foreachRDD(rdd=>{
  if(rdd.dependencies(0).rdd.partitions.isEmpty){
     println(">>>RDD:Empty")
   }else{
     rdd.foreach(x=>println(x._1+"\t"+x._2))
   }
})
ssc.start()
ssc.awaitTermination()
}
}[/mw_shl_code]

这里为了方便,在foreachRDD中使用了rdd.foreach(x=>println(x._1+"\t"+x._2))来打印结果,只是简单的效果演示,生产环境一般会输出到外部存储系统中,例如mysql、redis 、hdfs等
这里总结了三种判断空RDD方式的,我们来看一下这三种方式有什么不同:

第一种:if(rdd.count==0)
RDD的count操作会触发一个action,提交一个job,这种方式不是我们想要的

第二种:if(rdd.partitions.isEmpty)
判断rdd的partitions是否为空,那我们需要看一下这里的rdd是怎么得来的,经过上面WordCount中的一系列transformation操作后,最后一个reduceByKey操作产生的ShuffledRDD 。经过reduceByKey操作后,分区数量会受到默认分区数或用户指定的分区数的影响,和最初BlockRDD的分区数不一样,因为ShuffledRDD的分区数不可能为0,所以if(rdd.partitions.isEmpty)无效。if(rdd.partitions.isEmpty)在什么有效呢?只有在当前rdd和BlockRDD在同一个stage时才会有效,因为分区数没有变化

第三种:if(rdd.dependencies(0).rdd.partitions.isEmpty)
根据RDD的依赖关系,从后向前寻找BlockRDD,因为在BlockRDD生成的时候分区数受blockInfos(Receiver接收数据的元数据信息)的影响,代码如下

[mw_shl_code=applescript,true]private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

if (blockInfos.nonEmpty) {
   val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

   // Are WAL record handles present with all the blocks
   val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

   if (areWALRecordHandlesPresent) {
     // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
     val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
     val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
     new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
   } else {
     // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
     // others then that is unexpected and log a warning accordingly.
     if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
       if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
         logError("Some blocks do not have Write Ahead Log information; this is unexpected and data may not be recoverable after driver failures")
       } else {
         logWarning("Some blocks have Write Ahead Log information; this is unexpected")
       }
     }
     val validBlockIds = blockIds.filter { id => ssc.sparkContext.env.blockManager.master.contains(id) }
     if (validBlockIds.size != blockIds.size) {
       logWarning("Some blocks could not be recovered as they were not found in memory. " +
         "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
         "for more details.")
     }
     new BlockRDD[T](ssc.sc, validBlockIds)
   }
} else {
   // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
   // according to the configuration
   if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
     new WriteAheadLogBackedBlockRDD[T](
       ssc.sparkContext, Array.empty, Array.empty, Array.empty)
   } else {
     new BlockRDD[T](ssc.sc, Array.empty)
   }
}
}[/mw_shl_code]


如果blockInfos为空,BlockRDD的分区数也为空,所以要判断BlockRDD的分区数。这里只判断了当前rdd的父RDD分区是否为空,因为父RDD和BlockRDD在同一个stage内,分区数是一致的。RDD的依赖关系可以通过rdd.toDebugString和web页面获得,stage划分也可以通过web页面获得。

第二种情况
以Direct kafka的方式接收数据的方式,计算WordCount为例,代码如下
[mw_shl_code=applescript,true]object DirectKafkaDemo{
def main(args: Array[String]) {

val topics = "DirectKafkaDemo"
val brokers = "*:9092,*:9092"
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
sparkConf.set("spark.testing.memory", "2147480000")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, topicsSet)
val result = messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

result.foreachRDD(rdd => {

   val num= rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.count()

   if(num>0) {
     rdd.foreachPartition(data => {
       val conn = MDBManager.getConnection
       conn.setAutoCommit(false)
       val sql = "insert into word set key1=?,num=?;"
       val preparedStatement = conn.prepareStatement(sql)
       data.foreach(recode => {
         val key = recode._1;
         val num = recode._2;
         preparedStatement.setString(1, key)
         preparedStatement.setInt(2, num)
         preparedStatement.addBatch()
         println("key:" + key + "\tnum:" + num)
       })
       preparedStatement.executeBatch()
       conn.commit()
       conn.close()
     })
   }else{
       println(">>>>>>>>>>>>>>>>>>>>>>RDD Empty")
   }
})

ssc.start()
ssc.awaitTermination()

}
}[/mw_shl_code]

这里使用了KafkaRDD的count操作来判断KafkaRDD是否为空,如果不为空,将计算结果保存到数据库中,减少不必要是数据库操作。获取KafkaRDD的代码如下,不同代码编写RDD的依赖关系是不一样的,要根据代码而定
[mw_shl_code=applescript,true]val num= rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.count()[/mw_shl_code]

看一下KafkaRDD的count()方法,他重写了RDD的count方法,代码如下
[mw_shl_code=applescript,true]override def count(): Long = offsetRanges.map(_.count).sum[/mw_shl_code]

他并没有触发一个runJob操作,而是通过读取kafka分区的offset偏移量来计算RDD记录的个数,这里是利用了kafka的特性。通过依赖关系找到KafkaRDD,然后调用KafkaRDD的count()方法,就知道KafkaRDD是否为空,如果KafkaRDD为空,就没必要runJob了。
那么判断KafkaRDD的分区数是否也可以,看一下KafkaRDD的分区数是怎么得来的,代码如下
[mw_shl_code=applescript,true]override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
     val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
     new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}[/mw_shl_code]

和offsetRanges的数量有关,因为offsetRanges是根据kafka的分区数而来,offsetRanges的数量是固定不变的,从而KafkaRDD的分区数是固定的,不管分区有没有数据,因此不能判断KafkaRDD的分区数

总结
不同数据接收方式的RDD,表现数据为空都可能是不一样的,通过RDD的依赖关系正确找到数据源RDD是最关键的。此方法使用一定要结合业务和RDD的具体生成方式,这里说的依赖关系都是之有一个父RDD,如果有多个父RDD要根据情况决定是否可以使用此方法。

来源:海纳百川

已有(3)人评论

跳转到指定楼层
amanikong 发表于 2016-7-27 09:19:21
定一个 顺便问下 conf.set("spark.testing.memory", "2147480000") 这里是设置集群单个节点的内存大小还是整个集群的运行内存大小,设置了有什么好处,谢谢
回复

使用道具 举报

levycui 发表于 2016-7-28 09:54:36
本帖最后由 levycui 于 2016-7-28 10:14 编辑
amanikong 发表于 2016-7-27 09:19
定一个 顺便问下 conf.set("spark.testing.memory", "2147480000") 这里是设置集群单个节点的内存大小还是 ...

[mw_shl_code=applescript,true]public static void main(String[] args) {
                SparkConf conf = new SparkConf();
                conf.set("spark.testing.memory", "269522560000");
                JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
                System.out.println(  sc );[/mw_shl_code]

以上是源码部分,个人认为是单台服务器的内存,如果内存太小,JVM申请的memory不够导致无法启动SparkContext,后面的值大于512m即可
回复

使用道具 举报

amanikong 发表于 2016-7-28 10:11:08
levycui 发表于 2016-7-28 09:54
[mw_shl_code=applescript,true]public static void main(String[] args) {
                SparkConf conf = new Spa ...

多谢指点。太感谢啦
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条