/**
 * Streaming word count over events pulled from Flume.
 *
 * Every 5-second batch, the job polls the configured address for Flume events,
 * decodes each event body as text, splits it on single spaces, and prints the
 * per-batch count of every word.
 */
object FlumePollWordCount {

  /**
   * Decodes the readable portion of an event body as UTF-8 text.
   *
   * Only the bytes between the buffer's position and limit are used, so any
   * unrelated bytes elsewhere in the backing array never leak into the result,
   * and buffers without an accessible backing array are handled too. The
   * charset is fixed so the outcome does not depend on the JVM's default
   * encoding.
   *
   * @param body the event payload; its position and limit are left untouched
   * @return the payload decoded as a UTF-8 string
   */
  private def decodeBody(body: java.nio.ByteBuffer): String = {
    // Read through a duplicate so the caller's buffer state is not advanced.
    val view = body.duplicate()
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
  }

  /**
   * Entry point: builds the streaming context, wires up the word count and
   * blocks until the streaming job terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Pull data from Flume.
    // NOTE(review): the polling stream presumably needs the Flume agent to expose
    // a Spark sink on exactly this host/port - confirm against the agent config.
    val address = Seq(new InetSocketAddress("192.168.84.138", 8888))
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)

    // One (word, 1) pair per space-separated token of every event body.
    val words = flumeStream.flatMap(x => decodeBody(x.event.getBody).split(" ")).map((_, 1))
    val result = words.reduceByKey(_ + _)
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Spark 代码：从 Flume 拉取数据，但是没有收到数据。