RabbitMQ-20-框架使用示例与最佳实践

文档概览

本文档提供 RabbitMQ 的实战指南,涵盖:

  • 常见使用场景的完整示例
  • 生产环境最佳实践
  • 性能调优指南
  • 故障排查方法
  • 集群部署与运维

目标读者:

  • 应用开发者(使用 RabbitMQ 构建系统)
  • 运维工程师(部署和维护 RabbitMQ)
  • 架构师(设计基于 RabbitMQ 的解决方案)

一、基础使用示例

1.1 工作队列(Work Queues)

场景描述

多个 worker 处理任务队列,实现负载均衡和并行处理。

架构图

flowchart LR
    Producer[生产者] -->|发布任务| Queue[work_queue]
    Queue -->|分发| Worker1[Worker 1]
    Queue -->|分发| Worker2[Worker 2]
    Queue -->|分发| Worker3[Worker 3]
    
    Worker1 -->|ack| Queue
    Worker2 -->|ack| Queue
    Worker3 -->|ack| Queue

实现代码(Erlang)

生产者

-module(task_producer).
-export([send_tasks/2]).

%% Record definitions for AMQP methods and messages (#'queue.declare'{}, #amqp_msg{}, ...).
-include_lib("amqp_client/include/amqp_client.hrl").

%% Publish TaskCount persistent tasks (<<"Task 1">> .. <<"Task N">>) to the
%% durable queue work_queue through the default exchange, then close the
%% channel.
%%
%% Connection: an open amqp_connection pid. Returns ok.
send_tasks(Connection, TaskCount) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),

    %% Durable queue: the queue definition survives a broker restart.
    QDeclare = #'queue.declare'{queue = <<"work_queue">>,
                                durable = true},
    #'queue.declare_ok'{} = amqp_channel:call(Channel, QDeclare),

    %% The default exchange (<<>>) routes by queue name.
    Publish = #'basic.publish'{exchange = <<>>,
                               routing_key = <<"work_queue">>},
    lists:foreach(
      fun(N) ->
              Task = iolist_to_binary(io_lib:format("Task ~p", [N])),
              %% delivery_mode = 2 marks the message as persistent.
              Msg = #amqp_msg{payload = Task,
                              props = #'P_basic'{delivery_mode = 2}},
              amqp_channel:cast(Channel, Publish, Msg),
              io:format("Sent: ~s~n", [Task])
      end, lists:seq(1, TaskCount)),

    amqp_channel:close(Channel),
    ok.

消费者

-module(task_worker).
-export([start_worker/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Consume tasks from work_queue with manual acks and prefetch = 1, so each
%% worker holds at most one unacknowledged task at a time.
%% Runs the receive loop in the calling process until the consumer is
%% cancelled, then returns ok.
start_worker(Connection, WorkerId) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),

    %% prefetch = 1: fair dispatch, a busy worker is not handed more tasks.
    #'basic.qos_ok'{} =
        amqp_channel:call(Channel, #'basic.qos'{prefetch_count = 1}),

    %% no_ack = false: a task is acked only after it has been processed, so
    %% the task held by a crashed worker goes back to the queue.
    Subscribe = #'basic.consume'{queue = <<"work_queue">>,
                                 no_ack = false},
    #'basic.consume_ok'{} = amqp_channel:subscribe(Channel, Subscribe, self()),

    io:format("Worker ~p waiting for tasks...~n", [WorkerId]),
    worker_loop(Channel, WorkerId).

worker_loop(Channel, WorkerId) ->
    receive
        %% amqp_client's default consumer also forwards consume_ok to the
        %% subscriber pid; drain it so it does not linger in the mailbox.
        #'basic.consume_ok'{} ->
            worker_loop(Channel, WorkerId);

        {#'basic.deliver'{delivery_tag = DeliveryTag},
         #amqp_msg{payload = Payload}} ->
            io:format("Worker ~p processing: ~s~n", [WorkerId, Payload]),

            %% Simulated work: one second per byte of payload.
            timer:sleep(byte_size(Payload) * 1000),

            %% Ack only after the work is done.
            amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
            io:format("Worker ~p done~n", [WorkerId]),
            worker_loop(Channel, WorkerId);

        %% Broker-initiated cancel (basic.cancel) or confirmation of a
        %% client-side cancel (basic.cancel_ok): leave the loop.
        #'basic.cancel'{} ->
            io:format("Worker ~p cancelled~n", [WorkerId]),
            ok;
        #'basic.cancel_ok'{} ->
            io:format("Worker ~p cancelled~n", [WorkerId]),
            ok
    end.

关键点

  1. 消息持久化

    • 队列:durable = true
    • 消息:delivery_mode = 2
    • 确保重启后任务不丢失
  2. 公平分发

    • prefetch_count = 1
    • 每次只给 worker 一个任务
    • 防止快 worker 空闲,慢 worker 过载
  3. 手动确认

    • no_ack = false
    • 任务完成后才确认
    • Worker 崩溃时任务重新入队

1.2 发布/订阅(Pub/Sub)

场景描述

一条消息广播给多个订阅者,实现事件通知。

架构图

flowchart LR
    Publisher[发布者] -->|日志消息| Exchange[logs<br/>fanout]
    Exchange -->|广播| Queue1[queue1]
    Exchange -->|广播| Queue2[queue2]
    Exchange -->|广播| Queue3[queue3]
    
    Queue1 --> Sub1[订阅者 1]
    Queue2 --> Sub2[订阅者 2]
    Queue3 --> Sub3[订阅者 3]

实现代码

发布者

-module(log_publisher).
-export([send_log/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Broadcast LogMessage (a binary) to every queue bound to the fanout
%% exchange <<"logs">>, then close the channel. Returns ok.
send_log(Connection, LogMessage) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),

    %% Publisher and subscribers declare the same exchange, so whichever
    %% side starts first creates it.
    ExDeclare = #'exchange.declare'{exchange = <<"logs">>,
                                    type = <<"fanout">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare),

    %% A fanout exchange ignores the routing key.
    Publish = #'basic.publish'{exchange = <<"logs">>,
                               routing_key = <<>>},
    amqp_channel:cast(Channel, Publish, #amqp_msg{payload = LogMessage}),

    io:format("Sent log: ~s~n", [LogMessage]),
    amqp_channel:close(Channel),
    ok.

订阅者

-module(log_subscriber).
-export([start_subscriber/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Bind a fresh exclusive, server-named queue to the fanout exchange
%% <<"logs">> and print every message that arrives. Uses automatic acks
%% (acceptable for logs). Loops forever in the calling process.
start_subscriber(Connection, SubscriberId) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),

    ExDeclare = #'exchange.declare'{exchange = <<"logs">>,
                                    type = <<"fanout">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare),

    %% exclusive = true: the queue is removed when this connection closes.
    #'queue.declare_ok'{queue = QueueName} =
        amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),

    Binding = #'queue.bind'{queue = QueueName,
                            exchange = <<"logs">>},
    #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),

    %% no_ack = true: automatic acknowledgement.
    Subscribe = #'basic.consume'{queue = QueueName,
                                 no_ack = true},
    #'basic.consume_ok'{} = amqp_channel:subscribe(Channel, Subscribe, self()),

    io:format("Subscriber ~p waiting for logs...~n", [SubscriberId]),
    subscriber_loop(SubscriberId).

