分享

大数据kafka技术分析【kafka经典文章系列】



公众号:
gh_3af7b8516eec

功能介绍
apache kafka使用心得、实战攻略、会议ppt、技术博文等

经典文章推荐:

1.关于Kafka配额的讨论(1)
Kafka自0.9.0.0版本引入了配额管理(quota management),旨在broker端对clients发送请求进行限流(throttling)。目前Kafka支持两大类配额管理:

  • 网络带宽(network bandwidth)配额管理:定义带宽阈值来限制请求发送速率,阈值单位是字节/秒,即bytes/s。该功能是0.9.0.0版本引入的
  • CPU配额管理:定义CPU使用率阈值来限制请求发送速率,阈值以百分比的形式给出,如quota = 50表示50%的CPU使用率上限。该功能是0.11.0.0版本引入的

本文主要讨论网络带宽配额管理。关于CPU配额管理的部分我们将在下一篇中进行讨论。

一、配额能做什么?

设置了基于带宽的配额之后,Kafka能够做到:

1. 限制follower副本拉取leader副本消息的速率
2. 限制producer的生产速率
3. 限制consumer的消费速率

二、配额作用域

目前可以在3个层级上设置配额:

1. client id
2. user
3. user + client id

第一种是client id,即新版本clients端的通用参数client.id设置的值。一旦为某个client id设置了配额,所有具有该client id的clients段程序都将受到该配额的限制。

第二种是user,为启用认证之后位于认证体系中的某个用户主体(user principal),比如一个Kerberos用户:user1/kafka.host1.com@REALM,Kafka解析出来的用户名是'user1’。当然我们可以设置sasl.kerberos.principal.to.local.rules参数修改这种解析规则,不过这不在本文的讨论范围内。

第三种就是user + client id,实际上是包含前两种的一个二元组。它是最细粒度化的配额管理作用域。

当然,这3种作用域还可以设置各自的默认值配额(默认是没有配额的,即默认值通常是无穷大),包括:client id作用域默认值、user作用域默认值、user + client id作用域默认值,其中最后一项又可细分为4个子作用域,即

  • user作用域默认值 + client id作用域指定值
  • user作用域指定值 + client id作用域指定值
  • user作用域默认值 + client id作用域默认值
  • user作用域指定值 + client id作用域默认值

因此,实际上总共有8种可能的配额作用域设置值,它们的优先级关系依次如下(从高到低):

  • user作用域指定值 + client id作用域指定值(即为user + client id设置了特定值配额)
  • user作用域指定值 + client id作用域默认值(为user设置了特定值配额,为client id设置了默认值配额)
  • user作用域(为user设置了特定值配额)
  • user作用域默认值 + client id作用域指定值(为user设置了默认值配额,为client id设置了特定值配额)
  • user作用域默认值 + client id作用域默认值(为user和client id设置了默认值配额)
  • user作用域默认值(为user设置了默认值配额)
  • client id作用域(为client id设置了特定值配额)
  • client id作用域默认值(为client id设置了默认值配额)

当多条配额规则冲突时我们可以根据以上规则确定应用的是哪一条。举个例子,我们为user = 'good-user'的用户配置了100MB/s的配额,同时为[user='good-user', client id = 'producer-1']设置配额为50MB/s,那么当good-user用户使用名为‘producer-1’的producer发送消息时Kafka保证它的请求处理速率不会超过50MB/s,即第二条规则覆盖了第一条规则。


更多
Kafka网络带宽配额管理



【译】Flink端到端精确一次处理语义的实现


什么是Flink

Flink 是一个针对流数据和批数据的分布式处理引擎。它主要是由 Java 代码实现。目前主要还是依靠开源社区的贡献而发展。对 Flink 而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。再换句话说,Flink 会把所有任务当成流来处理,这也是其最大的特点。Flink 可以支持本地的快速迭代,以及一些环形的迭代任务。

Flink有哪些概念
包括
程序和数据流
并行数据流
tasks & operator chains

分布式执行

Workers,Slots,Resources

等更多参考
http://blog.csdn.net/yanghua_kobe/article/details/51298871

Flink与kafka的关系
Apache Flink是新一代的分布式流式数据处理框架,它统一的处理引擎既可以处理批数据(batch data)也可以处理流式数据(streaming data)。在实际场景中,Flink利用Apache Kafka作为上下游的输入输出十分常见,二者集成。


有了上面知识下面,阅读下面内容会明白
更多链接

Kafka元数据缓存(metadata cache)


  我猜想作者的意思应该是说:broker不保存消费者的状态。如果从这个角度来说,broker无状态的说法倒也没有什么问题。不过实际上,broker是有状态的服务:每台broker在内存中都维护了集群上所有节点和topic分区的状态信息——Kafka称这部分状态信息为元数据缓存(metadata cache)。本文就将讨论一下这个metadata cache的设计与实现。

