问题导读
1.什么是异步提交? 2.人工提交的缺点是什么?异步提交的缺点是什么? 3.怎样合并同步和异步提交? 4.怎样提交指定的偏移量?
接上篇 kafka权威指南 第四章 第5节:提交和偏移量 http://www.aboutyun.com/forum.php?mod=viewthread&tid=22089
Asynchronous Commit 异步提交
人工提交的一个缺点是应用程序会被阻塞,直到代理响应提交请求为止。这将限制应用程序的吞吐量。通过不频繁地提交,可以提高吞吐量,但是这样我们就会增加负载均衡所产生的潜在重复的数量。
另一个选项是异步提交API。我们只需发送请求并继续进行,而不是等待代理对提交做出响应。 [mw_shl_code=java,false]while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = %s, partition = %s, offset = %d, customer =
%s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();//提交最后一个偏移量并继续
}[/mw_shl_code]
异步提交的缺点是commitSync()方法会重试直到提交成功或者遇到无法重试的错误。它不重试的原因是commitasync()从服务器接收到的响应,有可能是后一个已经成功提交的请求。假设我们发送了一个请求来提交偏移量2000,但因存在暂时的通信问题,所以代理永远不会得到请求,也永远不会响应。同时,我们处理了另一批并成功地提交了偏移量3000。如果commitasync()现在重试以前失败的提交,偏移量3000已经被处理和提交后 偏移量2000也可能被成功提交。在负载均衡的情况下,这将导致更多的重复。
我们重申提交顺序的复杂性和重要性,是因为在代理响应后会触发commitAsync()方法调用一个回调方法。使用回调记录提交错误或用它们来统计计算是很常见的,但是如果你想用回调来实现重试,需要了解提交顺序的问题。
[mw_shl_code=java,false]while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d, customer =
%s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});//发送提交并继续,失败则记录偏移量
}[/mw_shl_code]
获得正确顺序提交的异步重试的一种简单模式是使用单调递增的序列号。在每次提交或者调用回调方法asyncCommit()时增加序列号。当你准备发送一个重试时,检查回调得到的提交序列号是否等于实例变量,如果是的话,没有新的提交,重试是安全的。如果实例序列号较高,不要重试,因为新的提交已经发送。
Combining Synchronous and Asynchronous commits 合并同步和异步提交
正常情况下,提交偶尔的失败且没有重试并不是什么大问题,因为如果问题是暂时的,之后的提交是都会成功的。但是,如果我们知道这是我们关闭消费者之前的或者负载均衡之前的最后一个提交,我们要特别确保提交成功。
因此,一个常见的模式是在关闭之前结合commitasync与commitsync。这是它的工作原理(我们将讨论如何在负载均衡之前进行提交,当我们进入关于负载均衡监听者的章节):
[mw_shl_code=java,false]try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d, customer
= %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();//一切正常时,我们调用commitAsync(),如果一个提交失败,下一个提交将作为重试。
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();//但是如果我们关闭,就没有“下一个提交”。我们调用commitsync,因为它将重试直到成功或遭受不可恢复的失败。
} finally {
consumer.close();
}
}[/mw_shl_code]
Commit Specified Offset 提交指定的偏移量
提交最新的偏移量只允许在一个批次处理完成后提交。然而,如果想要提交的更频繁些,或者poll()方法返回大量批次却要在批次中间提交以避免负载均衡发生后引起的所有数据行重复处理,这时该怎么做呢?不能只调用方法commitSync() 或 commitAsync() -- 这会提交最后返回还未处理的偏移量。
很幸运,消费者API允许我们调用方法commitSync()或 commitAsync()并且传递要提交的分区和偏移量的map作为参数。假如你在正在处理的记录批次中,从主题“customers”获取到分区3的上一个消息的偏移量5000,你可以调用commitSync()提交此偏移量。由于你的消费者可能会消耗不止一个分区,所以你需要跟踪所有这些分区的偏移量,因此在控制偏移量时移动到这个精度级别会增加代码的复杂度。
下面是特定偏移量的提交的示例: [mw_shl_code=java,false]private Map<TopicPartition, OffsetAndMetadata> currentOffsets;//我们用来跟踪偏移量的map
int count = 0;
....
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = %s, partition = %s, offset = %d, customer =
%s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());//println代替日志记录
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
record.offset());//在读取每个记录之后,我们更新偏移映射,并与上次所看到的偏移量相比较。
if (count % 1000 == 0)//这里,我们决定每1000条记录提交当前偏移量。在应用程序中,可以根据时间或记录内容进行提交。
consumer.commitAsync(currentOffsets);//我选择调用commitAsync(),但commitSync ()在这里也是完全有效的。当然,当提交特定的偏移量时,仍然需要执行我们在前面章节中看到的所有错误处理。
count++;
}
}[/mw_shl_code]
|