RocketMQ的存储设计
消息存储结构
RocketMQ消息存储默认路径:${user.home}\store\
消息存储结构包括:
- commitlog 消息存储目录
- config 运行期间的配置信息
- consumequeue 消息消费队列存储目录
- index 消息索引文件存储目录
- abort 如果存在abort文件,说明broker非正常关闭;该文件默认启动时创建,正常退出之前删除;
- checkpoint 文件检测点,存储commitlog文件最后一次刷盘时间戳、consumequeue最后一次刷盘时间、index索引文件最后一次刷盘时间戳。
消息存储目录如下图示:
消息存储结构如下图示:
CommitLog
Commitlog以物理文件的方式存在,每台 Broker上的Commitlog被本机器的所有ConsumeQueue共享。
文件地址:${user.home}\store\commitlog\${fileName}
在Commitlog中,一个消息的存储长度是不固定的,RocketMQ采取一些机制,尽量向Commitlog找那个顺序写,但是随机读。
Commitlo文件默认大小是1G,可在broker的配置中设置mappedFileSizeCommitLog属性来改变默认大小。
CommitLog文件存储的逻辑视图如下图示:
每条消息的前4个字节存储该条消息的总长度。
每个CommitLog文件的大小为1G,一般情况下,第一个CommitLog的起始偏移量为0,第二个CommitLog的起始偏移量为1073741824(1G = 1073741824byte)
-rw-r--r-- 1 root root 1073741824 May 20 21:00 00000000000000000000
-rw-r--r-- 1 root root 1073741824 May 20 21:00 00000000001073741824
-rw-r--r-- 1 root root 1073741824 May 20 21:00 00000000002147483648
-rw-r--r-- 1 root root 1073741824 May 20 21:00 00000000003221225472
每台RocketMQ只会往一个CommitLog文件中写,写完一个接着写下一个。
indexFile和ConsumeQueue中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个CommitLog文件上。
ConsumeQueue
ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个MessageQueue都有一个对应的ConsumeQueue文件。
文件地址:${user.home}\consumequeue\${topicName}${queueId}${fileName}
如下图示:
ConsumeQueue中存储的是消息条目,为了加速ConsumeQueue消息条目的检索速度与节省磁盘空间,每一个ConsumeQueue条目不会存储消息的全量信息。
消息条目如下图示:
ConsumeQueue即为CommitLog文件的索引文件,其构建机制是当消息到达CommitLog文件后,由专门的线程产生消息转发任务,从而构建消息消费队列文件(ConsumeQueue)与下文提到的索引文件。
存储机制这样设计有以下几个好处:
(1)CommitLog顺序写,可以大大提交写入效率。
实际上,磁盘有时候会比你想象的快很多,有时候也比你想象的慢很多,关键在如何使用。目前的高性能磁盘,顺序写速度可以达到600MB/s,超过了一般网卡的传输速度,但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍。
(2)虽然是随机读,但是利用操作系统的pagecache机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。
(3)为了保证完全的顺序写,需要ConsumeQueue这个中间结构,因为ConsumeQueue里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性,CommitLog里存储了Consume Queues、Message Key、Tag等所有信息,及时ConsumeQueue丢失,也可以通过CommitLog完全恢复出来。
IndexFile
RocketMQ支持通过MEssageID或MessageKey来查询消息,使用ID查询时,因为ID就是用broker+offset生成的,所以很容易就找到对应的Commitlog文件来读取消息。
但是对于用MessageKey来查询消息,RocketMQ则通过构建一个index来提高读取速度。
index存的是索引文件,这个文件用来加速消息查询的速度。消息消费列表RocketMQ专门为消息订阅构建的索引文件,提高根据主题与消息检索消息的速度。使用Hash索引机制,具体是Hash槽与Hash冲突的链表结构。
Index索引文件路径如下图示:
Config
config文件夹中存储着Topic、Consumer以及主题和消费者群组关系的信息;
文件结构如下图示:
- topics.json:topic配置属性
- subscriptionGroup.json:消息消费组配置信息
- delayOffset.json:延时消息队列拉取进度
- consumerOffset.json:集群消费模式消息消费进度
- consumerFilter.json:主题消息过滤信息
其他
abort:如果存在abort文件,说明broker非正常关闭;该文件默认启动时创建,正常退出之前删除;
checkpoint:文件检测点,存储commitlog文件最后一次刷盘时间戳、consumequeue最后一次刷盘时间、index索引文件最后一次刷盘时间戳。
过期文件删除
定时任务触发,清理消息存储文件(commitlog)与消息消费队列文件(ConsumeQueue文件)
过期判断参数:
fileReservedTime:文件保留时间
删除条件参数:
deleteWhen 定时删除时间
(完)