这个是consumer的部分
public void RecieveMessage() {
Map<String, Integer> topicCount = new HashMap<String, Integer>();
//定义订阅topic数量
topicCount.put(topic, new Integer(1));
//返回的是所有topic的Map
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
//取出我们要需要的topic中的消息流
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
//System.out.println("ready");
for (final KafkaStream<byte[], byte[]> stream : streams) {
ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
while (consumerIte.hasNext())
{
String msg=new String(consumerIte.next().message());
this.rcvAmnout+=msg.length();
System.out.println("c:"+msg+" "+msg.length());
}
}
if (consumer != null)
consumer.shutdown();
}
|