RabbitMQ-01-核心服务器模块-rabbit
模块概览
职责与边界
rabbit 模块是 RabbitMQ 服务器的应用入口模块,实现 Erlang/OTP application 行为。主要职责包括:
- 应用生命周期管理:启动、停止、重启 RabbitMQ 应用
- 启动流程协调:通过 boot steps 机制协调各组件的初始化顺序
- 插件系统集成:加载并启动已启用的插件
- 预启动阶段协调:配置解析、特性标志初始化、集群检查
- 监督树根节点:创建并监督
rabbit_sup根监督者 - 状态查询接口:提供运行状态、告警信息等查询 API
输入与输出
输入:
- 配置文件(
rabbitmq.conf或advanced.config) - 环境变量
- 启用的插件列表(
enabled_plugins文件) - 命令行参数
输出:
- 运行中的 RabbitMQ 节点
- 日志信息(structured logging)
- 启动/停止状态
- 告警信息
上下游依赖
上游:
- Erlang VM(BEAM)
rabbitmq_prelaunch应用(预启动阶段)
下游:
rabbit_sup:根监督者rabbit_boot_steps:启动步骤协调器rabbit_plugins:插件管理rabbit_feature_flags:特性标志系统mnesia或khepri:元数据存储ra:Raft 一致性库osiris:日志存储引擎
生命周期
stateDiagram-v2
[*] --> NotStarted: Erlang VM 启动
NotStarted --> Booting: rabbit:start() / rabbit:boot()
Booting --> PrelaunchPhase1: 启动 rabbitmq_prelaunch
PrelaunchPhase1 --> PrelaunchPhase2: 解析配置与上下文
PrelaunchPhase2 --> PrelaunchPhase2: 1. 特性标志初始化
PrelaunchPhase2 --> PrelaunchPhase2: 2. 日志系统初始化
PrelaunchPhase2 --> PrelaunchPhase2: 3. Khepri/Ra 系统启动
PrelaunchPhase2 --> PrelaunchPhase2: 4. 集群检查
PrelaunchPhase2 --> PrelaunchPhase2: 5. Mnesia 启动(如适用)
PrelaunchPhase2 --> CoreStarting: rabbit:start/2 回调
CoreStarting --> CoreStarting: 1. 创建 rabbit_sup
CoreStarting --> CoreStarting: 2. 注册 rabbit 进程名
CoreStarting --> CoreStarting: 3. 加载插件
CoreStarting --> CoreStarting: 4. 刷新特性标志
CoreStarting --> CoreStarting: 5. 运行 boot steps
CoreStarting --> CoreStarted: boot steps 完成
CoreStarted --> PostlaunchPhase: 后启动阶段
PostlaunchPhase --> PostlaunchPhase: 1. 启动插件应用
PostlaunchPhase --> PostlaunchPhase: 2. 通知节点上线
PostlaunchPhase --> PostlaunchPhase: 3. 恢复虚拟主机
PostlaunchPhase --> PostlaunchPhase: 4. 启动网络监听
PostlaunchPhase --> Ready: 标记为 ready
Ready --> Running: 正常服务
Running --> Stopping: rabbit:stop()
Stopping --> Stopped: 清理资源
Stopped --> [*]
Running --> PrepStopping: rabbit:prep_stop/1
PrepStopping --> Stopping: 停止接受连接
模块架构图
flowchart TB
subgraph "应用入口"
Start[rabbit:start/0<br/>rabbit:boot/0]
StartIt[start_it/1<br/>启动协调]
Start --> StartIt
end
subgraph "预启动阶段"
PrelaunchApp[rabbitmq_prelaunch<br/>应用启动]
RunPrelaunch2[run_prelaunch_second_phase/0]
PrelaunchApp --> RunPrelaunch2
EnabledPlugins[启用插件文件处理]
FeatureFlagsSetup[特性标志注册表初始化]
LoggingSetup[日志系统设置]
RaSystems[Ra 系统启动]
KhepriSetup[Khepri 启动]
ClusterCheck[集群检查与兼容性验证]
MnesiaStart[Mnesia 启动<br/>如不使用 Khepri]
RunPrelaunch2 --> EnabledPlugins
EnabledPlugins --> FeatureFlagsSetup
FeatureFlagsSetup --> LoggingSetup
LoggingSetup --> RaSystems
RaSystems --> KhepriSetup
KhepriSetup --> ClusterCheck
ClusterCheck --> MnesiaStart
end
subgraph "核心启动阶段"
RabbitStart[rabbit:start/2<br/>application 回调]
CreateSup[创建 rabbit_sup<br/>监督树根]
RegisterName[注册 rabbit 进程名]
LoadPlugins[加载插件应用]
RefreshFF[刷新特性标志]
RunBootSteps[运行 boot steps<br/>rabbit_boot_steps:run_boot_steps/1]
RabbitStart --> CreateSup
CreateSup --> RegisterName
RegisterName --> LoadPlugins
LoadPlugins --> RefreshFF
RefreshFF --> RunBootSteps
end
subgraph "Boot Steps"
direction TB
PreBoot[pre_boot]
CodecCheck[codec_correctness_check]
DatabaseInit[database<br/>rabbit_db:init/0]
FileHandles[file_handle_cache<br/>启动文件句柄缓存]
WorkerPool[worker_pool<br/>启动工作进程池]
ExternalInfra[external_infrastructure<br/>启动网络/日志等]
Recovery[recovery<br/>队列与交换器恢复]
RoutingReady[routing_ready<br/>路由系统就绪]
LogExchange[log_exchange<br/>日志交换器]
DirectClient[direct_client<br/>内部客户端支持]
NetworkingMetadata[networking_metadata<br/>网络元数据初始化]
Notify[notify_cluster<br/>通知集群节点上线]
PreBoot --> CodecCheck
CodecCheck --> DatabaseInit
DatabaseInit --> FileHandles
FileHandles --> WorkerPool
WorkerPool --> ExternalInfra
ExternalInfra --> Recovery
Recovery --> RoutingReady
RoutingReady --> LogExchange
LogExchange --> DirectClient
DirectClient --> NetworkingMetadata
NetworkingMetadata --> Notify
end
subgraph "后启动阶段"
PostlaunchPhase[run_postlaunch_phase/1]
StartPluginApps[启动插件应用]
NotifyUp[通知节点上线<br/>rabbit_node_monitor]
RecoverVHosts[恢复虚拟主机<br/>rabbit_vhosts:boot/0]
StartListeners[启动网络监听器<br/>rabbit_networking:boot/0]
SetReady[标记状态为 ready<br/>rabbit_boot_state:set/1]
PostlaunchPhase --> StartPluginApps
StartPluginApps --> NotifyUp
NotifyUp --> RecoverVHosts
RecoverVHosts --> StartListeners
StartListeners --> SetReady
end
StartIt --> PrelaunchApp
MnesiaStart --> RabbitStart
RunBootSteps --> PostlaunchPhase
SetReady --> Ready[运行中<br/>ready state]
架构图说明
启动流程分解
-
入口函数:
rabbit:start/0:供开发/测试使用,应用以temporary模式启动,失败抛出异常rabbit:boot/0:供生产环境使用,应用以transient模式启动,失败导致节点退出- 两者最终都调用
start_it/1协调启动
-
预启动阶段第一阶段(
rabbitmq_prelaunch应用):- 尽早解析配置文件和环境变量
- 创建上下文(context)映射,供后续阶段使用
- 确保 Mnesia 未启动(避免配置不一致)
-
预启动阶段第二阶段(
run_prelaunch_second_phase/0):- 处理启用插件文件(
enabled_plugins) - 初始化特性标志注册表(feature flags registry)
- 设置日志系统(使用 Erlang logger)
- 启动 Ra 系统和 Khepri(即使使用 Mnesia 也会启动,为迁移做准备)
- 执行集群检查(节点兼容性、特性标志一致性)
- 如不使用 Khepri,启动 Mnesia
- 处理启用插件文件(
-
核心启动阶段(
rabbit:start/2):- 创建
rabbit_sup监督树 - 注册
rabbit进程名(用于远程节点检查is_running/1) - 加载插件应用(不启动)
- 刷新特性标志(包括插件的特性标志)
- 运行 boot steps
- 创建
-
Boot Steps:
- 使用拓扑排序确保依赖顺序
- 每个 step 由
{StepName, [{description, Desc}, {mfa, {M, F, A}}, {requires, [...]}, {enables, [...]}]}定义 - 关键 steps:
database:初始化 Mnesia/Khepri,创建表结构file_handle_cache:启动文件句柄缓存(限制打开文件数)external_infrastructure:启动核心基础设施(网络、日志、监控)recovery:恢复持久化的队列和交换器routing_ready:路由系统准备就绪notify_cluster:通知集群其他节点本节点已启动
-
后启动阶段(
run_postlaunch_phase/1):- 启动插件应用(按依赖顺序)
- 通知节点监控器(
rabbit_node_monitor:notify_node_up/0) - 恢复虚拟主机进程
- 启动网络监听器(TCP/TLS)
- 标记节点为
ready状态
边界条件
- 并发启动保护:通过注册
rabbit_boot进程名防止重复启动 - 超时控制:
- 启动开始超时:1 分钟(
BOOT_START_TIMEOUT) - 启动完成超时:12 小时(
BOOT_FINISH_TIMEOUT),适用于大规模集群 - 状态检查间隔:100ms(
BOOT_STATUS_CHECK_INTERVAL)
- 启动开始超时:1 分钟(
- 原子性:预启动阶段任何步骤失败导致整个启动失败
- 幂等性:多次调用
start/0会检测到已启动状态,避免重复
扩展点
- Boot Steps:插件可以定义自己的 boot steps,声明依赖关系
- Cleanup Steps:插件可以定义 cleanup steps,在停止时执行
- Prelaunch Hooks:通过
rabbitmq_prelaunch扩展预启动逻辑
状态持有
- 进程字典:
- 权限缓存配置(
permission_cache_can_expire) - 其他临时状态
- 权限缓存配置(
- 全局状态:
rabbit_boot_state:ETS 表存储启动状态(booting/core_started/ready/stopped)rabbit注册进程名:标识节点运行状态rabbit_boot注册进程名:标识节点正在启动
资源占用
- 内存:启动阶段峰值约 50-200MB(取决于配置和插件)
- 进程数:启动完成后约 100-500 个 Erlang 进程(取决于配置)
- 文件句柄:监听端口、日志文件、Mnesia/Khepri 数据文件
核心 API 详解
API 1: start/0 - 启动应用(开发模式)
基本信息
- 名称:
start/0 - 协议/方法:Erlang 函数调用
- 幂等性:否(重复调用会检测已启动状态,返回 ok)
- 适用场景:开发、测试、Erlang shell 交互
函数签名
-spec start() -> 'ok'.
请求参数
无参数。
响应
- 成功:
ok - 失败:抛出异常(
throw({error, Reason}))
核心代码
start() ->
%% start() vs. boot(): we want to throw an error in start().
start_it(temporary).
start_it(StartType) ->
case spawn_boot_marker() of
{ok, Marker} ->
?LOG_INFO("RabbitMQ is asked to start...", []),
try
{Millis, ok} = timer:tc(
fun() ->
%% 1. 启动预启动应用
{ok, _} = application:ensure_all_started(
rabbitmq_prelaunch, StartType),
%% 2. 启动 rabbit 应用
{ok, _} = application:ensure_all_started(
rabbit, StartType),
%% 3. 等待启动完成或停止
wait_for_ready_or_stopped()
end, millisecond),
?LOG_INFO("Time to start RabbitMQ: ~b ms", [Millis]),
stop_boot_marker(Marker),
ok
catch
error:{badmatch, Error}:_ ->
%% 失败时停止预启动应用并清理
_ = application:stop(rabbitmq_prelaunch),
stop_boot_marker(Marker),
case StartType of
temporary -> throw(Error); %% 抛出异常
_ -> exit(Error) %% 退出节点
end
end;
{already_booting, Marker} ->
%% 已在启动中,等待完成
stop_boot_marker(Marker),
ok
end.
代码逐段解释
-
启动模式选择(第 2 行):
temporary模式:应用崩溃不影响节点,适合开发测试- 区别于
boot/0使用的transient模式
-
启动标记进程(第 4-5 行):
spawn_boot_marker/0创建一个名为rabbit_boot的进程- 用于跨节点检测启动状态(
is_booting/1) - 如果已存在该进程,说明正在启动中,避免重复
-
计时启动过程(第 7-14 行):
- 使用
timer:tc/2测量启动耗时 - 先启动
rabbitmq_prelaunch(预启动阶段第一阶段) - 再启动
rabbit(触发start/2回调) wait_for_ready_or_stopped/0阻塞等待启动完成或失败
- 使用
-
错误处理(第 16-24 行):
- 捕获
badmatch错误(ensure_all_started失败) - 停止预启动应用,清理上下文
temporary模式抛出异常,transient模式退出节点
- 捕获
-
并发启动保护(第 25-27 行):
- 如果已有其他调用在启动,返回 ok
- 依赖第一次调用完成启动
时序图
sequenceDiagram
autonumber
participant User as 用户/CLI
participant Rabbit as rabbit 模块
participant PrelaunchApp as rabbitmq_prelaunch
participant AppCtl as application 控制器
participant BootState as rabbit_boot_state
User->>+Rabbit: start()
Rabbit->>Rabbit: spawn_boot_marker()<br/>注册 rabbit_boot 进程
alt 首次启动
Rabbit->>+PrelaunchApp: application:ensure_all_started(rabbitmq_prelaunch)
PrelaunchApp->>PrelaunchApp: 解析配置与环境
PrelaunchApp->>PrelaunchApp: 创建上下文
PrelaunchApp-->>-Rabbit: {ok, [rabbitmq_prelaunch]}
Rabbit->>+AppCtl: application:ensure_all_started(rabbit)
AppCtl->>Rabbit: start(normal, [])<br/>回调
Note over Rabbit: 详见 start/2 时序图
AppCtl-->>-Rabbit: {ok, [rabbit, ...]}
Rabbit->>+BootState: wait_for_ready_or_stopped()
BootState->>BootState: 轮询状态<br/>间隔 100ms
BootState-->>-Rabbit: ok (状态为 ready)
Rabbit->>Rabbit: stop_boot_marker()<br/>注销 rabbit_boot 进程
Rabbit-->>-User: ok
else 已在启动中
Rabbit->>Rabbit: spawn_boot_marker()<br/>失败(已存在)
Rabbit->>Rabbit: stop_boot_marker()
Rabbit-->>User: ok
end
异常与回退
-
配置文件错误:
- 预启动阶段抛出
{error, {bad_config, ...}} - 停止已启动的应用,抛出异常给调用者
- 预启动阶段抛出
-
依赖应用启动失败:
- 例如 Mnesia 启动失败
- 停止
rabbitmq_prelaunch,抛出{could_not_start, mnesia, Reason}
-
Boot Step 失败:
- 某个 boot step 的 MFA 返回
{error, Reason} - 立即停止启动,退出节点(如果
transient模式)
- 某个 boot step 的 MFA 返回
-
超时:
- 启动超过 12 小时(
BOOT_FINISH_TIMEOUT) wait_for_ready_or_stopped/0抛出{timeout, waiting_for_ready}
- 启动超过 12 小时(
性能要点
-
启动时间:
- 空配置单节点:1-3 秒
- 大规模集群(数千队列):30 秒 - 5 分钟
- 影响因素:队列恢复、消息索引重建、集群同步
-
瓶颈点:
- Mnesia 表加载(磁盘 I/O)
- 队列进程启动(CPU 密集)
- 网络往返(集群节点通信)
-
优化建议:
- 使用 SSD 存储 Mnesia 数据
- 减少持久化队列数量
- 启用 Khepri(比 Mnesia 更快)
API 2: boot/0 - 启动应用(生产模式)
基本信息
- 名称:
boot/0 - 协议/方法:Erlang 函数调用
- 幂等性:否
- 适用场景:生产环境、操作系统服务(systemd)
函数签名
-spec boot() -> 'ok'.
核心代码
boot() ->
%% start() vs. boot(): we want the node to exit in boot(). Because
%% applications are started with `transient`, any error during their
%% startup will abort the node.
start_it(transient).
与 start/0 的区别
| 特性 | start/0 | boot/0 |
|---|---|---|
| 启动模式 | temporary |
transient |
| 失败行为 | 抛出异常(不退出节点) | 退出节点 |
| 适用场景 | 开发/测试 | 生产环境 |
| 重启策略 | 手动重启 | 操作系统自动重启 |
时序图
与 start/0 相同,区别在于错误处理:
sequenceDiagram
autonumber
participant SystemD as systemd/启动脚本
participant Rabbit as rabbit 模块
participant BEAM as Erlang VM
SystemD->>+Rabbit: boot()
Rabbit->>Rabbit: start_it(transient)
alt 启动成功
Rabbit-->>-SystemD: ok
else 启动失败
Rabbit->>+BEAM: exit(Error)
BEAM->>BEAM: 终止 VM
BEAM-->>-SystemD: 退出码 非 0
SystemD->>SystemD: 重启服务<br/>根据 systemd 配置
end
API 3: stop/0 - 停止应用
基本信息
- 名称:
stop/0 - 协议/方法:Erlang 函数调用
- 幂等性:是(多次调用均返回 ok)
函数签名
-spec stop() -> 'ok'.
核心代码
stop() ->
case wait_for_ready_or_stopped() of
ok ->
?LOG_INFO("Stopping RabbitMQ", []),
%% 1. 停止插件应用
Plugins = rabbit_plugins:active(),
stop_apps(Plugins),
%% 2. 停止 rabbit 应用
rabbit_prelaunch:set_stop_reason(stop),
ok = application:stop(rabbit),
%% 3. 停止预启动应用
ok = application:stop(rabbitmq_prelaunch),
?LOG_INFO("Stopped RabbitMQ application", []);
{error, _} = Error ->
Error
end.
代码逐段解释
-
等待启动完成或已停止(第 2 行):
- 如果正在启动中,等待启动完成再停止
- 如果已停止,直接返回 ok
-
停止插件应用(第 4-5 行):
rabbit_plugins:active/0获取当前激活的插件stop_apps/1按依赖倒序停止插件- 触发插件的 cleanup steps
-
停止核心应用(第 6-8 行):
- 设置停止原因(用于日志和诊断)
application:stop(rabbit)触发prep_stop/1和stop/1回调- 停止
rabbit_sup监督树(所有子进程)
-
停止预启动应用(第 9 行):
- 清理上下文映射
- 关闭日志输出
时序图
sequenceDiagram
autonumber
participant User as 用户/CLI
participant Rabbit as rabbit 模块
participant PluginMgr as rabbit_plugins
participant AppCtl as application 控制器
participant RabbitSup as rabbit_sup
participant Children as 各子系统
User->>+Rabbit: stop()
Rabbit->>Rabbit: wait_for_ready_or_stopped()
Rabbit->>+PluginMgr: active()
PluginMgr-->>-Rabbit: [plugin1, plugin2, ...]
loop 每个插件(倒序)
Rabbit->>+AppCtl: application:stop(PluginN)
AppCtl->>AppCtl: 触发 cleanup steps
AppCtl->>AppCtl: 停止插件监督树
AppCtl-->>-Rabbit: ok
end
Rabbit->>+AppCtl: application:stop(rabbit)
AppCtl->>+Rabbit: prep_stop(State)<br/>预停止回调
Rabbit->>Rabbit: 停止接受新连接
Rabbit->>Rabbit: 关闭监听器
Rabbit-->>-AppCtl: State
AppCtl->>+Rabbit: stop(State)<br/>停止回调
Rabbit->>+RabbitSup: supervisor:terminate_child(...)
RabbitSup->>+Children: 逐个停止子进程
Children->>Children: 保存状态(如队列索引)
Children->>Children: 关闭连接/文件
Children-->>-RabbitSup: ok
RabbitSup-->>-Rabbit: ok
Rabbit-->>-AppCtl: ok
AppCtl-->>-Rabbit: ok
Rabbit->>+AppCtl: application:stop(rabbitmq_prelaunch)
AppCtl->>AppCtl: 清理上下文
AppCtl-->>-Rabbit: ok
Rabbit-->>-User: ok
异常与回退
-
连接未关闭:
prep_stop/1会等待所有客户端连接关闭(有超时)- 超时后强制关闭连接
-
队列未保存状态:
- 经典队列保存索引到磁盘
- 仲裁队列通过 Ra 快照持久化
- 保存失败不影响停止,但下次启动恢复时间增加
-
插件停止失败:
- 记录错误日志,继续停止其他插件
- 不阻止核心应用停止
API 4: start/2 - Application 回调(核心启动)
基本信息
- 名称:
start/2 - 协议/方法:
application行为回调 - 调用者:Erlang Application Controller
函数签名
-spec start('normal', []) ->
{'error',
{'erlang_version_too_old',
{'found', string(), string()},
{'required', string(), string()}}} |
{'ok', pid()}.
请求参数
| 参数 | 类型 | 说明 |
|---|---|---|
StartType |
normal |
正常启动(非 takeover/failover) |
StartArgs |
[] |
启动参数(通常为空) |
响应结构
- 成功:
{ok, SupPid}-SupPid是rabbit_sup的进程 ID - 失败:
{error, Reason}- 启动失败原因
核心代码
start(normal, []) ->
%% 1. 重置启动状态为 booting
rabbit_boot_state:set(booting),
rabbit_prelaunch:clear_stop_reason(),
try
%% 2. 运行预启动第二阶段
run_prelaunch_second_phase(),
%% 3. 打印产品信息
ProductInfo = product_info(),
?LOG_INFO(
"~n Starting ~ts ~ts on Erlang ~ts [~ts]~n ~ts~n ~ts",
[product_name(), product_version(), rabbit_misc:otp_release(),
emu_flavor(), ?COPYRIGHT_MESSAGE, product_license_line()]),
log_motd(),
%% 4. 创建根监督者
{ok, SupPid} = rabbit_sup:start_link(),
%% 5. 启动特性标志控制器(如启用 feature_flags_v2)
ok = rabbit_sup:start_child(rabbit_ff_controller),
%% 6. 注册 rabbit 进程名(用于远程节点检查)
true = register(rabbit, self()),
print_banner(),
log_banner(),
warn_if_kernel_config_dubious(),
%% 7. 设置插件
Plugins = rabbit_plugins:setup(),
?LOG_DEBUG("Loading the following plugins: ~tp", [Plugins]),
ok = app_utils:load_applications(Plugins),
%% 8. 刷新特性标志(包括插件的)
case rabbit_feature_flags:refresh_feature_flags_after_app_load() of
ok -> ok;
Error1 -> throw(Error1)
end,
persist_static_configuration(),
%% 9. 运行 boot steps
ok = rabbit_boot_steps:run_boot_steps([rabbit | Plugins]),
rabbit_boot_state:set(core_started),
%% 10. 运行后启动阶段
run_postlaunch_phase(Plugins),
{ok, SupPid}
catch
throw:{error, _} = Error ->
_ = mnesia:stop(),
rabbit_prelaunch:set_stop_reason(Error),
rabbit_boot_state:set(stopped),
Error
end.
代码逐段解释
-
状态初始化(第 2-3 行):
- 设置启动状态为
booting - 清除上次停止原因(如果有)
- 设置启动状态为
-
预启动第二阶段(第 6 行):
- 详见前文"预启动阶段第二阶段"
- 包括特性标志、日志、Khepri/Ra、集群检查、Mnesia 启动
-
产品信息(第 8-14 行):
- 记录 RabbitMQ 版本、Erlang 版本、许可证信息
- 显示 MOTD(Message of the Day)
-
创建监督树(第 17 行):
rabbit_sup:start_link/0创建根监督者- 返回的
SupPid将作为 application master 的监督进程
-
启动特性标志控制器(第 19 行):
rabbit_ff_controller管理特性标志状态(v2 架构)- 必须在 boot steps 前启动,因为刷新特性标志可能用到它
-
注册进程名(第 21 行):
register(rabbit, self())注册 application master 进程- 远程节点通过
whereis({rabbit, Node})检查节点是否运行
-
设置插件(第 26-28 行):
rabbit_plugins:setup/0读取enabled_plugins文件- 返回启用的插件列表(按依赖顺序)
- 加载插件应用(不启动)
-
刷新特性标志(第 31-35 行):
- 扫描所有已加载应用的模块属性
- 注册新发现的特性标志
- 检查冲突或不兼容的特性标志
-
运行 Boot Steps(第 39-40 行):
- 详见后文"Boot Steps 详解"
- 拓扑排序确保依赖顺序
- 初始化核心子系统(数据库、文件缓存、网络等)
-
后启动阶段(第 43 行):
- 启动插件应用
- 通知集群节点上线
- 恢复虚拟主机
- 启动网络监听器
时序图
sequenceDiagram
autonumber
participant AppCtl as Application Controller
participant Rabbit as rabbit:start/2
participant Prelaunch as run_prelaunch_second_phase
participant RabbitSup as rabbit_sup
participant BootSteps as rabbit_boot_steps
participant DB as rabbit_db
participant Plugins as plugin applications
participant Postlaunch as run_postlaunch_phase
AppCtl->>+Rabbit: start(normal, [])
Rabbit->>Rabbit: rabbit_boot_state:set(booting)
Rabbit->>+Prelaunch: run_prelaunch_second_phase()
Prelaunch->>Prelaunch: 启用插件文件处理
Prelaunch->>Prelaunch: 特性标志注册表初始化
Prelaunch->>Prelaunch: 日志系统设置
Prelaunch->>Prelaunch: Ra 系统启动
Prelaunch->>Prelaunch: Khepri 启动
Prelaunch->>Prelaunch: 集群检查
Prelaunch->>Prelaunch: Mnesia 启动(如适用)
Prelaunch-->>-Rabbit: ok
Rabbit->>+RabbitSup: start_link()
RabbitSup->>RabbitSup: 初始化监督者<br/>one_for_all 策略
RabbitSup-->>-Rabbit: {ok, SupPid}
Rabbit->>Rabbit: register(rabbit, self())
Rabbit->>Rabbit: rabbit_plugins:setup()<br/>获取插件列表
Rabbit->>Rabbit: app_utils:load_applications(Plugins)
Rabbit->>Rabbit: refresh_feature_flags_after_app_load()
Rabbit->>+BootSteps: run_boot_steps([rabbit | Plugins])
loop 每个 boot step(拓扑排序)
BootSteps->>BootSteps: 检查 requires 依赖
BootSteps->>+DB: 示例: rabbit_db:init()
DB->>DB: 初始化数据库<br/>创建表结构
DB-->>-BootSteps: ok
end
BootSteps-->>-Rabbit: ok
Rabbit->>Rabbit: rabbit_boot_state:set(core_started)
Rabbit->>+Postlaunch: run_postlaunch_phase(Plugins)
loop 每个插件
Postlaunch->>+Plugins: application:ensure_all_started(PluginN)
Plugins->>Plugins: 启动插件监督树
Plugins->>Plugins: 插件 boot steps
Plugins-->>-Postlaunch: {ok, [PluginN | Deps]}
end
Postlaunch->>Postlaunch: rabbit_node_monitor:notify_node_up()
Postlaunch->>Postlaunch: rabbit_vhosts:boot()<br/>恢复虚拟主机
Postlaunch->>Postlaunch: rabbit_networking:boot()<br/>启动监听器
Postlaunch->>Postlaunch: rabbit_boot_state:set(ready)
Postlaunch-->>-Rabbit: ok
Rabbit-->>-AppCtl: {ok, SupPid}
异常与回退
-
预启动阶段失败:
- 例如:配置错误、Mnesia 启动失败、集群不兼容
- 停止 Mnesia(如已启动)
- 设置停止原因和状态为
stopped - 返回
{error, Reason}给 Application Controller
-
监督者启动失败:
rabbit_sup:start_link/0失败(极少见)- 抛出异常,Application Controller 标记应用为失败
-
特性标志冲突:
- 发现不兼容的特性标志定义
- 抛出
{error, {feature_flags_error, ...}}
-
Boot Step 失败:
- 某个 boot step 的 MFA 返回非 ok
- 立即停止启动,执行异常处理
- 回滚已完成的 boot steps(如果有 cleanup)
-
插件启动失败:
- 后启动阶段某插件启动失败
- 停止已启动的插件
- 停止核心应用
性能要点
-
并行化:
- Boot steps 通过依赖声明实现隐式并行(未来优化)
- 插件启动串行(按依赖顺序)
- 虚拟主机恢复可并行
-
关键路径:
- 数据库初始化(Mnesia/Khepri)
- 队列恢复(
recoveryboot step) - 网络监听器启动
-
内存占用:
- Boot steps 阶段:100-300MB
- 插件启动后:200-500MB
- 虚拟主机恢复后:取决于队列数
API 5: stop/1 - Application 回调(停止)
基本信息
- 名称:
stop/1 - 协议/方法:
application行为回调 - 调用者:Erlang Application Controller
函数签名
-spec stop(term()) -> 'ok'.
请求参数
| 参数 | 类型 | 说明 |
|---|---|---|
State |
term() |
prep_stop/1 返回的状态 |
核心代码
stop(State) ->
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
true -> ok;
false -> rabbit_table:clear_ram_only_tables()
end,
ok = rabbit_boot_state:set(stopped),
State.
代码逐段解释
-
停止告警系统(第 2 行):
rabbit_alarm:stop/0停止内存/磁盘告警监控- 注销告警处理器
-
清理内存表(第 3-6 行):
- 非集群模式:清空 Mnesia RAM-only 表(连接、通道、消费者等)
- 集群模式:表由其他节点维护,不清理
-
设置停止状态(第 7 行):
rabbit_boot_state:set(stopped)标记节点已停止- 其他节点可通过
is_running/1检测到
-
返回状态(第 8 行):
- 返回
prep_stop/1传递的状态(通常忽略)
- 返回
与 prep_stop/1 的关系
sequenceDiagram
participant AppCtl as Application Controller
participant Rabbit as rabbit 模块
participant RabbitSup as rabbit_sup
AppCtl->>+Rabbit: prep_stop(State)
Note over Rabbit: 预停止阶段<br/>停止接受新连接
Rabbit->>Rabbit: 关闭监听器
Rabbit->>Rabbit: 等待连接关闭(有超时)
Rabbit-->>-AppCtl: NewState
AppCtl->>+RabbitSup: supervisor:terminate_child(...)
Note over RabbitSup: 停止所有子进程<br/>(网络、队列、交换器等)
RabbitSup-->>-AppCtl: ok
AppCtl->>+Rabbit: stop(NewState)
Note over Rabbit: 最终清理
Rabbit->>Rabbit: 停止告警系统
Rabbit->>Rabbit: 清理内存表
Rabbit->>Rabbit: 设置状态为 stopped
Rabbit-->>-AppCtl: ok
API 6: prep_stop/1 - 预停止回调
基本信息
- 名称:
prep_stop/1 - 协议/方法:
application行为回调 - 调用者:Erlang Application Controller
- 目的:在停止监督树前执行清理(如关闭监听器)
函数签名
-spec prep_stop(term()) -> term().
核心代码
prep_stop(State) ->
rabbit_alarm:unregister(self()),
Listeners = rabbit_networking:stop(),
ok = rabbit_boot_state:set(stopping),
rabbit_boot_state:wait_for_exit(),
%% Saved so we can try to avoid stalling shutdown.
%% See rabbit_mnesia:remove_node_offline/1.
rabbit_nodes:ensure_written(),
State.
代码逐段解释
-
注销告警监听(第 2 行):
- Application master 进程注销为告警接收者
- 避免在停止过程中收到告警消息
-
停止网络监听器(第 3 行):
rabbit_networking:stop/0关闭所有 TCP/TLS 监听端口- 拒绝新连接
- 返回已停止的监听器列表(用于日志)
-
设置停止状态(第 4 行):
rabbit_boot_state:set(stopping)标记节点正在停止- 其他模块可通过检查状态决定行为
-
等待退出信号(第 5 行):
wait_for_exit/0等待特定进程退出或超时- 确保优雅关闭连接
-
保存节点信息(第 8 行):
rabbit_nodes:ensure_written/0持久化当前节点列表- 用于离线节点移除功能
关键数据结构与类型
类型定义
%% 启动类型
-type restart_type() :: 'permanent' | 'transient' | 'temporary'.
%% 应用参数
-type param() :: atom().
-type app_name() :: atom().
%% 产品信息
-type product_info() :: #{
product_name := string(),
product_version := string(),
product_base_name => string(),
product_base_version => string(),
product_overridden => boolean()
}.
类型说明
restart_type/0
| 值 | 说明 | 使用场景 |
|---|---|---|
permanent |
应用退出时节点总是重启 | 关键系统服务 |
transient |
正常退出不重启,异常退出重启节点 | 生产环境 RabbitMQ |
temporary |
应用退出不影响节点 | 开发/测试 |
启动状态(rabbit_boot_state)
| 状态 | 含义 | 转换条件 |
|---|---|---|
booting |
正在启动 | start/2 入口 |
core_started |
核心启动完成 | Boot steps 完成 |
ready |
完全就绪 | 后启动阶段完成 |
stopping |
正在停止 | prep_stop/1 入口 |
stopped |
已停止 | stop/1 完成 |
状态转换图
stateDiagram-v2
[*] --> booting: start/2 调用
booting --> core_started: boot steps 完成
core_started --> ready: 后启动阶段完成
ready --> stopping: prep_stop/1 调用
stopping --> stopped: stop/1 完成
stopped --> [*]
booting --> stopped: 启动失败
core_started --> stopped: 后启动失败
UML 类图
classDiagram
class rabbit {
<<module>>
+start() ok
+boot() ok
+stop() ok
+start(normal, []) {ok, pid()}
+prep_stop(State) State
+stop(State) ok
+status() status_map()
+is_running() boolean()
+is_serving() boolean()
-start_it(restart_type()) ok
-run_prelaunch_second_phase() ok
-run_postlaunch_phase([app_name()]) ok
-spawn_boot_marker() {ok, pid()} | {already_booting, pid()}
-wait_for_ready_or_stopped() ok
}
class rabbit_boot_state {
<<module>>
+set(State) ok
+get() State
+wait_for(State, Timeout) ok | {error, timeout}
+has_reached(State) boolean()
}
class rabbit_sup {
<<supervisor>>
+start_link() {ok, pid()}
+start_child(Module) ok
}
class rabbit_boot_steps {
<<module>>
+run_boot_steps([app_name()]) ok
+run_cleanup_steps([app_name()]) ok
+find_steps() [{app_name(), step_name(), attributes()}]
-sort_boot_steps([step_def()]) [step_def()]
}
class rabbit_plugins {
<<module>>
+setup() [app_name()]
+active() [app_name()]
}
rabbit --> rabbit_boot_state : uses
rabbit --> rabbit_sup : creates
rabbit --> rabbit_boot_steps : uses
rabbit --> rabbit_plugins : uses
rabbit_boot_steps --> rabbit : callbacks (boot step MFAs)
Boot Steps 详解
Boot Step 机制
Boot steps 是 RabbitMQ 的启动协调机制,通过声明式依赖关系自动确定初始化顺序。
Boot Step 定义
-rabbit_boot_step({step_name,
[{description, "Human readable description"},
{mfa, {Module, Function, Args}}, %% 执行的函数
{requires, [other_step1, other_step2]}, %% 依赖步骤
{enables, [enabled_step1]}]}). %% 启用步骤
核心 Boot Steps 列表
| Step Name | 描述 | MFA | 关键依赖 |
|---|---|---|---|
pre_boot |
启动开始标记 | - | - |
codec_correctness_check |
验证编解码器正确性 | rabbit_registry:codec_correctness_check/0 |
pre_boot |
database |
初始化数据库 | rabbit_db:init/0 |
file_handle_cache |
file_handle_cache |
启动文件句柄缓存 | rabbit:start_fhc/0 |
pre_boot |
worker_pool |
启动工作进程池 | worker_pool:start_link/0 |
pre_boot |
external_infrastructure |
启动外部基础设施 | rabbit_event:init/0 等 |
database |
rabbit_registry |
注册表初始化 | rabbit_registry:init/0 |
external_infrastructure |
recovery |
恢复队列与交换器 | rabbit:recover/0 |
database |
routing_ready |
路由系统就绪 | - | recovery |
log_exchange |
日志交换器 | - | routing_ready |
direct_client |
内部客户端支持 | - | external_infrastructure |
networking |
网络子系统 | rabbit_networking:boot/0 |
direct_client |
notify_cluster |
通知集群 | rabbit_node_monitor:notify_node_up/0 |
networking |
Boot Steps 依赖图
graph TD
pre_boot[pre_boot]
codec[codec_correctness_check]
fhc[file_handle_cache]
pool[worker_pool]
db[database]
infra[external_infrastructure]
registry[rabbit_registry]
recovery[recovery]
routing[routing_ready]
log[log_exchange]
direct[direct_client]
net[networking]
notify[notify_cluster]
pre_boot --> codec
pre_boot --> fhc
pre_boot --> pool
fhc --> db
db --> infra
db --> recovery
infra --> registry
infra --> direct
recovery --> routing
routing --> log
direct --> net
net --> notify
关键 Boot Step 详解
database Boot Step
-rabbit_boot_step({database,
[{description, "database"},
{mfa, {rabbit_db, init, []}},
{requires, file_handle_cache},
{enables, external_infrastructure}]}).
功能:
- 初始化 Mnesia 或 Khepri 数据库
- 创建表结构(队列、交换器、绑定、用户等)
- 执行数据库迁移(如从 Mnesia 到 Khepri)
- 等待表可用
核心代码(rabbit_db:init/0):
init() ->
%% 1. 确定使用的数据库(Mnesia 或 Khepri)
Backend = case rabbit_khepri:is_enabled() of
true -> khepri;
false -> mnesia
end,
%% 2. 等待表/数据可用
case Backend of
mnesia ->
ok = rabbit_table:wait_for_replicated(),
ok = rabbit_table:wait(rabbit_user:pattern_match_all());
khepri ->
ok = rabbit_khepri:wait_for_store()
end,
%% 3. 执行数据迁移(如需要)
case rabbit_feature_flags:is_enabled(khepri_db) of
true ->
ok = rabbit_db_m2k_converter:sync_if_needed();
false ->
ok
end,
%% 4. 初始化各数据层模块
ok = rabbit_db_cluster:init(),
ok = rabbit_db_user:init(),
ok = rabbit_db_vhost:init(),
ok = rabbit_db_exchange:init(),
ok = rabbit_db_queue:init(),
ok = rabbit_db_binding:init(),
ok.
recovery Boot Step
-rabbit_boot_step({recovery,
[{description, "recovery"},
{mfa, {rabbit, recover, []}},
{requires, database},
{enables, routing_ready}]}).
功能:
- 恢复持久化的交换器
- 恢复持久化的队列(启动队列进程)
- 重建路由表(绑定关系)
核心代码(rabbit:recover/0):
recover() ->
%% 1. 恢复所有虚拟主机的交换器
[rabbit_exchange:recover(VHost) || VHost <- rabbit_vhost:list_names()],
%% 2. 恢复所有虚拟主机的队列
[begin
Queues = rabbit_amqqueue:list(VHost),
[rabbit_amqqueue:recover(Q) || Q <- Queues]
end || VHost <- rabbit_vhost:list_names()],
ok.
时序:
sequenceDiagram
participant BS as Boot Steps
participant Recover as rabbit:recover/0
participant Exchange as rabbit_exchange
participant Queue as rabbit_amqqueue
participant QProc as Queue 进程
BS->>+Recover: recover()
loop 每个虚拟主机
Recover->>+Exchange: recover(VHost)
Exchange->>Exchange: 从数据库加载交换器
Exchange->>Exchange: 应用 policies
Exchange-->>-Recover: [ExchangeNames]
Recover->>+Queue: list(VHost)
Queue->>Queue: 从数据库加载队列定义
Queue-->>-Recover: [Queues]
loop 每个队列
Recover->>+Queue: recover(Queue)
Queue->>+QProc: 启动队列进程
QProc->>QProc: 恢复消息索引
QProc->>QProc: 重建状态
QProc-->>-Queue: {ok, QPid}
Queue-->>-Recover: ok
end
end
Recover-->>-BS: ok
networking Boot Step
实际由后启动阶段的 rabbit_networking:boot/0 执行。
功能:
- 启动 TCP/TLS 监听器
- 根据配置监听多个端口和接口
- 注册协议处理器
核心代码(rabbit_networking:boot/0):
boot() ->
%% 1. 读取监听器配置
Listeners = application:get_env(rabbit, tcp_listeners, [5672]),
SSLListeners = application:get_env(rabbit, ssl_listeners, []),
%% 2. 启动 TCP 监听器
[begin
{ok, _} = rabbit_networking:start_tcp_listener(Address, Port, Opts)
end || {Address, Port, Opts} <- Listeners],
%% 3. 启动 TLS 监听器
[begin
{ok, _} = rabbit_networking:start_ssl_listener(Address, Port, Opts)
end || {Address, Port, Opts} <- SSLListeners],
ok.
后启动阶段详解
run_postlaunch_phase/1
run_postlaunch_phase(Plugins) ->
?LOG_DEBUG(""),
?LOG_DEBUG("== Postlaunch phase =="),
%% 1. 启动插件应用
?LOG_DEBUG("Starting plugin applications"),
start_apps(Plugins, #{}),
%% 2. 通知节点上线
rabbit_node_monitor:notify_node_up(),
%% 3. 恢复虚拟主机
rabbit_vhosts:boot(),
%% 4. 启动网络监听器
rabbit_networking:boot(),
%% 5. 标记为 ready
rabbit_boot_state:set(ready),
?LOG_INFO("Server startup complete; ~b plugins started.", [length(Plugins)]),
ok.
时序图
sequenceDiagram
autonumber
participant Start as rabbit:start/2
participant Post as run_postlaunch_phase
participant Plugins as Plugin Applications
participant NodeMon as rabbit_node_monitor
participant VHosts as rabbit_vhosts
participant Net as rabbit_networking
participant State as rabbit_boot_state
Start->>+Post: run_postlaunch_phase(Plugins)
loop 每个插件
Post->>+Plugins: start_apps([Plugin])
Plugins->>Plugins: application:ensure_all_started
Plugins->>Plugins: 运行插件 boot steps
Plugins-->>-Post: ok
end
Post->>+NodeMon: notify_node_up()
NodeMon->>NodeMon: 通知集群节点<br/>本节点已上线
NodeMon->>NodeMon: 建立节点监控
NodeMon-->>-Post: ok
Post->>+VHosts: boot()
VHosts->>VHosts: 确保每个 vhost<br/>有运行的进程
VHosts->>VHosts: 协调多节点 vhost 进程
VHosts-->>-Post: ok
Post->>+Net: boot()
Net->>Net: 启动 TCP 监听器
Net->>Net: 启动 TLS 监听器
Net->>Net: 注册协议处理器
Net-->>-Post: ok
Post->>+State: set(ready)
State->>State: 写入 ETS 表
State->>State: 通知等待进程
State-->>-Post: ok
Post-->>-Start: ok
关键函数调用链路
启动完整调用链
用户调用 rabbit:start/0
└─> start_it(temporary)
├─> spawn_boot_marker() // 注册 rabbit_boot 进程
├─> application:ensure_all_started(rabbitmq_prelaunch)
│ └─> rabbitmq_prelaunch:start/2
│ ├─> 解析配置文件
│ ├─> 创建上下文
│ └─> 设置环境变量
│
├─> application:ensure_all_started(rabbit)
│ └─> rabbit:start(normal, [])
│ ├─> rabbit_boot_state:set(booting)
│ │
│ ├─> run_prelaunch_second_phase()
│ │ ├─> rabbit_prelaunch_enabled_plugins_file:setup()
│ │ ├─> rabbit_prelaunch_feature_flags:setup()
│ │ ├─> rabbit_prelaunch_logging:setup()
│ │ ├─> rabbit_ra_systems:setup()
│ │ ├─> rabbit_khepri:setup()
│ │ ├─> rabbit_prelaunch_cluster:setup()
│ │ └─> mnesia:start() (如不使用 Khepri)
│ │
│ ├─> rabbit_sup:start_link()
│ │ └─> supervisor:init([])
│ │ └─> 初始化监督者(one_for_all)
│ │
│ ├─> register(rabbit, self())
│ │
│ ├─> rabbit_plugins:setup()
│ │ ├─> 读取 enabled_plugins 文件
│ │ └─> 按依赖顺序排列插件
│ │
│ ├─> app_utils:load_applications(Plugins)
│ │
│ ├─> rabbit_feature_flags:refresh_feature_flags_after_app_load()
│ │ ├─> 扫描模块属性
│ │ ├─> 注册新特性标志
│ │ └─> 检查兼容性
│ │
│ ├─> rabbit_boot_steps:run_boot_steps([rabbit | Plugins])
│ │ ├─> find_steps([rabbit | Plugins])
│ │ │ ├─> 扫描模块属性(-rabbit_boot_step)
│ │ │ └─> sort_boot_steps()
│ │ │ └─> 拓扑排序(digraph_utils:topsort)
│ │ │
│ │ └─> for each step:
│ │ ├─> run_step(Attributes, mfa)
│ │ └─> apply(M, F, A)
│ │ 例如:
│ │ ├─> rabbit_db:init()
│ │ ├─> rabbit:start_fhc()
│ │ ├─> rabbit:recover()
│ │ └─> ...
│ │
│ ├─> rabbit_boot_state:set(core_started)
│ │
│ └─> run_postlaunch_phase(Plugins)
│ ├─> start_apps(Plugins, #{})
│ │ └─> for each plugin:
│ │ ├─> app_utils:load_applications([Plugin])
│ │ ├─> rabbit_boot_steps:run_boot_steps([Plugin])
│ │ └─> application:ensure_all_started(Plugin)
│ │
│ ├─> rabbit_node_monitor:notify_node_up()
│ ├─> rabbit_vhosts:boot()
│ ├─> rabbit_networking:boot()
│ └─> rabbit_boot_state:set(ready)
│
└─> wait_for_ready_or_stopped()
└─> rabbit_boot_state:wait_for(ready, 12h)
└─> 轮询检查状态,间隔 100ms
配置与可观测
关键配置项
| 配置项 | 默认值 | 说明 | 影响 |
|---|---|---|---|
cluster_name |
节点名 | 集群名称 | 管理界面显示,日志标识 |
boot_steps_timeout |
无限 | Boot step 超时 | 启动失败快速检测 |
default_vhost |
/ |
默认虚拟主机 | 首次启动自动创建 |
default_user |
guest |
默认用户名 | 首次启动自动创建 |
default_permissions |
.* |
默认权限 | 读写配置权限 |
disk_free_limit |
50MB | 磁盘剩余下限 | 触发告警,阻塞发布 |
vm_memory_high_watermark |
0.4 | 内存水位线 | 触发流控,阻塞发布 |
log_levels |
- | 日志级别映射 | 控制日志输出 |
观测指标
启动耗时(分阶段)
- 预启动阶段第一阶段:< 1 秒(配置解析)
- 预启动阶段第二阶段:1-5 秒(Mnesia/Khepri 启动,集群检查)
- Boot steps:2-60 秒(队列恢复是瓶颈)
- 后启动阶段:1-10 秒(插件启动,网络监听)
日志关键字
| 日志 | 含义 | 级别 |
|---|---|---|
Starting <ProductName> <Version> |
开始启动 | INFO |
Running boot step <Step> |
执行 boot step | INFO |
Time to start RabbitMQ: <N> ms |
启动完成,耗时 | INFO |
Server startup complete; <N> plugins started |
全部就绪 | INFO |
Stopping RabbitMQ |
开始停止 | INFO |
Stopped RabbitMQ application |
停止完成 | INFO |
可监控状态
%% 检查运行状态
rabbit:is_running().
%% => true | false
%% 检查服务状态(是否接受连接)
rabbit:is_serving().
%% => true | false
%% 获取详细状态
rabbit:status().
%% => #{
%% pid => <0.123.0>,
%% running_applications => [...],
%% listeners => [{port, 5672}, ...],
%% alarms => []
%% }
实战经验与最佳实践
启动优化
-
使用 Khepri 替代 Mnesia:
- 启动更快(无需等待表复制)
- 一致性更强(Raft 协议)
- 配置:
rabbitmq.conf中设置metadata_store = khepri
-
减少队列恢复时间:
- 避免大量小队列(合并或使用 streams)
- 使用 quorum queues(恢复更快)
- 定期清理未使用的队列
-
并行启动节点:
- 集群节点可并行启动
- 使用 peer discovery 自动加入
- 避免串行等待节点上线
故障排查
启动卡住
现象:rabbit:start() 长时间不返回
排查步骤:
-
检查日志最后一条 boot step:
tail -f /var/log/rabbitmq/rabbit@hostname.log | grep "Running boot step" -
常见卡住点:
database:Mnesia 表损坏或网络分区recovery:大量队列恢复慢networking:端口被占用
-
解决方案:
- Mnesia 损坏:使用
rabbitmq-diagnostics cluster_status检查,必要时重建节点 - 队列恢复慢:增加
queue_index_embed_msgs_below配置,使用 SSD - 端口占用:更改配置或停止占用进程
- Mnesia 损坏:使用
启动失败后无法再启动
原因:rabbitmq_prelaunch 未正确停止,上下文残留
解决方案:
%% Erlang shell
application:stop(rabbitmq_prelaunch).
application:stop(rabbit).
%% 然后重新启动
rabbit:start().
插件启动失败
现象:核心启动成功,但某插件失败
排查:
# 查看插件状态
rabbitmq-plugins list
# 禁用有问题的插件
rabbitmq-plugins disable <plugin_name>
# 重新启动
rabbitmqctl stop_app
rabbitmqctl start_app
性能监控
启动性能基准
| 场景 | 队列数 | 消息数 | 启动时间 | 瓶颈 |
|---|---|---|---|---|
| 空节点 | 0 | 0 | 1-2s | 无 |
| 小规模 | 100 | 10K | 3-5s | 队列恢复 |
| 中规模 | 1000 | 100K | 10-30s | 队列恢复 + 索引 |
| 大规模 | 10000 | 1M | 60-300s | 磁盘 I/O |
监控脚本
#!/bin/bash
# 监控启动进度
while ! rabbitmqctl await_online_nodes 1; do
echo "Waiting for node to come online..."
sleep 1
done
rabbitmqctl await_startup
echo "RabbitMQ startup complete!"
典型场景时序图
场景 1:首次启动(新节点)
sequenceDiagram
autonumber
participant User
participant Rabbit as rabbit:start/0
participant Mnesia
participant DB as rabbit_db
participant VHost as rabbit_vhost
User->>+Rabbit: start()
Rabbit->>Rabbit: 预启动阶段
Rabbit->>+Mnesia: mnesia:start()
Mnesia->>Mnesia: 检测首次启动<br/>无数据目录
Mnesia->>Mnesia: 创建 schema
Mnesia-->>-Rabbit: ok
Rabbit->>+DB: init() (boot step)
DB->>Mnesia: 创建表<br/>rabbit_user, rabbit_vhost, etc.
Mnesia->>Mnesia: 创建磁盘表<br/>rabbit_durable_queue
Mnesia->>Mnesia: 创建内存表<br/>rabbit_route
Mnesia-->>DB: ok
DB-->>-Rabbit: ok
Rabbit->>+DB: maybe_insert_default_data() (boot step)
DB->>+VHost: add(<<"/">>, #{})
VHost->>Mnesia: 插入默认 vhost
VHost-->>-DB: ok
DB->>DB: 创建默认用户 "guest"
DB->>DB: 设置默认权限
DB-->>-Rabbit: ok
Rabbit->>Rabbit: 后续 boot steps
Rabbit->>Rabbit: 后启动阶段
Rabbit-->>-User: ok
场景 2:集群节点加入
sequenceDiagram
autonumber
participant CLI
participant NewNode as 新节点 rabbit
participant ExistingNode as 已有节点 rabbit
participant Mnesia
Note over NewNode: 初始启动(单节点)
CLI->>+NewNode: start()
NewNode->>NewNode: 预启动阶段
NewNode-->>-CLI: ok (单节点集群)
Note over NewNode,ExistingNode: 加入集群
CLI->>+NewNode: stop_app()
NewNode->>NewNode: 停止应用<br/>保留 Mnesia
NewNode-->>-CLI: ok
CLI->>+NewNode: join_cluster(ExistingNode)
NewNode->>+Mnesia: change_config(extra_db_nodes, [ExistingNode])
Mnesia->>+ExistingNode: 请求集群信息
ExistingNode->>ExistingNode: 检查兼容性<br/>特性标志一致性
ExistingNode-->>-Mnesia: cluster_info
Mnesia->>Mnesia: 复制表定义
Mnesia-->>-NewNode: ok
NewNode-->>-CLI: ok
CLI->>+NewNode: start_app()
NewNode->>NewNode: 预启动阶段<br/>(检测到集群配置)
NewNode->>+ExistingNode: 同步元数据
ExistingNode-->>-NewNode: queues, exchanges, bindings
NewNode->>NewNode: Boot steps
NewNode->>NewNode: 后启动阶段
NewNode->>+ExistingNode: notify_node_up()
ExistingNode->>ExistingNode: 更新集群视图
ExistingNode-->>-NewNode: ok
NewNode-->>-CLI: ok
场景 3:优雅停止与重启
sequenceDiagram
autonumber
participant CLI
participant Rabbit as rabbit:stop/0
participant Net as rabbit_networking
participant Conn as 客户端连接
participant Queue as 队列进程
participant Mnesia
CLI->>+Rabbit: stop()
Rabbit->>+Net: stop()
Net->>Net: 关闭监听器<br/>拒绝新连接
Net->>+Conn: 发送连接关闭帧<br/>connection.close
Conn->>Conn: 客户端处理关闭
Conn-->>-Net: connection.close-ok
Net-->>-Rabbit: ok
Rabbit->>Rabbit: application:stop(rabbit)
Rabbit->>Rabbit: prep_stop(State)
Rabbit->>Rabbit: 监督树停止子进程
loop 每个队列
Rabbit->>+Queue: terminate(shutdown, State)
Queue->>Queue: 保存队列索引
Queue->>Queue: 刷新消息到磁盘
Queue->>Mnesia: 更新队列元数据<br/>(如消息数)
Queue-->>-Rabbit: ok
end
Rabbit->>+Mnesia: mnesia:stop()
Mnesia->>Mnesia: 保存表到磁盘
Mnesia->>Mnesia: 关闭事务日志
Mnesia-->>-Rabbit: ok
Rabbit->>Rabbit: stop(State)
Rabbit->>Rabbit: rabbit_boot_state:set(stopped)
Rabbit-->>-CLI: ok
Note over CLI,Mnesia: 重启
CLI->>+Rabbit: start()
Rabbit->>Rabbit: 预启动阶段
Rabbit->>+Mnesia: mnesia:start()
Mnesia->>Mnesia: 加载表(快速)
Mnesia-->>-Rabbit: ok
Rabbit->>Rabbit: Boot steps<br/>(跳过数据创建)
Rabbit->>Queue: recover()
Queue->>Queue: 加载队列索引<br/>(无需重建)
Rabbit-->>-CLI: ok
附录
常用命令
# 启动(前台)
rabbitmq-server
# 启动(后台)
rabbitmq-server -detached
# 停止
rabbitmqctl stop
# 停止应用(保留 Erlang 节点)
rabbitmqctl stop_app
# 启动应用
rabbitmqctl start_app
# 等待启动完成
rabbitmqctl await_startup
# 检查状态
rabbitmqctl status
# 查看告警
rabbitmqctl alarm_list
相关模块
rabbit_sup:根监督者(详见 RabbitMQ-02 文档)rabbit_boot_steps:Boot steps 协调rabbit_boot_state:启动状态管理rabbit_plugins:插件管理rabbit_feature_flags:特性标志系统rabbit_prelaunch:预启动阶段协调(rabbitmq_prelaunch应用)