学习了,感谢分享 |
本帖最后由 desehawk 于 2018-4-18 15:36 编辑 这里在补充: 具备新的里程碑意义的功能的Kafka 0.11.x版本(对应 Confluent Platform 3.3)已经release,该版本引入了exactly-once语义,本文阐述的内容包括:
消息语义概述 在分布式系统中,构成系统的任何节点都是被定义为可以彼此独立失败的。比如在 Kafka中,broker可能会crash,在producer推送数据至topic的过程中也可能会遇到网络问题。根据producer处理此类故障所采取的提交策略类型,我们可以获得不同的语义:
必须处理的常见灾难场景 为了清楚描述实现 exactly-once delivery语义的挑战,我们来看一个简单的例子。
Apache Kafka的exactly-once语义 在0.11.x版本之前,Apache Kafka支持at-least-once delivery语义以及partition内部的顺序delivery,如前所述这在某些场景下可能会导致数据重复消费。而Kafka 0.11.x支持exactly-once语义,不会导致该情况发生,其中主要包括三个内部逻辑的改造: 幂等:partition内部的exactly-once顺序语义 幂等操作,是指可以执行多次,而不会产生与仅执行一次不同结果的操作,Producer的send操作现在是幂等的。在任何导致producer重试的情况下,相同的消息,如果被producer发送多次,也只会被写入Kafka一次。要打开此功能,并让所有partition获得exactly-once delivery、无数据丢失和in-order语义,需要修改broker的配置:enable.idempotence = true。 事务:跨partition的原子性写操作 第二点,Kafka现在支持使用新事务API原子性的对跨partition进行写操作,该API允许producer发送批量消息到多个partition。该功能同样支持在同一个事务中提交消费者offsets,因此真正意义上实现了end-to-end的exactly-once delivery语义。以下是一段示例代码: [mw_shl_code=bash,true]producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch(ProducerFencedException e) { producer.close(); } catch(KafkaException e) { producer.abortTransaction(); }[/mw_shl_code] 该代码片段描述了如何使用新的producer事务API原子性的发送消息至多个partition。值得注意的是,某个Kafka topic partition内部的消息可能是事务完整提交后的消息,也可能是事务执行过程中的部分消息。
我们必须通过配置consumer端的配置isolation.level,来正确使用事务API,通过使用 new Producer API并且对一些unique ID设置transaction.id(该配置属于producer端),该unique ID用于提供事务状态的连续性。 Exactly-once 流处理 基于幂等和原子性,通过Streams API实现exactly-once流处理成为可能。如果要在流应用中实现相关语义,只需要配置 processing.guarantee=exactly_once,这会影响所有的流处理环境中的语义,包括将处理作业和由加工作业创建的所有物理状态同时写回到Kafka的操作。 这就是为什么Kafka Streams API提供的exactly-once保证是迄今为止任何流处理系统中的最强实现的原因。 它为以Kafka作为数据源的流处理应用程序提供端对端的exactly-once保证,Streams应用程序将任何Kafka的物化状态在最终环节写回到Kafka。 仅依靠外部数据系统实现物化状态的流处理系统仅支持对exactly-once的较弱保证。 即使他们使用Kafka作为流处理来源,在需要从故障中恢复的情况下,也只能rollback他们的Kafka消费者offset以重新消费并处理消息,而不能回滚关联状态,当更新不是幂等的时候会导致结果不正确。 我来解释下这段话的细节。 流处理系统的关键问题是我的流处理应用程序是否获得正确的答案,即使其中一个实例在处理过程中崩溃,恢复失败实例时的关键是把状态恢复到与崩溃前相同。 流处理可以看成是一个关于Kafka topic的读写操作集合, 消费者从Kafka topic读取消息,其他一些处理逻辑转换消息或修改cpu维护的状态,同时生产者将消息写入另一个Kafka topic。 Exactly-once流处理就是保证读写数据有且只有一次的一种能力。,在这种情况下,获得正确结果意味着不丢失任何输入消息或产生任何重复的输出,而这就是用户所期望的。 除了我们迄今为止讨论的简单灾难场景之外,还有许多其他故障情况需要考虑:
特别是当与非确定性操作和应用程序计算的持久状态的更改相结合时,如果实例失败或者重新启动,可能导致数据重复甚至是计算结果错误。 "流处理保证确定性操作exactly-once的正确方法是:保证读取写入操作的输出在任何非灾难场景下一致。" 针对非确定性操作的exactly-one流处理 Exactly-once流处理对确定性操作是有意义的,但是当处理逻辑本身存在不确定的逻辑时呢?假设有这样一个场景,流处理器用于计算满足条件的流入的事件数量,条件由外部服务动态决定。从根本上来说这种操作本质上是非决定性的,因为外部服务指定的条件是不确定的,这可能会导致下游数据流得到不同的结果。那么,对这样的非确定性操作来说,正确的策略又是什么呢? "对于非确定性操作来说,正确的处理方式是确保读取写入流处理操作的输出属于预期输出的子集,该集合应该可以由非确定性输入得到的预期值组合得到。" 因此,对于我们的示例流处理器,假设当前计数为31,输入事件值为2,故障时正确输出只能是31或者33其中一个:如果输入事件被外部条件指定需要丢弃那么就是31 ,反之则为33。 Kafka的exactly-once保证真的起作用了吗?为了回答这个关于Kafka exactly-once保证的问题,让我们来看看正确性(也就是我们如何设计,构建和测试这个功能)和性能。 精妙的设计和review过程 正确性和性能都从坚实的设计开始。 大约三年前,我们开始在LinkedIn上进行设计和原型开发工作。 我们在Confluent上寻求一个优雅的方式来将幂等和事务的功能性要求融合成一个整体的封装。 我们写了一个60+页的设计文档,概述了设计的各个方面:从高级消息流到每个数据结构和RPC的细节实现细节。 经过9个月的广泛公众监督,设计也从社区的不断反馈中大大得到改善。 例如,基于开源讨论,我们用更智能的服务器端过滤替代消费者端缓存以进行事务读取,从而避免了潜在的性能开销。 同时,我们也改进了事务与compacted topic,并增加了相应的安全机制。 最终我们机智地得到了一个极简设计,在很大程度上也依赖于强大的Kafka原型:
这种足够简单、专注于细节的设计,实施效果非常好。 迭代的开发过程 我们在开发该功能时,会确保每一个pull request经过广泛的审查。这意味着在几个月的时间内一些pull request经历过几十次迭代,审查过程中发现了之前设计上没有考虑到的无数边界问题。 我们编写了超过15,000个测试用例,包括分布式测试,运行时的故障测试。该流程揭示了各个方面的问题,从测试工具中的基本编码错误到深奥的NTP同步问题。其中的一个子集是分布式混沌测试,我们为多个事务客户端提供了一个完整的Kafka集群,通过事务产生消息,同时读取这些消息,并在过程中强行终止客户端或服务器,以确保数据既不丢失也不重复。 因此经过良好测试,高质量代码库的简单而坚固的设计构成了我们解决方案的基石。 好消息:Kafka 还是非常快! 在设计此功能时,一个重点是性能的保证:由于exactly-once设计带来的性能开销,我们淘汰了许多更简单的设计选型。经过多番思考,我们采用的设计尽可能地使每个事务的开销最小(每个分区约1次写入,尽可能少的写入记录至中心事务日志)。对于耗时100ms的1KB消息和事务写入,与配置为at-least-once并且保序交付(acks = all,max.in.flight.requests.per.connection = 1)的生产者的吞吐量相比吞吐量仅下降3%;与at-most-once并且无排序保证(acks = 1,max.in.flight.requests.per.connection = 5)的生产者的吞吐量相比下降20%。 具体的测试benchmark可以看这里。 除了确保新功能的低性能开销之外,我们也不希望在没有使用exactly-once功能的应用程序中看到性能有意外损耗。为了确保这一点,我们不仅在Kafka消息头中添加了一些新的字段来实现exactly-once功能,而且还重新设计了Kafka消息格式,在网络传输和磁盘存储时,更有效地压缩消息。特别是,我们将一大堆常见的元数据转移到批量头文件中,并将可变长度编码引入批次中的每个记录。通过这种批量优化,整体信息的size显著减小。例如,一批7条记录、每条10个字节的批量消息,使用新的格式将减少35%的体量,这使得生产者吞吐量提高了20%,处理小消息时提高了50%的消费者吞吐量。任何Kafka 0.11用户都可以使用此性能提升,即使没有使用任何exactly-once功能。 我们还着眼于优化Streams API中的exactly-once流处理的开销。 以100ms作为提交间隔的情况下(保证端到端延迟较低的一个值),我们看到吞吐量下降了15%至30%(损耗百分比取决于消息大小,前者为1KB的消息大小,后者为100字节)。 但是,对于>=1KB的消息,30秒的提交间隔是没有任何吞吐性能损耗的。 在下一个版本中,我们计划引入推测性执行机制:即使我们使用较大的提交间隔,我们也可以保持端到端的延迟较低,最终我们期望将事务的开销降至零。 总而言之,通过从根本上重新调整我们的一些核心数据结构,我们在较小的性能损耗下实现了幂等和事务功能使得Kafka在大部分场景下依然很快。 这个魔法小精灵粉尘可以洒在我的应用程序上吗? Exacrtly-once处理是一种端到端的保证,在洒上去之前应用程序必须保证自身设计不违反该原则。 如果您使用的是消费者API,则必须保证你提交的应用程序状态变更和你的偏移量是一致的。 对于流处理应用,情况会更好一些。 因为流处理是一个封闭的系统,其中输入、输出和状态修改都在相同的操作中建模,它实际上已经类似于exactly-once中的事务,具备原子性了。 配置更改就直接可以为您提供端到端的保证。 但是,您仍然需要从Kafka获取数据,当与exactly-once的connector组合时,将直接拥有该特性。 https://blog.csdn.net/zhangjun5965/article/details/78218169 |