subscriber_loop(SubscriberId) ->
    receive
        %% amqp_client's default consumer also forwards consume_ok here;
        %% drain it instead of leaving it in the mailbox.
        #'basic.consume_ok'{} ->
            subscriber_loop(SubscriberId);
        {#'basic.deliver'{}, #amqp_msg{payload = Payload}} ->
            io:format("Subscriber ~p received: ~s~n", [SubscriberId, Payload]),
            subscriber_loop(SubscriberId)
    end.

关键点

  1. 临时队列

    • exclusive = true:连接断开时自动删除
    • 适合临时订阅者
  2. Fanout 交换器

    • 忽略 routing key
    • 广播到所有绑定队列
  3. 自动确认

    • 日志场景可接受消息丢失
    • 提高性能

1.3 路由(Routing)

场景描述

根据日志级别将消息路由到不同队列。

架构图

flowchart LR
    Publisher[发布者] -->|error| Exchange[direct_logs<br/>direct]
    Publisher -->|info| Exchange
    Publisher -->|warning| Exchange
    
    Exchange -->|error| ErrorQ[error_queue]
    Exchange -->|info| InfoQ[info_queue]
    Exchange -->|warning| WarnQ[warning_queue]
    Exchange -->|error| AllQ[all_logs_queue]
    Exchange -->|info| AllQ
    Exchange -->|warning| AllQ
    
    ErrorQ --> ErrorSub[错误处理器]
    InfoQ --> InfoSub[信息记录器]
    WarnQ --> WarnSub[警告监控]
    AllQ --> AllSub[全量日志]

实现代码

发布者

-module(routing_publisher).
-export([send_log/3]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Publish Message to the direct exchange <<"direct_logs">> with Level
%% (a binary such as <<"error">>) as the routing key, then close the
%% channel. Returns ok.
send_log(Connection, Level, Message) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),

    ExDeclare = #'exchange.declare'{exchange = <<"direct_logs">>,
                                    type = <<"direct">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare),

    %% A direct exchange delivers to queues bound with exactly this key.
    Publish = #'basic.publish'{exchange = <<"direct_logs">>,
                               routing_key = Level},
    amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Message}),

    io:format("Sent [~s]: ~s~n", [Level, Message]),
    amqp_channel:close(Channel),
    ok.

订阅者(按级别)

-module(routing_subscriber).
-export([start_subscriber/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Create an exclusive queue bound to <<"direct_logs">> once per routing key
%% in Levels (a list of binaries), then print every message received.
%% Loops forever in the calling process.
start_subscriber(Connection, Levels) when is_list(Levels) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),

    ExDeclare = #'exchange.declare'{exchange = <<"direct_logs">>,
                                    type = <<"direct">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare),

    #'queue.declare_ok'{queue = QueueName} =
        amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),

    %% One binding per level: the queue receives the union of all levels.
    lists:foreach(
      fun(Level) ->
              Binding = #'queue.bind'{queue = QueueName,
                                      exchange = <<"direct_logs">>,
                                      routing_key = Level},
              #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)
      end, Levels),

    Subscribe = #'basic.consume'{queue = QueueName,
                                 no_ack = true},
    #'basic.consume_ok'{} = amqp_channel:subscribe(Channel, Subscribe, self()),

    io:format("Subscriber listening for levels: ~p~n", [Levels]),
    subscriber_loop().

subscriber_loop() ->
    receive
        %% amqp_client's default consumer also forwards consume_ok here.
        #'basic.consume_ok'{} ->
            subscriber_loop();
        {#'basic.deliver'{routing_key = Level}, #amqp_msg{payload = Payload}} ->
            io:format("[~s] ~s~n", [Level, Payload]),
            subscriber_loop()
    end.

使用示例

%% start_subscriber/2 never returns (it loops in the calling process), so
%% start each subscriber in its own process instead of blocking the shell.

%% Only error logs
spawn(fun() -> routing_subscriber:start_subscriber(Conn, [<<"error">>]) end).

%% Error and warning logs
spawn(fun() -> routing_subscriber:start_subscriber(Conn, [<<"error">>, <<"warning">>]) end).

%% All logs
spawn(fun() -> routing_subscriber:start_subscriber(Conn, [<<"error">>, <<"warning">>, <<"info">>]) end).

1.4 主题(Topics)

场景描述

根据多维度模式匹配路由消息(如地区、级别、模块)。

架构图

flowchart LR
    Publisher[发布者] -->|cn.error.auth| Exchange[topic_logs<br/>topic]
    Publisher -->|us.info.db| Exchange
    Publisher -->|eu.warning.api| Exchange
    
    Exchange -->|*.error.*| ErrorQ[所有错误队列]
    Exchange -->|cn.#| ChinaQ[中国所有日志]
    Exchange -->|#.auth| AuthQ[认证模块日志]
    Exchange -->|#| AllQ[全量日志]

Routing Key 设计

格式:<region>.<level>.<module>

示例:

  • cn.error.auth:中国区认证模块错误
  • us.info.db:美国区数据库信息
  • eu.warning.api:欧洲区 API 警告

模式语法

  • *:匹配一个单词
  • #:匹配零个或多个单词

实现代码

发布者

-module(topic_publisher).
-export([send_log/4]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Publish a log line to the topic exchange <<"topic_logs">> with routing key
%% <<Region.Level.Module>> (all three arguments are binaries), then close
%% the channel. Returns ok.
send_log(Connection, Region, Level, Module) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),

    ExDeclare = #'exchange.declare'{exchange = <<"topic_logs">>,
                                    type = <<"topic">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare),

    %% Dot-separated words, matched by binding patterns using * and #.
    RoutingKey = <<Region/binary, ".", Level/binary, ".", Module/binary>>,
    Message = <<"Log message from ", RoutingKey/binary>>,

    Publish = #'basic.publish'{exchange = <<"topic_logs">>,
                               routing_key = RoutingKey},
    amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Message}),

    io:format("Sent [~s]: ~s~n", [RoutingKey, Message]),
    amqp_channel:close(Channel),
    ok.

