深入剖析 RocketMQ 源码 - 消息存储模块
扫描二维码
随时随地手机看文章
一、简介
RocketMQ 是阿里巴巴开源的分布式消息中间件,它借鉴了 Kafka 实现,支持消息订阅与发布、顺序消息、事务消息、定时消息、消息回溯、死信队列等功能。RocketMQ 架构上主要分为四部分,如下图所示:
- Producer:消息生产者,支持分布式集群方式部署。
- Consumer:消息消费者,支持分布式集群方式部署。
- NameServer:名字服务,是一个非常简单的 Topic 路由注册中心,支持 Broker 的动态注册与发现,Producer 和 Consumer 通过 NameServer 动态感知 Broker 的路由信息。
- Broker:Broker 主要负责消息的存储、转发和查询。
本文基于 Apache RocketMQ 4.9.1 版本剖析 Broker 中的消息存储模块是如何设计的。
二、存储架构
RocketMQ 的消息文件路径如图所示。
CommitLog消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。
ConsumeQueue消息消费队列,Consumequeue 文件可以看成是基于 CommitLog 的索引文件。ConsumeQueue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 CommitLog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M。
IndexFile索引文件,提供了一种可以通过 key 或时间区间来查询消息的方法。单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计类似 JDK 的 HashMap 数据结构。
其他文件:包括 config 文件夹,存放运行时配置信息;abort 文件,说明 Broker 是否正常关闭;checkpoint 文件,存储 Commitlog、ConsumeQueue、Index 文件最后一次刷盘时间戳。这些不在本文讨论的范围。
同 Kafka 相比,Kafka 每个 Topic 的每个 partition 对应一个文件,顺序写入,定时刷盘。但一旦单个 Broker 的 Topic 过多,顺序写将退化为随机写。而 RocketMQ 单个 Broker 所有 Topic 在同一个 CommitLog 中顺序写,是能够保证严格顺序写。RocketMQ 读取消息需要从 ConsumeQueue 中拿到消息实际物理偏移再去 CommitLog 读取消息内容,会造成随机读取。
2.1 Page Cache 和 mmap
在正式介绍 Broker 消息存储模块实现前,先说明下 Page Cache 和 mmap 这两个概念。
Page Cache 是 OS 对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于 OS 使用 Page Cache 机制对读写访问操作进行了性能优化,将一部分的内存用作 Page Cache。对于数据的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中 Page Cache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。
mmap 是将磁盘上的物理文件直接映射到用户态的内存地址中,减少了传统 IO 将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销。Java NIO 中的 FileChannel 提供了 map() 方法可以实现 mmap。FileChannel (文件通道)和 mmap (内存映射) 读写性能比较可以参照这篇文章。
2.2 Broker 模块
下图是 Broker 存储架构图,展示了 Broker 模块从收到消息到返回响应业务流转过程。
业务接入层:RocketMQ 基于 Netty 的 Reactor 多线程模型实现了底层通信。Reactor 主线程池 eventLoopGroupBoss 负责创建 TCP 连接,默认只有一个线程。连接建立后,再丢给 Reactor 子线程池 eventLoopGroupSelector 进行读写事件的处理。
defaultEventExecutorGroup 负责 SSL 验证、编解码、空闲检查、网络连接管理。然后根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行。Broker 模块通过这四级线程池提升系统吞吐量。
业务处理层:处理各种通过 RPC 调用过来的业务请求,其中:
- SendMessageProcessor 负责处理 Producer 发送消息的请求;
- PullMessageProcessor 负责处理 Consumer 消费消息的请求;
- QueryMessageProcessor 负责处理按照消息 Key 等查询消息的请求。
文件映射层:把 Commitlog、ConsumeQueue、IndexFile 文件映射为存储对象 MappedFile。
数据传输层:支持基于 mmap 内存映射进行读写消息,同时也支持基于 mmap 进行读取消息、堆外内存写入消息的方式进行读写消息。
下面章节将从源码角度来剖析 RocketMQ 是如何实现高性能存储。
三、消息写入
以单个消息生产为例,消息写入时序逻辑如下图,业务逻辑如上文 Broker 存储架构所示在各层之间进行流转。
最底层消息写入核心代码在 CommitLog 的 asyncPutMessage 方法中,主要分为获取 MappedFile、往缓冲区写消息、提交刷盘请求三步。需要注意的是在这三步前后有自旋锁或 ReentrantLock 的加锁、释放锁,保证单个 Broker 写消息是串行的。
//org.apache.rocketmq.store.CommitLog::asyncPutMessage
public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) {
...
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
//获取最新的 MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
...
//向缓冲区写消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
...
//提交刷盘请求
CompletableFuture flushResultFuture = submitFlushRequest(result, msg);
...
} finally {
putMessageLock.unlock();
}
...
}
下面介绍这三步具体做了什么事情。
3.1 MappedFile 初始化
在 Broker 初始化时会启动管理 MappedFile 创建的 AllocateMappedFileService 异步线程。消息处理线程 和 AllocateMappedFileService 线程通过队列 requestQueue 关联。
消息写入时调用 AllocateMappedFileService 的 putRequestAndReturnMappedFile 方法往 requestQueue 放入提交创建 MappedFile 请求,这边会同时构建两个 AllocateRequest 放入队列。
AllocateMappedFileService 线程循环从 requestQueue 获取 AllocateRequest 来创建 MappedFile。消息处理线程通过 CountDownLatch 等待获取第一个 MappedFile 创建成功就返回。
当消息处理线程需要再次创建 MappedFile 时,此时可以直接获取之前已预创建的 MappedFile。这样通过预创建 MappedFile ,减少文件创建等待时间。
//org.apache.rocketmq.store.AllocateMappedFileService::putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
//请求创建 MappedFile
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
...
//请求预先创建下一个 MappedFile
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
...
//获取本次创建 MappedFile
AllocateRequest result = this.requestTable.get(nextFilePath);
...
}
//org.apache.rocketmq.store.AllocateMappedFileService::run
public void run() {
..
while (!this.isStopped()