RabbitMQ-04-队列模块

A1. 模块概览

A1.1 职责与边界

队列模块是 RabbitMQ 消息系统的核心存储层,负责消息的持久化、排队与投递。RabbitMQ 提供三种队列类型,分别针对不同的业务场景与可靠性要求:

队列类型 模块名 适用场景 关键特性
经典队列 rabbit_classic_queue 通用场景、低延迟 单节点存储、可选持久化、灵活消费
仲裁队列 rabbit_quorum_queue 高可用、强一致 Raft 多副本、日志复制、自动故障转移
流队列 rabbit_stream_queue 大吞吐、可回溯 只追加日志、非破坏性消费、持久化偏移量

队列模块的核心职责包括:

  1. 消息存储与检索:支持内存/磁盘混合存储,根据队列类型选择不同的持久化策略
  2. 消费者管理:跟踪活跃消费者、分配消息、维护未确认消息(unacked messages)
  3. 流控与反压:根据内存/磁盘压力、消费速率动态调整消息接收速度
  4. 优先级与 TTL:支持消息优先级排序、过期消息自动删除
  5. 死信与备用交换器:将无法投递的消息路由到 DLX(Dead Letter Exchange)
  6. 一致性保证
    • 经典队列:单节点原子性(不跨节点复制)
    • 仲裁队列:基于 Ra(Raft)的多副本强一致
    • 流队列:基于 Osiris 的 Quorum 日志写入

A1.2 队列类型对比

flowchart TB
    Client[客户端 Channel]
    QT[rabbit_queue_type<br/>类型分发器]
    
    subgraph Classic[经典队列 Classic]
        CQ[rabbit_classic_queue]
        VQ[rabbit_variable_queue<br/>变长队列]
        QI[queue_index<br/>索引]
        MS[msg_store<br/>消息存储]
    end
    
    subgraph Quorum[仲裁队列 Quorum]
        QQ[rabbit_quorum_queue]
        RF[rabbit_fifo<br/>Ra 状态机]
        RA[Ra 共识引擎]
        WAL[Write Ahead Log]
    end
    
    subgraph Stream[流队列 Stream]
        SQ[rabbit_stream_queue]
        OSI[Osiris 日志系统]
        SEG[Segment 分段存储]
    end
    
    Client --> QT
    QT --> CQ
    QT --> QQ
    QT --> SQ
    
    CQ --> VQ
    VQ --> QI
    VQ --> MS
    
    QQ --> RF
    RF --> RA
    RA --> WAL
    
    SQ --> OSI
    OSI --> SEG
    
    style Classic fill:#e3f2fd
    style Quorum fill:#fff3e0
    style Stream fill:#f3e5f5

图说明

  1. 类型分发:rabbit_queue_type 根据队列声明时的 x-queue-type 参数选择对应实现
  2. 经典队列路径:消息通过 variable_queue 进入内存队列(q3)或磁盘队列(delta),索引和内容分离存储
  3. 仲裁队列路径:enqueue 命令先写入 Ra 日志(WAL)并异步复制到多个节点,达到多数派(Quorum)提交后再由 rabbit_fifo 状态机应用
  4. 流队列路径:消息直接追加到 Osiris 的不可变日志段,消费者通过偏移量读取

A1.3 上下游依赖

上游调用方

  • rabbit_channel:消息发布(basic.publish)、消费者订阅(basic.consume)、确认/拒绝(basic.ack/nack/reject)
  • rabbit_amqqueue:队列管理(declare、delete、purge)、元数据查询
  • rabbit_exchange:路由匹配后将消息投递到队列

下游依赖

  • 存储层
    • rabbit_msg_store(经典队列持久化消息体)
    • rabbit_classic_queue_index_v2(经典队列消息索引)
    • ra(仲裁队列共识引擎)
    • osiris(流队列日志系统)
  • 监控/统计
    • rabbit_core_metrics:队列 QPS、消息堆积、内存占用
    • rabbit_event:队列状态变更事件(创建、删除、消费者变化)
  • 限流/告警
    • rabbit_alarm:内存/磁盘告警触发流控
    • rabbit_limiter:QoS prefetch 限制消费速率

A2. 对外 API 列表与规格

A2.1 队列类型行为接口(rabbit_queue_type Behavior)

所有队列类型必须实现的回调接口,定义于 rabbit_queue_type.erl

(1)declare/2 - 声明队列

基本信息

  • 协议:rabbit_queue_type:declare(amqqueue:amqqueue(), node()) -> Result
  • 幂等性:是(重复声明相同配置返回 existing,不同配置返回错误)

请求结构体

%% amqqueue record (defined in amqqueue_v2.hrl): queue metadata shared by all queue types
-record(amqqueue, {
    name        :: rabbit_amqqueue:name(),  % {resource, VHost, queue, Name}
    durable     :: boolean(),               % whether the queue is persisted to disk
    auto_delete :: boolean(),               % delete automatically when it has no consumers
    exclusive_owner :: none | pid(),        % exclusive owning connection (only that connection may access it)
    arguments   :: rabbit_framing:amqp_table(), % extra arguments (e.g. x-max-length)
    pid         :: pid() | ra_server_id(),  % queue process PID (classic) or Ra server id (quorum/stream)
    type        :: atom(),                  % rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue
    vhost       :: binary(),                % virtual host
    state       :: atom(),                  % live | stopped
    policy      :: undefined | binary(),    % name of the applied policy
    operator_policy :: undefined | binary() % operator policy
}).

响应类型

%% Result of declaring a queue.
-type declare_result() ::
    {new, amqqueue:amqqueue()} |          % newly created
    {existing, amqqueue:amqqueue()} |      % already exists with a matching configuration
    {owner_died, amqqueue:amqqueue()} |    % the owner process of the exclusive queue has died
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.

核心代码(rabbit_amqqueue:declare/7):