订阅者(按模式)

-module(topic_subscriber).
-export([start_subscriber/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Bind an exclusive queue to <<"topic_logs">> with BindingPattern (a binary
%% pattern such as <<"*.error.*">> or <<"cn.#">>) and print every message
%% received. Loops forever in the calling process.
start_subscriber(Connection, BindingPattern) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),

    ExDeclare = #'exchange.declare'{exchange = <<"topic_logs">>,
                                    type = <<"topic">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare),

    #'queue.declare_ok'{queue = QueueName} =
        amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),

    Binding = #'queue.bind'{queue = QueueName,
                            exchange = <<"topic_logs">>,
                            routing_key = BindingPattern},
    #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),

    Subscribe = #'basic.consume'{queue = QueueName,
                                 no_ack = true},
    #'basic.consume_ok'{} = amqp_channel:subscribe(Channel, Subscribe, self()),

    io:format("Subscriber listening for pattern: ~s~n", [BindingPattern]),
    subscriber_loop().

subscriber_loop() ->
    receive
        %% amqp_client's default consumer also forwards consume_ok here.
        #'basic.consume_ok'{} ->
            subscriber_loop();
        {#'basic.deliver'{routing_key = RKey}, #amqp_msg{payload = Payload}} ->
            io:format("[~s] ~s~n", [RKey, Payload]),
            subscriber_loop()
    end.

使用示例

%% start_subscriber/2 never returns (it loops in the calling process), so
%% start each subscriber in its own process instead of blocking the shell.

%% All error logs (any region, any module)
spawn(fun() -> topic_subscriber:start_subscriber(Conn, <<"*.error.*">>) end).

%% All logs from region cn
spawn(fun() -> topic_subscriber:start_subscriber(Conn, <<"cn.#">>) end).

%% All logs from the auth module (any region, any level)
spawn(fun() -> topic_subscriber:start_subscriber(Conn, <<"#.auth">>) end).

%% Region cn, auth module, any level
spawn(fun() -> topic_subscriber:start_subscriber(Conn, <<"cn.*.auth">>) end).

%% Everything
spawn(fun() -> topic_subscriber:start_subscriber(Conn, <<"#">>) end).

1.5 RPC(远程过程调用)

场景描述

客户端通过 RabbitMQ 调用远程服务并等待响应。

架构图

sequenceDiagram
    participant Client
    participant ReqQueue as rpc_queue
    participant Server
    participant ReplyQueue as reply_queue<br/>(临时队列)
    
    Client->>ReplyQueue: 声明临时队列(独占)
    Client->>ReqQueue: 发送请求<br/>reply_to=reply_queue<br/>correlation_id=123
    
    Server->>ReqQueue: 消费请求
    Server->>Server: 处理请求<br/>fib(30)
    Server->>ReplyQueue: 发送响应<br/>correlation_id=123
    
    Client->>ReplyQueue: 接收响应<br/>匹配 correlation_id
    Client->>Client: 返回结果

实现代码

服务端

-module(rpc_server).
-export([start/1]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Serve fib(N) requests from rpc_queue. A request carries N as a decimal
%% binary; the reply (also a decimal binary) is published to the request's
%% reply_to queue with the request's correlation_id. Loops forever in the
%% calling process.
start(Connection) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),

    QDeclare = #'queue.declare'{queue = <<"rpc_queue">>},
    #'queue.declare_ok'{} = amqp_channel:call(Channel, QDeclare),

    %% prefetch = 1: fair dispatch when several servers consume rpc_queue.
    #'basic.qos_ok'{} = amqp_channel:call(Channel, #'basic.qos'{prefetch_count = 1}),

    Subscribe = #'basic.consume'{queue = <<"rpc_queue">>,
                                 no_ack = false},
    #'basic.consume_ok'{} = amqp_channel:subscribe(Channel, Subscribe, self()),

    io:format("RPC server started~n"),
    server_loop(Channel).

server_loop(Channel) ->
    receive
        %% amqp_client's default consumer also forwards consume_ok here.
        #'basic.consume_ok'{} ->
            server_loop(Channel);
        {#'basic.deliver'{delivery_tag = DeliveryTag},
         #amqp_msg{payload = Payload,
                   props = #'P_basic'{reply_to = ReplyTo,
                                      correlation_id = CorrelationId}}} ->
            case parse_request(Payload) of
                {ok, N} when is_binary(ReplyTo) ->
                    io:format("Received request: fib(~p)~n", [N]),
                    Result = fib(N),
                    io:format("Result: ~p~n", [Result]),
                    reply(Channel, ReplyTo, CorrelationId, integer_to_binary(Result)),
                    %% Ack only after the reply has been handed to the channel.
                    amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag});
                _Invalid ->
                    %% Malformed payload or no reply_to: reject without requeue.
                    %% Crashing instead would leave the message unacked and it
                    %% would be redelivered to every server forever.
                    io:format("Rejecting invalid request: ~p~n", [Payload]),
                    amqp_channel:cast(Channel, #'basic.reject'{delivery_tag = DeliveryTag,
                                                                requeue = false})
            end,
            server_loop(Channel)
    end.

