MongoDB-09-存储引擎模块-API#
1. 存储引擎核心API#
1.1 StorageEngine接口#
1.1.1 基本信息#
- 名称: `StorageEngine`
- 作用: 存储引擎的顶层接口,管理数据库和集合
- 实现: WiredTigerKVEngine(默认)、DevNullKVEngine(测试用)
1.1.2 核心方法结构#
/**
 * Top-level storage engine interface: manages databases and collections.
 *
 * Implementations named in this document are WiredTigerKVEngine (the default)
 * and DevNullKVEngine (used for testing).
 */
class StorageEngine {
public:
    /**
     * Virtual so that destroying an implementation through a StorageEngine
     * pointer runs the derived class's destructor.
     */
    virtual ~StorageEngine() = default;

    // ---- Recovery ----

    /** Loads the database catalog at startup, restoring collection and index metadata. */
    virtual void loadCatalog(OperationContext* opCtx,
                             boost::optional<Timestamp> stableTimestamp) = 0;

    // ---- Database management ----

    /** Reports the databases known to the engine through `out`. */
    virtual void listDatabases(std::vector<DatabaseName>* out) const = 0;
    virtual bool supportsCappedCollections() const = 0;

    // ---- Collection management ----

    /** Creates a new record store, the container that holds a collection's documents. */
    virtual Status createRecordStore(OperationContext* opCtx,
                                     const NamespaceString& nss,
                                     const CollectionOptions& options) = 0;

    /** Returns the interface to an existing record store identified by `ident`. */
    virtual std::unique_ptr<RecordStore> getRecordStore(
        OperationContext* opCtx,
        const NamespaceString& nss,
        StringData ident,
        const CollectionOptions& options) = 0;

    // ---- Index management ----

    /** Creates the storage for a new index. */
    virtual Status createSortedDataInterface(OperationContext* opCtx,
                                             const CollectionOptions& collOptions,
                                             StringData ident,
                                             const IndexDescriptor* desc) = 0;

    /** Returns the interface to the storage of an existing index. */
    virtual std::unique_ptr<SortedDataInterface> getSortedDataInterface(
        OperationContext* opCtx,
        const CollectionOptions& collOptions,
        StringData ident,
        const IndexDescriptor* desc) = 0;

    // ---- Transaction support ----

    /** Creates a new recovery unit (the per-transaction state object). */
    virtual std::unique_ptr<RecoveryUnit> newRecoveryUnit() = 0;
    virtual bool supportsDocLocking() const = 0;
    virtual bool supportsDirectoryPerDB() const = 0;

    // ---- Checkpoint management ----

    /** Creates a data-consistent checkpoint. */
    virtual Status createCheckpoint(OperationContext* opCtx) = 0;

    /** Returns the current stable timestamp. */
    virtual Timestamp getStableTimestamp() const = 0;

    /**
     * Sets the stable timestamp.
     * NOTE(review): the effect of `force` is not visible in this file; confirm
     * against the implementation before relying on it.
     */
    virtual void setStableTimestamp(Timestamp stableTimestamp,
                                    bool force = false) = 0;

    // ---- Metadata management ----

    virtual void setJournalListener(JournalListener* jl) = 0;
    virtual bool isDurable() const = 0;
    virtual bool isEphemeral() const = 0;

    // ---- Backup and restore ----

    /**
     * Starts a backup.
     * NOTE(review): the returned strings are presumably the files to copy; confirm.
     */
    virtual StatusWith<std::vector<std::string>> beginBackup(
        OperationContext* opCtx) = 0;

