分享

SparkStreaming 消费kafka数据异常

在尝试用sparkstreaming消费kafka topic数据时,在生产环境上编译发现程序卡住不执行。在虚拟机环境一切正常。代码如下:
package kafka



import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.zookeeper.data.ACL

import scala.collection.mutable.ListBuffer
import org.apache.zookeeper.ZooDefs
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._

object consumer_test {

  private val logger=LoggerFactory.getLogger(consumer_test.getClass)

  def readOffsets(topics: Seq[String], group: String, zkUtils: ZkUtils): Map[TopicPartition, Long] = {

    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)

    // /consumers/<groupId>/offsets/<topic>/
    partitionMap.foreach(topicPartitions => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(group, topicPartitions._1)
      topicPartitions._2.foreach(partition => {
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition

        try {
          val offsetStatTuple = zkUtils.readData(offsetPath)
          if (offsetStatTuple != null) {
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
              offsetStatTuple._1.toLong)
          }
        } catch {
          case e: Exception => topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
            0L)
        }

      })
    })
    //println(topicPartOffsetMap.toMap.foreach(i=>println(i._2)))
    topicPartOffsetMap.toMap
  }


  def persistOffsets(offsets: Seq[OffsetRange], group: String, storeEndOffset: Boolean, zkUtils: ZkUtils) = {

    offsets.foreach(or => {
      val zKGroupTopicDirs = new ZKGroupTopicDirs(group, or.topic)

      val acls = new ListBuffer[ACL]()
      val acl = new ACL()
      acl.setId(ZooDefs.Ids.ANYONE_ID_UNSAFE)
      acl.setPerms(ZooDefs.Perms.ALL)
      acls += acl


      val offsetPath = zKGroupTopicDirs.consumerOffsetDir + "/" + or.partition
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset

      zkUtils.updatePersistentPath(zKGroupTopicDirs.consumerOffsetDir + "/" + or.partition,
        offsetVal + "", acls.toList)

    })
  }


  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("example").setMaster("local")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.11.23:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "console-consumer-71817",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("cloudalarm2012")

    val zkUrl = "192.168.11.23:2181,192.168.11.24:2181,192.168.11.26:2181"
    val ssessionTimeOut = 9999
    val connectionTimeOut = 9999

    val zkClientAndConnection = ZkUtils.createZkClientAndConnection(
      zkUrl,
      ssessionTimeOut,
      connectionTimeOut
    )
    val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)

    val inputDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      //ConsumerStrategies.Subscribe[String,String](topics,kafkaParams)
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, readOffsets(topics, kafkaParams.apply("group.id").toString, zkUtils))
    )



    inputDStream.foreachRDD((rdd, bacthTime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(offset => {
        logger.warn(s"topic: ${offset.topic}---parttition: ${offset.partition}---fromOffset: ${offset.fromOffset}---untilOffset: ${offset.untilOffset}")

        val count = rdd.map(message => message.value()).count()

        logger.warn(s"count: $count")
        rdd.coalesce(1).foreach(println)

        persistOffsets(offsetRanges.toSeq, kafkaParams.apply("group.id").toString, true, zkUtils)
      })
    })


    ssc.start()
    ssc.awaitTermination()


  }

}



生产环境运行状态如图,会一直卡在那里无法执行下去

1.png




已有(1)人评论

跳转到指定楼层
yaojiank 发表于 2018-11-8 19:20:32
别只看这里,看日志,这里不动,可能后台正在运转,只是你看不到而已。
所以使用tail -f监控下
spark、kafka的日志。这样才能找到问题。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条