RocketMQ的存储 很有意思 也做了很多的优化 还是很值得学习一下的

前言

broker:
RocketMQ 作为一个发布订阅模型 通过broker节点中转 和 持久化数据 解耦上下游 即broker是存储真是数据的节点

消息模型:
rocketMq的消息模型是按照 主题模型去实现的 在主题模型中 主题是作为存放消息的容器
出于高并发考虑 每一个主题中都有多个队列 在生产者组中的生产者每次生产消息后 指定topic中的某个queue 发送消息 而消费者组 又可以消费一个topic组中的多个queue
而queue作为实际消息的载体则是分布式存储在broker上 broker出于高可用考虑则也会采取主从分布的模式

Broker

Broker 充当着消息中转的角色 负责存储消息、转发消息
Broker在RocketMQ 系统中负责 接收并存储 从生产者发送来的消息 同时为消费者的拉取请求做准备
Broker 同时也存储着消息相关的元数据 包括消费进度便宜offset 主题队列等

Remoting Module:整个broker的实体 负责处理来自 clients端的请求 而这个broker实体 则由以下模块构成

Client Manage: 客户端管理器 负责接收、解析客户端(Producer/Consumer)请求 管理客户端

例如 维护Consumer的Topic订阅信息

StoreService: 存储服务 提供方便简单的API接口 处理消息存储到物理硬盘消息查询 功能

HA Service: 高可用服务 提供 Master Broker 和 Slave Broker之间的数据同步功能

Index Service: 索引服务 根据特定的Message Key 对投递到Broker的消息进行索引服务 同时也提供根据Message Key对消息进行快速查询的功能

Broker集群部署
为了增强Broker性能与吞吐量 Broker一般都是以集群形式出现的 各集群节点中可能存放着相同Topic的不同Queue

不过这里有个问题 如果某Broker节点宕机 如何保证数据不丢失呢?其解决方案是 将每个Broker集群节点进行横向扩展 即将Broker节点再建为一个HA集群 解决单点问题

Broker节点集群是一个主从集群 即集群中具有Master 与Slave 两种角色

Master 负责处理读写操作请求
Slave 负责对Master中的数据进行备份

当Master挂了 Slave则会自动切换为 Master去工作
因此Broker集群更偏向于主备集群 slave只是进行备份

在Rocketmq的官方文档中也可以看到 显示的是主备自动切换 而非主从
https://rocketmq.apache.org/zh/docs/deploymentOperations/03autofailover

Master与slave 是 一对多的关系 Master与slave的对应是通过指定相同的BrokerName 不同的BrokerId来确定的

BrokerId为0 表示Master 非0表示Slave 每个Broker与NameServer 集群中的所有节点建立长连接 定时注册Topic信息到所有NameServer

Raid 磁盘阵列

Raid(Redundat Array of Inexpensive Disks) 廉价冗余磁盘阵列

数据管理

存储目录

RocketMQ的数据默认 存储在 store目录下

commitlog:
存放了commitlog文件 rocketmq的实际消息均是写在commitlog文件中

config:
存放着一些运行期间的配置

comsumequeue/topic/queueId/queue

index

checkpoint:
存储的是上面所有文件的最后刷盘时间

消息数据管理

通过上图可以看到 rocketmq 在写入消息时 首先会先写到 commitlog中 然后将根据topic 生成索引(commitLogOffset、msgSize、tagsCode)写入具体的 ConsumerQueue中
而consumer实际 消费是先查 consumeQueue中 的索引再 再找到 具体commitlog中的消息

commitLog

一个Broker中仅包含一个commitLog目录 其目录下真正存储文件被称为mappedFile

我们知道 底层磁盘的 IO 操作是非常耗时的 而且rocketmq作为一个用户态进程 要想访问磁盘等硬件设备 还需要系统调用的方式

RocketMQ基于Java实现 所以利用了 FileChannel中的 NIO模型
FileChannel 本身具有read、write方法 但这种都是要走系统调用
具体如下:

可以看到read 实际上是基于ByteBuffer 而我们可以看到ByteBuffer 其实真正访问的是HeapByteBuffer

headbyteBuffer 就是堆上内存 但我们知道实际要访问是磁盘上的内容 那么肯定使用系统调用 将磁盘上的数据 先复制到 内核 然后内核 再复制用户空间 对于JVM而言就是堆上

但是FileChannel 还有另一个方法 map

可以看到其 实际返回的是 MappedByteBuffer 也就是基于MappedByteBuffer

