RabbitMQ-05-Storage Module
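A1. Module Overview
A1.1 Responsibilities and Boundaries
The storage module persists RabbitMQ messages so that they can be recovered after a node restart. The storage layer is made up of the following subsystems:

| Module | Responsibility | Queue type |
|---|---|---|
| rabbit_msg_store | Shared message store for large messages (>= 4 KB) | Classic queues |
| rabbit_classic_queue_store_v2 | Per-queue store for small messages (< 4 KB) | Classic queues |
| rabbit_classic_queue_index_v2 | Message index (SeqId → message location) | Classic queues |
| Ra WAL | Write-ahead log backing the Raft log | Quorum queues |
| Osiris | Append-only log stored in segments | Streams |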
A1.2 Storage Architecture Diagram

```mermaid
flowchart TB
    subgraph Classic[Classic queue storage]
        VQ[variable_queue]
        IDX[queue_index_v2<br/>message index]
        QS[queue_store_v2<br/>small-message store]
        MS[msg_store<br/>shared large-message store]
    end
    subgraph Quorum[Quorum queue storage]
        Fifo[rabbit_fifo]
        RaWAL[Ra WAL<br/>Raft log]
        Snapshot[Snapshot]
    end
    subgraph Stream[Stream storage]
        SQ[stream_queue]
        Osiris[Osiris Log]
        Segment[Segment Files]
    end
    VQ --> IDX
    VQ --> QS
    VQ --> MS
    Fifo --> RaWAL
    Fifo --> Snapshot
    SQ --> Osiris
    Osiris --> Segment
    style Classic fill:#e3f2fd
    style Quorum fill:#fff3e0
    style Stream fill:#f3e5f5
```
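Key design points:
- Classic queues: the index is kept separate from message bodies; small messages are stored alongside the index in the per-queue store instead of the shared store, which reduces I/O
- Quorum queues: built on the Raft log, replicated across nodes with strong consistency, and compacted by periodic snapshots
- Streams: append-only log with non-destructive reads; consumers can go back to any earlier offset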
A2. Public API and Specifications
A2.1 rabbit_msg_store API (shared large-message store for classic queues)
(1) write/4 - Write a message
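Basic information:
- Signature: `rabbit_msg_store:write(CRef, MsgId, Msg, CState) -> {ok, CState}`
- Duplicate writes: writing the same MsgId again does not store the body a second time; the store increments the message's reference count instead (the write is deduplicated, not strictly idempotent)

Core code:

```erlang
write(CRef, MsgId, Msg, CState = #client_msstate{flying_ets = FlyingEts}) ->
    %% 1) Mark the message as "write in flight"
    true = ets:insert(FlyingEts, {MsgId, ?FLYING_WRITE}),
    %% 2) Hand the message to the msg_store server process asynchronously
    %%    (the msg_store server is a gen_server2 process)
    ok = gen_server2:cast(
           CState#client_msstate.server,
           {write, CRef, MsgId, Msg}),
    {ok, CState}.
```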
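Handling in the msg_store process:

```erlang
handle_cast({write, CRef, MsgId, Msg},
            State = #msstate{flying_ets          = FlyingEts,
                             index_ets           = IndexEts,
                             current_file        = CurrentFile,
                             current_file_handle = FileHandle,
                             current_file_offset = Offset}) ->
    %% 1) Check the in-flight state
    case ets:lookup(FlyingEts, MsgId) of
        [{MsgId, ?FLYING_IS_IGNORED}] ->
            %% Removed before it was written: skip the write, just confirm
            send_confirm(CRef, MsgId),
            {noreply, State};
        _ ->
            State1 =
                case ets:lookup(IndexEts, MsgId) of
                    [{MsgId, Loc = #msg_location{ref_count = RefCount}}] ->
                        %% 2a) Body already stored: only bump the reference count
                        true = ets:insert(IndexEts,
                                          {MsgId, Loc#msg_location{ref_count = RefCount + 1}}),
                        State;
                    [] ->
                        %% 2b) Append to the current file. file:write/2 returns ok,
                        %%     so the offset is the tracked write position.
                        ok = file:write(FileHandle, Msg),
                        %% 3) Index entry: MsgId -> {File, Offset, Size}
                        Size = byte_size(Msg),
                        Location = #msg_location{msg_id     = MsgId,
                                                 file       = CurrentFile,
                                                 offset     = Offset,
                                                 total_size = Size,
                                                 ref_count  = 1},
                        true = ets:insert(IndexEts, {MsgId, Location}),
                        State#msstate{current_file_offset = Offset + Size}
                end,
            %% 4) Update the in-flight state
            ets:update_element(FlyingEts, MsgId, {2, ?FLYING_IS_WRITTEN}),
            %% 5) Record the pending confirm
            {noreply, add_pending_confirm(CRef, MsgId, State1)}
    end.
```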
Sequence diagram:

```mermaid
sequenceDiagram
    autonumber
    participant Queue as Queue Process
    participant Client as msg_store Client
    participant Store as msg_store Server
    participant Disk
    Queue->>Client: write(MsgId, Msg)
    Client->>Client: ets:insert(flying, {MsgId, WRITE})
    Client->>Store: cast({write, MsgId, Msg})
    Store->>Disk: file:write(CurrentFile, Msg)
    Disk-->>Store: ok
    Store->>Store: ets:insert(index, Location)
    Store->>Store: add_pending_confirm(MsgId)
    Note over Store: Synced to disk after 200 ms
    Store->>Disk: file:sync(CurrentFile)
    Store->>Client: {confirm, MsgId}
    Client-->>Queue: confirm_to_sender(MsgId)
```

(2) read/2 - Read a message

Core code:
```erlang
read(MsgId, #client_msstate{index_ets = IndexEts, file_handles_ets = FHandlesEts}) ->
    case ets:lookup(IndexEts, MsgId) of
        [] -> not_found;
        [{MsgId, #msg_location{file = File, offset = Offset, total_size = Size}}] ->
            %% 1) Get a file handle (from the cache, or by opening the file)
            FileHandle = get_file_handle(File, FHandlesEts),
            %% 2) Seek to the message and read it
            {ok, _} = file:position(FileHandle, Offset),
            {ok, Msg} = file:read(FileHandle, Size),
            {ok, Msg}
    end.
```
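The position-then-read pair above can be collapsed into a single positional read with the standard `file:pread/3`. The sketch below is illustrative only: the module name `pread_demo` and the file name are made up. It writes two records back to back and reads each one from the offset and size an index entry would hold.

```erlang
-module(pread_demo).
-export([read_at/3, demo/0]).

%% Read exactly Size bytes at Offset. file:pread/3 performs the seek and
%% the read in one call and does not move the file's current position.
read_at(Fd, Offset, Size) ->
    case file:pread(Fd, Offset, Size) of
        {ok, Bin} when byte_size(Bin) =:= Size -> {ok, Bin};
        {ok, _Short} -> {error, short_read};
        eof -> {error, eof};
        {error, _Reason} = Err -> Err
    end.

demo() ->
    Path = "pread_demo.rdq",
    %% Two "messages" stored back to back: "hello" at offset 0 (5 bytes)
    %% and "world!" at offset 5 (6 bytes), as an index would record them.
    ok = file:write_file(Path, <<"hello", "world!">>),
    {ok, Fd} = file:open(Path, [read, binary, raw]),
    try
        {read_at(Fd, 0, 5), read_at(Fd, 5, 6)}
    after
        ok = file:close(Fd),
        ok = file:delete(Path)
    end.
```

Because `file:pread/3` leaves the current position untouched, concurrent readers sharing a cached handle do not interfere through a shared seek position.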
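(3) remove/2 - Remove messages (decrement reference counts)

Core code:

```erlang
remove(MsgIds, #client_msstate{server = Server, client_ref = CRef}) ->
    %% Asynchronous removal: the server decrements each message's reference count
    ok = gen_server2:cast(Server, {remove, CRef, MsgIds}).
```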
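Removal handling in msg_store:

```erlang
handle_cast({remove, _CRef, MsgIds}, State = #msstate{index_ets = IndexEts}) ->
    %% Thread State through a fold so that GC candidates recorded for one
    %% MsgId are not lost (lists:foreach would discard the updated state)
    State1 = lists:foldl(
        fun(MsgId, StateAcc) ->
            case ets:lookup(IndexEts, MsgId) of
                [{MsgId, #msg_location{ref_count = 1, file = File, total_size = Size}}] ->
                    %% Last reference: delete the index entry...
                    true = ets:delete(IndexEts, MsgId),
                    %% ...and mark its file as a GC candidate
                    add_gc_candidate(File, Size, StateAcc);
                [{MsgId, Loc = #msg_location{ref_count = RefCount}}] ->
                    %% Still referenced by other queues: just decrement
                    true = ets:insert(IndexEts,
                                      {MsgId, Loc#msg_location{ref_count = RefCount - 1}}),
                    StateAcc;
                [] ->
                    %% Unknown MsgId: nothing to do
                    StateAcc
            end
        end, State, MsgIds),
    {noreply, State1}.
```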
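The reference-counting rules of write/4 and remove/2 can be modeled without ETS tables or processes. In the sketch below (hypothetical module `refcount_demo`), the index is a map of `MsgId => {Offset, Size, RefCount}` plus the next free offset. A duplicate write only bumps the count, and an entry disappears when its last reference is removed, leaving its bytes as garbage for GC to reclaim.

```erlang
-module(refcount_demo).
-export([new/0, write/3, remove/2, ref_count/2]).

%% Store = {Index, NextOffset}, Index = #{MsgId => {Offset, Size, RefCount}}
new() -> {#{}, 0}.

write(MsgId, Body, {Index, Next}) ->
    case maps:find(MsgId, Index) of
        {ok, {Off, Size, RC}} ->
            %% Already stored: no new bytes, just one more reference
            {Index#{MsgId := {Off, Size, RC + 1}}, Next};
        error ->
            %% New message: "append" it at the next free offset
            BodySize = byte_size(Body),
            {Index#{MsgId => {Next, BodySize, 1}}, Next + BodySize}
    end.

remove(MsgId, {Index, Next}) ->
    case maps:find(MsgId, Index) of
        {ok, {_Off, _Size, 1}} ->
            %% Last reference: drop the entry; its bytes stay in the file
            %% as garbage until compaction reclaims them
            {maps:remove(MsgId, Index), Next};
        {ok, {Off, Size, RC}} ->
            {Index#{MsgId := {Off, Size, RC - 1}}, Next};
        error ->
            {Index, Next}
    end.

ref_count(MsgId, {Index, _Next}) ->
    case maps:find(MsgId, Index) of
        {ok, {_Off, _Size, RC}} -> RC;
        error -> 0
    end.
```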
A2.2 rabbit_classic_queue_index_v2 API (classic queue index)
(1) publish/4 - Write an index entry
Description: records the mapping from SeqId to message metadata.

Core code:

```erlang
publish(Msg, SeqId, MsgProps, State) ->
    %% 1) Decide where the message body is stored
    StoreLocation =
        case mc:size(Msg) < State#qistate.embed_msgs_below of
            true ->
                %% Small message: stored in the per-queue store (queue_store_v2)
                {ok, Offset} = rabbit_classic_queue_store_v2:write(Msg, State#qistate.store_state),
                {queue_store, Offset};
            false ->
                %% Large message: stored in the shared msg_store via write/4
                %% (the client_ref field is assumed for this simplified sketch)
                MsgId = mc:msg_id(Msg),
                {ok, _} = rabbit_msg_store:write(State#qistate.client_ref, MsgId, Msg,
                                                 State#qistate.msg_store_client),
                {msg_store, MsgId}
        end,
    %% 2) Write the index record
    Entry = #index_entry{
               seq_id        = SeqId,
               location      = StoreLocation,
               is_persistent = maps:get(delivery_mode, MsgProps) == 2,
               is_delivered  = false
              },
    ok = write_index_entry(Entry, State),
    {ok, State}.
```
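Index file format:
- Segmented storage: each segment file holds a fixed number of SeqId entries (for example, 16384)
- Segment file name: `{QueueName}.{StartSeqId}.idx`
- Entry format: `<<SeqId:64, Location/binary, Flags:8>>`, where Location is variable-length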
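With a fixed number of entries per segment, locating an entry is integer arithmetic on SeqId: the segment number is `SeqId div N` and the slot inside it is `SeqId rem N`. The sketch below (hypothetical module `index_entry_demo`) uses the example figure of 16384 entries per segment. Only the `SeqId:64` and `Flags:8` fields come from the format above. The tag-plus-length encoding of Location and the flag bit assignments are assumptions made for illustration.

```erlang
-module(index_entry_demo).
-export([segment_of/2, encode/4, decode/1]).

%% Which segment holds SeqId, and at which slot inside that segment.
segment_of(SeqId, EntriesPerSegment) ->
    {SeqId div EntriesPerSegment, SeqId rem EntriesPerSegment}.

%% Assumed layout: <<SeqId:64, LocTag:8, LocLen:16, Loc/binary, Flags:8>>
encode(SeqId, {queue_store, Offset}, IsPersistent, IsDelivered) ->
    encode_(SeqId, 0, <<Offset:64>>, IsPersistent, IsDelivered);
encode(SeqId, {msg_store, MsgId}, IsPersistent, IsDelivered) when is_binary(MsgId) ->
    encode_(SeqId, 1, MsgId, IsPersistent, IsDelivered).

encode_(SeqId, Tag, Loc, IsPersistent, IsDelivered) ->
    %% Assumed flags: bit 0 = persistent, bit 1 = delivered
    Flags = bool(IsPersistent) bor (bool(IsDelivered) bsl 1),
    <<SeqId:64, Tag:8, (byte_size(Loc)):16, Loc/binary, Flags:8>>.

decode(<<SeqId:64, Tag:8, Len:16, Loc:Len/binary, Flags:8>>) ->
    Location = case Tag of
                   0 -> <<Offset:64>> = Loc, {queue_store, Offset};
                   1 -> {msg_store, Loc}
               end,
    {SeqId, Location, Flags band 1 =:= 1, Flags band 2 =:= 2}.

bool(true) -> 1;
bool(false) -> 0.
```

For example, SeqId 40000 falls in segment 2 (40000 div 16384) at slot 7232 (40000 - 2 × 16384).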
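A3. Key Data Structures and UML
A3.1 msg_location record (message location index)

```erlang
-record(msg_location, {
    msg_id     :: binary(),           % Message ID (a GUID assigned when the message is created)
    ref_count  :: non_neg_integer(),  % Reference count (several queues may share one message)
    file       :: file_num(),         % File number (stored on disk as <N>.rdq, e.g. 1.rdq)
    offset     :: non_neg_integer(),  % Offset within the file
    total_size :: non_neg_integer()   % Total message size
}).
```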
A3.2 msstate record (msg_store server state)

```erlang
-record(msstate, {
    dir                 :: file:filename(),   % Store directory (e.g. /var/lib/rabbitmq/msg_stores/vhosts/{VHost}/msg_store_persistent)
    index_ets           :: ets:tid(),         % Index table: #{MsgId => #msg_location{}}
    current_file        :: file_num(),        % Number of the file currently being written
    current_file_handle :: file:fd(),         % Handle of the current file
    current_file_offset :: non_neg_integer(), % Current write offset
    gc_candidates       :: sets:set(),        % Files awaiting GC
    file_summary_ets    :: ets:tid(),         % File summary table: #{FileNum => #file_summary{}}
    flying_ets          :: ets:tid(),         % In-flight writes/removes: #{MsgId => FlyingState}
    clients             :: map(),             % Client registry: #{ClientRef => Callbacks}
    file_size_limit     :: pos_integer()      % Per-file size limit (default 16 MB)
}).
```
A3.3 UML Class Diagram

```mermaid
classDiagram
    class msstate {
        +index_ets: ets()
        +current_file: integer()
        +current_file_offset: integer()
        +file_summary_ets: ets()
        +gc_candidates: set()
        +file_size_limit: integer()
    }
    class msg_location {
        +msg_id: binary()
        +ref_count: integer()
        +file: integer()
        +offset: integer()
        +total_size: integer()
    }
    class file_summary {
        +file: integer()
        +valid_total_size: integer()
        +file_size: integer()
        +locked: boolean()
    }
    msstate --> msg_location : index_ets holds
    msstate --> file_summary : file_summary_ets holds
```
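A4. Core Algorithms and Flows
A4.1 Message write flow (classic queue persistence)
Full path:

1. variable_queue decision: choose the storage path by message size
   - Small messages (< 4 KB): `rabbit_classic_queue_store_v2:write/2`
   - Large messages (>= 4 KB): `rabbit_msg_store:write/4`
2. msg_store append write:

```erlang
handle_cast({write, CRef, MsgId, Msg}, State) ->
    Size = byte_size(Msg),
    %% 1) Rotate if this message would push the current file past the limit;
    %%    rotate_file/1 closes the current file, opens the next one and
    %%    resets current_file_offset to 0
    State1 = case State#msstate.current_file_offset + Size > State#msstate.file_size_limit of
                 true  -> rotate_file(State);
                 false -> State
             end,
    Offset = State1#msstate.current_file_offset,
    %% 2) Append to the current file
    ok = file:write(State1#msstate.current_file_handle, Msg),
    %% 3) Update the index
    Location = #msg_location{msg_id = MsgId, file = State1#msstate.current_file,
                             offset = Offset, total_size = Size, ref_count = 1},
    true = ets:insert(State1#msstate.index_ets, {MsgId, Location}),
    %% 4) Update the file summary and advance the write offset
    update_file_summary(State1#msstate.current_file, Size),
    State2 = State1#msstate{current_file_offset = Offset + Size},
    {noreply, add_pending_confirm(CRef, MsgId, State2)}.
```

3. Periodic sync:
   - A timer fires every 200 ms and calls `file:sync/1` to flush buffered writes to disk
   - After the sync, publisher confirms are sent to the clients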
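The rotation check in step 2 decides, before each append, which file and offset a message lands at. The pure-function sketch below (hypothetical module `rotate_demo`; its record is illustrative and is not msg_store's state) isolates that decision. One design choice is deliberate: rotation is skipped when the current file is still empty, so a single message larger than the limit goes into a fresh file instead of causing repeated rotations.

```erlang
-module(rotate_demo).
-export([new/1, append/2]).

-record(st, {file = 0, offset = 0, limit}).

new(Limit) -> #st{limit = Limit}.

%% Returns {{File, Offset}, NewState}: where a message of Size bytes lands.
append(Size, #st{offset = Off, limit = Limit} = S) when Off > 0, Off + Size > Limit ->
    %% The message would overflow a non-empty file: roll over to the next file
    append(Size, S#st{file = S#st.file + 1, offset = 0});
append(Size, #st{file = File, offset = Off} = S) ->
    {{File, Off}, S#st{offset = Off + Size}}.
```

With a 10-byte limit, writes of 6 and 4 bytes share file 0 (offsets 0 and 6); the next 1-byte write starts file 1.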
Sequence diagram:

```mermaid
sequenceDiagram
    autonumber
    participant VQ as variable_queue
    participant MS as msg_store
    participant FS as File System
    participant Index as ETS Index
    VQ->>MS: write(MsgId, Msg)
    MS->>MS: current_offset + size > limit ?
    alt File full
        MS->>FS: file:close(CurrentFile)
        MS->>FS: file:open(NextFile, [append])
    end
    MS->>FS: file:write(FileHandle, Msg)
    FS-->>MS: ok
    MS->>Index: ets:insert({MsgId, Location})
    MS->>MS: add_pending_confirm(MsgId)
    Note over MS: Wait for the 200 ms sync timer
    MS->>FS: file:sync(FileHandle)
    MS->>VQ: {confirm, MsgId}
```
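The flow above defers confirms until after the periodic `file:sync/1`, so one sync covers every message written during that interval. A minimal sketch of the bookkeeping follows, assuming pending confirms are grouped per client reference. The module `confirm_demo` and its functions are hypothetical, and the sync itself is left out.

```erlang
-module(confirm_demo).
-export([add_pending/3, flush/1]).

%% Pending = #{ClientRef => [MsgId]}, newest MsgId first.
add_pending(CRef, MsgId, Pending) ->
    maps:update_with(CRef, fun(Ids) -> [MsgId | Ids] end, [MsgId], Pending).

%% Called once the batched sync has completed: everything written before it
%% is now durable, so all pending confirms go out together, in write order.
flush(Pending) ->
    Confirms = [{CRef, lists:reverse(Ids)}
                || {CRef, Ids} <- lists:sort(maps:to_list(Pending))],
    {Confirms, #{}}.
```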
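A4.2 Garbage Collection (GC) Flow
Trigger conditions:
- Less than 50% of the data in a file is still valid (that is, garbage exceeds 50%)
- The `gc_candidates` set is scanned periodically

GC steps:

1. Mark: when a message is removed, its file is added to `gc_candidates`
2. Compact:

```erlang
compact_file(File, GCState = #gc_state{index_ets = IndexEts}) ->
    %% Assumed helper: returns [{MsgId, Msg}] for the whole file, in on-disk order
    Messages = read_all_messages_from_file(File),
    %% Hypothetical helper: a read/write handle for File
    FileHandle = open_for_compaction(File, GCState),
    %% 1) Keep only messages whose index entry still points at this file
    %%    (File is already bound, so the pattern checks equality)
    ValidMessages = lists:filter(
        fun({MsgId, _Msg}) ->
            case ets:lookup(IndexEts, MsgId) of
                [{MsgId, #msg_location{file = File}}] -> true;
                _ -> false
            end
        end, Messages),
    %% 2) Move valid messages to the front of the file. Everything was read
    %%    into memory first, so overwriting in place is safe.
    EndOffset = lists:foldl(
        fun({MsgId, Msg}, Offset) ->
            ok = file:pwrite(FileHandle, Offset, Msg),
            %% Update the offset stored in the index entry
            [{MsgId, Loc}] = ets:lookup(IndexEts, MsgId),
            true = ets:insert(IndexEts, {MsgId, Loc#msg_location{offset = Offset}}),
            Offset + byte_size(Msg)
        end, 0, ValidMessages),
    %% 3) Truncate the file after the last valid message
    {ok, _} = file:position(FileHandle, EndOffset),
    ok = file:truncate(FileHandle).
```

3. Clean up:
   - If the file is now empty, delete it outright
   - Update the `file_summary` table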
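The trigger condition and the offset bookkeeping of the compaction step can be separated from the file I/O. In the sketch below (hypothetical module `gc_demo`), `should_compact/2` applies the less-than-50%-valid rule. `plan/2` computes where each surviving message ends up when packed from offset 0, along with the length to truncate the file to. Entries are assumed to be listed in on-disk order.

```erlang
-module(gc_demo).
-export([should_compact/2, plan/2]).

%% Compact when fewer than half of the file's bytes are still valid.
should_compact(_ValidBytes, 0) -> false;
should_compact(ValidBytes, FileBytes) -> ValidBytes * 2 < FileBytes.

%% Entries: [{MsgId, Size}] in on-disk order; Valid: set of live MsgIds.
%% Returns {[{MsgId, NewOffset}], EndOffset}.
plan(Entries, Valid) ->
    {RevMoves, End} =
        lists:foldl(
          fun({MsgId, Size}, {Acc, Off}) ->
                  case sets:is_element(MsgId, Valid) of
                      true  -> {[{MsgId, Off} | Acc], Off + Size};
                      false -> {Acc, Off}   %% garbage: its space is reclaimed
                  end
          end, {[], 0}, Entries),
    {lists:reverse(RevMoves), End}.
```

Since surviving messages are packed in their original order, each new offset is no larger than the old one, so messages only ever move toward the start of the file.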
GC sequence diagram:

```mermaid
sequenceDiagram
    autonumber
    participant GC as GC Process
    participant Index as ETS Index
    participant FS as File System
    Note over GC: Periodically scan gc_candidates
    GC->>Index: lookup(MsgIds in File)
    Index-->>GC: ValidMessages
    loop Each valid message
        GC->>FS: pwrite(FileHandle, NewOffset, Msg)
        GC->>Index: update Location.offset
    end
    GC->>FS: file:truncate(EndOffset)
    GC->>GC: Remove file from gc_candidates
```
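A4.3 Quorum Queue Storage (Ra WAL)
Ra log structure:
- WAL (write-ahead log): every command (enqueue/ack) is written to the WAL first (WAL files are named like `00000001.wal`)
- Segment files: log entries are then stored in segment files organized by Raft index
- Snapshot: the state machine state (`rabbit_fifo`) is periodically serialized into a snapshot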
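Write flow:

```erlang
%% The Ra leader receives a command (simplified sketch of the log append path)
append(Index, Term, Command, State) ->
    %% 1) Serialize the command
    Binary = term_to_binary({Index, Term, Command}),
    %% 2) Append to the WAL
    ok = file:write(State#ra_log_state.wal_handle, Binary),
    %% 3) Mark the entry as written but not yet synced
    State1 = State#ra_log_state{unflushed = Index},
    %% 4) Replicate asynchronously to the followers
    %%    (followers is a hypothetical field of this sketch)
    Followers = State#ra_log_state.followers,
    Effects = [{send_rpc, Follower, {append_entries, Index, Command}} || Follower <- Followers],
    {State1, Effects}.
```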
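WAL recovery depends on re-reading appended records in order and stopping cleanly at a record that was only partly written before a crash. The sketch below (hypothetical module `wal_demo`) wraps `term_to_binary/1` output in a 32-bit length prefix. This framing is an assumption for illustration and is not Ra's actual on-disk WAL format.

```erlang
-module(wal_demo).
-export([encode/3, replay/1]).

%% One record: a 32-bit length prefix followed by the serialized entry.
encode(Index, Term, Command) ->
    Bin = term_to_binary({Index, Term, Command}),
    <<(byte_size(Bin)):32, Bin/binary>>.

%% Decode records in order. A torn trailing record (for example, a write cut
%% short by a crash) fails the first clause and is dropped, not treated as an error.
replay(Bin) -> replay(Bin, []).

replay(<<Len:32, Rec:Len/binary, Rest/binary>>, Acc) ->
    replay(Rest, [binary_to_term(Rec) | Acc]);
replay(_Partial, Acc) ->
    lists:reverse(Acc).
```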
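Snapshot flow:

```erlang
%% Triggered when the number of log entries since the last snapshot exceeds a threshold
take_snapshot(RaftIndex, MacState, State) ->
    %% 1) Serialize the state machine
    Snapshot = term_to_binary(MacState),
    %% 2) Write the snapshot file
    SnapshotFile = filename:join(State#ra_log_state.dir,
                                 integer_to_list(RaftIndex) ++ ".snapshot"),
    ok = file:write_file(SnapshotFile, Snapshot),
    %% 3) Delete log segments whose entries are all covered by the snapshot;
    %%    segments are assumed to be {File, FirstIndex, LastIndex} tuples here
    {OldSegments, Remaining} =
        lists:partition(fun({_File, _First, Last}) -> Last =< RaftIndex end,
                        State#ra_log_state.segments),
    lists:foreach(fun({File, _, _}) -> ok = file:delete(File) end, OldSegments),
    {ok, State#ra_log_state{segments = Remaining}}.
```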
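A5. Configuration and Observability
A5.1 Key configuration settings

| Setting | Default | Description |
|---|---|---|
| `msg_store_file_size_limit` | 16 MB | Maximum size of a single msg_store file |
| `msg_store_io_batch_size` | 2048 | Number of messages per batched write |
| `queue_index_embed_msgs_below` | 4096 bytes | Size threshold below which messages are embedded (small-message path) |
| `msg_store_sync_interval` | 200 ms | Disk sync interval |
| `raft_log_max_entries` | 32768 | Maximum Ra WAL entries (triggers a snapshot) |
| `raft_log_snapshot_interval` | 4096 | Ra snapshot interval (in entries) |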
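Two of the settings above, `queue_index_embed_msgs_below` and `msg_store_file_size_limit`, are environment keys of the `rabbit` application. One way to set them is an `advanced.config` fragment like the following, shown with the defaults from the table (16 MB is 16 × 1024 × 1024 = 16777216 bytes).

```erlang
%% advanced.config
[
 {rabbit, [
   %% Messages smaller than this many bytes take the small-message path
   {queue_index_embed_msgs_below, 4096},
   %% Roll over to a new msg_store file once the current one reaches 16 MB
   {msg_store_file_size_limit, 16777216}
 ]}
].
```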
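A5.2 Monitoring metrics
msg_store:
- `msg_store_write_count`: total number of messages written
- `msg_store_read_count`: total number of messages read
- `msg_store_gc_count`: number of GC runs
- `msg_store_file_count`: current number of files
- `msg_store_total_size`: total storage size

Ra WAL:
- `ra_log_snapshot_index`: index of the latest snapshot
- `ra_log_last_written_index`: last written index
- `ra_log_last_applied_index`: last applied index
- `ra_log_commit_latency`: commit latency (ms)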