    /** Ends a backup previously started with beginBackup(). */
    virtual void endBackup(OperationContext* opCtx) = 0;
};
1.1.3 方法详细说明#
| 方法名 | 返回类型 | 参数 | 功能说明 |
| --- | --- | --- | --- |
| loadCatalog | void | opCtx, stableTimestamp | 启动时加载数据库目录,恢复集合和索引元数据 |
| createRecordStore | Status | opCtx, nss, options | 创建新的记录存储,用于存储集合文档 |
| getRecordStore | `unique_ptr<RecordStore>` | opCtx, nss, ident, options | 获取现有记录存储的接口 |
| createSortedDataInterface | Status | opCtx, collOptions, ident, desc | 创建新的索引存储 |
| getSortedDataInterface | `unique_ptr<SortedDataInterface>` | opCtx, collOptions, ident, desc | 获取现有索引存储的接口 |
| newRecoveryUnit | `unique_ptr<RecoveryUnit>` | 无 | 创建新的事务恢复单元 |
| createCheckpoint | Status | opCtx | 创建数据一致性检查点 |
| getStableTimestamp | Timestamp | 无 | 获取当前稳定时间戳 |
| setStableTimestamp | void | stableTimestamp, force | 设置稳定时间戳,用于快照读取 |
1.1.4 入口函数与关键代码#
// WiredTiger存储引擎实现
class WiredTigerKVEngine : public StorageEngine {
public:
// 创建记录存储
Status createRecordStore(OperationContext* opCtx,
const NamespaceString& nss,
const CollectionOptions& options) override {
// 1) 生成唯一标识符
std::string ident = _createIdent(nss, options);
// 2) 构造WiredTiger表配置
StringBuilder config;
config << "type=file,internal_page_max=16k,leaf_page_max=16k";
// 配置压缩
if (!options.storageEngine.isEmpty()) {
BSONElement compression = options.storageEngine["wiredTiger"]["configString"];
if (!compression.eoo()) {
config << "," << compression.String();
}
}
// 3) 创建WiredTiger表
WT_SESSION* session = _getSession(opCtx);
std::string uri = "table:" + ident;
int ret = session->create(session, uri.c_str(), config.str().c_str());
if (ret != 0) {
return wtRCToStatus(ret, "Failed to create collection");
}
// 4) 注册到目录
_catalog->newEntry(opCtx, nss, ident, BSONObj());
return Status::OK();
}
// 获取记录存储
std::unique_ptr<RecordStore> getRecordStore(
OperationContext* opCtx,
const NamespaceString& nss,
StringData ident,
const CollectionOptions& options) override {
// 1) 检查标识符是否存在
if (!_catalog->isCollectionIdent(ident)) {
return nullptr;
}
// 2) 确定集合类型
if (nss.isOplog()) {
// Oplog使用特殊的记录存储
return std::make_unique<WiredTigerRecordStore>(
this, opCtx, nss.toString(), ident, true, /* isOplog */
nullptr, /* cappedCallback */);
} else if (options.capped) {
// 固定集合
return std::make_unique<WiredTigerRecordStore>(
this, opCtx, nss.toString(), ident, false,
std::make_shared<CappedCallback>(options));
} else {
// 普通集合
return std::make_unique<WiredTigerRecordStore>(
this, opCtx, nss.toString(), ident, false, nullptr);
}
}
// 创建检查点
Status createCheckpoint(OperationContext* opCtx) override {
// 1) 获取检查点管理器
auto checkpointThread = WiredTigerCheckpointThread::get(_conn);
// 2) 触发检查点
checkpointThread->waitForCheckpointToComplete();
// 3) 验证检查点完成
WT_SESSION* session = _getSession(opCtx);
int ret = session->checkpoint(session, "force=true");
if (ret != 0) {
return wtRCToStatus(ret, "Checkpoint failed");
}
return Status::OK();
}
private:
WT_CONNECTION* _conn; // WiredTiger连接
std::unique_ptr<WiredTigerCatalog> _catalog; // 目录管理
std::string _canonicalName; // 引擎名称
bool _durable; // 是否持久化
};
1.1.5 调用链路分析#
sequenceDiagram
autonumber
participant DB as Database
participant SE as StorageEngine
participant WT as WiredTiger
participant Catalog as WTCatalog
participant File as DataFile
DB->>SE: createRecordStore(nss, options)
activate SE
SE->>SE: 生成ident标识符
SE->>SE: 构造WT配置字符串
SE->>WT: session->create(uri, config)
activate WT
WT->>File: 创建数据文件
File-->>WT: 文件创建成功
WT-->>SE: 表创建成功
deactivate WT
SE->>Catalog: newEntry(nss, ident)
activate Catalog
Catalog->>Catalog: 更新元数据
Catalog-->>SE: 元数据已更新
deactivate Catalog
SE-->>DB: Status::OK()
deactivate SE
1.2 RecordStore接口#
1.2.1 基本信息#
- 名称: `RecordStore`
- 作用: 记录存储接口,提供文档的CRUD操作
- 实现: WiredTigerRecordStore
1.2.2 核心方法结构#
/**
 * Record store interface: CRUD access to the documents of one collection.
 * The implementation named in this document is WiredTigerRecordStore.
 */