1. cache里面存了什么?

  首先,我们来看下cache里面都存了什么,我们以Kafka 1.0.0版本作为分析对象。Metadata cache中保存的信息十分丰富,几乎囊括了Kafka集群的各个方面,它包含了:

controller所在的broker ID,即保存了当前集群中controller是哪台broker
集群中所有broker的信息:比如每台broker的ID、机架信息以及配置的若干组连接信息(比如配置了PLAINTEXT和SASL监听器就有两套连接信息,分别使用不同的安全协议和端口,甚至主机名都可能不同)
集群中所有节点的信息:严格来说,它和上一个有些重复,不过此项是按照broker ID和监听器类型进行分组的。对于超大集群来说,使用这一项缓存可以快速地定位和查找给定节点信息,而无需遍历上一项中的内容,算是一个优化吧
集群中所有分区的信息:所谓分区信息指的是分区的leader、ISR和AR信息以及当前处于offline状态的副本集合。这部分数据按照topic和分区ID进行分组,可以快速地查找到每个分区的当前状态。(注:AR表示assigned replicas,即创建topic时为该分区分配的副本集合)
2. 每台broker都保存相同的cache吗?

  是的,至少Kafka在设计时的确是这样的愿景:每台Kafka broker都要维护相同的缓存,这样客户端程序(clients)随意地给任何一个broker发送请求都能够获取相同的数据,这也是为什么任何一个broker都能处理clients发来的Metadata请求的原因:因为每个broker上都有这些数据!要知道目前Kafka共有38种请求类型,能做到这一点的可谓少之又少。每个broker都能处理的能力可以缩短请求被处理的延时从而提高整体clients端的吞吐,因此用空间去换一些时间的做法是值得的。

3. cache是怎么更新的?

  如前所述,用空间去换时间,好处是降低了延时,提升了吞吐,但劣势就在于你需要处理cache的更新并且维护一致性。目前Kafka是怎么更新cache的?简单来说,就是通过发送异步更新请求(UpdateMetadata request)来维护一致性的。既然是异步的,那么在某一个时间点集群上所有broker的cache信息就未必是严格相同的。只不过在实际使用场景中,这种弱一致性似乎并没有太大的问题。原因如下:1. clients并不是时刻都需要去请求元数据的,且会缓存到本地;2. 即使获取的元数据无效或者过期了,clients通常都有重试机制,可以去其他broker上再次获取元数据; 3. cache更新是很轻量级的,仅仅是更新一些内存中的数据结构,不会有太大的成本。因此我们还是可以安全地认为每台broker上都有相同的cache信息。

  具体的更新操作实际上是由controller来完成的。controller会在一定场景下向特定broker发送UpdateMetadata请求令这些broker去更新它们各自的cache,这些broker一旦接收到请求便开始全量更新——即清空当前所有cache信息,使用UpdateMetadata请求中的数据来重新填充cache。

4. cache什么时候更新?

  实际上这个问题等同于:controller何时向特定broker发送UpdateMetadata请求? 如果从源码开始分析,那么涉及到的场景太多了,比如controller启动时、新broker启动时、更新broker时、副本重分配时等等。我们只需要记住:只要集群中有broker或分区数据发生了变更就需要更新这些cache。

  举个经常有人问的例子:集群中新增加的broker是如何获取这些cache,并且其他broker是如何知晓它的?当有新broker启动时,它会在Zookeeper中进行注册,此时监听Zookeeper的controller就会立即感知这台新broker的加入,此时controller会更新它自己的缓存(注意:这是controller自己的缓存,不是本文讨论的metadata cache)把这台broker加入到当前broker列表中,之后它会发送UpdateMetadata请求给集群中所有的broker(也包括那台新加入的broker)让它们去更新metadata cache。一旦这些broker更新cache完成,它们就知道了这台新broker的存在,同时由于新broker也更新了cache,故现在它也有了集群所有的状态信息。

5. 目前的问题?

  前面说过了,现在更新cache完全由controller来驱动,故controller所在broker的负载会极大地影响这部分操作(实际上,它会影响所有的controller操作)。根据目前的设计,controller所在broker依然作为一个普通broker执行其他的clients请求处理逻辑,所以如果controller broker一旦忙于各种clients请求(比如生产消息或消费消息),那么这种更新操作的请求就会积压起来(backlog),造成了更新操作的延缓甚至是被取消。究其根本原因在于当前controller对待数据类请求和控制类请求并无任何优先级化处理——controller一视同仁地对待这些请求,而实际上我们更希望controller能否赋予控制类请求更高的优先级。社区目前已经开始着手改造当前的设计,相信在未来的版本中此问题可以得到解决。

本文探讨了一些关于metadata cache方面的内容,因为时间有限,并没有涵盖方方面面,希望本文有助于加强我们对于cache工作原理的了解。


链接

Kafka CommitFailedException异常的一点思考



一、含义

