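RabbitMQ-04-Queue Module
A1. Module Overview
A1.1 Responsibilities and Boundaries
The queue module is the core storage layer of the RabbitMQ messaging system. It persists, queues and delivers messages. RabbitMQ offers three queue types, each aimed at different workloads and reliability requirements: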
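| Queue type | Module | Typical use | Key properties |
|---|---|---|---|
| Classic queue | rabbit_classic_queue | General purpose, low latency | Single-node storage, optional persistence, flexible consumption |
| Quorum queue | rabbit_quorum_queue | High availability, strong consistency | Raft replicas, log replication, automatic failover |
| Stream | rabbit_stream_queue | High throughput, replayable | Append-only log, non-destructive consumption, persistent offsets |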
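Core responsibilities of the queue module:
- Message storage and retrieval: mixed in-memory/on-disk storage, with the persistence strategy chosen per queue type
- Consumer management: tracking active consumers, dispatching messages and maintaining unacknowledged messages (unacked messages)
- Flow control and back-pressure: adjusting the rate at which messages are accepted according to memory/disk pressure and consumption rate
- Priorities and TTL: ordering messages by priority and removing expired messages automatically
- Dead-lettering and alternate exchanges: routing undeliverable messages to a DLX (Dead Letter Exchange)
- Consistency guarantees:
  - Classic queues: single-node atomicity (no cross-node replication)
  - Quorum queues: multi-replica strong consistency based on Ra (Raft)
  - Streams: Osiris log writes confirmed by a quorum of replicas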
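A minimal sketch of the TTL and dead-letter decisions listed above, assuming queue arguments arrive as an AMQP table of {Key, Type, Value} tuples. The module dlx_sketch and its functions are hypothetical; only the argument names x-message-ttl and x-dead-letter-exchange come from RabbitMQ, and the broker's real expiry handling is more involved.

```erlang
-module(dlx_sketch).
-export([expiry/2, check_expiry/2, dead_letter_target/2]).

%% Hypothetical helper: absolute expiry time (ms) for a message published at
%% PublishedAtMs, taken from the queue's x-message-ttl argument.
%% Returns undefined when the queue has no TTL.
expiry(PublishedAtMs, Args) ->
    case lists:keyfind(<<"x-message-ttl">>, 1, Args) of
        {_, _Type, TtlMs} -> PublishedAtMs + TtlMs;
        false -> undefined
    end.

%% Keep the message, or mark it for dead-lettering with reason 'expired'.
check_expiry(undefined, _NowMs) -> keep;
check_expiry(ExpiryMs, NowMs) when NowMs >= ExpiryMs -> {dead_letter, expired};
check_expiry(_ExpiryMs, _NowMs) -> keep.

%% Route a dead-lettered message to the queue's DLX, or drop it when no
%% x-dead-letter-exchange argument is configured.
dead_letter_target(Reason, Args) ->
    case lists:keyfind(<<"x-dead-letter-exchange">>, 1, Args) of
        {_, _Type, Exchange} -> {route, Exchange, Reason};
        false -> drop
    end.
```

Other dead-letter reasons, such as rejected or maxlen, would go through the same dead_letter_target/2 decision.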
A1.2 Queue Type Comparison
```mermaid
flowchart TB
    Client[Client Channel]
    QT[rabbit_queue_type<br/>type dispatcher]
    subgraph Classic[Classic Queue]
        CQ[rabbit_classic_queue]
        VQ[rabbit_variable_queue<br/>variable queue]
        QI[queue_index<br/>index]
        MS[msg_store<br/>message store]
    end
    subgraph Quorum[Quorum Queue]
        QQ[rabbit_quorum_queue]
        RF[rabbit_fifo<br/>Ra state machine]
        RA[Ra consensus engine]
        WAL[Write Ahead Log]
    end
    subgraph Stream[Stream]
        SQ[rabbit_stream_queue]
        OSI[Osiris log system]
        SEG[Segment storage]
    end
    Client --> QT
    QT --> CQ
    QT --> QQ
    QT --> SQ
    CQ --> VQ
    VQ --> QI
    VQ --> MS
    QQ --> RF
    RF --> RA
    RA --> WAL
    SQ --> OSI
    OSI --> SEG
    style Classic fill:#e3f2fd
    style Quorum fill:#fff3e0
    style Stream fill:#f3e5f5
```
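Diagram notes:
- Type dispatch: rabbit_queue_type selects the implementation from the x-queue-type argument given when the queue is declared
- Classic queue path: messages enter variable_queue and are held either in the in-memory queue (q3) or the on-disk queue (delta); the index and the message bodies are stored separately
- Quorum queue path: messages go through the rabbit_fifo state machine, are written to the Ra WAL and are replicated asynchronously to the other nodes
- Stream path: messages are appended directly to Osiris's immutable log segments, and consumers read them by offset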
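The type dispatch step can be sketched as a lookup from the x-queue-type argument to an implementation module. qtype_dispatch_sketch is a hypothetical helper that assumes the AMQP table shape [{Key, Type, Value}]; the real rabbit_queue_type performs more validation than a single lookup.

```erlang
-module(qtype_dispatch_sketch).
-export([queue_type_module/1]).

%% Hypothetical: pick the implementation module from the x-queue-type
%% declare argument. No argument at all means a classic queue.
queue_type_module(Args) ->
    case lists:keyfind(<<"x-queue-type">>, 1, Args) of
        false -> {ok, rabbit_classic_queue};
        {_, _Type, <<"classic">>} -> {ok, rabbit_classic_queue};
        {_, _Type, <<"quorum">>} -> {ok, rabbit_quorum_queue};
        {_, _Type, <<"stream">>} -> {ok, rabbit_stream_queue};
        {_, _Type, Other} -> {error, {unknown_queue_type, Other}}
    end.
```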
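A1.3 Upstream and Downstream Dependencies
Upstream callers:
- rabbit_channel: message publishing (basic.publish), consumer subscription (basic.consume), acknowledgement/rejection (basic.ack/nack/reject)
- rabbit_amqqueue: queue management (declare, delete, purge) and metadata queries
- rabbit_exchange: routing; after bindings are matched, the message is handed to the target queues
Downstream dependencies:
- Storage:
  - rabbit_msg_store (message bodies of persistent classic queue messages)
  - rabbit_classic_queue_index_v2 (classic queue message index)
  - ra (quorum queue consensus engine)
  - osiris (stream log system)
- Monitoring/statistics:
  - rabbit_core_metrics: queue throughput (QPS), message backlog, memory usage
  - rabbit_event: queue state-change events (creation, deletion, consumer changes)
- Rate limiting/alarms:
  - rabbit_alarm: memory/disk alarms that trigger flow control
  - rabbit_limiter: QoS prefetch limits on the consumption rate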
A2. External API List and Specifications
A2.1 Queue Type Behaviour Interface (rabbit_queue_type behaviour)
Callbacks that every queue type must implement, defined in rabbit_queue_type.erl.
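(1) declare/2 - Declare a queue
Basic information:
- Signature: rabbit_queue_type:declare(amqqueue:amqqueue(), node()) -> Result
- Idempotent: yes (redeclaring with identical settings returns existing; different settings return an error)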
Request structure:
```erlang
%% amqqueue record (defined in amqqueue_v2.hrl)
-record(amqqueue, {
    name :: rabbit_amqqueue:name(),               % {resource, VHost, queue, Name}
    durable :: boolean(),                         % persisted to disk
    auto_delete :: boolean(),                     % deleted automatically once it has no consumers
    exclusive_owner :: none | pid(),              % exclusive connection (only that connection may access it)
    arguments :: rabbit_framing:amqp_table(),     % extension arguments (e.g. x-max-length)
    pid :: pid() | ra_server_id(),                % queue process PID (classic) or Ra server ID (quorum/stream)
    type :: atom(),                               % rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue
    vhost :: binary(),                            % virtual host
    state :: atom(),                              % live | stopped
    policy :: undefined | binary(),               % name of the applied policy
    operator_policy :: undefined | binary()       % operator policy
}).
```
Response type:
```erlang
-type declare_result() ::
        {new, amqqueue:amqqueue()} |          % newly created
        {existing, amqqueue:amqqueue()} |     % already exists with the same settings
        {owner_died, amqqueue:amqqueue()} |   % the owner process of the exclusive queue has died
        {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
```
Core code (rabbit_amqqueue:declare/7):
```erlang
declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, Node) ->
    %% 1) normalize arguments: merge policy arguments, check constraints such as max length
    NormalizedArgs = augment_declare_args(QueueName, Durable, AutoDelete, Args, ActingUser),
    %% 2) determine the queue type (classic by default, overridable via x-queue-type)
    Type = get_queue_type(NormalizedArgs),
    %% 3) check type compatibility (e.g. a quorum queue must be durable=true,
    %%    auto_delete=false and not exclusive)
    case rabbit_queue_type:is_compatible(Type, Durable, AutoDelete, Owner =/= none) of
        false ->
            {error, incompatible_queue_arguments};
        true ->
            %% 4) build the amqqueue record
            Q0 = amqqueue:new(QueueName, Owner, Durable, AutoDelete, NormalizedArgs),
            Q = amqqueue:set_type(Q0, Type),
            %% 5) delegate to the queue type module (transaction: check for conflicts, write to Mnesia)
            rabbit_queue_type:declare(Q, Node)
    end.
```
Upstream caller (who calls declare): rabbit_channel:handle_method(#'queue.declare'{}, _, State):
```erlang
handle_method(#'queue.declare'{queue = QueueBin, durable = Durable, ...}, _, State) ->
    QueueName = rabbit_misc:r(VHost, queue, QueueBin),
    case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner, User) of
        {new, Q} ->
            %% declared successfully: record the queue in the channel state
            State1 = monitor_queue(Q, State),
            reply(#'queue.declare_ok'{queue = QueueBin, ...}, State1);
        {existing, _Q} ->
            %% idempotent redeclaration
            reply(#'queue.declare_ok'{...}, State);
        {protocol_error, Type, Reason, ErrArgs} ->
            %% ErrArgs must be a fresh variable: Args is already bound to the declare arguments
            rabbit_misc:protocol_error(Type, Reason, ErrArgs)
    end.
```
Sequence diagram:
```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Channel as rabbit_channel
    participant AM as rabbit_amqqueue
    participant QT as rabbit_queue_type
    participant CQ as rabbit_classic_queue<br/>(or Quorum/Stream)
    participant DB as rabbit_db_queue
    Client->>Channel: queue.declare(durable, args)
    Channel->>AM: declare(QueueName, Durable, Args)
    AM->>QT: get_queue_type(Args)
    QT-->>AM: rabbit_classic_queue
    AM->>QT: is_compatible(Type, Durable, AutoDelete)
    QT-->>AM: true
    AM->>CQ: declare(Q, Node)
    CQ->>DB: insert_or_update(Q)
    DB-->>CQ: {ok, NewQ}
    CQ-->>AM: {new, NewQ}
    AM-->>Channel: {new, NewQ}
    Channel->>Channel: monitor_queue(Q)
    Channel-->>Client: queue.declare_ok
```
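Exceptions and edge cases:
| Scenario | Behavior |
|---|---|
| Redeclaration (same settings) | Returns {existing, Q}; idempotent |
| Redeclaration (conflicting settings) | {protocol_error, precondition_failed, "cannot redeclare..."} |
| Node unavailable | Blocks or fails on timeout (a quorum queue needs a majority of its members online) |
| Queue count limit exceeded | {protocol_error, precondition_failed, "max queues reached"} |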
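The idempotency rules in the table can be modelled with a plain map as the queue registry. declare_sketch is hypothetical: the registry map, the settings map and the MaxQueues check stand in for the metadata store and the vhost queue limit, and the error strings only mirror the table above.

```erlang
-module(declare_sketch).
-export([declare/4]).

%% Hypothetical model. Registry maps a queue name to its settings, e.g.
%% #{durable => true, auto_delete => false, args => []}.
%% Returns {Result, NewRegistry}.
declare(Name, Settings, Registry, MaxQueues) ->
    case maps:find(Name, Registry) of
        {ok, Settings} ->
            %% Settings is already bound, so this matches identical settings only:
            %% idempotent redeclaration, nothing changes
            {{existing, Name}, Registry};
        {ok, _Other} ->
            {{protocol_error, precondition_failed,
              "cannot redeclare queue with different settings", [Name]}, Registry};
        error when map_size(Registry) >= MaxQueues ->
            {{protocol_error, precondition_failed, "max queues reached", []}, Registry};
        error ->
            {{new, Name}, Registry#{Name => Settings}}
    end.
```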
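(2) deliver/3 - Deliver a message
Basic information:
- Signature: rabbit_queue_type:deliver([amqqueue:amqqueue()], mc:state(), Options) -> Result
- Idempotent: no (every call enqueues another copy of the message on each target queue)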
Request structure:
```erlang
-type deliver_options() :: #{
    correlation := term(),    % correlation ID used for Publisher Confirms
    mandatory := boolean(),   % if no queue can take the message, return it with basic.return
    delivery_mode := 1 | 2    % 1 = transient, 2 = persistent
}.
```
Response type:
```erlang
-type deliver_result() ::
        ok |                                    % delivered to all queues
        {delivered, [amqqueue:amqqueue()]} |    % delivered to some queues (the others crashed/stopped)
        {unroutable, mc:state()}.               % no queue accepted it (basic.return when mandatory=true)
```
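Core code (rabbit_classic_queue:deliver/3):
```erlang
deliver(Qs, Msg, Options) ->
    %% classic queue: asynchronous send to each queue process's mailbox
    Correlation = maps:get(correlation, Options),
    Delivered = lists:foldl(
                  fun(Q, Acc) ->
                          QPid = amqqueue:get_pid(Q),
                          %% is_process_alive/1 only accepts local pids (badarg otherwise),
                          %% so liveness is only checked for queues on this node
                          case node(QPid) =/= node() orelse is_process_alive(QPid) of
                              true ->
                                  %% `!` does not wait for the queue process
                                  QPid ! {deliver, Msg, Correlation},
                                  [Q | Acc];
                              false ->
                                  %% local queue process is down: skip it
                                  Acc
                          end
                  end, [], Qs),
    {delivered, lists:reverse(Delivered)}.
```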
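Quorum queue deliver implementation (rabbit_quorum_queue:deliver/3):
```erlang
deliver(Qs, Msg, Options) ->
    %% quorum queue: submit the command asynchronously through the Ra client
    Correlation = maps:get(correlation, Options),
    Delivered = lists:foldl(
                  fun(Q, Acc) ->
                          RaName = amqqueue:get_pid(Q),   % Ra server ID {Name, Node}
                          Cmd = rabbit_fifo:make_enqueue(undefined, Msg),
                          %% pipeline_command does not wait for the command to be committed
                          case ra:pipeline_command(RaName, Cmd, Correlation) of
                              ok -> [Q | Acc];
                              {error, _} -> Acc
                          end
                  end, [], Qs),
    {delivered, lists:reverse(Delivered)}.
```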
Sequence diagram (classic queue):
```mermaid
sequenceDiagram
    autonumber
    participant Channel as rabbit_channel
    participant QT as rabbit_queue_type
    participant CQ as Classic Queue Process
    participant VQ as rabbit_variable_queue
    participant Index as queue_index
    Channel->>QT: deliver([Q1, Q2], Msg, Options)
    QT->>CQ: QPid ! {deliver, Msg, Correlation}
    CQ->>VQ: publish(Msg, MsgProps, Correlation)
    VQ->>Index: publish(MsgId, SeqId, MsgProps)
    Index-->>VQ: ok
    VQ->>VQ: Put the message in q3 (in-memory queue) or delta (on-disk queue)
    VQ-->>CQ: {ok, State1}
    CQ->>CQ: Check for consumers waiting for messages
    CQ->>CQ: Deliver immediately or keep it queued until a consumer is available
    CQ-->>QT: ok
    QT-->>Channel: {delivered, [Q1, Q2]}
```
Sequence diagram (quorum queue):
```mermaid
sequenceDiagram
    autonumber
    participant Channel
    participant QQ as rabbit_quorum_queue
    participant Ra as Ra Consensus
    participant Fifo as rabbit_fifo<br/>(State Machine)
    participant Follower1
    participant Follower2
    Channel->>QQ: deliver([Q], Msg, Opts)
    QQ->>Ra: pipeline_command(enqueue, Msg, Corr)
    QQ-->>Channel: {delivered, [Q]}
    Ra->>Ra: Append the entry to the leader log and assign its RaftIndex
    Ra->>Follower1: AppendEntries RPC (asynchronous replication)
    Ra->>Follower2: AppendEntries RPC
    Follower1-->>Ra: ack
    Follower2-->>Ra: ack
    Note over Ra: Commit once a quorum (2/3, leader included) holds the entry
    Ra->>Fifo: apply(enqueue, State)
    Fifo->>Fifo: Add the message to the messages queue
    Fifo-->>Ra: {State1, Reply, Effects}
    Ra-->>QQ: {ok, Correlation}
    QQ-->>Channel: Publisher Confirm
```
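Exceptions and performance notes:
| Scenario | Behavior |
|---|---|
| Queue process crash | Classic queues skip that queue; quorum queues fail over to a newly elected leader automatically |
| Memory alarm | Publishing connections are blocked (flow control) and the channel stops accepting publishes until the alarm clears |
| Disk full | Quorum queues cannot write to the WAL and streams cannot append segments, so writes fail with an error; classic queues try to free space or reject |
| Slow consumers | Messages pile up; classic queues may page messages out to disk |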
(3) consume/3 - Register a consumer
Basic information:
- Signature: rabbit_queue_type:consume(amqqueue:amqqueue(), Consume, State) -> {ok, NewState}
- Idempotent: no (each call creates a new consumer subscription)
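Request structure:
```erlang
-record(basic_consume, {
    queue :: binary(),                         % queue name
    consumer_tag :: binary(),                  % consumer tag (client-supplied or generated)
    no_ack :: boolean(),                       % true = automatic ack, false = manual ack
    exclusive :: boolean(),                    % true = exclusive consumer (single-consumer mode)
    arguments :: rabbit_framing:amqp_table()   % extension arguments (e.g. x-priority)
}).
```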
Core code (consumer registration in rabbit_classic_queue):
```erlang
consume(Q, #basic_consume{consumer_tag = CTag, no_ack = NoAck,
                          exclusive = Exclusive, arguments = Args}, QState) ->
    QPid = amqqueue:get_pid(Q),
    %% 1) send the consume request to the queue process
    case gen_server2:call(QPid, {basic_consume, NoAck, self(), CTag, Exclusive, Args}, infinity) of
        ok ->
            %% 2) record the consumer in the channel's state
            Consumer = #consumer{tag = CTag, ack_required = not NoAck},
            NewState = QState#{consumers => maps:put(CTag, Consumer, maps:get(consumers, QState))},
            {ok, NewState};
        {error, exclusive_consume_unavailable} ->
            {error, <<"queue is exclusively consumed">>}
    end.
```
Consume handling in the queue process (rabbit_classic_queue_v2:handle_call/3):
```erlang
handle_call({basic_consume, NoAck, ChPid, CTag, Exclusive, Args}, _From, State) ->
    %% 1) check for an exclusivity conflict
    case Exclusive andalso has_consumers(State) of
        true ->
            {reply, {error, exclusive_consume_unavailable}, State};
        false ->
            %% 2) build the consumer record
            Consumer = #consumer{
                          channel_pid = ChPid,
                          tag = CTag,
                          ack_required = not NoAck,
                          prefetch_count = get_prefetch_count(Args)
                         },
            %% 3) add it to the consumer list
            State1 = add_consumer(CTag, Consumer, State),
            %% 4) try to deliver messages right away
            State2 = deliver_msgs_to_consumers(State1),
            {reply, ok, State2}
    end.
```
Sequence diagram:
```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Channel as rabbit_channel
    participant QProc as Queue Process
    participant VQ as variable_queue
    Client->>Channel: basic.consume(queue, tag, no_ack)
    Channel->>QProc: gen_server2:call({basic_consume, ...})
    QProc->>QProc: Check for exclusivity conflicts
    QProc->>QProc: add_consumer(Tag, Consumer)
    QProc->>VQ: fetch(AckRequired)
    VQ-->>QProc: {Msg, State1} | empty
    alt A message is available
        QProc->>Channel: deliver_to_consumer(Msg)
        Channel-->>Client: basic.deliver(Msg)
    end
    QProc-->>Channel: ok
    Channel-->>Client: basic.consume_ok(tag)
```
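Error handling:
| Scenario | Behavior |
|---|---|
| Queue does not exist | {error, not_found} |
| Exclusivity conflict | {error, exclusive_consume_unavailable} |
| Consumer tag conflict | {error, consumer_tag_in_use} |
| Channel process crash | The queue automatically removes all consumers belonging to that channel |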
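A sketch of the consumer-side checks in the table, assuming consumers are kept in a map keyed by consumer tag. consumer_sketch is hypothetical; unlike the handle_call/3 excerpt above, it also rejects a new consumer when an exclusive one is already attached, and it shows the cleanup performed when a channel dies.

```erlang
-module(consumer_sketch).
-export([new/0, add_consumer/4, remove_channel/2]).

%% Hypothetical state:
%% #{consumers := #{CTag => #{exclusive := boolean(), ch_pid := pid()}}}
new() -> #{consumers => #{}}.

add_consumer(CTag, Exclusive, ChPid, #{consumers := Cs} = State) ->
    %% is an existing consumer already exclusive?
    HasExclusive = lists:any(fun(#{exclusive := E}) -> E end, maps:values(Cs)),
    if
        is_map_key(CTag, Cs) ->
            {error, consumer_tag_in_use};
        HasExclusive ->
            {error, exclusive_consume_unavailable};
        Exclusive andalso map_size(Cs) > 0 ->
            {error, exclusive_consume_unavailable};
        true ->
            C = #{exclusive => Exclusive, ch_pid => ChPid},
            {ok, State#{consumers := Cs#{CTag => C}}}
    end.

%% Channel crash: drop every consumer that belongs to ChPid.
remove_channel(ChPid, #{consumers := Cs} = State) ->
    State#{consumers := maps:filter(fun(_Tag, #{ch_pid := P}) -> P =/= ChPid end, Cs)}.
```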
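A2.2 Classic-Queue-Specific APIs
(1) rabbit_variable_queue:publish/6 - Publish a message to the backing queue
Purpose: writes the message into the classic queue's mixed in-memory/on-disk storage, choosing where to keep it based on message size and queue state.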
Entry function:
```erlang
publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
    %% 1) check for duplicates (deduplication by message ID)
    case is_duplicate(Msg, State) of
        {true, State1} ->
            {ok, State1};   % idempotent: already published, skip
        {false, State1} ->
            %% 2) allocate a sequence number (SeqId)
            SeqId = State1#vqstate.next_seq_id,
            State2 = State1#vqstate{next_seq_id = SeqId + 1},
            %% 3) track the message for Publisher Confirms if the publisher needs a confirm
            State3 = case needs_confirm(MsgProps, State2) of
                         true ->
                             track_confirm(Msg, SeqId, ChPid, State2);
                         false ->
                             State2
                     end,
            %% 4) write the index entry (persist the SeqId -> MsgId mapping)
            State4 = publish_to_index(Msg, SeqId, MsgProps, State3),
            %% 5) decide where to keep the message based on memory pressure
            State5 = case should_keep_in_ram(State4) of
                         true ->
                             %% keep it in q3 (in-memory queue)
                             enqueue_message(q3, Msg, SeqId, State4);
                         false ->
                             %% write it straight to delta (on-disk queue)
                             enqueue_message(delta, Msg, SeqId, State4)
                     end,
            {ok, State5}
    end.
```
Key decision, the logic of should_keep_in_ram:
```erlang
should_keep_in_ram(#vqstate{ram_msg_count = RamCount, target_ram_count = Target}) ->
    %% the target number of in-memory messages follows the consumption rate (min 1, max 2048)
    RamCount < Target.
```
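To see why the RAM/disk split has to preserve FIFO order, here is a simplified two-tier model. vq_sketch is hypothetical: q3 is an Erlang queue in RAM, and delta is modelled as a second queue standing in for disk (the real delta only tracks a count and a SeqId range). The assumed rule that new messages go to delta whenever delta is non-empty is what keeps delivery order intact.

```erlang
-module(vq_sketch).
-export([new/1, publish/2, fetch/1, len/1, tiers/1]).

%% q3: in-memory head of the queue (ram_count mirrors ram_msg_count).
%% delta: stand-in for the on-disk tier (a plain queue in this sketch).
-record(vq, {q3 = queue:new(), ram_count = 0, delta = queue:new(), target}).

new(TargetRamCount) -> #vq{target = TargetRamCount}.

publish(Msg, #vq{q3 = Q3, ram_count = RC, delta = Delta, target = T} = VQ) ->
    %% only keep in RAM if nothing is waiting on disk, otherwise FIFO order breaks
    case queue:is_empty(Delta) andalso RC < T of
        true -> VQ#vq{q3 = queue:in(Msg, Q3), ram_count = RC + 1};
        false -> VQ#vq{delta = queue:in(Msg, Delta)}
    end.

%% Take from q3 first; once it is empty, read the next message from delta.
fetch(#vq{q3 = Q3, ram_count = RC, delta = Delta} = VQ) ->
    case queue:out(Q3) of
        {{value, Msg}, Q3a} ->
            {Msg, VQ#vq{q3 = Q3a, ram_count = RC - 1}};
        {empty, _} ->
            case queue:out(Delta) of
                {{value, Msg}, Delta1} -> {Msg, VQ#vq{delta = Delta1}};
                {empty, _} -> {empty, VQ}
            end
    end.

len(#vq{ram_count = RC, delta = Delta}) -> RC + queue:len(Delta).

%% {messages in RAM, messages in delta}
tiers(#vq{ram_count = RC, delta = Delta}) -> {RC, queue:len(Delta)}.
```

A real backing queue refills q3 from delta in batches rather than reading one message at a time.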
A3. Key Data Structures and UML
A3.1 Classic Queue Core Data Structures
(1) amqqueue record (queue metadata)
```mermaid
classDiagram
    class amqqueue {
        +name: resource()
        +durable: boolean()
        +auto_delete: boolean()
        +exclusive_owner: none | pid()
        +arguments: amqp_table()
        +pid: pid()
        +type: atom()
        +vhost: binary()
        +state: live | stopped
        +policy: binary()
        +operator_policy: binary()
        +decorators: [atom()]
        +slave_pids: [pid()]
        +sync_slave_pids: [pid()]
        +gm_pids: [pid()]
    }
    note for amqqueue "Queue metadata persisted to Mnesia\nClassic queue: pid is a local process\nQuorum queue: pid is a {RaName, Node} tuple"
```
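| Field | Type | Description |
|---|---|---|
| name | rabbit_types:r(queue) | Resource name: {resource, VHost, queue, NameBin} |
| durable | boolean() | Durable queue: the metadata is written to disk (classic queues persist messages only when they are published as persistent) |
| auto_delete | boolean() | Deleted automatically once it has no consumers |
| exclusive_owner | none \| pid() | PID of the exclusive owning connection (only that connection may access the queue) |
| arguments | [{binary(), type(), value()}] | Extension arguments (e.g. x-max-length, x-message-ttl) |
| pid | pid() \| {atom(), node()} | Classic queue: local process PID; quorum queue: Ra server ID |
| type | atom() | rabbit_classic_queue \| rabbit_quorum_queue \| rabbit_stream_queue |
| vhost | binary() | Virtual host name |
| state | live \| stopped | Queue state (stopped is used for graceful shutdown) |
| policy | undefined \| binary() | Name of the applied policy (dynamic configuration) |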
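(2) vqstate record (classic queue runtime state)
Defined in rabbit_variable_queue.erl:
```erlang
-record(vqstate, {
    %% message queues (simplified: q1/q2/q4 are obsolete)
    delta,                % on-disk queue: {DeltaCount, StartSeqId, EndSeqId, PendingAcks}
    q3,                   % in-memory queue: lqueue (double-ended queue)
    %% sequence numbers
    next_seq_id,          % SeqId of the next message
    next_deliver_seq_id,  % boundary SeqId for first delivery (used to decide redelivered)
    %% unacknowledged messages
    ram_pending_ack,      % unacked messages in memory: #{SeqId => Msg}
    disk_pending_ack,     % unacked messages on disk: #{SeqId => {MsgId, IsDelivered}}
    %% index and storage
    index_state,          % rabbit_classic_queue_index_v2 state
    store_state,          % rabbit_classic_queue_store_v2 state
    msg_store_clients,    % {PersistentClient, TransientClient} connections to the shared msg_store
    %% statistics
    len,                  % queue length (excluding unacked messages)
    bytes,                % queue size in bytes
    persistent_count,     % number of persistent messages
    ram_msg_count,        % number of messages in memory
    target_ram_count,     % target number of in-memory messages (follows the consumption rate: 1..2048)
    %% flow control
    rates,                % enqueue/dequeue rate statistics
    durable               % whether the queue is durable
}).
```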
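Key fields:
| Field | Purpose | Data structure |
|---|---|---|
| q3 | In-memory queue holding the head of the queue (the messages delivered next) | lqueue:lqueue(#msg_status{}) |
| delta | On-disk queue holding the messages behind q3 | {Count, StartSeqId, EndSeqId, PendingAcks} |
| ram_pending_ack | Unacknowledged messages held in memory | #{SeqId => #msg_status{}} |
| disk_pending_ack | Unacknowledged messages paged out to disk | #{SeqId => {MsgId, IsDelivered}} |
| target_ram_count | Adaptive number of in-memory messages: shrinks to 1 for slow consumers, grows to 2048 for fast consumers | 1..2048 |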
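The adaptive target_ram_count can be illustrated as a value clamped to 1..2048. The proportional rule below (keep roughly WindowSec seconds' worth of consumption in RAM) is an assumption made for illustration only; the document does not give the actual formula, and target_ram_sketch is hypothetical.

```erlang
-module(target_ram_sketch).
-export([target_ram_count/2]).

-define(MIN_RAM, 1).
-define(MAX_RAM, 2048).

%% Hypothetical rule: AckRate is the consumption rate in messages/second,
%% WindowSec how many seconds of consumption to keep in memory.
%% The result is always clamped to the documented 1..2048 range.
target_ram_count(AckRate, WindowSec) when AckRate >= 0, WindowSec > 0 ->
    Wanted = round(AckRate * WindowSec),
    max(?MIN_RAM, min(?MAX_RAM, Wanted)).
```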
UML class diagram:
```mermaid
classDiagram
    class vqstate {
        +delta: delta_queue()
        +q3: lqueue()
        +next_seq_id: integer()
        +ram_pending_ack: map()
        +disk_pending_ack: map()
        +index_state: index_state()
        +len: integer()
        +target_ram_count: 1..2048
    }
    class msg_status {
        +seq_id: integer()
        +msg_id: binary()
        +msg: mc:state()
        +is_persistent: boolean()
        +is_delivered: boolean()
        +msg_props: map()
    }
    class delta_queue {
        +count: integer()
        +start_seq_id: integer()
        +end_seq_id: integer()
    }
    vqstate --> msg_status : q3 holds
    vqstate --> delta_queue
    vqstate --> index_state
```
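A3.2 Quorum Queue Core Data Structures
(1) rabbit_fifo record (Ra state machine state)
Defined in rabbit_fifo.hrl:
```erlang
-record(rabbit_fifo, {
    cfg :: #cfg{},                            % configuration: name, resource name, dead-letter exchange
    messages = rabbit_fifo_q:new(),           % messages waiting for delivery (weighted priority queue)
    messages_total = 0,                       % total number of messages
    returns = lqueue:new(),                   % messages returned via nack/reject
    enqueue_count = 0,                        % enqueue counter (used to trigger snapshots)
    %% publisher tracking
    enqueuers = #{},                          % #{Pid => #enqueuer{}}: active publishers
    %% Raft index tracking
    ra_indexes = rabbit_fifo_index:empty(),   % Raft indexes of messages that have been delivered
    %% consumer management
    consumers = #{},                          % #{ConsumerKey => #consumer{}}
    service_queue = priority_queue:new(),     % consumers waiting to be served
    %% dead-lettering
    dlx = rabbit_fifo_dlx:init(),             % dead-letter exchange state
    %% statistics
    msg_bytes_enqueue = 0,                    % bytes enqueued
    msg_bytes_checkout = 0,                   % bytes delivered (checked out)
    %% single active consumer (SAC)
    waiting_consumers = []                    % consumers waiting in SAC mode
}).
```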
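Key fields:
| Field | Purpose | Data structure |
|---|---|---|
| messages | Messages waiting for delivery (high/normal priority) | rabbit_fifo_q:state() |
| enqueuers | Per-publisher message tracking, used for flow control | #{Pid => #enqueuer{msg_seqs :: [RaftIndex]}} |
| ra_indexes | Raft indexes of all unacknowledged messages, used for log truncation | rabbit_fifo_index:state() |
| consumers | Active consumer state (credit, unacknowledged messages) | #{Key => #consumer{credit :: integer()}} |
| service_queue | Consumers waiting for delivery, ordered by priority | priority_queue:q() |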
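The role of ra_indexes in log truncation can be sketched with a gb_sets set of Raft indexes: the smallest index still referenced bounds how much of the log may be released. ra_index_sketch is a hypothetical model, not rabbit_fifo_index, and it ignores snapshot mechanics.

```erlang
-module(ra_index_sketch).
-export([new/0, add/2, settle/2, truncate_point/2]).

%% Hypothetical model of the set of Raft indexes the queue still needs.
new() -> gb_sets:empty().
add(Idx, Set) -> gb_sets:add_element(Idx, Set).
settle(Idx, Set) -> gb_sets:del_element(Idx, Set).

%% Highest log index that may be released. LastApplied is the last index
%% applied to the state machine: with no live messages the whole applied
%% prefix is releasable, otherwise everything below the smallest live index.
truncate_point(Set, LastApplied) ->
    case gb_sets:is_empty(Set) of
        true -> LastApplied;
        false -> gb_sets:smallest(Set) - 1
    end.
```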
(2) consumer record (consumer state)
```erlang
-record(consumer, {
    meta = #{},          % metadata: CTag, ChPid, Prefetch
    credit = 0,          % remaining credit (QoS prefetch)
    checked_out = #{},   % unacknowledged messages: #{RaftIndex => {SeqId, Msg}}
    next_msg_id = 0,     % next message sequence number
    status = up,         % up | suspected_down | cancelled
    lifetime = once      % once (cancelled on disconnect) | auto (recovers automatically)
}).
```
UML class diagram:
```mermaid
classDiagram
    class rabbit_fifo {
        +cfg: config()
        +messages: rabbit_fifo_q()
        +returns: lqueue()
        +enqueuers: map()
        +consumers: map()
        +ra_indexes: fifo_index()
        +dlx: dlx_state()
    }
    class consumer {
        +credit: integer()
        +checked_out: map()
        +next_msg_id: integer()
        +status: up | down
    }
    class enqueuer {
        +msg_seqs: [RaftIndex]
        +pending_confirms: [SeqId]
    }
    rabbit_fifo --> consumer
    rabbit_fifo --> enqueuer
```
A3.3 Stream Core Data Structures
(1) stream_client record (stream state held in the channel)
Defined in rabbit_stream_queue.erl:
```erlang
-record(stream_client, {
    stream_id :: string(),                  % stream name (internal identifier)
    name :: rabbit_amqqueue:name(),         % queue resource name
    leader :: pid(),                        % Osiris leader PID
    local_pid :: undefined | pid(),         % local replica PID (if any)
    next_seq = 1,                           % next publish sequence number
    correlation = #{}                       % #{AppenderSeq => {Correlation, Msg}} (Publisher Confirms)
}).
```
(2) stream record (consumer state)
```erlang
-record(stream, {
    mode :: simple | automatic,       % consumption mode: simple (manual ack) | automatic
    credit :: integer(),              % remaining credit
    ack :: boolean(),                 % whether acks are required
    start_offset = 0,                 % starting offset
    listening_offset = 0,             % offset currently being listened on
    log :: osiris_log:state(),        % Osiris log handle
    chunk_iterator :: chunk_iter(),   % chunk iterator (for batched reads)
    buffer_msgs_rev = []              % read-ahead message buffer (reversed)
}).
```
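Feature comparison:
| Feature | Classic queue | Quorum queue | Stream |
|---|---|---|---|
| Message persistence | Optional (durable + delivery_mode=2) | Always (Raft log) | Always (Osiris segments) |
| Consumption semantics | Destructive read (deleted after consumption) | Destructive read | Non-destructive read (can be consumed again) |
| Replication | Not supported (single node) | Supported (Raft replication) | Supported (Osiris quorum) |
| Offset management | None (SeqId-based, not replayable) | None | Offset-based replay supported |
| Ordering guarantee | FIFO per queue | FIFO per queue | Strict ordering per partition |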
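Non-destructive, offset-based consumption can be sketched with a map from offset to message. stream_sketch is hypothetical and keeps everything in memory; Osiris stores chunks in segment files instead, but the reading rule illustrated here is the same: reading advances the consumer's own offset and leaves the log unchanged.

```erlang
-module(stream_sketch).
-export([new/0, append/2, read/3]).

%% Hypothetical append-only log: each message gets the next offset,
%% and offsets never change.
new() -> #{next => 0, log => #{}}.

%% Returns {AssignedOffset, NewStream}.
append(Msg, #{next := N, log := Log} = S) ->
    {N, S#{next := N + 1, log := Log#{N => Msg}}}.

%% Read up to Max messages starting at Offset. Nothing is removed, so any
%% consumer can re-read from an older offset.
%% Returns {Msgs, NextOffsetToRead}.
read(Offset, Max, #{next := N, log := Log}) when Offset >= 0, Max > 0 ->
    Last = min(N, Offset + Max) - 1,
    case Last >= Offset of
        true -> {[maps:get(O, Log) || O <- lists:seq(Offset, Last)], Last + 1};
        false -> {[], Offset}   % caught up: nothing new yet
    end.
```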
A4. Core Algorithms and Flows
A4.1 Classic Queue Message Publishing Flow
Flow: the complete path from the channel receiving a message to the message being persisted and enqueued.
Detailed steps:
1. Channel routing: rabbit_channel:handle_method(#'basic.publish'{})
   - Parses the routing key and the mandatory flag
   - Calls rabbit_exchange:route/3 to obtain the list of target queues
2. Queue delivery: rabbit_classic_queue:deliver/3
   - Sends {deliver, Msg, Correlation} to each queue process
3. Queue process receives the message: rabbit_classic_queue_v2:handle_info({deliver, Msg, _})
   - Delegates to rabbit_variable_queue:publish/6
4. VQ publish logic (rabbit_variable_queue:publish/6):
```erlang
publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
    %% step 1: allocate a SeqId
    SeqId = State#vqstate.next_seq_id,
    State1 = State#vqstate{next_seq_id = SeqId + 1},
    %% step 2: write the index entry (SeqId -> MsgId mapping)
    IndexState = rabbit_classic_queue_index_v2:publish(
                   Msg, SeqId, MsgProps, State1#vqstate.index_state),
    State2 = State1#vqstate{index_state = IndexState},
    %% step 3: decide where the message is kept
    State3 = case should_keep_in_ram(State2) of
                 true ->
                     %% into q3 (in-memory queue)
                     MsgStatus = #msg_status{seq_id = SeqId, msg = Msg, ...},
                     Q3 = lqueue:in(MsgStatus, State2#vqstate.q3),
                     State2#vqstate{q3 = Q3,
                                    ram_msg_count = State2#vqstate.ram_msg_count + 1};
                 false ->
                     %% into delta (on-disk queue); the body is already in
                     %% msg_store (large messages) or the queue store (small messages)
                     Delta = increment_delta(State2#vqstate.delta),
                     State2#vqstate{delta = Delta}
             end,
    %% step 4: update statistics
    State4 = State3#vqstate{
               len = State3#vqstate.len + 1,
               bytes = State3#vqstate.bytes + mc:size(Msg)},
    %% step 5: track the Publisher Confirm
    case needs_confirm(MsgProps) of
        true ->
            Unconfirmed = maps:put(SeqId, ChPid, State4#vqstate.unconfirmed),
            {ok, State4#vqstate{unconfirmed = Unconfirmed}};
        false ->
            {ok, State4}
    end.
```
5. Persisting the message body (large messages, >= 4KB):
   - Written to rabbit_msg_store (storage shared per vhost), which returns {ok, MsgId, RefCount}
6. Persisting the message body (small messages, < 4KB):
   - Stored directly in rabbit_classic_queue_store_v2 (the queue's own store)
Sequence diagram:
```mermaid
sequenceDiagram
    autonumber
    participant Ch as Channel
    participant Ex as Exchange
    participant QProc as Queue Process
    participant VQ as variable_queue
    participant Index as queue_index
    participant Store as msg_store
    Ch->>Ex: route(Msg, RoutingKey)
    Ex-->>Ch: [Q1, Q2]
    Ch->>QProc: QPid ! {deliver, Msg, Corr}
    QProc->>VQ: publish(Msg, Props)
    VQ->>VQ: Allocate SeqId = 1001
    VQ->>Index: publish(SeqId, MsgId, Props)
    Index-->>VQ: ok
    alt Message >= 4KB
        VQ->>Store: write(MsgId, Msg)
        Store-->>VQ: ok
    end
    VQ->>VQ: should_keep_in_ram() ?
    alt ram_msg_count < target
        VQ->>VQ: q3 = lqueue:in(#msg_status{}, q3)
    else
        VQ->>VQ: delta = increment(delta)
    end
    VQ-->>QProc: {ok, State1}
    QProc->>QProc: Check for consumers waiting for messages
    QProc-->>Ch: (asynchronous Publisher Confirm)
```
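Key decision points:
- Memory vs disk: target_ram_count is adjusted dynamically to the consumption rate
  - Fast consumers: up to 2048 messages kept in memory
  - Slow consumers: only 1 message kept in memory
- Separating small and large messages:
  - Small messages (< 4KB): stored with the queue itself (in rabbit_classic_queue_store_v2), which saves I/O on the shared store
  - Large messages (>= 4KB): written to the shared msg_store, so a message routed to several queues is stored only once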
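Both decision points can be sketched in a few lines: a size threshold that picks the store, and reference counting in the shared store so that a message routed to several queues is written once. msg_placement_sketch is hypothetical; EmbedBelow stands in for queue_index_embed_msgs_below, and the {Body, RefCount} map is a simplification of rabbit_msg_store.

```erlang
-module(msg_placement_sketch).
-export([placement/2, store_write/3, store_remove/2]).

%% Hypothetical: EmbedBelow plays the role of queue_index_embed_msgs_below.
placement(SizeBytes, EmbedBelow) when SizeBytes < EmbedBelow -> queue_store;
placement(_SizeBytes, _EmbedBelow) -> msg_store.

%% Shared store modelled as #{MsgId => {Body, RefCount}}: writing a MsgId that
%% is already present (same message routed to another queue) only bumps RefCount.
store_write(MsgId, Body, Store) ->
    maps:update_with(MsgId, fun({B, RC}) -> {B, RC + 1} end, {Body, 1}, Store).

%% Dropping a reference deletes the body once no queue refers to it.
store_remove(MsgId, Store) ->
    case maps:get(MsgId, Store) of
        {_Body, 1} -> maps:remove(MsgId, Store);
        {Body, RC} -> Store#{MsgId := {Body, RC - 1}}
    end.
```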
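A4.2 Quorum Queue Message Publishing Flow (Raft Log Replication)
Flow: the message is replicated asynchronously through the Ra consensus engine to several nodes and committed once a quorum has persisted it.
Detailed steps:
1. Channel call: rabbit_quorum_queue:deliver/3
```erlang
deliver(Qs, Msg, Options) ->
    Correlation = maps:get(correlation, Options),
    lists:foreach(
      fun(Q) ->
              RaName = amqqueue:get_pid(Q),   % e.g. {quorum_queue_1, node1@host}
              Cmd = rabbit_fifo:make_enqueue(undefined, Msg),
              ra:pipeline_command(RaName, Cmd, Correlation)
      end, Qs).
```
2. Ra pipeline command: ra:pipeline_command/3
   - Sends the command to the leader's mailbox
   - Does not wait for a reply (asynchronous)
3. Leader appends to its log:
   - The leader writes {Index, Term, Cmd} to its local WAL
   - It sends AppendEntries RPCs to the followers asynchronously
4. Followers persist:
   - Each follower receives AppendEntries and writes the entry to its local WAL
   - It returns an ack to the leader
5. Leader commits:
   - After acknowledgements from a quorum (e.g. 2 of 3 nodes, leader included)
   - It advances commit_index, and only then is the command applied to the state machine
6. State machine applies the committed command: rabbit_fifo:apply/3
```erlang
apply(#{index := RaftIndex}, {enqueue, Pid, Seq, Msg}, State) ->
    %% step 1: add the message to the messages queue
    Messages = rabbit_fifo_q:in(no, {Seq, Msg}, State#rabbit_fifo.messages),
    State1 = State#rabbit_fifo{messages = Messages,
                               messages_total = State#rabbit_fifo.messages_total + 1},
    %% step 2: record the enqueuer (publisher)
    Enqueuers = maps:update_with(
                  Pid,
                  fun(E) -> E#enqueuer{msg_seqs = [Seq | E#enqueuer.msg_seqs]} end,
                  #enqueuer{msg_seqs = [Seq]},
                  State1#rabbit_fifo.enqueuers),
    State2 = State1#rabbit_fifo{enqueuers = Enqueuers},
    %% step 3: return effects (monitor the publisher, send the Publisher Confirm);
    %% RaftIndex comes from the command metadata that Ra passes to apply/3
    Effects = [{monitor, process, Pid},
               {send_msg, Pid, {confirm, Seq, RaftIndex}}],
    %% a Ra machine's apply/3 returns {State, Reply, Effects}
    {State2, ok, Effects}.
```
The effects returned here deliver the Publisher Confirm back to the channel.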
Sequence diagram:
```mermaid
sequenceDiagram
    autonumber
    participant Ch as Channel
    participant Leader as Ra Leader
    participant Fifo as rabbit_fifo
    participant F1 as Follower1
    participant F2 as Follower2
    Ch->>Leader: pipeline_command(enqueue, Msg)
    Leader->>Leader: Write to local WAL<br/>{Index=100, Term=5, Cmd}
    par Asynchronous replication
        Leader->>F1: AppendEntries(Index=100, Cmd)
        Leader->>F2: AppendEntries(Index=100, Cmd)
    end
    F1->>F1: Write to WAL
    F1-->>Leader: ack(Index=100)
    F2->>F2: Write to WAL
    F2-->>Leader: ack(Index=100)
    Note over Leader: Quorum reached (2/3)
    Leader->>Leader: commit_index = 100
    Leader->>Fifo: apply(enqueue, State)
    Fifo->>Fifo: messages = fifo_q:in(Msg)
    Fifo-->>Leader: {State1, Reply, Effects}
    Leader->>Ch: {confirm, Seq, RaftIndex}
    Ch-->>Ch: Mark the message as confirmed
```
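Performance optimizations:
- Pipelining: several commands can be sent back to back without waiting for replies
- Non-blocking publishers: pipeline_command returns as soon as the command is sent, so the channel never waits on replication, while the Publisher Confirm still arrives only after the entry is committed on a quorum
- Asynchronous replication: AppendEntries is an asynchronous RPC and does not block the client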
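The quorum rule behind the commit step can be expressed as: sort each member's highest persisted index in descending order and take the entry at position majority(N). raft_commit_sketch is a hypothetical helper; it omits Raft's additional requirement that a leader only commits entries from its current term this way.

```erlang
-module(raft_commit_sketch).
-export([majority/1, commit_index/1]).

%% Members needed for a quorum: floor(N / 2) + 1.
majority(ClusterSize) when ClusterSize > 0 ->
    ClusterSize div 2 + 1.

%% MatchIndexes: highest log index known to be persisted on each member,
%% the leader's own log included. The commit index is the highest index
%% that at least a majority of members have persisted.
commit_index(MatchIndexes) when MatchIndexes =/= [] ->
    Descending = lists:sort(fun(A, B) -> A >= B end, MatchIndexes),
    lists:nth(majority(length(MatchIndexes)), Descending).
```

With three members, [100, 100, 0] already commits index 100: the leader plus one follower form the quorum, which is why the diagram above needs only one of the two acks.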
A4.3 Message Consumption Flow (Classic Queue)
Flow: taking messages from the queue and delivering them to consumers, subject to the QoS prefetch limit and manual acknowledgements.
Detailed steps:
1. Delivery triggered after a consumer registers: rabbit_classic_queue_v2:deliver_msgs_to_consumers/1
```erlang
deliver_msgs_to_consumers(State) ->
    case has_consumers(State) of
        false ->
            State;
        true ->
            %% go through the consumers and try to deliver to each one
            lists:foldl(
              fun(Consumer, S) -> deliver_to_consumer(Consumer, S) end,
              State,
              get_all_consumers(State))
    end.
```
2. Delivering to a single consumer: deliver_to_consumer/2
```erlang
deliver_to_consumer(#consumer{tag = ConsumerTag, channel_pid = ChPid,
                              credit = Credit} = C, State) when Credit > 0 ->
    AckRequired = true,   % this sketch always requests acks
    QName = amqqueue:get_name(State#state.q),   % assumes #state.q holds the amqqueue record
    %% step 1: fetch a message from variable_queue
    case rabbit_variable_queue:fetch(AckRequired, State#state.backing_queue_state) of
        {empty, BQS} ->
            State#state{backing_queue_state = BQS};
        {{Msg, IsDelivered, AckTag}, BQS} ->
            %% step 2: send it to the channel process
            ChPid ! {deliver, ConsumerTag, AckRequired,
                     {QName, self(), AckTag, IsDelivered, Msg}},
            %% step 3: update the consumer (spend one credit, record the unacked message)
            C1 = C#consumer{credit = Credit - 1,
                            unacked = [{AckTag, Msg} | C#consumer.unacked]},
            State1 = State#state{backing_queue_state = BQS},
            update_consumer(ConsumerTag, C1, State1)
    end;
deliver_to_consumer(_C, State) ->
    %% no credit left: wait for acks before delivering more
    State.
```
3. variable_queue fetches a message: rabbit_variable_queue:fetch/2
```erlang
fetch(AckRequired, State) ->
    case lqueue:out(State#vqstate.q3) of
        {{value, #msg_status{seq_id = SeqId, msg = Msg,
                             is_delivered = IsDelivered}}, Q3_1} ->
            %% taken from q3 (in-memory queue)
            State1 = State#vqstate{q3 = Q3_1, len = State#vqstate.len - 1},
            case AckRequired of
                true ->
                    %% record it as pending ack
                    RamPA = maps:put(SeqId, Msg, State1#vqstate.ram_pending_ack),
                    {{Msg, IsDelivered, SeqId}, State1#vqstate{ram_pending_ack = RamPA}};
                false ->
                    %% no ack needed: the message is dropped right away
                    {{Msg, IsDelivered, undefined}, State1}
            end;
        {empty, _} ->
            %% q3 is empty: load from delta (on-disk queue); fetch_from_delta is
            %% assumed to return {empty, State1} or {{Msg, IsDelivered, SeqId}, State1}
            fetch_from_delta(State)
    end.
```
4. The channel receives the message and delivers it to the client: rabbit_channel:handle_info({deliver, ...})
```erlang
handle_info({deliver, ConsumerTag, AckRequired, Msg}, State) ->
    {_QName, QPid, AckTag, Redelivered, McState} = Msg,
    DeliveryTag = make_delivery_tag(AckTag),
    %% step 1: build the AMQP basic.deliver frame
    Method = #'basic.deliver'{consumer_tag = ConsumerTag,
                              delivery_tag = DeliveryTag,
                              redelivered = Redelivered,
                              exchange = mc:exchange(McState),
                              routing_key = mc:routing_key(McState)},
    %% step 2: send the frame to the client
    Writer = State#ch.writer,
    Writer ! {send_command, Method, mc:protocol_state(McState)},
    %% step 3: remember the unacked message (handle_info must return {noreply, State})
    case AckRequired of
        true ->
            Unacked = gb_trees:insert(DeliveryTag, {QPid, AckTag},
                                      State#ch.unacked_message_q),
            {noreply, State#ch{unacked_message_q = Unacked}};
        false ->
            {noreply, State}
    end.
```
Sequence diagram:
```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Ch as Channel
    participant QProc as Queue Process
    participant VQ as variable_queue
    Client->>Ch: basic.consume(queue, tag)
    Ch->>QProc: gen_server2:call({basic_consume})
    QProc->>QProc: add_consumer(Tag, Consumer)
    loop Continuous delivery
        QProc->>VQ: fetch(AckRequired=true)
        VQ->>VQ: lqueue:out(q3)
        alt q3 has messages
            VQ-->>QProc: {{Msg, IsDelivered, AckTag}} (flag from msg_status)
        else q3 is empty
            VQ->>VQ: fetch_from_delta()
            VQ-->>QProc: {{Msg, IsDelivered, AckTag}} (flag from the index)
        end
        QProc->>Ch: ChPid ! {deliver, Tag, Msg}
        Ch->>Client: basic.deliver(Tag, Msg)
        Ch->>Ch: Record unacked_message_q[DeliveryTag]
        alt Client sends ack
            Client->>Ch: basic.ack(DeliveryTag)
            Ch->>QProc: gen_server2:cast({ack, [AckTag]})
            QProc->>VQ: ack([AckTag])
            VQ->>VQ: Remove from ram_pending_ack
            VQ-->>QProc: ok
        end
    end
```
QoS prefetch control:
- Initial consumer credit: prefetch_count (e.g. 10)
- Each delivered message: credit = credit - 1
- After the client acks: credit = credit + 1
- When credit = 0: delivery stops until an ack arrives
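The credit accounting above can be sketched as pure functions over a consumer map. prefetch_sketch is hypothetical: it keeps credit and the unacked list in one map, whereas the flow described in this document spreads that bookkeeping across the queue process and rabbit_limiter.

```erlang
-module(prefetch_sketch).
-export([new/1, deliver/2, ack/2]).

%% Hypothetical consumer with a prefetch limit: credit starts at
%% prefetch_count, each delivery spends one credit, each ack returns one.
new(Prefetch) -> #{credit => Prefetch, unacked => []}.

%% Deliver as many messages from Ready as the remaining credit allows.
%% Returns {Sent, StillReady, NewConsumer}.
deliver(Ready, #{credit := Credit, unacked := Unacked} = C) ->
    N = min(Credit, length(Ready)),
    {Sent, Rest} = lists:split(N, Ready),
    {Sent, Rest, C#{credit := Credit - N, unacked := Unacked ++ Sent}}.

%% Acking an outstanding message gives one credit back.
ack(Msg, #{credit := Credit, unacked := Unacked} = C) ->
    case lists:member(Msg, Unacked) of
        true -> C#{credit := Credit + 1, unacked := lists:delete(Msg, Unacked)};
        false -> C
    end.
```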
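A5. Configuration and Observability
A5.1 Key Configuration Options
| Setting | Default | Description |
|---|---|---|
| queue_index_embed_msgs_below | 4096 bytes | Messages smaller than this are embedded in the queue's own storage; larger ones are written to msg_store |
| vm_memory_high_watermark | 0.4 | Memory alarm threshold (40% of physical memory) |
| vm_memory_high_watermark_paging_ratio | 0.5 | Fraction of the watermark at which messages start being paged to disk (50% of the watermark) |
| queue_master_locator | client-local | Leader placement strategy for classic queues |
| raft_log_max_entries | 32768 | Maximum number of entries in the quorum queue WAL |
| stream_queue_segment_size_bytes | 500MB | Size of a single stream segment |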
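A5.2 Monitoring Metrics
Classic queues:
- messages_ready: messages waiting to be delivered
- messages_unacknowledged: unacknowledged messages
- message_bytes: total bytes in the queue
- message_bytes_ram: bytes of messages held in memory
- message_bytes_persistent: bytes of persistent messages
- disk_reads: number of disk reads
- disk_writes: number of disk writes
Quorum queues:
- raft_term: current Raft term
- commit_index: index of the last committed log entry
- wal_entries: number of WAL entries not yet truncated
- members: list of member nodes
- leader: current leader node
Streams:
- committed_offset: committed offset
- first_offset: earliest readable offset
- segments: number of segment files
- reader_bytes_sent: bytes sent by readers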