将 Spark Streaming + Kafka direct 的 offset 存入Zookeeper并重用
本帖最后由 Oner 于 2016-11-10 07:53 编辑问题导读:1. 使用Direct API时为什么需要见offset保存到Zookeeper中?
2. 如何将offset存入到Zookeeper中?
3. 如何解决Zookeeper中offset过期问题?
static/image/hrline/4.gif
实现将offset存入Zookeeper
在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法,详细的可以参考 Spark Streaming + Kafka Integration Guide ,但是第二种使用方式中kafka 的 offset 是保存在 checkpoint 中的,如果程序重启的话,会丢失一部分数据,可以参考 Spark & Kafka - Achieving zero data-loss 。
本文主要讲在使用第二种消费方式(Direct Approach)的情况下,如何将 kafka 中的 offset 保存到 zookeeper 中,以及如何从 zookeeper 中读取已存在的 offset。
大致思想就是,在初始化 kafka stream 的时候,查看 zookeeper 中是否保存有 offset,有就从该 offset 进行读取,没有就从最新/旧进行读取。在消费 kafka 数据的同时,将每个 partition 的 offset 保存到 zookeeper 中进行备份,具体实现参考下面代码val topic : String = "topic_name"//消费的 topic 名字
val topics : Set = Set(topic) //创建 stream 时使用的 topic 名字集合
val topicDirs = new ZKGroupTopicDirs("test_spark_streaming_group", topic)//创建一个 ZKGroupTopicDirs 对象,对保存
val zkTopicPath = s"${topicDirs.consumerOffsetDir}" 获取 zookeeper 中的路径,这里会变成 /consumers/test_spark_streaming_group/offsets/topic_name
val zkClient = new ZkClient("10.4.232.77:2181") //zookeeper 的host 和 ip,创建一个 client
val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}") //查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)
var kafkaStream : InputDStream[(String, String)] = null
var fromOffsets: Map = Map()//如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
if (children > 0) {//如果保存过 offset,这里更好的做法,还应该和kafka 上最小的 offset 做对比,不然会报 OutOfRange 的错误
for (i <- 0 untilchildren) {
val partitionOffset = zkClient.readData(s"${topicDirs.consumerOffsetDir}/${i}")
val tp = TopicAndPartition(topic, i)
fromOffsets += (tp -> partitionOffset.toLong)//将不同 partition 对应的 offset 增加到 fromOffsets 中
logInfo("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
}
val messageHandler = (mmd : MessageAndMetadata) => (mmd.topic, mmd.message())//这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
kafkaStream = KafkaUtils.createDirectStream(ssc, kafkaParam, fromOffsets, messageHandler)
}
else {
kafkaStream = KafkaUtils.createDirectStream(ssc, kafkaParam, topics) //如果未保存,根据 kafkaParam 的配置使用最新或者最旧的 offset
}
var offsetRanges = Array()
kafkaStream.transform{ rdd =>
offsetRanges = rdd.asInstanceOf.offsetRanges //得到该 rdd 对应 kafka 的消息的 offset
rdd
}.map(msg => Utils.msgDecode(msg)).foreachRDD { rdd =>
for (o <- offsetRanges) {
val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)//将该 partition 的 offset 保存到 zookeeper
logInfo(s"@@@@@@ topic${o.topic}partition ${o.partition}fromoffset ${o.fromOffset}untiloffset ${o.untilOffset} #######")
}
rdd.foreachPartition(
message => {
while(message.hasNext) {
logInfo(s"@^_^@ [" + message.next() + "] @^_^@")
}
}
)
}使用上面的代码,我们可以做到 Spark Streaming 程序从 Kafka 中读取数据是不丢失
解决Zookeeper中保存的offset过期问题
上一篇文章中,我们讲了如何在将 offset 保存在 zk 中,以及进行重用,但是程序中有个小问题“如果程序停了很长很长一段后再启动,zk 中保存的 offset 已经过期了,那会怎样呢?”本文将解决这个问题
如果 kafka 上的 offset 已经过期,那么就会报 OffsetOutOfRange 的异常,因为之前保存在 zk 的 offset 已经 topic 中找不到了。所以我们需要在 从 zk 找到 offset 的这种情况下增加一个判断条件,如果 zk 中保存的 offset 小于当前 kafka topic 中最小的 offset,则设置为 kafka topic 中最小的 offset。假设我们上次保存在 zk 中的 offset 值为 123(某一个 partition),然后程序停了一周,现在 kafka topic 的最小 offset 变成了 200,那么用前文的代码,就会得到 OffsetOutOfRange 的异常,因为 123 对应的数据已经找不到了。下面我们给出,如何获取 <topic, parition> 的最小 offset,这样我们就可以进行对比了val partitionOffset = zkClient.readData(s"${topicDirs.consumerOffsetDir}/${i}")
val tp = TopicAndPartition(topic, i)
val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
val consumerMin = new SimpleConsumer("broker_host", 9092, 10000, 10000, "getMinOffset")//注意这里的 broker_host,因为这里会导致查询不到,解决方法在下面
val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
var nextOffset = partitionOffset.toLong
if (curOffsets.length > 0 && nextOffset < curOffsets.head) {// 通过比较从 kafka 上该 partition 的最小 offset 和 zk 上保存的 offset,进行选择
nextOffset = curOffsets.head
}
fromOffsets += (tp -> nextOffset) //设置正确的 offset,这里将 nextOffset 设置为 0(0 只是一个特殊值),可以观察到 offset 过期的想想
但是上面的代码有一定的问题,因为我们从 kafka 上获取 offset 的时候,需要寻找对应的 leader,从 leader 来获取 offset,而不是 broker,不然可能得到的 curOffsets 会是空的(表示获取不到)。下面的代码就是获取不同 partition 的 leader 相关代码val topic_name = "topic_name" //topic_name 表示我们希望获取的 topic 名字
val topic2 = List(topic_name)
val req = new TopicMetadataRequest(topic2, 0)
val getLeaderConsumer = new SimpleConsumer("broker_host", 9092, 10000, 10000, "OffsetLookup")// 第一个参数是 kafka broker 的host,第二个是 port
val res = getLeaderConsumer.send(req)
val topicMetaOption = res.topicsMetadata.headOption
val partitions = topicMetaOption match {
case Some(tm) =>
tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap// 将结果转化为 partition -> leader 的映射关系
case None =>
Map()
}
上面的代码能够得到所有 partition 的 leader 地址,然后将 leader 地址替换掉上面第一份代码中的 broker_list 即可。到此,在 spark streaming 中将 kafka 的 offset 保存到 zk,并重用的大部分情况都覆盖到了
来源:推酷
作者:klion26
实在,多谢共享 非常 不错,多谢分享 这个方法的确可行,为啥我现在才知道,,,, map(msg => Utils.msgDecode(msg))楼主,这个Utils是哪个包里的 楼主麻烦能吧全部完整代码发个附件吗,拜托了。 ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)是不是写错了,应该是o.untilOffset吧
学习了,谢谢分享
页:
[1]