分享

flume+sparkstream

这是flume配置
01.png

已有(5)人评论

跳转到指定楼层
hutonm 发表于 2017-11-30 16:48:35
object FlumePollWordCount {
  def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf,Seconds(5))

        //从flume拉取数据
        val address = Seq(new InetSocketAddress("192.168.84.138",8888))

        val flumeStream = FlumeUtils.createPollingStream(ssc,address,StorageLevel.MEMORY_AND_DISK)
        val words = flumeStream.flatMap(x => new String(x.event.getBody.array()).split(" ")).map((_,1))
        val result = words.reduceByKey(_+_)

        result.print()
        ssc.start()
        ssc.awaitTermination()
  }
}



spark代码,从flume拉取数据,但是没有数据
回复

使用道具 举报

hutonm 发表于 2017-11-30 16:49:54
文件夹,没有反应。。。很奇怪。。
06.png
回复

使用道具 举报

hutonm 发表于 2017-11-30 16:50:55
flume这边没有监控到目录的变化,无论是mv cp 还是直接创建
回复

使用道具 举报

hutonm 发表于 2017-11-30 17:07:18
flume监控的目录没有什么变化,这是为啥啊
回复

使用道具 举报

desehawk 发表于 2017-11-30 19:15:54
hutonm 发表于 2017-11-30 17:07
flume监控的目录没有什么变化,这是为啥啊

这里面存在比较多的问题
1.版本是否兼容,如不会查看,可参考
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23252


2.flume配置需要修改:核实下是memory还是memoryChannel
agent.sinks = k1
agent.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.k1.hostname = <hostname of the local machine>
agent.sinks.k1.port = <port to listen on for connection from Spark>
agent.sinks.k1.channel = memoryChannel


3.依赖是否加入
特别是park-streaming-flume必须手动引入:
设置CLASSPATH,把依赖包放入CLASSPATH中或则使用--jars参数手动加入


推荐参考
https://github.com/int32bit/note ... BB%E5%8F%96flume.md



回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条