class RecordStore {
public:
    /**
     * Virtual because record stores are owned through
     * std::unique_ptr<RecordStore> and therefore destroyed through the base
     * type.
     */
    virtual ~RecordStore() = default;

    // ---- Insert ----

    /** Inserts `len` bytes starting at `data`; the new record's id is reported through `out`. */
    virtual Status insertRecord(OperationContext* opCtx,
                                const char* data,
                                int len,
                                RecordId* out) = 0;

    /**
     * Inserts a batch of records.
     * NOTE(review): `timestamps` presumably pairs one entry with each record; confirm.
     */
    virtual Status insertRecords(OperationContext* opCtx,
                                 std::vector<Record>* records,
                                 const std::vector<Timestamp>& timestamps) = 0;

    // ---- Update ----

    /** Replaces the contents of the record `recordId` with `len` bytes from `data`. */
    virtual Status updateRecord(OperationContext* opCtx,
                                const RecordId& recordId,
                                const char* data,
                                int len) = 0;

    // ---- Delete ----

    /** Deletes the record `recordId`. */
    virtual Status deleteRecord(OperationContext* opCtx,
                                const RecordId& recordId) = 0;

    // ---- Lookup ----

    /** Returns the data stored for `recordId`. */
    virtual RecordData dataFor(OperationContext* opCtx,
                               const RecordId& recordId) const = 0;

    /** Looks up `recordId`, reporting its data through `out`; the bool reports whether it was found. */
    virtual bool findRecord(OperationContext* opCtx,
                            const RecordId& recordId,
                            RecordData* out) const = 0;

    // ---- Cursors ----

    /** Returns a cursor over the records; `forward` selects the iteration direction. */
    virtual std::unique_ptr<SeekableRecordCursor> getCursor(
        OperationContext* opCtx,
        bool forward = true) const = 0;

    // ---- Statistics ----

    virtual int64_t numRecords(OperationContext* opCtx) const = 0;
    virtual int64_t dataSize(OperationContext* opCtx) const = 0;
    virtual int64_t storageSize(OperationContext* opCtx) const = 0;

    // ---- Collection traits ----

    virtual bool isCapped() const = 0;
    virtual bool isOplog() const = 0;
    virtual void setCappedCallback(CappedCallback* cb) = 0;

    // ---- Validation and repair ----

