本帖最后由 feilong 于 2017-6-23 12:04 编辑
问题导读
1.什么情况下需要Rebalance Listeners?
2.ConsumerRebalanceListener通过哪两个方法实现?
3.如何编码实现Rebalance Listeners?
Rebalance Listeners
正如我们在前面提到的关于偏移量的一节中提到的,消费者希望在退出之前和分区再平衡之前做一些清理工作。
如果知道某个消费者将失去对某个分区的所有权,需要提交处理过的最后一个事件的偏移量。如果消费者维护了一个处理突发事件的缓冲区(例如,用当前记录地图来解释停止功能),这时,你会希望在失去的分区所有权前处理掉之前积累的事件。也许你还需要关闭文件句柄、数据库连接等。
消费者API允许你运行自己的代码,当分区被添加或从消费者删除。通过ConsumerRebalanceListener调用我们先前讨论的subscribe()方法。
ConsumerRebalanceListener有两种方法可以实现: [mw_shl_code=java,false]• public void onPartitionsRevoked(Collection<TopicPartition> partitions)[/mw_shl_code] 在负载均衡开始,消费者停止消费消息的时候被调用。 [mw_shl_code=java,false]• public void onPartitionsAssigned(Collection<TopicPartition> partitions)[/mw_shl_code] 在分区被重新分配给代理,消费者开始消费消息之前被调用。
下例会展示如何利用onPartitionsRevoked()在失去分区所有权前提交偏移量。下一节我们也会展示onPartitionsAssigned()相关示例。
[mw_shl_code=java,false]private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
private class HandleRebalance implements ConsumerRebalanceListener {//实现ConsumerRebalanceListener接口
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {//本例中我们在获取一个分区后什么都不会做,只开始消费消息
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffsets);//当我们因负载均衡失去分区的时候需要提交偏移量。注意,要提交最新处理的而不是还在消费的批次的最新偏移量。这是因为一个分区在我们还处于批处理时可能被撤销。我们正在为所有分区提交偏移量,而不仅仅是我们将要失去的分区——因为偏移量是已经处理过的事件,这不会造成什么损害。最后一点,我们是调用syncCommit()方法来确保负载均衡之前偏移量已被提交。
}
}
try {
consumer.subscribe(topics, new HandleRebalance());//最重要的部分,传递ConsumerRebalanceListener给subscribe方法,然后会被消费者调用
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = %s, partition = %s, offset = %d, customer
= %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
record.offset());
}
consumer.commitAsync(currentOffsets);
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
}
}
[/mw_shl_code]
|