RabbitMQ-03-交换器与路由模块

模块概览

本文档涵盖消息路由的核心模块:

  • rabbit_exchange:交换器管理
  • rabbit_router:路由协调
  • rabbit_exchange_type_*:各类型交换器实现
  • rabbit_binding:绑定管理

职责与边界

rabbit_exchange 职责

  1. 交换器生命周期管理

    • 声明(declare)、删除(delete)
    • 恢复(recover)持久化交换器
    • 参数等价性检查
  2. 路由分发

    • 调用类型模块执行路由算法
    • 处理备用交换器(alternate-exchange)
    • 支持交换器装饰器(decorators)
  3. 元数据管理

    • 交换器信息查询
    • Policy 应用
    • Scratch 空间(插件存储临时数据)
  4. 事件通知

    • 发射交换器创建/删除事件
    • 序列化事件控制

rabbit_router 职责

  1. 绑定查询

    • 根据 routing key 匹配绑定
    • 支持自定义匹配函数
  2. 路由协调

    • 简化路由逻辑接口
    • 委托给 rabbit_db_binding

架构图

flowchart TB
    subgraph Channel
        Publisher[消息发布者<br/>basic.publish]
    end
    
    subgraph Exchange 管理层
        ExchangeAPI[rabbit_exchange<br/>交换器 API]
        ExchangeDec[exchange_decorator<br/>装饰器]
    end
    
    subgraph 类型实现层
        Direct[rabbit_exchange_type_direct<br/>直接交换器]
        Topic[rabbit_exchange_type_topic<br/>主题交换器]
        Fanout[rabbit_exchange_type_fanout<br/>扇出交换器]
        Headers[rabbit_exchange_type_headers<br/>头交换器]
        Custom[自定义类型...]
    end
    
    subgraph 路由层
        Router[rabbit_router<br/>路由器]
        BindingDB[rabbit_db_binding<br/>绑定数据库]
    end
    
    subgraph 队列层
        QueueAPI[rabbit_amqqueue<br/>队列 API]
        Queue1[Queue 1]
        Queue2[Queue 2]
        QueueN[Queue N]
    end
    
    Publisher --> ExchangeAPI
    ExchangeAPI --> ExchangeDec
    ExchangeAPI --> Direct
    ExchangeAPI --> Topic
    ExchangeAPI --> Fanout
    ExchangeAPI --> Headers
    ExchangeAPI --> Custom
    
    Direct --> Router
    Topic --> Router
    Fanout --> Router
    Headers --> Router
    
    Router --> BindingDB
    BindingDB --> QueueAPI
    QueueAPI --> Queue1
    QueueAPI --> Queue2
    QueueAPI --> QueueN

数据结构

Exchange 记录

-record(exchange, {
    name,           %% #resource{virtual_host, exchange, name}
    type,           %% 类型:direct | topic | fanout | headers | 自定义
    durable,        %% 是否持久化
    auto_delete,    %% 无绑定时自动删除
    internal,       %% 内部交换器(不可直接发布)
    arguments,      %% 声明参数(如 alternate-exchange)
    scratches,      %% 插件临时数据存储
    policy,         %% 应用的 policy
    operator_policy,%% 操作员 policy
    decorators,     %% 装饰器列表
    options         %% 选项(如 user)
}).

-type name() :: rabbit_types:exchange_name().  %% #resource{}
-type type() :: rabbit_types:exchange_type().  %% binary()

Binding 记录

-record(binding, {
    source,      %% #resource{} 源交换器
    key,         %% binary() 绑定键(routing key)
    destination, %% #resource{} 目标(队列或交换器)
    args         %% [{Key, Type, Value}] 绑定参数
}).

rabbit_exchange 模块详解

核心 API

API 1: declare/7 - 声明交换器

基本信息
  • 名称declare/7
  • 幂等性:是(参数相同时返回已存在交换器)
  • 权限要求configure 权限
函数签名
-spec declare(Name, Type, Durable, AutoDelete, Internal, Args, Username) ->
    Ret when
      Name :: name(),
      Type :: type(),
      Durable :: boolean(),
      AutoDelete :: boolean(),
      Internal :: boolean(),
      Args :: rabbit_framing:amqp_table(),
      Username :: rabbit_types:username(),
      Ret :: {ok, rabbit_types:exchange()} |
             {error, timeout} |
             rabbit_types:channel_exit().
