RabbitMQ-01-核心服务器模块-rabbit

模块概览

职责与边界

rabbit 模块是 RabbitMQ 服务器的应用入口模块,实现 Erlang/OTP application 行为。主要职责包括:

  1. 应用生命周期管理:启动、停止、重启 RabbitMQ 应用
  2. 启动流程协调:通过 boot steps 机制协调各组件的初始化顺序
  3. 插件系统集成:加载并启动已启用的插件
  4. 预启动阶段协调:配置解析、特性标志初始化、集群检查
  5. 监督树根节点:创建并监督 rabbit_sup 根监督者
  6. 状态查询接口:提供运行状态、告警信息等查询 API

输入与输出

输入

  • 配置文件(rabbitmq.confadvanced.config
  • 环境变量
  • 启用的插件列表(enabled_plugins 文件)
  • 命令行参数

输出

  • 运行中的 RabbitMQ 节点
  • 日志信息(structured logging)
  • 启动/停止状态
  • 告警信息

上下游依赖

上游

  • Erlang VM(BEAM)
  • rabbitmq_prelaunch 应用(预启动阶段)

下游

  • rabbit_sup:根监督者
  • rabbit_boot_steps:启动步骤协调器
  • rabbit_plugins:插件管理
  • rabbit_feature_flags:特性标志系统
  • mnesiakhepri:元数据存储
  • ra:Raft 一致性库
  • osiris:日志存储引擎

生命周期

stateDiagram-v2
    [*] --> NotStarted: Erlang VM 启动
    NotStarted --> Booting: rabbit:start() / rabbit:boot()
    
    Booting --> PrelaunchPhase1: 启动 rabbitmq_prelaunch
    PrelaunchPhase1 --> PrelaunchPhase2: 解析配置与上下文
    PrelaunchPhase2 --> PrelaunchPhase2: 1. 特性标志初始化
    PrelaunchPhase2 --> PrelaunchPhase2: 2. 日志系统初始化
    PrelaunchPhase2 --> PrelaunchPhase2: 3. Khepri/Ra 系统启动
    PrelaunchPhase2 --> PrelaunchPhase2: 4. 集群检查
    PrelaunchPhase2 --> PrelaunchPhase2: 5. Mnesia 启动(如适用)
    
    PrelaunchPhase2 --> CoreStarting: rabbit:start/2 回调
    CoreStarting --> CoreStarting: 1. 创建 rabbit_sup
    CoreStarting --> CoreStarting: 2. 注册 rabbit 进程名
    CoreStarting --> CoreStarting: 3. 加载插件
    CoreStarting --> CoreStarting: 4. 刷新特性标志
    CoreStarting --> CoreStarting: 5. 运行 boot steps
    
    CoreStarting --> CoreStarted: boot steps 完成
    CoreStarted --> PostlaunchPhase: 后启动阶段
    PostlaunchPhase --> PostlaunchPhase: 1. 启动插件应用
    PostlaunchPhase --> PostlaunchPhase: 2. 通知节点上线
    PostlaunchPhase --> PostlaunchPhase: 3. 恢复虚拟主机
    PostlaunchPhase --> PostlaunchPhase: 4. 启动网络监听
    
    PostlaunchPhase --> Ready: 标记为 ready
    Ready --> Running: 正常服务
    
    Running --> Stopping: rabbit:stop()
    Stopping --> Stopped: 清理资源
    Stopped --> [*]
    
    Running --> PrepStopping: rabbit:prep_stop/1
    PrepStopping --> Stopping: 停止接受连接

模块架构图

flowchart TB
    subgraph "应用入口"
        Start[rabbit:start/0<br/>rabbit:boot/0]
        StartIt[start_it/1<br/>启动协调]
        Start --> StartIt
    end
    
    subgraph "预启动阶段"
        PrelaunchApp[rabbitmq_prelaunch<br/>应用启动]
        RunPrelaunch2[run_prelaunch_second_phase/0]
        
        PrelaunchApp --> RunPrelaunch2
        
        EnabledPlugins[启用插件文件处理]
        FeatureFlagsSetup[特性标志注册表初始化]
        LoggingSetup[日志系统设置]
        RaSystems[Ra 系统启动]
        KhepriSetup[Khepri 启动]
        ClusterCheck[集群检查与兼容性验证]
        MnesiaStart[Mnesia 启动<br/>如不使用 Khepri]
        
        RunPrelaunch2 --> EnabledPlugins
        EnabledPlugins --> FeatureFlagsSetup
        FeatureFlagsSetup --> LoggingSetup
        LoggingSetup --> RaSystems
        RaSystems --> KhepriSetup
        KhepriSetup --> ClusterCheck
        ClusterCheck --> MnesiaStart
    end
    
    subgraph "核心启动阶段"
        RabbitStart[rabbit:start/2<br/>application 回调]
        CreateSup[创建 rabbit_sup<br/>监督树根]
        RegisterName[注册 rabbit 进程名]
        LoadPlugins[加载插件应用]
        RefreshFF[刷新特性标志]
        RunBootSteps[运行 boot steps<br/>rabbit_boot_steps:run_boot_steps/1]
        
        RabbitStart --> CreateSup
        CreateSup --> RegisterName
        RegisterName --> LoadPlugins
        LoadPlugins --> RefreshFF
        RefreshFF --> RunBootSteps
    end
    
    subgraph "Boot Steps"
        direction TB
        PreBoot[pre_boot]
        CodecCheck[codec_correctness_check]
        DatabaseInit[database<br/>rabbit_db:init/0]
        FileHandles[file_handle_cache<br/>启动文件句柄缓存]
        WorkerPool[worker_pool<br/>启动工作进程池]
        ExternalInfra[external_infrastructure<br/>启动网络/日志等]
        Recovery[recovery<br/>队列与交换器恢复]
        RoutingReady[routing_ready<br/>路由系统就绪]
        LogExchange[log_exchange<br/>日志交换器]
        DirectClient[direct_client<br/>内部客户端支持]
        NetworkingMetadata[networking_metadata<br/>网络元数据初始化]
        Notify[notify_cluster<br/>通知集群节点上线]
        
        PreBoot --> CodecCheck
        CodecCheck --> DatabaseInit
        DatabaseInit --> FileHandles
        FileHandles --> WorkerPool
        WorkerPool --> ExternalInfra
        ExternalInfra --> Recovery
        Recovery --> RoutingReady
        RoutingReady --> LogExchange
        LogExchange --> DirectClient
        DirectClient --> NetworkingMetadata
        NetworkingMetadata --> Notify
    end
    
    subgraph "后启动阶段"
        PostlaunchPhase[run_postlaunch_phase/1]
        StartPluginApps[启动插件应用]
        NotifyUp[通知节点上线<br/>rabbit_node_monitor]
        RecoverVHosts[恢复虚拟主机<br/>rabbit_vhosts:boot/0]
        StartListeners[启动网络监听器<br/>rabbit_networking:boot/0]
        SetReady[标记状态为 ready<br/>rabbit_boot_state:set/1]
        
        PostlaunchPhase --> StartPluginApps
        StartPluginApps --> NotifyUp
        NotifyUp --> RecoverVHosts
        RecoverVHosts --> StartListeners
        StartListeners --> SetReady
    end
    
    StartIt --> PrelaunchApp
    MnesiaStart --> RabbitStart
    RunBootSteps --> PostlaunchPhase
    SetReady --> Ready[运行中<br/>ready state]

架构图说明

启动流程分解

  1. 入口函数

    • rabbit:start/0:供开发/测试使用,应用以 temporary 模式启动,失败抛出异常
    • rabbit:boot/0:供生产环境使用,应用以 transient 模式启动,失败导致节点退出
    • 两者最终都调用 start_it/1 协调启动
  2. 预启动阶段第一阶段rabbitmq_prelaunch 应用):

    • 尽早解析配置文件和环境变量
    • 创建上下文(context)映射,供后续阶段使用
    • 确保 Mnesia 未启动(避免配置不一致)
  3. 预启动阶段第二阶段run_prelaunch_second_phase/0):

    • 处理启用插件文件(enabled_plugins
    • 初始化特性标志注册表(feature flags registry)
    • 设置日志系统(使用 Erlang logger)
    • 启动 Ra 系统和 Khepri(即使使用 Mnesia 也会启动,为迁移做准备)
    • 执行集群检查(节点兼容性、特性标志一致性)
    • 如不使用 Khepri,启动 Mnesia
  4. 核心启动阶段rabbit:start/2):

    • 创建 rabbit_sup 监督树
    • 注册 rabbit 进程名(用于远程节点检查 is_running/1
    • 加载插件应用(不启动)
    • 刷新特性标志(包括插件的特性标志)
    • 运行 boot steps
  5. Boot Steps

    • 使用拓扑排序确保依赖顺序
    • 每个 step 由 {StepName, [{description, Desc}, {mfa, {M, F, A}}, {requires, [...]}, {enables, [...]}]} 定义
    • 关键 steps:
      • database:初始化 Mnesia/Khepri,创建表结构
      • file_handle_cache:启动文件句柄缓存(限制打开文件数)
      • external_infrastructure:启动核心基础设施(网络、日志、监控)
      • recovery:恢复持久化的队列和交换器
      • routing_ready:路由系统准备就绪
      • notify_cluster:通知集群其他节点本节点已启动
  6. 后启动阶段run_postlaunch_phase/1):

    • 启动插件应用(按依赖顺序)
    • 通知节点监控器(rabbit_node_monitor:notify_node_up/0
    • 恢复虚拟主机进程
    • 启动网络监听器(TCP/TLS)
    • 标记节点为 ready 状态

边界条件

  • 并发启动保护:通过注册 rabbit_boot 进程名防止重复启动
  • 超时控制
    • 启动开始超时:1 分钟(BOOT_START_TIMEOUT
    • 启动完成超时:12 小时(BOOT_FINISH_TIMEOUT),适用于大规模集群
    • 状态检查间隔:100ms(BOOT_STATUS_CHECK_INTERVAL
  • 原子性:预启动阶段任何步骤失败导致整个启动失败
  • 幂等性:多次调用 start/0 会检测到已启动状态,避免重复

扩展点

  • Boot Steps:插件可以定义自己的 boot steps,声明依赖关系
  • Cleanup Steps:插件可以定义 cleanup steps,在停止时执行
  • Prelaunch Hooks:通过 rabbitmq_prelaunch 扩展预启动逻辑

状态持有

  • 进程字典
    • 权限缓存配置(permission_cache_can_expire
    • 其他临时状态
  • 全局状态
    • rabbit_boot_state:ETS 表存储启动状态(booting/core_started/ready/stopped)
    • rabbit 注册进程名:标识节点运行状态
    • rabbit_boot 注册进程名:标识节点正在启动

资源占用

  • 内存:启动阶段峰值约 50-200MB(取决于配置和插件)
  • 进程数:启动完成后约 100-500 个 Erlang 进程(取决于配置)
  • 文件句柄:监听端口、日志文件、Mnesia/Khepri 数据文件

核心 API 详解

API 1: start/0 - 启动应用(开发模式)

基本信息

  • 名称start/0
  • 协议/方法:Erlang 函数调用
  • 幂等性:否(重复调用会检测已启动状态,返回 ok)
  • 适用场景:开发、测试、Erlang shell 交互

函数签名

-spec start() -> 'ok'.

请求参数

无参数。

响应

  • 成功ok
  • 失败:抛出异常(throw({error, Reason})

核心代码

start() ->
    %% start() vs. boot(): we want to throw an error in start().
    start_it(temporary).

start_it(StartType) ->
    case spawn_boot_marker() of
        {ok, Marker} ->
            ?LOG_INFO("RabbitMQ is asked to start...", []),
            try
                {Millis, ok} = timer:tc(
                    fun() ->
                        %% 1. 启动预启动应用
                        {ok, _} = application:ensure_all_started(
                                    rabbitmq_prelaunch, StartType),
                        %% 2. 启动 rabbit 应用
                        {ok, _} = application:ensure_all_started(
                                    rabbit, StartType),
                        %% 3. 等待启动完成或停止
                        wait_for_ready_or_stopped()
                    end, millisecond),
                ?LOG_INFO("Time to start RabbitMQ: ~b ms", [Millis]),
                stop_boot_marker(Marker),
                ok
            catch
                error:{badmatch, Error}:_ ->
                    %% 失败时停止预启动应用并清理
                    _ = application:stop(rabbitmq_prelaunch),
                    stop_boot_marker(Marker),
                    case StartType of
                        temporary -> throw(Error);  %% 抛出异常
                        _         -> exit(Error)    %% 退出节点
                    end
            end;
        {already_booting, Marker} ->
            %% 已在启动中,等待完成
            stop_boot_marker(Marker),
            ok
    end.

代码逐段解释

  1. 启动模式选择(第 2 行):

    • temporary 模式:应用崩溃不影响节点,适合开发测试
    • 区别于 boot/0 使用的 transient 模式
  2. 启动标记进程(第 4-5 行):

    • spawn_boot_marker/0 创建一个名为 rabbit_boot 的进程
    • 用于跨节点检测启动状态(is_booting/1
    • 如果已存在该进程,说明正在启动中,避免重复
  3. 计时启动过程(第 7-14 行):

    • 使用 timer:tc/2 测量启动耗时
    • 先启动 rabbitmq_prelaunch(预启动阶段第一阶段)
    • 再启动 rabbit(触发 start/2 回调)
    • wait_for_ready_or_stopped/0 阻塞等待启动完成或失败
  4. 错误处理(第 16-24 行):

    • 捕获 badmatch 错误(ensure_all_started 失败)
    • 停止预启动应用,清理上下文
    • temporary 模式抛出异常,transient 模式退出节点
  5. 并发启动保护(第 25-27 行):

    • 如果已有其他调用在启动,返回 ok
    • 依赖第一次调用完成启动

时序图

sequenceDiagram
    autonumber
    participant User as 用户/CLI
    participant Rabbit as rabbit 模块
    participant PrelaunchApp as rabbitmq_prelaunch
    participant AppCtl as application 控制器
    participant BootState as rabbit_boot_state
    
    User->>+Rabbit: start()
    Rabbit->>Rabbit: spawn_boot_marker()<br/>注册 rabbit_boot 进程
    
    alt 首次启动
        Rabbit->>+PrelaunchApp: application:ensure_all_started(rabbitmq_prelaunch)
        PrelaunchApp->>PrelaunchApp: 解析配置与环境
        PrelaunchApp->>PrelaunchApp: 创建上下文
        PrelaunchApp-->>-Rabbit: {ok, [rabbitmq_prelaunch]}
        
        Rabbit->>+AppCtl: application:ensure_all_started(rabbit)
        AppCtl->>Rabbit: start(normal, [])<br/>回调
        Note over Rabbit: 详见 start/2 时序图
        AppCtl-->>-Rabbit: {ok, [rabbit, ...]}
        
        Rabbit->>+BootState: wait_for_ready_or_stopped()
        BootState->>BootState: 轮询状态<br/>间隔 100ms
        BootState-->>-Rabbit: ok (状态为 ready)
        
        Rabbit->>Rabbit: stop_boot_marker()<br/>注销 rabbit_boot 进程
        Rabbit-->>-User: ok
    else 已在启动中
        Rabbit->>Rabbit: spawn_boot_marker()<br/>失败(已存在)
        Rabbit->>Rabbit: stop_boot_marker()
        Rabbit-->>User: ok
    end

异常与回退

  1. 配置文件错误

    • 预启动阶段抛出 {error, {bad_config, ...}}
    • 停止已启动的应用,抛出异常给调用者
  2. 依赖应用启动失败

    • 例如 Mnesia 启动失败
    • 停止 rabbitmq_prelaunch,抛出 {could_not_start, mnesia, Reason}
  3. Boot Step 失败

    • 某个 boot step 的 MFA 返回 {error, Reason}
    • 立即停止启动,退出节点(如果 transient 模式)
  4. 超时

    • 启动超过 12 小时(BOOT_FINISH_TIMEOUT
    • wait_for_ready_or_stopped/0 抛出 {timeout, waiting_for_ready}

性能要点

  • 启动时间

    • 空配置单节点:1-3 秒
    • 大规模集群(数千队列):30 秒 - 5 分钟
    • 影响因素:队列恢复、消息索引重建、集群同步
  • 瓶颈点

    • Mnesia 表加载(磁盘 I/O)
    • 队列进程启动(CPU 密集)
    • 网络往返(集群节点通信)
  • 优化建议

    • 使用 SSD 存储 Mnesia 数据
    • 减少持久化队列数量
    • 启用 Khepri(比 Mnesia 更快)

API 2: boot/0 - 启动应用(生产模式)

基本信息

  • 名称boot/0
  • 协议/方法:Erlang 函数调用
  • 幂等性:否
  • 适用场景:生产环境、操作系统服务(systemd)

函数签名

-spec boot() -> 'ok'.

核心代码

boot() ->
    %% start() vs. boot(): we want the node to exit in boot(). Because
    %% applications are started with `transient`, any error during their
    %% startup will abort the node.
    start_it(transient).

与 start/0 的区别

特性 start/0 boot/0
启动模式 temporary transient
失败行为 抛出异常(不退出节点) 退出节点
适用场景 开发/测试 生产环境
重启策略 手动重启 操作系统自动重启

时序图

start/0 相同,区别在于错误处理:

sequenceDiagram
    autonumber
    participant SystemD as systemd/启动脚本
    participant Rabbit as rabbit 模块
    participant BEAM as Erlang VM
    
    SystemD->>+Rabbit: boot()
    Rabbit->>Rabbit: start_it(transient)
    
    alt 启动成功
        Rabbit-->>-SystemD: ok
    else 启动失败
        Rabbit->>+BEAM: exit(Error)
        BEAM->>BEAM: 终止 VM
        BEAM-->>-SystemD: 退出码 非 0
        SystemD->>SystemD: 重启服务<br/>根据 systemd 配置
    end

API 3: stop/0 - 停止应用

基本信息

  • 名称stop/0
  • 协议/方法:Erlang 函数调用
  • 幂等性:是(多次调用均返回 ok)

函数签名

-spec stop() -> 'ok'.

核心代码

stop() ->
    case wait_for_ready_or_stopped() of
        ok ->
            ?LOG_INFO("Stopping RabbitMQ", []),
            %% 1. 停止插件应用
            Plugins = rabbit_plugins:active(),
            stop_apps(Plugins),
            %% 2. 停止 rabbit 应用
            rabbit_prelaunch:set_stop_reason(stop),
            ok = application:stop(rabbit),
            %% 3. 停止预启动应用
            ok = application:stop(rabbitmq_prelaunch),
            ?LOG_INFO("Stopped RabbitMQ application", []);
        {error, _} = Error ->
            Error
    end.

代码逐段解释

  1. 等待启动完成或已停止(第 2 行):

    • 如果正在启动中,等待启动完成再停止
    • 如果已停止,直接返回 ok
  2. 停止插件应用(第 4-5 行):

    • rabbit_plugins:active/0 获取当前激活的插件
    • stop_apps/1 按依赖倒序停止插件
    • 触发插件的 cleanup steps
  3. 停止核心应用(第 6-8 行):

    • 设置停止原因(用于日志和诊断)
    • application:stop(rabbit) 触发 prep_stop/1stop/1 回调
    • 停止 rabbit_sup 监督树(所有子进程)
  4. 停止预启动应用(第 9 行):

    • 清理上下文映射
    • 关闭日志输出

时序图

sequenceDiagram
    autonumber
    participant User as 用户/CLI
    participant Rabbit as rabbit 模块
    participant PluginMgr as rabbit_plugins
    participant AppCtl as application 控制器
    participant RabbitSup as rabbit_sup
    participant Children as 各子系统
    
    User->>+Rabbit: stop()
    Rabbit->>Rabbit: wait_for_ready_or_stopped()
    
    Rabbit->>+PluginMgr: active()
    PluginMgr-->>-Rabbit: [plugin1, plugin2, ...]
    
    loop 每个插件(倒序)
        Rabbit->>+AppCtl: application:stop(PluginN)
        AppCtl->>AppCtl: 触发 cleanup steps
        AppCtl->>AppCtl: 停止插件监督树
        AppCtl-->>-Rabbit: ok
    end
    
    Rabbit->>+AppCtl: application:stop(rabbit)
    AppCtl->>+Rabbit: prep_stop(State)<br/>预停止回调
    Rabbit->>Rabbit: 停止接受新连接
    Rabbit->>Rabbit: 关闭监听器
    Rabbit-->>-AppCtl: State
    
    AppCtl->>+Rabbit: stop(State)<br/>停止回调
    Rabbit->>+RabbitSup: supervisor:terminate_child(...)
    RabbitSup->>+Children: 逐个停止子进程
    Children->>Children: 保存状态(如队列索引)
    Children->>Children: 关闭连接/文件
    Children-->>-RabbitSup: ok
    RabbitSup-->>-Rabbit: ok
    Rabbit-->>-AppCtl: ok
    AppCtl-->>-Rabbit: ok
    
    Rabbit->>+AppCtl: application:stop(rabbitmq_prelaunch)
    AppCtl->>AppCtl: 清理上下文
    AppCtl-->>-Rabbit: ok
    
    Rabbit-->>-User: ok

异常与回退

  1. 连接未关闭

    • prep_stop/1 会等待所有客户端连接关闭(有超时)
    • 超时后强制关闭连接
  2. 队列未保存状态

    • 经典队列保存索引到磁盘
    • 仲裁队列通过 Ra 快照持久化
    • 保存失败不影响停止,但下次启动恢复时间增加
  3. 插件停止失败

    • 记录错误日志,继续停止其他插件
    • 不阻止核心应用停止

API 4: start/2 - Application 回调(核心启动)

基本信息

  • 名称start/2
  • 协议/方法application 行为回调
  • 调用者:Erlang Application Controller

函数签名

-spec start('normal', []) ->
    {'error',
     {'erlang_version_too_old',
      {'found', string(), string()},
      {'required', string(), string()}}} |
    {'ok', pid()}.

请求参数

参数 类型 说明
StartType normal 正常启动(非 takeover/failover)
StartArgs [] 启动参数(通常为空)

响应结构

  • 成功{ok, SupPid} - SupPidrabbit_sup 的进程 ID
  • 失败{error, Reason} - 启动失败原因

核心代码

start(normal, []) ->
    %% 1. 重置启动状态为 booting
    rabbit_boot_state:set(booting),
    rabbit_prelaunch:clear_stop_reason(),
    
    try
        %% 2. 运行预启动第二阶段
        run_prelaunch_second_phase(),
        
        %% 3. 打印产品信息
        ProductInfo = product_info(),
        ?LOG_INFO(
           "~n Starting ~ts ~ts on Erlang ~ts [~ts]~n ~ts~n ~ts",
           [product_name(), product_version(), rabbit_misc:otp_release(),
            emu_flavor(), ?COPYRIGHT_MESSAGE, product_license_line()]),
        log_motd(),
        
        %% 4. 创建根监督者
        {ok, SupPid} = rabbit_sup:start_link(),
        
        %% 5. 启动特性标志控制器(如启用 feature_flags_v2)
        ok = rabbit_sup:start_child(rabbit_ff_controller),
        
        %% 6. 注册 rabbit 进程名(用于远程节点检查)
        true = register(rabbit, self()),
        
        print_banner(),
        log_banner(),
        warn_if_kernel_config_dubious(),
        
        %% 7. 设置插件
        Plugins = rabbit_plugins:setup(),
        ?LOG_DEBUG("Loading the following plugins: ~tp", [Plugins]),
        ok = app_utils:load_applications(Plugins),
        
        %% 8. 刷新特性标志(包括插件的)
        case rabbit_feature_flags:refresh_feature_flags_after_app_load() of
            ok     -> ok;
            Error1 -> throw(Error1)
        end,
        
        persist_static_configuration(),
        
        %% 9. 运行 boot steps
        ok = rabbit_boot_steps:run_boot_steps([rabbit | Plugins]),
        rabbit_boot_state:set(core_started),
        
        %% 10. 运行后启动阶段
        run_postlaunch_phase(Plugins),
        {ok, SupPid}
    catch
        throw:{error, _} = Error ->
            _ = mnesia:stop(),
            rabbit_prelaunch:set_stop_reason(Error),
            rabbit_boot_state:set(stopped),
            Error
    end.

代码逐段解释

  1. 状态初始化(第 2-3 行):

    • 设置启动状态为 booting
    • 清除上次停止原因(如果有)
  2. 预启动第二阶段(第 6 行):

    • 详见前文"预启动阶段第二阶段"
    • 包括特性标志、日志、Khepri/Ra、集群检查、Mnesia 启动
  3. 产品信息(第 8-14 行):

    • 记录 RabbitMQ 版本、Erlang 版本、许可证信息
    • 显示 MOTD(Message of the Day)
  4. 创建监督树(第 17 行):

    • rabbit_sup:start_link/0 创建根监督者
    • 返回的 SupPid 将作为 application master 的监督进程
  5. 启动特性标志控制器(第 19 行):

    • rabbit_ff_controller 管理特性标志状态(v2 架构)
    • 必须在 boot steps 前启动,因为刷新特性标志可能用到它
  6. 注册进程名(第 21 行):

    • register(rabbit, self()) 注册 application master 进程
    • 远程节点通过 whereis({rabbit, Node}) 检查节点是否运行
  7. 设置插件(第 26-28 行):

    • rabbit_plugins:setup/0 读取 enabled_plugins 文件
    • 返回启用的插件列表(按依赖顺序)
    • 加载插件应用(不启动)
  8. 刷新特性标志(第 31-35 行):

    • 扫描所有已加载应用的模块属性
    • 注册新发现的特性标志
    • 检查冲突或不兼容的特性标志
  9. 运行 Boot Steps(第 39-40 行):

    • 详见后文"Boot Steps 详解"
    • 拓扑排序确保依赖顺序
    • 初始化核心子系统(数据库、文件缓存、网络等)
  10. 后启动阶段(第 43 行):

    • 启动插件应用
    • 通知集群节点上线
    • 恢复虚拟主机
    • 启动网络监听器

时序图

sequenceDiagram
    autonumber
    participant AppCtl as Application Controller
    participant Rabbit as rabbit:start/2
    participant Prelaunch as run_prelaunch_second_phase
    participant RabbitSup as rabbit_sup
    participant BootSteps as rabbit_boot_steps
    participant DB as rabbit_db
    participant Plugins as plugin applications
    participant Postlaunch as run_postlaunch_phase
    
    AppCtl->>+Rabbit: start(normal, [])
    Rabbit->>Rabbit: rabbit_boot_state:set(booting)
    
    Rabbit->>+Prelaunch: run_prelaunch_second_phase()
    Prelaunch->>Prelaunch: 启用插件文件处理
    Prelaunch->>Prelaunch: 特性标志注册表初始化
    Prelaunch->>Prelaunch: 日志系统设置
    Prelaunch->>Prelaunch: Ra 系统启动
    Prelaunch->>Prelaunch: Khepri 启动
    Prelaunch->>Prelaunch: 集群检查
    Prelaunch->>Prelaunch: Mnesia 启动(如适用)
    Prelaunch-->>-Rabbit: ok
    
    Rabbit->>+RabbitSup: start_link()
    RabbitSup->>RabbitSup: 初始化监督者<br/>one_for_all 策略
    RabbitSup-->>-Rabbit: {ok, SupPid}
    
    Rabbit->>Rabbit: register(rabbit, self())
    Rabbit->>Rabbit: rabbit_plugins:setup()<br/>获取插件列表
    Rabbit->>Rabbit: app_utils:load_applications(Plugins)
    Rabbit->>Rabbit: refresh_feature_flags_after_app_load()
    
    Rabbit->>+BootSteps: run_boot_steps([rabbit | Plugins])
    
    loop 每个 boot step(拓扑排序)
        BootSteps->>BootSteps: 检查 requires 依赖
        BootSteps->>+DB: 示例: rabbit_db:init()
        DB->>DB: 初始化数据库<br/>创建表结构
        DB-->>-BootSteps: ok
    end
    
    BootSteps-->>-Rabbit: ok
    Rabbit->>Rabbit: rabbit_boot_state:set(core_started)
    
    Rabbit->>+Postlaunch: run_postlaunch_phase(Plugins)
    
    loop 每个插件
        Postlaunch->>+Plugins: application:ensure_all_started(PluginN)
        Plugins->>Plugins: 启动插件监督树
        Plugins->>Plugins: 插件 boot steps
        Plugins-->>-Postlaunch: {ok, [PluginN | Deps]}
    end
    
    Postlaunch->>Postlaunch: rabbit_node_monitor:notify_node_up()
    Postlaunch->>Postlaunch: rabbit_vhosts:boot()<br/>恢复虚拟主机
    Postlaunch->>Postlaunch: rabbit_networking:boot()<br/>启动监听器
    Postlaunch->>Postlaunch: rabbit_boot_state:set(ready)
    Postlaunch-->>-Rabbit: ok
    
    Rabbit-->>-AppCtl: {ok, SupPid}

异常与回退

  1. 预启动阶段失败

    • 例如:配置错误、Mnesia 启动失败、集群不兼容
    • 停止 Mnesia(如已启动)
    • 设置停止原因和状态为 stopped
    • 返回 {error, Reason} 给 Application Controller
  2. 监督者启动失败

    • rabbit_sup:start_link/0 失败(极少见)
    • 抛出异常,Application Controller 标记应用为失败
  3. 特性标志冲突

    • 发现不兼容的特性标志定义
    • 抛出 {error, {feature_flags_error, ...}}
  4. Boot Step 失败

    • 某个 boot step 的 MFA 返回非 ok
    • 立即停止启动,执行异常处理
    • 回滚已完成的 boot steps(如果有 cleanup)
  5. 插件启动失败

    • 后启动阶段某插件启动失败
    • 停止已启动的插件
    • 停止核心应用

性能要点

  • 并行化

    • Boot steps 通过依赖声明实现隐式并行(未来优化)
    • 插件启动串行(按依赖顺序)
    • 虚拟主机恢复可并行
  • 关键路径

    • 数据库初始化(Mnesia/Khepri)
    • 队列恢复(recovery boot step)
    • 网络监听器启动
  • 内存占用

    • Boot steps 阶段:100-300MB
    • 插件启动后:200-500MB
    • 虚拟主机恢复后:取决于队列数

API 5: stop/1 - Application 回调(停止)

基本信息

  • 名称stop/1
  • 协议/方法application 行为回调
  • 调用者:Erlang Application Controller

函数签名

-spec stop(term()) -> 'ok'.

请求参数

参数 类型 说明
State term() prep_stop/1 返回的状态

核心代码

stop(State) ->
    ok = rabbit_alarm:stop(),
    ok = case rabbit_mnesia:is_clustered() of
             true  -> ok;
             false -> rabbit_table:clear_ram_only_tables()
         end,
    ok = rabbit_boot_state:set(stopped),
    State.

代码逐段解释

  1. 停止告警系统(第 2 行):

    • rabbit_alarm:stop/0 停止内存/磁盘告警监控
    • 注销告警处理器
  2. 清理内存表(第 3-6 行):

    • 非集群模式:清空 Mnesia RAM-only 表(连接、通道、消费者等)
    • 集群模式:表由其他节点维护,不清理
  3. 设置停止状态(第 7 行):

    • rabbit_boot_state:set(stopped) 标记节点已停止
    • 其他节点可通过 is_running/1 检测到
  4. 返回状态(第 8 行):

    • 返回 prep_stop/1 传递的状态(通常忽略)

与 prep_stop/1 的关系

sequenceDiagram
    participant AppCtl as Application Controller
    participant Rabbit as rabbit 模块
    participant RabbitSup as rabbit_sup
    
    AppCtl->>+Rabbit: prep_stop(State)
    Note over Rabbit: 预停止阶段<br/>停止接受新连接
    Rabbit->>Rabbit: 关闭监听器
    Rabbit->>Rabbit: 等待连接关闭(有超时)
    Rabbit-->>-AppCtl: NewState
    
    AppCtl->>+RabbitSup: supervisor:terminate_child(...)
    Note over RabbitSup: 停止所有子进程<br/>(网络、队列、交换器等)
    RabbitSup-->>-AppCtl: ok
    
    AppCtl->>+Rabbit: stop(NewState)
    Note over Rabbit: 最终清理
    Rabbit->>Rabbit: 停止告警系统
    Rabbit->>Rabbit: 清理内存表
    Rabbit->>Rabbit: 设置状态为 stopped
    Rabbit-->>-AppCtl: ok

API 6: prep_stop/1 - 预停止回调

基本信息

  • 名称prep_stop/1
  • 协议/方法application 行为回调
  • 调用者:Erlang Application Controller
  • 目的:在停止监督树前执行清理(如关闭监听器)

函数签名

-spec prep_stop(term()) -> term().

核心代码

prep_stop(State) ->
    rabbit_alarm:unregister(self()),
    Listeners = rabbit_networking:stop(),
    ok = rabbit_boot_state:set(stopping),
    rabbit_boot_state:wait_for_exit(),
    %% Saved so we can try to avoid stalling shutdown.
    %% See rabbit_mnesia:remove_node_offline/1.
    rabbit_nodes:ensure_written(),
    State.

代码逐段解释

  1. 注销告警监听(第 2 行):

    • Application master 进程注销为告警接收者
    • 避免在停止过程中收到告警消息
  2. 停止网络监听器(第 3 行):

    • rabbit_networking:stop/0 关闭所有 TCP/TLS 监听端口
    • 拒绝新连接
    • 返回已停止的监听器列表(用于日志)
  3. 设置停止状态(第 4 行):

    • rabbit_boot_state:set(stopping) 标记节点正在停止
    • 其他模块可通过检查状态决定行为
  4. 等待退出信号(第 5 行):

    • wait_for_exit/0 等待特定进程退出或超时
    • 确保优雅关闭连接
  5. 保存节点信息(第 8 行):

    • rabbit_nodes:ensure_written/0 持久化当前节点列表
    • 用于离线节点移除功能

关键数据结构与类型

类型定义

%% 启动类型
-type restart_type() :: 'permanent' | 'transient' | 'temporary'.

%% 应用参数
-type param() :: atom().
-type app_name() :: atom().

%% 产品信息
-type product_info() :: #{
    product_name := string(),
    product_version := string(),
    product_base_name => string(),
    product_base_version => string(),
    product_overridden => boolean()
}.

类型说明

restart_type/0

说明 使用场景
permanent 应用退出时节点总是重启 关键系统服务
transient 正常退出不重启,异常退出重启节点 生产环境 RabbitMQ
temporary 应用退出不影响节点 开发/测试

启动状态(rabbit_boot_state)

状态 含义 转换条件
booting 正在启动 start/2 入口
core_started 核心启动完成 Boot steps 完成
ready 完全就绪 后启动阶段完成
stopping 正在停止 prep_stop/1 入口
stopped 已停止 stop/1 完成

状态转换图

stateDiagram-v2
    [*] --> booting: start/2 调用
    booting --> core_started: boot steps 完成
    core_started --> ready: 后启动阶段完成
    ready --> stopping: prep_stop/1 调用
    stopping --> stopped: stop/1 完成
    stopped --> [*]
    
    booting --> stopped: 启动失败
    core_started --> stopped: 后启动失败

UML 类图

classDiagram
    class rabbit {
        <<module>>
        +start() ok
        +boot() ok
        +stop() ok
        +start(normal, []) {ok, pid()}
        +prep_stop(State) State
        +stop(State) ok
        +status() status_map()
        +is_running() boolean()
        +is_serving() boolean()
        -start_it(restart_type()) ok
        -run_prelaunch_second_phase() ok
        -run_postlaunch_phase([app_name()]) ok
        -spawn_boot_marker() {ok, pid()} | {already_booting, pid()}
        -wait_for_ready_or_stopped() ok
    }
    
    class rabbit_boot_state {
        <<module>>
        +set(State) ok
        +get() State
        +wait_for(State, Timeout) ok | {error, timeout}
        +has_reached(State) boolean()
    }
    
    class rabbit_sup {
        <<supervisor>>
        +start_link() {ok, pid()}
        +start_child(Module) ok
    }
    
    class rabbit_boot_steps {
        <<module>>
        +run_boot_steps([app_name()]) ok
        +run_cleanup_steps([app_name()]) ok
        +find_steps() [{app_name(), step_name(), attributes()}]
        -sort_boot_steps([step_def()]) [step_def()]
    }
    
    class rabbit_plugins {
        <<module>>
        +setup() [app_name()]
        +active() [app_name()]
    }
    
    rabbit --> rabbit_boot_state : uses
    rabbit --> rabbit_sup : creates
    rabbit --> rabbit_boot_steps : uses
    rabbit --> rabbit_plugins : uses
    rabbit_boot_steps --> rabbit : callbacks (boot step MFAs)

Boot Steps 详解

Boot Step 机制

Boot steps 是 RabbitMQ 的启动协调机制,通过声明式依赖关系自动确定初始化顺序。

Boot Step 定义

-rabbit_boot_step({step_name,
                   [{description, "Human readable description"},
                    {mfa, {Module, Function, Args}},  %% 执行的函数
                    {requires, [other_step1, other_step2]},  %% 依赖步骤
                    {enables, [enabled_step1]}]}).  %% 启用步骤

核心 Boot Steps 列表

Step Name 描述 MFA 关键依赖
pre_boot 启动开始标记 - -
codec_correctness_check 验证编解码器正确性 rabbit_registry:codec_correctness_check/0 pre_boot
database 初始化数据库 rabbit_db:init/0 file_handle_cache
file_handle_cache 启动文件句柄缓存 rabbit:start_fhc/0 pre_boot
worker_pool 启动工作进程池 worker_pool:start_link/0 pre_boot
external_infrastructure 启动外部基础设施 rabbit_event:init/0 database
rabbit_registry 注册表初始化 rabbit_registry:init/0 external_infrastructure
recovery 恢复队列与交换器 rabbit:recover/0 database
routing_ready 路由系统就绪 - recovery
log_exchange 日志交换器 - routing_ready
direct_client 内部客户端支持 - external_infrastructure
networking 网络子系统 rabbit_networking:boot/0 direct_client
notify_cluster 通知集群 rabbit_node_monitor:notify_node_up/0 networking

Boot Steps 依赖图

graph TD
    pre_boot[pre_boot]
    codec[codec_correctness_check]
    fhc[file_handle_cache]
    pool[worker_pool]
    db[database]
    infra[external_infrastructure]
    registry[rabbit_registry]
    recovery[recovery]
    routing[routing_ready]
    log[log_exchange]
    direct[direct_client]
    net[networking]
    notify[notify_cluster]
    
    pre_boot --> codec
    pre_boot --> fhc
    pre_boot --> pool
    fhc --> db
    db --> infra
    db --> recovery
    infra --> registry
    infra --> direct
    recovery --> routing
    routing --> log
    direct --> net
    net --> notify

关键 Boot Step 详解

database Boot Step

-rabbit_boot_step({database,
                   [{description, "database"},
                    {mfa, {rabbit_db, init, []}},
                    {requires, file_handle_cache},
                    {enables, external_infrastructure}]}).

功能

  • 初始化 Mnesia 或 Khepri 数据库
  • 创建表结构(队列、交换器、绑定、用户等)
  • 执行数据库迁移(如从 Mnesia 到 Khepri)
  • 等待表可用

核心代码rabbit_db:init/0):

init() ->
    %% 1. 确定使用的数据库(Mnesia 或 Khepri)
    Backend = case rabbit_khepri:is_enabled() of
                  true  -> khepri;
                  false -> mnesia
              end,
    
    %% 2. 等待表/数据可用
    case Backend of
        mnesia ->
            ok = rabbit_table:wait_for_replicated(),
            ok = rabbit_table:wait(rabbit_user:pattern_match_all());
        khepri ->
            ok = rabbit_khepri:wait_for_store()
    end,
    
    %% 3. 执行数据迁移(如需要)
    case rabbit_feature_flags:is_enabled(khepri_db) of
        true ->
            ok = rabbit_db_m2k_converter:sync_if_needed();
        false ->
            ok
    end,
    
    %% 4. 初始化各数据层模块
    ok = rabbit_db_cluster:init(),
    ok = rabbit_db_user:init(),
    ok = rabbit_db_vhost:init(),
    ok = rabbit_db_exchange:init(),
    ok = rabbit_db_queue:init(),
    ok = rabbit_db_binding:init(),
    ok.

recovery Boot Step

-rabbit_boot_step({recovery,
                   [{description, "recovery"},
                    {mfa, {rabbit, recover, []}},
                    {requires, database},
                    {enables, routing_ready}]}).

功能

  • 恢复持久化的交换器
  • 恢复持久化的队列(启动队列进程)
  • 重建路由表(绑定关系)

核心代码rabbit:recover/0):

recover() ->
    %% 1. 恢复所有虚拟主机的交换器
    [rabbit_exchange:recover(VHost) || VHost <- rabbit_vhost:list_names()],
    
    %% 2. 恢复所有虚拟主机的队列
    [begin
        Queues = rabbit_amqqueue:list(VHost),
        [rabbit_amqqueue:recover(Q) || Q <- Queues]
     end || VHost <- rabbit_vhost:list_names()],
    
    ok.

时序

sequenceDiagram
    participant BS as Boot Steps
    participant Recover as rabbit:recover/0
    participant Exchange as rabbit_exchange
    participant Queue as rabbit_amqqueue
    participant QProc as Queue 进程
    
    BS->>+Recover: recover()
    
    loop 每个虚拟主机
        Recover->>+Exchange: recover(VHost)
        Exchange->>Exchange: 从数据库加载交换器
        Exchange->>Exchange: 应用 policies
        Exchange-->>-Recover: [ExchangeNames]
        
        Recover->>+Queue: list(VHost)
        Queue->>Queue: 从数据库加载队列定义
        Queue-->>-Recover: [Queues]
        
        loop 每个队列
            Recover->>+Queue: recover(Queue)
            Queue->>+QProc: 启动队列进程
            QProc->>QProc: 恢复消息索引
            QProc->>QProc: 重建状态
            QProc-->>-Queue: {ok, QPid}
            Queue-->>-Recover: ok
        end
    end
    
    Recover-->>-BS: ok

networking Boot Step

实际由后启动阶段的 rabbit_networking:boot/0 执行。

功能

  • 启动 TCP/TLS 监听器
  • 根据配置监听多个端口和接口
  • 注册协议处理器

核心代码rabbit_networking:boot/0):

boot() ->
    %% 1. 读取监听器配置
    Listeners = application:get_env(rabbit, tcp_listeners, [5672]),
    SSLListeners = application:get_env(rabbit, ssl_listeners, []),
    
    %% 2. 启动 TCP 监听器
    [begin
        {ok, _} = rabbit_networking:start_tcp_listener(Address, Port, Opts)
     end || {Address, Port, Opts} <- Listeners],
    
    %% 3. 启动 TLS 监听器
    [begin
        {ok, _} = rabbit_networking:start_ssl_listener(Address, Port, Opts)
     end || {Address, Port, Opts} <- SSLListeners],
    
    ok.

后启动阶段详解

run_postlaunch_phase/1

run_postlaunch_phase(Plugins) ->
    ?LOG_DEBUG(""),
    ?LOG_DEBUG("== Postlaunch phase =="),
    
    %% 1. 启动插件应用
    ?LOG_DEBUG("Starting plugin applications"),
    start_apps(Plugins, #{}),
    
    %% 2. 通知节点上线
    rabbit_node_monitor:notify_node_up(),
    
    %% 3. 恢复虚拟主机
    rabbit_vhosts:boot(),
    
    %% 4. 启动网络监听器
    rabbit_networking:boot(),
    
    %% 5. 标记为 ready
    rabbit_boot_state:set(ready),
    ?LOG_INFO("Server startup complete; ~b plugins started.", [length(Plugins)]),
    
    ok.

时序图

sequenceDiagram
    autonumber
    participant Start as rabbit:start/2
    participant Post as run_postlaunch_phase
    participant Plugins as Plugin Applications
    participant NodeMon as rabbit_node_monitor
    participant VHosts as rabbit_vhosts
    participant Net as rabbit_networking
    participant State as rabbit_boot_state
    
    Start->>+Post: run_postlaunch_phase(Plugins)
    
    loop 每个插件
        Post->>+Plugins: start_apps([Plugin])
        Plugins->>Plugins: application:ensure_all_started
        Plugins->>Plugins: 运行插件 boot steps
        Plugins-->>-Post: ok
    end
    
    Post->>+NodeMon: notify_node_up()
    NodeMon->>NodeMon: 通知集群节点<br/>本节点已上线
    NodeMon->>NodeMon: 建立节点监控
    NodeMon-->>-Post: ok
    
    Post->>+VHosts: boot()
    VHosts->>VHosts: 确保每个 vhost<br/>有运行的进程
    VHosts->>VHosts: 协调多节点 vhost 进程
    VHosts-->>-Post: ok
    
    Post->>+Net: boot()
    Net->>Net: 启动 TCP 监听器
    Net->>Net: 启动 TLS 监听器
    Net->>Net: 注册协议处理器
    Net-->>-Post: ok
    
    Post->>+State: set(ready)
    State->>State: 写入 ETS 表
    State->>State: 通知等待进程
    State-->>-Post: ok
    
    Post-->>-Start: ok

关键函数调用链路

启动完整调用链

用户调用 rabbit:start/0
  └─> start_it(temporary)
      ├─> spawn_boot_marker()  // 注册 rabbit_boot 进程
      ├─> application:ensure_all_started(rabbitmq_prelaunch)
         └─> rabbitmq_prelaunch:start/2
             ├─> 解析配置文件
             ├─> 创建上下文
             └─> 设置环境变量
      
      ├─> application:ensure_all_started(rabbit)
         └─> rabbit:start(normal, [])
             ├─> rabbit_boot_state:set(booting)
             
             ├─> run_prelaunch_second_phase()
                ├─> rabbit_prelaunch_enabled_plugins_file:setup()
                ├─> rabbit_prelaunch_feature_flags:setup()
                ├─> rabbit_prelaunch_logging:setup()
                ├─> rabbit_ra_systems:setup()
                ├─> rabbit_khepri:setup()
                ├─> rabbit_prelaunch_cluster:setup()
                └─> mnesia:start() (如不使用 Khepri)
             
             ├─> rabbit_sup:start_link()
                └─> supervisor:init([])
                    └─> 初始化监督者(one_for_all
             
             ├─> register(rabbit, self())
             
             ├─> rabbit_plugins:setup()
                ├─> 读取 enabled_plugins 文件
                └─> 按依赖顺序排列插件
             
             ├─> app_utils:load_applications(Plugins)
             
             ├─> rabbit_feature_flags:refresh_feature_flags_after_app_load()
                ├─> 扫描模块属性
                ├─> 注册新特性标志
                └─> 检查兼容性
             
             ├─> rabbit_boot_steps:run_boot_steps([rabbit | Plugins])
                ├─> find_steps([rabbit | Plugins])
                   ├─> 扫描模块属性(-rabbit_boot_step
                   └─> sort_boot_steps()
                       └─> 拓扑排序(digraph_utils:topsort
                
                └─> for each step:
                    ├─> run_step(Attributes, mfa)
                    └─> apply(M, F, A)
                        例如:
                        ├─> rabbit_db:init()
                        ├─> rabbit:start_fhc()
                        ├─> rabbit:recover()
                        └─> ...
             
             ├─> rabbit_boot_state:set(core_started)
             
             └─> run_postlaunch_phase(Plugins)
                 ├─> start_apps(Plugins, #{})
                    └─> for each plugin:
                        ├─> app_utils:load_applications([Plugin])
                        ├─> rabbit_boot_steps:run_boot_steps([Plugin])
                        └─> application:ensure_all_started(Plugin)
                 
                 ├─> rabbit_node_monitor:notify_node_up()
                 ├─> rabbit_vhosts:boot()
                 ├─> rabbit_networking:boot()
                 └─> rabbit_boot_state:set(ready)
      
      └─> wait_for_ready_or_stopped()
          └─> rabbit_boot_state:wait_for(ready, 12h)
              └─> 轮询检查状态,间隔 100ms

配置与可观测

关键配置项

配置项 默认值 说明 影响
cluster_name 节点名 集群名称 管理界面显示,日志标识
boot_steps_timeout 无限 Boot step 超时 启动失败快速检测
default_vhost / 默认虚拟主机 首次启动自动创建
default_user guest 默认用户名 首次启动自动创建
default_permissions .* 默认权限 读写配置权限
disk_free_limit 50MB 磁盘剩余下限 触发告警,阻塞发布
vm_memory_high_watermark 0.4 内存水位线 触发流控,阻塞发布
log_levels - 日志级别映射 控制日志输出

观测指标

启动耗时(分阶段)

  • 预启动阶段第一阶段:< 1 秒(配置解析)
  • 预启动阶段第二阶段:1-5 秒(Mnesia/Khepri 启动,集群检查)
  • Boot steps:2-60 秒(队列恢复是瓶颈)
  • 后启动阶段:1-10 秒(插件启动,网络监听)

日志关键字

日志 含义 级别
Starting <ProductName> <Version> 开始启动 INFO
Running boot step <Step> 执行 boot step INFO
Time to start RabbitMQ: <N> ms 启动完成,耗时 INFO
Server startup complete; <N> plugins started 全部就绪 INFO
Stopping RabbitMQ 开始停止 INFO
Stopped RabbitMQ application 停止完成 INFO

可监控状态

%% 检查运行状态
rabbit:is_running().
%% => true | false

%% 检查服务状态(是否接受连接)
rabbit:is_serving().
%% => true | false

%% 获取详细状态
rabbit:status().
%% => #{
%%     pid => <0.123.0>,
%%     running_applications => [...],
%%     listeners => [{port, 5672}, ...],
%%     alarms => []
%% }

实战经验与最佳实践

启动优化

  1. 使用 Khepri 替代 Mnesia

    • 启动更快(无需等待表复制)
    • 一致性更强(Raft 协议)
    • 配置:rabbitmq.conf 中设置 metadata_store = khepri
  2. 减少队列恢复时间

    • 避免大量小队列(合并或使用 streams)
    • 使用 quorum queues(恢复更快)
    • 定期清理未使用的队列
  3. 并行启动节点

    • 集群节点可并行启动
    • 使用 peer discovery 自动加入
    • 避免串行等待节点上线

故障排查

启动卡住

现象rabbit:start() 长时间不返回

排查步骤

  1. 检查日志最后一条 boot step:

    tail -f /var/log/rabbitmq/rabbit@hostname.log | grep "Running boot step"
    
  2. 常见卡住点:

    • database:Mnesia 表损坏或网络分区
    • recovery:大量队列恢复慢
    • networking:端口被占用
  3. 解决方案:

    • Mnesia 损坏:使用 rabbitmq-diagnostics cluster_status 检查,必要时重建节点
    • 队列恢复慢:增加 queue_index_embed_msgs_below 配置,使用 SSD
    • 端口占用:更改配置或停止占用进程

启动失败后无法再启动

原因rabbitmq_prelaunch 未正确停止,上下文残留

解决方案

%% Erlang shell
application:stop(rabbitmq_prelaunch).
application:stop(rabbit).
%% 然后重新启动
rabbit:start().

插件启动失败

现象:核心启动成功,但某插件失败

排查

# 查看插件状态
rabbitmq-plugins list
# 禁用有问题的插件
rabbitmq-plugins disable <plugin_name>
# 重新启动
rabbitmqctl stop_app
rabbitmqctl start_app

性能监控

启动性能基准

场景 队列数 消息数 启动时间 瓶颈
空节点 0 0 1-2s
小规模 100 10K 3-5s 队列恢复
中规模 1000 100K 10-30s 队列恢复 + 索引
大规模 10000 1M 60-300s 磁盘 I/O

监控脚本

#!/bin/bash
# 监控启动进度
while ! rabbitmqctl await_online_nodes 1; do
    echo "Waiting for node to come online..."
    sleep 1
done

rabbitmqctl await_startup
echo "RabbitMQ startup complete!"

典型场景时序图

场景 1:首次启动(新节点)

sequenceDiagram
    autonumber
    participant User
    participant Rabbit as rabbit:start/0
    participant Mnesia
    participant DB as rabbit_db
    participant VHost as rabbit_vhost
    
    User->>+Rabbit: start()
    Rabbit->>Rabbit: 预启动阶段
    Rabbit->>+Mnesia: mnesia:start()
    Mnesia->>Mnesia: 检测首次启动<br/>无数据目录
    Mnesia->>Mnesia: 创建 schema
    Mnesia-->>-Rabbit: ok
    
    Rabbit->>+DB: init() (boot step)
    DB->>Mnesia: 创建表<br/>rabbit_user, rabbit_vhost, etc.
    Mnesia->>Mnesia: 创建磁盘表<br/>rabbit_durable_queue
    Mnesia->>Mnesia: 创建内存表<br/>rabbit_route
    Mnesia-->>DB: ok
    DB-->>-Rabbit: ok
    
    Rabbit->>+DB: maybe_insert_default_data() (boot step)
    DB->>+VHost: add(<<"/">>, #{})
    VHost->>Mnesia: 插入默认 vhost
    VHost-->>-DB: ok
    DB->>DB: 创建默认用户 "guest"
    DB->>DB: 设置默认权限
    DB-->>-Rabbit: ok
    
    Rabbit->>Rabbit: 后续 boot steps
    Rabbit->>Rabbit: 后启动阶段
    Rabbit-->>-User: ok

场景 2:集群节点加入

sequenceDiagram
    autonumber
    participant CLI
    participant NewNode as 新节点 rabbit
    participant ExistingNode as 已有节点 rabbit
    participant Mnesia
    
    Note over NewNode: 初始启动(单节点)
    CLI->>+NewNode: start()
    NewNode->>NewNode: 预启动阶段
    NewNode-->>-CLI: ok (单节点集群)
    
    Note over NewNode,ExistingNode: 加入集群
    CLI->>+NewNode: stop_app()
    NewNode->>NewNode: 停止应用<br/>保留 Mnesia
    NewNode-->>-CLI: ok
    
    CLI->>+NewNode: join_cluster(ExistingNode)
    NewNode->>+Mnesia: change_config(extra_db_nodes, [ExistingNode])
    Mnesia->>+ExistingNode: 请求集群信息
    ExistingNode->>ExistingNode: 检查兼容性<br/>特性标志一致性
    ExistingNode-->>-Mnesia: cluster_info
    Mnesia->>Mnesia: 复制表定义
    Mnesia-->>-NewNode: ok
    NewNode-->>-CLI: ok
    
    CLI->>+NewNode: start_app()
    NewNode->>NewNode: 预启动阶段<br/>(检测到集群配置)
    NewNode->>+ExistingNode: 同步元数据
    ExistingNode-->>-NewNode: queues, exchanges, bindings
    NewNode->>NewNode: Boot steps
    NewNode->>NewNode: 后启动阶段
    NewNode->>+ExistingNode: notify_node_up()
    ExistingNode->>ExistingNode: 更新集群视图
    ExistingNode-->>-NewNode: ok
    NewNode-->>-CLI: ok

场景 3:优雅停止与重启

sequenceDiagram
    autonumber
    participant CLI
    participant Rabbit as rabbit:stop/0
    participant Net as rabbit_networking
    participant Conn as 客户端连接
    participant Queue as 队列进程
    participant Mnesia
    
    CLI->>+Rabbit: stop()
    
    Rabbit->>+Net: stop()
    Net->>Net: 关闭监听器<br/>拒绝新连接
    Net->>+Conn: 发送连接关闭帧<br/>connection.close
    Conn->>Conn: 客户端处理关闭
    Conn-->>-Net: connection.close-ok
    Net-->>-Rabbit: ok
    
    Rabbit->>Rabbit: application:stop(rabbit)
    Rabbit->>Rabbit: prep_stop(State)
    Rabbit->>Rabbit: 监督树停止子进程
    
    loop 每个队列
        Rabbit->>+Queue: terminate(shutdown, State)
        Queue->>Queue: 保存队列索引
        Queue->>Queue: 刷新消息到磁盘
        Queue->>Mnesia: 更新队列元数据<br/>(如消息数)
        Queue-->>-Rabbit: ok
    end
    
    Rabbit->>+Mnesia: mnesia:stop()
    Mnesia->>Mnesia: 保存表到磁盘
    Mnesia->>Mnesia: 关闭事务日志
    Mnesia-->>-Rabbit: ok
    
    Rabbit->>Rabbit: stop(State)
    Rabbit->>Rabbit: rabbit_boot_state:set(stopped)
    Rabbit-->>-CLI: ok
    
    Note over CLI,Mnesia: 重启
    CLI->>+Rabbit: start()
    Rabbit->>Rabbit: 预启动阶段
    Rabbit->>+Mnesia: mnesia:start()
    Mnesia->>Mnesia: 加载表(快速)
    Mnesia-->>-Rabbit: ok
    Rabbit->>Rabbit: Boot steps<br/>(跳过数据创建)
    Rabbit->>Queue: recover()
    Queue->>Queue: 加载队列索引<br/>(无需重建)
    Rabbit-->>-CLI: ok

附录

常用命令

# 启动(前台)
rabbitmq-server

# 启动(后台)
rabbitmq-server -detached

# 停止
rabbitmqctl stop

# 停止应用(保留 Erlang 节点)
rabbitmqctl stop_app

# 启动应用
rabbitmqctl start_app

# 等待启动完成
rabbitmqctl await_startup

# 检查状态
rabbitmqctl status

# 查看告警
rabbitmqctl alarm_list

相关模块

  • rabbit_sup:根监督者(详见 RabbitMQ-02 文档)
  • rabbit_boot_steps:Boot steps 协调
  • rabbit_boot_state:启动状态管理
  • rabbit_plugins:插件管理
  • rabbit_feature_flags:特性标志系统
  • rabbit_prelaunch:预启动阶段协调(rabbitmq_prelaunch 应用)