CommitFailedException异常:位移提交失败时候抛出的异常。通常该异常被抛出时还会携带这样的一段话:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

简单翻译一下,“位移提交失败,原因是消费者组开启了rebalance且已然分配对应分区给其他消费者。这表明poll调用间隔超过了max.poll.interval.ms的值,这通常表示poll循环中的消息处理花费了太长的时间。解决方案有两个:1. 增加session.timeout.ms值;2. 减少max.poll.records值”

下面这段话完全是我自己的理解,请谨慎听取~~

在我看来,上面英文中的最后一句话实际上依然是0.10.0.0或之前版本时的解决之道,因为在那些版本中尚未提供max.poll.interval.ms参数,因此session.timeout.ms既用于失败检测,也用于控制消息处理时间,同时还承担着rebalance过程的超时控制。在0.10.1.0版本时社区对该参数的含义进行了解耦,推出了max.poll.interval.ms参数。实际上,在0.10.1.0.0或之后的版本中,作者推荐用户将session.timeout.ms设置一个很小的值,比如5s,但需要把max.poll.interval.ms设置成平均的消息处理时间。举个例子,假设你一次poll调用返回的消息数是N,你处理每条消息的平均时间是t0,那么你需要设置max.poll.interval.ms稍稍大于N * t0以保证poll调用间隔不会超过该阈值。

如此来看,上面英文最后一句话中的第一个解决办法应该修改成:增加max.poll.interval.ms值,而非session.timeout.ms

二、抛出时机

从源代码方面说,CommitFailedException异常通常发生在手动提交位移时,即用户显式调用KafkaConsumer.commitSync()方法。从使用场景来说,有两种场景可以抛出该异常

2.1 消息处理时间>max.poll.interval.ms时: 如前所述,这是该异常最“正宗”的出现场景。复现也比较容易,用户只需写一个consumer程序,订阅topic(即使用consumer.subscribe),设置max.poll.interval.ms=N,然后在consumer.poll循环中Thread.sleep(>N),之后手动提交位移即可复现,比如:


...
props.put("max.poll.interval.ms", 5000);
consumer.subscribe(Arrays.asList("topic1", "topic2", ...));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    // 处理消息
    Thread.sleep(6000L);
    consumer.commitSync();
}
2.2 standalone consumer与consumer group冲突时:这里所说的standalone consumer指的是使用KafkaConsumer.assign()而非subscribe()的消费者。当用户系统中同时出现了standalone consumer和consumer group,并且它们的group id相同时,此时standalone consumer手动提交位移时就会立刻抛出此异常。这是因为group coordinator无法识别这个具有相同group id的consumer,从而向它返回“你不是一个合法的组成员”错误。目前Kafka consumer提交位移的代码中一旦碰到这个错误会立即抛出CommitFailedException。

三、个人对CommitFailedException的看法

针对上面的第二种场景,我觉得初始的英文描述中完全没有提及,这实际上是该异常表述不清晰的一个表现。因为在提交位移的源代码中broker端返回“无效组成员”后,coordinator有可能认为这是一个新的成员,需要批准它加入组。这对于正常的组rebalance流程来说并没有什么问题,但对于standalone consumer而言该逻辑就显得有点不适用了。纵然不修改这个逻辑,至少也要完善CommitFailedException的表述,把这种情况加到异常说明里面。这样用户就能明确知晓诱发这种异常的所有场景,而不是像现在这样:只能尝试修改max.poll.records或max.poll.interval.ms。要知道对于第二种情况,无论用户如何设置max.poll.interval.ms或max.poll.records都无法规避。

当然,考虑到consumer端会抛出“The coordinator is not aware of this member”错误显式提醒用户此问题,因此假设给社区提了jira,开发人员估计也会以此为由给“拍”回来,不过依然还是有优化的空间的。比如在输出日志中更加清晰提醒用户这个错误是因为用户的某个“粗心”的standalone consumer“无意冒犯”了一个consumer group而导致的错误。


链接


关于Kafka __consumer_offests的讨论



众所周知,__consumer__offsets是一个内部topic,对用户而言是透明的,除了它的数据文件以及偶尔在日志中出现这两点之外,用户一般是感觉不到这个topic的。不过我们的确知道它保存的是Kafka新版本consumer的位移信息。本文我们简单梳理一下这个内部topic(以1.0.0代码为分析对象)
一、何时被创建?
首先,我们先来看下 它是何时被创建的?__consumer_offsets创建的时机有很多种,主要包括:
  • broker响应FindCoordinatorRequest请求时
  • broker响应MetadataRequest显式请求__consumer_offsets元数据时

其中以第一种最为常见,而第一种时机的表现形式可能有很多,比如用户启动了一个消费者组(下称consumer group)进行消费或调用kafka-consumer-groups --describe等
二、消息种类
__consumer_offsets中保存的记录是普通的Kafka消息,只是它的格式完全由Kafka来维护,用户不能干预。严格来说,__consumer_offsets中保存三类消息,分别是:
  • Consumer group组元数据消息
  • Consumer group位移消息
  • Tombstone消息

