问题导读
1.Segment与Partition的关系是什么?
2.Segment文件包含哪些组成部分?
2.在分区中,可以是否可以通过偏移量来查找消息?
1.概述 Kafka 快速稳定的发展,得到越来越多开发者和使用者的青睐。它的流行得益于它底层的设计和操作简单,存储系统高效,以及充分利用磁盘顺序读写等特性,和其实时在线的业务场景。对于Kafka来说,它是一个分布式的,可分区的,多副本,多订阅者的,基于Zookeeper统一协调的分布式日志系统。常见的可以用于系统日志,业务日志,消息数据等。那今天笔者给大家分析Kafka的存储机制和副本的相关内容。
2.Replication Replication是Kafka的重要特性之一,针对其Kafka Brokers进行自动调优Replication数,是比较有难度的。原因之一在于要知道怎么避免Follower进入和退出同步 ISR (In-Sync Replicas)。再消息生产的过程当中,在有一大批海量数据写入时,可能会引发Broker告警。如果某些Topic的部分Partition长期处于 “under replicated”,这样是会增加丢失数据的几率的。Kafka 通过多副本机制实现高可用,确保当Kafka集群中某一个Broker宕机的情况下,仍然可用。而 Kafka 的复制算法保证,如果Leader发生故障或者宕机,一个新的Leader会被重新选举出来,并对外提供服务,供客户端写入消息。Kafka 在同步的副本列表中选举一个副本为Leader。 在Topic中,每个分区有一个预写式日志文件,每个分区都由一系列有序,不可变的消息组成,这些消息被连续的追加到分区中,分区中的每个消息都包含一个连续的序列号,即:offset。它用于确定在分区中的唯一位置。如下图所示: 在Kafka中,假如每个Topic的分区有N个副本,由于Kafka通过多副本机制实现故障自动转移,这里需要说明的是,当KafkaController出现故障,进而不能继续管理集群,则那些KafkaController Follower开始竞选新的Leader,而启动的过程则是在KafkaController的startup方法中完成的,如下所示:
[mw_shl_code=scala,true]def startup() = {
inLock(controllerContext.controllerLock) {
info("Controller starting up")
registerSessionExpirationListener()
isRunning = true
controllerElector.startup
info("Controller startup complete")
}
}[/mw_shl_code]
然后启动ZookeeperLeaderElector,在创建临时节点,进行session检查,更新leaderId等操作完成后,会调用故障转移函数onBecomingLeader,也就是KafkaController中的onControllerFailover方法,如下所示:
[mw_shl_code=scala,true]def onControllerFailover() {
if(isRunning) {
info("Broker %d starting become controller state transition".format(config.brokerId))
readControllerEpochFromZookeeper()
incrementControllerEpoch(zkUtils.zkClient)
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
registerReassignedPartitionsListener()
registerIsrChangeNotificationListener()
registerPreferredReplicaElectionListener()
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
initializeControllerContext()
// We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
// are started. The is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
// they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
// partitionStateMachine.startup().
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
replicaStateMachine.startup()
partitionStateMachine.startup()
// register the partition change listeners for all existing topics on failover
controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
maybeTriggerPartitionReassignment()
maybeTriggerPreferredReplicaElection()
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
}
deleteTopicManager.start()
}
else
info("Controller has been shut down, aborting startup/failover")
}[/mw_shl_code]
正因为有这样的机制存在,所示当Kafka集群中的某个Broker宕机后,仍然保证服务是可用的。在Kafka中发生复制操作时,确保分区的预写式日志有序的写到其他节点,在N个复制因子中,其中一个复制因子角色为Leader,那么其他复制因子的角色则为Follower,Leader处理分区的所有读写请求,同时,Follower会被动的定期去复制Leader上的数据。以上分析可以总结为以下几点,如下所示: - Leader负责处理分区的所有读写请求。
- Follower会复制Leader上数据。
- Kafka 的故障自动转移确保服务的高可用。
3.存储 对于消息对应的性能评估,其文件存储机制设计是衡量的关键指标之一,在分析Kafka的存储机制之前,我们先了解Kafka的一些概念: - Broker:Kafka消息中间件节点,一个节点代表一个Broker,多个Broker可以组建成Kafka Brokers,即:Kafka集群。
- Topic:消息存储主题,即可以理解为业务数据名,Kafka Brokers能够同时负责多个Topic的处理。
- Partition:针对于Topic来说的,一个Topic上可以有多个Partition,每个Partition上的数据是有序的。
- Segment:对于Partition更小粒度,一个Partition由多个Segment组成。
- Offset:每个Partition上都由一系列有序的,不可变的消息组成,这些消息被连续追加到Partition中。而在其中有一个连续的序列号offset,用于标识消息的唯一性。
3.1 Topic存储 在Kafka文件存储中,同一个Topic下有多个不同的Partition,每个Partition为一个单独的目录,Partition的命名规则为:Topic名称+有序序号,第一个Partition序号从0开始,序号最大值等于Partition的数量减1,如下图所示:
3.2 分区文件存储 每个分区相当于一个超大的文件被均分到多个大小相等的Segment数据文件中,但是每个Segment消息数量不一定相等,正因为这种特性的存在,方便了Old Segment File快速被删除。而对于每个分区只需要支持顺序读写即可,Segment文件生命周期由服务端配置的参数决定。这样即可快速删除无用数据文件,有效提高磁盘利用率。
3.3 Segment文件存储 这里,Segment文件由Index File和Data File组成,文件是一一对应的,后缀为 .index 表示索引文件, .log 表示数据文件,如下图所示: 如上图所示,Segment文件命名规则由分区全局第一个Segment从0开始,后续每一个Segment文件名为上一个Segment文件最后一个消息的Offset值。这里Segment数据文件由许多消息组成,消息物理结构如下所示: Key | Describer | offset | 用于标识每个分区中每条消息的唯一性,Offset的数值标识该分区的第几条消息 | message Size | 消息大小 | CRC32 | 用CRC32校验消息 | “magic” | 当前发布Kafka服务程序的协议版本号 | “attribute” | 独立版本,或标识压缩类型,或者编码类型 | key length | key的长度 | key | 可选 | payload length | 实际消息数据 |
3.4 分区中查找消息 在分区中,可以通过offset偏移量来查找消息,如上图中,文件00000000000046885905.index的消息起始偏移量为46885906=46885905+1,其他文件依此类推,以起始偏移量命名并排序这些文件,这样能够快速的定位到具体的文件。通过segment file,当offset为46885906时,我们可以定位到00000000000046885905.index元数据物理位置和00000000000046885905.log物理偏移地址。
4.总结 通过对副本和存储机制的分析,我们可以清楚的知道,Kafka通过自动故障转移来确保服务的高可用,Leader负责分区的所有读写操作,Follower会复制Leader上的数据。Kafka针对Topic,使某一个分区中的大文件分成多个小文件,通过多个小的segment file,使之便捷定期清理或删除已经消费的文件,减少磁盘占用。另外,通过索引文件稀疏存储,可以大幅度降低索引文件元数据所占用的空间。
5.结束语 这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉。
作者:哥不是小萝莉
出处:http://www.cnblogs.com/smartloli/
|