请求参数
参数 类型 说明
Name #resource{} 交换器名称(含 vhost)
Type binary() 类型:<<"direct">> / <<"topic">> / <<"fanout">> / <<"headers">>
Durable boolean() 是否持久化到磁盘
AutoDelete boolean() 无绑定时自动删除
Internal boolean() 是否内部交换器(不可直接发布)
Args [{binary(), Type, Value}] 参数表(如 alternate-exchange)
Username binary() 执行操作的用户名
响应结构
  • 成功{ok, #exchange{}}
  • 失败
    • {error, timeout}:数据库超时
    • 抛出 #amqp_error{}:参数错误、权限不足
核心代码
declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
    %% 1. 检查交换器数量限制
    ok = check_exchange_limits(XName),
    
    %% 2. 构造交换器记录
    X = rabbit_exchange_decorator:set(
          rabbit_policy:set(
            #exchange{name        = XName,
                      type        = Type,
                      durable     = Durable,
                      auto_delete = AutoDelete,
                      internal    = Internal,
                      arguments   = Args,
                      options     = #{user => Username}})),
    
    %% 3. 类型模块验证
    XT = type_to_module(Type),
    ok = XT:validate(X),
    
    %% 4. 检查是否在删除中(避免竞态条件)
    case rabbit_runtime_parameters:lookup(XName#resource.virtual_host,
                                          ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
                                          XName#resource.name) of
        not_found ->
            %% 5. 创建或获取交换器
            case rabbit_db_exchange:create_or_get(X) of
                {new, Exchange} ->
                    %% 新建:执行回调,发射事件
                    Serial = serial(Exchange),
                    ok = callback(X, create, Serial, [Exchange]),
                    rabbit_event:notify(exchange_created, info(Exchange)),
                    {ok, Exchange};
                {existing, Exchange} ->
                    %% 已存在:返回
                    {ok, Exchange};
                {error, timeout} = Err ->
                    Err
            end;
        _ ->
            %% 交换器正在被删除,忽略声明
            {ok, X}
    end.
代码逐段解释
  1. 检查限制(第 2 行):

    • 集群交换器总数上限(默认 infinity
    • 配置:cluster_exchange_limit
    • 允许重新声明已存在的交换器(即使达到上限)
  2. 构造记录(第 5-13 行):

    • rabbit_policy:set/1:应用匹配的 policy
    • rabbit_exchange_decorator:set/1:确定装饰器列表
    • options 字段存储用户信息(用于审计)
  3. 类型验证(第 15-16 行):

    • 调用类型模块的 validate/1 回调
    • 例如:检查 x-delayed-message 参数(延迟交换器插件)
  4. 竞态条件保护(第 18-22 行):

    • 场景:Federation 插件删除交换器时可能与声明并发
    • 解决:检查运行时参数表中的删除标记
    • 如果在删除中,忽略声明(返回临时交换器记录)
  5. 数据库操作(第 24-34 行):

    • create_or_get/1:原子性创建或获取
    • {new, Exchange}:新建成功
      • 调用类型模块的 create/2 回调
      • 发射 exchange_created 事件
    • {existing, Exchange}:已存在,检查参数等价性(由调用方负责)
时序图
sequenceDiagram
    autonumber
    participant Channel as rabbit_channel
    participant Exchange as rabbit_exchange
    participant TypeMod as Exchange Type 模块
    participant DB as rabbit_db_exchange
    participant Registry as rabbit_registry
    participant Event as rabbit_event
    
    Channel->>+Exchange: declare(XName, Type, Durable, ...)
    Exchange->>Exchange: check_exchange_limits(XName)
    
    Exchange->>Exchange: 构造 #exchange{}<br/>应用 policy 和 decorators
    
    Exchange->>+TypeMod: validate(Exchange)
    TypeMod->>TypeMod: 检查参数<br/>(如 x-match for headers)
    TypeMod-->>-Exchange: ok
    
    Exchange->>Exchange: 检查删除标记<br/>runtime_parameters
    
    Exchange->>+DB: create_or_get(Exchange)
    
    alt 交换器不存在
        DB->>DB: 写入 Mnesia/Khepri<br/>rabbit_durable_exchange
        DB-->>Exchange: {new, Exchange}
        
        Exchange->>Exchange: serial(Exchange)<br/>获取序列号(如需)
        Exchange->>+TypeMod: create(Serial, Exchange)
        TypeMod->>TypeMod: 初始化类型特定数据<br/>(如 topic trie)
        TypeMod-->>-Exchange: ok
        
        Exchange->>+Event: notify(exchange_created, Info)
        Event->>Event: 分发给订阅者<br/>(如 management 插件)
        Event-->>-Exchange: ok
        
        Exchange-->>Channel: {ok, Exchange}
    else 交换器已存在
        DB->>DB: 检查参数等价性<br/>(由调用方处理)
        DB-->>Exchange: {existing, Exchange}
        Exchange-->>Channel: {ok, Exchange}
    end
    
    DB-->>-Exchange: result
    Exchange-->>-Channel: {ok, Exchange}

API 2: route/3 - 路由消息

基本信息
  • 名称route/3
  • 调用者rabbit_channel(消息发布时)
  • 返回:目标队列列表(可能包含重复)
函数签名
-spec route(rabbit_types:exchange(), rabbit_types:message(), route_opts()) ->
    route_return().
请求参数
参数 类型 说明
Exchange #exchange{} 交换器记录
Message rabbit_types:message() 消息(含 routing key 和 headers)
Opts route_opts() 选项:#{return_binding_keys => boolean()}
响应结构
  • 默认[QueueName] - 队列名称列表
  • 启用 return_binding_keys[{QueueName, #{binding_keys => ...}}]
核心代码
%% 默认交换器(空字符串)特殊处理
route(#exchange{name = #resource{name = <<>>,
                                 virtual_host = VHost}},
      Message, _Opts) ->
    %% 默认交换器:routing key 直接映射到队列名
    RKs0 = mc:routing_keys(Message),
    RKs = lists:usort(RKs0),
    [rabbit_misc:r(VHost, queue, RK) || RK <- RKs];

%% 普通交换器路由
route(X = #exchange{name = XName,
                    decorators = Decorators},
      Message, Opts) ->
    %% 1. 选择路由装饰器
    Decs = rabbit_exchange_decorator:select(route, Decorators),
    
    %% 2. 执行路由算法(深度优先遍历交换器图)
    QNamesToBKeys = route1(Message, Decs, Opts, {[X], XName, #{}}),
    
    %% 3. 格式化返回结果
    case Opts of
        #{return_binding_keys := true} ->
            maps:fold(fun(QName, BindingKeys, L) ->
                              [{QName, #{binding_keys => BindingKeys}} | L]
                      end, [], QNamesToBKeys);
        _ ->
            maps:keys(QNamesToBKeys)
    end.

%% 路由算法核心:深度优先遍历
route1(_, _, _, {[], _, QNames}) ->
    %% 工作列表为空,返回累积的队列
    QNames;
route1(Message, Decorators, Opts,
       {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
    %% 1. 获取类型模块的路由函数
    {Route, Arity} = type_to_route_fun(Type),
    
    %% 2. 调用类型模块路由
    ExchangeDests = case Arity of
                        2 -> Route(X, Message);
                        3 -> Route(X, Message, Opts)
                    end,
    
    %% 3. 处理装饰器路由
    DecorateDests  = process_decorators(X, Decorators, Message),
    
    %% 4. 处理备用交换器(如路由失败)
    AlternateDests = process_alternate(X, ExchangeDests),
    
    %% 5. 递归处理所有目标
    route1(Message, Decorators, Opts,
           lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
                       AlternateDests ++ DecorateDests ++ ExchangeDests)).

%% 处理备用交换器
process_alternate(X = #exchange{name = XName}, []) ->
    %% 路由失败(无目标队列),检查备用交换器
    case rabbit_policy:get_arg(
           <<"alternate-exchange">>, <<"alternate-exchange">>, X) of
        undefined -> [];
        AName     -> [rabbit_misc:r(XName, exchange, AName)]
    end;
process_alternate(_X, _Results) ->
    %% 路由成功,不使用备用交换器
    [].

%% 处理单个路由目标
process_route(#resource{kind = exchange} = XName,
              {WorkList, SeenXs, QNames}) ->
    %% 目标是交换器(E2E binding)
    case sets:is_element(XName, SeenXs) of
        true  -> 
            %% 已访问过,避免循环
            {WorkList, SeenXs, QNames};
        false -> 
            %% 加入工作列表,继续路由
            case rabbit_exchange:lookup(XName) of
                {ok, X} -> {[X | WorkList], sets:add_element(XName, SeenXs), QNames};
                {error, not_found} -> {WorkList, SeenXs, QNames}
            end
    end;
process_route(#resource{kind = queue} = QName,
              {WorkList, SeenXs, QNames}) ->
    %% 目标是队列,累积到结果
    {WorkList, SeenXs, maps:put(QName, true, QNames)}.
路由算法流程图
flowchart TD
    Start[route/3<br/>Exchange, Message, Opts] --> IsDefault{默认交换器?}
    
    IsDefault -->|是| DirectRoute[routing_key 直接<br/>映射到队列名]
    DirectRoute --> End[返回队列列表]
    
    IsDefault -->|否| InitWorkList[初始化工作列表<br/>{[Exchange], SeenXs=#{}, QNames=#{}}]
    InitWorkList --> Route1[route1 深度优先遍历]
    
    Route1 --> CheckWorkList{工作列表为空?}
    CheckWorkList -->|是| End
    
    CheckWorkList -->|否| PopExchange[取出第一个交换器]
    PopExchange --> CallTypeModule[调用类型模块<br/>Route(Exchange, Message)]
    
    CallTypeModule --> GetDests[获取目标列表<br/>队列 + 交换器]
    
    GetDests --> HasDests{有目标?}
    HasDests -->|否| CheckAlternate[检查备用交换器<br/>alternate-exchange]
    CheckAlternate --> AddAlternate[添加备用交换器<br/>到目标列表]
    
    HasDests -->|是| ProcessDests[处理每个目标]
    AddAlternate --> ProcessDests
    
    ProcessDests --> IsQueue{目标类型?}
    
    IsQueue -->|队列| AddQueue[累积到 QNames]
    AddQueue --> Route1
    
    IsQueue -->|交换器| CheckSeen{已访问?}
    CheckSeen -->|是| Skip[跳过(避免循环)]
    CheckSeen -->|否| AddToWorkList[加入工作列表<br/>标记已访问]
    AddToWorkList --> Route1
    Skip --> Route1
备用交换器示例
%% 场景:消息无法路由到任何队列时,转发到备用交换器

%% 1. 声明主交换器(带备用交换器参数)
rabbit_exchange:declare(
    rabbit_misc:r(<<"/">>, exchange, <<"main">>),
    <<"topic">>,
    true,  %% durable
    false, %% auto_delete
    false, %% internal
    [{<<"alternate-exchange">>, longstr, <<"backup">>}],
    <<"user">>
).

%% 2. 声明备用交换器(通常是 fanout 类型)
rabbit_exchange:declare(
    rabbit_misc:r(<<"/">>, exchange, <<"backup">>),
    <<"fanout">>,
    true,
    false,
    false,
    [],
    <<"user">>
).

%% 3. 绑定备用交换器到日志队列
rabbit_binding:add(
    #binding{source = rabbit_misc:r(<<"/">>, exchange, <<"backup">>),
             key = <<>>,
             destination = rabbit_misc:r(<<"/">>, queue, <<"unrouted_log">>),
             args = []},
    <<"user">>
).

%% 4. 发布消息(无匹配绑定)
%% 结果:消息被路由到 backup 交换器,最终到达 unrouted_log 队列

API 3: delete/3 - 删除交换器

基本信息
  • 名称delete/3
  • 权限要求configure 权限
  • 条件:无绑定或 if_unused=false
函数签名
-spec delete(name(), boolean(), rabbit_types:username()) ->
    'ok' | rabbit_types:error('not_found' | 'in_use') |
    rabbit_types:connection_exit().
请求参数
参数 类型 说明
Name #resource{} 交换器名称
IfUnused boolean() 仅当无绑定时删除
Username binary() 执行操作的用户名
核心代码
delete(XName, IfUnused, Username) ->
    Fun = case IfUnused of
              true  -> fun conditional_delete/2;
              false -> fun unconditional_delete/2
          end,
    case Fun(XName, Username) of
        {deleted, X, Bs, Deletions} ->
            %% 删除成功
            Serial = peek_serial(X),
            ok = callback(X, delete, Serial, [X, Bs]),
            _ = [rabbit_amqqueue:delete_queue_association(DstQ, X)
                 || {_, DstQ} <- Deletions],
            rabbit_event:notify(exchange_deleted, info(X)),
            ok;
        {error, _} = E ->
            E
    end.

unconditional_delete(XName, Username) ->
    %% 无条件删除:删除所有绑定
    case rabbit_db_exchange:delete(XName, fun delete_bindings/2, Username) of
        {deleted, X, Bs} -> 
            Deletions = [{{X#exchange.name, DstName}, DstName} || 
                        #binding{destination = DstName} <- Bs],
            {deleted, X, Bs, Deletions};
        {error, _} = E -> 
            E
    end.

conditional_delete(XName, Username) ->
    %% 条件删除:仅当无绑定时删除
    case rabbit_db_exchange:delete(XName, fun has_no_bindings/2, Username) of
        {deleted, X, []} -> 
            {deleted, X, [], []};
        {error, in_use} = E -> 
            E;
        {error, _} = E -> 
            E
    end.

交换器类型实现

rabbit_exchange_type 行为

所有交换器类型必须实现 rabbit_exchange_type 行为:

-callback description() -> [proplists:property()].
-callback serialise_events() -> boolean().
-callback route(rabbit_types:exchange(), rabbit_types:message()) ->
    rabbit_router:match_result().
-callback route(rabbit_types:exchange(), rabbit_types:message(), map()) ->
    rabbit_router:match_result().
-callback validate(rabbit_types:exchange()) -> 'ok' | {error, term()}.
-callback validate_binding(rabbit_types:exchange(), rabbit_types:binding()) ->
    'ok' | {error, term()}.
-callback create(Serial, rabbit_types:exchange()) -> 'ok' when
    Serial :: 'none' | pos_integer().
-callback delete(Serial, rabbit_types:exchange()) -> 'ok' when
    Serial :: 'none' | pos_integer().
-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
-callback add_binding(Serial, rabbit_types:exchange(), rabbit_types:binding()) ->
    'ok' when Serial :: 'none' | pos_integer().
-callback remove_bindings(Serial, rabbit_types:exchange(), [rabbit_types:binding()]) ->
    'ok' when Serial :: 'none' | pos_integer().
-callback assert_args_equivalence(rabbit_types:exchange(),
                                  rabbit_framing:amqp_table()) ->
    'ok' | rabbit_types:connection_exit().
-callback info(rabbit_types:exchange()) -> [proplists:property()].
-callback info(rabbit_types:exchange(), [atom()]) -> [proplists:property()].

Direct Exchange(直接交换器)

路由算法

规则:routing key 精确匹配 binding key。

实现

route(#exchange{name = Name, type = Type}, Msg, _Opts) ->
    Routes = mc:routing_keys(Msg),
    rabbit_db_binding:match_routing_key(Name, Routes, Type =:= direct).

时间复杂度:O(1) - 哈希表查找

使用场景

  • 点对点通信:每个队列绑定唯一 routing key
  • 任务分发:多个 worker 队列绑定相同 routing key(负载均衡)

示例

%% 声明 direct 交换器
Exchange = rabbit_misc:r(VHost, exchange, <<"logs">>),
{ok, _} = rabbit_exchange:declare(Exchange, <<"direct">>, true, false, false, [], <<"admin">>).

%% 绑定队列(不同级别)
rabbit_binding:add(
    #binding{source = Exchange,
             key = <<"error">>,
             destination = rabbit_misc:r(VHost, queue, <<"error_log">>),
             args = []},
    <<"admin">>
).

rabbit_binding:add(
    #binding{source = Exchange,
             key = <<"info">>,
             destination = rabbit_misc:r(VHost, queue, <<"info_log">>),
             args = []},
    <<"admin">>
).

%% 发布消息
%% routing_key = <<"error">> → 仅到达 error_log
%% routing_key = <<"info">>  → 仅到达 info_log

Topic Exchange(主题交换器)

路由算法

规则:routing key 模式匹配 binding key。

模式语法

  • *:匹配一个单词(词之间用 . 分隔)
  • #:匹配零个或多个单词

实现

route(#exchange{name = XName}, Msg, Opts) ->
    RKeys = mc:routing_keys(Msg),
    lists:append([rabbit_db_topic_exchange:match(XName, RKey, Opts) 
                  || RKey <- RKeys]).

数据结构:Trie(前缀树)

时间复杂度:O(m * n)

  • m:routing key 长度
  • n:绑定数量(最坏情况,实际通过 trie 优化)

使用场景

  • 日志分类logs.*.errorlogs.*.info
  • 地理路由cn.beijing.*us.*.sales
  • 多维过滤order.*.paid

示例

%% 声明 topic 交换器
Exchange = rabbit_misc:r(VHost, exchange, <<"topic_logs">>),
{ok, _} = rabbit_exchange:declare(Exchange, <<"topic">>, true, false, false, [], <<"admin">>).

%% 绑定模式
rabbit_binding:add(
    #binding{source = Exchange,
             key = <<"*.error">>,          %% 所有 error 日志
             destination = rabbit_misc:r(VHost, queue, <<"all_errors">>),
             args = []},
    <<"admin">>
).

rabbit_binding:add(
    #binding{source = Exchange,
             key = <<"auth.#">>,           %% 认证模块所有日志
             destination = rabbit_misc:r(VHost, queue, <<"auth_logs">>),
             args = []},
    <<"admin">>
).

rabbit_binding:add(
    #binding{source = Exchange,
             key = <<"#">>,                %% 所有日志
             destination = rabbit_misc:r(VHost, queue, <<"all_logs">>),
             args = []},
    <<"admin">>
).

%% 路由示例
%% routing_key = <<"auth.error">>        → all_errors, auth_logs, all_logs
%% routing_key = <<"auth.info.login">>   → auth_logs, all_logs
%% routing_key = <<"db.error">>          → all_errors, all_logs

Trie 数据结构

graph TD
    Root[根节点] --> Auth[auth]
    Root --> DB[db]
    Root --> Hash[#<br/>匹配所有]
    
    Auth --> Star1[*]
    Auth --> Info[info]
    Auth --> Hash1[#]
    
    Star1 --> Error1[error]
    Info --> Login[login]
    
    DB --> Star2[*]
    Star2 --> Error2[error]
    
    Error1 -.-> Q1[队列 all_errors]
    Error2 -.-> Q1
    Hash1 -.-> Q2[队列 auth_logs]
    Hash -.-> Q3[队列 all_logs]

Fanout Exchange(扇出交换器)

路由算法

规则:忽略 routing key,广播到所有绑定队列。

实现

route(#exchange{name = Name}, _Msg, _Opts) ->
    rabbit_router:match_routing_key(Name, ['_']).

时间复杂度:O(n) - n 为绑定数量

使用场景

  • 广播消息:系统通知
  • 缓存失效:多个缓存节点
  • 实时更新:多个监控面板

示例

%% 声明 fanout 交换器
Exchange = rabbit_misc:r(VHost, exchange, <<"broadcast">>),
{ok, _} = rabbit_exchange:declare(Exchange, <<"fanout">>, true, false, false, [], <<"admin">>).

%% 绑定多个队列(routing key 被忽略)
rabbit_binding:add(
    #binding{source = Exchange,
             key = <<>>,  %% 任意值均可
             destination = rabbit_misc:r(VHost, queue, <<"listener1">>),
             args = []},
    <<"admin">>
).

rabbit_binding:add(
    #binding{source = Exchange,
             key = <<>>,
             destination = rabbit_misc:r(VHost, queue, <<"listener2">>),
             args = []},
    <<"admin">>
).

%% 发布消息
%% 所有绑定队列都会收到消息

Headers Exchange(头交换器)

路由算法

规则:根据消息头(headers)匹配,而非 routing key。

匹配模式

  • x-match = all(默认):所有头必须匹配
  • x-match = any:至少一个头匹配
  • x-match = any-with-x:至少一个 x-* 头匹配
  • x-match = all-with-x:所有 x-* 头匹配

实现

route(#exchange{name = Name}, Msg, _Opts) ->
    Headers = mc:routing_headers(Msg, [x_headers]),
    
    rabbit_router:match_bindings(
      Name, fun(#binding{args = Args}) ->
                    case rabbit_misc:table_lookup(Args, <<"x-match">>) of
                        {longstr, <<"any">>} ->
                            match_any(Args, Headers, fun match/2);
                        {longstr, <<"all">>} ->
                            match_all(Args, Headers, fun match/2);
                        _ ->
                            match_all(Args, Headers, fun match/2)
                    end
            end).

%% 匹配函数
match({<<"x-match">>, _, _}, _M) -> skip;  %% 跳过 x-match
match({<<"x-", _/binary>>, _, _}, _M) -> skip;  %% 跳过 x-* 头
match({K, void, _}, M) -> maps:is_key(K, M);   %% 检查键存在
match({K, _, V}, M) -> maps:get(K, M, undefined) =:= V.  %% 精确匹配

时间复杂度:O(n * m)

  • n:绑定数量
  • m:每个绑定的头数量

使用场景

  • 多维度过滤:根据多个属性路由
  • 内容路由:基于消息内容(非 routing key)
  • 复杂规则:组合多个条件

示例

%% 声明 headers 交换器
Exchange = rabbit_misc:r(VHost, exchange, <<"headers_exchange">>),
{ok, _} = rabbit_exchange:declare(Exchange, <<"headers">>, true, false, false, [], <<"admin">>).

%% 绑定:匹配所有头(x-match = all)
rabbit_binding:add(
    #binding{source = Exchange,
             key = <<>>,  %% headers 交换器忽略 routing key
             destination = rabbit_misc:r(VHost, queue, <<"critical_queue">>),
             args = [{<<"x-match">>, longstr, <<"all">>},
                     {<<"level">>, longstr, <<"critical">>},
                     {<<"module">>, longstr, <<"auth">>}]},
    <<"admin">>
).

%% 绑定:匹配任意头(x-match = any)
rabbit_binding:add(
    #binding{source = Exchange,
             key = <<>>,
             destination = rabbit_misc:r(VHost, queue, <<"any_error_queue">>),
             args = [{<<"x-match">>, longstr, <<"any">>},
                     {<<"level">>, longstr, <<"error">>},
                     {<<"level">>, longstr, <<"critical">>}]},
    <<"admin">>
).

%% 发布消息
%% Headers = #{<<"level">> => <<"critical">>, <<"module">> => <<"auth">>}
%% → 路由到 critical_queue 和 any_error_queue

%% Headers = #{<<"level">> => <<"error">>, <<"module">> => <<"db">>}
%% → 仅路由到 any_error_queue

rabbit_binding 模块

职责

  1. 绑定管理

    • 添加绑定(add/2
    • 删除绑定(remove/2
    • 列表查询(list/1
  2. 数据库操作

    • 委托给 rabbit_db_binding
    • 维护绑定索引(源→目标、目标→源)

核心 API

add/2 - 添加绑定

-spec add(rabbit_types:binding(), rabbit_types:username()) ->
    'ok' | rabbit_types:error('source_not_found' | 'destination_not_found') |
    rabbit_types:connection_exit().

add(Binding = #binding{source = SrcName,
                       destination = DstName,
                       args = Args}, Username) ->
    %% 1. 验证源交换器存在
    case rabbit_exchange:lookup(SrcName) of
        {ok, Src} ->
            %% 2. 验证目标存在(队列或交换器)
            case lookup_destination(DstName) of
                {ok, Dst} ->
                    %% 3. 验证绑定参数
                    ok = rabbit_exchange:validate_binding(Src, Binding),
                    
                    %% 4. 添加绑定到数据库
                    case rabbit_db_binding:create(Binding, fun add_callback/3, Username) of
                        {new, Binding} ->
                            %% 新建绑定
                            Serial = rabbit_exchange:serial(Src),
                            ok = rabbit_exchange:callback(
                                   Src, add_binding, Serial, [Src, Binding]),
                            rabbit_event:notify(binding_created, info(Binding)),
                            ok;
                        {existing, _} ->
                            %% 已存在(幂等)
                            ok;
                        {error, _} = E ->
                            E
                    end;
                {error, not_found} ->
                    {error, destination_not_found}
            end;
        {error, not_found} ->
            {error, source_not_found}
    end.

性能优化

路由性能

Direct Exchange

优化点

  • 使用哈希表存储绑定:O(1) 查找
  • ETS 表并发读取

配置

%% rabbitmq.config
[{rabbit, [
    {direct_exchange_ets_options, [read_concurrency]}
]}].

Topic Exchange

优化点

  • Trie 剪枝:提前终止不匹配分支
  • 缓存常用模式

性能对比(100 万绑定):

Routing Key 匹配时间
a.b.c (精确) ~10 µs
a.*.c (通配符) ~50 µs
# (全匹配) ~500 µs

缓存策略

%% 启用路由缓存(实验性功能)
[{rabbit, [
    {exchange_cache_size, 1000}  %% 缓存最近路由结果
]}].

绑定数量优化

问题:单个交换器绑定过多(>10000)导致路由变慢。

解决方案

  1. 拆分交换器:使用多个交换器分散负载
  2. E2E 绑定:交换器链路,分层路由
  3. 使用一致性哈希交换器(插件)

典型场景时序图

场景 1:消息发布到 Topic Exchange

sequenceDiagram
    autonumber
    participant Publisher as 发布者
    participant Channel as rabbit_channel
    participant Exchange as rabbit_exchange
    participant TopicType as rabbit_exchange_type_topic
    participant TrieDB as rabbit_db_topic_exchange
    participant Queue1 as 队列 1
    participant Queue2 as 队列 2
    
    Publisher->>+Channel: basic.publish<br/>exchange="logs"<br/>routing_key="auth.error"
    
    Channel->>Channel: 权限检查<br/>write on exchange
    Channel->>+Exchange: route(Exchange, Message, #{})
    
    Exchange->>Exchange: 选择装饰器<br/>(如有)
    Exchange->>+TopicType: route(Exchange, Message, #{})
    
    TopicType->>+TrieDB: match(ExchangeName, "auth.error", #{})
    TrieDB->>TrieDB: 遍历 Trie<br/>匹配模式<br/>"auth.error"<br/>"*.error"<br/>"auth.#"<br/>"#"
    TrieDB-->>-TopicType: [Queue1Name, Queue2Name]
    
    TopicType-->>-Exchange: [Queue1Name, Queue2Name]
    
    Exchange->>Exchange: 检查备用交换器<br/>(有路由结果,不使用)
    Exchange-->>-Channel: [Queue1Name, Queue2Name]
    
    Channel->>+Queue1: deliver(Message)
    Queue1->>Queue1: 入队
    Queue1-->>-Channel: ok
    
    Channel->>+Queue2: deliver(Message)
    Queue2->>Queue2: 入队
    Queue2-->>-Channel: ok
    
    alt Publisher Confirms 启用
        Channel->>Channel: 记录 unconfirmed<br/>{SeqNo, [QPid1, QPid2]}
    end
    
    Channel-->>-Publisher: (异步返回)

场景 2:备用交换器处理未路由消息

sequenceDiagram
    autonumber
    participant Publisher
    participant Exchange as 主交换器<br/>(topic)
    participant AltExchange as 备用交换器<br/>(fanout)
    participant UnroutedQueue as unrouted_queue
    
    Publisher->>+Exchange: 发布消息<br/>routing_key="unknown.pattern"
    
    Exchange->>Exchange: route(...)<br/>调用 topic 类型模块
    Exchange->>Exchange: 无匹配绑定<br/>ExchangeDests = []
    
    Exchange->>Exchange: process_alternate(X, [])<br/>检查 alternate-exchange
    Exchange->>Exchange: 获取备用交换器名<br/>从 policy 或 arguments
    
    Exchange->>Exchange: 递归调用 route1<br/>WorkList += AltExchange
    
    Exchange->>+AltExchange: route(AltExchange, Message, #{})
    AltExchange->>AltExchange: fanout 类型<br/>广播到所有绑定
    AltExchange-->>-Exchange: [UnroutedQueueName]
    
    Exchange-->>-Publisher: 路由结果(通过备用交换器)
    
    Note over UnroutedQueue: 消息到达 unrouted_queue<br/>可用于审计/调试

故障排查

消息未路由

症状

客户端发布消息后,队列中无消息。

排查步骤

  1. 检查交换器类型
rabbitmqctl list_exchanges name type
  1. 检查绑定
rabbitmqctl list_bindings source_name destination_name routing_key
  1. 检查 routing key
%% Erlang shell
{ok, X} = rabbit_exchange:lookup(rabbit_misc:r(<<"/">>, exchange, <<"logs">>)).
Message = ...  %% 构造测试消息
Routes = rabbit_exchange:route(X, Message, #{}).
%% 输出:[QueueName] 或 []
  1. 启用跟踪
rabbitmqctl trace_on
rabbitmqctl set_user_tags guest administrator tracing

路由性能差

症状

消息发布延迟高(>10ms)。

原因

  • Topic Exchange 绑定过多(>10000)
  • 复杂通配符模式(多个 #
  • 磁盘 I/O 瓶颈(Mnesia 表)

优化

  1. 减少绑定
# 查看绑定数量
rabbitmqctl list_bindings | wc -l

# 清理无用绑定
rabbitmqctl delete_binding <source> <destination> <routing_key>
  1. 使用 Direct Exchange(如适用):

    • Direct 比 Topic 快 10 倍
  2. 启用 Khepri(替代 Mnesia):

%% rabbitmq.conf
metadata_store = khepri

配置参考

交换器相关配置

配置项 默认值 说明
cluster_exchange_limit infinity 集群交换器总数上限
exchange_delete_in_progress_ttl 300000 删除标记 TTL(毫秒)

路由相关配置

配置项 默认值 说明
topic_exchange_max_hops 无限制 Topic trie 最大深度(防止恶意模式)
routing_cache_enabled false 是否启用路由缓存(实验性)

最佳实践

交换器设计

  1. 选择合适的类型

    • 简单路由 → Direct
    • 模式匹配 → Topic
    • 广播 → Fanout
    • 多维过滤 → Headers
  2. 命名规范

    • 使用有意义的名称:logs.topicevents.fanout
    • 避免特殊字符:amq. 前缀保留
  3. 持久化策略

    • 关键业务交换器:durable=true
    • 临时交换器:durable=false(重启后自动删除)
  4. 备用交换器

    • 用于未路由消息的审计
    • 通常配合 fanout 类型

绑定管理

  1. 避免过多绑定

    • 单个交换器 < 1000 绑定(性能考虑)
    • 使用多个交换器分散
  2. 合理使用通配符

    • 少用 #(匹配所有)
    • 优先使用 *(匹配一级)
  3. 定期清理

    • 删除无用绑定
    • 监控绑定数量

附录

交换器类型注册

%% 查看已注册类型
rabbit_registry:lookup_all(exchange).
%% 输出:[{<<"direct">>, rabbit_exchange_type_direct},
%%        {<<"topic">>, rabbit_exchange_type_topic},
%%        {<<"fanout">>, rabbit_exchange_type_fanout},
%%        {<<"headers">>, rabbit_exchange_type_headers}]

%% 自定义类型注册(插件)
rabbit_registry:register(exchange, <<"x-custom">>, my_exchange_type).

相关模块

  • rabbit_db_exchange:交换器数据库操作
  • rabbit_db_binding:绑定数据库操作
  • rabbit_db_topic_exchange:Topic trie 存储
  • rabbit_exchange_decorator:交换器装饰器
  • rabbit_policy:Policy 应用
  • mc:消息容器(Message Container)