2.1 Consumer group组元数据消息
我们都知道__consumer_offsets是保存位移的,但实际上每个消费者组的元数据信息也保存在这个topic。这些元数据包括:
1.png
这里不详细展开组元数据各个字段的含义。我们只需要知道组元数据消息也是保存在__consumer_offsets中即可。值得一提的是, 如果用户使用standalone consumer(即consumer.assign(****)方法),那么就不会写入这类消息,毕竟我们使用的是独立的消费者,而没有使用消费者组。
这类消息的key是一个二元组,格式是【版本+groupId】,这里的版本表征这类消息的版本号,无实际用途;而value就是上图所有这些信息打包而成的字节数组。

更多链接

Kafka-4614复盘 (MappedByteBuffer未关闭导致慢磁盘访问)


    很早之前就想动笔就这个kafka bug总结一番了,只是这个问题既不是本人发现,也不是自己动手修复,终归是底气不足,故而一直耽搁下来。怎奈此问题实在是含金量十足,又恰逢最近有人询问Kafka 0.10.2都有哪些提升,我终究还是决定给这个bug写点东西了。
    事先声明:这是一个日本人(下称Yuto)开的bug,其对问题的描述、定位、探查、分析、验证以及结论都堪称完美,令人印象深刻。自该issue报出的第一天起我便全程追踪其进度,整个过程甚觉受益良多,今总结出来希望对自己及各位读者都有所帮助。值得一提的是,我只会写出亲自验证了的结论,对于该issue中阐述的一些未经验证的观点,不会显式强调。有兴趣的读者可以查看issue原文自己去判断。附上bug地址: https://issues.apache.org/jira/browse/KAFKA-4614

一、环境背景
操作系统:CentOS 6
内核版本:2.6.32-xx
文件系统:XFS
Java版本:1.8.0_66
GC:            G1
Kafka版本:0.10.0.1

二、问题描述
    生产环境中有一半左右的PRODUCE请求平均响应时间是1ms,99%请求的响应时间指标是10ms,但有时99%请求响应时间指标会飙升至100ms~500ms,且绝大多数情况下时间都花在了“Remote”部分,即意味着是由于备份机制慢造成的。另外,每天都会在不同的broker上稳定地观察到3~5次,同时伴随着Remote的增加,Total和RequestQueue也会相应地增加,就像下图这样:

    上面提到的Remote、Total和RequestQueue分别表示RemoteTimeMs、TotalTimeMs和RequestQueueTimeMs这3个JMX指标,即副本备份时间(设置了acks=-1)、请求处理总时间以及请求在队列中的等待时间。具体来说,remote time就是leader broker等待所有follower副本拉取消息的时间。显然,只有在acks=-1下才会有remote time;RequestQueueTimeMs指客户端发过来的PRODUCE请求在broker端的请求队列中待的时间,由于请求队列是使用阻塞队列实现的,所以如果该值过大的话,说明broker端负载过大,需要增大请求队列的大小,即增加参数queued.max.requests的值。另外,在上述三个指标飙升的同时还观测到broker所在磁盘的读操作指标也跟着飙升。


更多链接

关于Kafka日志留存策略的讨论



关于Kafka日志留存(log retention)策略的介绍,网上已有很多文章。不过目前其策略已然发生了一些变化,故本文针对较新版本的Kafka做一次统一的讨论。如果没有显式说明,本文一律以Kafka 1.0.0作为分析对象。

所谓日志留存策略,就是Kafka保存topic数据的规则,我将按照以下几个方面分别介绍留存策略:

留存策略类型
留存机制及其工作原理

一、留存策略类型

目前,与日志留存方式相关的策略类型主要有两种:delete和compact。这两种留存方式的机制完全不同。本文主要讨论针对delete类型的留存策略。用户可以通过设置broker端参数log.cleanup.policy来指定集群上所有topic默认的策略类型。另外也可以通过topic级别参数cleanup.policy来为某些topic设置不同于默认值的策略类型。当前log.cleanup.policy参数的默认值是[delete,compact],这是一个list类型的参数,表示集群上所有topic会同时开启delete和compact两种留存策略——这是0.10.1.0新引入的功能,在0.10.1.0之前,该参数只能两选一,不能同时兼顾,但在实际使用中很多用户都抱怨compact类型的topic存在过期key消息未删除的情况,故社区修改了该参数配置,允许一个topic同时开启两种留存策略。

再次强调下, 本文只讨论delete类型的留存策略。

二、留存机制及其工作原理

