分享

Spark Streaming中读取本地文件,无法获取数据

txknick 发表于 2016-10-22 11:50:33 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 18 44326
zstu 发表于 2016-10-25 19:18:38
val fileStream=  ssc.fileStream("d:/WordCount.txt")这个里面的参数是所监控目录,然后将文件放到所监控的目录下,写成这样 val fileStream=  ssc.fileStream("d:\\testdirect") 然后将WordCount.txt放到d:\\testdirect这个目录,
回复

使用道具 举报

txknick 发表于 2016-10-25 23:38:16
zstu 发表于 2016-10-25 19:18
val fileStream=  ssc.fileStream("d:/WordCount.txt")这个里面的参数是所监控目录,然后将文件放到所监控 ...

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreamingLocalfile {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF);

    val sparkConf = new SparkConf().setAppName("SparkStreamingLocalfile").setMaster("local[2]");
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val fileStream=  ssc.fileStream("d:\\WordCount.txt")
    fileStream.print()

    val textFileStream=ssc.textFileStream("d:\\WordCount.txt")
    val words = textFileStream.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1))
    wordCounts.print()
    textFileStream.print()
    ssc.start()
    ssc.awaitTermination()
  }

昨天是这么做的,还是不行
回复

使用道具 举报

zstu 发表于 2016-10-26 12:41:59
txknick 发表于 2016-10-25 23:38
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingCont ...

val textFileStream=ssc.textFileStream("d:\\WordCount.txt")
你textFileStream里面是文件,应该是目录
回复

使用道具 举报

txknick 发表于 2016-10-28 10:14:12
zstu 发表于 2016-10-26 12:41
val textFileStream=ssc.textFileStream("d:\\WordCount.txt")
你textFileStream里面是文件,应该是目 ...

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreamingLocalfile {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF);

    val sparkConf = new SparkConf().setAppName("SparkStreamingLocalfile").setMaster("local[2]");
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val fileStream=  ssc.fileStream("d:\\temp")
    fileStream.print()

    val textFileStream=ssc.textFileStream("d:\\temp")
    val words = textFileStream.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1))
    wordCounts.print()
    textFileStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
试了,还是不行
回复

使用道具 举报

wx_RYClUEop 发表于 2017-2-21 19:24:31
请问,楼主最后解决了吗?我也是打印不出来,程序虽然跑了
回复

使用道具 举报

xiaobaiyang 发表于 2017-4-19 17:40:17
val lines : DStream[String] = streamingContext.textFileStream("C:\\tools\\spark-2.0.0-bin-hadoop2.7\\data\\streaming\\")

读取文件不能指定到具体文件,你可以尝试指定到文件所在的目录那一级试一下。
回复

使用道具 举报

xiaobaiyang 发表于 2017-4-19 17:42:38
/**
   * Create a input stream that monitors a Hadoop-compatible filesystem
   * for new files and reads them as text files (using key as LongWritable, value
   * as Text and input format as TextInputFormat). Files must be written to the
   * monitored directory by "moving" them from another location within the same
   * file system. File names starting with . are ignored.
   * @param directory HDFS directory to monitor for new file
   */
  def textFileStream(directory: String): DStream[String] = {
    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
  }


上面是 该方法的源码,是目录,不是文件
回复

使用道具 举报

星语心愿 发表于 2018-1-23 17:34:13
先运行程序,监控目录,然后使用inputStream创建文本到监控目录并写入文本内容即可。
b08d1a178a82b901597f9273798da9773812ef15.jpg
回复

使用道具 举报

星语心愿 发表于 2018-1-23 17:35:37
先运行程序,监控目录,然后使用inputStream创建文本到监控目录并写入文本内容即可。
b08d1a178a82b901597f9273798da9773812ef15.jpg
回复

使用道具 举报

12
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条