%% Decode a request payload: {ok, N} for a non-negative decimal integer,
%% {error, Reason} otherwise.
parse_request(Payload) ->
    try binary_to_integer(Payload) of
        N when N >= 0 -> {ok, N};
        _Negative -> {error, negative}
    catch
        error:badarg -> {error, not_an_integer}
    end.

%% Publish Payload to queue ReplyTo via the default exchange, echoing the
%% request's correlation_id so the client can match the reply.
reply(Channel, ReplyTo, CorrelationId, Payload) ->
    Response = #amqp_msg{payload = Payload,
                         props = #'P_basic'{correlation_id = CorrelationId}},
    Publish = #'basic.publish'{exchange = <<>>,
                               routing_key = ReplyTo},
    amqp_channel:cast(Channel, Publish, Response).

%% Naive exponential-time Fibonacci: deliberately slow, to simulate work.
fib(0) -> 0;
fib(1) -> 1;
fib(N) when N > 1 -> fib(N-1) + fib(N-2).

客户端

-module(rpc_client).
-export([call/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Call fib(N) on an rpc_server via rpc_queue and wait at most 60 s (total)
%% for the matching reply.
%%
%% Returns the integer result, or {error, timeout}. The consumer and channel
%% are cleaned up on every path, including when decoding the reply raises.
call(Connection, N) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),

    %% Server-named reply queue. exclusive: private to this connection.
    %% auto_delete: removed as soon as its consumer is cancelled below, so
    %% repeated calls on a long-lived connection do not accumulate queues.
    #'queue.declare_ok'{queue = ReplyQueue} =
        amqp_channel:call(Channel, #'queue.declare'{exclusive = true,
                                                    auto_delete = true}),

    Subscribe = #'basic.consume'{queue = ReplyQueue,
                                 no_ack = true},
    #'basic.consume_ok'{consumer_tag = ConsumerTag} =
        amqp_channel:subscribe(Channel, Subscribe, self()),

    try
        %% Only has to be unique within this call's private reply queue;
        %% erlang:unique_integer/1 (stdlib) replaces the third-party uuid module.
        CorrelationId = integer_to_binary(erlang:unique_integer([positive])),

        Request = #amqp_msg{payload = integer_to_binary(N),
                            props = #'P_basic'{reply_to = ReplyQueue,
                                               correlation_id = CorrelationId}},
        Publish = #'basic.publish'{exchange = <<>>,
                                   routing_key = <<"rpc_queue">>},
        amqp_channel:cast(Channel, Publish, Request),
        io:format("Sent RPC request: fib(~p)~n", [N]),

        %% Absolute deadline: ignored replies do not extend the 60 s budget.
        Deadline = erlang:monotonic_time(millisecond) + 60000,
        wait_response(ConsumerTag, CorrelationId, Deadline)
    after
        amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
        amqp_channel:close(Channel),
        flush_consumer_messages(ConsumerTag)
    end.

