分享

KafkaUtils.createDirectStream比较及详解

qcbb001 发表于 2016-12-26 20:40:45 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 20244
问题导读

1.KafkaUtils.createStream有什么优点和缺点?
2.KafkaUtils.createStream如何实现监控offset?
3.如何实现offset写入zookeeper?







官网上对这个新接口的介绍很多,大致就是不与zookeeper交互,直接去kafka中读取数据,自己维护offset,于是速度比KafkaUtils.createStream要快上很多。但有利就有弊:无法进行offset的监控。

项目中需要尝试使用这个接口,同时还要进行offset的监控,于是只能按照官网所说的,自己将offset写入zookeeper。

方法1
[mw_shl_code=scala,true]def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] {...}[/mw_shl_code]

这个方法只有3个参数,使用起来最为方便,但是每次启动的时候默认从Latest offset开始读取,或者设置参数auto.offset.reset="smallest"后将会从Earliest offset开始读取。

显然这2种读取位置都不适合生产环境。

方法2
[mw_shl_code=scala,true]def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      messageHandler: MessageAndMetadata[K, V] => R
  ): InputDStream[R] = {...}[/mw_shl_code]

这个方法可以在启动的时候可以设置offset,但参数设置起来复杂很多,首先是fromOffsets: Map[TopicAndPartition, Long]的设置,参考下方代码。

[mw_shl_code=scala,true]val topic2Partitions = ZkUtils.getPartitionsForTopics(zkClient, Config.kafkaConfig.topic)
var fromOffsets: Map[TopicAndPartition, Long] = Map()

topic2Partitions.foreach(topic2Partitions => {
  val topic:String = topic2Partitions._1
  val partitions:Seq[Int] = topic2Partitions._2
  val topicDirs = new ZKGroupTopicDirs(Config.kafkaConfig.kafkaGroupId, topic)

  partitions.foreach(partition => {
    val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
    ZkUtils.makeSurePersistentPathExists(zkClient, zkPath)
    val untilOffset = zkClient.readData[String](zkPath)

    val tp = TopicAndPartition(topic, partition)
    val offset = try {
      if (untilOffset == null || untilOffset.trim == "")
        getMaxOffset(tp)
      else
        untilOffset.toLong
    } catch {
      case e: Exception => getMaxOffset(tp)
    }
    fromOffsets += (tp -> offset)
    logger.info(s"Offset init: set offset of $topic/$partition as $offset")

  })
})[/mw_shl_code]

其中getMaxOffset方法是用来获取最大的offset。当第一次启动spark任务或者zookeeper上的数据被删除或设置出错时,将选取最大的offset开始消费。代码如下:

[mw_shl_code=scala,true]private def getMaxOffset(tp:TopicAndPartition):Long = {
  val request = OffsetRequest(immutable.Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))

  ZkUtils.getLeaderForPartition(zkClient, tp.topic, tp.partition) match {
    case Some(brokerId) => {
      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
        case Some(brokerInfoString) => {
          Json.parseFull(brokerInfoString) match {
            case Some(m) =>
              val brokerInfo = m.asInstanceOf[Map[String, Any]]
              val host = brokerInfo.get("host").get.asInstanceOf[String]
              val port = brokerInfo.get("port").get.asInstanceOf[Int]
              new SimpleConsumer(host, port, 10000, 100000, "getMaxOffset")
                .getOffsetsBefore(request)
                .partitionErrorAndOffsets(tp)
                .offsets
                .head
            case None =>
              throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
          }
        }
        case None =>
          throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
      }
    }
    case None =>
      throw new Exception("No broker for partition %s - %s".format(tp.topic, tp.partition))
  }
}[/mw_shl_code]
然后是参数messageHandler的设置,为了后续处理中能获取到topic,这里形成(topic, message)的tuple:
[mw_shl_code=scala,true]val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
[/mw_shl_code]
接着将从获取rdd的offset并写入到zookeeper中:
[mw_shl_code=scala,true]var offsetRanges = Array[OffsetRange]()
messages.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD(rdd => {
  rdd.foreachPartition(HBasePuter.batchSave)
  offsetRanges.foreach(o => {
    val topicDirs = new ZKGroupTopicDirs(Config.kafkaConfig.kafkaGroupId, o.topic)
    val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
    ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
    logger.info(s"Offset update: set offset of ${o.topic}/${o.partition} as ${o.untilOffset.toString}")
  })
})[/mw_shl_code]

最后附上batchSave的示例:
[mw_shl_code=bash,true]

def batchSave(iter:Iterator[(String,String)]):Unit = {
  iter.foreach(item => {
    val topic = item._1
    val message = item._2
    ...
  })
}[/mw_shl_code]

出处 程序员的自我修养 – SelfUp.cn






没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条