%% Declares a queue: normalises the arguments, resolves the queue type,
%% checks that the flags are valid for that type and delegates creation to
%% the type module through rabbit_queue_type:declare/2.
%%
%% Returns a declare_result(): {new, Q} | {existing, Q} | {owner_died, Q}
%% | {protocol_error, Type, Reason, Args}. An incompatible flag combination is
%% reported as a protocol_error (not a bare {error, _}) so that callers that
%% only match on declare_result() (e.g. rabbit_channel) can handle it.
declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, Node) ->
    %% 1) Normalise arguments: merge policy arguments, check constraints such as max length
    NormalizedArgs = augment_declare_args(QueueName, Durable, AutoDelete, Args, ActingUser),

    %% 2) Resolve the queue type (classic by default, overridable via x-queue-type)
    Type = get_queue_type(NormalizedArgs),

    %% 3) Check type compatibility (e.g. a quorum queue must be durable=true, auto_delete=false)
    IsExclusive = Owner =/= none,
    case rabbit_queue_type:is_compatible(Type, Durable, AutoDelete, IsExclusive) of
        false ->
            {protocol_error, precondition_failed,
             "queue type ~ts does not support the requested durable/auto_delete/exclusive flags for ~tp",
             [Type, QueueName]};
        true ->
            %% 4) Build the amqqueue record
            Q0 = amqqueue:new(QueueName, Owner, Durable, AutoDelete, NormalizedArgs),
            Q = amqqueue:set_type(Q0, Type),

            %% 5) Delegate to the queue type module (conflict check + write to the metadata store)
            rabbit_queue_type:declare(Q, Node)
    end.

