---|---|---| | rabbit_msg_store | 共享消息存储,用于大消息(>= 4KB) | 经典队列 |" tags: ["RabbitMQ", "技术文档", "源码剖析"]

RabbitMQ-05-存储模块

A1. 模块概览

A1.1 职责与边界

存储模块负责 RabbitMQ 消息的持久化,确保消息在节点重启后能够恢复。存储层分为以下几个子系统:

模块 职责 适用队列类型
rabbit_msg_store 共享消息存储,用于大消息(>= 4KB) 经典队列
rabbit_classic_queue_store_v2 队列专属存储,用于小消息(< 4KB) 经典队列
rabbit_classic_queue_index_v2 消息索引(SeqId → 消息位置) 经典队列
Ra WAL Write-Ahead Log,基于 Raft 日志 仲裁队列
Osiris 只追加日志系统,分段存储 流队列

A1.2 存储架构图

flowchart TB
    subgraph Classic[经典队列存储]
        VQ[variable_queue]
        IDX[queue_index_v2<br/>消息索引]
        QS[queue_store_v2<br/>小消息存储]
        MS[msg_store<br/>大消息共享存储]
    end
    
    subgraph Quorum[仲裁队列存储]
        Fifo[rabbit_fifo]
        RaWAL[Ra WAL<br/>Raft 日志]
        Snapshot[Snapshot<br/>快照]
    end
    
    subgraph Stream[流队列存储]
        SQ[stream_queue]
        Osiris[Osiris Log]
        Segment[Segment Files<br/>分段文件]
    end
    
    VQ --> IDX
    VQ --> QS
    VQ --> MS
    
    Fifo --> RaWAL
    Fifo --> Snapshot
    
    SQ --> Osiris
    Osiris --> Segment
    
    style Classic fill:#e3f2fd
    style Quorum fill:#fff3e0
    style Stream fill:#f3e5f5

关键设计

  1. 经典队列:索引与内容分离,小消息嵌入索引减少 I/O
  2. 仲裁队列:基于 Raft 日志,多副本强一致,定期快照压缩
  3. 流队列:只追加日志,非破坏性读取,支持偏移量回溯

A2. 对外 API 列表与规格

A2.1 rabbit_msg_store API(经典队列大消息存储)

(1)write/4 - 写入消息

基本信息

  • 协议:rabbit_msg_store:write(CRef, MsgId, Msg, CState) -> Result
  • 幂等性:否(重复写入同一 MsgId 会增加引用计数,每次写入都需要对应一次 remove 才能释放)

核心代码