MappedByteBuffer源码如下 也基本是NIO的精髓所在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
public abstract class MappedByteBuffer extends ByteBuffer
{

// This is a little bit backwards: By rights MappedByteBuffer should be a
// subclass of DirectByteBuffer, but to keep the spec clear and simple, and
// for optimization purposes, it's easier to do it the other way around.
// This works because DirectByteBuffer is a package-private class.

// For mapped buffers, a FileDescriptor that may be used for mapping
// operations if valid; null if the buffer is not mapped.
private final FileDescriptor fd;

// This should only be invoked by the DirectByteBuffer constructors
//
MappedByteBuffer(int mark, int pos, int lim, int cap, // package-private
FileDescriptor fd)
{
super(mark, pos, lim, cap);
this.fd = fd;
}

MappedByteBuffer(int mark, int pos, int lim, int cap) { // package-private
super(mark, pos, lim, cap);
this.fd = null;
}

private void checkMapped() {
if (fd == null)
// Can only happen if a luser explicitly casts a direct byte buffer
throw new UnsupportedOperationException();
}

// Returns the distance (in bytes) of the buffer from the page aligned address
// of the mapping. Computed each time to avoid storing in every direct buffer.
private long mappingOffset() {
int ps = Bits.pageSize();
long offset = address % ps;
return (offset >= 0) ? offset : (ps + offset);
}

private long mappingAddress(long mappingOffset) {
return address - mappingOffset;
}

private long mappingLength(long mappingOffset) {
return (long)capacity() + mappingOffset;
}

/**
* Tells whether or not this buffer's content is resident in physical
* memory.
*
* <p> A return value of <tt>true</tt> implies that it is highly likely
* that all of the data in this buffer is resident in physical memory and
* may therefore be accessed without incurring any virtual-memory page
* faults or I/O operations. A return value of <tt>false</tt> does not
* necessarily imply that the buffer's content is not resident in physical
* memory.
*
* <p> The returned value is a hint, rather than a guarantee, because the
* underlying operating system may have paged out some of the buffer's data
* by the time that an invocation of this method returns. </p>
*
* @return <tt>true</tt> if it is likely that this buffer's content
* is resident in physical memory
*/
public final boolean isLoaded() {
checkMapped();
if ((address == 0) || (capacity() == 0))
return true;
long offset = mappingOffset();
long length = mappingLength(offset);
return isLoaded0(mappingAddress(offset), length, Bits.pageCount(length));
}

// not used, but a potential target for a store, see load() for details.
private static byte unused;

/**
* Loads this buffer's content into physical memory.
*
* <p> This method makes a best effort to ensure that, when it returns,
* this buffer's content is resident in physical memory. Invoking this
* method may cause some number of page faults and I/O operations to
* occur. </p>
*
* @return This buffer
*/
public final MappedByteBuffer load() {
checkMapped();
if ((address == 0) || (capacity() == 0))
return this;
long offset = mappingOffset();
long length = mappingLength(offset);
load0(mappingAddress(offset), length);

// Read a byte from each page to bring it into memory. A checksum
// is computed as we go along to prevent the compiler from otherwise
// considering the loop as dead code.
Unsafe unsafe = Unsafe.getUnsafe();
int ps = Bits.pageSize();
int count = Bits.pageCount(length);
long a = mappingAddress(offset);
byte x = 0;
for (int i=0; i<count; i++) {
x ^= unsafe.getByte(a);
a += ps;
}
if (unused != 0)
unused = x;

return this;
}

/**
* Forces any changes made to this buffer's content to be written to the
* storage device containing the mapped file.
*
* <p> If the file mapped into this buffer resides on a local storage
* device then when this method returns it is guaranteed that all changes
* made to the buffer since it was created, or since this method was last
* invoked, will have been written to that device.
*
* <p> If the file does not reside on a local device then no such guarantee
* is made.
*
* <p> If this buffer was not mapped in read/write mode ({@link
* java.nio.channels.FileChannel.MapMode#READ_WRITE}) then invoking this
* method has no effect. </p>
*
* @return This buffer
*/
public final MappedByteBuffer force() {
checkMapped();
if ((address != 0) && (capacity() != 0)) {
long offset = mappingOffset();
force0(fd, mappingAddress(offset), mappingLength(offset));
}
return this;
}

private native boolean isLoaded0(long address, long length, int pageCount);
private native void load0(long address, long length);
private native void force0(FileDescriptor fd, long address, long length);
}

可以看到 load 方法的描述 Loads this buffer's content into physical memory.
也就是会将 本地buffer缓冲区的内容 直接load 到 physical memory 即IO设备 相当于跳过了 内核这一步
也就是操作 内存相当于 直接操作磁盘 所以速度会快很多

如下所示:

上述即下面这段博客中原理的具体阐释

