使用ambari2.6.1.0-129版本搭建环境。ambari 2.6.1.0
java openjdk version "1.8.0_141"
spark 2.1.1
kafka 0.10.1.1
使用spark2的streaming与kafka进行集成,消费kafka数据。业务逻辑:消费每条json数据,并清洗、去重后保存mysql。offset使用mysql进行存储,代码如下:
kafkaStream.foreachRDD(rdd => {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition(iter => {
while (iter.hasNext) {
val consumerRecord = iter.next()
val line = consumerRecord.toString
if(line.contains("value = {")){
val json = line.substring(line.indexOf("value = {")+8, line.length-2)
process2(json)
}
else
LOGGER.error("houxm3: "+line)
}
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
LOGGER.info(s"houxm_offset:${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
KafkaOffsetUtil.updateOffsets(o.topic, o.partition, o.untilOffset, KAFKA_GROUP_ID)
}) 打印出来的log是(kafka有3个partition),第0、1个partition,Offset一直是0,不消费数据,第2个partition的游标正常,正常消费数据。spark-submit提交设置了参数 spark.streaming.kafka.maxRatePerPartition=1000,每次第2个partition的游标增量是3000了。打印游标offet的代码是参考官网的了,应该不会出错了呀。
在kafka命令行查询,0、1、2partition都是有数据的。
日志:
18/01/22 18:26:17 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 2 0 3000
18/01/22 18:28:01 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 2 3000 6000
18/01/22 18:24:42 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 0 0 0
18/01/22 18:26:29 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 0 0 0
18/01/22 18:47:55 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 1 0 0
18/01/22 18:49:30 INFO EtlQccCompany$: houxm_offset:topic_qcc_com2 1 0 0
。。。
spark提交命令:nohup spark-submit --class com.mindatagroup.etl.EtlQccCompany --master yarn-cluster --executor-memory 2g --num-executors 3 --conf spark.streaming.kafka.maxRatePerPartition=1000 /hxm/spark-etl-1.0-SNAPSHOT.jar /hxm/etl_qcc.properties > etl_qcc_com.log 2>&1 &
以上,offset消费的问题,有大神解惑吗?
谢谢!
|