RabbitMQ Server 源码剖析 - 总览

0. 摘要

项目目标与核心能力

RabbitMQ 是一个功能丰富的多协议消息中间件系统,提供以下核心能力:

  • 消息路由:基于交换器(Exchange)和绑定(Binding)的灵活路由机制
  • 多种队列类型:经典队列(Classic)、仲裁队列(Quorum)、流队列(Stream)
  • 多协议支持:AMQP 0-9-1、AMQP 1.0、MQTT 3.1/3.1.1/5.0、STOMP 1.0-1.2、RabbitMQ Stream 协议
  • 高可用性:基于 Ra(Raft 实现)的集群与数据复制
  • 可扩展性:插件化架构,支持自定义交换器类型、认证后端、管理扩展等
  • 持久化存储:消息和元数据的可靠持久化机制

问题域与非目标

问题域

  • 应用间异步通信
  • 消息队列与流式数据处理
  • 工作负载分发与解耦
  • 事件驱动架构

非目标

  • 不提供事务型数据库功能
  • 不保证跨队列的全局事务一致性
  • 不直接提供数据分析能力

运行环境

  • 语言:Erlang/OTP
  • 依赖项
    • mnesiakhepri:元数据存储
    • ra:基于 Raft 的分布式一致性库
    • osiris:日志存储引擎
    • os_monsysmon_handler:系统监控
  • 部署形态
    • 单节点部署
    • 集群部署(多节点自动发现与协调)
    • 容器化部署(Docker、Kubernetes)

1. 整体架构图

flowchart TB
    Client[客户端连接<br/>AMQP/MQTT/STOMP/Stream] --> |协议帧| Listener[TCP Listener]
    Listener --> |创建连接| Reader[Connection Reader<br/>rabbit_reader]
    
    Reader --> |解析协议| Channel[Channel 进程<br/>rabbit_channel]
    Reader -.-> |AMQP 1.0| AmqpSession[AMQP 1.0 Session<br/>rabbit_amqp_session]
    
    Channel --> |声明/绑定| Exchange[Exchange<br/>rabbit_exchange]
    Channel --> |发布消息| Router[Router<br/>rabbit_router]
    Channel --> |消费消息| Queue[Queue 进程<br/>各类 queue_type]
    
    Router --> Exchange
    Exchange --> |路由规则| Binding[Binding<br/>rabbit_binding]
    Binding --> Queue
    
    Exchange --> |类型实现| ExDirect[Direct Exchange]
    Exchange --> |类型实现| ExTopic[Topic Exchange]
    Exchange --> |类型实现| ExFanout[Fanout Exchange]
    Exchange --> |类型实现| ExHeaders[Headers Exchange]
    
    Queue --> |经典队列| ClassicQ[rabbit_classic_queue<br/>rabbit_variable_queue]
    Queue --> |仲裁队列| QuorumQ[rabbit_quorum_queue<br/>rabbit_fifo Ra 机器]
    Queue --> |流队列| StreamQ[rabbit_stream_queue<br/>osiris 日志]
    
    ClassicQ --> MsgStore[Message Store<br/>rabbit_msg_store]
    ClassicQ --> QIndex[Queue Index<br/>rabbit_classic_queue_index_v2]
    QuorumQ --> Ra[Ra Consensus<br/>分布式日志]
    StreamQ --> Osiris[Osiris Log<br/>持久化日志]
    
    Channel --> |元数据操作| DB[(Database<br/>Mnesia/Khepri)]
    Exchange --> DB
    Queue --> DB
    
    subgraph 核心监督树
        RabbitSup[rabbit_sup] --> |启动| VHostSupSup[VHost Sup Sup<br/>rabbit_vhost_sup_sup]
        RabbitSup --> |启动| Networking[Networking<br/>rabbit_networking]
        RabbitSup --> |启动| NodeMonitor[Node Monitor<br/>rabbit_node_monitor]
        
        VHostSupSup --> |每个 vhost| VHostSup[VHost Sup<br/>rabbit_vhost_sup]
        VHostSup --> |每个队列| QueueSup[Queue Sup<br/>rabbit_amqqueue_sup]
    end
    
    subgraph 插件系统
        Management[Management API<br/>rabbitmq_management]
        MQTT[MQTT Adapter<br/>rabbitmq_mqtt]
        STOMP[STOMP Adapter<br/>rabbitmq_stomp]
        StreamPlugin[Stream Plugin<br/>rabbitmq_stream]
        Federation[Federation<br/>rabbitmq_federation]
        Shovel[Shovel<br/>rabbitmq_shovel]
        
        Management -.-> Channel
        MQTT -.-> Channel
        STOMP -.-> Channel
        StreamPlugin -.-> Queue
    end

