[mw_shl_code=java,true] List<String> values = new ArrayList<String>();
/* 向 values 中添加数据 */
LinkedList<Future<RecordMetadata>> kafkaFuture = new LinkedList<Future<RecordMetadata>>();
for(int i = 0; i < values.size(); i++) {
ProducerRecord record =
new ProducerRecord<String, String>("test_5", Integer.toString(i), values.get(i));
kafkaFuture.add(producer.send(record));
}
producer.flush();
for(int i=0; i<kafkaFuture.size(); i++) {
kafkaFuture.get(i).get();
values.remove(i);
}[/mw_shl_code]
在上面代码中,如果写入kafka异常,values中保存未写入的数据
|
|