    virtual Status validate(OperationContext* opCtx,
                            ValidateResults* results,
                            BSONObjBuilder* output) = 0;
    virtual void doDeleteRecord(OperationContext* opCtx,
                                const RecordId& recordId) = 0;
};
1.2.3 WiredTiger实现的核心代码#
class WiredTigerRecordStore : public RecordStore {
public:
WiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
OperationContext* opCtx,
StringData nss,
StringData ident,
bool isOplog,
std::shared_ptr<CappedCallback> cappedCallback)
: _kvEngine(kvEngine),
_nss(nss.toString()),
_ident(ident.toString()),
_isOplog(isOplog),
_cappedCallback(cappedCallback) {
// 构造表URI
_tableURI = "table:" + _ident;
// 打开游标以验证表存在
WT_SESSION* session = _kvEngine->getSession(opCtx);
WT_CURSOR* cursor;
int ret = session->open_cursor(session, _tableURI.c_str(),
nullptr, nullptr, &cursor);
invariant(ret == 0);
cursor->close(cursor);
}
// 插入记录实现
Status insertRecord(OperationContext* opCtx,
const char* data,
int len,
RecordId* out) override {
// 1) 获取WiredTiger会话
WiredTigerRecoveryUnit* ru = WiredTigerRecoveryUnit::get(opCtx);
WT_SESSION* session = ru->getSession();
// 2) 打开游标
WT_CURSOR* cursor;
int ret = session->open_cursor(session, _tableURI.c_str(),
nullptr, "overwrite=false", &cursor);
if (ret != 0) {
return wtRCToStatus(ret, "Failed to open cursor for insert");
}
// 3) 生成RecordId
RecordId recordId;
if (_isOplog) {
// Oplog使用时间戳作为RecordId
recordId = _generateOplogRecordId(opCtx);
} else {
// 普通集合使用递增ID
recordId = _nextRecordId();
}
// 4) 设置键值
WiredTigerItem key(recordId.repr());
cursor->set_key(cursor, key.Get());
WiredTigerItem value(data, len);
cursor->set_value(cursor, value.Get());
// 5) 插入记录
ret = WT_OP_CHECK(cursor->insert(cursor));
cursor->close(cursor);
if (ret != 0) {
if (ret == WT_DUPLICATE_KEY) {
return Status(ErrorCodes::DuplicateKey,
"Duplicate key error");
}
return wtRCToStatus(ret, "Insert failed");
}
// 6) 更新统计信息
_sizeStorer->store(_sizeStorerUri, _numRecords.addAndFetch(1),
_dataSize.addAndFetch(len));
// 7) 处理固定集合截断
if (_cappedCallback) {
_cappedCallback->notifyOfInsert(opCtx, recordId);
}
*out = recordId;
return Status::OK();
}
// 查找记录实现
RecordData dataFor(OperationContext* opCtx,
const RecordId& recordId) const override {
// 1) 获取会话和游标
WiredTigerRecoveryUnit* ru = WiredTigerRecoveryUnit::get(opCtx);
WT_CURSOR* cursor = ru->getCursor(_tableURI, true);
// 2) 设置查找键
WiredTigerItem key(recordId.repr());
cursor->set_key(cursor, key.Get());
// 3) 查找记录
int ret = WT_OP_CHECK(cursor->search(cursor));
if (ret == WT_NOTFOUND) {
return RecordData();
}
invariant(ret == 0);
// 4) 获取数据
WT_ITEM value;
ret = cursor->get_value(cursor, &value);
invariant(ret == 0);
// 5) 返回数据(共享缓冲区)
return RecordData(static_cast<const char*>(value.data), value.size)
.getOwned();
}
// 删除记录实现
Status deleteRecord(OperationContext* opCtx,
const RecordId& recordId) override {
// 1) 获取当前记录大小(用于统计)
RecordData oldData = dataFor(opCtx, recordId);
if (oldData.data() == nullptr) {
return Status(ErrorCodes::NoSuchKey, "Record not found");
}
// 2) 获取游标
WiredTigerRecoveryUnit* ru = WiredTigerRecoveryUnit::get(opCtx);
WT_CURSOR* cursor = ru->getCursor(_tableURI, true);
// 3) 定位到记录
WiredTigerItem key(recordId.repr());
cursor->set_key(cursor, key.Get());
int ret = WT_OP_CHECK(cursor->search(cursor));
if (ret == WT_NOTFOUND) {
return Status(ErrorCodes::NoSuchKey, "Record not found");
}
// 4) 删除记录
ret = WT_OP_CHECK(cursor->remove(cursor));
if (ret != 0) {
return wtRCToStatus(ret, "Delete failed");
}
// 5) 更新统计信息
_sizeStorer->store(_sizeStorerUri,
_numRecords.subtractAndFetch(1),
_dataSize.subtractAndFetch(oldData.size()));
return Status::OK();
}
private:
WiredTigerKVEngine* _kvEngine;
std::string _nss;
std::string _ident;
std::string _tableURI;
bool _isOplog;
std::shared_ptr<CappedCallback> _cappedCallback;
mutable AtomicWord<long long> _numRecords;
mutable AtomicWord<long long> _dataSize;
std::unique_ptr<SizeStorer> _sizeStorer;
};
1.3 RecoveryUnit接口#
1.3.1 基本信息#
- 名称: `RecoveryUnit`
- 作用: 事务恢复单元,管理单个事务的生命周期
- 实现: WiredTigerRecoveryUnit
1.3.2 核心方法结构#
/**
 * Recovery unit: manages the life cycle of a single transaction.
 * The implementation named in this document is WiredTigerRecoveryUnit.
 */