%% Wait for a delivery on ConsumerTag whose correlation_id equals
%% CorrelationId; skip others. Deadline is in monotonic milliseconds.
wait_response(ConsumerTag, CorrelationId, Deadline) ->
    Remaining = max(0, Deadline - erlang:monotonic_time(millisecond)),
    receive
        {#'basic.deliver'{consumer_tag = ConsumerTag},
         #amqp_msg{payload = Payload,
                   props = #'P_basic'{correlation_id = CorrelationId}}} ->
            io:format("Received RPC response: ~s~n", [Payload]),
            binary_to_integer(Payload);

        {#'basic.deliver'{consumer_tag = ConsumerTag},
         #amqp_msg{props = #'P_basic'{correlation_id = OtherId}}} ->
            io:format("Ignoring response with correlation_id: ~s~n", [OtherId]),
            wait_response(ConsumerTag, CorrelationId, Deadline)
    after Remaining ->
        {error, timeout}
    end.

%% Best-effort removal of this consumer's leftover messages (the consume_ok /
%% cancel_ok notifications amqp_client forwards, and any late reply) from the
%% caller's mailbox. Only messages tagged with ConsumerTag are touched.
flush_consumer_messages(ConsumerTag) ->
    receive
        #'basic.consume_ok'{consumer_tag = ConsumerTag} ->
            flush_consumer_messages(ConsumerTag);
        #'basic.cancel_ok'{consumer_tag = ConsumerTag} ->
            flush_consumer_messages(ConsumerTag);
        {#'basic.deliver'{consumer_tag = ConsumerTag}, _LateReply} ->
            flush_consumer_messages(ConsumerTag)
    after 0 ->
        ok
    end.

使用示例

%% Run the server in a separate process: rpc_server:start/1 loops forever.
spawn(rpc_server, start, [Connection]).

%% Synchronous client call computing fib(30) on the server.
Result = rpc_client:call(Connection, 30).
%% Expected result: 832040

关键点

  1. reply_to:指定回复队列(每个客户端独立)
  2. correlation_id:匹配请求和响应(支持并发请求)
  3. 临时队列:客户端断开自动删除
  4. 超时处理:防止永久阻塞

二、生产环境最佳实践

2.1 连接与通道管理

连接池

问题:频繁建立连接开销大(TCP 握手、认证)。

最佳实践

-module(rabbitmq_pool).
-behaviour(gen_server).

%% Public API
-export([start_link/2, get_connection/0]).
%% gen_server callbacks (must be exported for gen_server to call them)
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-record(state, {
    connections = [] :: [pid()],
    pool_size = 5 :: pos_integer()
}).

%% Start a locally registered pool holding PoolSize (> 0) AMQP connections.
%% ConnParams is passed unchanged to amqp_connection:start/1 (presumably an
%% #amqp_params_network{} record).
start_link(PoolSize, ConnParams) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE,
                          {PoolSize, ConnParams}, []).

%% Return the next pooled connection pid, round-robin.
get_connection() ->
    gen_server:call(?MODULE, get_connection).

init({PoolSize, ConnParams}) when is_integer(PoolSize), PoolSize > 0 ->
    %% Trap exits so terminate/2 runs (and closes connections) when the
    %% supervisor shuts the pool down.
    process_flag(trap_exit, true),
    Connections = [open_connection(ConnParams) || _ <- lists:seq(1, PoolSize)],
    {ok, #state{connections = Connections, pool_size = PoolSize}}.

%% Open one connection and monitor it, so the pool notices when it dies
%% instead of handing out a dead pid.
open_connection(ConnParams) ->
    {ok, Conn} = amqp_connection:start(ConnParams),
    _MonitorRef = erlang:monitor(process, Conn),
    Conn.

handle_call(get_connection, _From, State = #state{connections = [Conn | Rest]}) ->
    %% Rotate: the connection just handed out moves to the back.
    %% (Rest ++ [Conn] is O(PoolSize); fine for pools of 5-10.)
    {reply, Conn, State#state{connections = Rest ++ [Conn]}}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% A pooled connection died: stop and let the supervisor restart the pool
%% with fresh connections.
handle_info({'DOWN', _Ref, process, Conn, Reason},
            State = #state{connections = Conns}) ->
    case lists:member(Conn, Conns) of
        true ->
            {stop, {connection_down, Conn, Reason},
             State#state{connections = lists:delete(Conn, Conns)}};
        false ->
            {noreply, State}
    end;
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, #state{connections = Conns}) ->
    lists:foreach(fun close_connection/1, Conns).

%% Close Conn; a connection process that is already gone makes the close
%% call exit, which is fine during shutdown.
close_connection(Conn) ->
    try amqp_connection:close(Conn)
    catch exit:_AlreadyGone -> ok
    end.

配置建议

  • 连接数:5-10(每个应用实例)
  • Channel 数:每个连接 10-50
  • 心跳:60 秒

Channel 重用

反模式

%% Wrong: opens a new channel for every publish and closes it right away,
%% paying channel open/close on each call instead of reusing one channel.
publish_message(Connection, Message) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),
    %% publish Message here (omitted in this illustration)
    amqp_channel:close(Channel).

最佳实践

%% Right: open the channel once and keep it in the process state.
-record(state, {channel}).

init(Connection) ->
    {ok, Ch} = amqp_connection:open_channel(Connection),
    {ok, #state{channel = Ch}}.

%% Publish Message to the queue named <<"queue">> over the channel already
%% held in State; State is returned unchanged.
publish_message(Message, #state{channel = Ch} = State) ->
    amqp_channel:cast(Ch,
                      #'basic.publish'{exchange = <<>>, routing_key = <<"queue">>},
                      #amqp_msg{payload = Message}),
    State.

2.2 消息持久化策略

可靠性级别

级别 配置 数据丢失风险 性能
最低 非持久化队列 + 非持久化消息 节点重启丢失 最高
中等 持久化队列 + 非持久化消息 重启后消息丢失,队列保留 较高
高 持久化队列 + 持久化消息 尚未落盘的消息可能丢失 较低
最高 持久化 + Publisher Confirms + Consumer Acks 极低(需配合备份) 最低

高可靠性配置

-module(reliable_publisher).
-export([send_with_confirms/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Publish every binary in Messages as a persistent, mandatory message to the
%% durable quorum queue reliable_queue, waiting for the broker's publisher
%% confirm before sending the next one.
%%
%% Returns ok once every message is confirmed. Raises:
%%   {message_unroutable, SeqNo} - message was returned (no queue matched)
%%   {message_rejected, SeqNo}   - broker nacked the message
%%   {confirm_timeout, SeqNo}    - no confirm within 5 s
%%
%% The calling process becomes Channel's confirm and return handler. Assumes
%% no other process publishes on Channel concurrently. Waiting per message
%% keeps the example simple; for throughput, publish a batch and wait for its
%% confirms together.
send_with_confirms(Channel, Messages) ->
    %% Durable quorum queue: replicated, survives restarts.
    QDeclare = #'queue.declare'{queue = <<"reliable_queue">>,
                                durable = true,
                                arguments = [
                                  {<<"x-queue-type">>, longstr, <<"quorum">>}
                                ]},
    #'queue.declare_ok'{} = amqp_channel:call(Channel, QDeclare),

    #'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
    amqp_channel:register_confirm_handler(Channel, self()),
    %% Without a return handler the basic.return for an unroutable mandatory
    %% message never reaches us, and the following ack looks like success.
    amqp_channel:register_return_handler(Channel, self()),

    Publish = #'basic.publish'{exchange = <<>>,
                               routing_key = <<"reliable_queue">>,
                               mandatory = true},
    lists:foreach(fun(Msg) -> publish_and_confirm(Channel, Publish, Msg) end,
                  Messages).

%% Publish one persistent message and block until it is confirmed.
publish_and_confirm(Channel, Publish, Payload) ->
    %% Confirm delivery tags are channel-wide sequence numbers (they keep
    %% counting across calls), so ask the channel instead of assuming 1..N.
    SeqNo = amqp_channel:next_publish_seqno(Channel),
    AmqpMsg = #amqp_msg{payload = Payload,
                        props = #'P_basic'{delivery_mode = 2}},
    amqp_channel:cast(Channel, Publish, AmqpMsg),
    await_confirm(SeqNo, false).

%% Wait for the ack/nack covering SeqNo; Returned records whether a
%% basic.return arrived first (the broker sends it before the ack).
await_confirm(SeqNo, Returned) ->
    receive
        {#'basic.return'{}, _ReturnedMsg} ->
            await_confirm(SeqNo, true);
        #'basic.ack'{delivery_tag = Tag, multiple = Multiple}
          when Tag =:= SeqNo; Multiple andalso Tag > SeqNo ->
            case Returned of
                true  -> error({message_unroutable, SeqNo});
                false -> io:format("Message ~p confirmed~n", [SeqNo])
            end;
        #'basic.nack'{delivery_tag = Tag, multiple = Multiple}
          when Tag =:= SeqNo; Multiple andalso Tag > SeqNo ->
            io:format("Message ~p rejected~n", [SeqNo]),
            error({message_rejected, SeqNo})
    after 5000 ->
        error({confirm_timeout, SeqNo})
    end.

关键配置

  1. 仲裁队列(Quorum Queue):

    • 多副本(默认 3)
    • Raft 一致性协议
    • 自动故障转移
  2. Mandatory 标志

    • 无法路由时返回 basic.return
    • 避免消息黑洞
  3. Publisher Confirms

    • 确认消息已持久化
    • 生产环境宜异步批量等待确认以提高吞吐(上例为简洁起见逐条同步等待)

2.3 消费者最佳实践

Prefetch 调优

原则:根据处理时间和网络延迟选择。

%% Rule of thumb: the faster each message is processed, the larger the
%% prefetch needed to keep the consumer busy across network round trips;
%% the slower it is, the smaller the prefetch, so unacked work is not piled
%% onto one consumer while others sit idle (fair dispatch, see 1.1).

%% Case 1: fast processing (<10 ms) - a large window hides network latency
#'basic.qos_ok'{} = amqp_channel:call(Channel,
                                     #'basic.qos'{prefetch_count = 100}).

%% Case 2: medium processing (100 ms - 1 s)
#'basic.qos_ok'{} = amqp_channel:call(Channel,
                                     #'basic.qos'{prefetch_count = 10}).

%% Case 3: slow processing (>1 s) - one message at a time per consumer
#'basic.qos_ok'{} = amqp_channel:call(Channel,
                                     #'basic.qos'{prefetch_count = 1}).

异常处理

-module(resilient_consumer).
-export([consume/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Consume QueueName with manual acks and settle each message by outcome:
%%   ok                                    -> basic.ack
%%   {error, retriable} on first delivery  -> basic.nack, requeue = true
%%   any other error, or already redelivered
%%                                         -> basic.reject, requeue = false
%%                                            (dead-lettered if QueueName has a DLX)
%% Runs in the calling process until the broker cancels the consumer.
%%
%% Note: the redelivered flag only allows one retry, and it is also set when a
%% message comes back after a consumer/channel crash. Counting more retries
%% needs a header or the x-delivery-count of quorum queues.
consume(Channel, QueueName) ->
    Subscribe = #'basic.consume'{queue = QueueName,
                                 no_ack = false},
    #'basic.consume_ok'{} = amqp_channel:subscribe(Channel, Subscribe, self()),
    consumer_loop(Channel).

consumer_loop(Channel) ->
    receive
        %% amqp_client's default consumer also forwards consume_ok here.
        #'basic.consume_ok'{} ->
            consumer_loop(Channel);
        {#'basic.deliver'{delivery_tag = Tag, redelivered = Redelivered},
         #amqp_msg{payload = Payload}} ->
            settle(Channel, Tag, Redelivered, process_message(Payload)),
            consumer_loop(Channel);
        %% Broker-initiated cancel (e.g. queue deleted): stop consuming.
        #'basic.cancel'{} ->
            ok
    end.

%% Ack, requeue or reject the delivery Tag according to the processing Outcome.
settle(Channel, Tag, _Redelivered, ok) ->
    amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag});
settle(Channel, Tag, false, {error, retriable}) ->
    %% First failure of a retriable error: requeue once.
    amqp_channel:cast(Channel, #'basic.nack'{delivery_tag = Tag,
                                             requeue = true});
settle(Channel, Tag, _Redelivered, {error, _Reason}) ->
    %% Non-retriable, or retriable but already redelivered: give up.
    amqp_channel:cast(Channel, #'basic.reject'{delivery_tag = Tag,
                                               requeue = false}).

%% Run the business logic and classify failures.
process_message(Payload) ->
    try business_logic(Payload) of
        _Result -> ok
    catch
        %% Transient failures: retriable.
        error:network_error ->
            {error, retriable};
        error:{timeout, _} ->
            {error, retriable};
        %% Bad input: never retriable.
        error:invalid_data ->
            {error, invalid_data};
        Class:Reason:Stacktrace ->
            io:format("Error processing message: ~p:~p~n~p~n",
                      [Class, Reason, Stacktrace]),
            {error, unknown}
    end.

%% Placeholder so the module compiles: replace with the real handler. It is
%% expected to raise error:network_error or error:{timeout, _} for transient
%% failures and error:invalid_data for malformed input.
business_logic(_Payload) ->
    ok.

死信队列配置

%% Declare the dead-letter side first, so main_queue never points at a DLX
%% that does not exist yet (RabbitMQ silently drops messages dead-lettered to
%% a missing exchange).

%% Dead-letter exchange. durable = true: it must survive broker restarts just
%% like the durable queues that reference it.
ExDeclare = #'exchange.declare'{exchange = <<"dlx">>,
                                type = <<"direct">>,
                                durable = true},
#'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare).

%% Dead-letter queue
DlqDeclare = #'queue.declare'{queue = <<"dead_letter_queue">>,
                              durable = true},
#'queue.declare_ok'{} = amqp_channel:call(Channel, DlqDeclare).

%% Bind the DLQ to the DLX with the dead-letter routing key
Binding = #'queue.bind'{queue = <<"dead_letter_queue">>,
                        exchange = <<"dlx">>,
                        routing_key = <<"dead">>},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding).

%% Main queue: rejected (requeue = false) or expired messages are republished
%% to exchange dlx with routing key dead.
QDeclare = #'queue.declare'{
    queue = <<"main_queue">>,
    durable = true,
    arguments = [
        {<<"x-dead-letter-exchange">>, longstr, <<"dlx">>},
        {<<"x-dead-letter-routing-key">>, longstr, <<"dead">>}
    ]
},
#'queue.declare_ok'{} = amqp_channel:call(Channel, QDeclare).

2.4 性能调优

批量操作

发布批量

-module(batch_publisher).
-export([send_batch/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Publish every payload in Messages to the queue <<"queue">> back to back.
%% If Channel is in confirm mode, then wait until the broker has confirmed
%% the whole batch; otherwise return immediately.
%%
%% Assumes the calling process is Channel's registered confirm handler and
%% that no other process publishes on Channel concurrently.
%% Returns ok. Raises {message_nacked, Tag} on a nack for this batch and
%% confirm_timeout if no confirm arrives for 10 s.
send_batch(_Channel, []) ->
    ok;
send_batch(Channel, [_ | _] = Messages) ->
    %% Sequence number the first message will get; 0 when the channel is not
    %% in confirm mode.
    FirstSeqNo = amqp_channel:next_publish_seqno(Channel),
    Publish = #'basic.publish'{exchange = <<>>, routing_key = <<"queue">>},
    lists:foreach(
      fun(Msg) ->
              amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Msg})
      end, Messages),

    case FirstSeqNo of
        0 ->
            ok;
        _ ->
            LastSeqNo = FirstSeqNo + length(Messages) - 1,
            wait_batch_confirms(lists:seq(FirstSeqNo, LastSeqNo))
    end.

%% Pending is the ascending list of delivery tags not yet confirmed.
%% A multiple = true ack confirms every tag up to and including its own.
wait_batch_confirms([]) ->
    ok;
wait_batch_confirms([Lowest | _] = Pending) ->
    receive
        #'basic.ack'{delivery_tag = Tag, multiple = true} ->
            wait_batch_confirms([T || T <- Pending, T > Tag]);
        #'basic.ack'{delivery_tag = Tag, multiple = false} ->
            wait_batch_confirms(lists:delete(Tag, Pending));
        #'basic.nack'{delivery_tag = Tag} when Tag >= Lowest ->
            error({message_nacked, Tag})
    after 10000 ->
        error(confirm_timeout)
    end.

性能提升

  • 单条发布:5,000 msg/s
  • 批量发布(100 条/批):50,000 msg/s

连接参数优化

ConnParams = #amqp_params_network{
    %% Broker endpoint and credentials
    host         = "localhost",
    port         = 5672,
    virtual_host = <<"/">>,
    username     = <<"guest">>,
    password     = <<"guest">>,

    %% Connection tuning (heartbeat in seconds)
    heartbeat          = 60,
    connection_timeout = 60000,
    channel_max        = 2047,
    frame_max          = 131072,

    %% TCP socket tuning
    socket_options = [
        {nodelay, true},        %% disable Nagle's algorithm (lower latency)
        {sndbuf, 192 * 1024},   %% 192 KB send buffer
        {recbuf, 192 * 1024},   %% 192 KB receive buffer
        {keepalive, true}       %% TCP keepalive
    ]
}.

服务器配置

# rabbitmq.conf
# (sysctl-style format: comments start with '#' and must be on their own line)

# Memory: memory alarm threshold as a fraction of RAM
vm_memory_high_watermark.relative = 0.6

# Disk: free-space alarm threshold (SSD recommended)
disk_free_limit.absolute = 10GB

# Queue index: embed messages smaller than this many bytes in the index
queue_index_embed_msgs_below = 4096

# TCP listener tuning
tcp_listen_options.backlog = 4096
tcp_listen_options.nodelay = true
tcp_listen_options.sndbuf = 196608
tcp_listen_options.recbuf = 196608

# Credit flow (smooths memory usage) cannot be set in rabbitmq.conf;
# put it in advanced.config instead:
#   [{rabbit, [{credit_flow_default_credit, {800, 400}}]}].

2.5 集群部署

集群架构

flowchart TB
    subgraph Cluster[RabbitMQ 集群]
        Node1[节点 1<br/>rabbit@node1]
        Node2[节点 2<br/>rabbit@node2]
        Node3[节点 3<br/>rabbit@node3]
    end
    
    subgraph HAProxy[负载均衡]
        LB[HAProxy<br/>5672]
    end
    
    subgraph Clients[客户端]
        Client1[应用 1]
        Client2[应用 2]
        Client3[应用 3]
    end
    
    Clients --> LB
    LB --> Node1
    LB --> Node2
    LB --> Node3
    
    Node1 <-->|元数据复制| Node2
    Node2 <-->|元数据复制| Node3
    Node3 <-->|元数据复制| Node1

节点部署步骤

  1. 设置 Erlang Cookie(所有节点相同):
echo "secretcookie" > /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
  1. 启动节点
# 节点 1
rabbitmq-server -detached

# 节点 2、3
rabbitmq-server -detached
  3. 加入集群(在节点 2、3 上执行):
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
  1. 验证集群
rabbitmqctl cluster_status

输出示例:

Cluster status of node rabbit@node1 ...
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node3,rabbit@node2,rabbit@node1]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]