在开始详细介绍各种留存机制之前,先简要说下Kafka是如何处理日志留存的。每个Kafka broker启动时,都会在后台开启一个定时任务,定期地去检查并执行所有topic日志留存,这个定时任务触发的时间周期由broker端参数log.retention.check.interval.ms控制,默认是5分钟,即每台broker每5分钟都会尝试去检查一下是否有可以删除的日志。因此如果你要缩短这个间隔,只需要调小log.retention.check.interval.ms即可。

鉴于日志留存和日志删除实际上是一个问题的两个方面,因而我们下面讨论的是关于Kafka根据什么规则来删除日志。但有一点要强调一下,待删除的标的是日志段,即LogSegment,也就是以.log结尾的一个个文件,而非整个文件夹。另外还有一点也很重要,当前日志段(active logsegment)是永远不会被删除的,不管用户配置了哪种留存机制。



当前留存机制共有3种:

基于空间维度
基于时间维度
基于起始位移维度
前两种策略相信大家已经耳熟能详,而第三种策略由于新加入的时间不长,目前网上对其的介绍并不多。我们一个一个来看。

2.1 基于空间维度

也称size-based retention,指的是Kafka定期为那些超过磁盘空间阈值的topic进行日志段的删除。这个阈值由broker端参数log.retention.bytes和topic级别参数retention.bytes控制,默认是-1,表示Kafka当前未开启这个留存机制,即不管topic日志量涨到多少,Kafka都不视其为“超过阈值”。如果用户要开启这种留存机制,必须显式设置log.retention.bytes(或retention.bytes)。

一旦用户设置了阈值,那么Kafka就会在定时任务中尝试比较当前日志量总大小是否超过阈值至少一个日志段的大小。这里所说的总大小是指所有日志段文件的大小,不包括索引文件的大小!如果是则会尝试从最老的日志段文件开始删起。注意这里的“超过阈值至少一个日志段的大小”,这就是说超过阈值的部分必须要大于一个日志段的大小,否则不会进行删除的,原因就是因为删除的标的是日志段文件——即文件只能被当做一个整体进行删除,无法删除部分内容。

举个例子来说明,假设日志段大小是700MB,当前分区共有4个日志段文件,大小分别是700MB,700MB,700MB和1234B——显然1234B那个文件就是active日志段。此时该分区总的日志大小是3*700MB+1234B=2100MB+1234B,如果阈值设置为2000MB,那么超出阈值的部分就是100MB+1234B,小于日志段大小700MB,故Kafka不会执行任何删除操作,即使总大小已经超过了阈值;反之如果阈值设置为1400MB,那么超过阈值的部分就是700MB+1234B > 700MB,此时Kafka会删除最老的那个日志段文件。

2.2 基于时间维度

也称time-based retention,指的是Kafka定期未那些超过时间阈值的topic进行日志段删除操作。这个阈值由broker端参数log.retention.ms、log.retention.mintues、log.retention.hours以及topic级别参数retention.ms控制。如果同时设置了log.retention.ms、log.retention.mintues、log.retention.hours,以log.retention.ms优先级为最高,log.retention.mintues次之,log.retention.hours最次。当前这三个参数的默认值依次是null, null和168,故Kafka为每个topic默认保存7天的日志。

这里需要讨论下这“7天”是如何界定的?在0.10.0.0之前,Kafka每次检查时都会将当前时间与每个日志段文件的最新修改时间做比较,如果两者的差值超过了上面设定的阈值(比如上面说的7天),那么Kafka就会尝试删除该文件。不过这种界定方法是有问题的,因为文件的最新修改时间是可变动的——比如用户在终端通过touch命令查看该日志段文件或Kafka对该文件切分时都可能导致最新修改时间的变化从而扰乱了该规则的判定,因此自0.10.0.0版本起,Kafka在消息体中引入了时间戳字段(当然不是单纯为了修复这个问题),并且为每个日志段文件都维护一个最大时间戳字段。通过将当前时间与该最大时间戳字段进行比较来判定是否过期。使用当前最大时间戳字段的好处在于它对用户是透明的,用户在外部无法直接修改它,故不会造成判定上的混乱。

最大时间戳字段的更新机制也很简单,每次日志段写入新的消息时,都会尝试更新该字段。因为消息时间戳通常是递增的,故每次写入操作时都会保证最大时间戳字段是会被更新的,而一旦一个日志段写满了被切分之后它就不再接收任何新的消息,其最大时间戳字段的值也将保持不变。倘若该值距离当前时间超过了设定的阈值,那么该日志段文件就会被删除。

2.3 基于起始位移维度

用户对前两种留存机制实际上是相当熟悉的,下面我们讨论下第三种留存机制:基于日志起始位移(log start offset)。这实际上是0.11.0.0版本新增加的功能。其实增加这个功能的初衷主要是为了Kafka流处理应用——在流处理应用中存在着大量的中间消息,这些消息可能已经被处理过了,但依然保存在topic日志中,占用了大量的磁盘空间。如果通过设置基于时间维度的机制来删除这些消息就需要用户设置很小的时间阈值,这可能导致这些消息尚未被下游操作算子(operator)处理就被删除;如果设置得过大,则极大地增加了空间占用。故社区在0.11.0.0引入了第三种留存机制:基于起始位移

