分享

使用0.9版Kafka的KafkaConsumer来消费kafka数据,但消费不成功

zstu 发表于 2017-5-9 17:05:14 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 19 47670
zstu 发表于 2017-5-11 08:46:03
einhep 发表于 2017-5-10 20:04
在后面也加上一句,看执行到哪

在consumer.poll(100)的后面加了System.out.println(">>>>>>>>>>>")。但这一句没有打印出来。
回复

使用道具 举报

einhep 发表于 2017-5-11 09:24:54
zstu 发表于 2017-5-11 08:46
在consumer.poll(100)的后面加了System.out.println(">>>>>>>>>>>")。但这一句没有打印出来。

这里面有两个问题需要排除1.topic是否正确,订阅方式是什么?

consumer.subscribe(Arrays.asList("topic1"));
2.是否没有检测到数据,尝试把这个注释掉,在看看
ConsumerRecords<String, String> records = consumer.poll(100);
[mw_shl_code=bash,true]  while (true) {
            //ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }[/mw_shl_code]


回复

使用道具 举报

zstu 发表于 2017-5-11 10:26:43
本帖最后由 zstu 于 2017-5-11 10:28 编辑
einhep 发表于 2017-5-11 09:24
这里面有两个问题需要排除1.topic是否正确,订阅方式是什么?

consumer.subscribe(Arrays.asList("to ...

1、topic没有错;2、注释掉那一行后for循环里面的records怎么定义
while (true) {            //ConsumerRecords<String, String> records = consumer.poll(100);         
  for (ConsumerRecord<String, String> record : records)  
           System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }

回复

使用道具 举报

easthome001 发表于 2017-5-11 13:36:52
zstu 发表于 2017-5-11 10:26
1、topic没有错;2、注释掉那一行后for循环里面的records怎么定义
while (true) {            //Consume ...

  ConsumerRecords<String, String> records = consumer.poll(100);
这应该没有问题,如果想调整,设置为0即可
回复

使用道具 举报

zstu 发表于 2017-5-12 08:43:45
easthome001 发表于 2017-5-11 13:36
ConsumerRecords records = consumer.poll(100);
这应该没有问题,如果想调整,设置为0即可

设置成0也消费不了, 我换成0.8的API就能消费了,不知道为啥0.9的就不行。
回复

使用道具 举报

zxmit 发表于 2017-5-13 11:11:39
恩。。。你的日志打印的不正常。缺少log4j文件。

既然kafka-console-consumer,不如看看它源码是咋写的。
回复

使用道具 举报

zstu 发表于 2017-5-15 16:43:34
zxmit 发表于 2017-5-13 11:11
恩。。。你的日志打印的不正常。缺少log4j文件。

既然kafka-console-consumer,不如看看它源码是咋写的 ...

kafka-console-consumer可以消费的,是java的API不能消费。那个log4j的日志应该不相关。
回复

使用道具 举报

zstu 发表于 2017-5-31 13:51:31
忘了在这结题了。问题已经解决了。在zookeeper中把kafka的元数据全部给删掉,然后重新启动kafka就行。
回复

使用道具 举报

zrf870920398 发表于 2017-8-18 12:16:06
求问楼主 我也遇到了这个问题 然后按照你的结题方法 确实正常启动了消费者 但是在我把消费者kill掉之后第二次启动 又发生了这个问题 请问楼主有没有遇到过?
回复

使用道具 举报

zstu 发表于 2017-8-18 16:23:35
zrf870920398 发表于 2017-8-18 12:16
求问楼主 我也遇到了这个问题 然后按照你的结题方法 确实正常启动了消费者 但是在我把消费者kill掉之后第二 ...

没有 我启动几次都是正常的。
回复

使用道具 举报

12
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条