zstu 发表于 2017-5-9 17:05:14

使用0.9版Kafka的KafkaConsumer来消费kafka数据,但消费不成功


我用下面的代码来消费kafka的数据,但一直停在ConsumerRecords<String, String> records = consumer.poll(100);这一行。没有消费数据,也不报错。用spark streaming来消费这个topoic的数据是可以消费的。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "10.11.105.111:9092,10.11.105.112:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("topic1"));
      while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
      }
    }
}

einhep 发表于 2017-5-9 17:42:12

本帖最后由 einhep 于 2017-5-9 17:45 编辑

props.put("bootstrap.servers", "10.11.105.111:9092,10.11.105.112:9092");改为 props.put("bootstrap.servers", "10.11.105.111:9092");
不需要所有的都指定
并且是有数据的话,就消费,没有数据的话就等100sms


zstu 发表于 2017-5-10 08:54:40

einhep 发表于 2017-5-9 17:42
props.put("bootstrap.servers", "10.11.105.111:9092,10.11.105.112:9092");改为 props.put("bootstrap.s ...

bootstrap.servers改过来了,并且topic中有数据。但还是消费不了。

nextuser 发表于 2017-5-10 09:47:56

zstu 发表于 2017-5-10 08:54
bootstrap.servers改过来了,并且topic中有数据。但还是消费不了。

生产者代码贴出来看下

NEOGX 发表于 2017-5-10 09:59:38

zstu 发表于 2017-5-10 08:54
bootstrap.servers改过来了,并且topic中有数据。但还是消费不了。

0.9不稳定,好像存在这么个现象,楼主试试1.0版本的

zstu 发表于 2017-5-10 10:02:28

nextuser 发表于 2017-5-10 09:47
生产者代码贴出来看下

生产数据是别的公司给生产的, 我这没有,但我用spark streaming就能消费。

zstu 发表于 2017-5-10 10:13:54

本帖最后由 zstu 于 2017-5-10 10:15 编辑

NEOGX 发表于 2017-5-10 09:59
0.9不稳定,好像存在这么个现象,楼主试试1.0版本的
集群部署不是0.10版的 ,我用了kafka-client-0.10的包后,报错了Error reading field 'brokers': Error reading field 'host': Error reading string of length 12601, only 210 bytes available 查了一下是程序的kafka版本跟集群的版本不一致

zxmit 发表于 2017-5-10 17:56:01

用kafka-console-consumer脚本能够消费么。还有楼主把日志给贴一下

zstu 发表于 2017-5-10 19:35:42

zxmit 发表于 2017-5-10 17:56
用kafka-console-consumer脚本能够消费么。还有楼主把日志给贴一下
kafka-console-consumer能消费 。
日志:
log4j:WARN No appenders could be found for logger(org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
>>>>>>>>>>>>> 开始消费数据


日志就一直挂在这不动。


我在consumer.poll(100)这一行上面加了一句System.out.println(">>>>>>>>>>>>> 开始消费数据");

einhep 发表于 2017-5-10 20:04:46

zstu 发表于 2017-5-10 19:35
kafka-console-consumer能消费 。
日志:
log4j:WARN No appenders could be found for logger(org.apa ...

在后面也加上一句,看执行到哪
页: [1] 2
查看完整版本: 使用0.9版Kafka的KafkaConsumer来消费kafka数据,但消费不成功