HAProxy 配置

global
    log /dev/log local0
    maxconn 4096

defaults
    log     global
    mode    tcp
    option  tcplog
    timeout connect 5000ms
    timeout client  600000ms
    timeout server  600000ms

listen rabbitmq
    bind *:5672
    mode tcp
    balance roundrobin
    server node1 10.0.1.1:5672 check inter 5s rise 2 fall 3
    server node2 10.0.1.2:5672 check inter 5s rise 2 fall 3
    server node3 10.0.1.3:5672 check inter 5s rise 2 fall 3

listen rabbitmq_mgmt
    bind *:15672
    mode http
    balance roundrobin
    server node1 10.0.1.1:15672 check
    server node2 10.0.1.2:15672 check
    server node3 10.0.1.3:15672 check

三、故障排查

3.1 常见问题诊断

问题 1:消息堆积

症状:队列消息数持续增长,消费速度慢于发布速度。

排查

# 查看队列状态
rabbitmqctl list_queues name messages_ready messages_unacknowledged consumers

# 输出示例
work_queue      10000   500     2

原因

  • 消费者数量不足
  • 消费者处理慢
  • 消费者 prefetch 过小

解决

  1. 增加消费者:
%% 启动多个 worker
[spawn(fun() -> task_worker:start_worker(Conn, N) end) 
 || N <- lists:seq(1, 10)].
  2. 优化处理逻辑:

    • 异步I/O
    • 批量处理
    • 缓存热数据
  3. 调整 prefetch:

#'basic.qos_ok'{} = amqp_channel:call(Channel, 
                                     #'basic.qos'{prefetch_count = 100}).

问题 2:内存告警

症状

=WARNING REPORT==== Memory alarm set on node rabbit@node1 ===

排查

# 查看内存使用
rabbitmqctl status | grep memory

# 查看每个连接内存
rabbitmqctl list_connections name user vhost channels memory | sort -n -k 6

原因

  • 消息堆积(队列内存占用)
  • 连接/通道过多
  • 大消息

解决

  1. 增加内存上限:
# rabbitmq.conf
# default: 0.4
vm_memory_high_watermark.relative = 0.6
  2. 启用队列长度限制:
QDeclare = #'queue.declare'{
    queue = <<"limited_queue">>,
    arguments = [
        {<<"x-max-length">>, long, 10000},  %% 最多 10000 条
        {<<"x-overflow">>, longstr, <<"drop-head">>}  %% 丢弃队首
    ]
},
#'queue.declare_ok'{} = amqp_channel:call(Channel, QDeclare).
  3. 减小消息大小:
    • 压缩大消息
    • 使用引用(消息存外部存储)

问题 3:网络分区

症状

Partitions: [{rabbit@node1,[rabbit@node2]}]

排查

rabbitmqctl cluster_status