架构图说明

组件职责与耦合关系

  1. 连接层(Connection Layer)

    • rabbit_reader:处理客户端连接,解析 AMQP 0-9-1 协议帧,分发到 Channel
    • rabbit_amqp_reader:处理 AMQP 1.0 连接
    • 协议适配器(MQTT、STOMP):将其他协议转换为内部消息格式
  2. 通道层(Channel Layer)

    • rabbit_channel:AMQP 0-9-1 通道实现,处理方法调用(声明、绑定、发布、消费)
    • rabbit_amqp_session:AMQP 1.0 会话实现
    • 每个通道独立管理事务、确认、消费者
  3. 路由层(Routing Layer)

    • rabbit_exchange:交换器管理,委托给具体类型模块
    • 交换器类型模块(rabbit_exchange_type_directrabbit_exchange_type_topic 等):实现路由逻辑
    • rabbit_router:路由协调,处理备用交换器
    • rabbit_binding:绑定关系管理
  4. 队列层(Queue Layer)

    • rabbit_amqqueue:队列抽象层,统一接口
    • 队列类型实现:
      • rabbit_classic_queue:经典队列,使用 rabbit_variable_queue 作为后端
      • rabbit_quorum_queue:基于 Ra 的仲裁队列
      • rabbit_stream_queue:基于 osiris 的流队列
  5. 存储层(Storage Layer)

    • rabbit_msg_store:共享消息存储(用于大消息)
    • rabbit_classic_queue_index_v2:队列索引(元数据)
    • rabbit_classic_queue_store_v2:队列专属消息存储(小消息)
    • Mnesia/Khepri:元数据存储(队列、交换器、绑定、用户等)
  6. 监督树(Supervision Tree)

    • rabbit_sup:根监督者
    • rabbit_vhost_sup_sup:虚拟主机监督者的监督者
    • 各级监督者确保进程容错与重启

数据流与控制流

  • 发布路径(同步):

    1. 客户端发布消息 → Reader 解析帧 → Channel 处理 basic.publish
    2. Channel 调用 rabbit_exchange:route() → Exchange 类型模块计算目标队列
    3. Router 遍历目标队列,调用 Queue 的 deliver() 方法
    4. Queue 根据类型执行不同存储逻辑(内存/磁盘/Raft 日志)
    5. 返回确认(publisher confirms)到 Channel → 返回给客户端
  • 消费路径(异步):

    1. 客户端订阅队列 → Channel 注册消费者到 Queue
    2. Queue 向消费者推送消息(push 模式)或等待拉取(pull 模式)
    3. Channel 通过 Writer 发送 basic.deliver 帧给客户端
    4. 客户端确认(ack)→ Channel → Queue 删除或重新入队消息
  • 元数据操作路径(同步):

    1. 客户端声明队列/交换器/绑定 → Channel
    2. Channel 调用 rabbit_db_* 模块操作数据库
    3. 数据库(Mnesia/Khepri)执行事务性写入
    4. 返回结果给 Channel → 客户端

