问题导读:
1、kafka存储文件结构有哪些?
2、如何理解Partition 分区存在?
3、Segment 存储是什么?
4、消费者如何通过 offset 查找 message?
一、存储文件结构
topic:可以理解为一个消息队列的名字
partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列
segment:partition 物理上由多个 segment 组成
message:每个 segment 文件中实际存储的每一条数据就是 message
offset:每个 partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中,partition 中的每个消息都有一个连续的序列号叫做 offset,用于 partition 唯一标识一条消息
二、Partition 分区
本例中 topic 名称为 test-topic,默认设置 partition 为 3,topic 创建成功后默认的存储位置在:/tmp/kafka-logs 下,分区分别以 topic 名称-分区数命名,(不考虑副本的情况)如下:
[mw_shl_code=shell,true]//分布在不同的broker节点上
test-topic-0
test-topic-1
test-topic-2[/mw_shl_code]
疑问一:为什么要分区呢?
为了性能考虑,如果不分区每个 topic 的消息只存在一个 broker 上,那么所有的消费者都是从这个 broker 上消费消息,那么单节点的 broker 成为性能的瓶颈,如果有分区的话生产者发过来的消息分别存储在各个 broker 不同的 partition 上,这样消费者可以并行的从不同的 broker 不同的 partition 上读消息,实现了水平扩展。
疑问二:分区文件下到底存了那些东西?
如下,其实每个分区下保存了很多文件,而概念上我们把他叫segment,即每个分区都是又多个segment构成的,其中 index(索引文件),log(数据文件),time index(时间索引文件)统称为一个 segment。
[mw_shl_code=shell,true]
test-topic-0
├── 00000000000000000001.index
├── 00000000000000000001.log
├── 00000000000000000001.timeindex
├── 00000000000000001018.index
├── 00000000000000001018.log
├── 00000000000000001018.timeindex
├── 00000000000000002042.index
├── 00000000000000002042.log
├── 00000000000000002042.timeindex[/mw_shl_code]
疑问三: 为什么有了 partition 还需要 segment ?
通过上面目录显示存在多个 segment 的情况,既然有分区了还要存多个 segment 干嘛?如果不引入 segment,那么一个 partition 只对应一个文件(log),随着消息的不断发送这个文件不断增大,由于 kafka 的消息不会做更新操作都是顺序写入的,如果做消息清理的时候只能删除文件的前面部分删除,不符合 kafka 顺序写入的设计,如果多个 segment 的话那就比较方便了,直接删除整个文件即可保证了每个 segment 的顺序写入。
三、Segment 存储
Segment 中核心文件是 index 索引文件和 log 数据文件,既然是索引文件当然是为了更高效的定位到数据,那么索引文件和数据文件中到底是存了那些数据?又是如何快速找到消息数据呢?
3.1 使用 kafka 自带脚本发送测试数据
[mw_shl_code=shell,true]sh kafka-producer-perf-test.sh --topic test-topic --num-records 50000000 --record-size 1000 --throughput 10000000 --producer-props bootstrap.servers=192.168.60.201:9092[/mw_shl_code]
3.2 使用 kafka 自带脚本 Dump index
[mw_shl_code=shell,true]sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-topic-0/00000000000000001018.index --print-data-log
offset: 1049 position: 16205
offset: 1065 position: 32410
offset: 1081 position: 48615
offset: 1097 position: 64820
offset: 1113 position: 81025
offset: 1129 position: 97230[/mw_shl_code]
通过 dump index 我们发现其实索引文件中其实就保存了 offset 和 position,分别是消息的 offset 也就是具体那一条消息,position 表示具体消息存储在 log 中的物理地址。
疑问一:通过上面数据可以看出,kafka 并不是每个 offset 都保存了,每隔 6 个 offset 存储一条索引数据,为什么在 index 文件中这些 offset 编号不是连续的呢?
因为 index 文件中并没有为数据文件中的每条消息都建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的 Message 也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
3.3 使用 kafka 自带脚本 Dump log
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-topic-0/00000000000000001018.log --print-data-log
log 数据文件中并不是直接存储数据,而是通过许多的 message 组成,message 包含了实际的消息数据
疑问一:消费者如何通过 offset 查找 message?
假如我们想要读取 offset=1066 的 message,需要通过下面 2 个步骤查找。
(1)查找 segment file
00000000000000000000.index 表示最开始的文件,起始偏移量 (offset) 为 0.第二个文件 00000000000000001018.index 的消息量起始偏移量为 1019 = 1018 + 1. 同样,第三个文件 00000000000000002042.index 的起始偏移量为 2043=2042 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据 offset 二分查找文件列表,就可以快速定位到具体文件。当 offset=1066 时定位到 00000000000000001018.index|log 。
(2)通过 segment file 查找 message
通过第一步定位到 segment file,当 offset=1066时,依次定位到 00000000000000001018.index 的元数据物理位置和 00000000000000001018.log 的物理偏移地址,此时我们只能拿到 1065 的物理偏移地址,然后再通过 00000000000000001018.log 顺序查找直到 offset=1066 为止。每个 message 都有固定的格式很容易判断是否是下一条消息。
作者:Gent
来源:SegmentFault 社区
最新经典文章,欢迎关注公众号
|