RabbitMQ-15-插件模块汇总

本文档汇总 RabbitMQ 的主要插件模块,包括协议扩展、认证授权、联邦/复制、监控和其他功能插件。


A1. 协议插件模块

A1.1 MQTT 插件(rabbitmq_mqtt

职责:提供 MQTT 3.1.1 和 MQTT 5.0 协议支持,将 MQTT 客户端的发布/订阅映射到 AMQP 队列和交换器。

核心映射规则

MQTT 概念 RabbitMQ 映射
Topic(主题) Exchange + Routing Key
Subscription(订阅) 独占队列 + 绑定
QoS 0 no_ack=true,不持久化
QoS 1 至少一次投递,需要 ack
QoS 2 不支持(降级到 QoS 1)
Retained Message 存储在特殊队列
Clean Session=false 持久队列 + 订阅

关键模块

  • rabbit_mqtt_reader:处理 MQTT 连接、CONNECT/PUBLISH/SUBSCRIBE 帧
  • rabbit_mqtt_processor:协议转换层,将 MQTT 帧转换为 AMQP 操作
  • rabbit_mqtt_retained_msg_store:保留消息存储(ETS 表)

配置示例

[
  {rabbitmq_mqtt, [
    {default_user, <<"mqtt_user">>},
    {default_pass, <<"password">>},
    {allow_anonymous, false},
    {vhost, <<"/">>},
    {exchange, <<"amq.topic">>},
    {subscription_ttl, 1800000},  %% 订阅 TTL:30 分钟
    {tcp_listeners, [1883]},
    {ssl_listeners, [8883]}
  ]}
].

A1.2 STOMP 插件(rabbitmq_stomp

职责:提供 STOMP 1.0/1.1/1.2 协议支持,用于文本协议客户端(如 Web 浏览器)。

核心映射规则

STOMP 概念 RabbitMQ 映射
Destination /queue/foo 队列名 foo
Destination /topic/bar Exchange amq.topic + Routing Key bar
Destination /exchange/X/K Exchange X + Routing Key K
SEND basic.publish
SUBSCRIBE basic.consume
ACK/NACK basic.ack/basic.nack

关键模块

  • rabbit_stomp_reader:处理 STOMP 连接和帧
  • rabbit_stomp_processor:协议转换
  • rabbit_stomp_util:工具函数(帧解析/编码)

配置示例

[
  {rabbitmq_stomp, [
    {default_user, [{login, <<"guest">>}, {passcode, <<"guest">>}]},
    {tcp_listeners, [61613]},
    {ssl_listeners, [61614]},
    {implicit_connect, true}
  ]}
].

A1.3 Stream 协议插件(rabbitmq_stream

职责:提供高性能流协议(RabbitMQ Stream Protocol),用于大吞吐、低延迟场景。

特性

  • 二进制协议:基于帧的紧凑二进制格式
  • 客户端直连:直接连接到流队列的 Leader 节点
  • 批量操作:单次请求可发布/消费多条消息
  • 偏移量跟踪:支持客户端管理和服务端管理两种模式

核心模块

  • rabbit_stream_reader:处理 TCP 连接和流协议帧
  • rabbit_stream_coordinator:协调客户端连接到正确的 Leader
  • rabbit_stream_manager:管理流队列元数据

配置示例

[
  {rabbitmq_stream, [
    {tcp_listeners, [5552]},
    {advertised_host, <<"rabbitmq.example.com">>},
    {advertised_port, 5552},
    {initial_cluster_size, 3}
  ]}
].

A2. 认证与授权插件

A2.1 内部认证后端(rabbitmq_auth_backend_internal

职责:RabbitMQ 的默认认证后端,用户和权限信息存储在 Mnesia 中。

数据结构

-record(internal_user, {
    username :: binary(),
    password_hash :: binary(),  %% SHA-256 哈希
    tags :: [atom()],            %% administrator, monitoring, management, policymaker
    hashing_algorithm :: atom()  %% sha256 | sha512
}).

-record(user_permission, {
    user_vhost :: {binary(), binary()},  %% {Username, VHost}
    permission :: #permission{
        configure :: binary(),  %% 配置权限正则(如 ".*")
        write :: binary(),      %% 写权限正则
        read :: binary()        %% 读权限正则
    }
}).

核心 API

  • add_user(Username, Password, ActingUser) - 添加用户
  • delete_user(Username, ActingUser) - 删除用户
  • change_password(Username, Password, ActingUser) - 修改密码
  • set_tags(Username, Tags, ActingUser) - 设置用户标签
  • set_permissions(Username, VHost, Configure, Write, Read, ActingUser) - 设置权限

A2.2 LDAP 认证后端(rabbitmq_auth_backend_ldap

职责:集成企业 LDAP 目录服务进行用户认证和授权。

配置示例

[
  {rabbitmq_auth_backend_ldap, [
    {servers, ["ldap.example.com"]},
    {user_dn_pattern, "cn=${username},ou=users,dc=example,dc=com"},
    {dn_lookup_attribute, "uid"},
    {dn_lookup_base, "ou=users,dc=example,dc=com"},
    {other_bind, as_user},
    {tag_queries, [{administrator, {constant, true}}]},
    {vhost_access_query, {constant, true}},
    {resource_access_query, {constant, true}}
  ]}
].

授权流程

  1. 客户端提供 Username/Password
  2. 插件向 LDAP 服务器发送 BIND 请求验证凭据
  3. 查询用户所属组(Group)
  4. 根据组映射规则授予 VHost 和资源权限

A2.3 OAuth 2.0 认证后端(rabbitmq_auth_backend_oauth2

职责:使用 OAuth 2.0 JWT Token 进行认证,支持 Keycloak、UAA、Azure AD 等 IdP。

JWT Payload 示例

{
  "sub": "user@example.com",
  "aud": ["rabbitmq"],
  "scope": ["rabbitmq.read:*/* rabbitmq.write:*/* rabbitmq.configure:*/*"],
  "exp": 1704067200
}

配置示例

[
  {rabbitmq_auth_backend_oauth2, [
    {resource_server_id, <<"rabbitmq">>},
    {jwks_url, <<"https://keycloak.example.com/auth/realms/master/protocol/openid-connect/certs">>},
    {issuer, <<"https://keycloak.example.com/auth/realms/master">>},
    {scope_prefix, <<"rabbitmq.">>},
    {additional_scopes_key, <<"extra_scopes">>}
  ]}
].

权限映射

Scope: rabbitmq.read:vhost1/*
→ VHost: vhost1, Read: .*

Scope: rabbitmq.configure:vhost1/queue_prefix_*
→ VHost: vhost1, Configure: ^queue_prefix_.*

A3. 联邦与复制插件

A3.1 Federation 插件(rabbitmq_federation

职责:跨集群或数据中心的松耦合消息复制,支持 Exchange Federation 和 Queue Federation。

Exchange Federation 工作原理

flowchart LR
    subgraph Upstream[上游集群]
        UpEx[Exchange: orders]
    end
    
    subgraph Downstream[下游集群]
        DownEx[Exchange: orders]
        Link[Federation Link]
        InternalQ[内部队列]
    end
    
    UpEx -->|消息复制| Link
    Link --> InternalQ
    InternalQ -->|路由| DownEx

配置示例

# 定义上游
rabbitmqctl set_parameter federation-upstream my-upstream \
  '{"uri":"amqp://upstream.example.com","ack-mode":"on-confirm"}'

# 应用策略
rabbitmqctl set_policy federate-orders \
  "^orders\$" '{"federation-upstream-set":"all"}' \
  --apply-to exchanges

A3.2 Shovel 插件(rabbitmq_shovel

职责:点对点的消息复制,可以在不同协议、不同集群之间传输消息。

Shovel 类型

类型 持久化 配置方式 重启后行为
Static Shovel 配置文件 自动恢复
Dynamic Shovel HTTP API 自动恢复

配置示例(Dynamic Shovel):

{
  "src-uri": "amqp://source.example.com",
  "src-queue": "source_queue",
  "dest-uri": "amqp://dest.example.com",
  "dest-exchange": "dest_exchange",
  "dest-exchange-key": "routing.key",
  "ack-mode": "on-confirm",
  "delete-after": "never"
}

A4. 监控与可观测插件

A4.1 Prometheus 插件(rabbitmq_prometheus

职责:导出 Prometheus 格式的指标,用于 Grafana 可视化。

核心指标

# HELP rabbitmq_queue_messages Number of messages in the queue
# TYPE rabbitmq_queue_messages gauge
rabbitmq_queue_messages{vhost="/",queue="my_queue"} 1234

# HELP rabbitmq_connections Total number of connections
# TYPE rabbitmq_connections gauge
rabbitmq_connections 50

# HELP rabbitmq_channel_messages_published_total Messages published total
# TYPE rabbitmq_channel_messages_published_total counter
rabbitmq_channel_messages_published_total{vhost="/"} 100000

端点

  • /metrics:所有指标(聚合)
  • /metrics/per-object:每个对象的详细指标

A4.2 Event Exchange 插件(rabbitmq_event_exchange

职责:将 RabbitMQ 内部事件(连接创建、队列删除等)发布到特殊的 Exchange。

事件类型

  • connection.created / connection.closed
  • channel.created / channel.closed
  • queue.created / queue.deleted
  • exchange.created / exchange.deleted
  • binding.created / binding.deleted
  • consumer.created / consumer.deleted

使用示例

# 声明队列并绑定到事件交换器
rabbitmqadmin declare queue name=audit_log durable=true
rabbitmqadmin declare binding source="amq.rabbitmq.event" \
  destination=audit_log routing_key="#"

# 消费事件
rabbitmq-consumer -q audit_log

A5. 其他功能插件

A5.1 Consistent Hash Exchange(rabbitmq_consistent_hash_exchange

职责:基于一致性哈希的负载均衡,适用于分片场景。

工作原理

  • 根据 Routing Key 或消息头计算哈希值
  • 将消息路由到哈希环上最近的队列
  • 支持虚拟节点(Virtual Nodes)提高均匀性

配置示例

# 声明一致性哈希交换器
rabbitmqadmin declare exchange name=hash_ex type=x-consistent-hash

# 绑定队列(权重为 10)
rabbitmqadmin declare binding source=hash_ex destination=queue1 routing_key="10"
rabbitmqadmin declare binding source=hash_ex destination=queue2 routing_key="10"

A5.2 Recent History Exchange(rabbitmq_recent_history_exchange

职责:为新消费者重放最近 N 条消息(类似 Kafka 的 log compaction)。

特性

  • 缓存最近 20 条消息(默认)
  • 新消费者订阅时自动接收历史消息
  • 适用于状态同步场景

配置示例

# 声明历史交换器
rabbitmqadmin declare exchange name=history_ex \
  type=x-recent-history arguments='{"x-recent-history-length":50}'

A5.3 Delayed Message Exchange(rabbitmq_delayed_message_exchange

职责:实现消息延迟投递(基于插件的 TTL + DLX 实现)。

使用示例

# 声明延迟交换器
rabbitmqadmin declare exchange name=delayed_ex type=x-delayed-message \
  arguments='{"x-delayed-type":"topic"}'

# 发布延迟消息(延迟 10 秒)
rabbitmqadmin publish exchange=delayed_ex routing_key="test" \
  properties='{"headers":{"x-delay":10000}}' payload="Delayed message"

A5.4 Peer Discovery 插件

职责:自动发现集群节点,支持多种服务发现机制。

插件 服务发现方式 使用场景
rabbitmq_peer_discovery_k8s Kubernetes API K8s 环境
rabbitmq_peer_discovery_consul Consul KV Store Consul 环境
rabbitmq_peer_discovery_etcd etcd API etcd 环境
rabbitmq_peer_discovery_aws AWS EC2 Tags AWS 环境

Kubernetes 配置示例

apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
data:
  rabbitmq.conf: |
    cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s
    cluster_formation.k8s.host = kubernetes.default.svc.cluster.local
    cluster_formation.k8s.address_type = hostname
    cluster_formation.k8s.service_name = rabbitmq-headless

A6. 插件开发最佳实践

A6.1 插件结构

标准 RabbitMQ 插件目录结构:

my_plugin/
├── Makefile
├── src/
│   ├── my_plugin.app.src        # OTP 应用配置
│   ├── my_plugin_app.erl        # Application behavior
│   ├── my_plugin_sup.erl        # Supervisor
│   └── my_plugin_worker.erl     # 核心逻辑
├── include/
│   └── my_plugin.hrl
├── test/
│   └── my_plugin_SUITE.erl      # Common Test 测试套件
└── README.md

A6.2 Boot Step 注册

-rabbit_boot_step({my_plugin_worker,
    [{description, "My Plugin Worker"},
     {mfa,         {my_plugin_worker, start_link, []}},
     {requires,    rabbit_registry},
     {enables,     external_infrastructure}]}).

A6.3 Custom Exchange Type

-module(rabbit_exchange_type_custom).
-behaviour(rabbit_exchange_type).

-export([description/0, serialise_events/0, route/3, validate/1, validate_binding/2,
         create/2, delete/2, policy_changed/2, add_binding/3, remove_bindings/3,
         assert_args_equivalence/2]).

route(#exchange{name = Name}, Delivery, Opts) ->
    %% 自定义路由逻辑
    RoutingKeys = extract_routing_keys(Delivery),
    rabbit_router:match_routing_key(Name, RoutingKeys).

本文档汇总了 RabbitMQ 的主要插件模块,涵盖协议扩展、认证授权、联邦复制、监控和其他功能插件,为用户理解和扩展 RabbitMQ 提供全面参考。