RocketMQ 使用了一种称为 MappedByteBuffer 的内存映射文件的办法,将一个文件映射到进程的地址空间,实现文件的磁盘地址和进程的一段虚拟地址关联,实际上是利用了NIO 中的 FileChannel 模型。在进行这种绑定后,用户进程就可以用指针(偏移量)的形式写入磁盘而不用进行 read / write 的系统调用,减少了数据在缓冲区之间来回拷贝的开销。当然这种内核实现的机制有一些限制,单个 mmap 的文件不能太大 (RocketMQ 选择了 1G),此时再把多个 mmap 的文件用一个链表串起来构成一个逻辑队列 (称为 MappedFileQueue),就可以在逻辑上实现一个无需考虑长度的存储空间来保存全部的消息。

所有的mappedFile文件 都存放在该目录中 即无论当前broker中存放着多少topic的消息 这些消息 都是被顺序写入到了 mappedFile中
即broker中存放时并没有按照topic进行分类存放的

commitLog文件存放于 commitLog目录之下 可以看到commitLog文件的文件名为一串数字 具体而言 commitLog文件由20位十进制数构成 表示当前文件的第一条消息的起始位置偏移量
而这些commitLog 文件其实是 mappedFile文件 默认大小为1G(实际为小于等于1G 因为不一定会写满 不同消息大小不一样) 一个写满之后 就会去自动写下一个 文件之间就由偏移量进行关联映射

(因此第一个文件的文件名 一定为0 如果第一个文件名的大小为x 那么第二个文件名的大小即为x 第n个文件名的大小即为 前n个 文件大小之和 即一个broker中所有mappedfile文件的 commitlog offset是连续的 符合顺序写入的特性)

因此mappedFile文件是 顺序读写的 其访问效率很高

而具体单条消息的存储 如下所示

msglen 表示消息总长
msg body 中存储的就是具体消息内容
queueid 表示所在队列的Id

索引数据管理

由上可知 实际上所有消息 都存储在 commitLog

但RocketMQ采取 订阅模式 只关心所订阅的内容 那么消费者要消费的时候 显然不可能 遍历所有commitlog的内容 去找要消费的消息 否则效率太低

在数据写入 CommitLog 后,在服务端当 MessageStore 向 CommitLog 写入一些消息后,有一个后端的 ReputMessageService 服务 (dispatch 线程) 会异步的构建多种索引,满足不同形式的读取诉求。

这就引出了 我们下面要讨论的 consumequeue

consumequeue

在RocketMQ模型下 消息本身存在的逻辑队列称为MessageQueue 而对应的物理索引文件称为 ConsukeQueue

从某种意义上说 MessageQueue = 多个连续 ConsumeQueue 索引 + CommitLog 文件

consumequeue的结构为
consumequeue/topic/queueId/queue

每个topic 在 store/consumequeue 中会有一个目录 会再为每个该Topic的queue建立一个目录 目录名为queueId 每个目录中存放着若干 consumequeue文件
consumequeue文件 是commitlog的索引文件 可以根据consumequeue定位到具体的消息

consumequeue文件和 mappedfile 不同的是 consumequeue文件名是不变的 因为consumequeue的大小是固定不变的

consumequeue 的索引条目如下

一个consumequeue文件中所有消息的topic一定是相同的 但每条消息的tag 可能是不同的 (这里基于tag 就有一个优化小点 就是如果对tag访问要求比较高 那么可以一个tag一个queue 提高性能)

因此结合了 consuequeue索引

消息写入
现在RocketMQ写入一条消息进入broker后 持久化的过程如下:

  1. Broker根据queueId 获取到该消息对应索引条目要在 consumequeue目录中的写入偏移量 即QueueOffset
    queueId是已经在负载均衡的时候指定要 发送给哪个queue了 QueueOffset是不知道的 要根据queueId 拿到索引条目后 才知道的
  2. 将queueId queueOffset等数据 与 消息义器封装为消息单元
  3. 将消息单元写进commitLog

至此消息已经写入commitLog

然后

dispatch 线程会源源不断的将消息从 CommitLog 取出,再拿出消息在 CommitLog 中的物理偏移量 (相对于文件存储的 Index),消息长度以及Tag Hash 作为单条消息的索引,分发到对应的消费队列。偏移 + 长度构成了对 CommitLog 的引用 (Ref)。这种 Ref 机制对于单挑消息只有 20B,显著降低了索引存储开销。ConsumeQueue 实际写入的实现与 CommitLog 不同,CommitLog 有很多存储策略可以选择且混合存储,一个 ConsumeQueue 只会保存一个 Topic 的一个分区的索引,持久化默认使用 FileChannel
(同时 形成消息索引条目 将消息索引条目分发到相应的consumequeue)