%% Client side of a msg_store write.
%%
%% Marks MsgId as ?FLYING_WRITE in the client's flying ETS table, then casts
%% {write, CRef, MsgId, Msg} to the msg_store server, which consults that
%% flying entry when it handles the request. The client state is returned
%% unchanged as {ok, CState}; the confirm for the write arrives later,
%% asynchronously.
write(CRef, MsgId, Msg, #client_msstate{flying_ets = FlyingEts, server = StoreServer} = CState) ->
    Request = {write, CRef, MsgId, Msg},
    true = ets:insert(FlyingEts, {MsgId, ?FLYING_WRITE}),
    ok = rabbit_server:cast(StoreServer, Request),
    {ok, CState}.

msg_store 进程处理

%% msg_store server side of write/4.
%%
%% If the client has already flagged MsgId as ?FLYING_IS_IGNORED (it was
%% removed before this write was processed), nothing is stored and the write
%% is confirmed immediately. Otherwise:
%%   * a MsgId that is already indexed only gains a reference
%%     (ref_count + 1), so a repeated write increments the count instead of
%%     storing the body a second time;
%%   * a new MsgId is appended to the current file and indexed at the offset
%%     it was written to (see append_message/3);
%% after which the flying entry is set to ?FLYING_IS_WRITTEN and the confirm
%% is queued with add_pending_confirm/3.
handle_cast({write, CRef, MsgId, Msg},
            State = #msstate{flying_ets = FlyingEts, index_ets = IndexEts}) ->
    case ets:lookup(FlyingEts, MsgId) of
        [{MsgId, ?FLYING_IS_IGNORED}] ->
            %% Removed before the write was processed: skip the write.
            send_confirm(CRef, MsgId),
            {noreply, State};
        _ ->
            State1 =
                case ets:lookup(IndexEts, MsgId) of
                    [{MsgId, #msg_location{ref_count = RefCount} = Loc}] ->
                        true = ets:insert(IndexEts,
                                          {MsgId, Loc#msg_location{ref_count = RefCount + 1}}),
                        State;
                    [] ->
                        append_message(MsgId, Msg, State)
                end,
            ets:update_element(FlyingEts, MsgId, {2, ?FLYING_IS_WRITTEN}),
            {noreply, add_pending_confirm(CRef, MsgId, State1)}
    end.

%% Appends Msg at the tracked end of the current file, indexes it there with
%% ref_count = 1, and advances current_file_offset by its size.
%% NOTE(review): assumes Fd writes at current_file_offset (e.g. the file is
%% opened in append mode) — confirm.
append_message(MsgId, Msg, State = #msstate{index_ets = IndexEts,
                                            current_file = File,
                                            current_file_handle = Fd,
                                            current_file_offset = Offset}) ->
    %% file:write/2 returns plain ok; the write offset is tracked in the state.
    ok = file:write(Fd, Msg),
    Size = byte_size(Msg),
    Location = #msg_location{msg_id = MsgId,
                             file = File,
                             offset = Offset,
                             total_size = Size,
                             ref_count = 1},
    true = ets:insert(IndexEts, {MsgId, Location}),
    State#msstate{current_file_offset = Offset + Size}.

时序图

sequenceDiagram
    autonumber
    participant Queue as Queue Process
    participant Client as msg_store Client
    participant Store as msg_store Server
    participant Disk
    
    Queue->>Client: write(MsgId, Msg)
    Client->>Client: ets:insert(flying, {MsgId, WRITE})
    Client->>Store: cast({write, MsgId, Msg})
    Store->>Disk: file:write(CurrentFile, Msg)
    Disk-->>Store: ok
    Store->>Store: ets:insert(index, Location)
    Store->>Store: add_pending_confirm(MsgId)
    Note over Store: 200ms 后同步到磁盘
    Store->>Disk: file:sync(CurrentFile)
    Store->>Client: {confirm, MsgId}
    Client-->>Queue: confirm_to_sender(MsgId)

(2)read/2 - 读取消息

核心代码

%% Reads a message body from the shared msg_store.
%%
%% Looks MsgId up in the index ETS table. Returns not_found when there is no
%% entry; otherwise returns {ok, Msg}, where Msg is the total_size bytes
%% stored at offset in file.
%% Crashes (badmatch) if fewer bytes than recorded can be read. Returning a
%% truncated body silently would hand corrupt data to the caller.
read(MsgId, #client_msstate{index_ets = IndexEts, file_handles_ets = FHandlesEts}) ->
    case ets:lookup(IndexEts, MsgId) of
        [] ->
            not_found;
        [{MsgId, #msg_location{file = File, offset = Offset, total_size = Size}}] ->
            %% Handle comes from the cache or is opened on demand.
            FileHandle = get_file_handle(File, FHandlesEts),
            %% One positioned read instead of file:position/2 + file:read/2.
            %% The result does not depend on the handle's current position.
            {ok, Msg} = file:pread(FileHandle, Offset, Size),
            Size = iolist_size(Msg),
            {ok, Msg}
    end.

(3)remove/2 - 删除消息(减少引用计数)

核心代码

%% Client side of message removal.
%%
%% Hands the list of MsgIds to the msg_store server in an asynchronous cast
%% tagged with this client's reference, and returns ok once the cast is sent.
%% The reference-count decrement happens later, inside the server.
remove(MsgIds, #client_msstate{client_ref = ClientRef, server = StoreServer}) ->
    Request = {remove, ClientRef, MsgIds},
    ok = rabbit_server:cast(StoreServer, Request).

msg_store 处理删除

%% msg_store server side of remove/2: drops one reference per MsgId.
%%
%% When the last reference goes away, the index entry is deleted and the
%% message's size is reported to add_gc_candidate/3 for its file.
%% The state is threaded through a fold. With lists:foreach, the state
%% returned by add_gc_candidate/3 would be discarded.
handle_cast({remove, _CRef, MsgIds}, State = #msstate{index_ets = IndexEts}) ->
    State1 = lists:foldl(fun(MsgId, Acc) -> remove_one(IndexEts, MsgId, Acc) end,
                         State, MsgIds),
    {noreply, State1}.

%% Releases one reference to MsgId and returns the (possibly updated) state.
remove_one(IndexEts, MsgId, State) ->
    case ets:lookup(IndexEts, MsgId) of
        [{MsgId, #msg_location{ref_count = 1, file = File, total_size = Size}}] ->
            %% Last reference: drop the index entry and mark the file as a GC candidate.
            true = ets:delete(IndexEts, MsgId),
            add_gc_candidate(File, Size, State);
        [{MsgId, #msg_location{ref_count = RefCount} = Loc}] ->
            %% More references remain: decrement. Index objects are
            %% {MsgId, #msg_location{}} 2-tuples, so the record is replaced as a
            %% whole. update_element with #msg_location.ref_count would address
            %% a position the 2-tuple does not have (badarg).
            true = ets:insert(IndexEts, {MsgId, Loc#msg_location{ref_count = RefCount - 1}}),
            State;
        [] ->
            %% Not indexed: either the remove overtook the write (see
            %% ?FLYING_IS_IGNORED in the write path) or the message was already
            %% released. Nothing to free; keep the shared server alive.
            State
    end.

A2.2 rabbit_classic_queue_index_v2 API(经典队列索引)

(1)publish/4 - 写入索引条目

功能说明:记录 SeqId → 消息元数据的映射。

核心代码

%% Records the SeqId -> message-location mapping for one published message.
%%
%% The message body goes to one of two stores, chosen by size:
%%   * below embed_msgs_below bytes -> rabbit_classic_queue_store_v2
%%     (location {queue_store, Offset});
%%   * otherwise -> the shared rabbit_msg_store (location {msg_store, MsgId}).
%% An #index_entry{} pointing at that location is then written.
%%
%% Returns {ok, State1}. When the shared store was used, State1 carries the
%% msg_store client state returned by rabbit_msg_store:write/4.
publish(Msg, SeqId, MsgProps, State) ->
    {StoreLocation, State1} = store_message(Msg, SeqId, State),
    Entry = #index_entry{
        seq_id = SeqId,
        location = StoreLocation,
        %% delivery_mode 2 marks a persistent message. The property is
        %% optional in AMQP 0-9-1, so a missing key means non-persistent.
        is_persistent = maps:get(delivery_mode, MsgProps, 1) =:= 2,
        is_delivered = false
    },
    ok = write_index_entry(Entry, State1),
    {ok, State1}.

%% Writes the message body to the store selected by its size.
%% Returns {Location, State1}.
store_message(Msg, SeqId, State = #qistate{embed_msgs_below = EmbedBelow}) ->
    %% mc:size/1 returns {MetadataSize, PayloadSize}. In Erlang term order a
    %% tuple is never < an integer, so comparing it directly would send every
    %% message to the shared store.
    %% NOTE(review): confirm against the mc version in use.
    {MetaSize, BodySize} = mc:size(Msg),
    case MetaSize + BodySize < EmbedBelow of
        true ->
            %% Small message: stored in the queue's own store.
            {ok, Offset} = rabbit_classic_queue_store_v2:write(Msg, State#qistate.store_state),
            {{queue_store, Offset}, State};
        false ->
            %% Large message: stored in the shared msg_store via write/4,
            %% which returns {ok, ClientState1}.
            %% NOTE(review): SeqId is passed as the per-message reference that
            %% is echoed back with the confirm — confirm this matches the
            %% msg_store client API.
            MsgId = mc:msg_id(Msg),
            {ok, Client} = rabbit_msg_store:write(SeqId, MsgId, Msg,
                                                  State#qistate.msg_store_client),
            {{msg_store, MsgId}, State#qistate{msg_store_client = Client}}
    end.

索引文件格式

  • 分段存储:每个段文件包含固定数量的 SeqId 条目(例如 16384 条)
  • 段文件名:{QueueName}.{StartSeqId}.idx
  • 条目格式:<<SeqId:64, Location:Variable, Flags:8>>

A3. 关键数据结构与 UML

A3.1 msg_location 记录(消息位置索引)

%% Location of one message body in the shared msg_store. The write path
%% stores it in the index ETS table as {MsgId, #msg_location{}}.
%% NOTE: field order defines the runtime tuple layout — do not reorder.
-record(msg_location, {
    msg_id :: binary(),         % message ID (described as a content hash — NOTE(review): confirm how message ids are generated)
    ref_count :: non_neg_integer(), % reference count (several queues may share the same message)
    file :: file_num(),         % file number (e.g. 00001.rdq)
    offset :: non_neg_integer(), % byte offset within the file
    total_size :: non_neg_integer() % total size of the message in bytes
}).

A3.2 msstate 记录(msg_store 服务器状态)

%% State of the msg_store server process.
%% NOTE: field order defines the runtime tuple layout — do not reorder.
-record(msstate, {
    dir :: file:filename(),         % storage directory (e.g. /var/lib/rabbitmq/msg_stores/vhosts/{VHost}/msg_store_persistent)
    index_ets :: ets:tid(),         % index table: #{MsgId => #msg_location{}}
    current_file :: file_num(),     % number of the file currently being written
    current_file_handle :: file:fd(), % handle of the current file
    current_file_offset :: non_neg_integer(), % current write offset in the current file
    gc_candidates :: sets:set(),    % set of files waiting for GC
    file_summary_ets :: ets:tid(),  % file summary table: #{FileNum => #file_summary{}}
    flying_ets :: ets:tid(),        % in-flight writes/removes: #{MsgId => FlyingState}
    clients :: map(),               % client registry: #{ClientRef => Callbacks}
    file_size_limit :: pos_integer() % per-file size limit (default 16MB)
}).

A3.3 UML 类图

classDiagram
    class msstate {
        +index_ets: ets()
        +current_file: integer()
        +current_file_offset: integer()
        +file_summary_ets: ets()
        +gc_candidates: set()
        +file_size_limit: integer()
    }
    
    class msg_location {
        +msg_id: binary()
        +ref_count: integer()
        +file: integer()
        +offset: integer()
        +total_size: integer()
    }
    
    class file_summary {
        +file: integer()
        +valid_total_size: integer()
        +file_size: integer()
        +locked: boolean()
    }
    
    msstate --> msg_location : index_ets 包含
    msstate --> file_summary : file_summary_ets 包含

A4. 核心算法/流程剖析

A4.1 消息写入流程(经典队列持久化)

完整路径

  1. variable_queue 决策:根据消息大小选择存储路径

    • 小消息(< 4KB):rabbit_classic_queue_store_v2:write/2
    • 大消息(>= 4KB):rabbit_msg_store:write/4
  2. msg_store 追加写入

    %% Simplified msg_store write path with file rotation.
    %%
    %% Rotates to a new file when appending Msg would push the current file
    %% past file_size_limit. It then appends Msg, indexes it at the offset it
    %% was written to, and updates the file summary. Finally it advances
    %% current_file_offset, so that the next write (and the next rotation
    %% check) starts after this message.
    handle_cast({write, _CRef, MsgId, Msg}, State) ->
        Size = byte_size(Msg),
        #msstate{current_file_offset = Offset0, file_size_limit = Limit} = State,
        %% 1) Rotate if needed. A file that is still empty is never rotated,
        %%    so a single message larger than the limit does not leave an
        %%    empty file behind.
        %%    NOTE(review): assumes rotate_file/1 resets current_file_offset
        %%    to 0 for the new file — confirm.
        State1 =
            case Offset0 > 0 andalso Offset0 + Size > Limit of
                true -> rotate_file(State);
                false -> State
            end,
        #msstate{current_file = File,
                 current_file_handle = Fd,
                 current_file_offset = Offset,
                 index_ets = IndexEts} = State1,
        %% 2) Append to the current file.
        ok = file:write(Fd, Msg),
        %% 3) Index the message at the offset it was written to.
        Location = #msg_location{msg_id = MsgId, file = File, offset = Offset,
                                 total_size = Size, ref_count = 1},
        true = ets:insert(IndexEts, {MsgId, Location}),
        %% 4) Account for the new bytes in the file summary.
        update_file_summary(File, Size),
        {noreply, State1#msstate{current_file_offset = Offset + Size}}.
    
  3. 定时同步

    • 每 200ms 触发 file:sync/1,将缓冲区写入磁盘
    • 同步后发送 Publisher Confirm 给客户端

时序图

sequenceDiagram
    autonumber
    participant VQ as variable_queue
    participant MS as msg_store
    participant FS as File System
    participant Index as ETS Index
    
    VQ->>MS: write(MsgId, Msg)
    MS->>MS: current_offset + size > limit ?
    alt 文件已满
        MS->>FS: file:close(CurrentFile)
        MS->>FS: file:open(NextFile, [append])
    end
    MS->>FS: file:write(FileHandle, Msg)
    FS-->>MS: ok
    MS->>Index: ets:insert({MsgId, Location})
    MS->>MS: add_pending_confirm(MsgId)
    Note over MS: 等待 200ms 同步定时器
    MS->>FS: file:sync(FileHandle)
    MS->>VQ: {confirm, MsgId}

A4.2 垃圾回收(GC)流程

触发条件

  • 文件中有效数据占比 < 50%(即空洞超过 50%)
  • 定期扫描 gc_candidates 集合

GC 步骤

  1. 标记阶段:删除消息时,将文件加入 gc_candidates

  2. 压实阶段

    %% GC compaction step for one msg_store file.
    %%
    %% All messages in File are read into memory first, so moving the live
    %% ones towards offset 0 with pwrite can never overwrite data that has
    %% not been read yet. A message is live when its index entry still
    %% points at File. Live messages are rewritten back to back from offset
    %% 0, and their index entries get the new offset. The file is then
    %% truncated after the last one. Returns ok.
    %%
    %% NOTE(review): FileHandle is not bound in this excerpt. It must be an
    %% open handle on File that supports pwrite/position/truncate — confirm
    %% where it comes from (e.g. the GC state).
    compact_file(File, GCState) ->
        IndexEts = GCState#gc_state.index_ets,
        %% 1) Read every message in the file.
        Messages = read_all_messages_from_file(File),
        %% 2) Keep the live ones, carrying their current location record.
        %%    Generator-pattern variables are fresh, so the file check is an
        %%    explicit filter rather than a match on File in the pattern.
        Live = [{MsgId, Msg, Loc}
                || {MsgId, Msg} <- Messages,
                   [{_, #msg_location{file = LocFile} = Loc}] <- [ets:lookup(IndexEts, MsgId)],
                   LocFile =:= File],
        %% 3) Slide them to the front of the file and re-index each one.
        EndOffset = lists:foldl(
            fun({MsgId, Msg, Loc}, Offset) ->
                ok = file:pwrite(FileHandle, Offset, Msg),
                %% Index objects are {MsgId, #msg_location{}} 2-tuples, so
                %% the record is replaced as a whole. update_element with
                %% #msg_location.offset would address a position the 2-tuple
                %% does not have (badarg).
                %% NOTE(review): this lookup-then-insert is only safe if no
                %% other process updates the entry (e.g. ref_count) in the
                %% meantime — confirm the file is locked against the server.
                true = ets:insert(IndexEts, {MsgId, Loc#msg_location{offset = Offset}}),
                Offset + byte_size(Msg)
            end, 0, Live),
        %% 4) Cut off everything after the last live message.
        {ok, _} = file:position(FileHandle, EndOffset),
        ok = file:truncate(FileHandle).
    
  3. 清理阶段

    • 若文件变为空,直接删除
    • 更新 file_summary

GC 时序图

sequenceDiagram
    autonumber
    participant GC as GC Process
    participant Index as ETS Index
    participant FS as File System
    
    Note over GC: 定时扫描 gc_candidates
    GC->>Index: lookup(MsgIds in File)
    Index-->>GC: ValidMessages
    loop 有效消息
        GC->>FS: pwrite(FileHandle, NewOffset, Msg)
        GC->>Index: update_element(Offset)
    end
    GC->>FS: file:truncate(EndOffset)
    GC->>GC: 从 gc_candidates 移除文件

A4.3 仲裁队列存储(Ra WAL)

Ra 日志结构

  • WAL(Write-Ahead Log):所有命令(enqueue/ack)先写入 WAL
  • Segment Files:按 Raft Index 分段存储(如 00000001.segment)
  • Snapshot:定期将状态机状态(rabbit_fifo)序列化为快照

写入流程

%% ra_log:append/4, called on the Ra leader for each new command.
%%
%% Serializes {Index, Term, Command} and appends it to the WAL handle. It
%% records Index as the latest unflushed entry and returns {State1, Effects},
%% with one append_entries RPC effect per follower.
%%
%% Defined as append/4 inside module ra_log (a function head cannot carry a
%% module qualifier); callers invoke it as ra_log:append/4.
%% NOTE(review): Followers is not bound in this excerpt — presumably derived
%% from the cluster membership held in State; confirm.
append(Index, Term, Command, State) ->
    %% 1) Serialize the command.
    Binary = term_to_binary({Index, Term, Command}),
    %% 2) Append it to the WAL.
    ok = file:write(State#ra_log_state.wal_handle, Binary),
    %% 3) Mark it as pending sync.
    State1 = State#ra_log_state{unflushed = Index},
    %% 4) Replicate asynchronously to the followers.
    Effects = [{send_rpc, Follower, {append_entries, Index, Command}}
               || Follower <- Followers],
    {State1, Effects}.

快照流程

%% Triggered when the number of WAL entries exceeds a threshold.
%%
%% Serializes the state machine (MacState) as the snapshot for RaftIndex and
%% makes it durable. It then deletes the segments with Seg < RaftIndex, drops
%% them from State#ra_log_state.segments, and returns {ok, State1}.
take_snapshot(RaftIndex, MacState, State) ->
    Dir = State#ra_log_state.dir,
    Name = integer_to_list(RaftIndex) ++ ".snapshot",
    SnapshotFile = filename:join(Dir, Name),
    %% 1-2) Serialize, write to a temporary file, fsync and rename into place.
    %%      A crash mid-write leaves only a partial .tmp file, never a
    %%      truncated <Index>.snapshot. The segments below are deleted only
    %%      after the snapshot has been synced.
    ok = write_file_durably(filename:join(Dir, Name ++ ".tmp"), SnapshotFile,
                            term_to_binary(MacState)),
    %% 3) Delete superseded segments (best-effort: file:delete/1 results are
    %%    not checked) and stop tracking them in the state.
    {OldSegments, KeptSegments} =
        lists:partition(fun(Seg) -> Seg < RaftIndex end,
                        State#ra_log_state.segments),
    lists:foreach(fun(Seg) -> file:delete(segment_file(Seg)) end, OldSegments),
    {ok, State#ra_log_state{segments = KeptSegments}}.

%% Writes Bin to TmpPath, fsyncs and closes it, then renames it to Path.
%% Returns the result of file:rename/2 (ok | {error, Reason}).
write_file_durably(TmpPath, Path, Bin) ->
    {ok, Fd} = file:open(TmpPath, [write, raw, binary]),
    try
        ok = file:write(Fd, Bin),
        ok = file:sync(Fd)
    after
        file:close(Fd)
    end,
    file:rename(TmpPath, Path).

A5. 配置与可观测

A5.1 关键配置项

配置项 默认值 说明
msg_store_file_size_limit 16 MB msg_store 单文件大小限制
msg_store_io_batch_size 2048 批量写入消息数
queue_index_embed_msgs_below 4096 字节 小消息嵌入阈值
msg_store_sync_interval 200 ms 磁盘同步间隔
raft_log_max_entries 32768 Ra WAL 最大条目数(触发快照)
raft_log_snapshot_interval 4096 Ra 快照间隔(条目数)

A5.2 监控指标

msg_store

  • msg_store_write_count:写入消息总数
  • msg_store_read_count:读取消息总数
  • msg_store_gc_count:GC 执行次数
  • msg_store_file_count:当前文件数
  • msg_store_total_size:存储总大小

Ra WAL

  • ra_log_snapshot_index:最新快照索引
  • ra_log_last_written_index:最后写入索引
  • ra_log_last_applied_index:最后应用索引
  • ra_log_commit_latency:提交延迟(毫秒)