使用0.9版Kafka的KafkaConsumer来消费kafka数据,但消费不成功
我用下面的代码来消费kafka的数据,但一直停在ConsumerRecords<String, String> records = consumer.poll(100);这一行。没有消费数据,也不报错。用spark streaming来消费这个topoic的数据是可以消费的。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.11.105.111:9092,10.11.105.112:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}
本帖最后由 einhep 于 2017-5-9 17:45 编辑
props.put("bootstrap.servers", "10.11.105.111:9092,10.11.105.112:9092");改为 props.put("bootstrap.servers", "10.11.105.111:9092");
不需要所有的都指定
并且是有数据的话,就消费,没有数据的话就等100sms
einhep 发表于 2017-5-9 17:42
props.put("bootstrap.servers", "10.11.105.111:9092,10.11.105.112:9092");改为 props.put("bootstrap.s ...
bootstrap.servers改过来了,并且topic中有数据。但还是消费不了。
zstu 发表于 2017-5-10 08:54
bootstrap.servers改过来了,并且topic中有数据。但还是消费不了。
生产者代码贴出来看下 zstu 发表于 2017-5-10 08:54
bootstrap.servers改过来了,并且topic中有数据。但还是消费不了。
0.9不稳定,好像存在这么个现象,楼主试试1.0版本的
nextuser 发表于 2017-5-10 09:47
生产者代码贴出来看下
生产数据是别的公司给生产的, 我这没有,但我用spark streaming就能消费。
本帖最后由 zstu 于 2017-5-10 10:15 编辑
NEOGX 发表于 2017-5-10 09:59
0.9不稳定,好像存在这么个现象,楼主试试1.0版本的
集群部署不是0.10版的 ,我用了kafka-client-0.10的包后,报错了Error reading field 'brokers': Error reading field 'host': Error reading string of length 12601, only 210 bytes available 查了一下是程序的kafka版本跟集群的版本不一致 用kafka-console-consumer脚本能够消费么。还有楼主把日志给贴一下 zxmit 发表于 2017-5-10 17:56
用kafka-console-consumer脚本能够消费么。还有楼主把日志给贴一下
kafka-console-consumer能消费 。
日志:
log4j:WARN No appenders could be found for logger(org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
>>>>>>>>>>>>> 开始消费数据
日志就一直挂在这不动。
我在consumer.poll(100)这一行上面加了一句System.out.println(">>>>>>>>>>>>> 开始消费数据");
zstu 发表于 2017-5-10 19:35
kafka-console-consumer能消费 。
日志:
log4j:WARN No appenders could be found for logger(org.apa ...
在后面也加上一句,看执行到哪
页:
[1]
2