楼主您好,flume1.5,kafka 0.10.0,flume向kafka发送的消息,用kafkaconsumer,接收不到内容。
flume的配置文件:
#config sources
agent.sources.s1.type = netcat
agent.sources.s1.bind = 192.168.0.101
agent.sources.s1.port = 9093
agent.sources.s1.channels=c1
#config channels
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
#config sinks
agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.channel=c1
agent.sinks.k1.brokerList=192.168.0.101:9092
agent.sinks.k1.topic=test
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.producer.type=sync
agent.sinks.k1.custom.encoding=UTF-8
agent.sinks.k1.custom.topic.name=test
向flume监控端口发送消息
URL url = new URL("http://192.168.0.101:9093");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setRequestMethod("POST");
connection.setUseCaches(false);
connection.setInstanceFollowRedirects(true);
connection.setRequestProperty("Content-Type","application/x-www-form-urlencoded");
connection.connect();
DataOutputStream out = new DataOutputStream(
connection.getOutputStream());
String testStr = "hello";
out.writeBytes(testStr);
out.flush();
out.close();
接收部分的代码:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.0.101:9092");
props.put("group.id", "test-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumerNew = new KafkaConsumer<>(props);
consumerNew.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumerNew.poll(1);
for (ConsumerRecord<String, String> record : records){
System.out.println("value="+record.value());
}
}
kafkaconsumer接收到的消息是:
value=POST / HTTP/1.1
value=Content-Type: application/x-www-form-urlencoded
value=Cache-Control: no-cache
value=Pragma: no-cache
value=User-Agent: Java/1.8.0_112
value=Host: 192.168.0.101:9093
value=Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
value=Connection: keep-alive
value=Content-Length: 5
value=
请教一下,该如何写,才能读取到flume发送到kafka的消息 |