// Question: this method sometimes receives duplicate records from Kafka — how do we guarantee each message is consumed exactly once?
/**
 * Pulls messages from the Kafka topic named by the {@code "topic.name"} property and forwards
 * each one to {@code handler}, committing the consumer offset only AFTER a successful send.
 * Committing after the send is what prevents the consumer group from re-delivering
 * already-processed messages on restart (the answer to the duplicate-consumption question).
 *
 * <p>If the handler throws an {@link IOException}, the offset of the failed message itself is
 * committed explicitly so the next run re-reads exactly that message, and consumption stops.
 * The consumer connection is always released in a {@code finally} block, so an unexpected
 * runtime exception inside the loop can no longer leak it.
 *
 * @param props   Kafka consumer configuration; must contain {@code "topic.name"} plus the usual
 *                high-level-consumer properties (group.id, zookeeper.connect, ...)
 * @param handler client connection that each consumed message is written to
 */
private void putDataByKafka(Properties props, ClientHandler handler) {
System.out.println("从kafka获取数据");
int i = 1; // 1-based count of delivered messages; used only in log output
String topic = props.getProperty("topic.name");
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
try {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1); // request a single stream (one consumer thread) for this topic
    Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
            consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    while (iterator.hasNext()) {
        System.out.println(handler.toString() + ":开始消费");
        MessageAndMetadata<byte[], byte[]> messagemeta = iterator.next();
        long offset = messagemeta.offset();
        int partition = messagemeta.partition();
        // NOTE(review): decodes with the platform default charset; if producers write UTF-8
        // this should be new String(messagemeta.message(), StandardCharsets.UTF_8) — confirm.
        String message = new String(messagemeta.message());
        try {
            handler.sendClientMsg(offset + ":" + message);
            System.out.println(handler.toString() + ":send data 第" + i + "条:" + "消息:"
                    + message + "分区:" + partition + "偏移:" + offset);
            // Commit only after the send succeeded; skipping this commit would make the
            // group restart from the old offset and replay every message.
            consumer.commitOffsets();
            i++;
        } catch (IOException e) {
            // Send failed: pin the committed position to the FAILED message so the next run
            // re-reads it, then stop consuming.
            Map<TopicAndPartition, OffsetAndMetadata> tpAndOffsetMetadata =
                    new HashMap<TopicAndPartition, OffsetAndMetadata>();
            tpAndOffsetMetadata.put(
                    new TopicAndPartition(messagemeta.topic(), messagemeta.partition()),
                    new OffsetAndMetadata(
                            new OffsetMetadata(messagemeta.offset(), UUID.randomUUID().toString()),
                            -1L, -1L));
            System.out.println("异常send data 第" + i + "条:" + offset + ":" + message + "分区:" + partition);
            consumer.commitOffsets(tpAndOffsetMetadata, true);
            e.printStackTrace(); // TODO(review): route through a real logger instead
            return; // finally below still runs and shuts the consumer down
        }
    }
} finally {
    // Always release the consumer connection — previously a RuntimeException escaping the
    // loop skipped shutdown() entirely and leaked the connection.
    consumer.shutdown();
}
}