分享

spark streaming读取kafka数据游标offset错误

aurae 发表于 2018-1-22 18:55:23 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 11045
使用ambari2.6.1.0-129版本搭建环境。ambari 2.6.1.0
java openjdk version "1.8.0_141"
spark 2.1.1
kafka 0.10.1.1

使用spark2的streaming与kafka进行集成,消费kafka数据。业务逻辑:消费每条json数据,并清洗、去重后保存mysql。offset使用mysql进行存储,代码如下:


kafkaStream.foreachRDD(rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(iter => {
        while (iter.hasNext) {
          val consumerRecord = iter.next()
          val line = consumerRecord.toString
          if(line.contains("value = {")){
            val json = line.substring(line.indexOf("value = {")+8, line.length-2)
            process2(json)
          }
          else
            LOGGER.error("houxm3: "+line)
        }
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        LOGGER.info(s"houxm_offset:${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        KafkaOffsetUtil.updateOffsets(o.topic, o.partition, o.untilOffset, KAFKA_GROUP_ID)
      })
打印出来的log是(kafka有3个partition),第0、1个partition,Offset一直是0,不消费数据,第2个partition的游标正常,正常消费数据。spark-submit提交设置了参数 spark.streaming.kafka.maxRatePerPartition=1000,每次第2个partition的游标增量是3000了。打印游标offet的代码是参考官网的了,应该不会出错了呀。
在kafka命令行查询,0、1、2partition都是有数据的。
日志:
18/01/22 18:26:17 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 2 0 3000
18/01/22 18:28:01 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 2 3000 6000
18/01/22 18:24:42 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 0 0 0
18/01/22 18:26:29 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 0 0 0
18/01/22 18:47:55 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 1 0 0
18/01/22 18:49:30 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 1 0 0
。。。

spark提交命令:nohup spark-submit --class com.mindatagroup.etl.EtlQccCompany --master yarn-cluster --executor-memory 2g --num-executors 3 --conf spark.streaming.kafka.maxRatePerPartition=1000 /hxm/spark-etl-1.0-SNAPSHOT.jar /hxm/etl_qcc.properties > etl_qcc_com.log 2>&1 &

以上,offset消费的问题,有大神解惑吗?
谢谢!

已有(2)人评论

跳转到指定楼层
langke93 发表于 2018-1-22 21:15:28
看下消费是否为空,如果为空,说明这是正常的,可以尝试调整下配置。
如果不为空,那就是代码的问题了。
看看自己的消费代码里面,存不存在处理消息的时候出异常的情况
如果有,需要try-catch一下

还有查看消费端的配置
消费端有一个配置,叫 fetch.message.max.bytes,默认是1M,此时如果有消息大于1M,会发生停止消费的情况。
此时,在配置中增加 props.put("fetch.message.max.bytes", "10 * 1024 * 1024"); 即可

回复

使用道具 举报

aurae 发表于 2018-1-24 15:37:56
测试了,读出来就是空的,但实际kafka里不是空的,是有数据的。
按照官网最简单的代码读了一下kafka,只打印topic、groupid、partition、offset,发现还是没数据,只有其中一个partition是有数据的。
但是我直接写java代码读取kafka,每个partition都是有数据的。
感觉是环境问题了,难道ambari集成的这个有bug?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条