分享

spark streaming 接收 kafka 数据示例

langke93 2015-5-29 16:11:04 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 65948

问题导读

1.本文是基于什么环境?
2.如何实现数据转发?
3.spark streaming 处理的代码是如何实现的?






曾经试过了用 spark streaming 读取 logstash 启动的 TCP Server 的数据。不过如果你有多台 logstash 的时候,这种方式就比较难办了 —— 即使你给 logstash 集群申请一个 VIP,也很难确定说转发完全符合。所以一般来说,更多的选择是采用 kafka 等队列方式由 spark streaming 去作为订阅者获取数据。

环境部署
这里只讲 kafka 单机的部署。只是示例嘛:
cd kafka_2.10-0.8.2.0/bin/
./zookeeper-server-start.sh ../config/zookeeper.properties &
./kafka-server-start.sh --daemon ../config/server.properties
数据转发
保持跟之前示例的连贯性,这里继续用 logstash 发送数据到 kafka。
首先创建一个 kafka 的 topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logstash
然后到 logstash 里,修改配置为:
[mw_shl_code=java,true]input {
    file { path => "/var/log/*.log" }
}
filter {
    ruby {
        code => "event['lineno'] = 100 * rand(Math::E..Math::PI)"
    }
}
output {
    kafka {
        broker_list => "127.0.0.1:9092"
        topic_id => "logstash"
    }
}[/mw_shl_code]

spark streaming 处理的代码:
处理效果跟之前示例依然保持一致,就不重复贴冗余的函数了,只贴最开始的处理部分:
[mw_shl_code=java,true]import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.jackson.JsonMethods._
object LogStash {
  implicit val formats = DefaultFormats
  case class LogStashV1(message:String, path:String, host:String, lineno:Double, `@timestamp`:String)
  def main(args: Array[String]) {
    val Array(zkQuorum, group, topics, numThreads) = args
    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("LogStash")
    val sc  = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.map(line => {
      val json = parse(line)
      json.extract[LogStashV1]
    }).print()
    ssc.start()
    ssc.awaitTermination()
  }
}[/mw_shl_code]
这里面有一些跟网上常见资料不一样的地方。
第一个,import org.apache.spark.streaming.kafka._ 并不会导出 KafkaUtils,必须明确写明才行。第二个,之前示例里用了 scala 核心自带的 JSON 模块。但是这次我把 lineno 字段从整数改成浮点数后,发现 JSON.parseFull() 有问题。虽然我在 scala 的 repl 里测试没问题,但是写在 spark 里的时候,它并不像文档所说的”总是尝试解析成 Double 类型”,而是一直尝试用Integer.parseInteger() 方法来解析。哪怕我明确定义 JSON.globalNumberParser = {input:String => Float.parseFloat(input)} 都不起作用。

所以,最后这里改用了 json4s 库。据称这也是 scala 里性能和功能最好的 JSON 库。
json4s 库默认解析完后,不是标准的 Map、List 等对象,而是它自己的 JObject、JList、JString 等。想要转换成标准 scala 对象,需要调用 .values 才对。不过我这个示例里没有这么麻烦,而是直接采用 .extract 就变成了 cast class 对象了。非常简便。

另一个需要点出来的变动是:因为采用 .extract,所以 cast class 里的参数命名必须跟 JSON 里的 key 完全对应上。而我们都知道 logstash 里有几个特殊的字段,叫 @timestamp 和 @version 。这个 “@” 是不能直接裸字符的,所以要用反引号(`)包括起来。

sbt 打包
sbt 打包也需要有所变动。spark streaming 的核心代码中,并不包含 kafka 的代码。还跟之前那样 sbt package 的话,就得另外指定 kafka 的 jar 地址才能运行了。更合适的办法,是打包一个完全包含的 jar 包。这就用到 sbt-assembly 扩展

刚刚收到的消息,spark 1.3 版发布 beta 了,spark streaming 会内置对 kafka 的底层直接支持。或许以后不用这么麻烦?
sbt-assembly 使用起来特别简单,尤其是当你使用的 sbt 版本比较新(大于 0.13.6) 的时候。

  • 添加扩展
在项目的 project/ 目录下创建一个 plugins.sbt 文件,内容如下:
[mw_shl_code=java,true]addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")[/mw_shl_code]
具体的版本选择,看官方 README 的 Setup 部分

  • 添加新增依赖模块
现在可以去修改我们项目的 build.sbt 了:
[mw_shl_code=java,true]name := "LogStash"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.2.0",
  "org.json4s" %% "json4s-native" % "3.2.10",
  "org.json4s" %% "json4s-jackson" % "3.2.10"
)[/mw_shl_code]

是的。新版本的 sbt-assembly 完全不需要单独修改 build.sbt 了。
需要注意,因为我们这次是需要把各种依赖全部打包到一起,这个可能会导致一些文件相互有冲突。比如我们用 spark-submit 提交任务,有关 spark 的核心文件,本身里面就已经有了的,那么就需要额外通过 % "provided" 指明这部分会另外提供,不需要打进去。这样运行的时候就不会有问题了。

  • 打包

采用 sbt-assembly 后的打包命令是:sbt assembly。注意输出的结果,会是直接读取 build.sbt里的 name 变量,不做处理。,我们之前定义的叫 “LogStash Project”,sbt package 命令自动会转换成全小写且空格改成中横线的格式 logstash-project_2.10-1.0.jar。但是 sbt assembly 就会打包成 LogStash Project-assembly-1.0.jar 包。这个空格在走 spark-submit 提交的时候是有问题的。所以这里需要把 name 改成一个不会中断的字符串。。。


欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

已有(3)人评论

跳转到指定楼层
wjhdtx 发表于 2015-6-5 15:06:39
借楼主宝地,请教个问题:使用kafka和spark streaming,环境是三台虚拟机(4核,6G内存),安装了cdh5.3.3,我有三个streaming作业,配合使用

①如果在三台机器每台机器上各跑一个作业,且master指定为local[3](即本地模式),是没有问题的
②master为spark://ip:7077,跑三个作业,有时这三个作业正常运行,有时就不动了
③master为yarn-cluster,只有第一个作业分配了applicationmaster,两外两个作业一直是未分配的状态

我想问下,是机器原因导致吗?
回复

使用道具 举报

chyeers 发表于 2015-9-11 09:35:34
wjhdtx 发表于 2015-6-5 15:06
借楼主宝地,请教个问题:使用kafka和spark streaming,环境是三台虚拟机(4核,6G内存),安装了cdh5.3.3 ...

楼主,你 spark streaming 怎么跑yarn-cluster 模式的啊?我跑standalone没问题,跑 on yarn 就报错了[mw_shl_code=shell,true]ERROR spark.SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.[/mw_shl_code]


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条