gdz911 发表于 2017-7-2 22:21:24

求助大神! kafaka无法消费问题

本帖最后由 gdz911 于 2017-7-2 22:25 编辑

自己学习 做了个 log4j-flume-kafka 发现linux(在虚拟机上)上用自带的可消费javaDemo里面的log4信息,通过kafka-clients 却无法消费
kafka-clients 版本0.11.0.0
flume配置如下


发送给flume代码
public class Demo {

      protected static final Logger logger = LoggerFactory.getLogger(Demo.class);

      public static void main(String[] args) throws Exception {

                while (true) {
                        logger.info(String.valueOf(new Date().getTime()) + "---" + "我是INFO");
                        Thread.sleep(3000);

                }

      }

}

kafka控制台显示


消费者代码
      @Test
      public void kafkaConsumerDemo() {
                Properties props = new Properties();
                props.put("bootstrap.servers", "192.168.229.131:9092");
                props.put("enable.auto.commit", "true");
                props.put("auto.commit.interval.ms", "1000");
                props.put("session.timeout.ms", "30000");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList("test"));
                System.out.println("----------");
                for (ConsumerRecord<String, String> record : consumer.poll(1000)){
                //System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());               
                              System.out.printf("xxxx");                                    
                        }
               
      }

控制台信息


最后控制台显示了 消费的错误信息


gdz911 发表于 2017-7-2 22:53:00

已经解决了 kafka配置文件 加入host.name=ip即可 不然消费者指向localhost
页: [1]
查看完整版本: 求助大神! kafaka无法消费问题