——|———|——|" tags: [“RabbitMQ”, “技术文档”, “源码剖析”]

RabbitMQ-10-管理插件模块

A1. 模块概览

A1.1 职责与边界

rabbitmq_management 插件提供了 HTTP REST API 和 Web UI,用于监控和管理 RabbitMQ 集群。主要功能包括:

功能类别 API 端点 说明
集群监控 /api/overview, /api/nodes 集群概览、节点状态、资源使用
连接管理 /api/connections, /api/channels 查看/关闭连接、Channel 详情
队列管理 /api/queues, /api/queues/:vhost/:queue 队列列表、声明、删除、清空
交换器管理 /api/exchanges, /api/exchanges/:vhost/:exchange 交换器列表、声明、删除
绑定管理 /api/bindings, /api/bindings/:vhost/... 绑定列表、创建、删除
用户权限 /api/users, /api/permissions 用户管理、权限配置
策略配置 /api/policies, /api/parameters 动态策略、运行时参数
配置导入导出 /api/definitions 导出/导入集群配置(JSON)
健康检查 /api/health/checks/* 节点健康状态、告警检查

A1.2 架构图

flowchart TB
    subgraph Browser[Web Browser]
        UI[Management UI<br/>HTML/JS]
    end
    
    subgraph Management[Management Plugin]
        Dispatcher[rabbit_mgmt_dispatcher<br/>路由分发器]
        WM[Webmachine 资源模块<br/>rabbit_mgmt_wm_*]
        Util[rabbit_mgmt_util<br/>工具函数]
    end
    
    subgraph Agent[Management Agent]
        Collector[Metrics Collector<br/>rabbit_mgmt_metrics_collector]
        GC[Metrics GC<br/>定期清理过期指标]
        DB[ETS Metrics DB<br/>内存时序数据库]
    end
    
    subgraph Core[RabbitMQ Core]
        Rabbit[rabbit_amqqueue]
        Exchange[rabbit_exchange]
        Channel[rabbit_channel]
        CoreMetrics[rabbit_core_metrics]
    end
    
    UI -->|HTTP REST API| Dispatcher
    Dispatcher --> WM
    WM --> Util
    Util --> Rabbit
    Util --> Exchange
    Util --> Channel
    
    CoreMetrics -->|emit_event| Collector
    Collector --> DB
    Collector --> GC
    WM --> DB
    
    style Management fill:#e3f2fd
    style Agent fill:#fff3e0
    style Core fill:#f3e5f5

关键设计

  1. Webmachine 框架:基于 REST 资源的 HTTP 处理,支持内容协商(JSON/MessagePack)
  2. 指标聚合rabbit_mgmt_agent 独立进程收集指标,避免阻塞核心业务
  3. 权限控制:每个 API 端点都需要用户认证和权限验证

A2. 对外 API 列表与规格

A2.1 集群监控 API

(1)GET /api/overview - 集群概览

功能说明:返回集群全局统计信息,包括消息速率、连接数、队列数等。

响应结构

{
  "management_version": "4.0.0",
  "rabbitmq_version": "4.0.0",
  "cluster_name": "rabbit@hostname",
  "message_stats": {
    "publish": 12345,
    "publish_details": {"rate": 102.3},
    "deliver": 11000,
    "deliver_details": {"rate": 98.7},
    "ack": 10500,
    "ack_details": {"rate": 95.1}
  },
  "queue_totals": {
    "messages": 500,
    "messages_ready": 450,
    "messages_unacknowledged": 50
  },
  "object_totals": {
    "connections": 25,
    "channels": 100,
    "exchanges": 10,
    "queues": 30,
    "consumers": 15
  },
  "node": "rabbit@hostname",
  "listeners": [
    {"node": "rabbit@hostname", "protocol": "amqp", "port": 5672},
    {"node": "rabbit@hostname", "protocol": "http", "port": 15672}
  ]
}

核心代码rabbit_mgmt_wm_overview:to_json/2):

to_json(ReqData, Context) ->
    %% 1)收集全局统计
    MessageStats = rabbit_mgmt_db:get_overview(all, augment_msg_stats),
    
    %% 2)查询对象计数
    QueueTotals = rabbit_mgmt_db:get_queue_totals(),
    ObjectTotals = [
        {connections, length(rabbit_connection_tracking:list())},
        {channels, length(rabbit_channel_tracking:list())},
        {queues, rabbit_amqqueue:count()},
        {exchanges, length(rabbit_exchange:list())}
    ],
    
    %% 3)构造响应
    Overview = [
        {management_version, rabbit_mgmt_version()},
        {message_stats, MessageStats},
        {queue_totals, QueueTotals},
        {object_totals, ObjectTotals},
        {node, node()},
        {listeners, rabbit_mgmt_util:listeners()}
    ],
    rabbit_mgmt_util:reply(Overview, ReqData, Context).

(2)GET /api/queues - 队列列表

功能说明:返回所有或指定 VHost 的队列列表,包含每个队列的详细统计信息。

查询参数

  • vhost:虚拟主机名称(可选)
  • page:分页页码(可选)
  • page_size:每页数量(默认 100)
  • name:队列名称过滤(支持正则)
  • use_regex:是否使用正则匹配(true/false)

响应结构

[
  {
    "name": "my_queue",
    "vhost": "/",
    "durable": true,
    "auto_delete": false,
    "exclusive": false,
    "arguments": {"x-max-length": 10000},
    "type": "classic",
    "state": "running",
    "consumers": 2,
    "messages": 1234,
    "messages_ready": 1000,
    "messages_unacknowledged": 234,
    "message_bytes": 5000000,
    "memory": 1048576,
    "message_stats": {
      "publish": 50000,
      "publish_details": {"rate": 100.5},
      "deliver": 48000,
      "deliver_details": {"rate": 98.2}
    },
    "backing_queue_status": {
      "q3": 100,
      "delta": 900,
      "ram_msg_count": 100,
      "disk_read_count": 500
    }
  }
]

核心代码rabbit_mgmt_wm_queues:augmented/2):

augmented(ReqData, Context) ->
    %% 1)获取队列列表
    Queues = rabbit_mgmt_db:augment_queues(
        rabbit_amqqueue:list(rabbit_mgmt_util:vhost(ReqData)),
        full  % 包含完整统计信息
    ),
    
    %% 2)过滤和排序
    Filtered = rabbit_mgmt_util:filter_user(Queues, ReqData, Context),
    Sorted = rabbit_mgmt_util:sort(Filtered, ReqData),
    
    %% 3)分页
    Paginated = rabbit_mgmt_util:paginate(Sorted, ReqData),
    
    rabbit_mgmt_util:reply(Paginated, ReqData, Context).

A2.2 队列操作 API

(3)PUT /api/queues/:vhost/:queue - 声明队列

请求结构

{
  "durable": true,
  "auto_delete": false,
  "arguments": {
    "x-queue-type": "quorum",
    "x-max-length": 10000,
    "x-message-ttl": 60000,
    "x-dead-letter-exchange": "dlx"
  }
}

响应

  • 201 Created:队列创建成功
  • 204 No Content:队列已存在(幂等)
  • 400 Bad Request:参数错误

核心代码rabbit_mgmt_wm_queue:put_queue/2):

put_queue(ReqData, Context) ->
    VHost = rabbit_mgmt_util:vhost(ReqData),
    QueueBin = rabbit_mgmt_util:id(queue, ReqData),
    QueueName = rabbit_misc:r(VHost, queue, QueueBin),
    
    %% 1)解析请求体
    Body = rabbit_mgmt_util:decode_json(wrq:req_body(ReqData)),
    Durable = proplists:get_value(<<"durable">>, Body, true),
    AutoDelete = proplists:get_value(<<"auto_delete">>, Body, false),
    Args = proplists:get_value(<<"arguments">>, Body, []),
    
    %% 2)调用核心声明接口
    case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, none, acting_user(ReqData)) of
        {new, _Q} ->
            %% 新创建
            {true, wrq:set_resp_header("Location", queue_uri(ReqData), ReqData), Context};
        {existing, _Q} ->
            %% 已存在
            {true, ReqData, Context};
        {protocol_error, Type, Reason, _} ->
            rabbit_mgmt_util:bad_request(Type, Reason, ReqData, Context)
    end.

(4)DELETE /api/queues/:vhost/:queue - 删除队列

查询参数

  • if-empty=true:仅当队列为空时删除
  • if-unused=true:仅当无消费者时删除

核心代码rabbit_mgmt_wm_queue:delete_queue/2):

delete_queue(ReqData, Context) ->
    QueueName = queue_name(ReqData),
    IfEmpty = rabbit_mgmt_util:parse_bool(wrq:get_qs_value("if-empty", ReqData)),
    IfUnused = rabbit_mgmt_util:parse_bool(wrq:get_qs_value("if-unused", ReqData)),
    
    %% 调用核心删除接口
    case rabbit_amqqueue:delete(QueueName, IfUnused, IfEmpty, acting_user(ReqData)) of
        {ok, _} -> {true, ReqData, Context};
        {error, not_empty} -> rabbit_mgmt_util:bad_request(precondition_failed, "Queue not empty", ReqData, Context);
        {error, in_use} -> rabbit_mgmt_util:bad_request(precondition_failed, "Queue in use", ReqData, Context)
    end.

A2.3 配置导入导出 API

(5)GET /api/definitions - 导出配置

功能说明:导出整个集群的配置定义(用户、VHost、队列、交换器、绑定、策略等)为 JSON 文件。

响应结构

{
  "rabbitmq_version": "4.0.0",
  "users": [
    {"name": "admin", "password_hash": "...", "tags": ["administrator"]}
  ],
  "vhosts": [
    {"name": "/"}
  ],
  "permissions": [
    {"user": "admin", "vhost": "/", "configure": ".*", "write": ".*", "read": ".*"}
  ],
  "parameters": [
    {"component": "federation", "vhost": "/", "name": "my-upstream", "value": {...}}
  ],
  "policies": [
    {"vhost": "/", "name": "ha-all", "pattern": ".*", "definition": {"ha-mode": "all"}}
  ],
  "queues": [
    {"name": "my_queue", "vhost": "/", "durable": true, "arguments": {...}}
  ],
  "exchanges": [
    {"name": "my_exchange", "vhost": "/", "type": "topic", "durable": true}
  ],
  "bindings": [
    {"source": "my_exchange", "vhost": "/", "destination": "my_queue", "destination_type": "queue", "routing_key": "#"}
  ]
}

核心代码rabbit_mgmt_wm_definitions:to_json/2):

to_json(ReqData, Context) ->
    Definitions = [
        {rabbitmq_version, rabbit_version()},
        {users, all_users()},
        {vhosts, all_vhosts()},
        {permissions, all_permissions()},
        {parameters, all_parameters()},
        {policies, all_policies()},
        {queues, all_queues()},
        {exchanges, all_exchanges()},
        {bindings, all_bindings()}
    ],
    rabbit_mgmt_util:reply(Definitions, ReqData, Context).

(6)POST /api/definitions - 导入配置

功能说明:从 JSON 文件导入配置,自动创建用户、VHost、队列、交换器等。

请求结构:与导出格式相同。

核心代码rabbit_mgmt_wm_definitions:apply_defs/2):

apply_defs(Body, ActingUser) ->
    Defs = rabbit_mgmt_util:decode_json(Body),
    
    %% 1)创建用户
    [rabbit_auth_backend_internal:put_user(U, ActingUser) || U <- proplists:get_value(users, Defs, [])],
    
    %% 2)创建 VHost
    [rabbit_vhost:add(V, ActingUser) || V <- proplists:get_value(vhosts, Defs, [])],
    
    %% 3)设置权限
    [rabbit_auth_backend_internal:set_permissions(P, ActingUser) || P <- proplists:get_value(permissions, Defs, [])],
    
    %% 4)声明队列
    [rabbit_amqqueue:declare(Q, ActingUser) || Q <- proplists:get_value(queues, Defs, [])],
    
    %% 5)声明交换器
    [rabbit_exchange:declare(X, ActingUser) || X <- proplists:get_value(exchanges, Defs, [])],
    
    %% 6)创建绑定
    [rabbit_binding:add(B, ActingUser) || B <- proplists:get_value(bindings, Defs, [])],
    
    ok.

A3. 关键数据结构与 UML

A3.1 指标收集架构

classDiagram
    class rabbit_mgmt_metrics_collector {
        +ets_tables: [ets()]
        +gc_policy: map()
        +handle_event(Event, State)
        +override_lookups(LookupFun)
    }
    
    class MetricsDB {
        +conn_stats: ets()
        +channel_stats: ets()
        +queue_stats: ets()
        +exchange_stats: ets()
        +node_stats: ets()
    }
    
    class rabbit_core_metrics {
        +connection_created(Pid, Props)
        +channel_created(Pid, Props)
        +queue_stats(QName, Stats)
        +exchange_stats(XName, Stats)
    }
    
    rabbit_core_metrics --> rabbit_mgmt_metrics_collector : emit_event
    rabbit_mgmt_metrics_collector --> MetricsDB : ets:insert

指标类型

类型 ETS 表名 数据结构
连接统计 connection_stats {Pid, RecvOct, SendOct, Timestamp}
Channel 统计 channel_stats {Pid, Publishes, Delivers, Acks, Timestamp}
队列统计 queue_stats {QName, Messages, Consumers, MemoryBytes, Timestamp}
交换器统计 exchange_stats {XName, PublishIn, PublishOut, Timestamp}

A4. 核心算法/流程剖析

A4.1 指标收集与聚合流程

指标流转路径

  1. 核心模块发出事件

    %% rabbit_reader 创建连接时
    rabbit_core_metrics:connection_created(self(), #{
        user => User,
        vhost => VHost,
        protocol => <<"AMQP 0-9-1">>,
        connected_at => erlang:system_time(millisecond)
    }),
    rabbit_event:notify(connection_created, Props).
    
  2. Metrics Collector 接收事件

    handle_event({event, connection_created, Pid, Props}, State) ->
        %% 1)记录连接基础信息
        ets:insert(connection_stats, {Pid, Props}),
    
        %% 2)初始化统计计数器
        ets:insert(connection_metrics, {Pid, #{
            recv_oct => 0,
            send_oct => 0,
            recv_cnt => 0,
            send_cnt => 0
        }}),
        {ok, State}.
    
  3. 定期更新统计(每 5 秒):

    handle_info(update_stats, State) ->
        %% 遍历所有连接进程
        Connections = rabbit_connection_tracking:list(),
        lists:foreach(
            fun(Pid) ->
                %% 从进程字典读取最新统计
                Stats = rabbit_reader:info(Pid, [recv_oct, send_oct]),
                %% 更新 ETS 表
                ets:insert(connection_metrics, {Pid, Stats})
            end, Connections),
        schedule_next_update(),
        {noreply, State}.
    
  4. API 查询聚合

    %% GET /api/connections
    get_connections() ->
        Connections = ets:tab2list(connection_stats),
        lists:map(
            fun({Pid, Props}) ->
                Metrics = ets:lookup(connection_metrics, Pid),
                Props ++ Metrics
            end, Connections).
    

时序图

sequenceDiagram
    autonumber
    participant Core as rabbit_reader
    participant Event as rabbit_event
    participant Collector as mgmt_metrics_collector
    participant ETS as ETS Tables
    participant API as Management API
    
    Core->>Event: notify(connection_created, Props)
    Event->>Collector: handle_event(connection_created)
    Collector->>ETS: insert(connection_stats, {Pid, Props})
    
    loop 每 5 秒
        Collector->>Core: info(Pid, [recv_oct, send_oct])
        Core-->>Collector: Stats
        Collector->>ETS: update(connection_metrics, Stats)
    end
    
    API->>ETS: lookup(connection_stats)
    ETS-->>API: Connections
    API->>ETS: lookup(connection_metrics)
    ETS-->>API: Metrics
    API-->>API: merge(Connections, Metrics)

A4.2 指标垃圾回收(GC)

GC 策略

  • 连接/Channel 关闭后,指标保留 60 秒(用于展示最近关闭的连接)
  • 队列删除后,指标保留 60 秒
  • 旧数据定期清理,避免内存泄漏

GC 实现

handle_info(gc_metrics, State) ->
    Now = erlang:system_time(millisecond),
    RetentionPeriod = 60000, % 60 秒
    
    %% 清理过期连接指标
    ets:foldl(
        fun({Pid, #{closed_at := ClosedAt}} = Entry, Acc) ->
            case Now - ClosedAt > RetentionPeriod of
                true ->
                    ets:delete(connection_stats, Pid),
                    ets:delete(connection_metrics, Pid);
                false ->
                    ok
            end,
            Acc
        end, ok, connection_stats),
    
    schedule_next_gc(),
    {noreply, State}.

A5. 配置与可观测

A5.1 关键配置项

配置项 默认值 说明
management.listener.port 15672 HTTP API 端口
management.listener.ssl false 启用 HTTPS
management.load_definitions - 启动时自动加载配置文件路径
management.rates_mode basic 统计速率模式(basic/detailed/none)
management.sample_retention_policies - 指标保留策略(全局/详细/基础)
management.http_log_dir - HTTP 访问日志目录

A5.2 常用 API 端点

端点 方法 功能
/api/overview GET 集群概览
/api/nodes GET 节点列表
/api/connections GET 连接列表
/api/connections/:name DELETE 关闭连接
/api/channels GET Channel 列表
/api/queues GET 队列列表
/api/queues/:vhost/:queue PUT 声明队列
/api/queues/:vhost/:queue DELETE 删除队列
/api/queues/:vhost/:queue/get POST 从队列获取消息(测试用)
/api/queues/:vhost/:queue/contents DELETE 清空队列
/api/exchanges/:vhost/:exchange/publish POST 发布消息到交换器(测试用)
/api/definitions GET 导出配置
/api/definitions POST 导入配置

本文档涵盖了 RabbitMQ Management 插件的核心架构、HTTP API、指标收集机制和配置导入导出功能。