import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.SynchronizedQueue

object QueueStream {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    // Queue that the streaming context polls for new RDDs
    val rddQueue = new SynchronizedQueue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)

    // Count the elements of each batch by their last digit
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()

    ssc.start()
    // Push ten RDDs, each holding the numbers 1 to 100, one per second
    for (i <- 1 to 10) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}
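(Side note: scala.collection.mutable.SynchronizedQueue is deprecated in Scala 2.11+; the version of this example bundled with recent Spark releases uses a plain mutable.Queue with explicit synchronization instead. A sketch of that queue handling, assuming the rest of the program stays the same:)

// Drop-in replacement for the deprecated SynchronizedQueue (a sketch;
// ssc is the StreamingContext from the program above)
import scala.collection.mutable.Queue

val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)

for (i <- 1 to 10) {
  // Synchronize manually: the driver thread and the streaming
  // scheduler both access the queue
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
  }
  Thread.sleep(1000)
}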
I have a question for the experts here. This is an example from the official Spark site, and its output is:
(4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)
But I would have expected it to be:
(4,100)
(0,100)
(6,100)
(8,100)
(2,100)
(1,100)
(3,100)
(7,100)
(9,100)
(5,100)
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
  Thread.sleep(1000)
}

This loop runs ten times in total, and each iteration pushes the numbers 1 to 100 into the queue. A single batch of the stream computation yields:
(4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)
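That per-batch arithmetic is easy to sanity-check locally, without Spark at all: each residue class 0..9 occurs exactly ten times among 1 to 100. A plain-Scala sketch:

// Sanity check in plain Scala: replicate one batch's map/reduce on a
// local collection; every key 0..9 ends up with a count of 10
val oneBatch = (1 to 100)
  .map(r => (r % 10, 1))
  .groupBy(_._1)
  .map { case (k, ones) => (k, ones.map(_._2).sum) }

println(oneBatch.toSeq.sortBy(_._1).mkString("\n"))
// prints (0,10) through (9,10)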
So ten iterations should, I'd think, add up to:
(4,100)
(0,100)
(6,100)
(8,100)
(2,100)
(1,100)
(3,100)
(7,100)
(9,100)
(5,100)