调用链上游(谁调用了 declare):

  • rabbit_channel:handle_method(#'queue.declare'{}, _, State)
    %% Handles queue.declare on the channel: builds the queue resource name,
    %% calls rabbit_amqqueue:declare/7 and maps its declare_result() to a reply.
    %% (Simplified excerpt: `...` stands for fields/values omitted here.)
    handle_method(#'queue.declare'{queue = QueueBin, durable = Durable, ...}, _, State) ->
        QueueName = rabbit_misc:r(VHost, queue, QueueBin),
        %% declare/7 takes the target node as its last argument; declare on this node
        case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner, User, node()) of
            {new, Q} ->
                %% Declared successfully: monitor the queue from the channel state
                State1 = monitor_queue(Q, State),
                reply(#'queue.declare_ok'{queue = QueueBin, ...}, State1);
            {existing, _Q} ->
                %% Idempotent redeclaration
                reply(#'queue.declare_ok'{...}, State);
            {owner_died, _Q} ->
                %% The owning connection is presumably going away; pretend the
                %% queue exists so the declare does not fail (upstream behaviour — confirm)
                reply(#'queue.declare_ok'{queue = QueueBin, ...}, State);
            {protocol_error, ErrType, Reason, ErrArgs} ->
                %% Fresh names: Args is already bound above, so matching on `Args`
                %% here would only succeed when the error args equal the queue args
                rabbit_misc:protocol_error(ErrType, Reason, ErrArgs)
        end.
    

时序图

sequenceDiagram
    autonumber
    participant Client
    participant Channel as rabbit_channel
    participant AM as rabbit_amqqueue
    participant QT as rabbit_queue_type
    participant CQ as rabbit_classic_queue<br/>(或 Quorum/Stream)
    participant DB as rabbit_db_queue
    
    Client->>Channel: queue.declare(durable, args)
    Channel->>AM: declare(QueueName, Durable, Args)
    AM->>QT: get_queue_type(Args)
    QT-->>AM: rabbit_classic_queue
    AM->>QT: is_compatible(Type, Durable, AutoDelete)
    QT-->>AM: true
    AM->>CQ: declare(Q, Node)
    CQ->>DB: insert_or_update(Q)
    DB-->>CQ: {ok, NewQ}
    CQ-->>AM: {new, NewQ}
    AM-->>Channel: {new, NewQ}
    Channel->>Channel: monitor_queue(Q)
    Channel-->>Client: queue.declare_ok

异常与边界

场景 行为
重复声明(配置一致) 返回 {existing, Q},幂等操作
重复声明(配置冲突) {protocol_error, precondition_failed, "cannot redeclare..."}
节点不可用 阻塞等待或超时失败(仲裁队列需大多数节点在线)
超过队列数量限制 {protocol_error, precondition_failed, "max queues reached"}

(2)deliver/3 - 投递消息

基本信息

  • 协议:rabbit_queue_type:deliver([amqqueue:amqqueue()], mc:state(), Options) -> Result
  • 幂等性:否(路由已在调用前完成;重复调用会向目标队列再次写入同一条消息)

请求结构体

%% Options accepted by deliver/3.
-type deliver_options() :: #{
    correlation := term(),     % correlation id, used for publisher confirms
    mandatory := boolean(),    % if no queue can take the message, return it via basic.return
    delivery_mode := 1 | 2     % 1 = transient, 2 = persistent
}.

响应类型

%% Result of deliver/3.
-type deliver_result() ::
    ok |                        % delivered to all queues
    {delivered, [amqqueue:amqqueue()]} |  % delivered to some queues (the rest crashed/stopped)
    {unroutable, mc:state()}.   % no queue accepted it (triggers basic.return when mandatory=true)

核心代码(rabbit_classic_queue:deliver/3):

%% Classic queue delivery: sends the message to each target queue process.
%%
%% `!` is asynchronous: it does not wait for the queue to enqueue the message
%% (publisher confirms come back later from the queue process).
%% Returns {delivered, Qs} with the queues the message was sent to, in input
%% order, matching deliver_result().
deliver(Qs, Msg, Options) ->
    Correlation = maps:get(correlation, Options),
    Delivered =
        lists:filter(
          fun(Q) ->
                  QPid = amqqueue:get_pid(Q),
                  case is_reachable(QPid) of
                      true ->
                          %% Put the message into the queue process mailbox
                          QPid ! {deliver, Msg, Correlation},
                          true;
                      false ->
                          %% Local queue process is dead: skip it
                          false
                  end
          end, Qs),
    {delivered, Delivered}.

%% erlang:is_process_alive/1 raises badarg for a pid on another node, so only
%% local pids can be checked. A remote pid is assumed reachable: sending to a
%% dead pid is a silent no-op, and crash detection is presumably left to the
%% channel's queue monitor (monitor_queue) — confirm.
is_reachable(QPid) when node(QPid) =:= node() ->
    is_process_alive(QPid);
is_reachable(_RemoteQPid) ->
    true.

仲裁队列的 deliver 实现(rabbit_quorum_queue:deliver/3):

%% Quorum queue delivery: submits an enqueue command to each queue's Ra
%% cluster with ra:pipeline_command/3.
%%
%% pipeline_command/3 is fire-and-forget and always returns ok; the outcome
%% (applied or rejected) arrives later as an asynchronous ra_event carrying
%% Correlation. Every queue is therefore reported as delivered here. Failures
%% surface through the publisher confirm that follows, not through this return.
deliver(Qs, Msg, Options) ->
    Correlation = maps:get(correlation, Options),
    %% The command does not depend on the target queue, so build it once
    Cmd = rabbit_fifo:make_enqueue(undefined, Msg),
    lists:foreach(
      fun(Q) ->
              RaName = amqqueue:get_pid(Q),  % Ra server id {Name, Node}
              ok = ra:pipeline_command(RaName, Cmd, Correlation)
      end, Qs),
    {delivered, Qs}.

时序图(经典队列):

sequenceDiagram
    autonumber
    participant Channel as rabbit_channel
    participant QT as rabbit_queue_type
    participant CQ as Classic Queue Process
    participant VQ as rabbit_variable_queue
    participant Index as queue_index
    
    Channel->>QT: deliver([Q1, Q2], Msg, Options)
    QT->>CQ: QPid ! {deliver, Msg, Correlation}
    CQ->>VQ: publish(Msg, MsgProps, Correlation)
    VQ->>Index: publish(MsgId, SeqId, MsgProps)
    Index-->>VQ: ok
    VQ->>VQ: 将消息加入 q3(内存队列)或 delta(磁盘队列)
    VQ-->>CQ: {ok, State1}
    CQ->>CQ: 检查是否有待投递消费者
    CQ->>CQ: 立即投递或排队等待 consume
    CQ-->>QT: ok
    QT-->>Channel: {delivered, [Q1, Q2]}

时序图(仲裁队列):

sequenceDiagram
    autonumber
    participant Channel
    participant QQ as rabbit_quorum_queue
    participant Ra as Ra Consensus
    participant Fifo as rabbit_fifo<br/>(State Machine)
    participant Follower1
    participant Follower2
    
    Channel->>QQ: deliver([Q], Msg, Opts)
    QQ->>Ra: pipeline_command(enqueue, Msg, Corr)
    QQ-->>Channel: {delivered, [Q]}(立即返回,不等待提交)
    Ra->>Ra: 写入本地 WAL,分配 RaftIndex
    Ra->>Follower1: AppendEntries RPC(异步复制)
    Ra->>Follower2: AppendEntries RPC
    Follower1-->>Ra: ack
    Follower2-->>Ra: ack
    Note over Ra: 达到 Quorum(2/3)后提交
    Ra->>Fifo: apply(Meta, enqueue, State)
    Fifo->>Fifo: 将消息加入 messages 队列
    Fifo-->>Ra: {State1, ok, Effects}
    Ra-->>QQ: applied 事件 {Correlation, ok}
    QQ-->>Channel: 异步 Publisher Confirm

异常与性能要点

场景 行为
队列进程崩溃 经典队列跳过该队列;仲裁队列自动切换到新 Leader
内存告警 队列阻塞接收(流控),返回 {busy, _},Channel 暂停发送
磁盘满 仲裁/流队列无法写入 WAL,返回错误;经典队列尝试清理或拒绝
慢消费者 队列堆积,经典队列可能触发消息换页(paging)到磁盘

(3)consume/3 - 注册消费者

基本信息

  • 协议:rabbit_queue_type:consume(amqqueue:amqqueue(), Consume, State) -> {ok, NewState}
  • 幂等性:否(每次调用创建新的消费者订阅)

请求结构体

%% Fields of a basic.consume request used by consume/3.
-record(basic_consume, {
    queue :: binary(),          % queue name
    consumer_tag :: binary(),   % consumer tag (client-supplied or generated)
    no_ack :: boolean(),        % true = automatic ack, false = manual ack
    exclusive :: boolean(),     % true = exclusive consumption (single consumer)
    arguments :: rabbit_framing:amqp_table()  % extra arguments (e.g. x-priority)
}).

核心代码(rabbit_classic_queue 中的消费者注册):

%% Registers a consumer on a classic queue for the calling channel process
%% and records it in the channel-side queue state.
%%
%% Returns {ok, NewQState} on success,
%% {error, <<"queue is exclusively consumed">>} on an exclusivity conflict,
%% and passes any other {error, Reason} from the queue process through
%% unchanged (these previously crashed with case_clause).
consume(Q, #basic_consume{consumer_tag = CTag, no_ack = NoAck,
                          exclusive = Exclusive, arguments = Args}, QState) ->
    QPid = amqqueue:get_pid(Q),
    %% 1) Ask the queue process to register the consumer (self() is the channel pid)
    case gen_server2:call(QPid, {basic_consume, NoAck, self(), CTag, Exclusive, Args}, infinity) of
        ok ->
            %% 2) Remember the consumer in the channel-side state
            Consumer = #consumer{tag = CTag, ack_required = not NoAck},
            Consumers = maps:get(consumers, QState, #{}),
            {ok, QState#{consumers => Consumers#{CTag => Consumer}}};
        {error, exclusive_consume_unavailable} ->
            {error, <<"queue is exclusively consumed">>};
        {error, _Reason} = Error ->
            Error
    end.

队列进程处理 consume(rabbit_amqqueue_process:handle_call/3):

%% Queue-process side of basic.consume: enforces exclusive-consumer rules,
%% registers the consumer and immediately tries to deliver waiting messages.
%% Replies ok, or {error, exclusive_consume_unavailable} on a conflict.
handle_call({basic_consume, NoAck, ChPid, CTag, Exclusive, Args}, _From, State) ->
    %% 1) Exclusivity is violated both when an exclusive consumer joins a queue
    %%    that already has consumers, AND when anyone joins a queue that
    %%    already has an exclusive consumer.
    %%    NOTE(review): has_exclusive_consumer/1 is an assumed helper over the
    %%    consumer list, analogous to has_consumers/1 — confirm the name.
    Conflict = (Exclusive andalso has_consumers(State))
               orelse has_exclusive_consumer(State),
    case Conflict of
        true -> {reply, {error, exclusive_consume_unavailable}, State};
        false ->
            %% 2) Build the consumer record
            Consumer = #consumer{
                channel_pid = ChPid,
                tag = CTag,
                ack_required = not NoAck,
                prefetch_count = get_prefetch_count(Args)
            },
            %% 3) Add it to the consumer list
            State1 = add_consumer(CTag, Consumer, State),
            %% 4) Try to deliver straight away: messages may already be waiting
            State2 = deliver_msgs_to_consumers(State1),
            {reply, ok, State2}
    end.