消息拉取
消息拉取过程如下:

  1. Consumer获取到其要消费消息所在Queue的消费偏移offset 计算出其要消费消息的消息offset

    消费偏移offset 即 消费进度 consumer对某个queue的消费offset 即消费到该Queue的第几条消费
    这个offset 具体 存在 store/config/consumeroffset.json 这个文件中
    消息offest = 消费offset + 1 (上次消费完的下一个 就是这次要消费的位置)

  2. Consumer向broker 发送拉取请求 其中会包含其要拉取消息的queue 消息offset 及消息Tag
  3. Broker 计算在该 consumequeue中的 queueOffset

    queueOffset = 消息offset * 20字节(索引条目 8 + 4 + 8)

  4. 从该queueOffset处 开始向后查找第一个指定的Tag的索引条目
  5. 解析该索引条目的前8个字节 即可定位到该消息在 commitlog中的commitlog offset

    前8个字节 即为 commitlog offset

  6. 从对应commitlog offset中 读取消息单元 并发送给consumer

即:

客户端的 pull 请求到服务端执行了如下流程来查询消息:

  1. 根据 Tag 的 Hash 值查询 ConsumeQueue 文件(由 physicOffset + size + Tag HashCode 组成)
  2. 根据 ConsumeQueue 拿到 physicOffset + size
  3. 根据 physicOffset 查询 CommitLog 文件(上文的MappedFileQueue)获得消息

性能优化

在RocketMQ中 无论是消息本身还是消息索引 都是存储在磁盘上的 但却不会影响消息消费的速度
系统采用了一系列相关机制大大提升了性能

首先 RocketMQ 对文件的读写操作是通过 mmap零拷贝进行的 将对文件的操作转化为内存地址进行操作 从而极大地提高了 文件的读写效率

其次 consumequeue中的数据是顺序存放的 还引入了PageCache的预读取机制 使得对consumequeue文件的读取几乎接近于内存读取 即使在有消息堆积情况下也不会影响性能

PageCache机制 页缓存机制 是os 对文件的缓存机制 用于加速对文件的读写操作 一般来说对文件进行 顺序读写的速度 几乎接近于内存读写速度 主要原因是由于OS使用PageCache机制对读写访问操作进行性能优化 将一部分内存用作PageCache
写操作: OS会先将数据写入PageCache中 随后会以异步方式由 pdflush(page dirty flush) 内核线程将Cache中的数据刷盘到物理磁盘
读操作: 若用户要读取数据 其首先会从pageCache中读取 若没有命中 则OS再从物理磁盘上加载该数据到PageCache的同时 也会顺序对其相邻块中的数据进行预读取(相当于提取读入内存)

RocketMQ中可能会影响性能的是对 commitlog文件的读取 因为对commitlog 文件来说 读取消息时会产生大量的随机访问 而随机访问 会严重影响性能 不过如果选择合适的系统IO调度算法 或者SSD 也是可以提升的

indexFile

除了通过通常的指定 Topic进行消息消费外 RocketMQ还提供了根据Key进行消息查询的功能 即通过indexFile 进行索引实现的快速查询
indexFile中的索引数据是在包含了key的消息被发送到Broker时写入的 如果消息中没有包含key 则不会写入

每个broker 会包含一组indexFile 每个indexFile都是以一个时间戳命名的

indexFile文件名为当前文件被创建时的时间戳 这个时间戳的作用是什么呢 ?
根据业务key进行查询时 查询条件除了key之外 还需要指定一个要查询的时间戳 要查询不大于该时间戳的的最新的消息 这个时间戳文件名可以简化查询 提高查询效率

indexFile中包含500w个 slot槽 而每个slot槽有可能会挂载很多的index 索引单元

indexFile存储方式如下

keyd的hash值 % 500w 的 结果即为slot槽位 然后将该slot值 修改该index索引单元的indexNo
根据这个indexNo可以计算出 该index单元在indexFile中的位置 不过该取模结果的重复率是很高的

为了解决该问题 在每个index索引单元中增加了 preIndexNo 用于指定该slot当前index所索引单元的前一个index索引单元
而slot中始终存放的是其下最新的index索引单元的indexNo 这样的话 只要找到了slot就可以找到其最新的index索引单元

indexNo是一个在indexFile中的流水号0 从0开始递增 即在一个indexFile中所有indexNo是以此递增的
indexNo在索引单元中是没有体现的 是在indexs中依次属出来的

而通过这个index索引单元就可以找到其之前的所有index索引单元

indexFile的创建

  1. 当第一条带key的消息发送来后 系统发现没有indexFile 此时会创建第一个indexFile文件
  2. 当一个indexFile中挂载的index索引单元数量 超出2000w个时 会创建新的indexFile 当带key的消息发送到来后 系统会找最新的indexFile 并从其indexHeader的最后4字节中读取到indexCount 若indexCount >= 2000w 时 会创建新的indexFile

indexFile的查询