class RecoveryUnit {
public:
    /**
     * Virtual because recovery units are owned through
     * std::unique_ptr<RecoveryUnit> and therefore destroyed through the base
     * type.
     */
    virtual ~RecoveryUnit() = default;

    // ---- Transaction control ----

    virtual void beginUnitOfWork(OperationContext* opCtx) = 0;
    virtual void commitUnitOfWork() = 0;
    virtual void abortUnitOfWork() = 0;

    // ---- Durability control ----

    virtual bool waitUntilDurable(OperationContext* opCtx) = 0;
    virtual bool waitUntilUnjournaledWritesDurable(OperationContext* opCtx) = 0;

    // ---- Snapshot management ----

    virtual void abandonSnapshot() = 0;
    virtual Status setTimestamp(Timestamp timestamp) = 0;
    virtual Timestamp getTimestamp() const = 0;

    // ---- Change tracking ----

    /** Registers a change object; ownership passes to the recovery unit. */
    virtual void registerChange(std::unique_ptr<Change> change) = 0;
    /** Registers a callback for the rollback path. */
    virtual void onRollback(std::function<void()> callback) = 0;
    /** Registers a callback for the commit path. */
    virtual void onCommit(std::function<void()> callback) = 0;

    // ---- State queries ----

    virtual bool inUnitOfWork() const = 0;
    virtual bool hasUncommittedWrites() const = 0;

    // ---- Read behaviour ----