时序图

sequenceDiagram
    autonumber
    participant Client
    participant Channel as rabbit_channel
    participant QProc as Queue Process
    participant VQ as variable_queue
    
    Client->>Channel: basic.consume(queue, tag, no_ack)
    Channel->>QProc: gen_server2:call({basic_consume, ...})
    QProc->>QProc: 检查独占冲突
    QProc->>QProc: add_consumer(Tag, Consumer)
    QProc->>VQ: fetch(AckRequired)
    VQ-->>QProc: {Msg, State1} | empty
    alt 有消息可投递
        QProc->>Channel: deliver_to_consumer(Msg)
        Channel-->>Client: basic.deliver(Msg)
    end
    QProc-->>Channel: ok
    Channel-->>Client: basic.consume_ok(tag)

异常处理

场景 行为
队列不存在 {error, not_found}
独占冲突 {error, exclusive_consume_unavailable}
Consumer Tag 冲突 {error, consumer_tag_in_use}
Channel 进程崩溃 队列自动清理该 Channel 的所有消费者

A2.2 经典队列专有 API

(1)rabbit_variable_queue:publish/6 - 发布消息到后端队列

功能说明:将消息写入经典队列的内存/磁盘混合存储结构,根据消息大小和队列状态决定存储位置。

入口函数

%% Publishes one message into the classic queue's hybrid RAM/disk storage
%% (simplified). Returns {ok, NewState}. A message already seen (per
%% is_duplicate/2, de-duplication by message id) is skipped, so republishing
%% is idempotent. IsDelivered and Flow are part of the backing-queue
%% signature but are not used by this simplified version.
publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
    %% 1) Skip duplicates
    case is_duplicate(Msg, State) of
        {true, State1} -> {ok, State1};
        {false, State1} ->
            %% 2) Allocate the sequence number (SeqId)
            SeqId = State1#vqstate.next_seq_id,
            State2 = State1#vqstate{next_seq_id = SeqId + 1},

            %% 3) Track the message for publisher confirms when needs_confirm/2
            %%    says so (this step records the confirm only; it persists nothing)
            State3 = case needs_confirm(MsgProps, State2) of
                true -> track_confirm(Msg, SeqId, ChPid, State2);
                false -> State2
            end,

            %% 4) Write the index entry (persists the SeqId -> MsgId mapping)
            State4 = publish_to_index(Msg, SeqId, MsgProps, State3),

            %% 5) Place the message by memory pressure: q3 (RAM) while below
            %%    target_ram_count, otherwise delta (disk)
            Where = case should_keep_in_ram(State4) of
                true -> q3;
                false -> delta
            end,
            {ok, enqueue_message(Where, Msg, SeqId, State4)}
    end.

关键决策:should_keep_in_ram 的逻辑:

