本帖最后由 gdz911 于 2017-7-2 22:25 编辑
自己学习 做了个 log4j-flume-kafka 发现linux(在虚拟机上)上用自带的可消费javaDemo里面的log4信息,通过kafka-clients 却无法消费
kafka-clients 版本0.11.0.0
flume配置如下
发送给flume代码
[mw_shl_code=java,true]public class Demo {
protected static final Logger logger = LoggerFactory.getLogger(Demo.class);
public static void main(String[] args) throws Exception {
while (true) {
logger.info(String.valueOf(new Date().getTime()) + "---" + "我是INFO");
Thread.sleep(3000);
}
}
}[/mw_shl_code]
kafka控制台显示
消费者代码
[mw_shl_code=java,true] @Test
public void kafkaConsumerDemo() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.229.131:9092");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
System.out.println("----------");
for (ConsumerRecord<String, String> record : consumer.poll(1000)){
//System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
System.out.printf("xxxx");
}
}[/mw_shl_code]
控制台信息
最后控制台显示了 消费的错误信息
|
-
|