原因

  • 网络不稳定
  • 心跳超时

解决

  1. 自动恢复(配置分区模式):
# rabbitmq.conf
# automatic recovery from partitions (default: ignore)
cluster_partition_handling = autoheal

模式说明:

  • ignore:忽略(手动处理)
  • autoheal:自动恢复(客户端连接最多的分区获胜,其余分区的节点自动重启)
  • pause_minority:暂停少数派
  2. 手动恢复:
# 在少数派节点上
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
  3. 预防:
    • 适当增大节点间心跳间隔(net_ticktime)
    • 使用稳定网络
    • 监控网络延迟

3.2 监控指标

关键指标

指标 命令 告警阈值
队列消息数 rabbitmqctl list_queues messages > 10000
未确认消息数 rabbitmqctl list_queues messages_unacknowledged > 1000
内存使用率 rabbitmqctl status | grep mem_used > 80%
磁盘剩余空间 rabbitmqctl status | grep disk_free < 10GB
连接数 rabbitmqctl list_connections | wc -l > 10000
文件句柄 rabbitmqctl status | grep file_descriptors > 80%

Prometheus 监控

  1. 启用插件:
rabbitmq-plugins enable rabbitmq_prometheus
  2. 配置 Prometheus:
# prometheus.yml
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['localhost:15692']
  3. 常用 PromQL:
# 队列消息数
rabbitmq_queue_messages{queue="work_queue"}

# 发布速率
rate(rabbitmq_channel_messages_published_total[1m])

# 消费速率
rate(rabbitmq_channel_messages_delivered_total[1m])

# 内存使用率
rabbitmq_process_resident_memory_bytes / rabbitmq_resident_memory_limit_bytes

四、安全最佳实践

4.1 用户与权限

创建应用专用用户

# 删除默认 guest 用户(生产环境)
rabbitmqctl delete_user guest

# 创建应用用户
rabbitmqctl add_user myapp secretpassword
rabbitmqctl set_permissions -p / myapp ".*" ".*" ".*"
rabbitmqctl set_user_tags myapp management

最小权限原则

# Monitoring user: the "monitoring" tag grants read-only visibility in the
# management UI/API. Empty configure/write/read patterns give vhost access
# without letting it consume (and thereby remove) messages.
rabbitmqctl add_user monitor monitorpass
rabbitmqctl set_permissions -p / monitor "" "" ""
rabbitmqctl set_user_tags monitor monitoring

# Publisher: may only publish (write) to the "events" exchange
rabbitmqctl add_user publisher pubpass
rabbitmqctl set_permissions -p / publisher "" "^events$" ""

# Consumer: may declare (configure) and consume from (read) the "tasks" queue only
rabbitmqctl add_user consumer conspass
rabbitmqctl set_permissions -p / consumer "^tasks$" "" "^tasks$"

4.2 TLS 加密

服务端配置

  1. 生成证书:
# 使用 tls-gen 工具
git clone https://github.com/rabbitmq/tls-gen
cd tls-gen/basic
make
  2. 配置 RabbitMQ:
# rabbitmq.conf
listeners.ssl.default = 5671

ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile   = /path/to/server_certificate.pem
ssl_options.keyfile    = /path/to/server_key.pem
# require and verify client certificates (mutual TLS)
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = true

客户端连接

ConnParams = #amqp_params_network{
    host = "localhost",
    port = 5671,
    virtual_host = <<"/">>,
    username = <<"myapp">>,
    password = <<"secretpassword">>,
    
    %% TLS 配置
    ssl_options = [
        {cacertfile, "/path/to/ca_certificate.pem"},
        {certfile, "/path/to/client_certificate.pem"},
        {keyfile, "/path/to/client_key.pem"},
        {verify, verify_peer},
        {server_name_indication, "localhost"},
        {depth, 2}
    ]
},
{ok, Connection} = amqp_connection:start(ConnParams).

五、总结与检查清单

开发阶段检查清单

  • 选择合适的交换器类型(direct/topic/fanout/headers)
  • 设计合理的 routing key 结构
  • 决定消息持久化策略(队列 + 消息)
  • 实现连接池和 Channel 复用
  • 启用 Publisher Confirms(需要可靠性时)
  • 实现消费者手动确认和异常处理
  • 配置死信队列处理失败消息
  • 设置合理的 prefetch 值
  • 添加日志和监控埋点

部署阶段检查清单

  • 移除或禁用 guest 用户
  • 创建应用专用用户(最小权限)
  • 启用 TLS 加密(生产环境)
  • 配置 HAProxy 或其他负载均衡器
  • 设置内存和磁盘告警阈值
  • 配置集群(3 节点最小 HA)
  • 启用 Prometheus 监控插件
  • 配置告警规则(Alertmanager)
  • 设置自动备份策略
  • 验证网络分区恢复模式

运维阶段检查清单

  • 监控队列消息堆积
  • 监控内存和磁盘使用率
  • 监控连接和通道数量
  • 定期检查集群健康状态
  • 定期审查用户权限
  • 定期清理未使用的队列和交换器
  • 定期测试故障恢复流程
  • 定期更新 RabbitMQ 版本
  • 定期审查性能指标和日志