所谓起始位移,就是指分区日志的当前起始位移——注意它是分区级别的值,而非日志段级别。故每个分区都只维护一个起始位移值。该值在初始化时被设置为最老日志段文件的基础位移(base offset),随着日志段的不断删除,该值会被更新当前最老日志段的基础位移。另外Kafka提供提供了一个脚本命令帮助用户设置指定分区的起始位移:kafka-delete-records.sh。

该留存机制是默认开启的,不需要用户任何配置。Kafka会为每个日志段做这样的检查:1. 获取日志段A的下一个日志段B的基础位移;2. 如果该值小于分区当前起始位移则删除此日志段A。

依然拿例子还说明,假设我有一个topic,名字是test,该topic只有1个分区,该分区下有5个日志段文件,分别是A1.log, A2.log, A3.log, A4.log和A5.log,其中A5.log是active日志段。这5个日志段文件中消息范围分别是0~9999,10000~19999,20000~29999,30000~39999和40000~43210(A5未写满)。如果此时我确信前3个日志段文件中的消息已经被处理过了,于是想删除这3个日志段,此时我应该怎么做呢?由于我无法预知这些日志段文件产生的速度以及被消费的速度,因此不管是基于时间的删除机制还是基于空间的删除机制都是不适用的。此时我便可以使用kafka-delete-records.sh脚本将该分区的起始位移设置为A4.log的起始位移,即40000。为了做到这点,我需要首先创建一个JSON文件a.json,内容如下:

{"partitions":[{"topic": "test", "partition": 0,"offset": 40000}],"version":1}

然后执行下列命令:

bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file a.json

如果一切正常,应该可以看到类似于这样的输出:

Executing records delete operation

Records delete operation completed:

partition: test-0 low_watermark: 40000

此时test的分区0的起始位移被手动调整为40000,那么理论上所有最大消息位移< 40000的日志段都可以被删除了。有了这个机制,用户可以实现更为灵活的留存策略。



以上就是关于当前Kafka针对于delete留存类型的topic的3种留存机制。也许在未来社区会增加更多的留存策略,我们拭目以待~


链接


Java API获取topic所占磁盘空间(Kafka 1.0.0)


很多用户都有这样的需求:实时监控某个topic各分区在broker上所占的磁盘空间大小总和。Kafka并没有提供直接的脚本工具用于统计这些数据。

如果依然要实现这个需求,一种方法是通过监控JMX指标得到分区当前总的日志大小,然后手动相加所有分区的值得出;另一种方法就是使用1.0.0引入的DescribeLogDirsRequest请求。本文即介绍如何通过Java API获取某broker上某topic总的空间大小,代码如下:

package huxihx;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicDiskSizeSummary {

    private static AdminClient admin;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String brokers = "localhost:9092";
        initialize(brokers);
        try {
            long topic1InBroker1 = getTopicDiskSizeForSomeBroker("t2", 1);
            long topic2InBroker0 = getTopicDiskSizeForSomeBroker("t1", 0);
            System.out.println(topic1InBroker1);
            System.out.println(topic2InBroker0);
        } finally {
            shutdown();
        }
    }

    public static long getTopicDiskSizeForSomeBroker(String topic, int brokerID)
            throws ExecutionException, InterruptedException {
        long sum = 0;
        DescribeLogDirsResult ret = admin.describeLogDirs(Collections.singletonList(brokerID));
        Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> tmp = ret.all().get();
        for (Map.Entry<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> entry : tmp.entrySet()) {
            Map<String, DescribeLogDirsResponse.LogDirInfo> tmp1 = entry.getValue();
            for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> entry1 : tmp1.entrySet()) {
                DescribeLogDirsResponse.LogDirInfo info = entry1.getValue();
                Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoMap = info.replicaInfos;
                for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicas : replicaInfoMap.entrySet()) {
                    if (topic.equals(replicas.getKey().topic())) {
                        sum += replicas.getValue().size;
                    }
                }
            }
        }
        return sum;
    }

    private static void initialize(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        admin = AdminClient.create(props);
    }

    private static void shutdown() {
        if (admin != null) {
            admin.close();
        }
    }
}

其中主要的方法是AdminClient.describeLogDirs(),它返回DescribeLogDirsResult实例,里面封装了给定broker上所有log.dirs路径下对应的分区的日志大小,将它们加到一起即可实现统计topic磁盘空间占用的功能。


链接



查询订阅某topic的所有consumer group(Java API)



在网上碰到的问题,想了下使用现有的API还是可以实现的。

首先,需要引入Kafka服务器端代码,比如加入Kafka 1.0.0依赖:

Maven

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
</dependency>

Gradle

compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '1.0.0'

