——|———|——|" tags: [“RabbitMQ”, “技术文档”, “源码剖析”]
RabbitMQ-10-管理插件模块
A1. 模块概览
A1.1 职责与边界
rabbitmq_management 插件提供了 HTTP REST API 和 Web UI,用于监控和管理 RabbitMQ 集群。主要功能包括:
| 功能类别 | API 端点 | 说明 |
|---|---|---|
| 集群监控 | /api/overview, /api/nodes |
集群概览、节点状态、资源使用 |
| 连接管理 | /api/connections, /api/channels |
查看/关闭连接、Channel 详情 |
| 队列管理 | /api/queues, /api/queues/:vhost/:queue |
队列列表、声明、删除、清空 |
| 交换器管理 | /api/exchanges, /api/exchanges/:vhost/:exchange |
交换器列表、声明、删除 |
| 绑定管理 | /api/bindings, /api/bindings/:vhost/... |
绑定列表、创建、删除 |
| 用户权限 | /api/users, /api/permissions |
用户管理、权限配置 |
| 策略配置 | /api/policies, /api/parameters |
动态策略、运行时参数 |
| 配置导入导出 | /api/definitions |
导出/导入集群配置(JSON) |
| 健康检查 | /api/health/checks/* |
节点健康状态、告警检查 |
A1.2 架构图
flowchart TB
subgraph Browser[Web Browser]
UI[Management UI<br/>HTML/JS]
end
subgraph Management[Management Plugin]
Dispatcher[rabbit_mgmt_dispatcher<br/>路由分发器]
WM[Webmachine 资源模块<br/>rabbit_mgmt_wm_*]
Util[rabbit_mgmt_util<br/>工具函数]
end
subgraph Agent[Management Agent]
Collector[Metrics Collector<br/>rabbit_mgmt_metrics_collector]
GC[Metrics GC<br/>定期清理过期指标]
DB[ETS Metrics DB<br/>内存时序数据库]
end
subgraph Core[RabbitMQ Core]
Rabbit[rabbit_amqqueue]
Exchange[rabbit_exchange]
Channel[rabbit_channel]
CoreMetrics[rabbit_core_metrics]
end
UI -->|HTTP REST API| Dispatcher
Dispatcher --> WM
WM --> Util
Util --> Rabbit
Util --> Exchange
Util --> Channel
CoreMetrics -->|emit_event| Collector
Collector --> DB
Collector --> GC
WM --> DB
style Management fill:#e3f2fd
style Agent fill:#fff3e0
style Core fill:#f3e5f5
关键设计:
- Webmachine 框架:基于 REST 资源的 HTTP 处理,支持内容协商(JSON/MessagePack)
- 指标聚合:
rabbit_mgmt_agent独立进程收集指标,避免阻塞核心业务 - 权限控制:每个 API 端点都需要用户认证和权限验证
A2. 对外 API 列表与规格
A2.1 集群监控 API
(1)GET /api/overview - 集群概览
功能说明:返回集群全局统计信息,包括消息速率、连接数、队列数等。
响应结构:
{
"management_version": "4.0.0",
"rabbitmq_version": "4.0.0",
"cluster_name": "rabbit@hostname",
"message_stats": {
"publish": 12345,
"publish_details": {"rate": 102.3},
"deliver": 11000,
"deliver_details": {"rate": 98.7},
"ack": 10500,
"ack_details": {"rate": 95.1}
},
"queue_totals": {
"messages": 500,
"messages_ready": 450,
"messages_unacknowledged": 50
},
"object_totals": {
"connections": 25,
"channels": 100,
"exchanges": 10,
"queues": 30,
"consumers": 15
},
"node": "rabbit@hostname",
"listeners": [
{"node": "rabbit@hostname", "protocol": "amqp", "port": 5672},
{"node": "rabbit@hostname", "protocol": "http", "port": 15672}
]
}
核心代码(rabbit_mgmt_wm_overview:to_json/2):
to_json(ReqData, Context) ->
%% 1)收集全局统计
MessageStats = rabbit_mgmt_db:get_overview(all, augment_msg_stats),
%% 2)查询对象计数
QueueTotals = rabbit_mgmt_db:get_queue_totals(),
ObjectTotals = [
{connections, length(rabbit_connection_tracking:list())},
{channels, length(rabbit_channel_tracking:list())},
{queues, rabbit_amqqueue:count()},
{exchanges, length(rabbit_exchange:list())}
],
%% 3)构造响应
Overview = [
{management_version, rabbit_mgmt_version()},
{message_stats, MessageStats},
{queue_totals, QueueTotals},
{object_totals, ObjectTotals},
{node, node()},
{listeners, rabbit_mgmt_util:listeners()}
],
rabbit_mgmt_util:reply(Overview, ReqData, Context).
(2)GET /api/queues - 队列列表
功能说明:返回所有或指定 VHost 的队列列表,包含每个队列的详细统计信息。
查询参数:
vhost:虚拟主机名称(可选)page:分页页码(可选)page_size:每页数量(默认 100)name:队列名称过滤(支持正则)use_regex:是否使用正则匹配(true/false)
响应结构:
[
{
"name": "my_queue",
"vhost": "/",
"durable": true,
"auto_delete": false,
"exclusive": false,
"arguments": {"x-max-length": 10000},
"type": "classic",
"state": "running",
"consumers": 2,
"messages": 1234,
"messages_ready": 1000,
"messages_unacknowledged": 234,
"message_bytes": 5000000,
"memory": 1048576,
"message_stats": {
"publish": 50000,
"publish_details": {"rate": 100.5},
"deliver": 48000,
"deliver_details": {"rate": 98.2}
},
"backing_queue_status": {
"q3": 100,
"delta": 900,
"ram_msg_count": 100,
"disk_read_count": 500
}
}
]
核心代码(rabbit_mgmt_wm_queues:augmented/2):
augmented(ReqData, Context) ->
%% 1)获取队列列表
Queues = rabbit_mgmt_db:augment_queues(
rabbit_amqqueue:list(rabbit_mgmt_util:vhost(ReqData)),
full % 包含完整统计信息
),
%% 2)过滤和排序
Filtered = rabbit_mgmt_util:filter_user(Queues, ReqData, Context),
Sorted = rabbit_mgmt_util:sort(Filtered, ReqData),
%% 3)分页
Paginated = rabbit_mgmt_util:paginate(Sorted, ReqData),
rabbit_mgmt_util:reply(Paginated, ReqData, Context).
A2.2 队列操作 API
(3)PUT /api/queues/:vhost/:queue - 声明队列
请求结构:
{
"durable": true,
"auto_delete": false,
"arguments": {
"x-queue-type": "quorum",
"x-max-length": 10000,
"x-message-ttl": 60000,
"x-dead-letter-exchange": "dlx"
}
}
响应:
201 Created:队列创建成功204 No Content:队列已存在(幂等)400 Bad Request:参数错误
核心代码(rabbit_mgmt_wm_queue:put_queue/2):
put_queue(ReqData, Context) ->
VHost = rabbit_mgmt_util:vhost(ReqData),
QueueBin = rabbit_mgmt_util:id(queue, ReqData),
QueueName = rabbit_misc:r(VHost, queue, QueueBin),
%% 1)解析请求体
Body = rabbit_mgmt_util:decode_json(wrq:req_body(ReqData)),
Durable = proplists:get_value(<<"durable">>, Body, true),
AutoDelete = proplists:get_value(<<"auto_delete">>, Body, false),
Args = proplists:get_value(<<"arguments">>, Body, []),
%% 2)调用核心声明接口
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, none, acting_user(ReqData)) of
{new, _Q} ->
%% 新创建
{true, wrq:set_resp_header("Location", queue_uri(ReqData), ReqData), Context};
{existing, _Q} ->
%% 已存在
{true, ReqData, Context};
{protocol_error, Type, Reason, _} ->
rabbit_mgmt_util:bad_request(Type, Reason, ReqData, Context)
end.
(4)DELETE /api/queues/:vhost/:queue - 删除队列
查询参数:
if-empty=true:仅当队列为空时删除if-unused=true:仅当无消费者时删除
核心代码(rabbit_mgmt_wm_queue:delete_queue/2):
delete_queue(ReqData, Context) ->
QueueName = queue_name(ReqData),
IfEmpty = rabbit_mgmt_util:parse_bool(wrq:get_qs_value("if-empty", ReqData)),
IfUnused = rabbit_mgmt_util:parse_bool(wrq:get_qs_value("if-unused", ReqData)),
%% 调用核心删除接口
case rabbit_amqqueue:delete(QueueName, IfUnused, IfEmpty, acting_user(ReqData)) of
{ok, _} -> {true, ReqData, Context};
{error, not_empty} -> rabbit_mgmt_util:bad_request(precondition_failed, "Queue not empty", ReqData, Context);
{error, in_use} -> rabbit_mgmt_util:bad_request(precondition_failed, "Queue in use", ReqData, Context)
end.
A2.3 配置导入导出 API
(5)GET /api/definitions - 导出配置
功能说明:导出整个集群的配置定义(用户、VHost、队列、交换器、绑定、策略等)为 JSON 文件。
响应结构:
{
"rabbitmq_version": "4.0.0",
"users": [
{"name": "admin", "password_hash": "...", "tags": ["administrator"]}
],
"vhosts": [
{"name": "/"}
],
"permissions": [
{"user": "admin", "vhost": "/", "configure": ".*", "write": ".*", "read": ".*"}
],
"parameters": [
{"component": "federation", "vhost": "/", "name": "my-upstream", "value": {...}}
],
"policies": [
{"vhost": "/", "name": "ha-all", "pattern": ".*", "definition": {"ha-mode": "all"}}
],
"queues": [
{"name": "my_queue", "vhost": "/", "durable": true, "arguments": {...}}
],
"exchanges": [
{"name": "my_exchange", "vhost": "/", "type": "topic", "durable": true}
],
"bindings": [
{"source": "my_exchange", "vhost": "/", "destination": "my_queue", "destination_type": "queue", "routing_key": "#"}
]
}
核心代码(rabbit_mgmt_wm_definitions:to_json/2):
to_json(ReqData, Context) ->
Definitions = [
{rabbitmq_version, rabbit_version()},
{users, all_users()},
{vhosts, all_vhosts()},
{permissions, all_permissions()},
{parameters, all_parameters()},
{policies, all_policies()},
{queues, all_queues()},
{exchanges, all_exchanges()},
{bindings, all_bindings()}
],
rabbit_mgmt_util:reply(Definitions, ReqData, Context).
(6)POST /api/definitions - 导入配置
功能说明:从 JSON 文件导入配置,自动创建用户、VHost、队列、交换器等。
请求结构:与导出格式相同。
核心代码(rabbit_mgmt_wm_definitions:apply_defs/2):
apply_defs(Body, ActingUser) ->
Defs = rabbit_mgmt_util:decode_json(Body),
%% 1)创建用户
[rabbit_auth_backend_internal:put_user(U, ActingUser) || U <- proplists:get_value(users, Defs, [])],
%% 2)创建 VHost
[rabbit_vhost:add(V, ActingUser) || V <- proplists:get_value(vhosts, Defs, [])],
%% 3)设置权限
[rabbit_auth_backend_internal:set_permissions(P, ActingUser) || P <- proplists:get_value(permissions, Defs, [])],
%% 4)声明队列
[rabbit_amqqueue:declare(Q, ActingUser) || Q <- proplists:get_value(queues, Defs, [])],
%% 5)声明交换器
[rabbit_exchange:declare(X, ActingUser) || X <- proplists:get_value(exchanges, Defs, [])],
%% 6)创建绑定
[rabbit_binding:add(B, ActingUser) || B <- proplists:get_value(bindings, Defs, [])],
ok.
A3. 关键数据结构与 UML
A3.1 指标收集架构
classDiagram
class rabbit_mgmt_metrics_collector {
+ets_tables: [ets()]
+gc_policy: map()
+handle_event(Event, State)
+override_lookups(LookupFun)
}
class MetricsDB {
+conn_stats: ets()
+channel_stats: ets()
+queue_stats: ets()
+exchange_stats: ets()
+node_stats: ets()
}
class rabbit_core_metrics {
+connection_created(Pid, Props)
+channel_created(Pid, Props)
+queue_stats(QName, Stats)
+exchange_stats(XName, Stats)
}
rabbit_core_metrics --> rabbit_mgmt_metrics_collector : emit_event
rabbit_mgmt_metrics_collector --> MetricsDB : ets:insert
指标类型:
| 类型 | ETS 表名 | 数据结构 |
|---|---|---|
| 连接统计 | connection_stats |
{Pid, RecvOct, SendOct, Timestamp} |
| Channel 统计 | channel_stats |
{Pid, Publishes, Delivers, Acks, Timestamp} |
| 队列统计 | queue_stats |
{QName, Messages, Consumers, MemoryBytes, Timestamp} |
| 交换器统计 | exchange_stats |
{XName, PublishIn, PublishOut, Timestamp} |
A4. 核心算法/流程剖析
A4.1 指标收集与聚合流程
指标流转路径:
-
核心模块发出事件:
%% rabbit_reader 创建连接时 rabbit_core_metrics:connection_created(self(), #{ user => User, vhost => VHost, protocol => <<"AMQP 0-9-1">>, connected_at => erlang:system_time(millisecond) }), rabbit_event:notify(connection_created, Props). -
Metrics Collector 接收事件:
handle_event({event, connection_created, Pid, Props}, State) -> %% 1)记录连接基础信息 ets:insert(connection_stats, {Pid, Props}), %% 2)初始化统计计数器 ets:insert(connection_metrics, {Pid, #{ recv_oct => 0, send_oct => 0, recv_cnt => 0, send_cnt => 0 }}), {ok, State}. -
定期更新统计(每 5 秒):
handle_info(update_stats, State) -> %% 遍历所有连接进程 Connections = rabbit_connection_tracking:list(), lists:foreach( fun(Pid) -> %% 从进程字典读取最新统计 Stats = rabbit_reader:info(Pid, [recv_oct, send_oct]), %% 更新 ETS 表 ets:insert(connection_metrics, {Pid, Stats}) end, Connections), schedule_next_update(), {noreply, State}. -
API 查询聚合:
%% GET /api/connections get_connections() -> Connections = ets:tab2list(connection_stats), lists:map( fun({Pid, Props}) -> Metrics = ets:lookup(connection_metrics, Pid), Props ++ Metrics end, Connections).
时序图:
sequenceDiagram
autonumber
participant Core as rabbit_reader
participant Event as rabbit_event
participant Collector as mgmt_metrics_collector
participant ETS as ETS Tables
participant API as Management API
Core->>Event: notify(connection_created, Props)
Event->>Collector: handle_event(connection_created)
Collector->>ETS: insert(connection_stats, {Pid, Props})
loop 每 5 秒
Collector->>Core: info(Pid, [recv_oct, send_oct])
Core-->>Collector: Stats
Collector->>ETS: update(connection_metrics, Stats)
end
API->>ETS: lookup(connection_stats)
ETS-->>API: Connections
API->>ETS: lookup(connection_metrics)
ETS-->>API: Metrics
API-->>API: merge(Connections, Metrics)
A4.2 指标垃圾回收(GC)
GC 策略:
- 连接/Channel 关闭后,指标保留 60 秒(用于展示最近关闭的连接)
- 队列删除后,指标保留 60 秒
- 旧数据定期清理,避免内存泄漏
GC 实现:
handle_info(gc_metrics, State) ->
Now = erlang:system_time(millisecond),
RetentionPeriod = 60000, % 60 秒
%% 清理过期连接指标
ets:foldl(
fun({Pid, #{closed_at := ClosedAt}} = Entry, Acc) ->
case Now - ClosedAt > RetentionPeriod of
true ->
ets:delete(connection_stats, Pid),
ets:delete(connection_metrics, Pid);
false ->
ok
end,
Acc
end, ok, connection_stats),
schedule_next_gc(),
{noreply, State}.
A5. 配置与可观测
A5.1 关键配置项
| 配置项 | 默认值 | 说明 |
|---|---|---|
management.listener.port |
15672 | HTTP API 端口 |
management.listener.ssl |
false | 启用 HTTPS |
management.load_definitions |
- | 启动时自动加载配置文件路径 |
management.rates_mode |
basic |
统计速率模式(basic/detailed/none) |
management.sample_retention_policies |
- | 指标保留策略(全局/详细/基础) |
management.http_log_dir |
- | HTTP 访问日志目录 |
A5.2 常用 API 端点
| 端点 | 方法 | 功能 |
|---|---|---|
/api/overview |
GET | 集群概览 |
/api/nodes |
GET | 节点列表 |
/api/connections |
GET | 连接列表 |
/api/connections/:name |
DELETE | 关闭连接 |
/api/channels |
GET | Channel 列表 |
/api/queues |
GET | 队列列表 |
/api/queues/:vhost/:queue |
PUT | 声明队列 |
/api/queues/:vhost/:queue |
DELETE | 删除队列 |
/api/queues/:vhost/:queue/get |
POST | 从队列获取消息(测试用) |
/api/queues/:vhost/:queue/contents |
DELETE | 清空队列 |
/api/exchanges/:vhost/:exchange/publish |
POST | 发布消息到交换器(测试用) |
/api/definitions |
GET | 导出配置 |
/api/definitions |
POST | 导入配置 |
本文档涵盖了 RabbitMQ Management 插件的核心架构、HTTP API、指标收集机制和配置导入导出功能。