跨进程/跨线程路径

  • 跨进程

    • Reader → Channel:通过 Erlang 消息传递(gen_server:call/cast
    • Channel → Queue:异步消息 + 同步调用混合
    • Queue → Queue(仲裁队列):Ra 协议(Raft 复制)
    • 集群节点间:Erlang 分布式消息
  • 跨线程

    • Erlang 使用轻量级进程(协程),调度器在多核上分配
    • 脏调度器(dirty scheduler)用于 NIF 调用

高可用与扩展性

  • 高可用

    • 仲裁队列通过 Ra(Raft)实现多副本数据一致性
    • 流队列通过 osiris 副本机制实现持久化
    • 集群节点监控(rabbit_node_monitor)与自动恢复
  • 扩展性

    • 水平扩展:增加集群节点
    • 队列分片:通过 Shovel、Federation 分布负载
    • 插件机制:动态加载功能模块

2. 全局时序图(主要业务闭环)

消息发布-路由-消费完整流程

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant Reader as rabbit_reader
    participant Channel as rabbit_channel
    participant Exchange as rabbit_exchange
    participant Router as rabbit_router
    participant Queue as Queue 进程
    participant QBackend as Queue Backend<br/>(variable_queue/ra)
    participant Consumer as Consumer 进程<br/>(Channel)
    
    %% 发布阶段
    Note over Client,Queue: === 消息发布阶段 ===
    Client->>+Reader: basic.publish 
    Reader->>Reader: 解析帧
    Reader->>+Channel: 转发方法调用
    Channel->>Channel: 权限检查
    Channel->>+Exchange: route(Message, Opts)
    Exchange->>Exchange: 根据 type 调用路由函数
    Exchange->>-Channel: 返回目标队列列表
    
    Channel->>+Router: deliver_to_queues(Queues, Msg)
    loop 每个目标队列
        Router->>+Queue: deliver(Msg)
        Queue->>+QBackend: publish(Msg)
        QBackend->>QBackend: 写入存储<br/>(内存/磁盘/Raft)
        QBackend-->>-Queue: ok
        Queue-->>-Router: ok
    end
    Router-->>-Channel: delivery_result
    
    Channel->>Channel: 记录 confirms(如需要)
    Channel-->>-Reader: basic.ack (publisher confirm)
    Reader-->>-Client: 确认帧
    
    %% 消费阶段
    Note over Client,Consumer: === 消息消费阶段 ===
    Queue->>Queue: 检查待分发消息
    Queue->>+Consumer: push message
    Consumer->>Consumer: 格式化消息
    Consumer->>Reader: basic.deliver 
    Reader->>Client: 推送消息
    
    Client->>+Reader: basic.ack 
    Reader->>+Consumer: 确认消息
    Consumer->>+Queue: settle(ack, MsgId)
    Queue->>+QBackend: ack(MsgId)
    QBackend->>QBackend: 删除/标记消息
    QBackend-->>-Queue: ok
    Queue-->>-Consumer: ok
    Consumer-->>-Reader: ok
    Reader-->>-Client: ok

时序图说明

关键步骤与职责

  1. 协议解析(步骤 1-3):

    • Reader 负责 TCP 连接与帧解析,不处理业务逻辑
    • 每个 Channel 独立处理,避免连接级阻塞
  2. 权限与路由(步骤 4-7):

    • Channel 执行用户权限检查(rabbit_access_control
    • Exchange 根据类型(direct/topic/fanout/headers)计算目标队列
    • 支持多播(一个消息路由到多个队列)
  3. 消息持久化(步骤 8-12):

    • Queue Backend 根据队列类型执行不同策略:
      • Classic Queue:内存优先,内存压力时分页到磁盘
      • Quorum Queue:通过 Ra(Raft)同步到多个副本
      • Stream Queue:追加到 osiris 日志
    • 持久化是异步批量操作,但 confirms 等待持久化完成
  4. Publisher Confirms(步骤 13-15):

    • 可选特性,客户端通过 confirm.select 启用
    • Channel 等待所有目标队列返回确认后才返回
  5. 消费推送(步骤 16-19):

    • Queue 根据消费者 QoS(prefetch)主动推送
    • 使用信用流控(credit flow)防止内存溢出
    • 消息在 Channel 缓冲,批量发送给客户端
  6. 消费确认(步骤 20-26):

    • 客户端确认后,Queue 才真正删除消息(或标记已消费)
    • 支持批量 ack、nack、reject
    • 未确认消息在连接断开时重新入队

边界条件

  • 幂等性

    • 队列/交换器声明是幂等的(参数相同时)
    • 消息发布不幂等,需业务层去重
    • 消费确认基于 delivery_tag(递增序号),可重复确认(忽略)
  • 并发

    • 单个 Channel 串行处理方法(避免竞态)
    • 不同 Channel 并行操作
    • Queue 进程内部串行(顺序保证)
  • 超时

    • 连接心跳超时(可配置,默认 60 秒)
    • Channel 操作无超时(阻塞式),但 Queue 可能阻塞 Channel
    • 消费者超时(consumer_timeout,默认 30 分钟)
  • 顺序性

    • 单个 Queue 内保证 FIFO(优先级队列例外)
    • 跨 Queue 无顺序保证
    • 仲裁队列提供严格顺序

异常与回退

  • 路由失败

    • 无目标队列时,根据 mandatory 标志返回 basic.return 或丢弃
    • 支持备用交换器(alternate-exchange)
  • 队列满

    • 达到长度限制(x-max-length)时,删除队首消息或拒绝新消息
    • 达到内存上限时触发流控,阻塞发布者
  • 存储失败

    • 磁盘满时拒绝持久化消息
    • 触发全局 alarm,阻塞所有发布操作
  • 消费失败

    • 客户端 nack 消息可选重新入队(requeue)
    • 死信队列(DLX)处理多次失败的消息
  • 节点故障

    • Classic Queue:数据丢失(非 HA 模式)
    • Quorum Queue:自动故障转移到副本
    • Stream Queue:副本接管

性能要点

  • 零拷贝

    • 消息在 Erlang 进程间通过引用传递(小消息)
    • 大消息通过共享 binary 避免拷贝
  • 批处理

    • 消息存储批量写入磁盘(flush 间隔可配置)
    • 网络帧批量发送(Nagle 算法可选)
  • 信用流控

    • Queue → Channel → Reader 层层背压
    • 防止生产者过快导致内存溢出
  • 资源上界

    • 每个连接的 Channel 数量上限(默认 2047)
    • 队列消息数/字节数上限(可配置)
    • 全局内存水位线(默认 0.4)

3. 模块边界与交互图

核心模块清单

模块名称 职责 对外 API 对内依赖
rabbit 应用入口与启动协调 start/0, stop/0, boot/0 rabbit_sup, rabbit_boot_steps
rabbit_sup 根监督者 start_child/1 各子监督者
rabbit_reader AMQP 0-9-1 连接处理 - rabbit_channel, rabbit_writer
rabbit_channel AMQP 0-9-1 通道 do/2, do_flow/3 rabbit_exchange, rabbit_amqqueue
rabbit_exchange 交换器管理 declare/7, route/3, delete/3 rabbit_db_exchange, 类型模块
rabbit_router 路由协调 deliver/2, match_bindings/2 rabbit_exchange, rabbit_amqqueue
rabbit_amqqueue 队列抽象层 declare/2, lookup/1, deliver/2 队列类型模块
rabbit_classic_queue 经典队列实现 - rabbit_variable_queue
rabbit_quorum_queue 仲裁队列实现 - rabbit_fifo (Ra)
rabbit_stream_queue 流队列实现 - osiris
rabbit_variable_queue 经典队列存储后端 publish/4, fetch/2 rabbit_msg_store, queue_index
rabbit_msg_store 共享消息存储 write/3, read/2 rabbit_file
rabbit_db_* 数据库抽象层 - mnesia 或 khepri
rabbit_auth_backend_internal 内部认证 check_user/2 rabbit_db_user
rabbit_access_control 权限控制 check_vhost_access/3 认证后端

插件模块清单

插件模块 职责 协议/接口
rabbitmq_management HTTP 管理 API RESTful HTTP API
rabbitmq_management_agent 管理数据收集 内部事件订阅
rabbitmq_mqtt MQTT 协议适配 MQTT 3.1/3.1.1/5.0
rabbitmq_stomp STOMP 协议适配 STOMP 1.0-1.2
rabbitmq_stream Stream 协议 RabbitMQ Stream 协议
rabbitmq_web_mqtt MQTT over WebSocket MQTT + WebSocket
rabbitmq_web_stomp STOMP over WebSocket STOMP + WebSocket
rabbitmq_federation 集群联邦 AMQP 0-9-1
rabbitmq_shovel 消息搬运 AMQP 0-9-1/1.0
rabbitmq_auth_backend_ldap LDAP 认证 LDAP
rabbitmq_auth_backend_http HTTP 认证 HTTP
rabbitmq_auth_backend_oauth2 OAuth2 认证 OAuth2/JWT
rabbitmq_consistent_hash_exchange 一致性哈希交换器 Exchange 类型
rabbitmq_federation_management 联邦管理 Management 扩展

模块交互矩阵

调用方 ↓ / 被调方 → rabbit_channel rabbit_exchange rabbit_amqqueue rabbit_msg_store rabbit_db_*
rabbit_reader 同步调用 - - - -
rabbit_channel - 同步调用 同步/异步 - 同步调用
rabbit_exchange - - 通过 Router - 同步调用
rabbit_amqqueue 异步消息 - - - 同步调用
rabbit_classic_queue 异步消息 - - 同步调用 -
rabbit_quorum_queue 异步消息 - - - -

交互说明

  1. 同步调用

    • Reader → Channel:gen_server:call/2(RPC 风格)
    • Channel → Exchange/DB:函数调用(同进程或跨进程 RPC)
    • 阻塞调用,等待返回
  2. 异步消息

    • Queue → Channel(消费推送):! 消息发送
    • Event 通知:rabbit_event:notify/2
    • 非阻塞,不等待响应
  3. 共享存储

    • 元数据通过 Mnesia/Khepri ETS 表共享
    • 消息存储通过文件句柄共享(协调访问)
  4. 订阅/发布

    • 统计信息:rabbit_event:notify → Management 订阅
    • 集群事件:rabbit_node_monitor 发布节点变化
  5. 错误语义

    • Channel 异常不影响 Connection(隔离)
    • Queue 进程崩溃时 Channel 收到 {'DOWN', ...} 消息
    • 数据库事务失败回滚,返回错误给调用方
  6. 一致性要求

    • 元数据操作(声明/删除):强一致性(Mnesia 事务)
    • 消息发布:最终一致性(Queue 异步确认)
    • 仲裁队列:Raft 一致性(线性化)

4. 关键设计与权衡

4.1 数据一致性

强一致性(元数据)

  • 实现

    • 使用 Mnesia 分布式事务(传统模式)
    • 使用 Khepri(基于 Ra/Raft)实现强一致性元数据存储
  • 权衡

    • 优势:声明操作保证全集群可见,避免脑裂
    • 劣势:写入延迟较高(网络往返 + 多数派确认)

最终一致性(消息)

  • 实现

    • 经典队列:单节点存储,无跨节点复制
    • 仲裁队列:Raft 复制,强一致性
    • 流队列:副本异步复制,leader 串行写入
  • 权衡

    • Classic:高性能,低延迟,无持久性保证(节点故障丢失)
    • Quorum:强持久性,中等延迟(2-10ms),需多数派存活
    • Stream:高吞吐,追加写入,适合日志型工作负载

4.2 事务边界

  • AMQP 事务tx.select):

    • 仅覆盖单个 Channel 的发布和确认
    • 不跨 Queue,不跨 Connection
    • 实现:Channel 内缓存操作,tx.commit 时批量执行
  • Publisher Confirms

    • 单向确认机制(比事务更轻量)
    • 异步确认,不阻塞后续发布
    • 适合高吞吐场景
  • 权衡

    • 事务保证原子性但降低吞吐(串行化)
    • Confirms 提供可靠性但不保证原子性

4.3 锁与并发策略

进程级隔离

  • 设计

    • 每个 Queue、Exchange、Channel 是独立的 Erlang 进程
    • 进程内串行处理消息,天然避免锁竞争
  • 优势

    • 无共享状态,无锁设计
    • 故障隔离(进程崩溃不影响其他)

共享资源

  • Mnesia 表

    • 读操作并发(脏读)
    • 写操作通过事务序列化
  • Message Store

    • 文件锁保护并发写入
    • 引用计数管理消息生命周期

4.4 性能关键路径

发布路径优化

  1. 路由缓存

    • Topic Exchange 使用前缀树(Trie)加速匹配
    • Direct Exchange 使用哈希表 O(1) 查找
  2. 批量操作

    • 消息批量写入磁盘(默认 100ms 或 128 条)
    • 批量确认(一次确认多条消息)
  3. 内存优先

    • 小消息完全在内存中路由和存储
    • 大消息引用传递,避免拷贝

消费路径优化

  1. 预取(Prefetch)

    • Queue 一次性推送多条消息到 Channel
    • 减少往返次数
  2. 信用流控

    • 动态调整推送速率
    • 避免内存溢出

4.5 可观测性

指标(Metrics)

  • 连接级:连接数、通道数、发布/消费速率
  • 队列级:消息数、消费者数、内存占用、磁盘读写
  • 节点级:内存/磁盘使用率、Erlang 进程数、文件句柄数

跟踪(Tracing)

  • Firehose:抓取所有发布/消费消息
  • 用户级 Trace:选择性跟踪特定队列/交换器

日志(Logging)

  • 结构化日志:使用 Erlang logger,支持 JSON 格式
  • 日志级别:debug、info、warning、error、critical

4.6 配置项(影响行为)

配置项 默认值 影响
vm_memory_high_watermark 0.4 内存水位线,超过阻塞发布
disk_free_limit 50MB 磁盘剩余空间下限,低于阻塞
channel_max 2047 单连接最大 Channel 数
heartbeat 60s 连接心跳间隔
consumer_timeout 30min 消费者无活动超时
queue_index_embed_msgs_below 4096 小消息嵌入索引的阈值
msg_store_file_size_limit 16MB 消息存储文件大小

5. 典型使用示例与最佳实践

示例 1:最小可运行入口

启动 RabbitMQ

# 方式 1:使用默认配置启动
rabbitmq-server

# 方式 2:前台启动(开发调试)
rabbitmq-server -detached

# 方式 3:Erlang shell 启动(高级)
erl -pa deps/*/ebin -s rabbit

基本操作(使用 Erlang shell)

%% 连接到节点
{ok, Conn} = amqp_connection:start(#amqp_params_direct{}).

%% 打开通道
{ok, Channel} = amqp_connection:open_channel(Conn).

%% 声明队列
QDeclare = #'queue.declare'{queue = <<"test_queue">>},
#'queue.declare_ok'{} = amqp_channel:call(Channel, QDeclare).

%% 发布消息
Publish = #'basic.publish'{exchange = <<>>, routing_key = <<"test_queue">>},
Msg = #amqp_msg{payload = <<"Hello RabbitMQ">>},
ok = amqp_channel:cast(Channel, Publish, Msg).

%% 消费消息
Subscribe = #'basic.consume'{queue = <<"test_queue">>},
#'basic.consume_ok'{} = amqp_channel:subscribe(Channel, Subscribe, self()).

%% 接收消息
receive
    {#'basic.deliver'{}, #amqp_msg{payload = Payload}} ->
        io:format("Received: ~p~n", [Payload])
end.

%% 关闭连接
amqp_connection:close(Conn).

示例 2:扩展点/插件接入

自定义 Exchange 类型

  1. 创建模块 my_exchange.erl
-module(my_exchange).
-behaviour(rabbit_exchange_type).

-export([description/0, serialise_events/0, route/2, route/3,
         validate/1, validate_binding/2, create/2, delete/2,
         policy_changed/2, add_binding/3, remove_bindings/3,
         assert_args_equivalence/2, info/1, info/2]).

%% 描述
description() ->
    [{description, <<"Custom exchange type">>}].

serialise_events() -> false.

%% 核心路由逻辑
route(#exchange{name = XName}, Msg, _Opts) ->
    %% 自定义路由算法
    RKey = mc:routing_key(Msg),
    %% 示例:哈希路由到特定队列
    HashValue = erlang:phash2(RKey, 10),
    TargetQueue = rabbit_misc:r(XName, queue, integer_to_binary(HashValue)),
    [TargetQueue].

%% 其他回调(省略实现)
validate(_X) -> ok.
validate_binding(_X, _B) -> ok.
create(_Serial, _X) -> ok.
delete(_Serial, _X) -> ok.
policy_changed(_X1, _X2) -> ok.
add_binding(_Serial, _X, _B) -> ok.
remove_bindings(_Serial, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
    rabbit_exchange:assert_args_equivalence(X, Args).
info(_X) -> [].
info(_X, _) -> [].
  1. 注册 Exchange 类型(在插件的 boot step 中):
-rabbit_boot_step({my_exchange_type,
                   [{description, "custom exchange type"},
                    {mfa, {rabbit_registry, register,
                           [exchange, <<"x-custom">>, my_exchange]}},
                    {requires, rabbit_registry},
                    {enables, kernel_ready}]}).
  1. 使用自定义类型:
%% 声明自定义类型的 exchange
ExDeclare = #'exchange.declare'{
    exchange = <<"my_custom_exchange">>,
    type = <<"x-custom">>
},
#'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare).

自定义认证后端

  1. 实现 rabbit_authn_backend 行为:
-module(my_auth_backend).
-behaviour(rabbit_authn_backend).
-behaviour(rabbit_authz_backend).

-export([user_login_authentication/2, user_login_authorization/2,
         check_vhost_access/3, check_resource_access/4, check_topic_access/4,
         state_can_expire/0]).

%% 认证
user_login_authentication(Username, AuthProps) ->
    %% 从外部系统验证用户(例如 HTTP API)
    case external_api:verify_user(Username, AuthProps) of
        {ok, UserData} ->
            {ok, #auth_user{username = Username,
                           tags = proplists:get_value(tags, UserData, [])}};
        {error, _} ->
            {refused, "Authentication failed", []}
    end.

%% 授权
user_login_authorization(Username, _AuthProps) ->
    {ok, _Impl} = application:get_env(my_plugin, authz_impl),
    {ok, {Username, _Impl}}.

check_vhost_access(#auth_user{username = Username}, VHost, _Sock) ->
    %% 检查虚拟主机访问权限
    external_api:check_vhost(Username, VHost).

%% 其他授权回调(省略)
check_resource_access(_User, _Resource, _Permission, _Context) ->
    true.

check_topic_access(_User, _Resource, _Permission, _Context) ->
    true.

state_can_expire() -> false.
  1. 配置使用自定义认证:
%% rabbitmq.config
[
  {rabbit, [
    {auth_backends, [my_auth_backend]}
  ]}
].

示例 3:规模化/上线注意事项

集群部署

  1. 节点发现配置(使用 Consul):
%% rabbitmq.conf
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_consul
cluster_formation.consul.host = 127.0.0.1
cluster_formation.consul.port = 8500
cluster_formation.consul.svc = rabbitmq
cluster_formation.node_cleanup.only_log_warning = true
  1. 资源限制
%% rabbitmq.conf
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 10GB
total_memory_available_override_value = 16GB
  1. 队列类型选择
    • Classic Queue:低延迟,单节点,适合临时消息
    • Quorum Queue:高可用,多副本,适合关键业务
    • Stream Queue:高吞吐,持久化,适合日志/审计

性能调优

  1. 连接池

    • 客户端维护连接池(推荐每个应用 1-5 个连接)
    • 每个连接使用多个 Channel(避免 Channel 数过多)
  2. 消息持久化

    • 关键消息:delivery_mode = 2(持久化)
    • 临时消息:delivery_mode = 1(非持久化)
  3. Prefetch 设置

    • 默认 basic.qos(prefetch_count=10)
    • 高延迟网络增大 prefetch(如 100)
    • 低延迟网络减小 prefetch(如 1-10)
  4. 批量操作

    • 批量发布(一次性发送多条消息)
    • 批量确认(multiple = true

监控与告警

  1. 关键指标

    • 队列消息堆积(messages_ready > 阈值)
    • 内存使用率(mem_used / mem_limit
    • 磁盘剩余空间(disk_free < disk_free_limit
    • 连接数/通道数异常增长
  2. 告警配置

# 使用 rabbitmqctl 设置内存告警
rabbitmqctl set_vm_memory_high_watermark 0.5

# 检查告警状态
rabbitmqctl alarm_list
  1. 日志级别
# 运行时调整日志级别
rabbitmqctl set_log_level debug

# 恢复默认
rabbitmqctl set_log_level info

附录:术语表

术语 解释
AMQP Advanced Message Queuing Protocol,高级消息队列协议
Exchange 交换器,消息路由实体
Binding 绑定,连接 Exchange 和 Queue 的路由规则
Queue 队列,消息存储实体
Channel 通道,AMQP 连接内的逻辑通道
VHost 虚拟主机,多租户隔离单元
Mnesia Erlang 分布式数据库
Khepri 基于 Ra(Raft)的元数据存储
Ra Erlang 实现的 Raft 一致性算法库
Osiris 高性能日志存储引擎
Quorum Queue 仲裁队列,基于 Ra 的多副本队列
Stream Queue 流队列,基于 osiris 的追加日志队列
Classic Queue 经典队列,传统的单节点队列
DLX Dead Letter Exchange,死信交换器
TTL Time To Live,消息生存时间
QoS Quality of Service,服务质量(Prefetch)
Ack Acknowledge,消息确认
Nack Negative Acknowledge,消息拒绝