然后编写获取订阅某topic的所有group的方法,代码如下:

/**
     * get all subscribing consumer group names for a given topic
     * @param brokerListUrl localhost:9092 for instance
     * @param topic         topic name
     * @return
     */
    public static Set<String> getAllGroupsForTopic(String brokerListUrl, String topic) {
        AdminClient client = AdminClient.createSimplePlaintext(brokerListUrl);

        try {
            List<GroupOverview> allGroups = scala.collection.JavaConversions.seqAsJavaList(client.listAllGroupsFlattened().toSeq());
            Set<String> groups = new HashSet<>();
            for (GroupOverview overview: allGroups) {
                String groupID = overview.groupId();
                Map<TopicPartition, Object> offsets = scala.collection.JavaConversions.mapAsJavaMap(client.listGroupOffsets(groupID));
                Set<TopicPartition> partitions = offsets.keySet();
                for (TopicPartition tp: partitions) {
                    if (tp.topic().equals(topic)) {
                        groups.add(groupID);
                    }
                }
            }
            return groups;
        } finally {
            client.close();
        }
    }  
简单吧:)



链接



关于0.11版本幂等producer的讨论



众所周知,Kafka 0.11.0.0版本正式支持精确一次处理语义(exactly once semantics,下称EOS)。Kafka的EOS主要体现在3个方面:

幂等producer:保证发送单个分区的消息只会发送一次,不会出现重复消息
事务(transaction):保证原子性地写入到多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚
流处理EOS:流处理本质上可看成是“读取-处理-写入”的管道。此EOS保证整个过程的操作是原子性。注意,这只适用于Kafka Streams
  上面3种EOS语义有着不同的应用范围,幂等producr只能保证单分区上无重复消息;事务可以保证多分区写入消息的完整性;而流处理EOS保证的是端到端(E2E)消息处理的EOS。用户在使用过程中需要根据自己的需求选择不同的EOS。以下是启用方法:

启用幂等producer:在producer程序中设置属性enable.idempotence=true,但不要设置transactional.id。注意是不要设置,而不是设置成空字符串或"null"
启用事务支持:在producer程序中设置属性transcational.id为一个指定字符串(你可以认为这是你的事务名称,故最好起个有意义的名字),同时设置enable.idempotence=true
启用流处理EOS:在Kafka Streams程序中设置processing.guarantee=exactly_once
  本文主要讨论幂等producer的设计与实现。

-----------------------------------------------------------------------

  所谓幂等producer指producer.send的逻辑是幂等的,即发送相同的Kafka消息,broker端不会重复写入消息。同一条消息Kafka保证底层日志中只会持久化一次,既不会丢失也不会重复。幂等性可以极大地减轻下游consumer系统实现消息去重的工作负担,因此是非常实用的功能。值得注意的是,幂等producer提供的语义保证是有条件的:

单分区幂等性:幂等producer无法实现多分区上的幂等性。如前所述,若要实现多分区上的原子性,需要引入事务
单会话幂等性:幂等producer无法跨会话实现幂等性。即使同一个producer宕机并重启也无法保证消息的EOS语义
  虽然有上面两个限制,幂等producer依然是一个非常实用的新功能。下面我们来讨论下它的设计原理。如果要实现幂等性, 通常都需要花费额外的空间来保存状态以执行消息去重。Kafka的幂等producer整体上也是这样的思想。

  首先,producer对象引入了一个新的字段:Producer ID(下称PID),它唯一标识一个producer,当producer启动时Kafka会为每个producer分配一个PID(64位整数),因此PID的生成和分配对用户来说是完全透明的,用户无需考虑PID的事情,甚至都感受不到PID的存在。其次,0.11 Kafka重构了消息格式(有兴趣的参见Kafka 0.11消息设计),引入了序列号字段(sequence number,下称seq number)来标识某个PID producer发送的消息。和consumer端的offset类似,seq number从0开始计数并严格单调增加。同时在broker端会为每个PID(即每个producer)保存该producer发送过来的消息batch的某些元信息,比如PID信息、消息batch的起始seq number及结束seq number等。这样每当该PID发送新的消息batch时,Kafka broker就会对比这些信息,如果发生冲突(比如起始seq number和结束seq number与当前缓存的相同),那么broker就会拒绝这次写入请求。倘若没有冲突,那么broker端就会更新这部分缓存然后再开始写入消息。这就是Kafka实现幂等producer的设计思路:1. 为每个producer设置唯一的PID;2. 引入seq number以及broker端seq number缓存更新机制来去重。

  介绍了设计思想,我们来看下具体的实现,如下图所示:

