分享

kafka权威指南 第四章 第6节:负载均衡监听器

feilong 2017-6-23 08:38:45 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 12729
本帖最后由 feilong 于 2017-6-23 12:04 编辑
问题导读

1.什么情况下需要Rebalance Listeners?
2.ConsumerRebalanceListener通过哪两个方法实现?
3.如何编码实现Rebalance Listeners?





Rebalance Listeners


正如我们在前面提到的关于偏移量的一节中提到的,消费者希望在退出之前和分区再平衡之前做一些清理工作。

如果知道某个消费者将失去对某个分区的所有权,需要提交处理过的最后一个事件的偏移量。如果消费者维护了一个处理突发事件的缓冲区(例如,用当前记录地图来解释停止功能),这时,你会希望在失去的分区所有权前处理掉之前积累的事件。也许你还需要关闭文件句柄、数据库连接等。

消费者API允许你运行自己的代码,当分区被添加或从消费者删除。通过ConsumerRebalanceListener调用我们先前讨论的subscribe()方法。

ConsumerRebalanceListener有两种方法可以实现:
[mw_shl_code=java,false]&#8226; public void onPartitionsRevoked(Collection<TopicPartition> partitions)[/mw_shl_code]
在负载均衡开始,消费者停止消费消息的时候被调用。
[mw_shl_code=java,false]&#8226; public void onPartitionsAssigned(Collection<TopicPartition> partitions)[/mw_shl_code]
在分区被重新分配给代理,消费者开始消费消息之前被调用。

下例会展示如何利用onPartitionsRevoked()在失去分区所有权前提交偏移量。下一节我们也会展示onPartitionsAssigned()相关示例。

[mw_shl_code=java,false]private Map<TopicPartition, OffsetAndMetadata> currentOffsets;

private class HandleRebalance implements ConsumerRebalanceListener {//实现ConsumerRebalanceListener接口

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {//本例中我们在获取一个分区后什么都不会做,只开始消费消息
}

public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffsets);//当我们因负载均衡失去分区的时候需要提交偏移量。注意,要提交最新处理的而不是还在消费的批次的最新偏移量。这是因为一个分区在我们还处于批处理时可能被撤销。我们正在为所有分区提交偏移量,而不仅仅是我们将要失去的分区——因为偏移量是已经处理过的事件,这不会造成什么损害。最后一点,我们是调用syncCommit()方法来确保负载均衡之前偏移量已被提交。
}
}
try {
consumer.subscribe(topics, new HandleRebalance());//最重要的部分,传递ConsumerRebalanceListener给subscribe方法,然后会被消费者调用
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = %s, partition = %s, offset = %d, customer
= %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
record.offset());
}
consumer.commitAsync(currentOffsets);
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
}
}
[/mw_shl_code]




本帖被以下淘专辑推荐:

已有(3)人评论

跳转到指定楼层
Ytell_K1284 发表于 2017-6-23 16:55:50
66666666666666666
回复

使用道具 举报

Ytell_K1284 发表于 2017-6-23 16:56:34
6666666666666
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条