分享

spark streaming 取不到kafka的数据

dlh 发表于 2015-12-13 15:09:00 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 5 34448
看zookeeper的日志来看,spark streaming应该是连上了,而且zookeeper给sparkstreaming发了,但是sparkstreaming里就是收不到,我写了个消费者的java demo可以收到 卡2天了,spark版本是1.5  kafka是_2.10 -0.8.2.2.

已有(5)人评论

跳转到指定楼层
langke93 发表于 2015-12-13 16:31:45
本帖最后由 langke93 于 2015-12-13 16:34 编辑

是不是配置的问题,下面是一个示例,楼主参考下,希望有所帮助
spark streaming 接收 kafka 数据示例

环境部署
这里只讲 kafka 单机的部署。只是示例嘛:
cd kafka_2.10-0.8.2.0/bin/
./zookeeper-server-start.sh ../config/zookeeper.properties &
./kafka-server-start.sh --daemon ../config/server.properties


数据转发
保持跟之前示例的连贯性,这里继续用 logstash 发送数据到 kafka。
首先创建一个 kafka 的 topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logstash
然后到 logstash 里,修改配置为:
[mw_shl_code=bash,true]input {
    file { path => "/var/log/*.log" }
}
filter {
    ruby {
        code => "event['lineno'] = 100 * rand(Math::E..Math::PI)"
    }
}
output {
    kafka {
        broker_list => "127.0.0.1:9092"
        topic_id => "logstash"
    }
}[/mw_shl_code]


spark streaming 处理的代码:

处理效果跟之前示例依然保持一致,就不重复贴冗余的函数了,只贴最开始的处理部分:
[mw_shl_code=java,true]import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.jackson.JsonMethods._
object LogStash {
  implicit val formats = DefaultFormats
  case class LogStashV1(message:String, path:String, host:String, lineno:Double, `@timestamp`:String)
  def main(args: Array[String]) {
    val Array(zkQuorum, group, topics, numThreads) = args
    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("LogStash")
    val sc  = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.map(line => {
      val json = parse(line)
      json.extract[LogStashV1]
    }).print()
    ssc.start()
    ssc.awaitTermination()
  }
}[/mw_shl_code]
这里面有一些跟网上常见资料不一样的地方。
第一个,import org.apache.spark.streaming.kafka._ 并不会导出 KafkaUtils,必须明确写明才行。第二个,之前示例里用了 scala 核心自带的 JSON 模块。但是这次我把 lineno 字段从整数改成浮点数后,发现 JSON.parseFull() 有问题。虽然我在 scala 的 repl 里测试没问题,但是写在 spark 里的时候,它并不像文档所说的”总是尝试解析成 Double 类型”,而是一直尝试用Integer.parseInteger() 方法来解析。哪怕我明确定义 JSON.globalNumberParser = {input:String => Float.parseFloat(input)} 都不起作用。


所以,最后这里改用了 json4s 库。据称这也是 scala 里性能和功能最好的 JSON 库。

json4s 库默认解析完后,不是标准的 Map、List 等对象,而是它自己的 JObject、JList、JString 等。想要转换成标准 scala 对象,需要调用 .values 才对。不过我这个示例里没有这么麻烦,而是直接采用 .extract 就变成了 cast class 对象了。非常简便。


另一个需要点出来的变动是:因为采用 .extract,所以 cast class 里的参数命名必须跟 JSON 里的 key 完全对应上。而我们都知道 logstash 里有几个特殊的字段,叫 @timestamp 和 @version 。这个 “@” 是不能直接裸字符的,所以要用反引号(`)包括起来。


sbt 打包

sbt 打包也需要有所变动。spark streaming 的核心代码中,并不包含 kafka 的代码。还跟之前那样 sbt package 的话,就得另外指定 kafka 的 jar 地址才能运行了。更合适的办法,是打包一个完全包含的 jar 包。这就用到 sbt-assembly 扩展
刚刚收到的消息,spark 1.3 版发布 beta 了,spark streaming 会内置对 kafka 的底层直接支持。或许以后不用这么麻烦?
sbt-assembly 使用起来特别简单,尤其是当你使用的 sbt 版本比较新(大于 0.13.6) 的时候。


添加扩展
在项目的 project/ 目录下创建一个 plugins.sbt 文件,内容如下:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
具体的版本选择,看官方 README 的 Setup 部分


添加新增依赖模块
现在可以去修改我们项目的 build.sbt 了:
[mw_shl_code=bash,true]name := "LogStash"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.2.0",
  "org.json4s" %% "json4s-native" % "3.2.10",
  "org.json4s" %% "json4s-jackson" % "3.2.10"
)[/mw_shl_code]

是的。新版本的 sbt-assembly 完全不需要单独修改 build.sbt 了。
需要注意,因为我们这次是需要把各种依赖全部打包到一起,这个可能会导致一些文件相互有冲突。比如我们用 spark-submit 提交任务,有关 spark 的核心文件,本身里面就已经有了的,那么就需要额外通过 % "provided" 指明这部分会另外提供,不需要打进去。这样运行的时候就不会有问题了。

打包
采用 sbt-assembly 后的打包命令是:sbt assembly。注意输出的结果,会是直接读取 build.sbt里的 name 变量,不做处理。,我们之前定义的叫 “LogStash Project”,sbt package 命令自动会转换成全小写且空格改成中横线的格式 logstash-project_2.10-1.0.jar。但是 sbt assembly 就会打包成 LogStash Project-assembly-1.0.jar 包。这个空格在走 spark-submit 提交的时候是有问题的。所以这里需要把 name 改成一个不会中断的字符串。。。



回复

使用道具 举报

fightinglf 发表于 2015-12-13 17:00:28
学习一下,谢谢关注
回复

使用道具 举报

dlh 发表于 2015-12-14 13:50:18
问题解决了。少引了个kafka的jar包。唉,卡了2天
回复

使用道具 举报

luguoyuanf 发表于 2016-3-29 10:35:17
dlh 发表于 2015-12-14 13:50
问题解决了。少引了个kafka的jar包。唉,卡了2天

哥们,你说的少引一个kafka的包,是什么包???
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条