1.png


  以前的博客中提到过,Java producer(区别于Scala producer)是双线程的设计,分为KafkaProducer用户主线程和Sender线程。前者调用send方法将消息写入到producer的内存缓冲区中,即RecordAccumulator中,而后者会定期地从RecordAccumulator中获取消息并将消息归入不同的batch中发送到对应的broker上。在幂等producer中,用户主线程的逻辑变动不大。send方法依然是将消息写入到RecordAccumulator。而Sender线程却有着很大的改动。我们首先来看下上图中的第一步:发送InitProducerIdRequest请求。

  InitProducerIdRequest是0.11.0.0版本新引入的请求类型,它由两个字段组成:transactionalId和timeout,其中transactionalId就是producer端参数transactional.Id的值,timeout则是事务的超时时间。由于我们未引入事务而只是配置幂等producer,故transcationalId为null,而timeout则设置成了Int.MAX,即Sender线程将一直阻塞直到broker端发送PID返回。一旦接收到broker端返回的response,Sender线程就会更新该producer的PID字段。有兴趣的读者可以参考源码:Sender.maybeWaitForProducerId,如下图所示:


2.png

(下面我就不贴源码了,但会给出对应的源码文件,有兴趣的直接看吧~~)

  上图中, 第一步是随机寻找一个负载最低的broker,即当前未完成请求数最少的broker。由此可见,InitProducerIdRequest和MetadataRequest一样,都可由任意的broker完成处理。至于为什么我们稍后讨论,现在先来讨论下broker端是如何确定PID的。其实说起来很简单,Kafka在Zookeeper中新引入了一个节点:/latest_producer_id_block,broker启动时提前预分配一段PID,当前是0~999,即提前分配出1000个PID来,如下图所示:


  一旦PID超过了999,则目前会按照1000的步长重新分配,到时候就应该是这个样子:

{"version":1, "broker":0,"block_start":"1000","block_end":"1999"}

  除了上面的信息,broker在内存中还保存了下一个待分配的PID。这样,当broker端接收到InitProducerIdRequest请求后,它会比较下一个PID是否在当前预分配的PID范围:若是则直接返回;否则再次预分配下一批的PID。现在我们来讨论下为什么这个请求所有broker都能响应——原因就在于集群中所有broker启动时都会启动一个叫TransactionCoordinator的组件,该组件能够执行预分配PID块和分配PID的工作,而所有broker都使用/latest_producer_id_block节点来保存PID块,因此任意一个broker都能响应这个请求。

  上图中的第二步就是发送InitProducerIdRequest的方法,注意当前是同步等待返回结果,即Sender线程会无限阻塞直到broker端返回response(当然依然会受制于request.timeout.ms参数的影响)。当拿到response后,Sender线程就会更新该producer的PID字段,如图中第三步所示。

  确定了PID之后,Sender线程会调用RecordAccumulator.drain()提取当前可发送的消息,在该方法中会将PID,Seq number等信息封装进消息batch中,具体代码参见:RecordAccumulator.java#drain()。一旦获取到消息batch后,Sender线程开始构建ProduceRequest请求然后发送给broker端。至此producer端的工作就算告一段落了。

  下面我们看下broker端是如何响应PRODUCE请求。实际上,broker最重要的事情就是要区别某个PID的同一个消息batch是否重复发送了。因此在消息被写入到leader底层日志之前必须要先做一次判断,即PRODUCE请求中的消息batch是否已然被处理过,判断的逻辑就在:ProducerStateManager.scala中的ProducerAppendInfo#validateAppend方法中。如果请求中包含的消息batch与最近一次成功写入的batch相同(即PID相同,batch起始seq number和batch结束seq number都相同),那么该方法便抛出DuplicateSequenceNumberException,然后由上层方法捕获到该异常封装进ProduceResponse返回。如果batch不相同,则允许此次写入,并在写入完成后更新这些producer信息。

  值得一提的是在0.11.0.0版本中DuplicateSequenceNumberException继承自RetriableException类,即表示Kafka认为它是一个可重试的异常。这其实是个问题,因为抛出该异常已经表明broker不需要处理这次写入,即使重试broker依然会拒绝,因此在1.0.0版本中该类已经不再继承自RetriableException,顺便还改了个名字:DuplicateSequenceException。

  以上就是关于幂等producer的一些讨论。从上面的分析中我们可以看到幂等producer的设计思想主要是基于用空间保存状态并利用状态来去重的思想。了解了这一点,你会发现幂等producer的设计以及代码改动实际上非常容易理解。

  最后再说一点:以上所说的幂等producer一直强调的是“精确处理一次”的语义,实际上幂等producer还有“不乱序”的强语义保证,只不过在0.11版本中这种不乱序主要是通过设置enable.idempotence=true时强行将max.in.flight.requests.per.connection设置成1来实现的。这种实现虽然保证了消息不乱序,但也在某种程度上降低了producer的TPS。据我所知,这个问题将在1.0.0版本中已然得到解决。在后续的Kafka 1.0.0版本中即使启用了幂等producer也能维持max.in.flight.requests.per.connection > 1,具体的算法我还没有看,不过总之是个好消息。至于表现如何就让我们拭目以待吧








已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条