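RabbitMQ-02 - Connection and Channel Modules - rabbit_reader_channel
Module Overview
This document covers two core modules that work closely together:
- rabbit_reader: manages AMQP 0-9-1 client connections
- rabbit_channel: implements AMQP 0-9-1 channel logic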
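Division of Responsibilities
rabbit_reader responsibilities
- Protocol handshake and negotiation:
  - Handles AMQP protocol version negotiation
  - Detects and rejects connections that mistakenly speak HTTP or TLS
- Connection lifecycle management:
  - Accepts TCP connections
  - Authenticates clients
  - Maintains the connection state machine
- Frame parsing and dispatch:
  - Reads data from the socket
  - Parses AMQP frames (frame header + payload)
  - Dispatches frames to the corresponding Channel
- Channel management:
  - Creates and tracks Channel processes
  - Monitors Channel lifecycles
  - Enforces the `channel_max` limit
- Flow control and resource management:
  - TCP backpressure
  - Responds to memory/disk alarms
  - Connection-level flow control (connection.blocked)
- Heartbeats:
  - Sends heartbeat frames
  - Monitors client heartbeats
  - Detects timeouts
- Statistics and events:
  - Emits connection created/closed events
  - Reports statistics periodically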
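rabbit_channel responsibilities
- AMQP method handling:
  - `queue.*`: queue declaration, binding, deletion
  - `exchange.*`: exchange declaration, deletion
  - `basic.publish`: message publishing
  - `basic.consume`: consumer subscription
  - `basic.get`: pulling messages
  - `basic.ack`/`nack`/`reject`: message acknowledgement
  - `tx.*`: transaction management
- Message routing:
  - Calls `rabbit_exchange:route/3` to compute the target queues
  - Delivers messages to the queues
- Consumer management:
  - Maintains the consumer registry
  - Pushes messages to consumers
  - QoS (prefetch) control
- Acknowledgements:
  - Publisher confirms: tracks unconfirmed messages
  - Consumer acks: tracks unacknowledged deliveries
- Access control:
  - Checks the user's permissions for operations on VHosts, queues, and exchanges
  - Uses a permission cache to speed up checks
- Transaction support:
  - `tx.select` / `tx.commit` / `tx.rollback`
  - Buffers the operations performed inside a transaction
- Flow control and backpressure:
  - The limiter manages prefetch
  - Credit flow guards against memory exhaustion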
Architecture diagram
flowchart TB
    subgraph Clients
        Client[AMQP client]
    end
    subgraph Reader process
        Reader[rabbit_reader<br/>connection process]
        FrameParser[Frame Parser<br/>frame parsing]
        Assembler[Command Assembler<br/>command assembly]
    end
    subgraph Channel processes 1..N
        Channel1[rabbit_channel<br/>Channel 1]
        Channel2[rabbit_channel<br/>Channel 2]
        ChannelN[rabbit_channel<br/>Channel N]
    end
    subgraph Supporting processes
        Writer[rabbit_writer<br/>writer]
        Limiter[rabbit_limiter<br/>limiter]
        Heartbeat[rabbit_heartbeat<br/>heartbeater]
        QCollector[rabbit_queue_collector<br/>queue collector]
    end
    subgraph Business logic
        Exchange[rabbit_exchange<br/>exchange]
        Queue[rabbit_amqqueue<br/>queue]
        Router[rabbit_router<br/>router]
    end
    Client -->|TCP connection| Reader
    Reader -->|parse| FrameParser
    FrameParser -->|assemble| Assembler
    Assembler -->|Channel 0| Reader
    Assembler -->|Channel 1..N| Channel1
    Assembler -->|Channel 1..N| Channel2
    Assembler -->|Channel 1..N| ChannelN
    Reader -->|create| Channel1
    Reader -->|create| Channel2
    Reader -->|create| ChannelN
    Reader <-->|uses| Writer
    Reader <-->|manages| Heartbeat
    Reader <-->|uses| QCollector
    Channel1 <-->|QoS| Limiter
    Channel2 <-->|QoS| Limiter
    ChannelN <-->|QoS| Limiter
    Channel1 -->|declare/bind| Exchange
    Channel1 -->|publish| Router
    Channel1 -->|consume| Queue
    Router -->|route| Exchange
    Exchange -->|binding| Queue
    Writer -->|TCP write| Client
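Module Interactions
Connection establishment flow
- TCP connection: Client → Reader (the Ranch listener creates the Reader process)
- Protocol handshake: the Reader receives the `"AMQP\x00\x00\x09\x01"` header
- Version negotiation: the Reader sends `connection.start` and receives `connection.start-ok`
- Authentication: the Reader calls the authentication backend to verify the credentials
- Tuning: the Reader sends `connection.tune` and receives `connection.tune-ok`
- VHost selection: the client sends `connection.open`, and the Reader checks VHost access permissions
- Connection ready: the Reader enters the `running` state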
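The protocol handshake step can be sketched with binary pattern matching on the first 8 bytes the client sends. This is a minimal illustration, not the rabbit_reader implementation: the module name `amqp_header_sketch`, the function `classify/1`, and the error atoms are hypothetical, and only the single header shown above is treated as supported.

```erlang
-module(amqp_header_sketch).
-export([classify/1]).

%% Classify the 8-byte protocol header sent by a client.
%% Only the AMQP 0-9-1 header from the text is accepted in this sketch.
classify(<<"AMQP", 0, 0, 9, 1>>) ->
    {ok, {0, 9, 1}};
%% Starts with "AMQP" but carries version bytes this sketch does not know.
classify(<<"AMQP", _Version:4/binary>>) ->
    {error, unsupported_amqp_version};
%% An HTTP client connected to the AMQP port by mistake.
classify(<<"GET ", _/binary>>) ->
    {error, http_request_on_amqp_port};
%% Anything else is not a protocol header at all.
classify(_Other) ->
    {error, bad_header}.
```

Putting the exact match first keeps the common case cheap, while the later clauses turn obviously wrong peers into distinct, loggable errors.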
Frame processing flow
Socket data → Reader:mainloop → Reader:handle_input
↓
Frame header parsing (Type, Channel, PayloadSize)
↓
Full frame read (Payload + ?FRAME_END)
↓
Reader:handle_frame → dispatch by Channel number
↓
rabbit_command_assembler:process → assemble a complete command
↓
Channel 0? → Reader:handle_method0 (connection-level methods)
Channel N? → rabbit_channel:do/2 (channel-level methods)
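The frame layout used in this flow (1-byte type, 2-byte channel, 4-byte payload size, the payload, then the frame-end byte 0xCE) maps directly onto an Erlang binary pattern. The sketch below is an illustration under that layout only: `amqp_frame_sketch` and `parse/1` are hypothetical names, and the real reader parses the header and payload in separate callbacks rather than in one match.

```erlang
-module(amqp_frame_sketch).
-export([parse/1]).

-define(FRAME_END, 16#CE).

%% Try to take one complete frame off the front of a binary.
%% Size is bound by the 7-byte header and reused to slice the payload.
parse(<<Type:8, Channel:16, Size:32, Payload:Size/binary, ?FRAME_END, Rest/binary>>) ->
    {ok, {Type, Channel, Payload}, Rest};
%% Header and payload are present, but the trailing byte is not 0xCE.
parse(<<_Type:8, _Channel:16, Size:32, _Payload:Size/binary, _BadEnd:8, _/binary>>) ->
    {error, bad_frame_end};
%% Not enough bytes yet: the caller should read more and retry.
parse(Bin) when is_binary(Bin) ->
    {more, Bin}.
```

For example, a heartbeat frame (type 8, channel 0, empty payload) is the 8 bytes `<<8, 0:16, 0:32, 16#CE>>`.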
Channel method processing flow
Reader → rabbit_channel:do(Pid, Method)
↓
Channel:handle_cast({method, Method, Content, Flow})
↓
handle_method(Method, Content, State)
↓
Example: basic.publish
├─> Permission check: rabbit_access_control:check_resource_access
├─> Route computation: rabbit_exchange:route
├─> Message delivery: rabbit_amqqueue:deliver
└─> Confirm tracking: if confirms are enabled, record the seqno
rabbit_reader Module in Detail
State machine and connection state
Connection state definitions
-record(v1, {
    parent,                        %% parent process (usually a supervisor)
    ranch_ref,                     %% Ranch listener reference
    sock,                          %% TCP socket
    connection,                    %% #connection{} record
    callback,                      %% current parsing callback (frame_header | {frame_payload, ...} | handshake)
    recv_len,                      %% number of bytes expected next
    pending_recv,                  %% data waiting to be processed
    connection_state,              %% pre_init | securing | running | blocking | blocked | closing | closed
    helper_sup,                    %% helper supervisor
    queue_collector,               %% exclusive queue collector
    heartbeater,                   %% heartbeater
    stats_timer,                   %% statistics timer
    channel_sup_sup_pid,           %% supervisor of the Channel supervisors
    channel_count,                 %% current number of Channels
    throttle,                      %% flow control state
    proxy_socket,                  %% proxy socket (e.g. PROXY protocol)
    dynamic_buffer_size,           %% dynamic buffer size
    dynamic_buffer_moving_average  %% moving average
}).
-record(connection, {
    name,            %% connection name (used in logs)
    host,            %% server address
    peer_host,       %% client address
    port,            %% server port
    peer_port,       %% client port
    protocol,        %% AMQP protocol version
    user,            %% #user{} authenticated user
    vhost,           %% virtual host
    timeout,         %% heartbeat timeout (seconds)
    frame_max,       %% maximum frame size
    channel_max,     %% maximum number of Channels
    capabilities,    %% client capabilities
    auth_mechanism,  %% authentication mechanism
    auth_state,      %% authentication state
    connected_at     %% connection timestamp
}).
-record(throttle, {
    last_blocked_at,                 %% timestamp of the last block
    blocked_by,                      %% set of blocking reasons, e.g. {resource, memory | disk}
    should_block,                    %% whether the connection should block
    connection_blocked_message_sent  %% whether connection.blocked has been sent
}).
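The `blocked_by` field can be read as a set that gains an entry when an alarm is raised and loses it when the alarm clears, with the connection blocked while the set is non-empty. The sketch below illustrates that idea only. `throttle_sketch`, `set_alarm/3`, and `should_block/1` are hypothetical names, and the real record may track reasons other than resource alarms.

```erlang
-module(throttle_sketch).
-export([new/0, set_alarm/3, should_block/1]).

%% The set holds one {resource, Source} entry per active alarm.
new() ->
    sets:new().

%% Record that the alarm for Source (e.g. memory or disk) was raised (true)
%% or cleared (false).
set_alarm(Source, true, BlockedBy) ->
    sets:add_element({resource, Source}, BlockedBy);
set_alarm(Source, false, BlockedBy) ->
    sets:del_element({resource, Source}, BlockedBy).

%% The connection stays blocked while at least one alarm is active.
should_block(BlockedBy) ->
    sets:size(BlockedBy) > 0.
```

Using a set rather than a boolean means that clearing the memory alarm does not unblock a connection that is still held back by a disk alarm.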
State transition diagram
stateDiagram-v2
    [*] --> pre_init: start_link
    pre_init --> securing: AMQP header received<br/>authentication starts
    securing --> running: connection.open-ok<br/>authentication succeeded
    running --> blocking: resource alarm raised<br/>(memory/disk)
    blocking --> blocked: connection.blocked sent
    blocked --> running: resources recovered<br/>connection.unblocked sent
    running --> closing: client or server<br/>sends connection.close
    blocking --> closing: forced close
    blocked --> closing: forced close
    closing --> closed: connection.close-ok<br/>acknowledged by both sides
    closed --> [*]
    pre_init --> closed: handshake failed
    securing --> closed: authentication failed
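The diagram can be written down as a pure transition function, which makes it easy to see which events are legal in which state. This is a sketch of the diagram as drawn, not of rabbit_reader's code: the module `conn_state_sketch` and every event atom are hypothetical, and intermediate states that the module uses elsewhere (such as `opening`) are not modelled.

```erlang
-module(conn_state_sketch).
-export([next/2]).

%% next(CurrentState, Event) -> NewState | {error, Reason}
next(pre_init, amqp_header_received) -> securing;
next(pre_init, handshake_failed)     -> closed;
next(securing, connection_open_ok)   -> running;
next(securing, auth_failed)          -> closed;
next(running,  resource_alarm)       -> blocking;
next(blocking, blocked_sent)         -> blocked;
next(blocked,  resource_cleared)     -> running;
%% A close request is accepted from any of the three "open" states.
next(State, connection_close)
  when State =:= running; State =:= blocking; State =:= blocked ->
    closing;
next(closing, connection_close_ok)   -> closed;
%% Everything else is an illegal transition.
next(State, Event) ->
    {error, {invalid_transition, State, Event}}.
```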
Core API
API 1: start_link/2 - start the connection process
Basic information
- Name: `start_link/2`
- Caller: the Ranch listener (when a TCP connection is accepted)
- Process type: special process (not a gen_server)
Function signature
-spec start_link({pid(), pid()}, ranch:ref()) ->
    rabbit_types:ok(pid()).
Parameters
| Parameter | Type | Description |
|---|---|---|
| HelperSups | {pid(), pid()} | Helper supervisors for AMQP 0-9-1 and AMQP 1.0 |
| Ref | ranch:ref() | Ranch listener reference |
Core code
start_link(HelperSups, Ref) ->
    Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSups, Ref]),
    {ok, Pid}.
init(Parent, HelperSups, Ref) ->
    logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN}),
    ?LG_PROCESS_TYPE(reader),
    %% 1. Perform the Ranch handshake and take over the socket
    {ok, Sock} = rabbit_networking:handshake(Ref,
        application:get_env(rabbit, proxy_protocol, false),
        dynamic_buffer),
    Deb = sys:debug_options([]),
    %% 2. Start the connection flow
    start_connection(Parent, HelperSups, Ref, Deb, Sock).
start_connection(Parent, HelperSups, Ref, Deb, Sock) ->
    %% 3. Create the queue collector (manages exclusive queues)
    {ok, Collector} = rabbit_queue_collector:start_link(group_leader()),
    %% 4. Initialize the state
    State = #v1{
        parent = Parent,
        ranch_ref = Ref,
        sock = Sock,
        connection = #connection{},
        connection_state = pre_init,
        helper_sup = HelperSups,
        queue_collector = Collector,
        throttle = #throttle{
            last_blocked_at = never,
            blocked_by = sets:new(),
            should_block = false,
            connection_blocked_message_sent = false
        }
    },
    %% 5. Enter the main loop and wait for the 8-byte protocol header
    mainloop(Deb, [], 0, switch_callback(State, handshake, 8)).
Sequence diagram
sequenceDiagram
    autonumber
    participant Ranch as Ranch Listener
    participant Reader as rabbit_reader
    participant QCollector as queue_collector
    participant Sock as TCP Socket
    participant Client as AMQP Client
    Ranch->>+Reader: start_link(HelperSups, Ref)
    Reader->>Reader: proc_lib:spawn_link<br/>init/3
    Reader->>+Sock: rabbit_networking:handshake(Ref)
    Sock->>Sock: Ranch handshake<br/>take over the socket
    Sock-->>-Reader: {ok, Sock}
    Reader->>+QCollector: start_link(group_leader())
    QCollector->>QCollector: initialize the exclusive queue collector
    QCollector-->>-Reader: {ok, CollectorPid}
    Reader->>Reader: initialize state<br/>connection_state = pre_init
    Reader->>Reader: mainloop(..., handshake, 8)
    Reader-->>-Ranch: {ok, ReaderPid}
    Note over Reader,Client: waiting for the client to send the protocol header
    Client->>+Reader: "AMQP\x00\x00\x09\x01"
    Reader->>Reader: handle_input(handshake, ...)
    Reader->>Reader: version_negotiation(...)
    Reader-->>-Client: connection.start<br/>(list of authentication mechanisms)
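API 2: mainloop/4 - main loop (receiving and processing frames)
Basic information
- Name: `mainloop/4`
- Looping style: tail recursion
- Blocking point: the socket read in `rabbit_net:recv/2`
Function signature
-spec mainloop([sys:debug_option()], [binary()], non_neg_integer(), #v1{}) ->
    any().
Parameters
| Parameter | Type | Description |
|---|---|---|
| Deb | [sys:debug_option()] | Debug options |
| Buf | [binary()] | Buffer of data received but not yet processed |
| BufLen | non_neg_integer() | Buffer length (bytes) |
| State | #v1{} | Connection state |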
Core code
mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
                                       recv_len = RecvLen,
                                       callback = Callback}) ->
    case BufLen >= RecvLen of
        true ->
            %% Enough data is buffered. Buf holds chunks newest-first, so
            %% flatten it in arrival order and split off exactly RecvLen bytes.
            <<Data:RecvLen/binary, Rest/binary>> =
                list_to_binary(lists:reverse(Buf)),
            {Leftover, NewState} = handle_input(Callback, Data, State),
            %% Leftover precedes Rest in the byte stream, so it goes last in
            %% the newest-first buffer.
            mainloop(Deb, [Rest, Leftover],
                     iolist_size(Leftover) + byte_size(Rest), NewState);
        false ->
            %% More data must be read from the socket
            case rabbit_net:recv(Sock, 0) of
                {ok, Recv} ->
                    NewBuf = [Recv | Buf],
                    mainloop(Deb, NewBuf, BufLen + byte_size(Recv), State);
                {error, timeout} ->
                    mainloop(Deb, Buf, BufLen, State);
                {error, closed} ->
                    %% The client closed the connection
                    maybe_emit_stats(State),
                    terminate(State, normal);
                {error, Reason} ->
                    terminate(State, {socket_error, Reason})
            end
    end.
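Processing steps
- Buffer check:
  - If `BufLen >= RecvLen`, enough data has arrived, so `handle_input/3` is called
  - Otherwise more data is read from the socket (`rabbit_net:recv/2`)
- Data processing:
  - `handle_input/3` chooses how to parse based on `Callback`:
    - `handshake`: parses the protocol header (8 bytes)
    - `frame_header`: parses the frame header (7 bytes)
    - `{frame_payload, Type, Channel, PayloadSize}`: parses the frame payload
- Tail recursion:
  - `handle_input/3` returns `{RestData, NewState}`
  - `mainloop/4` calls itself recursively to continue processing
- Socket close:
  - Closed by the client: triggers `terminate(State, normal)`
  - Closed by an error: triggers `terminate(State, {socket_error, Reason})`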
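The buffer check above boils down to "collect chunks until at least N bytes are available, then hand over exactly N". A minimal sketch of that idea follows. The module `recv_buffer_sketch` and its functions are hypothetical, and the buffer is modelled as a `{Chunks, Length}` pair instead of the separate `Buf` and `BufLen` arguments.

```erlang
-module(recv_buffer_sketch).
-export([new/0, push/2, take/2]).

%% A buffer is {ChunksNewestFirst, TotalBytes}.
new() ->
    {[], 0}.

%% Prepending is O(1); the chunks are only flattened when data is taken.
push(Chunk, {Chunks, Len}) when is_binary(Chunk) ->
    {[Chunk | Chunks], Len + byte_size(Chunk)}.

%% Take exactly N bytes if available, otherwise ask the caller for more input.
take(N, {_Chunks, Len} = Buf) when Len < N ->
    {more, Buf};
take(N, {Chunks, Len}) ->
    %% Restore arrival order before splitting by byte count.
    <<Data:N/binary, Rest/binary>> = iolist_to_binary(lists:reverse(Chunks)),
    {ok, Data, {[Rest], Len - N}}.
```

A caller would `push/2` each socket read and call `take/2` with the currently expected length (8 for the handshake, 7 for a frame header, and so on).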
Frame parsing sequence diagram
sequenceDiagram
    autonumber
    participant Client
    participant Sock as TCP Socket
    participant Reader as mainloop
    participant Parser as handle_input
    participant Handler as handle_frame
    loop main loop
        Reader->>Reader: check the buffer<br/>BufLen >= RecvLen?
        alt not enough data
            Reader->>+Sock: rabbit_net:recv(Sock, 0)
            Sock->>Client: (TCP receive)
            Sock-->>-Reader: {ok, Data}
            Reader->>Reader: append to Buf<br/>update BufLen
        else enough data
            Reader->>Parser: handle_input(Callback, Data, State)
            alt Callback = handshake
                Parser->>Parser: parse the "AMQP" header<br/>8 bytes
                Parser->>Parser: version_negotiation
                Parser-->>Reader: {Rest, NewState}
            else Callback = frame_header
                Parser->>Parser: parse the frame header<br/>Type:8, Channel:16, PayloadSize:32
                alt frame complete (payload + 0xCE included)
                    Parser->>+Handler: handle_frame(Type, Channel, Payload)
                    Handler->>Handler: dispatch to the Channel
                    Handler-->>-Parser: NewState
                    Parser-->>Reader: {Rest, NewState<br/>callback=frame_header}
                else frame incomplete
                    Parser-->>Reader: {Rest, NewState<br/>callback={frame_payload, ...}}
                end
            else Callback = {frame_payload, ...}
                Parser->>Parser: read Payload + 0xCE
                Parser->>+Handler: handle_frame(...)
                Handler-->>-Parser: NewState
                Parser-->>Reader: {Rest, NewState<br/>callback=frame_header}
            end
            Reader->>Reader: tail recursion<br/>mainloop(Rest, NewState)
        end
    end
API 3: handle_frame/4 - frame handling and dispatch
Basic information
- Name: `handle_frame/4`
- Dispatch rules:
  - Channel 0 → the Reader itself (connection-level methods)
  - Channel N → the corresponding `rabbit_channel` process
Function signature
-spec handle_frame(non_neg_integer(), non_neg_integer(), binary(), #v1{}) ->
    #v1{}.
Parameters
| Parameter | Type | Description |
|---|---|---|
| Type | non_neg_integer() | Frame type: 1 (method) / 2 (header) / 3 (body) / 8 (heartbeat) |
| Channel | non_neg_integer() | Channel number (0 means connection level) |
| Payload | binary() | Frame payload |
| State | #v1{} | Connection state |
Core code
%% Channel 0 frames (connection-level methods)
handle_frame(Type, 0, Payload,
             State = #v1{connection = #connection{protocol = Protocol}}) ->
    case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
        error     -> frame_error(unknown_frame, Type, 0, Payload, State);
        heartbeat -> State; %% heartbeat frame, nothing to do here
        {method, MethodName, FieldsBin} ->
            %% Connection-level method (such as connection.close)
            handle_method0(MethodName, FieldsBin, State);
        _Other    -> unexpected_frame(Type, 0, Payload, State)
    end;
%% Channel N frames (channel-level methods)
handle_frame(Type, Channel, Payload,
             State = #v1{connection = #connection{protocol = Protocol}})
  when ?IS_RUNNING(State) ->
    case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
        error     -> frame_error(unknown_frame, Type, Channel, Payload, State);
        heartbeat -> unexpected_frame(Type, Channel, Payload, State);
        Frame     -> process_frame(Frame, Channel, State)
    end.
%% Handle a frame on Channel N
process_frame(Frame, Channel, State) ->
    ChKey = {channel, Channel},
    %% 1. Look up the Channel, creating it on first use
    {ChPid, AState, State1} =
        case get(ChKey) of
            undefined ->
                {ok, {Pid, AS}, S} = create_channel(Channel, State),
                {Pid, AS, S};
            {Pid, AS} ->
                {Pid, AS, State}
        end,
    %% 2. Assemble the command (this may take several frames)
    case rabbit_command_assembler:process(Frame, AState) of
        {ok, NewAState} ->
            %% Frame consumed, command not complete yet
            %% (e.g. waiting for the body after the header)
            put(ChKey, {ChPid, NewAState}),
            State1;
        {ok, Method, NewAState} ->
            %% Complete command without content
            rabbit_channel:do(ChPid, Method),
            put(ChKey, {ChPid, NewAState}),
            State1;
        {ok, Method, Content, NewAState} ->
            %% Complete command with content
            rabbit_channel:do_flow(ChPid, Method, Content),
            put(ChKey, {ChPid, NewAState}),
            control_throttle(State1)
    end.
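Code walkthrough
- Frame analysis (`analyze_frame/3`):
  - Determines the frame type (method/header/body/heartbeat)
  - Extracts the method name and fields (for method frames)
  - Returns `{method, MethodName, FieldsBin}` or another frame type
- Channel 0 handling:
  - Connection-level methods (such as `connection.close`) are handled directly by the Reader
  - Calls `handle_method0/3`
- Channel N handling:
  - Fetches the Channel state from the process dictionary (`get({channel, Channel})`)
  - If it does not exist, calls `create_channel/2` to create it
  - Uses `rabbit_command_assembler` to assemble a complete command
- Command assembly:
  - An AMQP command may span multiple frames:
    - Method frame (such as `basic.publish`)
    - Header frame (message properties)
    - Body frames (possibly several, if the message is large)
  - The `process/2` state machine accumulates frames until the command is complete
- Dispatch to the Channel:
  - `rabbit_channel:do/2`: methods without content
  - `rabbit_channel:do_flow/3`: methods with content (such as publish)
  - Both are asynchronous calls (cast) that do not wait for a response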
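The command assembly step is a small state machine, which the following sketch tries to make concrete. It is a simplified model, not `rabbit_command_assembler`: the module name, the frame tuples (`{method, Name, HasContent}`, `{header, Props, BodySize}`, `{body, Chunk}`), and the state terms are all hypothetical. Only the three return shapes mirror the ones matched in `process_frame`.

```erlang
-module(assembler_sketch).
-export([init/0, process/2]).

%% Initial state: waiting for a method frame.
init() ->
    method.

%% A method without content is a complete command on its own.
process({method, Method, false}, method) ->
    {ok, Method, method};
%% A content-bearing method must be followed by a header frame.
process({method, Method, true}, method) ->
    {ok, {expect_header, Method}};
%% A header announcing an empty body completes the command immediately.
process({header, Props, 0}, {expect_header, Method}) ->
    {ok, Method, {Props, <<>>}, method};
process({header, Props, BodySize}, {expect_header, Method}) ->
    {ok, {expect_body, Method, Props, BodySize, []}};
%% Body frames are accumulated until BodySize bytes have arrived.
process({body, Chunk}, {expect_body, Method, Props, Remaining, Acc}) ->
    case Remaining - byte_size(Chunk) of
        0 ->
            Body = iolist_to_binary(lists:reverse([Chunk | Acc])),
            {ok, Method, {Props, Body}, method};
        Left when Left > 0 ->
            {ok, {expect_body, Method, Props, Left, [Chunk | Acc]}};
        _TooMuch ->
            {error, body_too_long}
    end;
process(Frame, State) ->
    {error, {unexpected_frame, Frame, State}}.
```

Because the assembler state is just a term, the reader can keep one per channel number (as the process dictionary entries above do) and interleave frames from different channels safely.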
Frame dispatch flowchart
flowchart TD
    Start[handle_frame<br/>Type, Channel, Payload] --> Analyze[analyze_frame<br/>determine frame type]
    Analyze --> IsCh0{Channel == 0?}
    IsCh0 -->|yes| IsHeartbeat{heartbeat?}
    IsHeartbeat -->|yes| Ignore[ignore]
    IsHeartbeat -->|no| Method0[handle_method0<br/>connection-level method]
    IsCh0 -->|no| GetCh[look up Channel<br/>in the process dictionary]
    GetCh --> ChExists{Channel exists?}
    ChExists -->|no| CreateCh[create_channel<br/>create a new Channel]
    ChExists -->|yes| Assemble[rabbit_command_assembler:process<br/>assemble command]
    CreateCh --> Assemble
    Assemble --> Complete{command complete?}
    Complete -->|no| SaveState[save assembler state<br/>wait for the next frame]
    Complete -->|yes, no content| DoMethod[rabbit_channel:do<br/>Method]
    Complete -->|yes, with content| DoFlow[rabbit_channel:do_flow<br/>Method, Content]
    DoMethod --> UpdateState[update state<br/>apply flow control]
    DoFlow --> UpdateState
    SaveState --> UpdateState
    Method0 --> UpdateState
    Ignore --> UpdateState
    UpdateState --> End[return new state]
API 4: create_channel/2 - create a Channel
Basic information
- Name: `create_channel/2`
- Limit: bounded by `channel_max` (default 2047)
- Supervision: Channels are supervised by `rabbit_channel_sup_sup`
Function signature
-spec create_channel(non_neg_integer(), #v1{}) ->
    {ok, {pid(), term()}, #v1{}} | {error, term()}.
Core code
create_channel(Channel,
#v1{sock = Sock,
queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
channel_count = ChannelCount,
connection =
#connection{name = Name,
protocol = Protocol,
frame_max = FrameMax,
vhost = VHost,
capabilities = Capabilities,
user = User}
} = State) ->
%% 1. Check the Channel count limit
case ChannelCount < ?CHANNEL_MAX of
false ->
{error, rabbit_misc:amqp_error(
not_allowed, "channel_max (~b) reached", [?CHANNEL_MAX], 'none')};
true ->
%% 2. Check the per-user limit
case is_over_limits(User#user.username) of
false ->
%% 3. Start the Channel supervisor and the Channel process
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup,
{tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities,
Collector}),
%% 4. Monitor the Channel process
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
{ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}};
{true, Limit, Fmt} ->
{error, rabbit_misc:amqp_error(
not_allowed, Fmt, [node(), Limit], 'none')}
end
end.
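The bookkeeping in `create_channel/2` (reuse an existing channel, refuse a new one once the limit is reached, otherwise register it) can be isolated from the supervision details. The sketch below does that with a plain map instead of the process dictionary. `channel_registry_sketch`, `new/1`, and `open/3` are hypothetical names, and `Handle` is any term standing in for the channel pid.

```erlang
-module(channel_registry_sketch).
-export([new/1, open/3]).

%% A registry remembers the limit and the channels opened so far.
new(ChannelMax) when is_integer(ChannelMax), ChannelMax > 0 ->
    #{max => ChannelMax, channels => #{}}.

%% Return the existing handle for ChNum, or register Handle if there is room.
open(ChNum, Handle, #{max := Max, channels := Chs} = Reg) ->
    case Chs of
        #{ChNum := Existing} ->
            {ok, Existing, Reg};
        _ when map_size(Chs) >= Max ->
            {error, channel_max_reached};
        _ ->
            {ok, Handle, Reg#{channels := Chs#{ChNum => Handle}}}
    end.
```

Checking for an existing entry before checking the limit matters: frames for an already open channel must keep working even when the connection is at its maximum.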
Sequence diagram
sequenceDiagram
    autonumber
    participant Reader as rabbit_reader
    participant SupSup as channel_sup_sup
    participant Sup as channel_sup
    participant Channel as rabbit_channel
    participant Limiter as rabbit_limiter
    participant Writer as rabbit_writer
    Reader->>Reader: create_channel(ChannelNum)
    Reader->>Reader: check channel_count<br/>< channel_max?
    Reader->>Reader: check user limits<br/>is_over_limits(User)?
    Reader->>+SupSup: start_channel(...Args)
    SupSup->>+Sup: supervisor:start_child<br/>channel_sup
    Sup->>+Limiter: start_link()
    Limiter->>Limiter: initialize the limiter<br/>prefetch=0
    Limiter-->>-Sup: {ok, LimiterPid}
    Sup->>+Writer: start_link(Sock)
    Writer->>Writer: initialize the writer<br/>buffer
    Writer-->>-Sup: {ok, WriterPid}
    Sup->>+Channel: start_link(...)<br/>with LimiterPid, WriterPid
    Channel->>Channel: init(...)<br/>state=starting
    Channel->>Channel: pg_local:join<br/>rabbit_channels
    Channel-->>-Sup: {ok, ChannelPid}
    Sup-->>-SupSup: {ok, SupPid, {ChannelPid, AState}}
    SupSup-->>-Reader: {ok, _, {ChannelPid, AState}}
    Reader->>Reader: erlang:monitor<br/>(process, ChannelPid)
    Reader->>Reader: put({channel, ChannelNum},<br/>{ChannelPid, AState})
    Reader->>Reader: channel_count += 1
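API 5: handle_method0/3 - handling connection-level methods
Basic information
- Name: `handle_method0/3`
- Methods covered: `connection.*`, `channel.open` (special case)
Core methods
| Method | Direction | Purpose |
|---|---|---|
| connection.start-ok | C→S | Client replies with its chosen authentication mechanism |
| connection.tune-ok | C→S | Client confirms the connection parameters |
| connection.open | C→S | Requests that a VHost be opened |
| connection.open-ok | S→C | VHost opened successfully |
| connection.close | C↔S | Requests that the connection be closed |
| connection.close-ok | C↔S | Confirms that the connection is closed |
| connection.blocked | S→C | Notifies the client that the connection is blocked |
| connection.unblocked | S→C | Notifies the client that the connection is no longer blocked |
Core code (example: connection.open)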
handle_method0(#'connection.open'{virtual_host = VHost},
State = #v1{ranch_ref = RanchRef,
connection_state = opening,
connection = Connection = #connection{
log_name = ConnName,
user = User = #user{username = Username},
protocol = Protocol},
helper_sup = SupPid,
sock = Sock,
throttle = Throttle}) ->
%% 1. Check the node, VHost, and user connection limits
ok = is_over_node_connection_limit(RanchRef),
ok = is_over_vhost_connection_limit(VHost, User),
ok = is_over_user_connection_limit(User),
%% 2. Check VHost access permissions
ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}, #{}),
ok = is_vhost_alive(VHost, User),
%% 3. Update the connection record
NewConnection = Connection#connection{vhost = VHost},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
%% 4. Register for resource alarms
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms]),
Throttle1 = Throttle#throttle{blocked_by = BlockedBy},
%% 5. Start the supervisor of the Channel supervisors
{ok, ChannelSupSupPid} =
rabbit_connection_helper_sup:start_channel_sup_sup(SupPid),
State1 = control_throttle(
State#v1{connection_state = running,
connection = NewConnection,
channel_sup_sup_pid = ChannelSupSupPid,
throttle = Throttle1}),
%% 6. Emit the connection created event
Infos = augment_infos_with_user_provided_connection_name(
[{type, network} | infos(?CREATION_EVENT_KEYS, State1)],
State1
),
rabbit_core_metrics:connection_created(proplists:get_value(pid, Infos),
Infos),
rabbit_event:notify(connection_created, Infos),
maybe_emit_stats(State1),
?LOG_INFO(
"connection ~tp (~ts -> ~ts): "
"user '~ts' authenticated and granted access to vhost '~ts'",
[self(), ConnName, peer_host_port(State1),
Username, VHost]),
State1.
rabbit_channel Module in Detail
Channel state and data structures
Channel record definitions
-record(ch, {
    cfg = #conf{},            %% configuration record
    limiter,                  %% limiter
    tx = none,                %% transaction state: none | #tx{}
    next_tag = 1,             %% next delivery tag
    unacked_message_q,        %% queue of unacknowledged deliveries (consumer acks)
    consumer_mapping = #{},   %% consumer_tag -> {QName, ...}
    queue_consumers = #{},    %% QName -> [consumer_tag]
    confirm_enabled = false,  %% whether publisher confirms are enabled
    publish_seqno = 1,        %% next publish sequence number
    unconfirmed,              %% unconfirmed publishes (publisher confirms)
    rejected = [],            %% rejected messages
    confirmed = [],           %% confirmed messages
    direct_reply = none,      %% direct reply queue (fast reply-to)
    delivery_flow = flow,     %% delivery flow control mode
    interceptor_state,        %% interceptor state
    queue_states              %% per-queue-type state map
}).
-record(conf, {
    state = starting,              %% starting | running | flow | closing
    protocol,                      %% AMQP protocol
    channel,                       %% Channel number
    reader_pid,                    %% Reader process pid
    writer_pid,                    %% Writer process pid
    conn_pid,                      %% connection process pid (usually the Reader)
    conn_name,                     %% connection name
    user,                          %% #user{} user
    virtual_host,                  %% VHost
    most_recently_declared_queue,  %% most recently declared queue (used when the queue name is omitted)
    queue_collector_pid,           %% queue collector pid
    capabilities,                  %% client capabilities
    trace_state,                   %% tracing state
    consumer_prefetch,             %% consumer prefetch count
    consumer_timeout,              %% consumer timeout
    authz_context,                 %% authorization context
    max_consumers,                 %% maximum number of consumers
    writer_gc_threshold,           %% Writer GC threshold
    msg_interceptor_ctx            %% message interceptor context
}).
Channel state transition diagram
stateDiagram-v2
    [*] --> starting: init
    starting --> running: first method call
    running --> flow: flow control triggered<br/>(queue blocked)
    flow --> running: flow control released<br/>(queue unblocked)
    running --> closing: channel.close<br/>or connection closed
    flow --> closing: forced close
    closing --> [*]: cleanup finished
Core API
API 1: do/2 - handling methods without content
Basic information
- Name: `do/2`
- Call style: asynchronous (cast)
- Methods covered: most methods other than `basic.publish`
Function signature
-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
Core code
do(Pid, Method) ->
    gen_server2:cast(Pid, {method, Method, none, noflow}).
Example method handler
%% queue.declare
handle_cast({method, #'queue.declare'{queue       = QueueNameBin,
                                      passive     = false,
                                      durable     = Durable,
                                      exclusive   = Exclusive,
                                      auto_delete = AutoDelete,
                                      nowait      = NoWait,
                                      arguments   = Args}, _, _},
            State = #ch{cfg = #conf{virtual_host = VHostPath,
                                    user         = User,
                                    conn_name    = ConnName}}) ->
    %% 1. Permission check
    QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
    check_resource_access(QueueName, configure, State),
    %% 2. Declare the queue
    case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
                                 Args, none, User) of
        {new, Q} ->
            %% Newly created queue: message count and consumer count are both 0
            maybe_register_exclusive_queue(Q, Exclusive, State),
            return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
        {existing, Q} ->
            %% The queue already exists
            check_exclusive_access(Q, Exclusive, State),
            return_queue_declare_ok(QueueName, NoWait,
                                    rabbit_amqqueue:get_msg_count(Q),
                                    rabbit_amqqueue:get_consumer_count(Q),
                                    State);
        {owner_died, Q} ->
            %% The owner of the exclusive queue has died: delete and recreate it
            _ = rabbit_amqqueue:delete(Q, false, false, User),
            handle_method(#'queue.declare'{...}, State);
        {error, Reason} ->
            handle_exception(Reason, State)
    end.
API 2: do_flow/3 - handling methods with content (basic.publish)
Basic information
- Name: `do_flow/3`
- Call style: asynchronous (cast)
- Main use: `basic.publish` (message publishing)
Function signature
-spec do_flow(pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'.
Core code
do_flow(Pid, Method, Content) ->
    %% Use credit flow so that the Channel process is not flooded with messages
    credit_flow:send(Pid),
    gen_server2:cast(Pid, {method, Method, Content, flow}).
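The idea behind credit-based flow control is that a sender may only send while it holds credit from the receiver, and the receiver grants more credit as it catches up. The sketch below is a generic credit counter to illustrate that idea. It is not the `credit_flow` module's algorithm or API, and the module `credit_sketch` and its functions are hypothetical.

```erlang
-module(credit_sketch).
-export([new/1, send/1, grant/2, blocked/1]).

%% Start with an initial amount of credit towards one receiver.
new(InitialCredit) when is_integer(InitialCredit), InitialCredit > 0 ->
    #{credit => InitialCredit}.

%% Spend one unit of credit per message; refuse when none is left.
send(#{credit := 0}) ->
    {error, blocked};
send(#{credit := C} = S) ->
    {ok, S#{credit := C - 1}}.

%% The receiver hands back N units once it has processed some messages.
grant(N, #{credit := C} = S) when is_integer(N), N > 0 ->
    S#{credit := C + N}.

%% A sender with no credit should stop reading from its own input.
blocked(#{credit := C}) ->
    C =:= 0.
```

In the reader/channel pairing described here, a blocked sender is what turns into TCP backpressure: the reader stops pulling data from the socket until credit returns.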
Message publishing flow
handle_cast({method, #'basic.publish'{exchange    = ExchangeNameBin,
                                      routing_key = RoutingKey,
                                      mandatory   = Mandatory,
                                      immediate   = Immediate},
             Content, flow},
            State = #ch{cfg = #conf{virtual_host = VHostPath,
                                    user         = User,
                                    conn_name    = ConnName}}) ->
    %% 1. Build the exchange resource name
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    %% 2. Permission check
    check_resource_access(ExchangeName, write, State),
    %% 3. Look up the exchange
    case rabbit_exchange:lookup(ExchangeName) of
        {ok, Exchange} ->
            %% 4. Reject publishes to internal exchanges
            check_internal_exchange(Exchange),
            %% 5. Decode the message content
            DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
            %% 6. Wrap the message (adds metadata)
            Message = rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent),
            %% 7. Tracing (if enabled)
            rabbit_trace:tap_in(Message, ConnName, State#ch.cfg#conf.trace_state),
            %% 8. Route and deliver
            {RoutingRes, DeliveredQPids} =
                rabbit_basic:publish(Message, Exchange, Mandatory, Immediate),
            %% 9. Handle mandatory / immediate
            case RoutingRes of
                routed ->
                    %% Routed to at least one queue
                    ok;
                unroutable ->
                    %% No routing target
                    case Mandatory of
                        true ->
                            %% Return the message to the publisher
                            send_basic_return(Message, ExchangeName,
                                              RoutingKey, no_route, State);
                        false ->
                            ok %% drop the message
                    end
            end,
            %% 10. Publisher confirms
            case State#ch.confirm_enabled of
                true ->
                    SeqNo = State#ch.publish_seqno,
                    Unconfirmed1 = rabbit_confirms:insert(SeqNo, Message,
                                                          DeliveredQPids,
                                                          State#ch.unconfirmed),
                    noreply(State#ch{publish_seqno = SeqNo + 1,
                                     unconfirmed   = Unconfirmed1});
                false ->
                    noreply(State)
            end;
        {error, not_found} ->
            handle_exception({amqp_error, not_found,
                              "exchange '~ts' not found", [ExchangeNameBin]},
                             State)
    end.
Publish sequence diagram
sequenceDiagram
    autonumber
    participant Reader as rabbit_reader
    participant Channel as rabbit_channel
    participant Exchange as rabbit_exchange
    participant Router as rabbit_router
    participant Queue as rabbit_amqqueue
    Reader->>+Channel: do_flow(Pid, basic.publish, Content)
    Note over Reader,Channel: credit_flow:send(Pid)<br/>flow control check
    Channel->>Channel: handle_cast({method, #'basic.publish'{}, Content, flow})
    Channel->>Channel: permission check<br/>write on exchange
    Channel->>+Exchange: lookup(ExchangeName)
    Exchange-->>-Channel: {ok, Exchange}
    Channel->>Channel: build the message<br/>rabbit_basic:message
    Channel->>Channel: tracing (if enabled)<br/>rabbit_trace:tap_in
    Channel->>+Router: rabbit_basic:publish<br/>(Message, Exchange, Mandatory, Immediate)
    Router->>+Exchange: route(Exchange, Message, #{})
    Exchange->>Exchange: route by exchange type<br/>(direct/topic/fanout/headers)
    Exchange-->>-Router: [QueueName1, QueueName2, ...]
    loop each target queue
        Router->>+Queue: deliver([Message], QueueName)
        Queue->>Queue: enqueue (memory/disk)
        Queue-->>-Router: {ok, QPid} | {error, ...}
    end
    Router-->>-Channel: {routed, DeliveredQPids}<br/>or {unroutable, []}
    alt unroutable and mandatory
        Channel->>Channel: build basic.return
        Channel->>Reader: send the basic.return frame
    end
    alt confirm_enabled
        Channel->>Channel: record unconfirmed<br/>{SeqNo, QPids}
        Channel->>Channel: publish_seqno += 1
    end
    Channel-->>-Reader: noreply
API 3: handling consumer-related methods
basic.consume - registering a consumer
handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ConsumerTag,
no_local = NoLocal,
no_ack = NoAck,
exclusive = Exclusive,
nowait = NoWait,
arguments = Args}, _,
State = #ch{cfg = #conf{user = User}}) ->
%% 1. Resolve the queue name
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
%% 2. Permission check
check_resource_access(QueueName, read, State),
%% 3. Generate a consumer_tag (if none was provided)
ActualConsumerTag = case ConsumerTag of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.ctag");
_ -> ConsumerTag
end,
%% 4. Check whether the consumer_tag already exists
case maps:is_key(ActualConsumerTag, State#ch.consumer_mapping) of
true ->
%% consumer_tag conflict
precondition_failed("consumer tag '~ts' already exists", [ActualConsumerTag]);
false ->
%% 5. Call the queue's consume function
case rabbit_amqqueue:with(
QueueName,
fun(Q) ->
rabbit_queue_type:consume(
Q,
#{consumer_tag => ActualConsumerTag,
no_ack => NoAck,
channel_pid => self(),
limiter_pid => rabbit_limiter:pid(State#ch.limiter),
limiter_active => rabbit_limiter:is_enabled(State#ch.limiter),
prefetch_count => rabbit_limiter:get_prefetch(State#ch.limiter),
exclusive => Exclusive,
args => Args,
ok_msg => #'basic.consume_ok'{consumer_tag = ActualConsumerTag},
acting_user => User},
State#ch.queue_states)
end) of
{ok, QPid, QState1} ->
%% 6. Update the consumer mappings
ConsumerMapping1 = maps:put(ActualConsumerTag,
{QueueName, QPid, NoAck},
State#ch.consumer_mapping),
QueueConsumers1 = maps:update_with(
QueueName,
fun(CTags) -> [ActualConsumerTag | CTags] end,
[ActualConsumerTag],
State#ch.queue_consumers),
%% 7. Send basic.consume-ok
ok = maybe_send_reply(NoWait,
#'basic.consume_ok'{consumer_tag = ActualConsumerTag},
State),
{noreply, State#ch{consumer_mapping = ConsumerMapping1,
queue_consumers = QueueConsumers1,
queue_states = QState1}};
{error, Reason} ->
handle_exception(Reason, State)
end
end.
basic.ack - acknowledging messages
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
                           multiple     = Multiple}, _, State) ->
    %% 1. Find the unacknowledged messages
    case find_unacked_messages(DeliveryTag, Multiple, State#ch.unacked_message_q) of
        {ok, MsgSeqNos, Remaining} ->
            %% 2. Tell the queues that the messages are settled
            lists:foreach(
              fun({QueueName, MsgSeqNo}) ->
                      rabbit_amqqueue:with(
                        QueueName,
                        fun(Q) ->
                                rabbit_queue_type:settle(Q, complete, [MsgSeqNo],
                                                         State#ch.queue_states)
                        end)
              end, MsgSeqNos),
            %% 3. Update the unacknowledged queue
            {noreply, State#ch{unacked_message_q = Remaining}};
        {error, not_found} ->
            precondition_failed("delivery-tag ~b not found", [DeliveryTag])
    end.
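The `multiple` flag is what makes the lookup in step 1 interesting: with `multiple = false` exactly one delivery tag is settled, while with `multiple = true` every outstanding tag up to and including the given one is settled. A minimal sketch of that lookup, assuming the unacknowledged deliveries are kept as a list of `{DeliveryTag, QueueName}` tuples in ascending tag order, is shown below. `ack_sketch:ack/3` is a hypothetical stand-in for `find_unacked_messages/3`, whose real data structure is not shown in this document.

```erlang
-module(ack_sketch).
-export([ack/3]).

%% ack(DeliveryTag, Multiple, Unacked) ->
%%     {ok, Acked, Remaining} | {error, not_found}
%% multiple = true: settle every entry whose tag is =< DeliveryTag.
ack(Tag, true, Unacked) ->
    case lists:partition(fun({T, _QName}) -> T =< Tag end, Unacked) of
        {[], _} ->
            {error, not_found};
        {Acked, Remaining} ->
            {ok, Acked, Remaining}
    end;
%% multiple = false: settle exactly the entry with this tag.
ack(Tag, false, Unacked) ->
    case lists:keytake(Tag, 1, Unacked) of
        {value, Entry, Remaining} ->
            {ok, [Entry], Remaining};
        false ->
            {error, not_found}
    end.
```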
Publisher confirms
Enabling confirms
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
    State1 = State#ch{confirm_enabled = true},
    ok = maybe_send_reply(NoWait, #'confirm.select_ok'{}, State1),
    {noreply, State1}.
Receiving confirms from queues
handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
    %% 1. Mark the messages as confirmed
    {Confirmed, UC1} = rabbit_confirms:confirm(MsgSeqNos, QPid, UC),
    %% 2. Accumulate the confirmed messages
    Confirmed1 = lists:append(Confirmed, State#ch.confirmed),
    %% 3. Once enough confirms have accumulated, send basic.ack to the client
    case should_send_confirms(Confirmed1) of
        true ->
            send_confirms(Confirmed1, State),
            {noreply, State#ch{unconfirmed = UC1,
                               confirmed   = []}};
        false ->
            {noreply, State#ch{unconfirmed = UC1,
                               confirmed   = Confirmed1}}
    end.
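A published message that was routed to several queues is only confirmed to the publisher once every one of those queues has confirmed it. The sketch below illustrates that bookkeeping with a map from sequence number to the set of queues still pending. It is not the `rabbit_confirms` module: `confirms_sketch` and its function signatures are hypothetical, and queue references are plain terms.

```erlang
-module(confirms_sketch).
-export([new/0, insert/3, confirm/3]).

%% Unconfirmed :: #{SeqNo => ordset of queue references still pending}
new() ->
    #{}.

%% Track a publish that was delivered to the given (non-empty) list of queues.
insert(SeqNo, QRefs, Unconfirmed) when QRefs =/= [] ->
    Unconfirmed#{SeqNo => ordsets:from_list(QRefs)}.

%% A queue confirms some sequence numbers. Returns the sequence numbers that
%% are now fully confirmed (most recently processed first) and the new map.
confirm(SeqNos, QRef, Unconfirmed) ->
    lists:foldl(
      fun(SeqNo, {Done, U}) ->
              case U of
                  #{SeqNo := Pending} ->
                      Left = ordsets:del_element(QRef, Pending),
                      case ordsets:size(Left) of
                          0 -> {[SeqNo | Done], maps:remove(SeqNo, U)};
                          _ -> {Done, U#{SeqNo := Left}}
                      end;
                  _ ->
                      %% Unknown or already confirmed sequence number.
                      {Done, U}
              end
      end, {[], Unconfirmed}, SeqNos).
```

Sequence numbers returned by `confirm/3` are the ones that can be reported to the client with `basic.ack`.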
Confirm flow diagram
sequenceDiagram
    autonumber
    participant Client
    participant Channel as rabbit_channel
    participant Queue as rabbit_queue
    participant Backend as Queue Backend
    Note over Client,Channel: enabling confirms
    Client->>+Channel: confirm.select
    Channel->>Channel: confirm_enabled = true
    Channel-->>-Client: confirm.select-ok
    Note over Client,Backend: publishing a message
    Client->>+Channel: basic.publish (SeqNo=1)
    Channel->>Channel: unconfirmed[1] = {QPid, ...}
    Channel->>+Queue: deliver(Message)
    Queue->>+Backend: publish(Message)
    Backend->>Backend: write to storage<br/>(asynchronous)
    Backend-->>-Queue: ok
    Queue-->>-Channel: (asynchronous message)
    Channel-->>-Client: (non-blocking)
    Note over Queue,Channel: the queue confirms asynchronously
    Backend->>Backend: message persisted
    Backend->>+Queue: notify_confirm
    Queue->>+Channel: {confirm, [SeqNo=1], QPid}
    Channel->>Channel: unconfirmed[1] -> confirmed
    Channel->>Channel: accumulate confirms
    alt enough accumulated or timer fired
        Channel->>Client: basic.ack<br/>delivery-tag=1<br/>multiple=false
    end
    Channel-->>-Queue: ok
    Queue-->>-Backend: ok
Typical scenario sequence diagrams
Scenario 1: full connection establishment flow
sequenceDiagram
    autonumber
    participant Client as AMQP client
    participant Ranch as Ranch Listener
    participant Reader as rabbit_reader
    participant Auth as Auth backend
    participant AccessCtl as rabbit_access_control
    Client->>+Ranch: TCP connection<br/>SYN/SYN-ACK/ACK
    Ranch->>Ranch: accept() the connection
    Ranch->>+Reader: start_link(HelperSups, Ref)
    Reader->>Reader: rabbit_networking:handshake(Ref)
    Reader->>Reader: init state<br/>connection_state=pre_init
    Reader->>Reader: mainloop(..., handshake, 8)
    Reader-->>-Ranch: {ok, ReaderPid}
    Ranch-->>-Client: (connection accepted)
    Note over Client,Reader: protocol handshake
    Client->>+Reader: "AMQP\x00\x00\x09\x01"
    Reader->>Reader: version_negotiation()
    Reader->>Client: connection.start<br/>(auth mechanisms: PLAIN, AMQPLAIN)
    Reader->>Reader: connection_state=securing
    Client->>Reader: connection.start-ok<br/>mechanism=PLAIN<br/>response="\0guest\0guest"
    Reader->>+Auth: check_user("guest", [...])<br/>calls rabbit_auth_backend_internal
    Auth->>Auth: verify the password hash
    Auth-->>-Reader: {ok, #user{username="guest", tags=[administrator]}}
    Reader->>Client: connection.tune<br/>channel-max=2047<br/>frame-max=131072<br/>heartbeat=60
    Client->>Reader: connection.tune-ok<br/>confirms the parameters
    Reader->>Reader: connection_state=opening
    Client->>Reader: connection.open<br/>virtual-host="/"
    Reader->>Reader: check connection limits<br/>node/VHost/user
    Reader->>+AccessCtl: check_vhost_access(User, "/", ...)
    AccessCtl->>AccessCtl: check user permissions<br/>(by default guest can only connect locally)
    AccessCtl-->>-Reader: ok
    Reader->>Reader: start channel_sup_sup
    Reader->>Reader: register for resource alarms
    Reader->>Reader: connection_state=running
    Reader->>Client: connection.open-ok
    Reader->>Reader: emit the connection_created event
    Reader-->>-Client: (connection ready)
Scenario 2: creating a Channel and declaring a queue
sequenceDiagram
    autonumber
    participant Client
    participant Reader as rabbit_reader
    participant Channel as rabbit_channel
    participant Queue as rabbit_amqqueue
    participant DB as rabbit_db_queue
    participant QueueProc as Queue process
    Client->>+Reader: channel.open<br/>channel-id=1
    Reader->>Reader: process_frame(..., Channel=1)
    Reader->>Reader: create_channel(1)
    Reader->>+Channel: rabbit_channel_sup_sup:start_channel(...)
    Channel->>Channel: init(...)<br/>state=starting
    Channel-->>-Reader: {ok, ChannelPid, AState}
    Reader->>Reader: put({channel, 1}, {ChannelPid, AState})
    Reader->>Channel: (async) channel.open
    Channel->>Channel: state=running
    Channel->>Client: channel.open-ok
    Reader-->>-Client: (channel 1 ready)
    Note over Client,QueueProc: declaring the queue
    Client->>+Reader: queue.declare<br/>channel-id=1<br/>queue="test_queue"<br/>durable=true
    Reader->>Reader: handle_frame(..., Channel=1)
    Reader->>Reader: assemble the complete command
    Reader->>+Channel: do(ChannelPid, #'queue.declare'{...})
    Channel->>Channel: handle_method(#'queue.declare'{...})
    Channel->>Channel: permission check<br/>configure on queue "test_queue"
    Channel->>+Queue: declare(QueueName, Durable, AutoDelete, Args, ...)
    Queue->>+DB: create_or_get(Q)
    alt queue does not exist
        DB->>DB: write to Mnesia/Khepri<br/>rabbit_durable_queue table
        DB->>+QueueProc: start the queue process<br/>(depends on queue type)
        QueueProc->>QueueProc: initialize the queue state
        QueueProc-->>-DB: {ok, QPid}
        DB-->>Queue: {new, Q}
    else queue already exists
        DB->>DB: check argument equivalence
        DB-->>Queue: {existing, Q}
    end
    Queue-->>-Channel: {new, Q} | {existing, Q}
    Channel->>Channel: build queue.declare-ok<br/>message_count=0<br/>consumer_count=0
    Channel->>Client: queue.declare-ok
    Channel-->>-Reader: noreply
    Reader-->>-Client: (done)
Scenario 3: Publishing a message to a queue
sequenceDiagram
autonumber
participant Client
participant Reader
participant Channel
participant Exchange
participant Queue
participant MsgStore as rabbit_msg_store
Note over Client,Channel: Send basic.publish (3 frames)
Client->>+Reader: Frame 1: basic.publish<br/>exchange=""<br/>routing-key="test_queue"
Reader->>Reader: Assembler waits for content-header
Client->>Reader: Frame 2: content-header<br/>properties, body-size=12
Reader->>Reader: Assembler waits for content-body
Client->>Reader: Frame 3: content-body<br/>"Hello World!"
Reader->>Reader: Assembler has a complete command
Reader->>+Channel: do_flow(ChannelPid, #'basic.publish'{}, Content)
Note over Reader,Channel: credit_flow:send(ChannelPid)<br/>flow-control check
Channel->>Channel: handle_method(#'basic.publish'{}, Content)
Channel->>Channel: Permission check<br/>write on exchange ""
Channel->>+Exchange: route(ExchangeName="", Message, #{})
Exchange->>Exchange: Default exchange<br/>routing_key maps directly to the queue name
Exchange-->>-Channel: [<<"test_queue">>]
loop For each target queue
Channel->>+Queue: deliver([Message], <<"test_queue">>)
Queue->>Queue: Check memory/disk limits
alt Message smaller than 4 KB (queue_index_embed_msgs_below)
Queue->>Queue: Embed in the queue index<br/>rabbit_variable_queue
else Message larger than 4 KB
Queue->>+MsgStore: write(MsgId, Content)
MsgStore->>MsgStore: Batched write to disk<br/>(asynchronous)
MsgStore-->>-Queue: ok
end
Queue->>Queue: Check for waiting consumers
Queue-->>-Channel: {ok, QPid}
end
alt confirm_enabled
Channel->>Channel: unconfirmed[SeqNo] = {QPid, ...}
Note over Queue,Channel: Async, once the queue has persisted the message<br/>it sends {confirm, [SeqNo], QPid}
end
Channel-->>-Reader: noreply
Reader-->>-Client: (no response is sent unless a mandatory publish cannot be routed)
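The three-frame assembly in the diagram (method, then content-header, then one or more content-body frames) is a small state machine. The sketch below is an illustration under simplifying assumptions: frames are already decoded into `{method, M}`, `{header, Props, BodySize}` and `{body, Chunk}` tuples, only content-bearing methods are handled, and the module name `assembler_sketch` is hypothetical (it is not the rabbit_command_assembler API).

```erlang
-module(assembler_sketch).
-export([new/0, feed/2]).

%% States:
%%   idle                                    waiting for a method frame
%%   {want_header, Method}                   method seen, header pending
%%   {want_body, Method, Props, Left, Acc}   collecting Left more body bytes
new() -> idle.

%% feed(Frame, State) -> {more, NewState} | {command, Method, Props, Body}
feed({method, Method}, idle) ->
    {more, {want_header, Method}};
feed({header, Props, 0}, {want_header, Method}) ->
    %% body-size 0: the command is complete without a body frame
    {command, Method, Props, <<>>};
feed({header, Props, Size}, {want_header, Method})
  when is_integer(Size), Size > 0 ->
    {more, {want_body, Method, Props, Size, []}};
feed({body, Chunk}, {want_body, Method, Props, Left, Acc})
  when byte_size(Chunk) =< Left ->
    case Left - byte_size(Chunk) of
        0 ->
            Body = iolist_to_binary(lists:reverse([Chunk | Acc])),
            {command, Method, Props, Body};
        Remaining ->
            {more, {want_body, Method, Props, Remaining, [Chunk | Acc]}}
    end;
feed(Frame, State) ->
    %% out-of-order frame or body larger than announced
    erlang:error({unexpected_frame, Frame, State}).
```

Feeding `{method, 'basic.publish'}`, `{header, Props, 12}` and `{body, <<"Hello World!">>}` in that order yields a single `{command, ...}` result; a body split over several frames is concatenated in arrival order.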
Scenario 4: Consuming messages
sequenceDiagram
autonumber
participant Client
participant Reader
participant Channel
participant Queue
participant Limiter as rabbit_limiter
Note over Client,Queue: Register a consumer
Client->>+Reader: basic.consume<br/>queue="test_queue"<br/>consumer-tag=""<br/>no-ack=false
Reader->>+Channel: do(ChannelPid, #'basic.consume'{})
Channel->>Channel: Generate consumer_tag<br/>"amq.ctag-xxx"
Channel->>Channel: Permission check<br/>read on queue "test_queue"
Channel->>+Queue: consume(Q, #{consumer_tag, channel_pid, ...})
Queue->>Queue: Register consumer<br/>consumer_tag -> ChannelPid
Queue-->>-Channel: {ok, QPid, QState}
Channel->>Channel: consumer_mapping[consumer_tag] = {QueueName, QPid, NoAck}
Channel->>Client: basic.consume-ok<br/>consumer-tag="amq.ctag-xxx"
Channel-->>-Reader: noreply
Reader-->>-Client: (consumer registered)
Note over Queue,Client: The queue pushes a message
Queue->>Queue: Check the consumer's prefetch
Queue->>+Limiter: can_send(ChannelPid, ..., QPid)
Limiter->>Limiter: Unacknowledged message count<br/>< prefetch_count?
Limiter-->>-Queue: true
Queue->>+Channel: Async message<br/>{deliver, consumer_tag, ack_required, Message}
Channel->>Channel: handle_info({deliver, ...})
Channel->>Channel: Assign delivery_tag<br/>(incrementing)
alt ack_required
Channel->>Channel: unacked_message_q += {delivery_tag, QueueName, MsgId}
end
Channel->>+Reader: Send basic.deliver frames
Reader->>Client: basic.deliver<br/>consumer-tag<br/>delivery-tag=1
Reader->>Client: content-header
Reader->>Client: content-body<br/>"Hello World!"
Reader-->>-Channel: ok
Channel-->>-Queue: ok
Note over Client,Queue: The client acknowledges the message
Client->>+Reader: basic.ack<br/>delivery-tag=1<br/>multiple=false
Reader->>+Channel: do(ChannelPid, #'basic.ack'{})
Channel->>Channel: find_unacked_messages(delivery_tag=1)
Channel->>Channel: unacked_message_q -= {1, ...}
Channel->>+Queue: settle(Q, complete, [MsgId])
Queue->>Queue: Delete the message
Queue->>+Limiter: credit_flow:ack(ChannelPid, QPid)
Limiter-->>-Queue: ok
Queue-->>-Channel: ok
Channel-->>-Reader: noreply
Reader-->>-Client: (acknowledgement complete)
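The delivery-tag bookkeeping in this scenario (assign an incrementing tag on delivery, remove entries on basic.ack, honour multiple=true) can be sketched as a pure data structure. The module `unacked_sketch` and its function names are hypothetical; the real channel keeps richer entries in unacked_message_q.

```erlang
-module(unacked_sketch).
-export([new/0, deliver/2, ack/3]).

%% State: {NextDeliveryTag, [{Tag, MsgId}]} with the newest entry first.
new() -> {1, []}.

%% Record a delivery that requires an ack; returns the tag assigned to it.
deliver(MsgId, {Next, Unacked}) ->
    {Next, {Next + 1, [{Next, MsgId} | Unacked]}}.

%% ack(Tag, Multiple, State) -> {SettledMsgIds, NewState}
%% Multiple = false settles exactly Tag; true settles every tag =< Tag.
ack(Tag, false, {Next, Unacked}) ->
    {Hit, Rest} = lists:partition(fun({T, _}) -> T =:= Tag end, Unacked),
    {[Id || {_, Id} <- Hit], {Next, Rest}};
ack(Tag, true, {Next, Unacked}) ->
    {Hit, Rest} = lists:partition(fun({T, _}) -> T =< Tag end, Unacked),
    %% reverse so that message ids come back oldest first
    {lists:reverse([Id || {_, Id} <- Hit]), {Next, Rest}}.
```

The list of settled message ids is what the channel would hand to the queue in the settle step; an unknown tag simply settles nothing in this sketch, whereas a real broker treats it as a channel error.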
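Performance Optimization and Best Practices
Connection management
Connection pooling
Problem: opening and closing connections frequently is expensive (TCP handshake, authentication, resource allocation).
Best practices:
- Keep a pool of long-lived connections on the client (1-5 connections per application is recommended)
- Reuse connections: multiple channels share one connection
Example configuration:
%% Erlang client; requires -include_lib("amqp_client/include/amqp_client.hrl").
{ok, Connection} = amqp_connection:start(#amqp_params_network{
    host = "localhost",
    port = 5672,
    virtual_host = <<"/">>,
    username = <<"guest">>,
    password = <<"guest">>,
    heartbeat = 60,              %% heartbeat timeout (seconds)
    connection_timeout = 60000   %% connection timeout (milliseconds)
}).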
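Heartbeat configuration
Purpose: detect dead ("zombie") connections and release their resources promptly.
Recommended values:
- Default: 60 seconds
- High-latency networks: 120-300 seconds
- Low-latency networks: 10-30 seconds
How the value is used:
heartbeat frame interval = heartbeat / 2 (a peer is considered unreachable after two missed heartbeats, that is, after roughly heartbeat seconds)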
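Channel management
Number of channels
Limits:
- By default a connection may have at most 2047 channels (channel_max)
- Practical recommendation: 10-50 channels per connection
Reasons:
- Each channel is a separate Erlang process on the broker (roughly 100KB-1MB of memory)
- Too many channels add scheduling (context-switch) overhead
Best practices:
- Publishers: one channel per thread
- Consumers: one channel per queue
- Do not share a channel between threads (channels are not thread-safe)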
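Channel error handling
Problem: a channel-level error does not affect the connection, but it does close the channel.
Handling strategy:
%% Monitor the channel; a monitor works whether or not the caller is linked to it
{ok, Channel} = amqp_connection:open_channel(Connection),
MRef = erlang:monitor(process, Channel),
receive
    {'DOWN', MRef, process, Channel, Reason} ->
        %% The channel has closed: log it and open a replacement
        logger:error("Channel closed: ~p", [Reason]),
        {ok, NewChannel} = amqp_connection:open_channel(Connection)
end.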
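Publishing optimization
Batch publishing
Problem: publishing one message at a time, and especially waiting for a response after each one, costs a network round trip per message.
Optimization:
- The client sends several basic.publish commands back to back without waiting in between
- RabbitMQ processes the frames as they arrive on the socket (RabbitMQ enables TCP_NODELAY by default, so this does not rely on Nagle's algorithm)
Performance gain (indicative figures; actual throughput depends on hardware, message size, and durability settings):
- Single-message publishing: ~5000 msg/s
- Batch publishing (100 messages per batch): ~50000 msg/s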
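Publisher Confirms
Use case: you need reliable publishing but not the atomicity of transactions.
Best practices:
- Confirm asynchronously: do not block after each publish, wait for confirms in batches
- Window size: 100-1000 (upper bound on unconfirmed messages)
Example code:
%% Enable confirms and have acks/nacks delivered to this process
#'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Channel, self()),
%% Publish messages
[amqp_channel:cast(Channel,
                   #'basic.publish'{exchange = <<>>, routing_key = <<"queue">>},
                   #amqp_msg{payload = <<"data">>})
 || _ <- lists:seq(1, 1000)],
%% Wait for one confirmation (a real publisher loops until every tag is confirmed)
receive
    #'basic.ack'{delivery_tag = Tag, multiple = Multiple} ->
        io:format("Confirmed ~p (multiple=~p)~n", [Tag, Multiple]);
    #'basic.nack'{delivery_tag = Tag} ->
        io:format("Rejected by broker: ~p~n", [Tag])
end.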
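The "window" of unconfirmed messages mentioned above needs client-side bookkeeping: track every published sequence number, and drop one or many of them when a basic.ack arrives. A minimal sketch, assuming sequence numbers start at 1 on a channel in confirm mode; the module `confirm_window` and its functions are hypothetical helpers, not part of amqp_client.

```erlang
-module(confirm_window).
-export([new/1, can_publish/1, published/1, confirmed/3, outstanding/1]).

%% Max is the largest number of unconfirmed messages allowed in flight.
new(Max) -> #{max => Max, next => 1, pending => gb_sets:new()}.

%% True while there is room in the window for another publish.
can_publish(#{max := Max, pending := P}) ->
    gb_sets:size(P) < Max.

%% Record a publish; returns the sequence number it was given.
published(#{next := N, pending := P} = W) ->
    {N, W#{next := N + 1, pending := gb_sets:add(N, P)}}.

%% Apply a basic.ack: Multiple = true confirms every number =< Tag.
confirmed(Tag, false, #{pending := P} = W) ->
    W#{pending := gb_sets:del_element(Tag, P)};
confirmed(Tag, true, #{pending := P} = W) ->
    W#{pending := gb_sets:filter(fun(N) -> N > Tag end, P)}.

%% Sequence numbers still awaiting confirmation, in ascending order.
outstanding(#{pending := P}) ->
    gb_sets:to_list(P).
```

A publisher would call `can_publish/1` before each `amqp_channel:cast/3` and block in `receive` for an ack only when the window is full, which keeps throughput high while bounding how many messages could need republishing.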
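Consumption optimization
Prefetch setting
Definition: the maximum number of unacknowledged messages the broker will have outstanding on a consumer at any time (QoS).
Configuration:
#'basic.qos_ok'{} = amqp_channel:call(Channel,
                                      #'basic.qos'{prefetch_count = 100}).
Selection strategy:
- Low-latency networks: prefetch=1-10 (avoids messages piling up on the client)
- High-latency networks: prefetch=100-1000 (fewer round trips)
- CPU-bound processing: prefetch=number of CPU cores
- I/O-bound processing: prefetch=10-100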
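The effect of prefetch_count on delivery can be shown with a small pure function: the queue keeps handing out ready messages until the consumer's unacknowledged count reaches the limit. This is a sketch under the assumption that 0 means "no limit"; `prefetch_sketch` and its functions are illustrative names, not the rabbit_limiter API.

```erlang
-module(prefetch_sketch).
-export([can_send/2, dispatch/3]).

%% A prefetch of 0 is treated as unlimited.
can_send(_Unacked, 0) -> true;
can_send(Unacked, Prefetch) -> Unacked < Prefetch.

%% dispatch(Ready, Unacked, Prefetch) -> {Delivered, StillReady}
%% Delivers messages in order until the prefetch limit is reached.
dispatch(Ready, Unacked, Prefetch) ->
    dispatch(Ready, Unacked, Prefetch, []).

dispatch([Msg | Rest], Unacked, Prefetch, Acc) ->
    case can_send(Unacked, Prefetch) of
        true  -> dispatch(Rest, Unacked + 1, Prefetch, [Msg | Acc]);
        false -> {lists:reverse(Acc), [Msg | Rest]}
    end;
dispatch([], _Unacked, _Prefetch, Acc) ->
    {lists:reverse(Acc), []}.
```

With one message already unacknowledged and prefetch=3, only two more messages are delivered; the rest stay in the queue until the consumer acks, which is why a very small prefetch makes throughput depend on the ack round-trip time.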
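Automatic vs. manual acknowledgement
| Mode | Pros | Cons | Use cases |
|---|---|---|---|
| Automatic acknowledgement (no_ack=true) | High throughput; no ack round trip | Messages may be lost if the client crashes | Non-critical data such as logs and monitoring |
| Manual acknowledgement (no_ack=false) | High reliability; unacknowledged messages are not lost | Slightly lower throughput; requires an explicit ack | Critical data such as transactions and orders |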
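Batch acknowledgement
Problem: acknowledging messages one at a time sends one ack frame per message.
Optimization:
%% Batch acknowledgement (multiple=true)
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = LastTag,
                                        multiple = true}).
Meaning: acknowledges every message with delivery_tag <= LastTag.
Performance gain (indicative figures; actual throughput depends on the workload):
- Single acknowledgement: ~10000 msg/s
- Batch acknowledgement (every 100 messages): ~80000 msg/s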
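Flow Control and Backpressure
Connection-level flow control
Trigger conditions:
- Node memory use exceeds the high watermark (default 40%)
- Free disk space on the node falls below the limit (default 50MB)
Behavior:
- RabbitMQ sends connection.blocked to the client
- All publishing on the connection is blocked
- Once the alarm clears, RabbitMQ sends connection.unblocked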
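The two trigger conditions reduce to simple comparisons against the configured thresholds. The sketch below makes that arithmetic explicit; the module `alarm_sketch`, the shape of the usage map, and the atoms `memory` and `disk` are assumptions made for illustration, not the rabbit_alarm API.

```erlang
-module(alarm_sketch).
-export([alarms/3, publishers_blocked/3]).

%% Usage is a map with mem_used, mem_total and disk_free, all in bytes.
%% Watermark is a fraction of total memory (e.g. 0.4);
%% DiskFreeLimit is an absolute number of bytes (e.g. 50000000 for 50MB).
alarms(#{mem_used := Used, mem_total := Total, disk_free := Free},
       Watermark, DiskFreeLimit) ->
    [memory || Used > Watermark * Total] ++
    [disk   || Free < DiskFreeLimit].

%% Publishing connections are blocked while any alarm is in effect.
publishers_blocked(Usage, Watermark, DiskFreeLimit) ->
    alarms(Usage, Watermark, DiskFreeLimit) =/= [].
```

For a node with 10000 bytes of memory and a 0.4 watermark, the memory alarm fires once more than 4000 bytes are in use; consumers are unaffected, only publishers are held back.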
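Client handling:
%% Ask for blocked/unblocked notifications to be sent to this process
amqp_connection:register_blocked_handler(Connection, self()),
receive
    #'connection.blocked'{reason = Reason} ->
        io:format("Connection blocked: ~s~n", [Reason]),
        %% Pause publishing until unblocked (wait_for_unblocked/0 is an application-defined placeholder)
        wait_for_unblocked();
    #'connection.unblocked'{} ->
        io:format("Connection unblocked~n"),
        %% Resume publishing (resume_publishing/0 is an application-defined placeholder)
        resume_publishing()
end.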
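Channel-level flow control
Mechanism: credit flow (credit-based flow control)
How it works:
- Sender → receiver (for example Reader → Channel or Channel → Queue): each message sent consumes 1 credit
- Receiver → sender: after processing a batch of messages, the receiver grants more credit
- When its credit runs out, the sender stops sending until more credit arrives
Benefits:
- Prevents a fast sender from overwhelming a slower downstream process
- Balances the sending rate automatically
Configuration:
%% rabbitmq.config
[{rabbit, [
    {credit_flow_default_credit, {400, 200}}  %% {InitialCredit, MoreCreditAfter}
]}].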
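How the two numbers in credit_flow_default_credit interact can be seen in a toy model: the sender starts with the initial credit, spends one per message, and receives a fresh batch each time the receiver has processed "more credit after" messages. This sketch folds both sides into one map for brevity; the module `credit_sketch` and its functions are hypothetical and much simpler than the real credit_flow module, which tracks credit per process pair.

```erlang
-module(credit_sketch).
-export([new/1, send/1, processed/1]).

%% {Initial, MoreAfter} has the same shape as credit_flow_default_credit.
new({Initial, MoreAfter}) ->
    #{credit => Initial, more_after => MoreAfter, handled => 0}.

%% The sender tries to pass one message downstream.
send(#{credit := 0} = S) -> {blocked, S};
send(#{credit := C} = S) -> {ok, S#{credit := C - 1}}.

%% The receiver finished one message. After MoreAfter of them it
%% grants MoreAfter credits back to the sender in a single batch.
processed(#{handled := H, more_after := M, credit := C} = S)
  when H + 1 =:= M ->
    S#{handled := 0, credit := C + M};
processed(#{handled := H} = S) ->
    S#{handled := H + 1}.
```

With `{400, 200}` a sender can be at most 400 messages ahead of its receiver, and credit comes back in batches of 200 rather than one message at a time, which keeps the number of credit-granting messages low.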
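Troubleshooting
Connection failures
Symptom 1: connection timeout
Causes:
- The network is unreachable (firewall, security group)
- RabbitMQ is not running
- Wrong port
Investigation:
# Check whether RabbitMQ is running
rabbitmqctl status
# Check the listening port
netstat -tuln | grep 5672
# Test the connection
telnet localhost 5672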
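Symptom 2: authentication failure
Error message:
ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN.
For details see the broker logfile.
Causes:
- Wrong username or password
- The user does not exist
- The guest user was refused a remote connection (by default it can only connect from localhost)
Resolution:
# Create a user
rabbitmqctl add_user myuser mypassword
# Set permissions
rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
# Set the administrator tag
rabbitmqctl set_user_tags myuser administrator
# Allow guest to connect remotely (not recommended, development only)
# Edit rabbitmq.config
[{rabbit, [{loopback_users, []}]}].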
Symptom 3: access to the vhost is refused
Error message:
NOT_ALLOWED - access to vhost '/' refused for user 'myuser'
Resolution:
# Check the user's permissions
rabbitmqctl list_user_permissions myuser
# Grant permissions
rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
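Channel errors
Symptom 1: the channel closes unexpectedly
Log:
Channel error: PRECONDITION_FAILED - ...
Common causes:
- The queue or exchange does not exist (reported as NOT_FOUND)
- Arguments do not match on redeclaration (reported as PRECONDITION_FAILED)
- Insufficient permissions (reported as ACCESS_REFUSED)
Debugging:
%% Catch channel termination; assumes MRef = erlang:monitor(process, ChannelPid) was set up earlier
receive
    {'DOWN', MRef, process, ChannelPid, Reason} ->
        case Reason of
            {shutdown, {server_initiated_close, Code, Text}} ->
                io:format("Channel closed by server: ~p - ~s~n", [Code, Text]);
            _ ->
                io:format("Channel closed: ~p~n", [Reason])
        end
end.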
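Symptom 2: the channel is blocked
Observed behavior: no response after publishing; the channel appears stuck.
Causes:
- The queue has reached its length limit (x-max-length)
- A memory alarm has been triggered
- A disk alarm has been triggered
Investigation:
# Check alarms
rabbitmq-diagnostics alarms
# Check queue state
rabbitmqctl list_queues name messages consumers memory
# Check connection state
rabbitmqctl list_connections name state channels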
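Performance issues
Symptom 1: low publish rate
Investigation:
- Check whether Publisher Confirms are enabled:
  - Disabled: publishes return immediately, throughput is high
  - Enabled: the publisher waits for confirms, throughput drops but reliability improves
- Check message size:
  - Small messages (<4KB): best performance
  - Large messages (>1MB): split across multiple frames, performance drops
- Check persistence:
  - Non-persistent: memory only, highest performance
  - Persistent: requires disk writes, performance is bounded by disk I/O
Optimizations:
- Publish in batches
- Use SSD storage
- Tune queue_index_embed_msgs_below (default 4096)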
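Symptom 2: low consume rate
Investigation:
- Check the prefetch setting:
  - Too small: many network round trips
  - Too large: messages pile up on the client
- Check consumer processing time:
  - Use the Management UI to look at the deliver_get rate
- Check network latency:
  - Measure latency with ping
  - Consider increasing prefetch
Optimizations:
- Add more consumers
- Process messages in parallel
- Acknowledge in batches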
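Configuration Reference
Connection-related settings
| Setting | Default | Description |
|---|---|---|
| tcp_listeners | [5672] | TCP listener ports |
| ssl_listeners | [] | TLS listener ports |
| num_acceptors.tcp | 10 | Number of TCP acceptor processes |
| handshake_timeout | 10000 | Handshake timeout (milliseconds) |
| heartbeat | 60 | Heartbeat timeout (seconds), 0 = disabled |
| frame_max | 131072 | Maximum frame size (bytes) |
| channel_max | 2047 | Maximum number of channels per connection |
| connection_max | infinity | Maximum number of connections per node |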
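Channel-related settings
| Setting | Default | Description |
|---|---|---|
| channel_operation_timeout | 15000 | Channel operation timeout (milliseconds) |
| consumer_timeout | 1800000 | Consumer delivery acknowledgement timeout (milliseconds; 30 minutes) |
| default_consumer_prefetch | {false, 0} | Default prefetch, as {global, count} |
| consumer_max_per_channel | infinity | Maximum number of consumers per channel |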
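Flow-control settings
| Setting | Default | Description |
|---|---|---|
| vm_memory_high_watermark | 0.4 | Memory high watermark (triggers flow control) |
| disk_free_limit | 50MB | Lower bound on free disk space (triggers an alarm) |
| credit_flow_default_credit | {400, 200} | Credit flow parameters |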
Appendix
Querying Reader process state
%% Get connection information
rabbit_reader:info(ReaderPid, [name, peer_host, peer_port, channels, state]).
%% Example output
[{name, <<"127.0.0.1:51234 -> 127.0.0.1:5672">>},
 {peer_host, <<"127.0.0.1">>},
 {peer_port, 51234},
 {channels, 5},
 {state, running}]
Querying Channel process state
%% Get channel information
rabbit_channel:info(ChannelPid, [number, consumer_count, messages_unacknowledged, prefetch_count]).
%% Example output
[{number, 1},
 {consumer_count, 2},
 {messages_unacknowledged, 10},
 {prefetch_count, 100}]
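Related modules
- rabbit_networking: network listeners and connection management
- rabbit_command_assembler: AMQP command assembly
- rabbit_channel_sup_sup: supervisor of the channel supervisors
- rabbit_writer: frame writer
- rabbit_limiter: flow-control limiter
- rabbit_heartbeat: heartbeat management
- rabbit_queue_collector: exclusive queue collector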