首先自己犯了一个错误,自己的spark是单机版,首先要改成 sparkConf.setMaster("local[2]")或者
sparkConf.setMaster("local")
其次spark-streaming只会计算新放入hdfs目录的数据,比如说我们在启动下面的代码之前监控的hdfs目录中已经有文档了,那么不会进行计算,当我们有新的数据文档放入的时候才会启动计算 和打印计算数据。
再次,下面实现了把计算的数据放入hdfs,但是每次都是新建一个目录,上面的图片里边有显示,现在还没解决这个问题,有知道解决方案的可以告知一下,我想实现的是计算的结果放入一个文件,并且计算结果不断累加,而不是每次计算完成后在hdfs上新建一个文件夹
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: HdfsWordCount <directory>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
sparkConf.setMaster("local[2]")
// Create the context
//spark://h1:7077
val ssc = new StreamingContext(sparkConf, Seconds(10))
//ssc.sparkContext.hadoopConfiguration.set("out put path",
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
wordCounts.saveAsTextFiles("hdfs://h1:9000/data/result/sparkstreaminghdfswordcount")
ssc.start()
ssc.awaitTermination()
}
|