分享

Spark Streaming中的ssc.textFileStream(),获取不到数据

guxingyu 发表于 2016-3-3 16:04:43 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 12 62363
通过Spark Streaming获取不到HDFS上面的数据,
val conf = new SparkConf().setMaster("local").setAppName("RDDTest");
    val ssc = new StreamingContext(conf, Seconds(5));
   
    val fileStream = ssc.textFileStream("hdfs://redhat01:8020/traindata/data/output/part-00000");
    fileStream.print();
   
    ssc.start();
    ssc.awaitTermination();

HDFS文件是有数据的,但是打印没有数据,后面我换了一种方式
fileStream.foreachRDD { rdd => {
      rdd.foreach { x => println(x) };
    } }
也打印不出数据
哪位大神碰到过这个问题? 帮忙解决一下

已有(12)人评论

跳转到指定楼层
guxingyu 发表于 2016-3-3 16:31:58
wscl1213 发表于 2016-3-3 16:19
你这是读的元数据吧,应该读取文件

你好,我读取文件也是一样的 读不出数据来

回复

使用道具 举报

wscl1213 发表于 2016-3-3 16:19:16
你这是读的元数据吧,应该读取文件
回复

使用道具 举报

wscl1213 发表于 2016-3-3 16:45:59



/**
* Created by Administrator on 2014/9/1.
*/
package com.debugo.example
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf

object WordCountStreaming {
  def main(args: Array[String]): Unit ={
    val sparkConf = new SparkConf().setAppName("HDFSWordCount").setMaster("spark://172.19.1.232:7077")

    //create the streaming context
    val  ssc = new StreamingContext(sparkConf, Seconds(30))

    //process file when new file be found.
    val lines = ssc.textFileStream("file:///home/spark/data")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()

  }
}

楼主参考下上面红字部分代码,希望 有帮助
回复

使用道具 举报

when30 发表于 2016-3-3 17:06:57



Output操作:
Output OperationMeaning
print()Prints first ten elements of every batch of data in a DStream on the driver.
foreachRDD(func)The fundamental output operator. Applies a function, func, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system.
saveAsObjectFiles(prefix, [suffix])Save this DStream’s contents as a SequenceFile of serialized objects. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]“.
saveAsTextFiles(prefix, [suffix])Save this DStream’s contents as a text files. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]“.
saveAsHadoopFiles(prefix, [suffix])Save this DStream’s contents as a Hadoop file. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]“.

楼主可以看看print函数的作用,

回复

使用道具 举报

fly2015 发表于 2016-3-4 10:12:18
我怀疑没有action动作
回复

使用道具 举报

UFO 发表于 2016-3-13 20:16:18
你把程序跑起来后,再往那个监控的目录下放文件
回复

使用道具 举报

wjcaitu 发表于 2017-2-5 21:25:41
我貌似也碰到了楼主这样的问题,楼主有解决吗?
以下是我的代码,我都输出到文件了,可是还是没有任何数据出来:
[mw_shl_code=scala,true]import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf

object ScalaStreamTest {
  def main(args:Array[String]):Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("my app")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.textFileStream("/home/wj/Downloads/spark-2.1.0-bin-hadoop2.7/")
    val words = lines.flatMap(line => line.split(" "))
    words.print()
    words.saveAsTextFiles("/home/wj/workspace/SparkScala/errlogs/", "txt")
   
    ssc.start()
    ssc.awaitTermination()
  }
}[/mw_shl_code]
回复

使用道具 举报

wx_RYClUEop 发表于 2017-2-21 19:34:10
请问,大家这个问题解决了吗,看了好几个帖子,都是只有问题,没有解决办法
回复

使用道具 举报

wx_RYClUEop 发表于 2017-2-21 19:34:54
wjcaitu 发表于 2017-2-5 21:25
我貌似也碰到了楼主这样的问题,楼主有解决吗?
以下是我的代码,我都输出到文件了,可是还是没有任何数据 ...

请问层主,这个问题解决了吗?我看了好几个帖子,都是只有问题,都没有解决办法
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条