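RabbitMQ-02-Connection and Channel Modules-rabbit_reader_channel

Module Overview

This document covers two closely cooperating core modules:

  • rabbit_reader: manages AMQP 0-9-1 client connections
  • rabbit_channel: implements AMQP 0-9-1 channel logic

Division of Responsibilities

rabbit_reader responsibilities

  1. Protocol handshake and negotiation

    • Handles AMQP protocol version negotiation
    • Detects and rejects misdirected HTTP/TLS connections
  2. Connection lifecycle management

    • Accepts TCP connections
    • Authenticates the client
    • Maintains the connection state machine
  3. Frame parsing and dispatch

    • Reads data from the socket
    • Parses AMQP frames (frame header + payload)
    • Dispatches frames to the corresponding channel
  4. Channel management

    • Creates and tracks channel processes
    • Monitors channel lifecycles
    • Enforces the channel_max limit
  5. Flow control and resource management

    • TCP backpressure
    • Reacts to memory/disk alarms
    • Connection-level flow control (connection.blocked)
  6. Heartbeats

    • Sends heartbeat frames
    • Monitors client heartbeats
    • Detects timeouts
  7. Statistics and events

    • Emits connection created/closed events
    • Reports statistics periodically

rabbit_channel responsibilities

  1. AMQP method handling

    • queue.*: declare, bind and delete queues
    • exchange.*: declare and delete exchanges
    • basic.publish: publish messages
    • basic.consume: subscribe consumers
    • basic.get: pull a single message
    • basic.ack/nack/reject: acknowledge messages
    • tx.*: transaction management
  2. Message routing

    • Calls rabbit_exchange:route/3 to compute the target queues
    • Delivers the message to those queues
  3. Consumer management

    • Maintains the consumer registry
    • Pushes messages to consumers
    • QoS (prefetch) control
  4. Acknowledgements

    • Publisher Confirms: tracks unconfirmed publishes
    • Consumer Acks: tracks unacknowledged deliveries
  5. Access control

    • Checks the user's permissions on vhosts, queues and exchanges
    • Uses a permission cache to speed this up
  6. Transaction support

    • tx.select/tx.commit/tx.rollback
    • Buffers the operations performed inside a transaction
  7. Flow control and backpressure

    • The limiter (rabbit_limiter) enforces prefetch
    • Credit flow keeps memory use bounded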

Architecture Diagram

flowchart TB
    subgraph Clients
        Client[AMQP client]
    end
    
    subgraph Reader process
        Reader[rabbit_reader<br/>connection process]
        FrameParser[Frame Parser<br/>frame parsing]
        Assembler[Command Assembler<br/>command assembly]
    end
    
    subgraph Channel processes 1..N
        Channel1[rabbit_channel<br/>Channel 1]
        Channel2[rabbit_channel<br/>Channel 2]
        ChannelN[rabbit_channel<br/>Channel N]
    end
    
    subgraph Support processes
        Writer[rabbit_writer<br/>writer]
        Limiter[rabbit_limiter<br/>limiter]
        Heartbeat[rabbit_heartbeat<br/>heartbeater]
        QCollector[rabbit_queue_collector<br/>queue collector]
    end
    
    subgraph Business logic
        Exchange[rabbit_exchange<br/>exchange]
        Queue[rabbit_amqqueue<br/>queue]
        Router[rabbit_router<br/>router]
    end
    
    Client -->|TCP connection| Reader
    Reader -->|parse| FrameParser
    FrameParser -->|assemble| Assembler
    Assembler -->|Channel 0| Reader
    Assembler -->|Channel 1..N| Channel1
    Assembler -->|Channel 1..N| Channel2
    Assembler -->|Channel 1..N| ChannelN
    
    Reader -->|creates| Channel1
    Reader -->|creates| Channel2
    Reader -->|creates| ChannelN
    
    Reader <-->|uses| Writer
    Reader <-->|manages| Heartbeat
    Reader <-->|uses| QCollector
    
    Channel1 <-->|QoS| Limiter
    Channel2 <-->|QoS| Limiter
    ChannelN <-->|QoS| Limiter
    
    Channel1 -->|declare/bind| Exchange
    Channel1 -->|publish| Router
    Channel1 -->|consume| Queue
    
    Router -->|route| Exchange
    Exchange -->|bindings| Queue
    
    Writer -->|TCP write| Client

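Module Interactions

Connection Setup

  1. TCP connection: Client → Reader (the Ranch listener starts the Reader process)
  2. Protocol header: Reader receives "AMQP\x00\x00\x09\x01"
  3. Negotiation: Reader sends connection.start (auth mechanisms) and receives connection.start-ok
  4. Authentication: Reader calls the authentication backend to verify the credentials
  5. Tuning: Reader sends connection.tune (channel_max, frame_max, heartbeat); the client answers with connection.tune-ok
  6. VHost selection: Client sends connection.open; Reader checks access to the vhost
  7. Connection ready: Reader enters the running state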
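The header check in step 2 can be sketched as a pure function over the first 8 bytes. This is a minimal sketch, not rabbit_reader's code: the module `sketch_amqp_header` and its error atoms are hypothetical, and only the `"AMQP" 0 0 9 1` header comes from the text. It also illustrates the HTTP/TLS detection listed under responsibilities. An HTTP request begins with an ASCII method name, and a TLS ClientHello begins with record type 0x16 followed by major version 3. Other AMQP versions are simply reported as unsupported here.

```erlang
-module(sketch_amqp_header).
-export([expected/0, check/1]).

%% The 8-byte AMQP 0-9-1 protocol header: "AMQP" followed by 0, 0, 9, 1.
expected() -> <<"AMQP", 0, 0, 9, 1>>.

%% Classify the first 8 bytes a client sends (error atoms are hypothetical).
check(<<"AMQP", 0, 0, 9, 1>>) ->
    ok;
check(<<"AMQP", _/binary>>) ->
    %% Another AMQP version. Per the spec, a server that does not support it
    %% replies with its own protocol header and closes the socket.
    {error, unsupported_version};
check(<<Method:4/binary, _/binary>>)
  when Method =:= <<"GET ">>; Method =:= <<"POST">>;
       Method =:= <<"PUT ">>; Method =:= <<"HEAD">> ->
    %% Looks like an HTTP request sent to the AMQP port.
    {error, http};
check(<<16#16, 16#03, _/binary>>) ->
    %% TLS handshake record (type 0x16, version 3.x) sent to a plain TCP port.
    {error, tls};
check(_) ->
    {error, bad_header}.
```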

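Frame Processing

Socket data → Reader:mainloop → Reader:handle_input
  ↓
Parse the frame header (Type, Channel, PayloadSize)
  ↓
Read the complete frame (Payload + ?FRAME_END)
  ↓
Reader:handle_frame → dispatch to the channel
  ↓
rabbit_command_assembler:process → assemble the complete command
  ↓
Channel 0 → Reader:handle_method0 (connection-level methods)
Channel N → rabbit_channel:do/2 or do_flow/3 (channel-level methods)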
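The frame layout behind the header and payload steps can be sketched as follows, assuming the input is already in one binary. A frame is a 7-byte header (`Type:8, Channel:16, PayloadSize:32`), then the payload, then the frame-end byte 0xCE. The module name `sketch_frame` and its return values are illustrative. The real reader does this incrementally through its `frame_header` and `{frame_payload, ...}` callbacks.

```erlang
-module(sketch_frame).
-export([parse/1]).

-define(FRAME_END, 16#CE).

%% Try to cut one complete AMQP 0-9-1 frame off the front of a binary.
%% Layout: Type:8, Channel:16, PayloadSize:32, Payload, 0xCE.
parse(<<Type:8, Channel:16, Size:32, Rest/binary>>) when byte_size(Rest) >= Size + 1 ->
    case Rest of
        <<Payload:Size/binary, ?FRAME_END, Tail/binary>> ->
            {frame, {Type, Channel, Payload}, Tail};
        _ ->
            %% The byte after the payload must be 0xCE; anything else is a framing error.
            {error, bad_frame_end}
    end;
parse(<<_:7/binary, _/binary>> = Bin) ->
    %% Header complete, but payload and/or frame end still missing:
    %% report how many more bytes are needed.
    <<_:3/binary, Size:32, _/binary>> = Bin,
    {more, 7 + Size + 1 - byte_size(Bin)};
parse(Bin) ->
    %% Not even a full 7-byte header yet.
    {more, 7 - byte_size(Bin)}.
```

A heartbeat is the degenerate case: type 8, channel 0, an empty payload, then 0xCE.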

Channel Method Handling

Reader → rabbit_channel:do(Pid, Method)
  → Channel:handle_cast({method, Method, Content, Flow})
  → handle_method(Method, Content, State)
Example: basic.publish
  ├─> Permission check: rabbit_access_control:check_resource_access
  ├─> Routing: rabbit_exchange:route
  ├─> Delivery: rabbit_amqqueue:deliver
  └─> Confirm tracking: if confirms are enabled, record the seqno

rabbit_reader in Detail

State Machine and Connection State

Connection State Definitions

-record(v1, {
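    parent,                     %% parent process (usually a supervisor)
    ranch_ref,                  %% Ranch listener reference
    sock,                       %% TCP socket
    connection,                 %% #connection{} record
    callback,                   %% current parse callback (handshake | frame_header | {frame_payload, ...})
    recv_len,                   %% number of bytes the current callback needs
    pending_recv,               %% whether a socket receive is outstanding
    connection_state,           %% pre_init | securing | tuning | opening | running | blocking | blocked | closing | closed
    helper_sup,                 %% helper supervisor(s)
    queue_collector,            %% exclusive-queue collector
    heartbeater,                %% heartbeat process
    stats_timer,                %% statistics timer
    channel_sup_sup_pid,        %% supervisor of the channel supervisors
    channel_count,              %% number of open channels
    throttle,                   %% flow-control state (#throttle{})
    proxy_socket,               %% proxy socket (e.g. PROXY protocol)
    dynamic_buffer_size,        %% dynamic receive buffer size
    dynamic_buffer_moving_average %% moving average used to size the buffer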
}).

-record(connection, {
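    name,              %% connection name
    log_name,          %% name used in log messages
    host,              %% server address
    peer_host,         %% client address
    port,              %% server port
    peer_port,         %% client port
    protocol,          %% AMQP protocol version
    user,              %% #user{} authenticated user
    vhost,             %% virtual host
    timeout,           %% heartbeat timeout (seconds)
    frame_max,         %% maximum frame size
    channel_max,       %% maximum number of channels
    capabilities,      %% client capabilities
    auth_mechanism,    %% authentication mechanism
    auth_state,        %% authentication state
    connected_at       %% connection timestamp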
}).

-record(throttle, {
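    last_blocked_at,                   %% timestamp of the last block
    blocked_by,                        %% set of block reasons, e.g. {resource, memory} or {resource, disk}
    should_block,                      %% whether the connection should be blocked
    connection_blocked_message_sent    %% whether connection.blocked has been sent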
}).

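State Transition Diagram

stateDiagram-v2
    [*] --> pre_init: start_link
    pre_init --> securing: AMQP header received<br/>authentication starts
    securing --> tuning: authentication succeeded<br/>connection.tune sent
    tuning --> opening: connection.tune-ok
    opening --> running: connection.open<br/>connection.open-ok sent
    
    running --> blocking: resource alarm raised<br/>(memory/disk)
    blocking --> blocked: connection.blocked sent
    blocked --> running: resources recovered<br/>connection.unblocked sent
    
    running --> closing: client/server<br/>sends connection.close
    blocking --> closing: forced close
    blocked --> closing: forced close
    
    closing --> closed: connection.close-ok<br/>handshake completed by both sides
    closed --> [*]
    
    pre_init --> closed: handshake failed
    securing --> closed: authentication failed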
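The diagram can be read as a transition function. This is a hypothetical sketch: the module `sketch_conn_fsm` and its event atoms are invented for illustration. It includes the `tuning` and `opening` states because `handle_method0` below matches on `connection_state = opening`. SASL challenge rounds (connection.secure / secure-ok) are left out.

```erlang
-module(sketch_conn_fsm).
-export([next/2]).

%% Hypothetical transition table for #v1.connection_state.
%% Events: the protocol header, incoming connection.* methods,
%% and resource-alarm notifications.
next(pre_init, protocol_header)       -> {ok, securing};
next(pre_init, bad_header)            -> {ok, closed};
next(securing, 'connection.start_ok') -> {ok, tuning};   %% auth passed, connection.tune sent
next(securing, auth_failed)           -> {ok, closed};
next(tuning,   'connection.tune_ok')  -> {ok, opening};
next(opening,  'connection.open')     -> {ok, running};  %% connection.open_ok sent
next(running,  alarm_raised)          -> {ok, blocking};
next(blocking, blocked_sent)          -> {ok, blocked};
next(blocked,  alarm_cleared)         -> {ok, running};  %% connection.unblocked sent
next(State, 'connection.close')
  when State =:= running; State =:= blocking; State =:= blocked ->
    {ok, closing};
next(closing,  'connection.close_ok') -> {ok, closed};
next(State, Event) ->
    %% Anything else (e.g. connection.open before authentication) is a protocol error.
    {error, {unexpected, State, Event}}.
```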

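Core API

API 1: start_link/2 - start the connection process

Basics
  • Name: start_link/2
  • Caller: the Ranch listener (when a TCP connection is accepted)
  • Process type: special process started with proc_lib (not a gen_server)
Function signature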
-spec start_link({pid(), pid()}, ranch:ref()) ->
    rabbit_types:ok(pid()).
Parameters
  • HelperSups ({pid(), pid()}): helper supervisors for AMQP 0-9-1 and AMQP 1.0
  • Ref (ranch:ref()): Ranch listener reference
Core code
start_link(HelperSups, Ref) ->
    Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSups, Ref]),
    {ok, Pid}.

init(Parent, HelperSups, Ref) ->
    logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN}),
    ?LG_PROCESS_TYPE(reader),
    %% 1. Complete the Ranch handshake and take over the socket
    {ok, Sock} = rabbit_networking:handshake(Ref,
        application:get_env(rabbit, proxy_protocol, false),
        dynamic_buffer),
    Deb = sys:debug_options([]),
    %% 2. Start the connection setup
    start_connection(Parent, HelperSups, Ref, Deb, Sock).

start_connection(Parent, HelperSups, Ref, Deb, Sock) ->
    %% 3. Start the queue collector (tracks exclusive queues)
    {ok, Collector} = rabbit_queue_collector:start_link(group_leader()),
    
    %% 4. Build the initial state
    State = #v1{
        parent = Parent,
        ranch_ref = Ref,
        sock = Sock,
        connection = #connection{},
        connection_state = pre_init,
        helper_sup = HelperSups,
        queue_collector = Collector,
        throttle = #throttle{
            last_blocked_at = never,
            blocked_by = sets:new(),
            should_block = false,
            connection_blocked_message_sent = false
        }
    },
    
    %% 5. Enter the main loop, expecting the 8-byte protocol header
    mainloop(Deb, [], 0, switch_callback(State, handshake, 8)).
Sequence diagram
sequenceDiagram
    autonumber
    participant Ranch as Ranch Listener
    participant Reader as rabbit_reader
    participant QCollector as queue_collector
    participant Sock as TCP Socket
    participant Client as AMQP Client
    
    Ranch->>+Reader: start_link(HelperSups, Ref)
    Reader->>Reader: proc_lib:spawn_link<br/>init/3
    Reader-->>-Ranch: {ok, ReaderPid}<br/>(returned right after the spawn)
    
    Note over Reader: the remaining steps run in init/3<br/>inside the new process
    Reader->>+Sock: rabbit_networking:handshake(Ref)
    Sock->>Sock: Ranch handshake<br/>take over the socket
    Sock-->>-Reader: {ok, Sock}
    
    Reader->>+QCollector: start_link(group_leader())
    QCollector->>QCollector: initialize the exclusive-queue collector
    QCollector-->>-Reader: {ok, CollectorPid}
    
    Reader->>Reader: initialize state<br/>connection_state = pre_init
    Reader->>Reader: mainloop(..., handshake, 8)
    
    Note over Reader,Client: waiting for the client's protocol header
    Client->>+Reader: "AMQP\x00\x00\x09\x01"
    Reader->>Reader: handle_input(handshake, ...)
    Reader->>Reader: version_negotiation(...)
    Reader-->>-Client: connection.start<br/>(list of auth mechanisms)

API 2: mainloop/4 - main loop (frame reception and handling)

Basics
  • Name: mainloop/4
  • Looping: tail recursion
  • Blocking point: the socket read in rabbit_net:recv/2
Function signature
-spec mainloop([sys:debug_option()], [binary()], non_neg_integer(), #v1{}) ->
    any().
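Parameters
  • Deb ([sys:debug_option()]): debug options
  • Buf ([binary()]): data received but not yet processed, newest chunk first
  • BufLen (non_neg_integer()): total buffer length in bytes
  • State (#v1{}): connection state
Core code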
mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
                                       recv_len = RecvLen,
                                       callback = Callback}) ->
    case BufLen >= RecvLen of
        true  ->
            %% Enough data buffered: cut exactly RecvLen bytes off the front.
            %% Buf is newest-first, so reverse it before concatenating.
            <<Data:RecvLen/binary, Rest/binary>> =
                list_to_binary(lists:reverse(Buf)),
            {Unused, NewState} = handle_input(Callback, Data, State),
            %% Bytes handle_input did not consume come before Rest in the
            %% stream; keep the list newest-first.
            mainloop(Deb, [Rest, Unused],
                     byte_size(Unused) + byte_size(Rest), NewState);
        false ->
            %% Need more data from the socket
            case rabbit_net:recv(Sock, 0) of
                {ok, Recv} ->
                    NewBuf = [Recv | Buf],
                    mainloop(Deb, NewBuf, BufLen + byte_size(Recv), State);
                {error, timeout} ->
                    mainloop(Deb, Buf, BufLen, State);
                {error, closed} ->
                    %% The client closed the connection
                    maybe_emit_stats(State),
                    terminate(State, normal);
                {error, Reason} ->
                    terminate(State, {socket_error, Reason})
            end
    end.
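Processing steps
  1. Buffer check

    • If BufLen >= RecvLen, enough data has arrived and handle_input/3 is called
    • Otherwise more data is read from the socket (rabbit_net:recv/2)
  2. Data handling

    • handle_input/3 picks the parser based on Callback:
      • handshake: parse the protocol header (8 bytes)
      • frame_header: parse the frame header (7 bytes)
      • {frame_payload, Type, Channel, PayloadSize}: parse the frame payload (PayloadSize + 1 bytes, including the frame-end byte)
  3. Tail recursion

    • handle_input/3 returns {RestData, NewState}
    • mainloop/4 calls itself to continue processing
  4. Socket closed

    • Closed by the client: terminate(State, normal)
    • Socket error: terminate(State, {socket_error, Reason})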
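The buffer bookkeeping in steps 1 to 3 can be isolated into two pure functions. In this hypothetical sketch (module `sketch_recv_buf`), `Buf` is kept newest-first, like `[Recv | Buf]` in `mainloop/4`. `take/3` concatenates the whole buffer for clarity. A production reader would avoid re-copying large buffers on every call.

```erlang
-module(sketch_recv_buf).
-export([push/3, take/3]).

%% Buf holds received chunks newest-first and BufLen is their total size,
%% mirroring the Buf / BufLen arguments of mainloop/4.
push(Data, Buf, BufLen) when is_binary(Data) ->
    {[Data | Buf], BufLen + byte_size(Data)}.

%% Remove exactly RecvLen bytes from the front of the stream,
%% or report that more data must be read first.
take(RecvLen, _Buf, BufLen) when BufLen < RecvLen ->
    more;
take(RecvLen, Buf, BufLen) ->
    All = list_to_binary(lists:reverse(Buf)),     %% oldest bytes first
    <<Data:RecvLen/binary, Rest/binary>> = All,
    {Data, [Rest], BufLen - RecvLen}.
```

A protocol header that arrives split across two TCP reads is only handed over once all 8 bytes are present.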
Frame parsing sequence diagram
sequenceDiagram
    autonumber
    participant Client
    participant Sock as TCP Socket
    participant Reader as mainloop
    participant Parser as handle_input
    participant Handler as handle_frame
    
    loop main loop
        Reader->>Reader: check buffer<br/>BufLen >= RecvLen?
        
        alt not enough data
            Reader->>+Sock: rabbit_net:recv(Sock, 0)
            Sock->>Client: (TCP receive)
            Sock-->>-Reader: {ok, Data}
            Reader->>Reader: append to Buf<br/>update BufLen
        else enough data
            Reader->>+Parser: handle_input(Callback, Data, State)
            
            alt Callback = handshake
                Parser->>Parser: parse "AMQP" header<br/>8 bytes
                Parser->>Parser: version_negotiation
                Parser-->>Reader: {Rest, NewState}
            else Callback = frame_header
                Parser->>Parser: parse frame header<br/>Type:8, Channel:16, PayloadSize:32
                
                alt frame complete (payload + 0xCE present)
                    Parser->>+Handler: handle_frame(Type, Channel, Payload)
                    Handler->>Handler: dispatch to channel
                    Handler-->>-Parser: NewState
                    Parser-->>-Reader: {Rest, NewState<br/>callback=frame_header}
                else frame incomplete
                    Parser-->>-Reader: {Rest, NewState<br/>callback={frame_payload, ...}}
                end
            else Callback = {frame_payload, ...}
                Parser->>Parser: read Payload + 0xCE
                Parser->>+Handler: handle_frame(...)
                Handler-->>-Parser: NewState
                Parser-->>Reader: {Rest, NewState<br/>callback=frame_header}
            end
            
            Reader->>Reader: tail call<br/>mainloop(Rest, NewState)
        end
    end

API 3: handle_frame/4 - frame handling and dispatch

Basics
  • Name: handle_frame/4
  • Dispatch rules:
    • Channel 0 → the Reader itself (connection-level methods)
    • Channel N → the corresponding rabbit_channel process
Function signature
-spec handle_frame(non_neg_integer(), non_neg_integer(), binary(), #v1{}) ->
    #v1{}.
Parameters
  • Type (non_neg_integer()): frame type: 1 (method), 2 (header), 3 (body), 8 (heartbeat)
  • Channel (non_neg_integer()): channel number (0 means connection level)
  • Payload (binary()): frame payload
  • State (#v1{}): connection state
Core code
%% Frames on channel 0 (connection-level methods)
handle_frame(Type, 0, Payload,
             State = #v1{connection = #connection{protocol = Protocol}}) ->
    case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
        error     -> frame_error(unknown_frame, Type, 0, Payload, State);
        heartbeat -> State;  %% heartbeat frame: nothing else to do here
        {method, MethodName, FieldsBin} ->
            %% connection-level method (e.g. connection.close)
            handle_method0(MethodName, FieldsBin, State);
        _Other    -> unexpected_frame(Type, 0, Payload, State)
    end;

%% Frames on channel N (channel-level methods)
handle_frame(Type, Channel, Payload,
             State = #v1{connection = #connection{protocol = Protocol}})
  when ?IS_RUNNING(State) ->
    case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
        error     -> frame_error(unknown_frame, Type, Channel, Payload, State);
        heartbeat -> unexpected_frame(Type, Channel, Payload, State);
        Frame     -> process_frame(Frame, Channel, State)
    end.

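%% Handle a frame on channel N
process_frame(Frame, Channel, State) ->
    ChKey = {channel, Channel},
    %% 1. Look up the channel, creating it on first use
    Lookup = case get(ChKey) of
                 undefined -> create_channel(Channel, State);
                 ChInfo    -> {ok, ChInfo, State}
             end,
    case Lookup of
        {error, Error} ->
            %% e.g. channel_max reached: report it as a protocol error
            handle_exception(State, Channel, Error);
        {ok, {ChPid, AState}, State1} ->
            %% 2. Assemble the command (it may span several frames)
            case rabbit_command_assembler:process(Frame, AState) of
                {ok, NewAState} ->
                    %% Frame absorbed, command not complete yet
                    %% (e.g. header received, body still to come)
                    put(ChKey, {ChPid, NewAState}),
                    State1;
                {ok, Method, NewAState} ->
                    %% Complete command without content
                    rabbit_channel:do(ChPid, Method),
                    put(ChKey, {ChPid, NewAState}),
                    State1;
                {ok, Method, Content, NewAState} ->
                    %% Complete command with content: may trigger flow control
                    rabbit_channel:do_flow(ChPid, Method, Content),
                    put(ChKey, {ChPid, NewAState}),
                    control_throttle(State1);
                {error, Reason} ->
                    %% Unexpected frame sequence on this channel
                    handle_exception(State1, Channel, Reason)
            end
    end.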
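Code walkthrough
  1. Frame analysis (analyze_frame/3):

    • Determines the frame type (method/header/body/heartbeat)
    • Extracts the method name and fields (for method frames)
    • Returns {method, MethodName, FieldsBin} or another frame type
  2. Channel 0 handling

    • Connection-level methods (such as connection.close) are handled by the Reader itself
    • Calls handle_method0/3
  3. Channel N handling

    • Looks up the channel state in the process dictionary (get({channel, Channel}))
    • If the channel does not exist yet, creates it with create_channel/2
    • Uses rabbit_command_assembler to assemble the complete command
  4. Command assembly

    • One AMQP command can span several frames:
      • the method frame (e.g. basic.publish)
      • a header frame (message properties)
      • body frames (more than one when the body exceeds frame_max)
    • process/2 is a state machine that accumulates frames until the command is complete
  5. Dispatch to the channel

    • rabbit_channel:do/2: methods without content
    • rabbit_channel:do_flow/3: methods with content (e.g. publish)
    • Both are asynchronous (cast) and do not wait for a reply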
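The assembler's job in step 4 can be modelled as a small state machine that returns the same three result shapes `process_frame/3` matches on. The frame tuples below are simplified stand-ins for the real decoded frames, not `rabbit_command_assembler`'s actual representation: `{method, M, HasContent}`, `{header, BodySize}` and `{body, Bin}`. The module name is hypothetical.

```erlang
-module(sketch_assembler).
-export([init/0, process/2]).

%% Simplified frames (hypothetical shapes):
%%   {method, Method, HasContent} | {header, BodySize} | {body, Bin}
init() -> method.

process({method, Method, false}, method) ->
    {ok, Method, method};                          %% complete command, no content
process({method, Method, true}, method) ->
    {ok, {content_header, Method}};                %% wait for the header frame
process({header, 0}, {content_header, Method}) ->
    {ok, Method, <<>>, method};                    %% empty body: complete at once
process({header, Size}, {content_header, Method}) ->
    {ok, {content_body, Method, Size, []}};
process({body, Bin}, {content_body, Method, Remaining, Acc}) ->
    case Remaining - byte_size(Bin) of
        0 ->
            %% Last body frame: join the chunks in arrival order.
            Body = list_to_binary(lists:reverse([Bin | Acc])),
            {ok, Method, Body, method};
        Left when Left > 0 ->
            {ok, {content_body, Method, Left, [Bin | Acc]}};
        _ ->
            {error, body_too_large}
    end;
process(Frame, State) ->
    {error, {unexpected_frame, Frame, State}}.
```

A publish with a 5-byte body split into two body frames yields `{ok, NewAState}` twice and then `{ok, Method, Content, NewAState}`.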
Frame dispatch flowchart
flowchart TD
    Start[handle_frame<br/>Type, Channel, Payload] --> Analyze[analyze_frame<br/>determine frame type]
    
    Analyze --> IsCh0{Channel == 0?}
    
    IsCh0 -->|yes| IsHeartbeat{heartbeat?}
    IsHeartbeat -->|yes| Ignore[ignore]
    IsHeartbeat -->|no| Method0[handle_method0<br/>connection-level method]
    
    IsCh0 -->|no| GetCh[look up channel<br/>in the process dictionary]
    GetCh --> ChExists{channel exists?}
    ChExists -->|no| CreateCh[create_channel<br/>create a new channel]
    ChExists -->|yes| Assemble[rabbit_command_assembler:process<br/>assemble command]
    CreateCh --> Assemble
    
    Assemble --> Complete{command complete?}
    Complete -->|no| SaveState[save assembler state<br/>wait for next frame]
    Complete -->|yes, no content| DoMethod[rabbit_channel:do<br/>Method]
    Complete -->|yes, with content| DoFlow[rabbit_channel:do_flow<br/>Method, Content]
    
    DoMethod --> UpdateState[update state<br/>apply flow control]
    DoFlow --> UpdateState
    SaveState --> UpdateState
    Method0 --> UpdateState
    Ignore --> UpdateState
    
    UpdateState --> End[return new state]

API 4: create_channel/2 - create a channel

Basics
  • Name: create_channel/2
  • Limit: bounded by channel_max (default 2047)
  • Supervision: channels are supervised under rabbit_channel_sup_sup
Function signature
-spec create_channel(non_neg_integer(), #v1{}) ->
    {ok, {pid(), term()}, #v1{}} | {error, term()}.
Core code
create_channel(Channel,
               #v1{sock                = Sock,
                   queue_collector     = Collector,
                   channel_sup_sup_pid = ChanSupSup,
                   channel_count       = ChannelCount,
                   connection =
                       #connection{name         = Name,
                                   protocol     = Protocol,
                                   frame_max    = FrameMax,
                                   vhost        = VHost,
                                   capabilities = Capabilities,
                                   user         = User}
                   } = State) ->
    %% 1. Enforce the channel count limit
    case ChannelCount < ?CHANNEL_MAX of
        false ->
            {error, rabbit_misc:amqp_error(
                not_allowed, "channel_max (~b) reached", [?CHANNEL_MAX], 'none')};
        true ->
            %% 2. Check the configured channel limits (the error is formatted with node())
            case is_over_limits(User#user.username) of
                false ->
                    %% 3. Start the channel supervisor and the channel process
                    {ok, _ChSupPid, {ChPid, AState}} =
                        rabbit_channel_sup_sup:start_channel(
                            ChanSupSup, 
                            {tcp, Sock, Channel, FrameMax, self(), Name,
                             Protocol, User, VHost, Capabilities,
                             Collector}),
                    
                    %% 4. Monitor the channel process and record it in the process dictionary
                    MRef = erlang:monitor(process, ChPid),
                    put({ch_pid, ChPid}, {Channel, MRef}),
                    put({channel, Channel}, {ChPid, AState}),
                    
                    {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}};
                {true, Limit, Fmt} ->
                    {error, rabbit_misc:amqp_error(
                        not_allowed, Fmt, [node(), Limit], 'none')}
            end
    end.
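The monitor-plus-process-dictionary bookkeeping at the end of `create_channel/2` pairs with cleanup when a channel dies. This sketch (hypothetical module `sketch_ch_registry`) stores only the pid under `{channel, N}`, where the real code stores `{ChPid, AState}`. It shows how a `'DOWN'` message maps back to a channel number so that both entries can be erased.

```erlang
-module(sketch_ch_registry).
-export([add/2, lookup/1, handle_down/1]).

%% Mirrors the two process-dictionary keys used by create_channel/2:
%%   {channel, Number} -> Pid      {ch_pid, Pid} -> {Number, MonitorRef}
add(Number, Pid) ->
    MRef = erlang:monitor(process, Pid),
    put({ch_pid, Pid}, {Number, MRef}),
    put({channel, Number}, Pid),
    MRef.

lookup(Number) -> get({channel, Number}).

%% On a 'DOWN' message, forget both mappings so the number can be reused.
handle_down({'DOWN', _MRef, process, Pid, _Reason}) ->
    case erase({ch_pid, Pid}) of
        {Number, _} -> erase({channel, Number}), {closed, Number};
        undefined   -> ignore
    end.
```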
Sequence diagram
sequenceDiagram
    autonumber
    participant Reader as rabbit_reader
    participant SupSup as channel_sup_sup
    participant Sup as channel_sup
    participant Channel as rabbit_channel
    participant Limiter as rabbit_limiter
    participant Writer as rabbit_writer
    
    Reader->>Reader: create_channel(ChannelNum)
    Reader->>Reader: check channel_count<br/>< channel_max?
    Reader->>Reader: check channel limits<br/>is_over_limits(User)?
    
    Reader->>+SupSup: start_channel(...Args)
    SupSup->>+Sup: supervisor:start_child<br/>channel_sup
    
    Sup->>+Limiter: start_link()
    Limiter->>Limiter: initialize limiter<br/>prefetch=0
    Limiter-->>-Sup: {ok, LimiterPid}
    
    Sup->>+Writer: start_link(Sock)
    Writer->>Writer: initialize writer<br/>buffer
    Writer-->>-Sup: {ok, WriterPid}
    
    Sup->>+Channel: start_link(...)<br/>包含 LimiterPid, WriterPid
    Channel->>Channel: init(...)<br/>state=starting
    Channel->>Channel: pg_local:join<br/>rabbit_channels
    Channel-->>-Sup: {ok, ChannelPid}
    
    Sup-->>-SupSup: {ok, SupPid, {ChannelPid, AState}}
    SupSup-->>-Reader: {ok, _, {ChannelPid, AState}}
    
    Reader->>Reader: erlang:monitor<br/>(process, ChannelPid)
    Reader->>Reader: put({channel, ChannelNum},<br/>{ChannelPid, AState})
    Reader->>Reader: channel_count += 1

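API 5: handle_method0/3 - handling connection-level methods

Basics
  • Name: handle_method0/3
  • Method scope: connection.* (with channel.open as a special case)
Core methods
  • connection.start-ok (C→S): client picks an authentication mechanism and sends its response
  • connection.tune-ok (C→S): client confirms the connection parameters
  • connection.open (C→S): client asks to open a vhost
  • connection.open-ok (S→C): vhost opened successfully
  • connection.close (C↔S): request to close the connection
  • connection.close-ok (C↔S): confirms the connection close
  • connection.blocked (S→C): tells the client the connection is blocked
  • connection.unblocked (S→C): tells the client the connection is unblocked
Core code (example: connection.open)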
handle_method0(#'connection.open'{virtual_host = VHost},
               State = #v1{ranch_ref        = RanchRef,
                           connection_state = opening,
                           connection       = Connection = #connection{
                                                log_name = ConnName,
                                                user = User = #user{username = Username},
                                                protocol = Protocol},
                           helper_sup       = SupPid,
                           sock             = Sock,
                           throttle         = Throttle}) ->
    %% 1. Enforce node, vhost and user connection limits
    ok = is_over_node_connection_limit(RanchRef),
    ok = is_over_vhost_connection_limit(VHost, User),
    ok = is_over_user_connection_limit(User),
    
    %% 2. Check vhost access and that the vhost is running
    ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}, #{}),
    ok = is_vhost_alive(VHost, User),
    
    %% 3. Record the vhost and reply with connection.open_ok
    NewConnection = Connection#connection{vhost = VHost},
    ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
    
    %% 4. Register for resource alarms; alarms already active become block reasons
    Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
    BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms]),
    Throttle1 = Throttle#throttle{blocked_by = BlockedBy},
    
    %% 5. Start the channel supervisor
    {ok, ChannelSupSupPid} =
        rabbit_connection_helper_sup:start_channel_sup_sup(SupPid),
    
    State1 = control_throttle(
               State#v1{connection_state    = running,
                        connection          = NewConnection,
                        channel_sup_sup_pid = ChannelSupSupPid,
                        throttle            = Throttle1}),
    
    %% 6. Emit the connection_created event and metrics
    Infos = augment_infos_with_user_provided_connection_name(
        [{type, network} | infos(?CREATION_EVENT_KEYS, State1)],
        State1
    ),
    rabbit_core_metrics:connection_created(proplists:get_value(pid, Infos),
                                           Infos),
    rabbit_event:notify(connection_created, Infos),
    maybe_emit_stats(State1),
    
    ?LOG_INFO(
      "connection ~tp (~ts -> ~ts): "
      "user '~ts' authenticated and granted access to vhost '~ts'",
      [self(), ConnName, peer_host_port(State1),
       Username, VHost]),
    State1.

rabbit_channel in Detail

Channel State and Data Structures

Channel Record Definitions

-record(ch, {
    cfg = #conf{},              %% configuration record
    limiter,                    %% limiter
    tx = none,                  %% transaction state: none | #tx{}
    next_tag = 1,               %% next delivery tag
    unacked_message_q,          %% unacknowledged deliveries (consumer acks)
    consumer_mapping = #{},     %% consumer_tag -> {QName, ...}
    queue_consumers = #{},      %% QName -> [consumer_tag]
    confirm_enabled = false,    %% whether publisher confirms are enabled
    publish_seqno = 1,          %% next publish sequence number
    unconfirmed,                %% unconfirmed publishes (publisher confirms)
    rejected = [],              %% rejected messages
    confirmed = [],             %% confirmed messages
    direct_reply = none,        %% direct reply-to (fast reply-to)
    delivery_flow = flow,       %% delivery flow-control mode
    interceptor_state,          %% interceptor state
    queue_states                %% per-queue-type state map
}).

-record(conf, {
    state = starting,           %% starting | running | flow | closing
    protocol,                   %% AMQP protocol
    channel,                    %% channel number
    reader_pid,                 %% reader process PID
    writer_pid,                 %% writer process PID
    conn_pid,                   %% connection process PID (usually the reader)
    conn_name,                  %% connection name
    user,                       %% #user{} user
    virtual_host,               %% vhost
    most_recently_declared_queue, %% last declared queue (an empty queue name refers to it)
    queue_collector_pid,        %% queue collector PID
    capabilities,               %% client capabilities
    trace_state,                %% tracing state
    consumer_prefetch,          %% consumer prefetch count
    consumer_timeout,           %% consumer timeout
    authz_context,              %% authorization context
    max_consumers,              %% maximum number of consumers
    writer_gc_threshold,        %% writer GC threshold
    msg_interceptor_ctx         %% message interceptor context
}).

Channel State Transitions

stateDiagram-v2
    [*] --> starting: init
    starting --> running: channel.open<br/>(the first method on the channel)
    
    running --> flow: flow control kicks in<br/>(queue blocked)
    flow --> running: flow control lifted<br/>(queue unblocked)
    
    running --> closing: channel.close<br/>or connection closing
    flow --> closing: forced close
    
    closing --> [*]: cleanup done

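Core API

API 1: do/2 - handling methods without content

Basics
  • Name: do/2
  • Invocation: asynchronous (cast)
  • Scope: most methods other than basic.publish (those that carry no content)
Function signature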
-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
Core code
do(Pid, Method) ->
    gen_server2:cast(Pid, {method, Method, none, noflow}).
Example method handler
%% queue.declare
handle_cast({method, #'queue.declare'{queue  = QueueNameBin,
                                      passive = false,
                                      durable = Durable,
                                      exclusive = Exclusive,
                                      auto_delete = AutoDelete,
                                      nowait = NoWait,
                                      arguments = Args}, _, _},
            State = #ch{cfg = #conf{virtual_host = VHostPath,
                                    user = User,
                                    conn_name = ConnName}}) ->
    %% 1. Permission check (configure on the queue)
    QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
    check_resource_access(QueueName, configure, State),
    
    %% 2. Declare the queue
    case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
                                  Args, none, User) of
        {new, Q} ->
            %% Newly created queue: no messages and no consumers yet
            maybe_register_exclusive_queue(Q, Exclusive, State),
            return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
        {existing, Q} ->
            %% Queue already exists: report its current counts
            check_exclusive_access(Q, Exclusive, State),
            return_queue_declare_ok(QueueName, NoWait, 
                                   rabbit_amqqueue:get_msg_count(Q),
                                   rabbit_amqqueue:get_consumer_count(Q), 
                                   State);
        {owner_died, Q} ->
            %% Owner of the exclusive queue has died: delete it and declare again
            _ = rabbit_amqqueue:delete(Q, false, false, User),
            handle_method(#'queue.declare'{...}, State);
        {error, Reason} ->
            handle_exception(Reason, State)
    end.

API 2: do_flow/3 - handling methods with content (basic.publish)

Basics
  • Name: do_flow/3
  • Invocation: asynchronous (cast)
  • Main use: basic.publish (message publishing)
Function signature
-spec do_flow(pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'.
Core code
do_flow(Pid, Method, Content) ->
    %% Use credit flow so the channel process is not flooded with messages
    credit_flow:send(Pid),
    gen_server2:cast(Pid, {method, Method, Content, flow}).
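The credit-flow idea behind `credit_flow:send(Pid)` can be illustrated with a single-process simulation. This is not the `credit_flow` module's API. It is a hypothetical sketch in which the sender spends one credit per message and must stop at zero. The receiver hands back a batch of credit each time it has processed `MoreCreditAfter` messages. The numbers in the test are arbitrary.

```erlang
-module(sketch_credit).
-export([new/2, send/1, ack/1]).

%% Hypothetical combined view of both sides:
%% {Credit, MoreCreditAfter, ProcessedSinceLastGrant}.
new(InitialCredit, MoreCreditAfter) -> {InitialCredit, MoreCreditAfter, 0}.

%% Sender side: spend one credit, or report that sending must pause.
send({0, _, _} = S) -> {blocked, S};
send({C, After, N}) -> {ok, {C - 1, After, N}}.

%% Receiver side: one message processed; grant a new batch of credit
%% once MoreCreditAfter messages have been handled.
ack({C, After, N}) when N + 1 >= After -> {C + After, After, 0};
ack({C, After, N})                     -> {C, After, N + 1}.
```

Bounding the number of in-flight messages this way keeps the channel's mailbox from growing without limit when a publisher is faster than the channel.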
Message publishing flow
handle_cast({method, #'basic.publish'{exchange    = ExchangeNameBin,
                                       routing_key = RoutingKey,
                                       mandatory   = Mandatory,
                                       immediate   = Immediate}, 
                     Content, flow},
            State = #ch{cfg = #conf{virtual_host = VHostPath,
                                    user = User,
                                    conn_name = ConnName}}) ->
    %% 1. Build the exchange resource name
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    
    %% 2. Permission check (write on the exchange)
    check_resource_access(ExchangeName, write, State),
    
    %% 3. Look up the exchange
    case rabbit_exchange:lookup(ExchangeName) of
        {ok, Exchange} ->
            %% 4. Reject publishes to internal exchanges
            check_internal_exchange(Exchange),
            
            %% 5. Decode the message content (properties)
            DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
            
            %% 6. Wrap it into a message record (adds metadata)
            Message = rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent),
            
            %% 7. Firehose tracing (if enabled)
            rabbit_trace:tap_in(Message, ConnName, State#ch.cfg#conf.trace_state),
            
            %% 8. Route and deliver
            {RoutingRes, DeliveredQPids} = 
                rabbit_basic:publish(Message, Exchange, Mandatory, Immediate),
            
            %% 9. Handle mandatory (RabbitMQ 3.0+ rejects immediate=true with not_implemented)
            case RoutingRes of
                routed ->
                    %% routed to at least one queue
                    ok;
                unroutable ->
                    %% no queue matched
                    case Mandatory of
                        true  -> 
                            %% return the message to the publisher (basic.return)
                            send_basic_return(Message, ExchangeName, 
                                             RoutingKey, no_route, State);
                        false -> 
                            ok  %% drop the message
                    end
            end,
            
            %% 10. Publisher Confirms
            case State#ch.confirm_enabled of
                true ->
                    SeqNo = State#ch.publish_seqno,
                    Unconfirmed1 = rabbit_confirms:insert(SeqNo, Message, 
                                                          DeliveredQPids,
                                                          State#ch.unconfirmed),
                    noreply(State#ch{publish_seqno = SeqNo + 1,
                                     unconfirmed = Unconfirmed1});
                false ->
                    noreply(State)
            end;
        {error, not_found} ->
            handle_exception({amqp_error, not_found, 
                             "exchange '~ts' not found", [ExchangeNameBin]},
                            State)
    end.
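The post-routing decision in step 9 reduces to a small function. In this sketch (hypothetical module `sketch_mandatory`), a mandatory message that matched no queue is returned with reply code 312 (NO_ROUTE). That is the AMQP 0-9-1 reply code RabbitMQ uses in `basic.return` for unroutable messages. A non-mandatory message that matched no queue is dropped.

```erlang
-module(sketch_mandatory).
-export([after_route/2]).

%% Decide what happens to a published message once routing is done.
%% QNames is the list of queues the exchange routed to.
after_route([], true) ->
    %% Unroutable and mandatory: send basic.return with 312 NO_ROUTE.
    {return, 312, <<"NO_ROUTE">>};
after_route([], false) ->
    %% Unroutable and not mandatory: silently dropped.
    drop;
after_route(QNames, _Mandatory) when is_list(QNames) ->
    {deliver, QNames}.
```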
Publish sequence diagram
sequenceDiagram
    autonumber
    participant Reader as rabbit_reader
    participant Channel as rabbit_channel
    participant Exchange as rabbit_exchange
    participant Router as rabbit_router
    participant Queue as rabbit_amqqueue
    
    Reader->>+Channel: do_flow(Pid, basic.publish, Content)
    Note over Reader,Channel: credit_flow:send(Pid)<br/>flow-control accounting
    
    Channel->>Channel: handle_cast({method, #'basic.publish'{}, Content, flow})
    Channel->>Channel: permission check<br/>write on exchange
    Channel->>+Exchange: lookup(ExchangeName)
    Exchange-->>-Channel: {ok, Exchange}
    
    Channel->>Channel: build message<br/>rabbit_basic:message
    Channel->>Channel: trace (if enabled)<br/>rabbit_trace:tap_in
    
    Channel->>+Router: rabbit_basic:publish<br/>(Message, Exchange, Mandatory, Immediate)
    Router->>+Exchange: route(Exchange, Message, #{})
    Exchange->>Exchange: route by exchange type<br/>(direct/topic/fanout/headers)
    Exchange-->>-Router: [QueueName1, QueueName2, ...]
    
    loop each target queue
        Router->>+Queue: deliver([Message], QueueName)
        Queue->>Queue: enqueue (memory/disk)
        Queue-->>-Router: {ok, QPid} | {error, ...}
    end
    
    Router-->>-Channel: {routed, DeliveredQPids}<br/>or {unroutable, []}
    
    alt unroutable and mandatory
        Channel->>Channel: build basic.return
        Channel->>Reader: basic.return frame<br/>(written via the channel's rabbit_writer)
    end
    
    alt confirm_enabled
        Channel->>Channel: record unconfirmed<br/>{SeqNo, QPids}
        Channel->>Channel: publish_seqno += 1
    end
    
    Channel-->>-Reader: noreply
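Publisher-confirm tracking can be sketched as a map from sequence number to the set of queues that still have to confirm. This hypothetical `sketch_confirms` module is not `rabbit_confirms`. It shows the idea that a message is confirmed once every target queue has confirmed it. An unroutable message can be confirmed immediately because no queue is involved.

```erlang
-module(sketch_confirms).
-export([new/0, publish/2, queue_confirmed/3]).

%% Hypothetical state: {NextSeqNo, #{SeqNo => set of pending queue names}}.
new() -> {1, #{}}.

%% Returns {SeqNo, ConfirmNow, NewState}.
%% An unroutable publish (no queues) can be confirmed right away.
publish([], {Seq, U}) ->
    {Seq, true, {Seq + 1, U}};
publish(QNames, {Seq, U}) ->
    {Seq, false, {Seq + 1, U#{Seq => sets:from_list(QNames)}}}.

%% QName has confirmed Seq. Returns {NewlyConfirmedSeqNos, NewState}.
queue_confirmed(Seq, QName, {Next, U}) ->
    case maps:find(Seq, U) of
        {ok, Pending} ->
            Left = sets:del_element(QName, Pending),
            case sets:size(Left) of
                0 -> {[Seq], {Next, maps:remove(Seq, U)}};
                _ -> {[], {Next, U#{Seq := Left}}}
            end;
        error ->
            {[], {Next, U}}
    end.
```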

API 3: handle_method - consumer-related methods

basic.consume - register a consumer
handle_method(#'basic.consume'{queue        = QueueNameBin,
                                consumer_tag = ConsumerTag,
                                no_local     = NoLocal,
                                no_ack       = NoAck,
                                exclusive    = Exclusive,
                                nowait       = NoWait,
                                arguments    = Args}, _, 
              State = #ch{cfg = #conf{user = User}}) ->
    %% 1. Resolve the queue name (an empty name means the last declared queue)
    QueueName = expand_queue_name_shortcut(QueueNameBin, State),
    
    %% 2. Permission check (read on the queue)
    check_resource_access(QueueName, read, State),
    
    %% 3. Generate a consumer_tag if the client did not supply one
    ActualConsumerTag = case ConsumerTag of
                            <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.ctag");
                            _    -> ConsumerTag
                        end,
    
    %% 4. 检查 consumer_tag 是否已存在
    case maps:is_key(ActualConsumerTag, State#ch.consumer_mapping) of
        true  -> 
            %% consumer_tag 冲突
            precondition_failed("consumer tag '~ts' already exists", [ActualConsumerTag]);
        false ->
            %% 5. 调用队列的 consume 方法
            case rabbit_amqqueue:with(
                   QueueName,
                   fun(Q) ->
                       rabbit_queue_type:consume(
                           Q,
                           #{consumer_tag => ActualConsumerTag,
                             no_ack => NoAck,
                             channel_pid => self(),
                             limiter_pid => rabbit_limiter:pid(State#ch.limiter),
                             limiter_active => rabbit_limiter:is_enabled(State#ch.limiter),
                             prefetch_count => rabbit_limiter:get_prefetch(State#ch.limiter),
                             exclusive => Exclusive,
                             args => Args,
                             ok_msg => #'basic.consume_ok'{consumer_tag = ActualConsumerTag},
                             acting_user => User},
                           State#ch.queue_states)
                   end) of
                {ok, QPid, QState1} ->
                    %% 6. Record the consumer in the channel's mappings
                    ConsumerMapping1 = maps:put(ActualConsumerTag,
                                                 {QueueName, QPid, NoAck},
                                                 State#ch.consumer_mapping),
                    QueueConsumers1 = maps:update_with(
                                        QueueName,
                                        fun(CTags) -> [ActualConsumerTag | CTags] end,
                                        [ActualConsumerTag],
                                        State#ch.queue_consumers),
                    
                    %% 7. Send basic.consume-ok (skipped when nowait = true)
                    ok = maybe_send_reply(NoWait, 
                                         #'basic.consume_ok'{consumer_tag = ActualConsumerTag},
                                         State),
                    
                    {noreply, State#ch{consumer_mapping = ConsumerMapping1,
                                       queue_consumers = QueueConsumers1,
                                       queue_states = QState1}};
                {error, Reason} ->
                    handle_exception(Reason, State)
            end
    end.

basic.ack - acknowledge deliveries
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
                            multiple     = Multiple}, _, State) ->
    %% 1. Find the unacked deliveries covered by DeliveryTag (and Multiple)
    case find_unacked_messages(DeliveryTag, Multiple, State#ch.unacked_message_q) of
        {ok, MsgSeqNos, Remaining} ->
            %% 2. Settle the deliveries with their queues
            %%    (simplified: the updated queue states returned by settle are discarded here)
            lists:foreach(
              fun({QueueName, MsgSeqNo}) ->
                  rabbit_amqqueue:with(
                    QueueName,
                    fun(Q) ->
                        rabbit_queue_type:settle(Q, complete, [MsgSeqNo], 
                                                State#ch.queue_states)
                    end)
              end, MsgSeqNos),
            
            %% 3. Remove them from the unacked queue
            {noreply, State#ch{unacked_message_q = Remaining}};
        {error, not_found} ->
            %% Acking an unknown delivery tag is a channel error (406)
            precondition_failed("unknown delivery tag ~w", [DeliveryTag])
    end.

Publisher Confirms mechanism

Enabling confirms

handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
    State1 = State#ch{confirm_enabled = true},
    ok = maybe_send_reply(NoWait, #'confirm.select_ok'{}, State1),
    {noreply, State1}.

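Receiving confirms from queues

handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
    %% 1. Mark these sequence numbers as confirmed by QPid
    {Confirmed, UC1} = rabbit_confirms:confirm(MsgSeqNos, QPid, UC),
    
    %% 2. Accumulate messages that are now confirmed by all their queues
    Confirmed1 = lists:append(Confirmed, State#ch.confirmed),
    
    %% 3. Once enough confirms have accumulated, send basic.ack to the client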
    case should_send_confirms(Confirmed1) of
        true ->
            send_confirms(Confirmed1, State),
            {noreply, State#ch{unconfirmed = UC1,
                               confirmed = []}};
        false ->
            {noreply, State#ch{unconfirmed = UC1,
                               confirmed = Confirmed1}}
    end.
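The clause above hands the per-queue bookkeeping to rabbit_confirms:confirm/3. Below is a minimal sketch of that bookkeeping. It is not the real rabbit_confirms code. It assumes each publish sequence number maps to the set of queues the message was routed to, and that the number only counts as confirmed once every one of those queues has confirmed it. The module unconfirmed_sketch and its functions are hypothetical.

```erlang
-module(unconfirmed_sketch).
-export([new/0, insert/3, confirm/3]).

%% Hypothetical model: #{SeqNo => set of queues that have not confirmed yet}.
new() -> #{}.

%% Record a published message and the queues it was routed to.
%% (A message routed to no queue would need an immediate confirm;
%% that case is not modelled here.)
insert(SeqNo, QueueNames, UC) ->
    maps:put(SeqNo, sets:from_list(QueueNames), UC).

%% QName confirms SeqNos. Returns {FullyConfirmed, UC1}, where
%% FullyConfirmed lists the seq nos now confirmed by all their queues.
confirm(SeqNos, QName, UC) ->
    lists:foldl(
      fun(SeqNo, {Done, Acc}) ->
              case maps:find(SeqNo, Acc) of
                  {ok, Pending} ->
                      Pending1 = sets:del_element(QName, Pending),
                      case sets:size(Pending1) of
                          0 -> {[SeqNo | Done], maps:remove(SeqNo, Acc)};
                          _ -> {Done, maps:put(SeqNo, Pending1, Acc)}
                      end;
                  error ->
                      %% Unknown or already confirmed seq no: ignore it
                      {Done, Acc}
              end
      end, {[], UC}, SeqNos).
```

A message routed to two queues therefore stays unconfirmed after the first queue confirms it and is released only by the second.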

Confirm flow diagram

sequenceDiagram
    autonumber
    participant Client
    participant Channel as rabbit_channel
    participant Queue as Queue process
    participant Backend as Queue Backend
    
    Note over Client,Channel: Enable confirms
    Client->>+Channel: confirm.select
    Channel->>Channel: confirm_enabled = true
    Channel-->>-Client: confirm.select-ok
    
    Note over Client,Backend: Publish a message
    Client->>+Channel: basic.publish (SeqNo=1)
    Channel->>Channel: unconfirmed[1] = {QPid, ...}
    Channel->>+Queue: deliver(Message)
    Queue->>+Backend: publish(Message)
    Backend->>Backend: write to storage<br/>(async)
    Backend-->>-Queue: ok
    Queue-->>-Channel: (async message)
    Channel-->>-Client: (non-blocking)
    
    Note over Queue,Channel: Queue confirms asynchronously
    Backend->>Backend: message persisted
    Backend->>+Queue: notify_confirm
    Queue->>+Channel: {confirm, [SeqNo=1], QPid}
    Channel->>Channel: unconfirmed[1] -> confirmed
    Channel->>Channel: accumulate confirms
    
    alt enough confirms accumulated or timer fired
        Channel->>Client: basic.ack<br/>delivery-tag=1<br/>multiple=false
    end
    
    Channel-->>-Queue: ok
    Queue-->>-Backend: ok
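The diagram acks delivery-tag 1 with multiple=false, but a channel can fold many confirms into one frame. The sketch below shows one cutoff-based way to do this. It assumes the channel knows the smallest sequence number that is still unconfirmed and that no nacks are pending. Every confirmed number below the cutoff is covered by a single multiple=true ack, and the rest are acked individually. confirm_coalesce_sketch is a hypothetical module, not rabbit_channel's code.

```erlang
-module(confirm_coalesce_sketch).
-export([acks/2]).

%% Confirmed: seq nos now confirmed by all their queues.
%% SmallestUnconfirmed: lowest seq no still pending, or none.
%% Returns the basic.ack frames to send as {DeliveryTag, Multiple}.
acks([], _SmallestUnconfirmed) ->
    [];
acks(Confirmed, SmallestUnconfirmed) ->
    Sorted = lists:usort(Confirmed),
    Cutoff = case SmallestUnconfirmed of
                 none -> lists:last(Sorted) + 1;
                 N    -> N
             end,
    %% Nothing below the cutoff is still pending, so one multiple=true
    %% ack up to the highest such seq no is safe (assuming no pending nacks)
    {Below, Above} = lists:splitwith(fun(X) -> X < Cutoff end, Sorted),
    Multi = case Below of
                [] -> [];
                _  -> [{lists:last(Below), true}]
            end,
    %% Seq nos above the cutoff must be acked one by one
    Multi ++ [{Tag, false} || Tag <- Above].
```

For example, if 1, 2 and 5 are confirmed while 3 is still pending, the sketch emits one ack for tag 2 with multiple=true and a separate ack for tag 5.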

Sequence diagrams for typical scenarios

Scenario 1: Full connection establishment

sequenceDiagram
    autonumber
    participant Client as AMQP client
    participant Ranch as Ranch Listener
    participant Reader as rabbit_reader
    participant Auth as Auth backend
    participant AccessCtl as rabbit_access_control
    
    Client->>+Ranch: TCP connection<br/>SYN/SYN-ACK/ACK
    Ranch->>Ranch: accept() the connection
    Ranch->>+Reader: start_link(HelperSups, Ref)
    Reader->>Reader: rabbit_networking:handshake(Ref)
    Reader->>Reader: init state<br/>connection_state=pre_init
    Reader->>Reader: mainloop(..., handshake, 8)
    Reader-->>-Ranch: {ok, ReaderPid}
    Ranch-->>-Client: (connection accepted)
    
    Note over Client,Reader: Protocol handshake
    Client->>+Reader: "AMQP\x00\x00\x09\x01"
    Reader->>Reader: version_negotiation()
    Reader->>Client: connection.start<br/>(mechanisms: PLAIN, AMQPLAIN)
    Reader->>Reader: connection_state=starting
    
    Client->>Reader: connection.start-ok<br/>mechanism=PLAIN<br/>response="\0guest\0guest"
    Reader->>+Auth: check_user("guest", [...])<br/>via rabbit_auth_backend_internal
    Auth->>Auth: verify password hash
    Auth-->>-Reader: {ok, #user{username="guest", tags=[administrator]}}
    Reader->>Reader: loopback check<br/>(by default guest may only connect via localhost)
    Reader->>Reader: connection_state=tuning
    
    Reader->>Client: connection.tune<br/>channel-max=2047<br/>frame-max=131072<br/>heartbeat=60
    
    Client->>Reader: connection.tune-ok<br/>negotiated values
    Reader->>Reader: connection_state=opening
    
    Client->>Reader: connection.open<br/>virtual-host="/"
    Reader->>Reader: check connection limits<br/>node/vhost/user
    Reader->>+AccessCtl: check_vhost_access(User, "/", ...)
    AccessCtl->>AccessCtl: check the user's access to the vhost
    AccessCtl-->>-Reader: ok
    
    Reader->>Reader: start channel_sup_sup
    Reader->>Reader: register for resource alarms
    Reader->>Reader: connection_state=running
    Reader->>Client: connection.open-ok
    
    Reader->>Reader: emit connection_created event
    Reader-->>-Client: (connection ready)
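The connection_state transitions in scenario 1 can be modelled as a small state machine. This sketch rests on some assumptions. The names pre_init, opening and running come from the diagram. The intermediate names starting and tuning are our reading of rabbit_reader. Authentication is assumed to finish in one step, whereas a multi-round SASL exchange would also pass through securing. The module conn_state_sketch is hypothetical.

```erlang
-module(conn_state_sketch).
-export([next/2, run/1]).

%% Hypothetical model of the reader's connection_state during a
%% successful handshake with single-step PLAIN authentication.
next(pre_init, protocol_header)       -> {ok, starting};  %% server sends connection.start
next(starting, 'connection.start_ok') -> {ok, tuning};    %% auth ok, server sends connection.tune
next(tuning,   'connection.tune_ok')  -> {ok, opening};
next(opening,  'connection.open')     -> {ok, running};   %% vhost access ok, sends open-ok
next(State,    Input)                 -> {error, {unexpected, Input, State}}.

%% Feed a sequence of inputs starting from pre_init; stop at the first error.
run(Inputs) ->
    lists:foldl(fun(Input, {ok, State}) -> next(State, Input);
                   (_Input, Error)      -> Error
                end, {ok, pre_init}, Inputs).
```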

Scenario 2: Opening a channel and declaring a queue

sequenceDiagram
    autonumber
    participant Client
    participant Reader as rabbit_reader
    participant Channel as rabbit_channel
    participant Queue as rabbit_amqqueue
    participant DB as rabbit_db_queue
    participant QueueProc as Queue process
    
    Client->>+Reader: channel.open<br/>channel-id=1
    Reader->>Reader: process_frame(..., Channel=1)
    Reader->>Reader: create_channel(1)
    Reader->>+Channel: rabbit_channel_sup_sup:start_channel(...)
    Channel->>Channel: init(...)<br/>state=starting
    Channel-->>-Reader: {ok, ChannelPid, AState}
    Reader->>Reader: put({channel, 1}, {ChannelPid, AState})
    Reader->>Channel: (async) channel.open
    Channel->>Channel: state=running
    Channel->>Client: channel.open-ok
    Reader-->>-Client: (channel 1 ready)
    
    Note over Client,QueueProc: Declare a queue
    Client->>+Reader: queue.declare<br/>channel-id=1<br/>queue="test_queue"<br/>durable=true
    Reader->>Reader: handle_frame(..., Channel=1)
    Reader->>Reader: assemble the complete command
    Reader->>+Channel: do(ChannelPid, #'queue.declare'{...})
    
    Channel->>Channel: handle_method(#'queue.declare'{...})
    Channel->>Channel: permission check<br/>configure on queue "test_queue"
    
    Channel->>+Queue: declare(QueueName, Durable, AutoDelete, Args, ...)
    Queue->>+DB: create_or_get(Q)
    
    alt queue does not exist
        DB->>DB: write to Mnesia/Khepri<br/>rabbit_durable_queue table
        DB->>+QueueProc: start queue process<br/>(depends on queue type)
        QueueProc->>QueueProc: initialize queue state
        QueueProc-->>-DB: {ok, QPid}
        DB-->>Queue: {new, Q}
    else queue already exists
        DB->>DB: check argument equivalence
        DB-->>Queue: {existing, Q}
    end
    
    Queue-->>-Channel: {new, Q} | {existing, Q}
    
    Channel->>Channel: build queue.declare-ok<br/>message_count=0<br/>consumer_count=0
    Channel->>Client: queue.declare-ok
    
    Channel-->>-Reader: noreply
    Reader-->>-Client: (done)
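If the queue already exists, the declare path checks that the new declaration is equivalent to the existing one. RabbitMQ rejects an inequivalent redeclaration with PRECONDITION_FAILED. A minimal sketch of such a check follows. The shape of the property map and the module declare_equiv_sketch are hypothetical. The real check compares a specific set of properties and x-arguments rather than the whole argument list.

```erlang
-module(declare_equiv_sketch).
-export([check/2]).

%% Hypothetical check for a redeclaration of an existing queue.
%% Both arguments are maps with durable, auto_delete, exclusive and args.
check(Existing, Requested) ->
    Keys = [durable, auto_delete, exclusive, args],
    case [K || K <- Keys, value(K, Existing) =/= value(K, Requested)] of
        []        -> {existing, Existing};
        [Key | _] -> {error, {precondition_failed, Key}}
    end.

%% Argument order should not matter, so compare args as sorted lists.
value(args, Props) -> lists:sort(maps:get(args, Props, []));
value(Key, Props)  -> maps:get(Key, Props).
```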

Scenario 3: Publishing a message to a queue

sequenceDiagram
    autonumber
    participant Client
    participant Reader
    participant Channel
    participant Exchange
    participant Queue
    participant MsgStore as rabbit_msg_store
    
    Note over Client,Channel: Send basic.publish (3 frames)
    Client->>+Reader: Frame 1: basic.publish<br/>exchange=""<br/>routing-key="test_queue"
    Reader->>Reader: assembler: waiting for content-header
    
    Client->>Reader: Frame 2: content-header<br/>properties, body-size=12
    Reader->>Reader: assembler: waiting for content-body
    
    Client->>Reader: Frame 3: content-body<br/>"Hello World!"
    Reader->>Reader: assembler: command complete
    
    Reader->>+Channel: do_flow(ChannelPid, #'basic.publish'{}, Content)
    Note over Reader,Channel: credit_flow:send(ChannelPid)<br/>flow-control check
    
    Channel->>Channel: handle_method(#'basic.publish'{}, Content)
    Channel->>Channel: permission check<br/>write on exchange ""
    
    Channel->>+Exchange: route(ExchangeName="", Message, #{})
    Exchange->>Exchange: default exchange<br/>routing_key maps directly to the queue name
    Exchange-->>-Channel: [<<"test_queue">>]
    
    loop each target queue
        Channel->>+Queue: deliver([Message], <<"test_queue">>)
        Queue->>Queue: check memory/disk limits
        
        alt message < 4 KB (queue_index_embed_msgs_below)
            Queue->>Queue: stored alongside the queue index<br/>rabbit_variable_queue
        else message >= 4 KB
            Queue->>+MsgStore: write(MsgId, Content)
            MsgStore->>MsgStore: batched disk writes<br/>(async)
            MsgStore-->>-Queue: ok
        end
        
        Queue->>Queue: check for waiting consumers
        Queue-->>-Channel: {ok, QPid}
    end
    
    alt confirm_enabled
        Channel->>Channel: unconfirmed[SeqNo] = {QPid, ...}
        Note over Queue,Channel: async: once the queue has persisted the message<br/>it sends {confirm, [SeqNo], QPid}
    end
    
    Channel-->>-Reader: noreply
    Reader-->>-Client: (no response, unless a mandatory message is unroutable)
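Scenario 3 assembles one command from a method frame, a content header that carries the body size, and body frames until that size is reached. This can be sketched as a tiny state machine. The frame shapes here are simplified assumptions: {method, M}, {header, BodySize} and {body, Binary}. The real rabbit_command_assembler also deals with class ids, properties and protocol errors. assembler_sketch is hypothetical.

```erlang
-module(assembler_sketch).
-export([new/0, feed/2]).

%% Hypothetical assembler state: expect_method | {expect_header, M}
%% | {expect_body, M, BytesRemaining, ReversedChunks}.
new() -> expect_method.

feed(expect_method, {method, M}) ->
    {continue, {expect_header, M}};
feed({expect_header, M}, {header, 0}) ->
    %% Empty body: the command is complete without any body frame
    {done, {M, <<>>}, expect_method};
feed({expect_header, M}, {header, Size}) ->
    {continue, {expect_body, M, Size, []}};
feed({expect_body, M, Remaining, Acc}, {body, Bin}) ->
    case Remaining - byte_size(Bin) of
        0 ->
            %% Declared body size reached: emit the full command
            Body = iolist_to_binary(lists:reverse([Bin | Acc])),
            {done, {M, Body}, expect_method};
        R when R > 0 ->
            {continue, {expect_body, M, R, [Bin | Acc]}};
        _ ->
            {error, body_too_large}
    end;
feed(_State, Frame) ->
    {error, {unexpected_frame, Frame}}.
```

For example, feeding {method, publish}, then {header, 12}, then the bodies <<"Hello ">> and <<"World!">> yields {done, {publish, <<"Hello World!">>}, expect_method}.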

Scenario 4: Consuming messages

sequenceDiagram
    autonumber
    participant Client
    participant Reader
    participant Channel
    participant Queue
    participant Limiter as rabbit_limiter
    
    Note over Client,Queue: Register a consumer
    Client->>+Reader: basic.consume<br/>queue="test_queue"<br/>consumer-tag=""<br/>no-ack=false
    Reader->>+Channel: do(ChannelPid, #'basic.consume'{})
    
    Channel->>Channel: permission check<br/>read on queue "test_queue"
    Channel->>Channel: generate consumer_tag<br/>"amq.ctag-xxx"
    
    Channel->>+Queue: consume(Q, #{consumer_tag, channel_pid, ...})
    Queue->>Queue: register consumer<br/>consumer_tag -> ChannelPid
    Queue-->>-Channel: {ok, QPid, QState}
    
    Channel->>Channel: consumer_mapping[consumer_tag] = {QueueName, QPid, NoAck}
    Channel->>Client: basic.consume-ok<br/>consumer-tag="amq.ctag-xxx"
    Channel-->>-Reader: noreply
    Reader-->>-Client: (consumer registered)
    
    Note over Queue,Client: Queue pushes messages
    Queue->>Queue: check consumer prefetch
    Queue->>+Limiter: can_send(ChannelPid, ..., QPid)
    Limiter->>Limiter: unacked count<br/>< prefetch_count?
    Limiter-->>-Queue: true
    
    Queue->>+Channel: async message<br/>{deliver, consumer_tag, ack_required, Message}
    Channel->>Channel: handle_info({deliver, ...})
    Channel->>Channel: assign delivery_tag<br/>(monotonically increasing)
    
    alt ack_required
        Channel->>Channel: unacked_message_q += {delivery_tag, QueueName, MsgId}
    end
    
    Channel->>Client: basic.deliver<br/>consumer-tag<br/>delivery-tag=1<br/>(frames written by rabbit_writer)
    Channel->>Client: content-header
    Channel->>Client: content-body<br/>"Hello World!"
    Channel-->>-Queue: ok
    
    Note over Client,Queue: Client acknowledges the message
    Client->>+Reader: basic.ack<br/>delivery-tag=1<br/>multiple=false
    Reader->>+Channel: do(ChannelPid, #'basic.ack'{})
    
    Channel->>Channel: find_unacked_messages(delivery_tag=1)
    Channel->>Channel: unacked_message_q -= {1, ...}
    
    Channel->>+Queue: settle(Q, complete, [MsgId])
    Queue->>Queue: delete the message
    Queue-->>-Channel: ok
    Channel->>+Limiter: notify limiter of 1 ack<br/>decrement unacked count
    Limiter-->>-Channel: ok
    
    Channel-->>-Reader: noreply
    Reader-->>-Client: (ack processed)
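The limiter check in scenario 4 comes down to comparing the number of unacknowledged deliveries with prefetch_count. Below is a sketch of that accounting. It assumes one counter per consumer and treats prefetch_count = 0 as unlimited, as basic.qos does. prefetch_sketch is hypothetical and is not the rabbit_limiter API.

```erlang
-module(prefetch_sketch).
-export([new/1, can_send/1, sent/1, ack/2]).

%% Hypothetical per-consumer prefetch accounting; 0 means unlimited.
new(PrefetchCount) -> #{prefetch => PrefetchCount, unacked => 0}.

%% May the queue push another delivery that requires an ack?
can_send(#{prefetch := 0}) -> true;
can_send(#{prefetch := P, unacked := U}) -> U < P.

%% A delivery with ack_required = true was sent to the consumer.
sent(S = #{unacked := U}) -> S#{unacked := U + 1}.

%% N deliveries were acked (e.g. one basic.ack with multiple = true).
ack(S = #{unacked := U}, N) -> S#{unacked := max(0, U - N)}.
```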

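Performance tuning and best practices

Connection management

Connection pooling

Problem: opening and closing connections frequently is expensive (TCP handshake, authentication, resource allocation).

Best practices

  • Keep a small pool of long-lived connections in the client (typically 1-5 per application)
  • Reuse connections: many channels share one connection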

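Example configuration

%% Erlang client (amqp_client). In the module header, include the record definitions:
-include_lib("amqp_client/include/amqp_client.hrl").

%% Inside a function:
{ok, Connection} = amqp_connection:start(#amqp_params_network{
    host = "localhost",
    port = 5672,
    virtual_host = <<"/">>,
    username = <<"guest">>,
    password = <<"guest">>,
    heartbeat = 60,             %% requested heartbeat timeout (seconds)
    connection_timeout = 60000  %% connection timeout (milliseconds)
}).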

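Heartbeat configuration

Purpose: detect dead (zombie) connections and release their resources promptly.

Recommendations

  • Default: 60 seconds
  • High-latency networks: 120-300 seconds
  • Low-latency networks: 10-30 seconds

Formula

Timeout ≈ heartbeat * 2 (the peer is treated as dead after roughly two heartbeat intervals with no traffic)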
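Below is a sketch of the heartbeat arithmetic. It assumes the negotiation rule many clients apply: if either side proposes 0, use the other value, otherwise use the smaller one. It also assumes heartbeat frames go out about every half timeout, and it uses the timeout formula above for dead-peer detection. heartbeat_sketch is a hypothetical module.

```erlang
-module(heartbeat_sketch).
-export([negotiate/2, send_interval_ms/1, dead_after/1]).

%% Negotiation rule many clients use (assumption): if either side
%% proposes 0, take the other value; otherwise take the smaller one.
negotiate(Client, Server) when Client =:= 0; Server =:= 0 ->
    max(Client, Server);
negotiate(Client, Server) ->
    min(Client, Server).

%% Heartbeat frames go out roughly every half timeout (milliseconds).
send_interval_ms(0) -> disabled;
send_interval_ms(TimeoutSec) -> TimeoutSec * 1000 div 2.

%% Dead-peer detection per the formula in the text: about twice the timeout.
dead_after(0) -> never;
dead_after(TimeoutSec) -> TimeoutSec * 2.
```

With the default of 60 seconds, frames are sent about every 30 seconds and a silent peer is dropped after roughly 120 seconds.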

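Channel management

Number of channels

Limits

  • By default a connection may open at most 2047 channels (channel_max)
  • In practice, aim for 10-50 channels per connection

Why

  • Each channel is a separate Erlang process (roughly 100 KB-1 MB of memory, depending on workload)
  • Very many channels add scheduling and context-switch overhead

Best practices

  • Publishers: one channel per thread
  • Consumers: one channel per queue
  • Do not share a channel between threads (channels in most clients are not thread-safe)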

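Handling channel errors

Problem: a channel-level error closes the channel but leaves the connection open.

Strategy

%% Watch a channel and reopen it when it closes. A monitor delivers a
%% 'DOWN' message whether or not the channel is linked to the caller.
watch_channel(Connection) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),
    MRef = erlang:monitor(process, Channel),
    receive
        {'DOWN', MRef, process, Channel, Reason} ->
            %% Channel closed: log the reason and open a replacement
            logger:error("Channel closed: ~p", [Reason]),
            watch_channel(Connection)
    end.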

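Publishing optimizations

Batch publishing

Problem: publishing one message at a time and waiting after each one costs a network round trip per message.

Optimization

  • Send many basic.publish commands back to back without waiting on each one (basic.publish itself is asynchronous)
  • Wait for confirms per batch rather than per message

Performance gain (indicative; depends on message size, hardware and network)

  • One message at a time: ~5,000 msg/s
  • Batches of 100: ~50,000 msg/s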

Publisher Confirms

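When to use: you need reliable publishing but not the atomicity of transactions.

Best practices

  • Asynchronous confirms: publish without blocking and process confirms in batches
  • Window size: cap the number of outstanding (unconfirmed) messages at 100-1000

Example

%% Enable confirms and have basic.ack/basic.nack delivered to this process
#'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
ok = amqp_channel:register_confirm_handler(Channel, self()),

%% Publish 1000 messages without waiting on each one
[amqp_channel:cast(Channel, 
                   #'basic.publish'{exchange = <<>>, routing_key = <<"queue">>},
                   #amqp_msg{payload = <<"data">>}) 
 || _ <- lists:seq(1, 1000)],

%% Handle the next confirm; multiple = true covers every seq no <= Tag
receive
    #'basic.ack'{delivery_tag = Tag, multiple = Multiple} ->
        io:format("Confirmed ~p (multiple=~p)~n", [Tag, Multiple]);
    #'basic.nack'{delivery_tag = Tag, multiple = Multiple} ->
        io:format("Rejected by broker ~p (multiple=~p)~n", [Tag, Multiple])
end.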

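Consumer optimizations

Prefetch settings

Definition: the maximum number of unacknowledged deliveries the broker pushes before it waits for acks (basic.qos prefetch_count). In RabbitMQ the limit applies per consumer by default, or per channel when global = true.

Configuration

#'basic.qos_ok'{} = amqp_channel:call(Channel, 
                                     #'basic.qos'{prefetch_count = 100}).

Choosing a value

  • Low-latency network: prefetch = 1-10 (avoids piling messages up in the client)
  • High-latency network: prefetch = 100-1000 (fewer round trips)
  • CPU-bound processing: prefetch = number of CPU cores
  • I/O-bound processing: prefetch = 10-100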
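One way to turn the latency guidance above into a number is a rule of thumb: keep enough messages in flight to cover one network round trip. That gives prefetch ≈ RTT / per-message processing time, plus one so that the next message is already waiting. This heuristic is an assumption, not a RabbitMQ default, and prefetch_estimate is hypothetical.

```erlang
-module(prefetch_estimate).
-export([estimate/2]).

%% Rule-of-thumb prefetch (assumption): enough deliveries to keep the
%% consumer busy for one round trip, plus the one being processed.
%% RttMs and ProcMs are in milliseconds; ProcMs must be positive.
estimate(RttMs, ProcMs) when ProcMs > 0 ->
    max(1, ceil(RttMs / ProcMs) + 1).
```

For example, a 100 ms round trip with 2 ms of work per message suggests a prefetch of about 51. A 1 ms round trip with 10 ms of work suggests 2.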

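Automatic vs manual acknowledgement

Mode | Pros | Cons | Typical use
Automatic ack (no_ack=true) | Higher throughput; no ack traffic | Messages can be lost (e.g. if the client crashes) | Logs, metrics and other non-critical data
Manual ack (no_ack=false) | Reliable: unacked messages are requeued if the consumer fails | Somewhat lower throughput; explicit acks required | Transactions, orders and other critical data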

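Batch acknowledgement

Problem: acknowledging every message individually means more frames on the network and more work for the channel.

Optimization

%% Batch acknowledgement (multiple = true)
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = LastTag,
                                        multiple = true}).

Meaning: acknowledges every outstanding delivery on the channel with delivery_tag <= LastTag.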
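The semantics of the multiple flag can be sketched over a map of outstanding delivery tags, under a few assumptions. The map stands in for the channel's unacked state. A single ack of an unknown tag is an error, which RabbitMQ reports as PRECONDITION_FAILED. In this sketch, a multiple ack whose tag is not outstanding is also treated as an error. AMQP 0-9-1 defines multiple = true with tag 0 as acknowledging everything outstanding. multi_ack_sketch is hypothetical.

```erlang
-module(multi_ack_sketch).
-export([ack/3]).

%% Unacked is a hypothetical map DeliveryTag => Msg held by the channel.
%% Returns {ok, AckedTags, Remaining} or {error, unknown_delivery_tag}.
ack(0, true, Unacked) ->
    %% multiple = true with tag 0 acks everything outstanding
    {ok, lists:sort(maps:keys(Unacked)), #{}};
ack(Tag, true, Unacked) ->
    case maps:is_key(Tag, Unacked) of
        true ->
            %% Ack every outstanding tag up to and including Tag
            Acked = lists:sort([T || T <- maps:keys(Unacked), T =< Tag]),
            {ok, Acked, maps:without(Acked, Unacked)};
        false ->
            {error, unknown_delivery_tag}
    end;
ack(Tag, false, Unacked) ->
    case maps:take(Tag, Unacked) of
        {_Msg, Remaining} -> {ok, [Tag], Remaining};
        error             -> {error, unknown_delivery_tag}
    end.
```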

Performance gain (indicative; depends on workload)

  • Acking each message: ~10,000 msg/s
  • Acking every 100 messages: ~80,000 msg/s

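Flow control and backpressure

Connection-level flow control

Triggers

  • Node memory use exceeds the high watermark (vm_memory_high_watermark, default 0.4 of RAM)
  • Free disk space falls below the limit (disk_free_limit, default 50 MB)

Behavior

  • RabbitMQ sends connection.blocked to clients that support it
  • Publishing connections are blocked (the server stops reading from their sockets)
  • When the alarm clears, RabbitMQ sends connection.unblocked

Client-side handling

%% Ask amqp_client to forward connection.blocked/unblocked to this process
ok = amqp_connection:register_blocked_handler(Connection, self()),

receive
    #'connection.blocked'{reason = Reason} ->
        io:format("Connection blocked: ~s~n", [Reason]),
        %% Pause publishing until unblocked (wait_for_unblocked/0 is a placeholder)
        wait_for_unblocked();
    #'connection.unblocked'{} ->
        io:format("Connection unblocked~n"),
        %% Resume publishing (resume_publishing/0 is a placeholder)
        resume_publishing()
end.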

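Channel-level flow control

Mechanism: credit flow

How it works

  • Credit flow applies along the publish path (reader → channel → queue): the sender spends one credit for each message it passes on
  • The receiver grants credit back after it has processed a batch of messages
  • When its credit reaches zero, the sender is blocked (shown as the "flow" state) and stops taking more input. The backpressure propagates all the way to the client socket

Benefits

  • Prevents fast publishers from overwhelming slower downstream processes (queues, message store)
  • Balances throughput automatically, without unbounded mailbox growth

Configuration

%% advanced.config (classic Erlang-term format; older releases used rabbitmq.config)
[{rabbit, [
    {credit_flow_default_credit, {400, 200}}  %% {InitialCredit, MoreCreditAfter}
]}].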
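Below is a toy model of the {InitialCredit, MoreCreditAfter} parameters. The sender spends one credit per message and blocks at zero. After handling MoreCreditAfter messages, the receiver grants that many credits back. For brevity both sides share one map. In RabbitMQ they are separate processes and credit is granted by message passing, so treat credit_flow_sketch as a hypothetical illustration, not the credit_flow module.

```erlang
-module(credit_flow_sketch).
-export([new/1, send/1, handled/1]).

%% Hypothetical combined sender/receiver state.
new({Initial, MoreAfter}) ->
    #{credit => Initial, more_after => MoreAfter, processed => 0}.

%% Sender side: spend one credit per message; blocked at zero.
send(S = #{credit := 0}) -> {blocked, S};
send(S = #{credit := C}) -> {ok, S#{credit := C - 1}}.

%% Receiver side: after MoreAfter handled messages, grant MoreAfter credits back.
handled(S = #{processed := P, more_after := M, credit := C}) ->
    case P + 1 of
        M  -> S#{processed := 0, credit := C + M};
        P1 -> S#{processed := P1}
    end.
```

With {400, 200}, a sender can run up to 400 messages ahead of the receiver, and each batch of 200 handled messages returns 200 credits.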

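Troubleshooting

Connection failures

Symptom 1: connection timeout

Causes

  • Network unreachable (firewall, security group)
  • RabbitMQ is not running
  • Wrong port

Diagnosis

# Check that RabbitMQ is running
rabbitmqctl status

# Check the listening port
netstat -tuln | grep 5672

# Test a TCP connection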
telnet localhost 5672

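Symptom 2: authentication failure

Error message

ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. 
For details see the broker logfile.

Causes

  • Wrong username or password
  • The user does not exist
  • guest connecting from a remote host (by default guest may only connect via localhost)

Fix

# Create a user
rabbitmqctl add_user myuser mypassword

# Grant configure, write and read permissions on vhost /
rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"

# Give the user the administrator tag
rabbitmqctl set_user_tags myuser administrator

# Allow guest to connect remotely (not recommended; development only)
# In rabbitmq.conf:
#   loopback_users = none
# or in the classic Erlang-term format (advanced.config):
#   [{rabbit, [{loopback_users, []}]}].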

Symptom 3: vhost access refused

Error message

NOT_ALLOWED - access to vhost '/' refused for user 'myuser'

Fix

# Check the user's permissions
rabbitmqctl list_user_permissions myuser

# Grant permissions
rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"

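Channel errors

Symptom 1: channel closed unexpectedly

Log

Channel error: PRECONDITION_FAILED - ...

Common causes (the reply code tells them apart)

  • Redeclaring a queue or exchange with different arguments: PRECONDITION_FAILED (406)
  • Acknowledging an unknown delivery tag: PRECONDITION_FAILED (406)
  • The queue or exchange does not exist: NOT_FOUND (404)
  • Insufficient permissions: ACCESS_REFUSED (403)

Debugging

%% Monitor the channel to find out why it closed
MRef = erlang:monitor(process, ChannelPid),
receive
    {'DOWN', MRef, process, ChannelPid, Reason} ->
        case Reason of
            {shutdown, {server_initiated_close, Code, Text}} ->
                io:format("Channel closed by server: ~p - ~s~n", [Code, Text]);
            _ ->
                io:format("Channel closed: ~p~n", [Reason])
        end
end.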

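Symptom 2: channel appears blocked

Behavior: publishing stops making progress and the channel seems stuck.

Causes

  • A memory alarm is in effect
  • A disk alarm is in effect
  • Credit-flow backpressure (the connection or channel is in the "flow" state)

A queue at its length limit (x-max-length) does not block publishers. By default it drops the oldest messages; with x-overflow = reject-publish it rejects new ones.

Diagnosis

# Check alarms
rabbitmq-diagnostics alarms

# Check queue state
rabbitmqctl list_queues name messages consumers memory

# Check connection state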
rabbitmqctl list_connections name state channels

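Performance problems

Symptom 1: low publish rate

Diagnosis

  1. Check whether Publisher Confirms are enabled:

    • Disabled: publishes return immediately; highest throughput
    • Enabled: the publisher waits for confirms; lower throughput but higher reliability (asynchronous confirms narrow the gap)
  2. Check message size:

    • Small messages (<4 KB): best performance
    • Large messages (>1 MB): split across many content-body frames (frame_max); lower throughput
  3. Check persistence:

    • Transient messages: kept in memory where possible; highest performance
    • Persistent messages: require disk writes; limited by disk I/O

Optimization

  • Batch publishing
  • Use SSD storage
  • Tune queue_index_embed_msgs_below (default 4096 bytes)

Symptom 2: low consume rate

Diagnosis

  1. Check the prefetch setting:

    • Too small: consumers sit idle waiting on network round trips
    • Too large: messages pile up in the client
  2. Check consumer processing time:

    • Watch the deliver/get rates in the Management UI
  3. Check network latency:

    • Measure latency with ping
    • Consider raising prefetch

Optimization

  • Add more consumers
  • Process messages in parallel
  • Acknowledge in batches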

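Configuration reference

Connection settings

Setting Default Description
tcp_listeners [5672] TCP listener ports
ssl_listeners [] TLS listener ports
num_acceptors.tcp 10 Number of TCP acceptor processes
handshake_timeout 10000 AMQP handshake timeout (ms)
heartbeat 60 Heartbeat timeout (s); 0 disables heartbeats
frame_max 131072 Maximum frame size (bytes)
channel_max 2047 Maximum channels per connection
connection_max infinity Maximum connections per node

Channel settings

Setting Default Description
channel_operation_timeout 15000 Channel operation timeout (ms)
consumer_timeout 1800000 Delivery acknowledgement timeout (ms, 30 minutes); the channel is closed if a delivery stays unacked longer
default_consumer_prefetch {false, 0} Default prefetch, as {Global, Count}
consumer_max_per_channel infinity Maximum consumers per channel

Flow control settings

Setting Default Description
vm_memory_high_watermark 0.4 Memory high watermark as a fraction of RAM (triggers the memory alarm)
disk_free_limit 50MB Free disk space limit (triggers the disk alarm)
credit_flow_default_credit {400, 200} Credit flow parameters {InitialCredit, MoreCreditAfter}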

Appendix

Querying reader process state

%% Get connection information
rabbit_reader:info(ReaderPid, [name, peer_host, peer_port, channels, state]).

%% Example output
[{name, <<"127.0.0.1:51234 -> 127.0.0.1:5672">>},
 {peer_host, <<"127.0.0.1">>},
 {peer_port, 51234},
 {channels, 5},
 {state, running}]

Querying channel process state

%% Get channel information
rabbit_channel:info(ChannelPid, [number, consumer_count, messages_unacknowledged, prefetch_count]).

%% Example output
[{number, 1},
 {consumer_count, 2},
 {messages_unacknowledged, 10},
 {prefetch_count, 100}]

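Related modules

  • rabbit_networking: network listeners and connection management
  • rabbit_command_assembler: AMQP command assembly
  • rabbit_channel_sup_sup: supervisor of the channel supervisors
  • rabbit_writer: frame writer
  • rabbit_limiter: prefetch (QoS) limiter
  • rabbit_heartbeat: heartbeat management
  • rabbit_queue_collector: exclusive queue collector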