本帖最后由 feilong 于 2017-6-30 08:16 编辑
问题导读
1.什么是一次提交(commit)? 2.什么是偏移量(offset)? 3.消费者如何提交偏移量? 4.KafkaConsumer API 提供的提交偏移量的方法有哪些? 5.commitSync()方法如何使用?
当我们调用poll()时,它会返回我们消费组中的消费者还未消费的写入kafka的记录。这意味着,我们有一种方法可以跟踪用户组读取的记录。正如我们之前所讨论过的,kafka的一个独特之处在于它不像许多JMS队列那样跟踪消费者的确认。相反,它允许消费者使用kafka跟踪它们在每个分区中的位置(偏移量)。
我们称在分区中更新当前位置的动作为提交(commit)。
消费者如何提交偏移量?它产生一个消息给kafka,同时会有一个特别的主题__consumer_offsets,用来记录每个分区已提交的偏移量。无论消费者启动,运行,离开,对它都不会产生什么影响。然而,如果消费者宕掉或新的消费者加入消费组,这将触发负载均衡。负载均衡后,每个消费者都可能被分配一组新的分区,而不是以前处理的分区。为了是系统正常运转,消费者会从每个分区最新提交的偏移量处继续读取、处理。
如果所提交的偏移量小于客户端处理的最后一个消息的偏移量,则最后处理的偏移量和提交的偏移量之间的消息将被处理两次。 如果所提交的偏移量大于客户端实际处理的最后一个消息的偏移量,则最后处理的偏移量和所提交的偏移量之间的所有消息将被消费组忽略。 清楚地管理偏移量对客户端应用程序有很大的影响。
KafkaConsumer API提供了多种提交偏移量的方法。
Automatic Commit 自动提交
提交偏移量好的方法就是让消费者为你工作。如果配置了enable.auto.commit = true,消费者将每5秒提交客户端从poll()方法返回的最大偏移量。5秒的间隔是默认的,可通过配置auto.commit.interval.ms来实现控制。与消费者中的其他内容一样,自动提交是由轮询循环驱动的。当轮询时,消费者检查是否提交,如果是的话,它将提交上次轮询中返回的偏移量。
然而,在使用这个方便的选项之前,了解其后果是很重要的。
考虑到自动提交每5秒发生一次,假设最近一次提交3秒后负载均衡触发,负载均衡后所有的消费者都将从最后的偏移量进行提交。在这种情况下,偏移量的生命周期是3秒,所有在这3秒内到达的事件将被处理两次。可以将提交间隔配置为更频繁地提交,并减少将重复记录的窗口,但不可能完全消除它们。 注意,启用自动提交后,对轮询的调用将始终执行上次轮询返回的最后偏移量。它不知道哪些事件实际上已被处理,所以在再次调用轮询之前,始终处理轮询返回的所有事件是至关重要的(或在调用close()之前,它也将自动提交偏移)。这通常不是问题,但在处理异常或提前退出轮询循环时要注意。 自动提交很方便,但它们没有给开发人员足够的控制权,避免重复消息。
Commit Current Offset 提交当前偏移量
大多数开发人员使用更多的的是控制偏移量提交的时间。既要消除丢失消息的可能性,又要减少负载均衡过程中消息重复的数量。消费者API可以选择在对应用程序开发人员来说有意义的点上提交当前偏移量,而不是基于定时器。
通过设置auto.commit.offset = false,只有当应用程序明确选择这样做时,才会提交偏移量。最简单和最可靠的提交API是commitsync()。这个API将提交方法poll()返回的最新偏移量,并且一旦偏移量被提交就会被返回,如果因一些原因提交失败则会抛出异常。
重要的是要记住,commitSync()将提交poll()返回的最新偏移量,所以调用commitSync(),之前要确保已经集合中所有的记录已处理完成,否则你将面临上述丢数据的风险。注意,当负载平衡被触发,从最新批次开始直到负载平衡时间内的所有消息将被处理两次。
这是一个 当我们处理完最新批次的消息后如何调用commitSync提交偏移量的示例。 [mw_shl_code=java,false]while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = %s, partition = %s, offset = %d, customer =
%s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());//
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}[/mw_shl_code]
(未完待续...)
|