RabbitMQ-15-插件模块汇总
本文档汇总 RabbitMQ 的主要插件模块,包括协议扩展、认证授权、联邦/复制、监控和其他功能插件。
A1. 协议插件模块
A1.1 MQTT 插件(rabbitmq_mqtt)
职责:提供 MQTT 3.1.1 和 MQTT 5.0 协议支持,将 MQTT 客户端的发布/订阅映射到 AMQP 队列和交换器。
核心映射规则:
| MQTT 概念 | RabbitMQ 映射 |
|---|---|
| Topic(主题) | Exchange + Routing Key |
| Subscription(订阅) | 独占队列 + 绑定 |
| QoS 0 | no_ack=true,不持久化 |
| QoS 1 | 至少一次投递,需要 ack |
| QoS 2 | 不支持(降级到 QoS 1) |
| Retained Message | 存储在特殊队列 |
| Clean Session=false | 持久队列 + 订阅 |
关键模块:
rabbit_mqtt_reader:处理 MQTT 连接、CONNECT/PUBLISH/SUBSCRIBE 帧rabbit_mqtt_processor:协议转换层,将 MQTT 帧转换为 AMQP 操作rabbit_mqtt_retained_msg_store:保留消息存储(ETS 表)
配置示例:
[
{rabbitmq_mqtt, [
{default_user, <<"mqtt_user">>},
{default_pass, <<"password">>},
{allow_anonymous, false},
{vhost, <<"/">>},
{exchange, <<"amq.topic">>},
{subscription_ttl, 1800000}, %% 订阅 TTL:30 分钟
{tcp_listeners, [1883]},
{ssl_listeners, [8883]}
]}
].
A1.2 STOMP 插件(rabbitmq_stomp)
职责:提供 STOMP 1.0/1.1/1.2 协议支持,用于文本协议客户端(如 Web 浏览器)。
核心映射规则:
| STOMP 概念 | RabbitMQ 映射 |
|---|---|
Destination /queue/foo |
队列名 foo |
Destination /topic/bar |
Exchange amq.topic + Routing Key bar |
Destination /exchange/X/K |
Exchange X + Routing Key K |
| SEND | basic.publish |
| SUBSCRIBE | basic.consume |
| ACK/NACK | basic.ack/basic.nack |
关键模块:
rabbit_stomp_reader:处理 STOMP 连接和帧rabbit_stomp_processor:协议转换rabbit_stomp_util:工具函数(帧解析/编码)
配置示例:
[
{rabbitmq_stomp, [
{default_user, [{login, <<"guest">>}, {passcode, <<"guest">>}]},
{tcp_listeners, [61613]},
{ssl_listeners, [61614]},
{implicit_connect, true}
]}
].
A1.3 Stream 协议插件(rabbitmq_stream)
职责:提供高性能流协议(RabbitMQ Stream Protocol),用于大吞吐、低延迟场景。
特性:
- 二进制协议:基于帧的紧凑二进制格式
- 客户端直连:直接连接到流队列的 Leader 节点
- 批量操作:单次请求可发布/消费多条消息
- 偏移量跟踪:支持客户端管理和服务端管理两种模式
核心模块:
rabbit_stream_reader:处理 TCP 连接和流协议帧rabbit_stream_coordinator:协调客户端连接到正确的 Leaderrabbit_stream_manager:管理流队列元数据
配置示例:
[
{rabbitmq_stream, [
{tcp_listeners, [5552]},
{advertised_host, <<"rabbitmq.example.com">>},
{advertised_port, 5552},
{initial_cluster_size, 3}
]}
].
A2. 认证与授权插件
A2.1 内部认证后端(rabbitmq_auth_backend_internal)
职责:RabbitMQ 的默认认证后端,用户和权限信息存储在 Mnesia 中。
数据结构:
-record(internal_user, {
username :: binary(),
password_hash :: binary(), %% SHA-256 哈希
tags :: [atom()], %% administrator, monitoring, management, policymaker
hashing_algorithm :: atom() %% sha256 | sha512
}).
-record(user_permission, {
user_vhost :: {binary(), binary()}, %% {Username, VHost}
permission :: #permission{
configure :: binary(), %% 配置权限正则(如 ".*")
write :: binary(), %% 写权限正则
read :: binary() %% 读权限正则
}
}).
核心 API:
add_user(Username, Password, ActingUser)- 添加用户delete_user(Username, ActingUser)- 删除用户change_password(Username, Password, ActingUser)- 修改密码set_tags(Username, Tags, ActingUser)- 设置用户标签set_permissions(Username, VHost, Configure, Write, Read, ActingUser)- 设置权限
A2.2 LDAP 认证后端(rabbitmq_auth_backend_ldap)
职责:集成企业 LDAP 目录服务进行用户认证和授权。
配置示例:
[
{rabbitmq_auth_backend_ldap, [
{servers, ["ldap.example.com"]},
{user_dn_pattern, "cn=${username},ou=users,dc=example,dc=com"},
{dn_lookup_attribute, "uid"},
{dn_lookup_base, "ou=users,dc=example,dc=com"},
{other_bind, as_user},
{tag_queries, [{administrator, {constant, true}}]},
{vhost_access_query, {constant, true}},
{resource_access_query, {constant, true}}
]}
].
授权流程:
- 客户端提供 Username/Password
- 插件向 LDAP 服务器发送 BIND 请求验证凭据
- 查询用户所属组(Group)
- 根据组映射规则授予 VHost 和资源权限
A2.3 OAuth 2.0 认证后端(rabbitmq_auth_backend_oauth2)
职责:使用 OAuth 2.0 JWT Token 进行认证,支持 Keycloak、UAA、Azure AD 等 IdP。
JWT Payload 示例:
{
"sub": "user@example.com",
"aud": ["rabbitmq"],
"scope": ["rabbitmq.read:*/* rabbitmq.write:*/* rabbitmq.configure:*/*"],
"exp": 1704067200
}
配置示例:
[
{rabbitmq_auth_backend_oauth2, [
{resource_server_id, <<"rabbitmq">>},
{jwks_url, <<"https://keycloak.example.com/auth/realms/master/protocol/openid-connect/certs">>},
{issuer, <<"https://keycloak.example.com/auth/realms/master">>},
{scope_prefix, <<"rabbitmq.">>},
{additional_scopes_key, <<"extra_scopes">>}
]}
].
权限映射:
Scope: rabbitmq.read:vhost1/*
→ VHost: vhost1, Read: .*
Scope: rabbitmq.configure:vhost1/queue_prefix_*
→ VHost: vhost1, Configure: ^queue_prefix_.*
A3. 联邦与复制插件
A3.1 Federation 插件(rabbitmq_federation)
职责:跨集群或数据中心的松耦合消息复制,支持 Exchange Federation 和 Queue Federation。
Exchange Federation 工作原理:
flowchart LR
subgraph Upstream[上游集群]
UpEx[Exchange: orders]
end
subgraph Downstream[下游集群]
DownEx[Exchange: orders]
Link[Federation Link]
InternalQ[内部队列]
end
UpEx -->|消息复制| Link
Link --> InternalQ
InternalQ -->|路由| DownEx
配置示例:
# 定义上游
rabbitmqctl set_parameter federation-upstream my-upstream \
'{"uri":"amqp://upstream.example.com","ack-mode":"on-confirm"}'
# 应用策略
rabbitmqctl set_policy federate-orders \
"^orders\$" '{"federation-upstream-set":"all"}' \
--apply-to exchanges
A3.2 Shovel 插件(rabbitmq_shovel)
职责:点对点的消息复制,可以在不同协议、不同集群之间传输消息。
Shovel 类型:
| 类型 | 持久化 | 配置方式 | 重启后行为 |
|---|---|---|---|
| Static Shovel | 是 | 配置文件 | 自动恢复 |
| Dynamic Shovel | 是 | HTTP API | 自动恢复 |
配置示例(Dynamic Shovel):
{
"src-uri": "amqp://source.example.com",
"src-queue": "source_queue",
"dest-uri": "amqp://dest.example.com",
"dest-exchange": "dest_exchange",
"dest-exchange-key": "routing.key",
"ack-mode": "on-confirm",
"delete-after": "never"
}
A4. 监控与可观测插件
A4.1 Prometheus 插件(rabbitmq_prometheus)
职责:导出 Prometheus 格式的指标,用于 Grafana 可视化。
核心指标:
# HELP rabbitmq_queue_messages Number of messages in the queue
# TYPE rabbitmq_queue_messages gauge
rabbitmq_queue_messages{vhost="/",queue="my_queue"} 1234
# HELP rabbitmq_connections Total number of connections
# TYPE rabbitmq_connections gauge
rabbitmq_connections 50
# HELP rabbitmq_channel_messages_published_total Messages published total
# TYPE rabbitmq_channel_messages_published_total counter
rabbitmq_channel_messages_published_total{vhost="/"} 100000
端点:
/metrics:所有指标(聚合)/metrics/per-object:每个对象的详细指标
A4.2 Event Exchange 插件(rabbitmq_event_exchange)
职责:将 RabbitMQ 内部事件(连接创建、队列删除等)发布到特殊的 Exchange。
事件类型:
connection.created/connection.closedchannel.created/channel.closedqueue.created/queue.deletedexchange.created/exchange.deletedbinding.created/binding.deletedconsumer.created/consumer.deleted
使用示例:
# 声明队列并绑定到事件交换器
rabbitmqadmin declare queue name=audit_log durable=true
rabbitmqadmin declare binding source="amq.rabbitmq.event" \
destination=audit_log routing_key="#"
# 消费事件
rabbitmq-consumer -q audit_log
A5. 其他功能插件
A5.1 Consistent Hash Exchange(rabbitmq_consistent_hash_exchange)
职责:基于一致性哈希的负载均衡,适用于分片场景。
工作原理:
- 根据 Routing Key 或消息头计算哈希值
- 将消息路由到哈希环上最近的队列
- 支持虚拟节点(Virtual Nodes)提高均匀性
配置示例:
# 声明一致性哈希交换器
rabbitmqadmin declare exchange name=hash_ex type=x-consistent-hash
# 绑定队列(权重为 10)
rabbitmqadmin declare binding source=hash_ex destination=queue1 routing_key="10"
rabbitmqadmin declare binding source=hash_ex destination=queue2 routing_key="10"
A5.2 Recent History Exchange(rabbitmq_recent_history_exchange)
职责:为新消费者重放最近 N 条消息(类似 Kafka 的 log compaction)。
特性:
- 缓存最近 20 条消息(默认)
- 新消费者订阅时自动接收历史消息
- 适用于状态同步场景
配置示例:
# 声明历史交换器
rabbitmqadmin declare exchange name=history_ex \
type=x-recent-history arguments='{"x-recent-history-length":50}'
A5.3 Delayed Message Exchange(rabbitmq_delayed_message_exchange)
职责:实现消息延迟投递(基于插件的 TTL + DLX 实现)。
使用示例:
# 声明延迟交换器
rabbitmqadmin declare exchange name=delayed_ex type=x-delayed-message \
arguments='{"x-delayed-type":"topic"}'
# 发布延迟消息(延迟 10 秒)
rabbitmqadmin publish exchange=delayed_ex routing_key="test" \
properties='{"headers":{"x-delay":10000}}' payload="Delayed message"
A5.4 Peer Discovery 插件
职责:自动发现集群节点,支持多种服务发现机制。
| 插件 | 服务发现方式 | 使用场景 |
|---|---|---|
rabbitmq_peer_discovery_k8s |
Kubernetes API | K8s 环境 |
rabbitmq_peer_discovery_consul |
Consul KV Store | Consul 环境 |
rabbitmq_peer_discovery_etcd |
etcd API | etcd 环境 |
rabbitmq_peer_discovery_aws |
AWS EC2 Tags | AWS 环境 |
Kubernetes 配置示例:
apiVersion: v1
kind: ConfigMap
metadata:
name: rabbitmq-config
data:
rabbitmq.conf: |
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s
cluster_formation.k8s.host = kubernetes.default.svc.cluster.local
cluster_formation.k8s.address_type = hostname
cluster_formation.k8s.service_name = rabbitmq-headless
A6. 插件开发最佳实践
A6.1 插件结构
标准 RabbitMQ 插件目录结构:
my_plugin/
├── Makefile
├── src/
│ ├── my_plugin.app.src # OTP 应用配置
│ ├── my_plugin_app.erl # Application behavior
│ ├── my_plugin_sup.erl # Supervisor
│ └── my_plugin_worker.erl # 核心逻辑
├── include/
│ └── my_plugin.hrl
├── test/
│ └── my_plugin_SUITE.erl # Common Test 测试套件
└── README.md
A6.2 Boot Step 注册
-rabbit_boot_step({my_plugin_worker,
[{description, "My Plugin Worker"},
{mfa, {my_plugin_worker, start_link, []}},
{requires, rabbit_registry},
{enables, external_infrastructure}]}).
A6.3 Custom Exchange Type
-module(rabbit_exchange_type_custom).
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/3, validate/1, validate_binding/2,
create/2, delete/2, policy_changed/2, add_binding/3, remove_bindings/3,
assert_args_equivalence/2]).
route(#exchange{name = Name}, Delivery, Opts) ->
%% 自定义路由逻辑
RoutingKeys = extract_routing_keys(Delivery),
rabbit_router:match_routing_key(Name, RoutingKeys).
本文档汇总了 RabbitMQ 的主要插件模块,涵盖协议扩展、认证授权、联邦复制、监控和其他功能插件,为用户理解和扩展 RabbitMQ 提供全面参考。