    virtual void setReadOnce(bool readOnce) = 0;
    virtual bool getReadOnce() const = 0;
};
1.3.3 WiredTiger实现的核心代码#
class WiredTigerRecoveryUnit : public RecoveryUnit {
public:
// 开始工作单元(事务)
void beginUnitOfWork(OperationContext* opCtx) override {
invariant(!_inUnitOfWork);
// 1) 设置事务状态
_inUnitOfWork = true;
_hasUncommittedWrites = false;
// 2) 开始WT事务
if (!_isTimestamped()) {
// 无时间戳事务
int ret = _session->begin_transaction(_session, nullptr);
invariant(ret == 0);
} else {
// 带时间戳事务
StringBuilder config;
config << "read_timestamp=" << _commitTimestamp.asULL();
int ret = _session->begin_transaction(_session, config.str().c_str());
invariant(ret == 0);
}
// 3) 注册到活跃事务列表
_kvEngine->registerActiveTransaction(this);
}
// 提交工作单元
void commitUnitOfWork() override {
invariant(_inUnitOfWork);
try {
// 1) 执行提交前回调
for (auto& callback : _onCommitCallbacks) {
callback();
}
// 2) 应用所有变更
for (auto& change : _changes) {
change->commit(boost::none);
}
// 3) 提交WT事务
if (_isTimestamped()) {
StringBuilder config;
config << "commit_timestamp=" << _commitTimestamp.asULL();
int ret = _session->commit_transaction(_session,
config.str().c_str());
invariant(ret == 0);
} else {
int ret = _session->commit_transaction(_session, nullptr);
invariant(ret == 0);
}
// 4) 清理状态
_inUnitOfWork = false;
_hasUncommittedWrites = false;
_changes.clear();
_onCommitCallbacks.clear();
_onRollbackCallbacks.clear();
} catch (...) {
// 提交失败,回滚事务
abortUnitOfWork();
throw;
}
}
// 回滚工作单元
void abortUnitOfWork() override {
invariant(_inUnitOfWork);
try {
// 1) 执行回滚前回调
for (auto& callback : _onRollbackCallbacks) {
callback();
}
// 2) 回滚所有变更
for (auto it = _changes.rbegin(); it != _changes.rend(); ++it) {
(*it)->rollback();
}
// 3) 回滚WT事务
int ret = _session->rollback_transaction(_session, nullptr);
invariant(ret == 0);
} catch (...) {
// 忽略回滚中的异常
}
// 4) 清理状态
_inUnitOfWork = false;
_hasUncommittedWrites = false;
_changes.clear();
_onCommitCallbacks.clear();
_onRollbackCallbacks.clear();
}
// 设置时间戳
Status setTimestamp(Timestamp timestamp) override {
if (_inUnitOfWork) {
return Status(ErrorCodes::BadValue,
"Cannot set timestamp in active transaction");
}
_commitTimestamp = timestamp;
_isTimestamped = true;
return Status::OK();
}
// 等待持久化
bool waitUntilDurable(OperationContext* opCtx) override {
// 1) 检查是否需要等待
if (!_hasUncommittedWrites) {
return true;
}
// 2) 强制检查点
_kvEngine->flushAllFiles(opCtx, true);
// 3) 等待日志同步
return _kvEngine->waitUntilDurable(opCtx, _commitTimestamp);
}
private:
WT_SESSION* _session;
WiredTigerKVEngine* _kvEngine;
bool _inUnitOfWork = false;
bool _hasUncommittedWrites = false;
bool _isTimestamped = false;
Timestamp _commitTimestamp;
std::vector<std::unique_ptr<Change>> _changes;
std::vector<std::function<void()>> _onCommitCallbacks;
std::vector<std::function<void()>> _onRollbackCallbacks;
};
2. 存储引擎时序图#
2.1 事务提交流程#
sequenceDiagram
autonumber
participant App as 应用程序
participant RU as RecoveryUnit
participant WT as WiredTiger
participant Journal as Journal
participant Disk as 磁盘
App->>RU: beginUnitOfWork()
activate RU
RU->>WT: begin_transaction()
activate WT
WT-->>RU: 事务已开始
deactivate WT
App->>RU: insertRecord(data)
RU->>RU: registerChange()
RU->>WT: cursor->insert()
activate WT
WT->>WT: 写入缓冲区
WT-->>RU: 插入成功
deactivate WT
App->>RU: commitUnitOfWork()
RU->>RU: 执行onCommit回调
loop 应用所有变更
RU->>RU: change->commit()
end
RU->>WT: commit_transaction()
activate WT
WT->>Journal: 写入事务日志
activate Journal
Journal->>Disk: 持久化日志
Disk-->>Journal: 日志已写入
Journal-->>WT: 日志提交完成
deactivate Journal
WT-->>RU: 事务提交成功
deactivate WT
RU->>RU: 清理事务状态
RU-->>App: 提交完成
deactivate RU
2.2 检查点执行流程#
sequenceDiagram
autonumber
participant Timer as 定时器
participant CP as CheckpointThread
participant SE as StorageEngine
participant Buffer as BufferPool
participant WT as WiredTiger
participant Disk as 磁盘
Timer->>CP: 触发检查点
activate CP
CP->>SE: 获取稳定时间戳
SE-->>CP: stableTimestamp
CP->>Buffer: 获取脏页列表
Buffer-->>CP: dirtyPages[]
CP->>WT: 开始检查点
activate WT
loop 刷新脏页
WT->>Disk: 写入数据页
Disk-->>WT: 页面已写入
end
WT->>Disk: 同步数据文件
Disk-->>WT: 同步完成
WT->>Disk: 写入检查点元数据
Disk-->>WT: 元数据已写入
WT-->>CP: 检查点完成
deactivate WT
CP->>SE: 更新检查点状态
SE-->>CP: 状态已更新
CP-->>Timer: 检查点执行完成
deactivate CP