RabbitMQ-03-交换器与路由模块
模块概览
本文档涵盖消息路由的核心模块:
- rabbit_exchange:交换器管理
- rabbit_router:路由协调
- rabbit_exchange_type_*:各类型交换器实现
- rabbit_binding:绑定管理
职责与边界
rabbit_exchange 职责
-
交换器生命周期管理:
- 声明(declare)、删除(delete)
- 恢复(recover)持久化交换器
- 参数等价性检查
-
路由分发:
- 调用类型模块执行路由算法
- 处理备用交换器(alternate-exchange)
- 支持交换器装饰器(decorators)
-
元数据管理:
- 交换器信息查询
- Policy 应用
- Scratch 空间(插件存储临时数据)
-
事件通知:
- 发射交换器创建/删除事件
- 序列化事件控制
rabbit_router 职责
-
绑定查询:
- 根据 routing key 匹配绑定
- 支持自定义匹配函数
-
路由协调:
- 简化路由逻辑接口
- 委托给
rabbit_db_binding
架构图
flowchart TB
subgraph Channel
Publisher[消息发布者<br/>basic.publish]
end
subgraph Exchange 管理层
ExchangeAPI[rabbit_exchange<br/>交换器 API]
ExchangeDec[exchange_decorator<br/>装饰器]
end
subgraph 类型实现层
Direct[rabbit_exchange_type_direct<br/>直接交换器]
Topic[rabbit_exchange_type_topic<br/>主题交换器]
Fanout[rabbit_exchange_type_fanout<br/>扇出交换器]
Headers[rabbit_exchange_type_headers<br/>头交换器]
Custom[自定义类型...]
end
subgraph 路由层
Router[rabbit_router<br/>路由器]
BindingDB[rabbit_db_binding<br/>绑定数据库]
end
subgraph 队列层
QueueAPI[rabbit_amqqueue<br/>队列 API]
Queue1[Queue 1]
Queue2[Queue 2]
QueueN[Queue N]
end
Publisher --> ExchangeAPI
ExchangeAPI --> ExchangeDec
ExchangeAPI --> Direct
ExchangeAPI --> Topic
ExchangeAPI --> Fanout
ExchangeAPI --> Headers
ExchangeAPI --> Custom
Direct --> Router
Topic --> Router
Fanout --> Router
Headers --> Router
Router --> BindingDB
BindingDB --> QueueAPI
QueueAPI --> Queue1
QueueAPI --> Queue2
QueueAPI --> QueueN
数据结构
Exchange 记录
-record(exchange, {
name, %% #resource{virtual_host, exchange, name}
type, %% 类型:direct | topic | fanout | headers | 自定义
durable, %% 是否持久化
auto_delete, %% 无绑定时自动删除
internal, %% 内部交换器(不可直接发布)
arguments, %% 声明参数(如 alternate-exchange)
scratches, %% 插件临时数据存储
policy, %% 应用的 policy
operator_policy,%% 操作员 policy
decorators, %% 装饰器列表
options %% 选项(如 user)
}).
-type name() :: rabbit_types:exchange_name(). %% #resource{}
-type type() :: rabbit_types:exchange_type(). %% binary()
Binding 记录
-record(binding, {
source, %% #resource{} 源交换器
key, %% binary() 绑定键(routing key)
destination, %% #resource{} 目标(队列或交换器)
args %% [{Key, Type, Value}] 绑定参数
}).
rabbit_exchange 模块详解
核心 API
API 1: declare/7 - 声明交换器
基本信息
- 名称:
declare/7 - 幂等性:是(参数相同时返回已存在交换器)
- 权限要求:
configure权限
函数签名
-spec declare(Name, Type, Durable, AutoDelete, Internal, Args, Username) ->
Ret when
Name :: name(),
Type :: type(),
Durable :: boolean(),
AutoDelete :: boolean(),
Internal :: boolean(),
Args :: rabbit_framing:amqp_table(),
Username :: rabbit_types:username(),
Ret :: {ok, rabbit_types:exchange()} |
{error, timeout} |
rabbit_types:channel_exit().
请求参数
| 参数 | 类型 | 说明 |
|---|---|---|
Name |
#resource{} |
交换器名称(含 vhost) |
Type |
binary() |
类型:<<"direct">> / <<"topic">> / <<"fanout">> / <<"headers">> |
Durable |
boolean() |
是否持久化到磁盘 |
AutoDelete |
boolean() |
无绑定时自动删除 |
Internal |
boolean() |
是否内部交换器(不可直接发布) |
Args |
[{binary(), Type, Value}] |
参数表(如 alternate-exchange) |
Username |
binary() |
执行操作的用户名 |
响应结构
- 成功:
{ok, #exchange{}} - 失败:
{error, timeout}:数据库超时- 抛出
#amqp_error{}:参数错误、权限不足
核心代码
declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
%% 1. 检查交换器数量限制
ok = check_exchange_limits(XName),
%% 2. 构造交换器记录
X = rabbit_exchange_decorator:set(
rabbit_policy:set(
#exchange{name = XName,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
internal = Internal,
arguments = Args,
options = #{user => Username}})),
%% 3. 类型模块验证
XT = type_to_module(Type),
ok = XT:validate(X),
%% 4. 检查是否在删除中(避免竞态条件)
case rabbit_runtime_parameters:lookup(XName#resource.virtual_host,
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
XName#resource.name) of
not_found ->
%% 5. 创建或获取交换器
case rabbit_db_exchange:create_or_get(X) of
{new, Exchange} ->
%% 新建:执行回调,发射事件
Serial = serial(Exchange),
ok = callback(X, create, Serial, [Exchange]),
rabbit_event:notify(exchange_created, info(Exchange)),
{ok, Exchange};
{existing, Exchange} ->
%% 已存在:返回
{ok, Exchange};
{error, timeout} = Err ->
Err
end;
_ ->
%% 交换器正在被删除,忽略声明
{ok, X}
end.
代码逐段解释
-
检查限制(第 2 行):
- 集群交换器总数上限(默认
infinity) - 配置:
cluster_exchange_limit - 允许重新声明已存在的交换器(即使达到上限)
- 集群交换器总数上限(默认
-
构造记录(第 5-13 行):
rabbit_policy:set/1:应用匹配的 policyrabbit_exchange_decorator:set/1:确定装饰器列表options字段存储用户信息(用于审计)
-
类型验证(第 15-16 行):
- 调用类型模块的
validate/1回调 - 例如:检查
x-delayed-message参数(延迟交换器插件)
- 调用类型模块的
-
竞态条件保护(第 18-22 行):
- 场景:Federation 插件删除交换器时可能与声明并发
- 解决:检查运行时参数表中的删除标记
- 如果在删除中,忽略声明(返回临时交换器记录)
-
数据库操作(第 24-34 行):
create_or_get/1:原子性创建或获取{new, Exchange}:新建成功- 调用类型模块的
create/2回调 - 发射
exchange_created事件
- 调用类型模块的
{existing, Exchange}:已存在,检查参数等价性(由调用方负责)
时序图
sequenceDiagram
autonumber
participant Channel as rabbit_channel
participant Exchange as rabbit_exchange
participant TypeMod as Exchange Type 模块
participant DB as rabbit_db_exchange
participant Registry as rabbit_registry
participant Event as rabbit_event
Channel->>+Exchange: declare(XName, Type, Durable, ...)
Exchange->>Exchange: check_exchange_limits(XName)
Exchange->>Exchange: 构造 #exchange{}<br/>应用 policy 和 decorators
Exchange->>+TypeMod: validate(Exchange)
TypeMod->>TypeMod: 检查参数<br/>(如 x-match for headers)
TypeMod-->>-Exchange: ok
Exchange->>Exchange: 检查删除标记<br/>runtime_parameters
Exchange->>+DB: create_or_get(Exchange)
alt 交换器不存在
DB->>DB: 写入 Mnesia/Khepri<br/>rabbit_durable_exchange
DB-->>Exchange: {new, Exchange}
Exchange->>Exchange: serial(Exchange)<br/>获取序列号(如需)
Exchange->>+TypeMod: create(Serial, Exchange)
TypeMod->>TypeMod: 初始化类型特定数据<br/>(如 topic trie)
TypeMod-->>-Exchange: ok
Exchange->>+Event: notify(exchange_created, Info)
Event->>Event: 分发给订阅者<br/>(如 management 插件)
Event-->>-Exchange: ok
Exchange-->>Channel: {ok, Exchange}
else 交换器已存在
DB->>DB: 检查参数等价性<br/>(由调用方处理)
DB-->>Exchange: {existing, Exchange}
Exchange-->>Channel: {ok, Exchange}
end
DB-->>-Exchange: result
Exchange-->>-Channel: {ok, Exchange}
API 2: route/3 - 路由消息
基本信息
- 名称:
route/3 - 调用者:
rabbit_channel(消息发布时) - 返回:目标队列列表(可能包含重复)
函数签名
-spec route(rabbit_types:exchange(), rabbit_types:message(), route_opts()) ->
route_return().
请求参数
| 参数 | 类型 | 说明 |
|---|---|---|
Exchange |
#exchange{} |
交换器记录 |
Message |
rabbit_types:message() |
消息(含 routing key 和 headers) |
Opts |
route_opts() |
选项:#{return_binding_keys => boolean()} |
响应结构
- 默认:
[QueueName]- 队列名称列表 - 启用
return_binding_keys:[{QueueName, #{binding_keys => ...}}]
核心代码
%% 默认交换器(空字符串)特殊处理
route(#exchange{name = #resource{name = <<>>,
virtual_host = VHost}},
Message, _Opts) ->
%% 默认交换器:routing key 直接映射到队列名
RKs0 = mc:routing_keys(Message),
RKs = lists:usort(RKs0),
[rabbit_misc:r(VHost, queue, RK) || RK <- RKs];
%% 普通交换器路由
route(X = #exchange{name = XName,
decorators = Decorators},
Message, Opts) ->
%% 1. 选择路由装饰器
Decs = rabbit_exchange_decorator:select(route, Decorators),
%% 2. 执行路由算法(深度优先遍历交换器图)
QNamesToBKeys = route1(Message, Decs, Opts, {[X], XName, #{}}),
%% 3. 格式化返回结果
case Opts of
#{return_binding_keys := true} ->
maps:fold(fun(QName, BindingKeys, L) ->
[{QName, #{binding_keys => BindingKeys}} | L]
end, [], QNamesToBKeys);
_ ->
maps:keys(QNamesToBKeys)
end.
%% 路由算法核心:深度优先遍历
route1(_, _, _, {[], _, QNames}) ->
%% 工作列表为空,返回累积的队列
QNames;
route1(Message, Decorators, Opts,
{[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
%% 1. 获取类型模块的路由函数
{Route, Arity} = type_to_route_fun(Type),
%% 2. 调用类型模块路由
ExchangeDests = case Arity of
2 -> Route(X, Message);
3 -> Route(X, Message, Opts)
end,
%% 3. 处理装饰器路由
DecorateDests = process_decorators(X, Decorators, Message),
%% 4. 处理备用交换器(如路由失败)
AlternateDests = process_alternate(X, ExchangeDests),
%% 5. 递归处理所有目标
route1(Message, Decorators, Opts,
lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
AlternateDests ++ DecorateDests ++ ExchangeDests)).
%% 处理备用交换器
process_alternate(X = #exchange{name = XName}, []) ->
%% 路由失败(无目标队列),检查备用交换器
case rabbit_policy:get_arg(
<<"alternate-exchange">>, <<"alternate-exchange">>, X) of
undefined -> [];
AName -> [rabbit_misc:r(XName, exchange, AName)]
end;
process_alternate(_X, _Results) ->
%% 路由成功,不使用备用交换器
[].
%% 处理单个路由目标
process_route(#resource{kind = exchange} = XName,
{WorkList, SeenXs, QNames}) ->
%% 目标是交换器(E2E binding)
case sets:is_element(XName, SeenXs) of
true ->
%% 已访问过,避免循环
{WorkList, SeenXs, QNames};
false ->
%% 加入工作列表,继续路由
case rabbit_exchange:lookup(XName) of
{ok, X} -> {[X | WorkList], sets:add_element(XName, SeenXs), QNames};
{error, not_found} -> {WorkList, SeenXs, QNames}
end
end;
process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
%% 目标是队列,累积到结果
{WorkList, SeenXs, maps:put(QName, true, QNames)}.
路由算法流程图
flowchart TD
Start[route/3<br/>Exchange, Message, Opts] --> IsDefault{默认交换器?}
IsDefault -->|是| DirectRoute[routing_key 直接<br/>映射到队列名]
DirectRoute --> End[返回队列列表]
IsDefault -->|否| InitWorkList[初始化工作列表<br/>{[Exchange], SeenXs=#{}, QNames=#{}}]
InitWorkList --> Route1[route1 深度优先遍历]
Route1 --> CheckWorkList{工作列表为空?}
CheckWorkList -->|是| End
CheckWorkList -->|否| PopExchange[取出第一个交换器]
PopExchange --> CallTypeModule[调用类型模块<br/>Route(Exchange, Message)]
CallTypeModule --> GetDests[获取目标列表<br/>队列 + 交换器]
GetDests --> HasDests{有目标?}
HasDests -->|否| CheckAlternate[检查备用交换器<br/>alternate-exchange]
CheckAlternate --> AddAlternate[添加备用交换器<br/>到目标列表]
HasDests -->|是| ProcessDests[处理每个目标]
AddAlternate --> ProcessDests
ProcessDests --> IsQueue{目标类型?}
IsQueue -->|队列| AddQueue[累积到 QNames]
AddQueue --> Route1
IsQueue -->|交换器| CheckSeen{已访问?}
CheckSeen -->|是| Skip[跳过(避免循环)]
CheckSeen -->|否| AddToWorkList[加入工作列表<br/>标记已访问]
AddToWorkList --> Route1
Skip --> Route1
备用交换器示例
%% 场景:消息无法路由到任何队列时,转发到备用交换器
%% 1. 声明主交换器(带备用交换器参数)
rabbit_exchange:declare(
rabbit_misc:r(<<"/">>, exchange, <<"main">>),
<<"topic">>,
true, %% durable
false, %% auto_delete
false, %% internal
[{<<"alternate-exchange">>, longstr, <<"backup">>}],
<<"user">>
).
%% 2. 声明备用交换器(通常是 fanout 类型)
rabbit_exchange:declare(
rabbit_misc:r(<<"/">>, exchange, <<"backup">>),
<<"fanout">>,
true,
false,
false,
[],
<<"user">>
).
%% 3. 绑定备用交换器到日志队列
rabbit_binding:add(
#binding{source = rabbit_misc:r(<<"/">>, exchange, <<"backup">>),
key = <<>>,
destination = rabbit_misc:r(<<"/">>, queue, <<"unrouted_log">>),
args = []},
<<"user">>
).
%% 4. 发布消息(无匹配绑定)
%% 结果:消息被路由到 backup 交换器,最终到达 unrouted_log 队列
API 3: delete/3 - 删除交换器
基本信息
- 名称:
delete/3 - 权限要求:
configure权限 - 条件:无绑定或
if_unused=false
函数签名
-spec delete(name(), boolean(), rabbit_types:username()) ->
'ok' | rabbit_types:error('not_found' | 'in_use') |
rabbit_types:connection_exit().
请求参数
| 参数 | 类型 | 说明 |
|---|---|---|
Name |
#resource{} |
交换器名称 |
IfUnused |
boolean() |
仅当无绑定时删除 |
Username |
binary() |
执行操作的用户名 |
核心代码
delete(XName, IfUnused, Username) ->
Fun = case IfUnused of
true -> fun conditional_delete/2;
false -> fun unconditional_delete/2
end,
case Fun(XName, Username) of
{deleted, X, Bs, Deletions} ->
%% 删除成功
Serial = peek_serial(X),
ok = callback(X, delete, Serial, [X, Bs]),
_ = [rabbit_amqqueue:delete_queue_association(DstQ, X)
|| {_, DstQ} <- Deletions],
rabbit_event:notify(exchange_deleted, info(X)),
ok;
{error, _} = E ->
E
end.
unconditional_delete(XName, Username) ->
%% 无条件删除:删除所有绑定
case rabbit_db_exchange:delete(XName, fun delete_bindings/2, Username) of
{deleted, X, Bs} ->
Deletions = [{{X#exchange.name, DstName}, DstName} ||
#binding{destination = DstName} <- Bs],
{deleted, X, Bs, Deletions};
{error, _} = E ->
E
end.
conditional_delete(XName, Username) ->
%% 条件删除:仅当无绑定时删除
case rabbit_db_exchange:delete(XName, fun has_no_bindings/2, Username) of
{deleted, X, []} ->
{deleted, X, [], []};
{error, in_use} = E ->
E;
{error, _} = E ->
E
end.
交换器类型实现
rabbit_exchange_type 行为
所有交换器类型必须实现 rabbit_exchange_type 行为:
-callback description() -> [proplists:property()].
-callback serialise_events() -> boolean().
-callback route(rabbit_types:exchange(), rabbit_types:message()) ->
rabbit_router:match_result().
-callback route(rabbit_types:exchange(), rabbit_types:message(), map()) ->
rabbit_router:match_result().
-callback validate(rabbit_types:exchange()) -> 'ok' | {error, term()}.
-callback validate_binding(rabbit_types:exchange(), rabbit_types:binding()) ->
'ok' | {error, term()}.
-callback create(Serial, rabbit_types:exchange()) -> 'ok' when
Serial :: 'none' | pos_integer().
-callback delete(Serial, rabbit_types:exchange()) -> 'ok' when
Serial :: 'none' | pos_integer().
-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
-callback add_binding(Serial, rabbit_types:exchange(), rabbit_types:binding()) ->
'ok' when Serial :: 'none' | pos_integer().
-callback remove_bindings(Serial, rabbit_types:exchange(), [rabbit_types:binding()]) ->
'ok' when Serial :: 'none' | pos_integer().
-callback assert_args_equivalence(rabbit_types:exchange(),
rabbit_framing:amqp_table()) ->
'ok' | rabbit_types:connection_exit().
-callback info(rabbit_types:exchange()) -> [proplists:property()].
-callback info(rabbit_types:exchange(), [atom()]) -> [proplists:property()].
Direct Exchange(直接交换器)
路由算法
规则:routing key 精确匹配 binding key。
实现:
route(#exchange{name = Name, type = Type}, Msg, _Opts) ->
Routes = mc:routing_keys(Msg),
rabbit_db_binding:match_routing_key(Name, Routes, Type =:= direct).
时间复杂度:O(1) - 哈希表查找
使用场景
- 点对点通信:每个队列绑定唯一 routing key
- 任务分发:多个 worker 队列绑定相同 routing key(负载均衡)
示例
%% 声明 direct 交换器
Exchange = rabbit_misc:r(VHost, exchange, <<"logs">>),
{ok, _} = rabbit_exchange:declare(Exchange, <<"direct">>, true, false, false, [], <<"admin">>).
%% 绑定队列(不同级别)
rabbit_binding:add(
#binding{source = Exchange,
key = <<"error">>,
destination = rabbit_misc:r(VHost, queue, <<"error_log">>),
args = []},
<<"admin">>
).
rabbit_binding:add(
#binding{source = Exchange,
key = <<"info">>,
destination = rabbit_misc:r(VHost, queue, <<"info_log">>),
args = []},
<<"admin">>
).
%% 发布消息
%% routing_key = <<"error">> → 仅到达 error_log
%% routing_key = <<"info">> → 仅到达 info_log
Topic Exchange(主题交换器)
路由算法
规则:routing key 模式匹配 binding key。
模式语法:
*:匹配一个单词(词之间用.分隔)#:匹配零个或多个单词
实现:
route(#exchange{name = XName}, Msg, Opts) ->
RKeys = mc:routing_keys(Msg),
lists:append([rabbit_db_topic_exchange:match(XName, RKey, Opts)
|| RKey <- RKeys]).
数据结构:Trie(前缀树)
时间复杂度:O(m * n)
- m:routing key 长度
- n:绑定数量(最坏情况,实际通过 trie 优化)
使用场景
- 日志分类:
logs.*.error、logs.*.info - 地理路由:
cn.beijing.*、us.*.sales - 多维过滤:
order.*.paid
示例
%% 声明 topic 交换器
Exchange = rabbit_misc:r(VHost, exchange, <<"topic_logs">>),
{ok, _} = rabbit_exchange:declare(Exchange, <<"topic">>, true, false, false, [], <<"admin">>).
%% 绑定模式
rabbit_binding:add(
#binding{source = Exchange,
key = <<"*.error">>, %% 所有 error 日志
destination = rabbit_misc:r(VHost, queue, <<"all_errors">>),
args = []},
<<"admin">>
).
rabbit_binding:add(
#binding{source = Exchange,
key = <<"auth.#">>, %% 认证模块所有日志
destination = rabbit_misc:r(VHost, queue, <<"auth_logs">>),
args = []},
<<"admin">>
).
rabbit_binding:add(
#binding{source = Exchange,
key = <<"#">>, %% 所有日志
destination = rabbit_misc:r(VHost, queue, <<"all_logs">>),
args = []},
<<"admin">>
).
%% 路由示例
%% routing_key = <<"auth.error">> → all_errors, auth_logs, all_logs
%% routing_key = <<"auth.info.login">> → auth_logs, all_logs
%% routing_key = <<"db.error">> → all_errors, all_logs
Trie 数据结构
graph TD
Root[根节点] --> Auth[auth]
Root --> DB[db]
Root --> Hash[#<br/>匹配所有]
Auth --> Star1[*]
Auth --> Info[info]
Auth --> Hash1[#]
Star1 --> Error1[error]
Info --> Login[login]
DB --> Star2[*]
Star2 --> Error2[error]
Error1 -.-> Q1[队列 all_errors]
Error2 -.-> Q1
Hash1 -.-> Q2[队列 auth_logs]
Hash -.-> Q3[队列 all_logs]
Fanout Exchange(扇出交换器)
路由算法
规则:忽略 routing key,广播到所有绑定队列。
实现:
route(#exchange{name = Name}, _Msg, _Opts) ->
rabbit_router:match_routing_key(Name, ['_']).
时间复杂度:O(n) - n 为绑定数量
使用场景
- 广播消息:系统通知
- 缓存失效:多个缓存节点
- 实时更新:多个监控面板
示例
%% 声明 fanout 交换器
Exchange = rabbit_misc:r(VHost, exchange, <<"broadcast">>),
{ok, _} = rabbit_exchange:declare(Exchange, <<"fanout">>, true, false, false, [], <<"admin">>).
%% 绑定多个队列(routing key 被忽略)
rabbit_binding:add(
#binding{source = Exchange,
key = <<>>, %% 任意值均可
destination = rabbit_misc:r(VHost, queue, <<"listener1">>),
args = []},
<<"admin">>
).
rabbit_binding:add(
#binding{source = Exchange,
key = <<>>,
destination = rabbit_misc:r(VHost, queue, <<"listener2">>),
args = []},
<<"admin">>
).
%% 发布消息
%% 所有绑定队列都会收到消息
Headers Exchange(头交换器)
路由算法
规则:根据消息头(headers)匹配,而非 routing key。
匹配模式:
x-match = all(默认):所有头必须匹配x-match = any:至少一个头匹配x-match = any-with-x:至少一个x-*头匹配x-match = all-with-x:所有x-*头匹配
实现:
route(#exchange{name = Name}, Msg, _Opts) ->
Headers = mc:routing_headers(Msg, [x_headers]),
rabbit_router:match_bindings(
Name, fun(#binding{args = Args}) ->
case rabbit_misc:table_lookup(Args, <<"x-match">>) of
{longstr, <<"any">>} ->
match_any(Args, Headers, fun match/2);
{longstr, <<"all">>} ->
match_all(Args, Headers, fun match/2);
_ ->
match_all(Args, Headers, fun match/2)
end
end).
%% 匹配函数
match({<<"x-match">>, _, _}, _M) -> skip; %% 跳过 x-match
match({<<"x-", _/binary>>, _, _}, _M) -> skip; %% 跳过 x-* 头
match({K, void, _}, M) -> maps:is_key(K, M); %% 检查键存在
match({K, _, V}, M) -> maps:get(K, M, undefined) =:= V. %% 精确匹配
时间复杂度:O(n * m)
- n:绑定数量
- m:每个绑定的头数量
使用场景
- 多维度过滤:根据多个属性路由
- 内容路由:基于消息内容(非 routing key)
- 复杂规则:组合多个条件
示例
%% 声明 headers 交换器
Exchange = rabbit_misc:r(VHost, exchange, <<"headers_exchange">>),
{ok, _} = rabbit_exchange:declare(Exchange, <<"headers">>, true, false, false, [], <<"admin">>).
%% 绑定:匹配所有头(x-match = all)
rabbit_binding:add(
#binding{source = Exchange,
key = <<>>, %% headers 交换器忽略 routing key
destination = rabbit_misc:r(VHost, queue, <<"critical_queue">>),
args = [{<<"x-match">>, longstr, <<"all">>},
{<<"level">>, longstr, <<"critical">>},
{<<"module">>, longstr, <<"auth">>}]},
<<"admin">>
).
%% 绑定:匹配任意头(x-match = any)
rabbit_binding:add(
#binding{source = Exchange,
key = <<>>,
destination = rabbit_misc:r(VHost, queue, <<"any_error_queue">>),
args = [{<<"x-match">>, longstr, <<"any">>},
{<<"level">>, longstr, <<"error">>},
{<<"level">>, longstr, <<"critical">>}]},
<<"admin">>
).
%% 发布消息
%% Headers = #{<<"level">> => <<"critical">>, <<"module">> => <<"auth">>}
%% → 路由到 critical_queue 和 any_error_queue
%% Headers = #{<<"level">> => <<"error">>, <<"module">> => <<"db">>}
%% → 仅路由到 any_error_queue
rabbit_binding 模块
职责
-
绑定管理:
- 添加绑定(
add/2) - 删除绑定(
remove/2) - 列表查询(
list/1)
- 添加绑定(
-
数据库操作:
- 委托给
rabbit_db_binding - 维护绑定索引(源→目标、目标→源)
- 委托给
核心 API
add/2 - 添加绑定
-spec add(rabbit_types:binding(), rabbit_types:username()) ->
'ok' | rabbit_types:error('source_not_found' | 'destination_not_found') |
rabbit_types:connection_exit().
add(Binding = #binding{source = SrcName,
destination = DstName,
args = Args}, Username) ->
%% 1. 验证源交换器存在
case rabbit_exchange:lookup(SrcName) of
{ok, Src} ->
%% 2. 验证目标存在(队列或交换器)
case lookup_destination(DstName) of
{ok, Dst} ->
%% 3. 验证绑定参数
ok = rabbit_exchange:validate_binding(Src, Binding),
%% 4. 添加绑定到数据库
case rabbit_db_binding:create(Binding, fun add_callback/3, Username) of
{new, Binding} ->
%% 新建绑定
Serial = rabbit_exchange:serial(Src),
ok = rabbit_exchange:callback(
Src, add_binding, Serial, [Src, Binding]),
rabbit_event:notify(binding_created, info(Binding)),
ok;
{existing, _} ->
%% 已存在(幂等)
ok;
{error, _} = E ->
E
end;
{error, not_found} ->
{error, destination_not_found}
end;
{error, not_found} ->
{error, source_not_found}
end.
性能优化
路由性能
Direct Exchange
优化点:
- 使用哈希表存储绑定:O(1) 查找
- ETS 表并发读取
配置:
%% rabbitmq.config
[{rabbit, [
{direct_exchange_ets_options, [read_concurrency]}
]}].
Topic Exchange
优化点:
- Trie 剪枝:提前终止不匹配分支
- 缓存常用模式
性能对比(100 万绑定):
| Routing Key | 匹配时间 |
|---|---|
a.b.c (精确) |
~10 µs |
a.*.c (通配符) |
~50 µs |
# (全匹配) |
~500 µs |
缓存策略
%% 启用路由缓存(实验性功能)
[{rabbit, [
{exchange_cache_size, 1000} %% 缓存最近路由结果
]}].
绑定数量优化
问题:单个交换器绑定过多(>10000)导致路由变慢。
解决方案:
- 拆分交换器:使用多个交换器分散负载
- E2E 绑定:交换器链路,分层路由
- 使用一致性哈希交换器(插件)
典型场景时序图
场景 1:消息发布到 Topic Exchange
sequenceDiagram
autonumber
participant Publisher as 发布者
participant Channel as rabbit_channel
participant Exchange as rabbit_exchange
participant TopicType as rabbit_exchange_type_topic
participant TrieDB as rabbit_db_topic_exchange
participant Queue1 as 队列 1
participant Queue2 as 队列 2
Publisher->>+Channel: basic.publish<br/>exchange="logs"<br/>routing_key="auth.error"
Channel->>Channel: 权限检查<br/>write on exchange
Channel->>+Exchange: route(Exchange, Message, #{})
Exchange->>Exchange: 选择装饰器<br/>(如有)
Exchange->>+TopicType: route(Exchange, Message, #{})
TopicType->>+TrieDB: match(ExchangeName, "auth.error", #{})
TrieDB->>TrieDB: 遍历 Trie<br/>匹配模式<br/>"auth.error"<br/>"*.error"<br/>"auth.#"<br/>"#"
TrieDB-->>-TopicType: [Queue1Name, Queue2Name]
TopicType-->>-Exchange: [Queue1Name, Queue2Name]
Exchange->>Exchange: 检查备用交换器<br/>(有路由结果,不使用)
Exchange-->>-Channel: [Queue1Name, Queue2Name]
Channel->>+Queue1: deliver(Message)
Queue1->>Queue1: 入队
Queue1-->>-Channel: ok
Channel->>+Queue2: deliver(Message)
Queue2->>Queue2: 入队
Queue2-->>-Channel: ok
alt Publisher Confirms 启用
Channel->>Channel: 记录 unconfirmed<br/>{SeqNo, [QPid1, QPid2]}
end
Channel-->>-Publisher: (异步返回)
场景 2:备用交换器处理未路由消息
sequenceDiagram
autonumber
participant Publisher
participant Exchange as 主交换器<br/>(topic)
participant AltExchange as 备用交换器<br/>(fanout)
participant UnroutedQueue as unrouted_queue
Publisher->>+Exchange: 发布消息<br/>routing_key="unknown.pattern"
Exchange->>Exchange: route(...)<br/>调用 topic 类型模块
Exchange->>Exchange: 无匹配绑定<br/>ExchangeDests = []
Exchange->>Exchange: process_alternate(X, [])<br/>检查 alternate-exchange
Exchange->>Exchange: 获取备用交换器名<br/>从 policy 或 arguments
Exchange->>Exchange: 递归调用 route1<br/>WorkList += AltExchange
Exchange->>+AltExchange: route(AltExchange, Message, #{})
AltExchange->>AltExchange: fanout 类型<br/>广播到所有绑定
AltExchange-->>-Exchange: [UnroutedQueueName]
Exchange-->>-Publisher: 路由结果(通过备用交换器)
Note over UnroutedQueue: 消息到达 unrouted_queue<br/>可用于审计/调试
故障排查
消息未路由
症状
客户端发布消息后,队列中无消息。
排查步骤
- 检查交换器类型:
rabbitmqctl list_exchanges name type
- 检查绑定:
rabbitmqctl list_bindings source_name destination_name routing_key
- 检查 routing key:
%% Erlang shell
{ok, X} = rabbit_exchange:lookup(rabbit_misc:r(<<"/">>, exchange, <<"logs">>)).
Message = ... %% 构造测试消息
Routes = rabbit_exchange:route(X, Message, #{}).
%% 输出:[QueueName] 或 []
- 启用跟踪:
rabbitmqctl trace_on
rabbitmqctl set_user_tags guest administrator tracing
路由性能差
症状
消息发布延迟高(>10ms)。
原因
- Topic Exchange 绑定过多(>10000)
- 复杂通配符模式(多个
#) - 磁盘 I/O 瓶颈(Mnesia 表)
优化
- 减少绑定:
# 查看绑定数量
rabbitmqctl list_bindings | wc -l
# 清理无用绑定
rabbitmqctl delete_binding <source> <destination> <routing_key>
-
使用 Direct Exchange(如适用):
- Direct 比 Topic 快 10 倍
-
启用 Khepri(替代 Mnesia):
%% rabbitmq.conf
metadata_store = khepri
配置参考
交换器相关配置
| 配置项 | 默认值 | 说明 |
|---|---|---|
cluster_exchange_limit |
infinity |
集群交换器总数上限 |
exchange_delete_in_progress_ttl |
300000 | 删除标记 TTL(毫秒) |
路由相关配置
| 配置项 | 默认值 | 说明 |
|---|---|---|
topic_exchange_max_hops |
无限制 | Topic trie 最大深度(防止恶意模式) |
routing_cache_enabled |
false |
是否启用路由缓存(实验性) |
最佳实践
交换器设计
-
选择合适的类型:
- 简单路由 → Direct
- 模式匹配 → Topic
- 广播 → Fanout
- 多维过滤 → Headers
-
命名规范:
- 使用有意义的名称:
logs.topic、events.fanout - 避免特殊字符:
amq.前缀保留
- 使用有意义的名称:
-
持久化策略:
- 关键业务交换器:
durable=true - 临时交换器:
durable=false(重启后自动删除)
- 关键业务交换器:
-
备用交换器:
- 用于未路由消息的审计
- 通常配合 fanout 类型
绑定管理
-
避免过多绑定:
- 单个交换器 < 1000 绑定(性能考虑)
- 使用多个交换器分散
-
合理使用通配符:
- 少用
#(匹配所有) - 优先使用
*(匹配一级)
- 少用
-
定期清理:
- 删除无用绑定
- 监控绑定数量
附录
交换器类型注册
%% 查看已注册类型
rabbit_registry:lookup_all(exchange).
%% 输出:[{<<"direct">>, rabbit_exchange_type_direct},
%% {<<"topic">>, rabbit_exchange_type_topic},
%% {<<"fanout">>, rabbit_exchange_type_fanout},
%% {<<"headers">>, rabbit_exchange_type_headers}]
%% 自定义类型注册(插件)
rabbit_registry:register(exchange, <<"x-custom">>, my_exchange_type).
相关模块
rabbit_db_exchange:交换器数据库操作rabbit_db_binding:绑定数据库操作rabbit_db_topic_exchange:Topic trie 存储rabbit_exchange_decorator:交换器装饰器rabbit_policy:Policy 应用mc:消息容器(Message Container)