RabbitMQ Server 源码剖析 - 总览
0. 摘要
项目目标与核心能力
RabbitMQ 是一个功能丰富的多协议消息中间件系统,提供以下核心能力:
- 消息路由:基于交换器(Exchange)和绑定(Binding)的灵活路由机制
- 多种队列类型:经典队列(Classic)、仲裁队列(Quorum)、流队列(Stream)
- 多协议支持:AMQP 0-9-1、AMQP 1.0、MQTT 3.1/3.1.1/5.0、STOMP 1.0-1.2、RabbitMQ Stream 协议
- 高可用性:基于 Ra(Raft 实现)的集群与数据复制
- 可扩展性:插件化架构,支持自定义交换器类型、认证后端、管理扩展等
- 持久化存储:消息和元数据的可靠持久化机制
问题域与非目标
问题域:
- 应用间异步通信
- 消息队列与流式数据处理
- 工作负载分发与解耦
- 事件驱动架构
非目标:
- 不提供事务型数据库功能
- 不保证跨队列的全局事务一致性
- 不直接提供数据分析能力
运行环境
- 语言:Erlang/OTP
- 依赖项:
mnesia或khepri:元数据存储ra:基于 Raft 的分布式一致性库osiris:日志存储引擎os_mon、sysmon_handler:系统监控
- 部署形态:
- 单节点部署
- 集群部署(多节点自动发现与协调)
- 容器化部署(Docker、Kubernetes)
1. 整体架构图
flowchart TB
Client[客户端连接<br/>AMQP/MQTT/STOMP/Stream] --> |协议帧| Listener[TCP Listener]
Listener --> |创建连接| Reader[Connection Reader<br/>rabbit_reader]
Reader --> |解析协议| Channel[Channel 进程<br/>rabbit_channel]
Reader -.-> |AMQP 1.0| AmqpSession[AMQP 1.0 Session<br/>rabbit_amqp_session]
Channel --> |声明/绑定| Exchange[Exchange<br/>rabbit_exchange]
Channel --> |发布消息| Router[Router<br/>rabbit_router]
Channel --> |消费消息| Queue[Queue 进程<br/>各类 queue_type]
Router --> Exchange
Exchange --> |路由规则| Binding[Binding<br/>rabbit_binding]
Binding --> Queue
Exchange --> |类型实现| ExDirect[Direct Exchange]
Exchange --> |类型实现| ExTopic[Topic Exchange]
Exchange --> |类型实现| ExFanout[Fanout Exchange]
Exchange --> |类型实现| ExHeaders[Headers Exchange]
Queue --> |经典队列| ClassicQ[rabbit_classic_queue<br/>rabbit_variable_queue]
Queue --> |仲裁队列| QuorumQ[rabbit_quorum_queue<br/>rabbit_fifo Ra 机器]
Queue --> |流队列| StreamQ[rabbit_stream_queue<br/>osiris 日志]
ClassicQ --> MsgStore[Message Store<br/>rabbit_msg_store]
ClassicQ --> QIndex[Queue Index<br/>rabbit_classic_queue_index_v2]
QuorumQ --> Ra[Ra Consensus<br/>分布式日志]
StreamQ --> Osiris[Osiris Log<br/>持久化日志]
Channel --> |元数据操作| DB[(Database<br/>Mnesia/Khepri)]
Exchange --> DB
Queue --> DB
subgraph 核心监督树
RabbitSup[rabbit_sup] --> |启动| VHostSupSup[VHost Sup Sup<br/>rabbit_vhost_sup_sup]
RabbitSup --> |启动| Networking[Networking<br/>rabbit_networking]
RabbitSup --> |启动| NodeMonitor[Node Monitor<br/>rabbit_node_monitor]
VHostSupSup --> |每个 vhost| VHostSup[VHost Sup<br/>rabbit_vhost_sup]
VHostSup --> |每个队列| QueueSup[Queue Sup<br/>rabbit_amqqueue_sup]
end
subgraph 插件系统
Management[Management API<br/>rabbitmq_management]
MQTT[MQTT Adapter<br/>rabbitmq_mqtt]
STOMP[STOMP Adapter<br/>rabbitmq_stomp]
StreamPlugin[Stream Plugin<br/>rabbitmq_stream]
Federation[Federation<br/>rabbitmq_federation]
Shovel[Shovel<br/>rabbitmq_shovel]
Management -.-> Channel
MQTT -.-> Channel
STOMP -.-> Channel
StreamPlugin -.-> Queue
end
架构图说明
组件职责与耦合关系
-
连接层(Connection Layer)
rabbit_reader:处理客户端连接,解析 AMQP 0-9-1 协议帧,分发到 Channelrabbit_amqp_reader:处理 AMQP 1.0 连接- 协议适配器(MQTT、STOMP):将其他协议转换为内部消息格式
-
通道层(Channel Layer)
rabbit_channel:AMQP 0-9-1 通道实现,处理方法调用(声明、绑定、发布、消费)rabbit_amqp_session:AMQP 1.0 会话实现- 每个通道独立管理事务、确认、消费者
-
路由层(Routing Layer)
rabbit_exchange:交换器管理,委托给具体类型模块- 交换器类型模块(
rabbit_exchange_type_direct、rabbit_exchange_type_topic等):实现路由逻辑 rabbit_router:路由协调,处理备用交换器rabbit_binding:绑定关系管理
-
队列层(Queue Layer)
rabbit_amqqueue:队列抽象层,统一接口- 队列类型实现:
rabbit_classic_queue:经典队列,使用rabbit_variable_queue作为后端rabbit_quorum_queue:基于 Ra 的仲裁队列rabbit_stream_queue:基于 osiris 的流队列
-
存储层(Storage Layer)
rabbit_msg_store:共享消息存储(用于大消息)rabbit_classic_queue_index_v2:队列索引(元数据)rabbit_classic_queue_store_v2:队列专属消息存储(小消息)- Mnesia/Khepri:元数据存储(队列、交换器、绑定、用户等)
-
监督树(Supervision Tree)
rabbit_sup:根监督者rabbit_vhost_sup_sup:虚拟主机监督者的监督者- 各级监督者确保进程容错与重启
数据流与控制流
-
发布路径(同步):
- 客户端发布消息 → Reader 解析帧 → Channel 处理
basic.publish - Channel 调用
rabbit_exchange:route()→ Exchange 类型模块计算目标队列 - Router 遍历目标队列,调用 Queue 的
deliver()方法 - Queue 根据类型执行不同存储逻辑(内存/磁盘/Raft 日志)
- 返回确认(publisher confirms)到 Channel → 返回给客户端
- 客户端发布消息 → Reader 解析帧 → Channel 处理
-
消费路径(异步):
- 客户端订阅队列 → Channel 注册消费者到 Queue
- Queue 向消费者推送消息(push 模式)或等待拉取(pull 模式)
- Channel 通过 Writer 发送
basic.deliver帧给客户端 - 客户端确认(ack)→ Channel → Queue 删除或重新入队消息
-
元数据操作路径(同步):
- 客户端声明队列/交换器/绑定 → Channel
- Channel 调用
rabbit_db_*模块操作数据库 - 数据库(Mnesia/Khepri)执行事务性写入
- 返回结果给 Channel → 客户端
跨进程/跨线程路径
-
跨进程:
- Reader → Channel:通过 Erlang 消息传递(
gen_server:call/cast) - Channel → Queue:异步消息 + 同步调用混合
- Queue → Queue(仲裁队列):Ra 协议(Raft 复制)
- 集群节点间:Erlang 分布式消息
- Reader → Channel:通过 Erlang 消息传递(
-
跨线程:
- Erlang 使用轻量级进程(协程),调度器在多核上分配
- 脏调度器(dirty scheduler)用于 NIF 调用
高可用与扩展性
-
高可用:
- 仲裁队列通过 Ra(Raft)实现多副本数据一致性
- 流队列通过 osiris 副本机制实现持久化
- 集群节点监控(
rabbit_node_monitor)与自动恢复
-
扩展性:
- 水平扩展:增加集群节点
- 队列分片:通过 Shovel、Federation 分布负载
- 插件机制:动态加载功能模块
2. 全局时序图(主要业务闭环)
消息发布-路由-消费完整流程
sequenceDiagram
autonumber
participant Client as 客户端
participant Reader as rabbit_reader
participant Channel as rabbit_channel
participant Exchange as rabbit_exchange
participant Router as rabbit_router
participant Queue as Queue 进程
participant QBackend as Queue Backend<br/>(variable_queue/ra)
participant Consumer as Consumer 进程<br/>(Channel)
%% 发布阶段
Note over Client,Queue: === 消息发布阶段 ===
Client->>+Reader: basic.publish 帧
Reader->>Reader: 解析帧
Reader->>+Channel: 转发方法调用
Channel->>Channel: 权限检查
Channel->>+Exchange: route(Message, Opts)
Exchange->>Exchange: 根据 type 调用路由函数
Exchange->>-Channel: 返回目标队列列表
Channel->>+Router: deliver_to_queues(Queues, Msg)
loop 每个目标队列
Router->>+Queue: deliver(Msg)
Queue->>+QBackend: publish(Msg)
QBackend->>QBackend: 写入存储<br/>(内存/磁盘/Raft)
QBackend-->>-Queue: ok
Queue-->>-Router: ok
end
Router-->>-Channel: delivery_result
Channel->>Channel: 记录 confirms(如需要)
Channel-->>-Reader: basic.ack (publisher confirm)
Reader-->>-Client: 确认帧
%% 消费阶段
Note over Client,Consumer: === 消息消费阶段 ===
Queue->>Queue: 检查待分发消息
Queue->>+Consumer: push message
Consumer->>Consumer: 格式化消息
Consumer->>Reader: basic.deliver 帧
Reader->>Client: 推送消息
Client->>+Reader: basic.ack 帧
Reader->>+Consumer: 确认消息
Consumer->>+Queue: settle(ack, MsgId)
Queue->>+QBackend: ack(MsgId)
QBackend->>QBackend: 删除/标记消息
QBackend-->>-Queue: ok
Queue-->>-Consumer: ok
Consumer-->>-Reader: ok
Reader-->>-Client: ok
时序图说明
关键步骤与职责
-
协议解析(步骤 1-3):
- Reader 负责 TCP 连接与帧解析,不处理业务逻辑
- 每个 Channel 独立处理,避免连接级阻塞
-
权限与路由(步骤 4-7):
- Channel 执行用户权限检查(
rabbit_access_control) - Exchange 根据类型(direct/topic/fanout/headers)计算目标队列
- 支持多播(一个消息路由到多个队列)
- Channel 执行用户权限检查(
-
消息持久化(步骤 8-12):
- Queue Backend 根据队列类型执行不同策略:
- Classic Queue:内存优先,内存压力时分页到磁盘
- Quorum Queue:通过 Ra(Raft)同步到多个副本
- Stream Queue:追加到 osiris 日志
- 持久化是异步批量操作,但 confirms 等待持久化完成
- Queue Backend 根据队列类型执行不同策略:
-
Publisher Confirms(步骤 13-15):
- 可选特性,客户端通过
confirm.select启用 - Channel 等待所有目标队列返回确认后才返回
- 可选特性,客户端通过
-
消费推送(步骤 16-19):
- Queue 根据消费者 QoS(prefetch)主动推送
- 使用信用流控(credit flow)防止内存溢出
- 消息在 Channel 缓冲,批量发送给客户端
-
消费确认(步骤 20-26):
- 客户端确认后,Queue 才真正删除消息(或标记已消费)
- 支持批量 ack、nack、reject
- 未确认消息在连接断开时重新入队
边界条件
-
幂等性:
- 队列/交换器声明是幂等的(参数相同时)
- 消息发布不幂等,需业务层去重
- 消费确认基于 delivery_tag(递增序号),可重复确认(忽略)
-
并发:
- 单个 Channel 串行处理方法(避免竞态)
- 不同 Channel 并行操作
- Queue 进程内部串行(顺序保证)
-
超时:
- 连接心跳超时(可配置,默认 60 秒)
- Channel 操作无超时(阻塞式),但 Queue 可能阻塞 Channel
- 消费者超时(consumer_timeout,默认 30 分钟)
-
顺序性:
- 单个 Queue 内保证 FIFO(优先级队列例外)
- 跨 Queue 无顺序保证
- 仲裁队列提供严格顺序
异常与回退
-
路由失败:
- 无目标队列时,根据 mandatory 标志返回 basic.return 或丢弃
- 支持备用交换器(alternate-exchange)
-
队列满:
- 达到长度限制(
x-max-length)时,删除队首消息或拒绝新消息 - 达到内存上限时触发流控,阻塞发布者
- 达到长度限制(
-
存储失败:
- 磁盘满时拒绝持久化消息
- 触发全局 alarm,阻塞所有发布操作
-
消费失败:
- 客户端 nack 消息可选重新入队(requeue)
- 死信队列(DLX)处理多次失败的消息
-
节点故障:
- Classic Queue:数据丢失(非 HA 模式)
- Quorum Queue:自动故障转移到副本
- Stream Queue:副本接管
性能要点
-
零拷贝:
- 消息在 Erlang 进程间通过引用传递(小消息)
- 大消息通过共享 binary 避免拷贝
-
批处理:
- 消息存储批量写入磁盘(flush 间隔可配置)
- 网络帧批量发送(Nagle 算法可选)
-
信用流控:
- Queue → Channel → Reader 层层背压
- 防止生产者过快导致内存溢出
-
资源上界:
- 每个连接的 Channel 数量上限(默认 2047)
- 队列消息数/字节数上限(可配置)
- 全局内存水位线(默认 0.4)
3. 模块边界与交互图
核心模块清单
| 模块名称 | 职责 | 对外 API | 对内依赖 |
|---|---|---|---|
| rabbit | 应用入口与启动协调 | start/0, stop/0, boot/0 |
rabbit_sup, rabbit_boot_steps |
| rabbit_sup | 根监督者 | start_child/1 |
各子监督者 |
| rabbit_reader | AMQP 0-9-1 连接处理 | - | rabbit_channel, rabbit_writer |
| rabbit_channel | AMQP 0-9-1 通道 | do/2, do_flow/3 |
rabbit_exchange, rabbit_amqqueue |
| rabbit_exchange | 交换器管理 | declare/7, route/3, delete/3 |
rabbit_db_exchange, 类型模块 |
| rabbit_router | 路由协调 | deliver/2, match_bindings/2 |
rabbit_exchange, rabbit_amqqueue |
| rabbit_amqqueue | 队列抽象层 | declare/2, lookup/1, deliver/2 |
队列类型模块 |
| rabbit_classic_queue | 经典队列实现 | - | rabbit_variable_queue |
| rabbit_quorum_queue | 仲裁队列实现 | - | rabbit_fifo (Ra) |
| rabbit_stream_queue | 流队列实现 | - | osiris |
| rabbit_variable_queue | 经典队列存储后端 | publish/4, fetch/2 |
rabbit_msg_store, queue_index |
| rabbit_msg_store | 共享消息存储 | write/3, read/2 |
rabbit_file |
| rabbit_db_* | 数据库抽象层 | - | mnesia 或 khepri |
| rabbit_auth_backend_internal | 内部认证 | check_user/2 |
rabbit_db_user |
| rabbit_access_control | 权限控制 | check_vhost_access/3 |
认证后端 |
插件模块清单
| 插件模块 | 职责 | 协议/接口 |
|---|---|---|
| rabbitmq_management | HTTP 管理 API | RESTful HTTP API |
| rabbitmq_management_agent | 管理数据收集 | 内部事件订阅 |
| rabbitmq_mqtt | MQTT 协议适配 | MQTT 3.1/3.1.1/5.0 |
| rabbitmq_stomp | STOMP 协议适配 | STOMP 1.0-1.2 |
| rabbitmq_stream | Stream 协议 | RabbitMQ Stream 协议 |
| rabbitmq_web_mqtt | MQTT over WebSocket | MQTT + WebSocket |
| rabbitmq_web_stomp | STOMP over WebSocket | STOMP + WebSocket |
| rabbitmq_federation | 集群联邦 | AMQP 0-9-1 |
| rabbitmq_shovel | 消息搬运 | AMQP 0-9-1/1.0 |
| rabbitmq_auth_backend_ldap | LDAP 认证 | LDAP |
| rabbitmq_auth_backend_http | HTTP 认证 | HTTP |
| rabbitmq_auth_backend_oauth2 | OAuth2 认证 | OAuth2/JWT |
| rabbitmq_consistent_hash_exchange | 一致性哈希交换器 | Exchange 类型 |
| rabbitmq_federation_management | 联邦管理 | Management 扩展 |
模块交互矩阵
| 调用方 ↓ / 被调方 → | rabbit_channel | rabbit_exchange | rabbit_amqqueue | rabbit_msg_store | rabbit_db_* |
|---|---|---|---|---|---|
| rabbit_reader | 同步调用 | - | - | - | - |
| rabbit_channel | - | 同步调用 | 同步/异步 | - | 同步调用 |
| rabbit_exchange | - | - | 通过 Router | - | 同步调用 |
| rabbit_amqqueue | 异步消息 | - | - | - | 同步调用 |
| rabbit_classic_queue | 异步消息 | - | - | 同步调用 | - |
| rabbit_quorum_queue | 异步消息 | - | - | - | - |
交互说明
-
同步调用:
- Reader → Channel:
gen_server:call/2(RPC 风格) - Channel → Exchange/DB:函数调用(同进程或跨进程 RPC)
- 阻塞调用,等待返回
- Reader → Channel:
-
异步消息:
- Queue → Channel(消费推送):
!消息发送 - Event 通知:
rabbit_event:notify/2 - 非阻塞,不等待响应
- Queue → Channel(消费推送):
-
共享存储:
- 元数据通过 Mnesia/Khepri ETS 表共享
- 消息存储通过文件句柄共享(协调访问)
-
订阅/发布:
- 统计信息:
rabbit_event:notify→ Management 订阅 - 集群事件:
rabbit_node_monitor发布节点变化
- 统计信息:
-
错误语义:
- Channel 异常不影响 Connection(隔离)
- Queue 进程崩溃时 Channel 收到
{'DOWN', ...}消息 - 数据库事务失败回滚,返回错误给调用方
-
一致性要求:
- 元数据操作(声明/删除):强一致性(Mnesia 事务)
- 消息发布:最终一致性(Queue 异步确认)
- 仲裁队列:Raft 一致性(线性化)
4. 关键设计与权衡
4.1 数据一致性
强一致性(元数据)
-
实现:
- 使用 Mnesia 分布式事务(传统模式)
- 使用 Khepri(基于 Ra/Raft)实现强一致性元数据存储
-
权衡:
- 优势:声明操作保证全集群可见,避免脑裂
- 劣势:写入延迟较高(网络往返 + 多数派确认)
最终一致性(消息)
-
实现:
- 经典队列:单节点存储,无跨节点复制
- 仲裁队列:Raft 复制,强一致性
- 流队列:副本异步复制,leader 串行写入
-
权衡:
- Classic:高性能,低延迟,无持久性保证(节点故障丢失)
- Quorum:强持久性,中等延迟(2-10ms),需多数派存活
- Stream:高吞吐,追加写入,适合日志型工作负载
4.2 事务边界
-
AMQP 事务(
tx.select):- 仅覆盖单个 Channel 的发布和确认
- 不跨 Queue,不跨 Connection
- 实现:Channel 内缓存操作,
tx.commit时批量执行
-
Publisher Confirms:
- 单向确认机制(比事务更轻量)
- 异步确认,不阻塞后续发布
- 适合高吞吐场景
-
权衡:
- 事务保证原子性但降低吞吐(串行化)
- Confirms 提供可靠性但不保证原子性
4.3 锁与并发策略
进程级隔离
-
设计:
- 每个 Queue、Exchange、Channel 是独立的 Erlang 进程
- 进程内串行处理消息,天然避免锁竞争
-
优势:
- 无共享状态,无锁设计
- 故障隔离(进程崩溃不影响其他)
共享资源
-
Mnesia 表:
- 读操作并发(脏读)
- 写操作通过事务序列化
-
Message Store:
- 文件锁保护并发写入
- 引用计数管理消息生命周期
4.4 性能关键路径
发布路径优化
-
路由缓存:
- Topic Exchange 使用前缀树(Trie)加速匹配
- Direct Exchange 使用哈希表 O(1) 查找
-
批量操作:
- 消息批量写入磁盘(默认 100ms 或 128 条)
- 批量确认(一次确认多条消息)
-
内存优先:
- 小消息完全在内存中路由和存储
- 大消息引用传递,避免拷贝
消费路径优化
-
预取(Prefetch):
- Queue 一次性推送多条消息到 Channel
- 减少往返次数
-
信用流控:
- 动态调整推送速率
- 避免内存溢出
4.5 可观测性
指标(Metrics)
- 连接级:连接数、通道数、发布/消费速率
- 队列级:消息数、消费者数、内存占用、磁盘读写
- 节点级:内存/磁盘使用率、Erlang 进程数、文件句柄数
跟踪(Tracing)
- Firehose:抓取所有发布/消费消息
- 用户级 Trace:选择性跟踪特定队列/交换器
日志(Logging)
- 结构化日志:使用 Erlang logger,支持 JSON 格式
- 日志级别:debug、info、warning、error、critical
4.6 配置项(影响行为)
| 配置项 | 默认值 | 影响 |
|---|---|---|
vm_memory_high_watermark |
0.4 | 内存水位线,超过阻塞发布 |
disk_free_limit |
50MB | 磁盘剩余空间下限,低于阻塞 |
channel_max |
2047 | 单连接最大 Channel 数 |
heartbeat |
60s | 连接心跳间隔 |
consumer_timeout |
30min | 消费者无活动超时 |
queue_index_embed_msgs_below |
4096 | 小消息嵌入索引的阈值 |
msg_store_file_size_limit |
16MB | 消息存储文件大小 |
5. 典型使用示例与最佳实践
示例 1:最小可运行入口
启动 RabbitMQ
# 方式 1:使用默认配置启动
rabbitmq-server
# 方式 2:前台启动(开发调试)
rabbitmq-server -detached
# 方式 3:Erlang shell 启动(高级)
erl -pa deps/*/ebin -s rabbit
基本操作(使用 Erlang shell)
%% 连接到节点
{ok, Conn} = amqp_connection:start(#amqp_params_direct{}).
%% 打开通道
{ok, Channel} = amqp_connection:open_channel(Conn).
%% 声明队列
QDeclare = #'queue.declare'{queue = <<"test_queue">>},
#'queue.declare_ok'{} = amqp_channel:call(Channel, QDeclare).
%% 发布消息
Publish = #'basic.publish'{exchange = <<>>, routing_key = <<"test_queue">>},
Msg = #amqp_msg{payload = <<"Hello RabbitMQ">>},
ok = amqp_channel:cast(Channel, Publish, Msg).
%% 消费消息
Subscribe = #'basic.consume'{queue = <<"test_queue">>},
#'basic.consume_ok'{} = amqp_channel:subscribe(Channel, Subscribe, self()).
%% 接收消息
receive
{#'basic.deliver'{}, #amqp_msg{payload = Payload}} ->
io:format("Received: ~p~n", [Payload])
end.
%% 关闭连接
amqp_connection:close(Conn).
示例 2:扩展点/插件接入
自定义 Exchange 类型
- 创建模块
my_exchange.erl:
-module(my_exchange).
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2, route/3,
validate/1, validate_binding/2, create/2, delete/2,
policy_changed/2, add_binding/3, remove_bindings/3,
assert_args_equivalence/2, info/1, info/2]).
%% 描述
description() ->
[{description, <<"Custom exchange type">>}].
serialise_events() -> false.
%% 核心路由逻辑
route(#exchange{name = XName}, Msg, _Opts) ->
%% 自定义路由算法
RKey = mc:routing_key(Msg),
%% 示例:哈希路由到特定队列
HashValue = erlang:phash2(RKey, 10),
TargetQueue = rabbit_misc:r(XName, queue, integer_to_binary(HashValue)),
[TargetQueue].
%% 其他回调(省略实现)
validate(_X) -> ok.
validate_binding(_X, _B) -> ok.
create(_Serial, _X) -> ok.
delete(_Serial, _X) -> ok.
policy_changed(_X1, _X2) -> ok.
add_binding(_Serial, _X, _B) -> ok.
remove_bindings(_Serial, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
info(_X) -> [].
info(_X, _) -> [].
- 注册 Exchange 类型(在插件的 boot step 中):
-rabbit_boot_step({my_exchange_type,
[{description, "custom exchange type"},
{mfa, {rabbit_registry, register,
[exchange, <<"x-custom">>, my_exchange]}},
{requires, rabbit_registry},
{enables, kernel_ready}]}).
- 使用自定义类型:
%% 声明自定义类型的 exchange
ExDeclare = #'exchange.declare'{
exchange = <<"my_custom_exchange">>,
type = <<"x-custom">>
},
#'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare).
自定义认证后端
- 实现
rabbit_authn_backend行为:
-module(my_auth_backend).
-behaviour(rabbit_authn_backend).
-behaviour(rabbit_authz_backend).
-export([user_login_authentication/2, user_login_authorization/2,
check_vhost_access/3, check_resource_access/4, check_topic_access/4,
state_can_expire/0]).
%% 认证
user_login_authentication(Username, AuthProps) ->
%% 从外部系统验证用户(例如 HTTP API)
case external_api:verify_user(Username, AuthProps) of
{ok, UserData} ->
{ok, #auth_user{username = Username,
tags = proplists:get_value(tags, UserData, [])}};
{error, _} ->
{refused, "Authentication failed", []}
end.
%% 授权
user_login_authorization(Username, _AuthProps) ->
{ok, _Impl} = application:get_env(my_plugin, authz_impl),
{ok, {Username, _Impl}}.
check_vhost_access(#auth_user{username = Username}, VHost, _Sock) ->
%% 检查虚拟主机访问权限
external_api:check_vhost(Username, VHost).
%% 其他授权回调(省略)
check_resource_access(_User, _Resource, _Permission, _Context) ->
true.
check_topic_access(_User, _Resource, _Permission, _Context) ->
true.
state_can_expire() -> false.
- 配置使用自定义认证:
%% rabbitmq.config
[
{rabbit, [
{auth_backends, [my_auth_backend]}
]}
].
示例 3:规模化/上线注意事项
集群部署
- 节点发现配置(使用 Consul):
%% rabbitmq.conf
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_consul
cluster_formation.consul.host = 127.0.0.1
cluster_formation.consul.port = 8500
cluster_formation.consul.svc = rabbitmq
cluster_formation.node_cleanup.only_log_warning = true
- 资源限制:
%% rabbitmq.conf
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 10GB
total_memory_available_override_value = 16GB
- 队列类型选择:
- Classic Queue:低延迟,单节点,适合临时消息
- Quorum Queue:高可用,多副本,适合关键业务
- Stream Queue:高吞吐,持久化,适合日志/审计
性能调优
-
连接池:
- 客户端维护连接池(推荐每个应用 1-5 个连接)
- 每个连接使用多个 Channel(避免 Channel 数过多)
-
消息持久化:
- 关键消息:
delivery_mode = 2(持久化) - 临时消息:
delivery_mode = 1(非持久化)
- 关键消息:
-
Prefetch 设置:
- 默认
basic.qos(prefetch_count=10) - 高延迟网络增大 prefetch(如 100)
- 低延迟网络减小 prefetch(如 1-10)
- 默认
-
批量操作:
- 批量发布(一次性发送多条消息)
- 批量确认(
multiple = true)
监控与告警
-
关键指标:
- 队列消息堆积(
messages_ready> 阈值) - 内存使用率(
mem_used/mem_limit) - 磁盘剩余空间(
disk_free<disk_free_limit) - 连接数/通道数异常增长
- 队列消息堆积(
-
告警配置:
# 使用 rabbitmqctl 设置内存告警
rabbitmqctl set_vm_memory_high_watermark 0.5
# 检查告警状态
rabbitmqctl alarm_list
- 日志级别:
# 运行时调整日志级别
rabbitmqctl set_log_level debug
# 恢复默认
rabbitmqctl set_log_level info
附录:术语表
| 术语 | 解释 |
|---|---|
| AMQP | Advanced Message Queuing Protocol,高级消息队列协议 |
| Exchange | 交换器,消息路由实体 |
| Binding | 绑定,连接 Exchange 和 Queue 的路由规则 |
| Queue | 队列,消息存储实体 |
| Channel | 通道,AMQP 连接内的逻辑通道 |
| VHost | 虚拟主机,多租户隔离单元 |
| Mnesia | Erlang 分布式数据库 |
| Khepri | 基于 Ra(Raft)的元数据存储 |
| Ra | Erlang 实现的 Raft 一致性算法库 |
| Osiris | 高性能日志存储引擎 |
| Quorum Queue | 仲裁队列,基于 Ra 的多副本队列 |
| Stream Queue | 流队列,基于 osiris 的追加日志队列 |
| Classic Queue | 经典队列,传统的单节点队列 |
| DLX | Dead Letter Exchange,死信交换器 |
| TTL | Time To Live,消息生存时间 |
| QoS | Quality of Service,服务质量(Prefetch) |
| Ack | Acknowledge,消息确认 |
| Nack | Negative Acknowledge,消息拒绝 |