%% Keep a newly published message in RAM (q3) only while the number of
%% in-memory messages is below the adaptive target. Per the record docs,
%% target_ram_count is tuned from the consume rate (1..2048).
should_keep_in_ram(#vqstate{} = VQ) ->
    #vqstate{ram_msg_count = InRam, target_ram_count = Limit} = VQ,
    InRam < Limit.

A3. 关键数据结构与 UML

A3.1 经典队列核心数据结构

(1)amqqueue 记录(队列元数据)

classDiagram
    class amqqueue {
        +name: resource()
        +durable: boolean()
        +auto_delete: boolean()
        +exclusive_owner: none | pid()
        +arguments: amqp_table()
        +pid: pid()
        +type: atom()
        +vhost: binary()
        +state: live | stopped
        +policy: binary()
        +operator_policy: binary()
        +decorators: [atom()]
        +slave_pids: [pid()]
        +sync_slave_pids: [pid()]
        +gm_pids: [pid()]
    }
    
    note for amqqueue "持久化到 Mnesia 的队列元数据\n经典队列:pid 是本地进程\n仲裁队列:pid 是 {RaName, Node} 元组"
字段 类型 说明
name rabbit_types:r(queue) 资源名称:{resource, VHost, queue, NameBin}
durable boolean() 持久化队列(元数据和消息均写入磁盘)
auto_delete boolean() 无消费者时自动删除
exclusive_owner none | pid() 独占连接 PID(仅该连接可访问)
arguments [{binary(), type(), value()}] 扩展参数(如 x-max-length, x-message-ttl)
pid pid() | {atom(), node()} 经典队列:本地进程 PID;仲裁队列:Ra 服务器 ID
type atom() rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue
vhost binary() 虚拟主机名称
state live | stopped 队列状态(stopped 用于优雅停机)
policy undefined | binary() 应用的策略名称(动态配置)

(2)vqstate 记录(经典队列运行时状态)

定义于 rabbit_variable_queue.erl

%% Runtime state of a classic queue's backing queue (rabbit_variable_queue).
-record(vqstate, {
    %% message queues (simplified: q1/q2/q4 are obsolete)
    delta,              % on-disk queue: {DeltaCount, StartSeqId, EndSeqId, PendingAcks}
    q3,                 % in-memory queue: lqueue (double-ended queue)
    
    %% sequence-number bookkeeping
    next_seq_id,        % SeqId of the next message
    next_deliver_seq_id,% first-delivery SeqId boundary (used to decide the redelivered flag)
    
    %% unacknowledged messages
    ram_pending_ack,    % unacked messages held in memory: #{SeqId => Msg} (field table lists #msg_status{} values — confirm)
    disk_pending_ack,   % unacked messages on disk: #{SeqId => {MsgId, IsDelivered}}
    
    %% index and storage
    index_state,        % state of rabbit_classic_queue_index_v2
    store_state,        % state of rabbit_classic_queue_store_v2
    msg_store_clients,  % {PersistentClient, TransientClient} connections to the shared msg_store
    
    %% statistics
    len,                % queue length (excluding unacked messages)
    bytes,              % queue size in bytes
    persistent_count,   % number of persistent messages
    ram_msg_count,      % number of messages held in memory
    target_ram_count,   % target in-memory message count (tuned from consume rate: 1~2048)
    
    %% flow control
    rates,              % enqueue/dequeue rate statistics
    durable             % whether the queue is durable
}).

关键字段说明

字段 用途 数据结构
q3 内存队列,存放最近的消息(热数据) lqueue:lqueue(#msg_status{})
delta 磁盘队列,存放较旧的消息(冷数据) {Count, StartSeqId, EndSeqId, PendingAcks}
ram_pending_ack 未确认且在内存中的消息 #{SeqId => #msg_status{}}
disk_pending_ack 未确认但已换页到磁盘的消息 #{SeqId => {MsgId, IsDelivered}}
target_ram_count 自适应内存消息数(慢消费时减少到 1 条,快消费时增加到 2048 条) 1..2048

UML 类图

classDiagram
    class vqstate {
        +delta: delta_queue()
        +q3: lqueue()
        +next_seq_id: integer()
        +ram_pending_ack: map()
        +disk_pending_ack: map()
        +index_state: index_state()
        +len: integer()
        +target_ram_count: 1..2048
    }
    
    class msg_status {
        +seq_id: integer()
        +msg_id: binary()
        +msg: mc:state()
        +is_persistent: boolean()
        +is_delivered: boolean()
        +msg_props: map()
    }
    
    class delta_queue {
        +count: integer()
        +start_seq_id: integer()
        +end_seq_id: integer()
    }
    
    vqstate --> msg_status : q3 包含
    vqstate --> delta_queue
    vqstate --> index_state

A3.2 仲裁队列核心数据结构

(1)rabbit_fifo 记录(Ra 状态机状态)

定义于 rabbit_fifo.hrl

%% State of the rabbit_fifo Ra state machine.
-record(rabbit_fifo, {
    cfg :: #cfg{},                      % configuration: name, resource name, dead-letter exchange
    messages = rabbit_fifo_q:new(),     % messages awaiting delivery (weighted priority queue)
    messages_total = 0,                 % total number of messages
    returns = lqueue:new(),             % messages returned via nack/reject
    enqueue_count = 0,                  % enqueue counter (used to trigger snapshots)
    
    %% publisher tracking
    enqueuers = #{},                    % #{Pid => #enqueuer{}}, tracks active publishers
    
    %% Raft index bookkeeping
    ra_indexes = rabbit_fifo_index:empty(), % Raft indexes of delivered messages (field table says unacked — confirm)
    
    %% consumer tracking
    consumers = #{},                    % #{ConsumerKey => #consumer{}}
    service_queue = priority_queue:new(), % consumers waiting to be served
    
    %% dead lettering
    dlx = rabbit_fifo_dlx:init(),       % dead-letter exchange state
    
    %% statistics
    msg_bytes_enqueue = 0,              % enqueued bytes
    msg_bytes_checkout = 0,             % delivered (checked-out) bytes
    
    %% single active consumer (SAC)
    waiting_consumers = []              % consumers waiting in SAC mode
}).

关键字段说明

字段 用途 数据结构
messages 待投递的消息队列(高优先级/普通优先级) rabbit_fifo_q:state()
enqueuers 跟踪每个发布者的消息数,用于流控 #{Pid => #enqueuer{msg_seqs :: [RaftIndex]}}
ra_indexes 记录所有未确认消息的 Raft Index,用于日志截断 rabbit_fifo_index:state()
consumers 活跃消费者状态(信用额度、未确认消息) #{Key => #consumer{credit :: integer()}}
service_queue 按优先级排序的待投递消费者队列 priority_queue:q()

(2)consumer 记录(消费者状态)

%% Per-consumer state kept by rabbit_fifo.
-record(consumer, {
    meta = #{},                  % metadata: CTag, ChPid, Prefetch
    credit = 0,                  % remaining credit (QoS prefetch)
    checked_out = #{},           % unacknowledged messages: #{RaftIndex => {SeqId, Msg}}
    next_msg_id = 0,             % next message sequence number
    status = up,                 % up | suspected_down | cancelled
    lifetime = once              % once (cancelled on disconnect) | auto (recovers automatically)
}).

UML 类图

classDiagram
    class rabbit_fifo {
        +cfg: config()
        +messages: rabbit_fifo_q()
        +returns: lqueue()
        +enqueuers: map()
        +consumers: map()
        +ra_indexes: fifo_index()
        +dlx: dlx_state()
    }
    
    class consumer {
        +credit: integer()
        +checked_out: map()
        +next_msg_id: integer()
        +status: up | suspected_down | cancelled
    }
    
    class enqueuer {
        +msg_seqs: [RaftIndex]
        +pending_confirms: [SeqId]
    }
    
    rabbit_fifo --> consumer
    rabbit_fifo --> enqueuer

A3.3 流队列核心数据结构

(1)stream_client 记录(Channel 中的流队列状态)

定义于 rabbit_stream_queue.erl

%% Per-channel client state for a stream queue.
-record(stream_client, {
    stream_id :: string(),          % stream name (internal identifier)
    name :: rabbit_amqqueue:name(), % queue resource name
    leader :: pid(),                % Osiris leader PID
    local_pid :: undefined | pid(), % local replica PID (if any)
    next_seq = 1,                   % next publish sequence number
    correlation = #{}               % #{AppenderSeq => {Correlation, Msg}} (publisher confirms)
}).

(2)stream 记录(消费者状态)

%% Per-consumer read state for a stream queue.
-record(stream, {
    mode :: simple | automatic,     % consume mode: simple (manual ack) | automatic
    credit :: integer(),            % remaining credit
    ack :: boolean(),               % whether acks are required
    start_offset = 0,               % starting offset
    listening_offset = 0,           % offset currently listened on
    log :: osiris_log:state(),      % Osiris log handle
    chunk_iterator :: chunk_iter(), % chunk iterator (for batched reads)
    buffer_msgs_rev = []            % read-ahead message buffer (reversed)
}).

特性对比

特性 经典队列 仲裁队列 流队列
消息持久化 可选(durable + delivery_mode=2) 强制(Raft Log) 强制(Osiris Segment)
消费语义 破坏性读取(消费后删除) 破坏性读取 非破坏性读取(可重复消费)
多副本 不支持(单节点) 支持(Raft 复制) 支持(Osiris Quorum)
偏移量管理 无(基于 SeqId 但不可回溯) 无(基于 Raft Index,不可回溯) 支持偏移量回溯
顺序保证 单队列 FIFO 单队列 FIFO 单分区严格有序

A4. 核心算法/流程剖析

A4.1 经典队列消息发布流程

流程描述:从 Channel 接收消息到持久化/入队的完整路径。

详细步骤

  1. Channel 路由(rabbit_channel:handle_method(#'basic.publish'{})):

    • 解析路由键、mandatory 标志
    • 调用 rabbit_exchange:route/3 获取目标队列列表
  2. 队列投递(rabbit_classic_queue:deliver/3):

    • 向队列进程发送 {deliver, Msg, Correlation}
  3. 队列进程接收(rabbit_amqqueue_process:handle_info({deliver, Msg, _})):

    • 委托给 rabbit_variable_queue:publish/6
  4. VQ 发布逻辑(rabbit_variable_queue:publish/6):

    %% Simplified classic-queue publish path (rabbit_variable_queue): assigns a
    %% SeqId, writes the index entry, places the message in RAM (q3) or on disk
    %% (delta), updates counters and tracks publisher confirms. Returns {ok, NewState}.
    publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
        %% Step 1: allocate the SeqId
        SeqId = State#vqstate.next_seq_id,
        State1 = State#vqstate{next_seq_id = SeqId + 1},
    
        %% Step 2: write the index entry (SeqId -> MsgId mapping)
        IndexState = rabbit_classic_queue_index_v2:publish(
            Msg, SeqId, MsgProps, State1#vqstate.index_state
        ),
        State2 = State1#vqstate{index_state = IndexState},
    
        %% Step 3: decide where the message lives
        State3 =
            case should_keep_in_ram(State2) of
                true ->
                    %% keep it in q3 (in-memory queue)
                    MsgStatus = #msg_status{seq_id = SeqId, msg = Msg, ...},
                    State2#vqstate{q3 = lqueue:in(MsgStatus, State2#vqstate.q3),
                                   ram_msg_count = State2#vqstate.ram_msg_count + 1};
                false ->
                    %% count it in delta (on-disk queue); writing the body to
                    %% msg_store (large) or queue_store (small) is omitted here
                    State2#vqstate{delta = increment_delta(State2#vqstate.delta)}
            end,
    
        %% Step 4: update statistics. mc:size/1 returns {MetadataSize, PayloadSize}
        %% rather than an integer, so only the payload size is added to bytes.
        {_MetaSize, PayloadSize} = mc:size(Msg),
        State4 = State3#vqstate{
            len = State3#vqstate.len + 1,
            bytes = State3#vqstate.bytes + PayloadSize
        },
    
        %% Step 5: record the publisher confirm. vqstate has no `unconfirmed`
        %% field, so go through track_confirm/4 (with needs_confirm/2).
        case needs_confirm(MsgProps, State4) of
            true ->
                {ok, track_confirm(Msg, SeqId, ChPid, State4)};
            false ->
                {ok, State4}
        end.
    
  5. 持久化消息体(大消息 >= 4KB):

    • 写入 rabbit_msg_store(VHost 级别共享存储)
    • 返回 {ok, MsgId, RefCount}
  6. 持久化消息体(小消息 < 4KB):

    • 直接嵌入 rabbit_classic_queue_store_v2(队列专属存储)

时序图

sequenceDiagram
    autonumber
    participant Ch as Channel
    participant Ex as Exchange
    participant QProc as Queue Process
    participant VQ as variable_queue
    participant Index as queue_index
    participant Store as msg_store
    
    Ch->>Ex: route(Msg, RoutingKey)
    Ex-->>Ch: [Q1, Q2]
    Ch->>QProc: QPid ! {deliver, Msg, Corr}
    QProc->>VQ: publish(Msg, Props)
    VQ->>VQ: 分配 SeqId = 1001
    VQ->>Index: publish(SeqId, MsgId, Props)
    Index-->>VQ: ok
    alt 消息 >= 4KB
        VQ->>Store: write(MsgId, Msg)
        Store-->>VQ: ok
    end
    VQ->>VQ: should_keep_in_ram() ?
    alt ram_msg_count < target
        VQ->>VQ: q3 = lqueue:in(#msg_status{}, q3)
    else
        VQ->>VQ: delta = increment(delta)
    end
    VQ-->>QProc: {ok, State1}
    QProc->>QProc: 检查待投递消费者
    QProc-->>Ch: (异步)Publisher Confirm

关键决策点

  1. 内存 vs 磁盘:target_ram_count 根据消费速率动态调整
    • 快速消费:保留 2048 条在内存
    • 慢速消费:仅保留 1 条在内存
  2. 大小消息分离
    • 小消息(< 4KB):写入队列专属的 rabbit_classic_queue_store_v2(v1 索引时代则直接嵌入队列索引),减少 I/O
    • 大消息(>= 4KB):写入共享 msg_store,避免重复存储

A4.2 仲裁队列消息发布流程(Raft 日志复制)

流程描述:消息通过 Ra 共识引擎异步复制到多个节点,达到 Quorum 后提交。

详细步骤

  1. Channel 调用(rabbit_quorum_queue:deliver/3):

    %% Quorum-queue delivery (simplified): pipeline an enqueue command to the
    %% Ra server of every target queue without waiting for a reply.
    %% Always returns ok (the result of lists:foreach/2).
    deliver(Qs, Msg, Options) ->
        Correlation = maps:get(correlation, Options),
        Enqueue = fun(Q) ->
                          RaServerId = amqqueue:get_pid(Q),  % e.g. {quorum_queue_1, node1@host}
                          ra:pipeline_command(RaServerId,
                                              rabbit_fifo:make_enqueue(undefined, Msg),
                                              Correlation)
                  end,
        lists:foreach(Enqueue, Qs).
    
  2. Ra 管道命令(ra:pipeline_command/3):

    • 将命令发送到 Leader 的 mailbox
    • 不等待响应(异步)
  3. 状态机应用(rabbit_fifo:apply/3;注意 Ra 只在日志条目被多数派提交(见第 6 步)之后才调用 apply):

    %% rabbit_fifo (ra_machine) callback for an enqueue command. Ra calls
    %% apply/3 only after the log entry has been committed by a quorum; the
    %% entry's Raft index comes in the metadata map (first argument).
    %% Returns {State, Reply, Effects}. A 2-tuple {State, Effects} would be
    %% read by Ra as {State, Reply}, silently dropping the effects.
    apply(#{index := RaftIndex}, {enqueue, Pid, Seq, Msg}, State) ->
        %% Step 1: append the message to the messages queue
        Messages = rabbit_fifo_q:in(no, {Seq, Msg}, State#rabbit_fifo.messages),
        State1 = State#rabbit_fifo{messages = Messages,
                                   messages_total = State#rabbit_fifo.messages_total + 1},
    
        %% Step 2: record the enqueuer (publisher)
        Enqueuers = maps:update_with(
            Pid,
            fun(E) -> E#enqueuer{msg_seqs = [Seq | E#enqueuer.msg_seqs]} end,
            #enqueuer{msg_seqs = [Seq]},
            State1#rabbit_fifo.enqueuers
        ),
        State2 = State1#rabbit_fifo{enqueuers = Enqueuers},
    
        %% Step 3: effects: monitor the publisher and send its publisher confirm
        Effects = [
            {monitor, process, Pid},
            {send_msg, Pid, {confirm, Seq, RaftIndex}}
        ],
        {State2, ok, Effects}.
    
  4. Ra 复制日志

    • Leader 将 {Index, Term, Cmd} 写入本地 WAL
    • 异步发送 AppendEntries RPC 到 Follower
  5. Follower 持久化

    • 接收 AppendEntries,写入本地 WAL
    • 返回 ack 给 Leader
  6. Leader 提交

    • 收到 Quorum(例如 3 节点中的 2 个)确认
    • 更新 commit_index,应用状态机
    • 发送 Publisher Confirm 给 Channel

时序图

sequenceDiagram
    autonumber
    participant Ch as Channel
    participant Leader as Ra Leader
    participant Fifo as rabbit_fifo
    participant F1 as Follower1
    participant F2 as Follower2
    
    Ch->>Leader: pipeline_command(enqueue, Msg)
    Leader->>Leader: 写入本地 WAL<br/>{Index=100, Term=5, Cmd}
    
    par 异步复制
        Leader->>F1: AppendEntries(Index=100, Cmd)
        Leader->>F2: AppendEntries(Index=100, Cmd)
    end
    
    F1->>F1: 写入 WAL
    F1-->>Leader: ack(Index=100)
    F2->>F2: 写入 WAL
    F2-->>Leader: ack(Index=100)
    
    Note over Leader: 达到 Quorum(2/3)
    Leader->>Leader: commit_index = 100
    Leader->>Fifo: apply(Meta, enqueue, State)
    Fifo->>Fifo: messages = fifo_q:in(Msg)
    Fifo-->>Leader: {State1, ok, Effects}
    Leader->>Ch: {confirm, Seq, RaftIndex}
    Ch-->>Ch: 标记消息已确认

性能优化

  1. 批量管道:多个命令可连续发送而不等待响应(pipeline)
  2. 本地快速路径:pipeline_command 发出命令后立即返回,调用方不等待 WAL 写入或 Follower 复制;但 Publisher Confirm 要等命令达到 Quorum 提交并被状态机应用后才会发出
  3. 异步复制:AppendEntries 是异步 RPC,不阻塞客户端

A4.3 消息消费流程(经典队列)

流程描述:从队列取出消息投递给消费者,处理 QoS prefetch 限制和手动确认。

详细步骤

  1. 消费者注册后触发投递(rabbit_amqqueue_process:deliver_msgs_to_consumers/1):

    %% Calls deliver_to_consumer/2 once for every registered consumer,
    %% threading the queue state through each call. A queue without
    %% consumers is returned untouched.
    deliver_msgs_to_consumers(State) ->
        case has_consumers(State) of
            true ->
                Consumers = get_all_consumers(State),
                lists:foldl(fun deliver_to_consumer/2, State, Consumers);
            false ->
                State
        end.
    
  2. 向单个消费者投递(deliver_to_consumer/2):

    %% Delivers at most one message from the backing queue to consumer C.
    %% A consumer with no credit left is skipped (State unchanged) instead of
    %% crashing with function_clause. Whether an ack is required comes from
    %% the consumer's own ack_required flag, which is false for no_ack consumers.
    deliver_to_consumer(#consumer{credit = Credit}, State) when Credit =< 0 ->
        State;
    deliver_to_consumer(#consumer{channel_pid = ChPid, tag = ConsumerTag,
                                  ack_required = AckRequired, credit = Credit} = C,
                        State) ->
        %% Step 1: take the next message from variable_queue
        case rabbit_variable_queue:fetch(AckRequired, State#state.backing_queue_state) of
            {empty, BQS} ->
                State#state{backing_queue_state = BQS};
            {{Msg, IsDelivered, AckTag}, BQS} ->
                %% NOTE(review): assumes the process state keeps the amqqueue record in #state.q — confirm
                QName = amqqueue:get_name(State#state.q),
                %% Step 2: hand the message to the channel process
                ChPid ! {deliver, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Msg}},
    
                %% Step 3: spend one credit; track the message as unacked only if an ack will come
                Unacked = case AckRequired of
                              true -> [{AckTag, Msg} | C#consumer.unacked];
                              false -> C#consumer.unacked
                          end,
                C1 = C#consumer{credit = Credit - 1, unacked = Unacked},
                State1 = State#state{backing_queue_state = BQS},
                update_consumer(ConsumerTag, C1, State1)
        end.
    
  3. variable_queue 取出消息(rabbit_variable_queue:fetch/2):

    %% Takes the next ready message from the classic queue.
    %% Returns {{Msg, IsDelivered, AckTag}, NewState} or {empty, NewState}.
    %% AckTag is the SeqId when AckRequired is true and undefined otherwise.
    %% IsDelivered is the message's own redelivery flag: requeued messages
    %% carry true, and being paged to disk does not make a message "delivered".
    fetch(AckRequired, State) ->
        case lqueue:out(State#vqstate.q3) of
            {{value, #msg_status{seq_id = SeqId, msg = Msg, is_delivered = IsDelivered}}, Q3_1} ->
                %% Take it from q3 (in-memory queue): it leaves both the ready
                %% length and the in-RAM count that publish incremented
                State1 = State#vqstate{q3 = Q3_1,
                                       len = State#vqstate.len - 1,
                                       ram_msg_count = State#vqstate.ram_msg_count - 1},
                case AckRequired of
                    true ->
                        %% Track it as pending ack until the consumer acks it
                        RamPA = maps:put(SeqId, Msg, State1#vqstate.ram_pending_ack),
                        {{Msg, IsDelivered, SeqId}, State1#vqstate{ram_pending_ack = RamPA}};
                    false ->
                        %% No ack expected: the message is done
                        {{Msg, IsDelivered, undefined}, State1}
                end;
            {empty, _} ->
                %% q3 is empty: page a batch in from delta (disk) and retry, so
                %% disk-sourced messages get the same bookkeeping as q3 ones.
                %% NOTE(review): assumes fetch_from_delta/1 moves messages from
                %% delta into q3 (leaving q3 non-empty) and returns {ok, State}
                %% or {empty, State} — confirm.
                case fetch_from_delta(State) of
                    {empty, State1} -> {empty, State1};
                    {ok, State1} -> fetch(AckRequired, State1)
                end
        end.
    
  4. Channel 接收并投递给客户端(rabbit_channel:handle_info({deliver, ...})):

    %% Channel side of a queue delivery: emits basic.deliver to the client and,
    %% for ack-required deliveries, remembers which queue/AckTag to ack later.
    %% Delivery tags are channel-scoped and must be unique per channel, so they
    %% come from the channel's own counter rather than the queue's AckTag: two
    %% queues can hand out the same AckTag, and gb_trees:insert/3 crashes on a
    %% duplicate key. As a gen_server callback it returns {noreply, State}.
    handle_info({deliver, ConsumerTag, AckRequired, Msg}, State) ->
        {_QName, QPid, AckTag, Redelivered, McState} = Msg,
        %% NOTE(review): assumes #ch has a next_tag counter (as in rabbit_channel) — confirm
        DeliveryTag = State#ch.next_tag,
        %% Step 1: build the AMQP basic.deliver frame
        Method = #'basic.deliver'{
            consumer_tag = ConsumerTag,
            delivery_tag = DeliveryTag,
            redelivered = Redelivered,
            exchange = mc:exchange(McState),
            routing_key = mc:routing_key(McState)
        },
    
        %% Step 2: send the frame to the client via the writer process
        Writer = State#ch.writer,
        Writer ! {send_command, Method, mc:protocol_state(McState)},
    
        %% Step 3: advance the tag counter and remember unacked deliveries
        State1 = State#ch{next_tag = DeliveryTag + 1},
        case AckRequired of
            true ->
                Unacked = gb_trees:insert(DeliveryTag, {QPid, AckTag}, State1#ch.unacked_message_q),
                {noreply, State1#ch{unacked_message_q = Unacked}};
            false ->
                {noreply, State1}
        end.
    

时序图

sequenceDiagram
    autonumber
    participant Client
    participant Ch as Channel
    participant QProc as Queue Process
    participant VQ as variable_queue
    
    Client->>Ch: basic.consume(queue, tag)
    Ch->>QProc: gen_server2:call({basic_consume})
    QProc->>QProc: add_consumer(Tag, Consumer)
    
    loop 持续投递
        QProc->>VQ: fetch(AckRequired=true)
        VQ->>VQ: lqueue:out(q3)
        alt q3 有消息
            VQ-->>QProc: {{Msg, IsDelivered, AckTag}}
        else q3 为空
            VQ->>VQ: fetch_from_delta()(从磁盘加载到 q3 后再取出)
            VQ-->>QProc: {{Msg, IsDelivered, AckTag}}
        end
        Note over VQ: IsDelivered 取自消息状态(被重新入队的消息为 true),与是否来自磁盘无关
        QProc->>Ch: ChPid ! {deliver, Tag, Msg}
        Ch->>Client: basic.deliver(Tag, Msg)
        Ch->>Ch: 记录 unacked_message_q[DeliveryTag]
        
        alt 客户端发送 ack
            Client->>Ch: basic.ack(DeliveryTag)
            Ch->>QProc: gen_server2:cast({ack, [AckTag]})
            QProc->>VQ: ack([AckTag])
            VQ->>VQ: 从 ram_pending_ack 删除
            VQ-->>QProc: ok
        end
    end

QoS Prefetch 控制

  • 消费者初始信用:prefetch_count(例如 10)
  • 每投递一条消息:credit = credit - 1
  • 客户端 ack 后:credit = credit + 1
  • credit = 0 时:停止投递,等待 ack

A5. 配置与可观测

A5.1 关键配置项

配置项 默认值 说明
queue_index_embed_msgs_below 4096 字节 小于此值的消息写入队列专属存储(v2)/嵌入队列索引(v1),否则写入共享 msg_store
vm_memory_high_watermark 0.4 内存告警阈值(40% 物理内存)
vm_memory_high_watermark_paging_ratio 0.5 触发消息换页的内存比例(50% watermark)
queue_master_locator client-local 经典队列 Leader 选择策略
raft_log_max_entries 32768 仲裁队列 WAL 最大条目数
stream_queue_segment_size_bytes 500MB 流队列单个段大小

A5.2 监控指标

经典队列

  • messages_ready:待投递消息数
  • messages_unacknowledged:未确认消息数
  • message_bytes:队列总字节数
  • message_bytes_ram:内存中消息字节数
  • message_bytes_persistent:持久化消息字节数
  • disk_reads:磁盘读次数
  • disk_writes:磁盘写次数

仲裁队列

  • raft_term:当前 Raft Term
  • commit_index:已提交日志索引
  • wal_entries:WAL 中未截断的条目数
  • members:节点成员列表
  • leader:当前 Leader 节点

流队列

  • committed_offset:已提交偏移量
  • first_offset:最早可读偏移量
  • segments:段文件数量
  • reader_bytes_sent:读取器发送字节数