MongoDB-09-存储引擎模块-概览
1. 模块职责
存储引擎模块(src/mongo/db/storage)是MongoDB数据持久化的核心,提供统一的存储抽象接口,支持多种存储引擎实现。该模块负责数据的增删改查、事务管理、索引维护和崩溃恢复。
1.1 主要功能
- 存储抽象: 提供StorageEngine接口,支持插件化存储引擎
- 事务管理: 实现ACID事务语义,支持多文档事务
- 恢复单元: RecoveryUnit管理事务生命周期和回滚
- 记录存储: RecordStore接口管理文档的增删改查
- 索引存储: SortedDataInterface接口管理索引数据
- 检查点: Checkpoint机制确保数据持久性
- 快照隔离: 提供时间点一致性读取
1.2 输入/输出
输入:
- CRUD操作请求
- 事务控制指令
- 索引构建请求
- 检查点触发信号
输出:
- 持久化数据文件
- 事务提交/回滚结果
- 数据恢复状态
- 存储统计信息
1.3 上下游依赖
依赖模块(上游):
- platform模块:文件系统和内存管理
- util模块:并发控制和工具函数
- base模块:错误处理和基础类型
被依赖模块(下游):
- db-catalog模块:集合和索引元数据管理
- db-exec模块:查询执行需要读取数据
- db-repl模块:复制需要oplog持久化
- db-commands模块:命令执行需要数据存储
1.4 生命周期
- 初始化阶段: 加载存储引擎,恢复数据文件
- 运行阶段: 处理读写请求,管理事务
- 检查点: 定期刷新数据到磁盘
- 恢复阶段: 崩溃后从检查点和日志恢复
- 关闭阶段: 完成最终检查点,关闭数据文件
2. 模块架构
2.1 架构图
flowchart TB
subgraph "存储引擎接口层"
StorageEngine[Storage Engine<br/>存储引擎接口]
RecoveryUnit[Recovery Unit<br/>恢复单元接口]
RecordStore[Record Store<br/>记录存储接口]
SortedDataInterface[Sorted Data Interface<br/>索引数据接口]
end
subgraph "事务管理层"
TransactionManager[Transaction Manager<br/>事务管理器]
LockManager[Lock Manager<br/>锁管理器]
SnapshotManager[Snapshot Manager<br/>快照管理器]
OplogManager[Oplog Manager<br/>操作日志管理]
end
subgraph "WiredTiger引擎实现"
WTStorageEngine[WT Storage Engine<br/>WT存储引擎]
WTRecoveryUnit[WT Recovery Unit<br/>WT恢复单元]
WTRecordStore[WT Record Store<br/>WT记录存储]
WTIndex[WT Index<br/>WT索引实现]
WTSession[WT Session<br/>WT会话]
end
subgraph "数据文件层"
DataFiles[Data Files<br/>数据文件]
IndexFiles[Index Files<br/>索引文件]
JournalFiles[Journal Files<br/>日志文件]
CheckpointFiles[Checkpoint Files<br/>检查点文件]
end
subgraph "缓存层"
BufferManager[Buffer Manager<br/>缓冲区管理器]
PageCache[Page Cache<br/>页面缓存]
IndexCache[Index Cache<br/>索引缓存]
end
subgraph "压缩层"
CompressionManager[Compression Manager<br/>压缩管理器]
SnappyCompressor[Snappy Compressor<br/>Snappy压缩器]
ZlibCompressor[Zlib Compressor<br/>Zlib压缩器]
ZstdCompressor[Zstd Compressor<br/>Zstd压缩器]
end
StorageEngine --> RecoveryUnit
StorageEngine --> RecordStore
StorageEngine --> SortedDataInterface
RecoveryUnit --> TransactionManager
TransactionManager --> LockManager
TransactionManager --> SnapshotManager
TransactionManager --> OplogManager
StorageEngine -.实现.-> WTStorageEngine
RecoveryUnit -.实现.-> WTRecoveryUnit
RecordStore -.实现.-> WTRecordStore
SortedDataInterface -.实现.-> WTIndex
WTStorageEngine --> WTSession
WTRecoveryUnit --> WTSession
WTRecordStore --> WTSession
WTIndex --> WTSession
WTSession --> DataFiles
WTSession --> IndexFiles
WTSession --> JournalFiles
WTSession --> CheckpointFiles
WTStorageEngine --> BufferManager
BufferManager --> PageCache
BufferManager --> IndexCache
WTRecordStore --> CompressionManager
CompressionManager --> SnappyCompressor
CompressionManager --> ZlibCompressor
CompressionManager --> ZstdCompressor
2.2 架构说明
2.2.1 图意概述
该架构图展示了MongoDB存储引擎的六层架构:接口层、事务层、实现层、文件层、缓存层和压缩层。存储引擎接口层定义统一API,WiredTiger作为默认实现,通过事务管理确保ACID特性,最终数据持久化到磁盘文件。
2.2.2 核心组件职责
存储引擎接口层:
StorageEngine:存储引擎的顶层接口,管理数据库和集合RecoveryUnit:事务恢复单元,管理单个事务的生命周期RecordStore:记录存储接口,提供文档的CRUD操作SortedDataInterface:有序数据接口,用于索引的维护和查询
事务管理层:
TransactionManager:协调多文档事务,实现两阶段提交LockManager:管理资源锁,避免并发冲突SnapshotManager:管理数据快照,提供时间点一致性OplogManager:管理操作日志,用于复制和恢复
WiredTiger引擎实现:
WTStorageEngine:WiredTiger存储引擎的具体实现WTRecoveryUnit:基于WiredTiger事务的恢复单元WTRecordStore:基于WiredTiger的记录存储WTIndex:基于WiredTiger的索引实现WTSession:WiredTiger会话,管理连接和游标
数据文件层:
DataFiles:存储集合数据的文件IndexFiles:存储索引数据的文件JournalFiles:预写日志文件,确保持久性CheckpointFiles:检查点文件,支持崩溃恢复
缓存层:
BufferManager:缓冲区管理,协调内存使用PageCache:数据页缓存,减少磁盘I/OIndexCache:索引页缓存,加速索引查询
压缩层:
CompressionManager:压缩管理,选择合适的压缩算法SnappyCompressor:Snappy压缩器,速度优先ZlibCompressor:Zlib压缩器,平衡压缩率和速度ZstdCompressor:Zstd压缩器,高压缩率
2.2.3 关键边界条件
-
事务隔离级别:
- 默认:读已提交(Read Committed)
- 可选:快照隔离(Snapshot Isolation)
- MongoDB事务:多文档ACID事务
-
锁粒度:
- 意向锁:数据库级别
- 集合锁:集合级别
- 文档锁:WiredTiger提供(行级锁)
- 死锁检测:超时机制
-
缓存大小:
- 默认:可用内存的50%
- 最小:256MB
- 最大:无限制(受系统内存约束)
- 淘汰策略:LRU
-
检查点频率:
- 默认:60秒或2GB数据变更
- 可配置:checkpointSizeMB参数
- 强制:关闭时必须完成检查点
2.2.4 异常处理与回退
-
事务回滚:
- 自动回滚:事务执行失败时
- 手动回滚:用户调用abort
- 级联回滚:事务冲突时回滚较新事务
- 部分回滚:嵌套事务的部分回滚
-
崩溃恢复:
- 检查点恢复:从最近检查点开始
- 日志重放:应用检查点后的WAL日志
- 一致性检查:验证数据完整性
- 修复模式:数据损坏时的修复选项
-
磁盘故障:
- 副本集:从其他节点恢复数据
- 备份恢复:从备份文件恢复
- 部分恢复:跳过损坏的数据页
- 数据验证:检测和报告数据损坏
-
内存不足:
- 缓存淘汰:释放最少使用的页面
- 操作降级:禁用某些内存密集操作
- 错误返回:内存分配失败时返回错误
- 重试机制:等待内存释放后重试
2.2.5 性能关键点
-
I/O优化:
- 顺序写入:日志和数据文件顺序写
- 批量操作:多个写操作合并
- 预读:预测性数据预读
- 异步I/O:非阻塞磁盘操作
-
缓存优化:
- 命中率:目标>95%
- 预热:启动时预加载热点数据
- 分层:频繁数据保留在内存
- 压缩:内存中数据压缩存储
-
并发优化:
- 无锁数据结构:减少锁竞争
- 细粒度锁:降低锁持有时间
- 读写分离:读操作不阻塞写操作
- 并行恢复:多线程并行恢复
-
压缩优化:
- 自适应:根据数据特征选择算法
- 流式压缩:边读边解压
- 预压缩:索引构建时预压缩
- 缓存压缩数据:避免重复压缩
2.2.6 容量假设
- 单个集合大小:无限制(受磁盘空间约束)
- 单个文档大小:16MB
- 并发事务数:数万个
- 索引数量:每集合最多64个
- 数据库数量:无限制
2.2.7 版本兼容与演进
-
WiredTiger版本:
- MongoDB集成特定版本的WiredTiger
- 向后兼容:旧数据文件格式
- 升级:滚动升级支持
-
数据格式演进:
- 版本标识:数据文件包含格式版本
- 转换工具:旧格式到新格式转换
- 混合模式:支持同时读取多种格式
-
API演进:
- 向后兼容:老接口继续支持
- 废弃通知:逐步淘汰老接口
- 新特性:通过featureFlag控制
3. 核心算法
3.1 MVCC快照隔离算法
3.1.1 算法目的
实现多版本并发控制(MVCC),为每个事务提供一致的数据视图,避免读写冲突。
3.1.2 输入输出
输入:
- 事务开始时间戳
- 读取请求(记录ID)
- 写入请求(记录ID和数据)
输出:
- 时间点一致的数据快照
- 事务提交/回滚结果
3.1.3 核心代码
// 快照管理器核心实现
class SnapshotManager {
public:
// 创建快照
Timestamp createSnapshot(OperationContext* opCtx) {
// 1) 获取当前提交时间戳
Timestamp readTimestamp = _getStableTimestamp();
// 2) 创建快照对象
auto snapshot = std::make_shared<Snapshot>(readTimestamp);
// 3) 注册到活跃快照列表
stdx::lock_guard<stdx::mutex> lock(_mutex);
_activeSnapshots[readTimestamp] = snapshot;
// 4) 设置到操作上下文
shard_role_details::getRecoveryUnit(opCtx)->setTimestamp(readTimestamp);
return readTimestamp;
}
// 读取数据(基于快照)
StatusWith<BSONObj> readRecord(OperationContext* opCtx,
RecordId recordId) {
// 1) 获取快照时间戳
Timestamp snapTs = shard_role_details::getRecoveryUnit(opCtx)
->getTimestamp();
// 2) 查找记录的版本链
auto versionChain = _getVersionChain(recordId);
// 3) 找到快照时间戳之前的最新版本
for (const auto& version : versionChain) {
if (version.commitTimestamp <= snapTs &&
!version.isDeleted()) {
return version.data;
}
}
// 4) 记录不存在或在快照时间之后创建
return Status(ErrorCodes::NoSuchKey, "Record not found in snapshot");
}
// 写入数据(创建新版本)
Status writeRecord(OperationContext* opCtx,
RecordId recordId,
const BSONObj& data) {
// 1) 分配事务ID和时间戳
TransactionId txnId = _allocateTransactionId();
Timestamp writeTs = _getCurrentTimestamp();
// 2) 创建新版本
RecordVersion newVersion;
newVersion.recordId = recordId;
newVersion.data = data.getOwned();
newVersion.transactionId = txnId;
newVersion.writeTimestamp = writeTs;
newVersion.commitTimestamp = Timestamp(); // 待提交时设置
// 3) 添加到版本链
_addToVersionChain(recordId, newVersion);
// 4) 记录到事务日志
shard_role_details::getRecoveryUnit(opCtx)->registerChange(
std::make_unique<RecordWriteChange>(recordId, newVersion));
return Status::OK();
}
// 提交事务
Status commitTransaction(OperationContext* opCtx,
Timestamp commitTimestamp) {
auto recoveryUnit = shard_role_details::getRecoveryUnit(opCtx);
TransactionId txnId = recoveryUnit->getTransactionId();
// 1) 冲突检测
if (!_validateNoConflicts(txnId, commitTimestamp)) {
return Status(ErrorCodes::WriteConflict,
"Transaction conflicts with committed transaction");
}
// 2) 设置所有版本的提交时间戳
for (auto& change : recoveryUnit->getChanges()) {
if (auto writeChange = dynamic_cast<RecordWriteChange*>(change)) {
writeChange->version.commitTimestamp = commitTimestamp;
}
}
// 3) 持久化到磁盘
_persistChanges(recoveryUnit->getChanges());
// 4) 更新最大提交时间戳
_updateMaxCommitTimestamp(commitTimestamp);
return Status::OK();
}
private:
// 记录版本结构
struct RecordVersion {
RecordId recordId;
BSONObj data;
TransactionId transactionId;
Timestamp writeTimestamp; // 写入时间戳
Timestamp commitTimestamp; // 提交时间戳(0表示未提交)
bool isDeleted() const { return data.isEmpty(); }
};
// 获取记录的版本链(按时间戳降序)
std::vector<RecordVersion> _getVersionChain(RecordId recordId) {
// (省略:从存储引擎读取版本链)
return {};
}
stdx::mutex _mutex;
std::map<Timestamp, std::shared_ptr<Snapshot>> _activeSnapshots;
Timestamp _stableTimestamp;
Timestamp _maxCommitTimestamp;
};
3.1.4 算法步骤注释
- 创建快照: 获取稳定时间戳,创建一致性视图
- 读取数据: 在版本链中找到快照时间戳之前的最新版本
- 写入数据: 创建新版本,加入版本链,延迟提交
- 冲突检测: 检查事务间的读写冲突
- 提交事务: 设置提交时间戳,持久化数据
3.1.5 冲突检测
// 冲突检测算法
bool _validateNoConflicts(TransactionId txnId, Timestamp commitTs) {
// 1) 获取事务的读写集合
auto readSet = _getTransactionReadSet(txnId);
auto writeSet = _getTransactionWriteSet(txnId);
// 2) 检查写-写冲突
for (const RecordId& recordId : writeSet) {
auto versionChain = _getVersionChain(recordId);
for (const auto& version : versionChain) {
// 其他事务在我们读取后提交了写入
if (version.commitTimestamp > _transactionStartTs[txnId] &&
version.commitTimestamp < commitTs &&
version.transactionId != txnId) {
return false; // 写-写冲突
}
}
}
// 3) 检查读-写冲突
for (const RecordId& recordId : readSet) {
auto versionChain = _getVersionChain(recordId);
for (const auto& version : versionChain) {
// 其他事务在我们读取后提交了写入
if (version.commitTimestamp > _transactionStartTs[txnId] &&
version.commitTimestamp < commitTs &&
version.transactionId != txnId) {
return false; // 读-写冲突
}
}
}
return true; // 无冲突
}
3.2 检查点算法
3.2.1 算法目的
定期将内存中的数据和日志刷新到磁盘,建立一致性恢复点,减少崩溃恢复时间。
3.2.2 核心代码
// 检查点管理器核心实现
class CheckpointManager {
public:
// 执行检查点
Status performCheckpoint(bool sync = true) {
CheckpointStats stats;
auto startTime = Date_t::now();
// 1) 暂停新的写操作(可选)
if (_pauseWritesDuringCheckpoint) {
_pauseWrites();
}
// 2) 等待所有活跃事务完成或达到一定时间点
Timestamp checkpointTimestamp = _waitForQuiescePoint();
// 3) 刷新脏页到磁盘
Status flushStatus = _flushDirtyPages(checkpointTimestamp);
if (!flushStatus.isOK()) {
return flushStatus;
}
// 4) 刷新日志到磁盘
Status journalStatus = _flushJournal(checkpointTimestamp);
if (!journalStatus.isOK()) {
return journalStatus;
}
// 5) 创建检查点元数据
CheckpointMetadata metadata;
metadata.timestamp = checkpointTimestamp;
metadata.lastOplogEntry = _getLastOplogEntry();
metadata.openTransactions = _getOpenTransactions();
// 6) 持久化检查点元数据
Status metadataStatus = _persistCheckpointMetadata(metadata);
if (!metadataStatus.isOK()) {
return metadataStatus;
}
// 7) 更新检查点状态
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
_lastCheckpointTimestamp = checkpointTimestamp;
_checkpointInProgress = false;
}
// 8) 恢复写操作
if (_pauseWritesDuringCheckpoint) {
_resumeWrites();
}
// 9) 记录统计信息
stats.duration = Date_t::now() - startTime;
stats.pagesWritten = _pagesWrittenCount;
stats.bytesWritten = _bytesWrittenCount;
return Status::OK();
}
// 崩溃恢复
Status recoverFromCheckpoint() {
// 1) 查找最新的有效检查点
auto checkpointMetadata = _findLatestCheckpoint();
if (!checkpointMetadata) {
return Status(ErrorCodes::DataCorruptionDetected,
"No valid checkpoint found");
}
// 2) 恢复到检查点状态
Status restoreStatus = _restoreFromCheckpoint(*checkpointMetadata);
if (!restoreStatus.isOK()) {
return restoreStatus;
}
// 3) 重放检查点后的日志
Timestamp recoveryStartTs = checkpointMetadata->timestamp;
Status replayStatus = _replayJournal(recoveryStartTs);
if (!replayStatus.isOK()) {
return replayStatus;
}
// 4) 恢复未完成的事务
for (const auto& txnId : checkpointMetadata->openTransactions) {
// 根据事务状态决定提交或回滚
_recoverTransaction(txnId);
}
// 5) 验证数据完整性
Status validateStatus = _validateDataIntegrity();
if (!validateStatus.isOK()) {
return validateStatus;
}
return Status::OK();
}
private:
// 刷新脏页
Status _flushDirtyPages(Timestamp checkpointTs) {
auto dirtyPages = _getDirtyPages();
// 按页面依赖关系排序,确保依赖页面先写入
std::sort(dirtyPages.begin(), dirtyPages.end(),
[](const Page& a, const Page& b) {
return a.lsn < b.lsn; // 按LSN排序
});
// 批量写入脏页
size_t batchSize = 64; // 每批64页
for (size_t i = 0; i < dirtyPages.size(); i += batchSize) {
size_t end = std::min(i + batchSize, dirtyPages.size());
std::vector<Page> batch(dirtyPages.begin() + i,
dirtyPages.begin() + end);
Status writeStatus = _writePagesToDisk(batch);
if (!writeStatus.isOK()) {
return writeStatus;
}
}
// 强制磁盘同步
return _syncDataFiles();
}
stdx::mutex _mutex;
bool _checkpointInProgress = false;
Timestamp _lastCheckpointTimestamp;
std::atomic<size_t> _pagesWrittenCount{0};
std::atomic<size_t> _bytesWrittenCount{0};
};
3.2.3 检查点流程图
sequenceDiagram
autonumber
participant App as 应用程序
participant CP as CheckpointManager
participant TM as TransactionManager
participant Buffer as BufferManager
participant Journal as JournalManager
participant Disk as 磁盘存储
Note over CP: 定时器触发检查点
CP->>CP: 开始检查点
CP->>TM: 获取静默点时间戳
TM->>TM: 等待活跃事务到达一致状态
TM-->>CP: 返回checkpointTimestamp
CP->>Buffer: 获取脏页列表
Buffer-->>CP: 返回脏页
CP->>Buffer: 按依赖顺序刷新脏页
Buffer->>Disk: 批量写入数据页
Disk-->>Buffer: 写入完成
Buffer-->>CP: 脏页刷新完成
CP->>Journal: 刷新日志到磁盘
Journal->>Disk: 写入WAL日志
Disk-->>Journal: 写入完成
Journal-->>CP: 日志刷新完成
CP->>CP: 创建检查点元数据
CP->>Disk: 持久化检查点元数据
Disk-->>CP: 元数据写入完成
CP->>CP: 更新检查点状态
CP-->>App: 检查点完成