RocksDB 源码剖析完整文档
目录
1. RocksDB 框架使用手册
1.1 项目概述
RocksDB是Facebook开发的高性能嵌入式键值存储引擎,基于Google的LevelDB构建,采用LSM-Tree(Log-Structured Merge-Tree)架构。它特别适合于需要高写入吞吐量和低延迟读取的应用场景。
核心特性:
- 高性能的键值存储
- 支持多线程并发访问
- LSM-Tree架构,优化写入性能
- 支持列族(Column Families)
- 丰富的配置选项和调优参数
- 支持事务和快照
- 内置压缩和缓存机制
1.2 RocksDB vs LevelDB 对比分析
RocksDB在LevelDB基础上进行了大量改进和扩展,主要优势包括:
1.2.1 多线程支持
LevelDB限制: LevelDB只支持单线程压缩,这在高写入负载下会成为瓶颈。
RocksDB优势: 支持多线程并发压缩和刷新操作。
// 位置:include/rocksdb/options.h
struct DBOptions {
// RocksDB独有:多线程后台任务支持
int max_background_jobs = 2; // 总后台任务数
int max_background_compactions = -1; // 压缩线程数(已废弃,使用max_background_jobs)
int max_background_flushes = -1; // 刷新线程数(已废弃,使用max_background_jobs)
// 线程池配置
std::shared_ptr<Env> env = Env::Default();
// 示例配置
void ConfigureMultiThreading() {
max_background_jobs = 8; // 8个后台线程处理压缩和刷新
}
};
实现代码:
// 位置:db/db_impl/db_impl.cc:260-279
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
const bool seq_per_batch, const bool batch_per_txn,
bool read_only) {
// 创建后台线程池
if (!read_only) {
// 压缩线程池
bg_compaction_pool_.reset(new ThreadPoolImpl());
bg_compaction_pool_->SetBackgroundThreads(
immutable_db_options_.max_background_compactions);
// 刷新线程池
bg_flush_pool_.reset(new ThreadPoolImpl());
bg_flush_pool_->SetBackgroundThreads(
immutable_db_options_.max_background_flushes);
}
}
1.2.2 列族(Column Families)支持
LevelDB限制: 不支持列族,所有数据存储在单一命名空间中。
RocksDB优势: 支持列族,允许在同一数据库中创建多个逻辑分区,每个列族可以有独立的配置。
// 位置:include/rocksdb/db.h:69-77
struct ColumnFamilyDescriptor {
std::string name; // 列族名称
ColumnFamilyOptions options; // 列族特定选项
ColumnFamilyDescriptor() : name(kDefaultColumnFamilyName), options(ColumnFamilyOptions()) {}
ColumnFamilyDescriptor(const std::string& _name, const ColumnFamilyOptions& _options)
: name(_name), options(_options) {}
};
// 列族句柄抽象类
class ColumnFamilyHandle {
public:
virtual ~ColumnFamilyHandle() {}
virtual const std::string& GetName() const = 0;
virtual uint32_t GetID() const = 0;
virtual Status GetDescriptor(ColumnFamilyDescriptor* desc) = 0;
virtual const Comparator* GetComparator() const = 0;
};
使用示例:
// 创建和使用列族
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
column_families.push_back(ColumnFamilyDescriptor("users", ColumnFamilyOptions()));
column_families.push_back(ColumnFamilyDescriptor("posts", ColumnFamilyOptions()));
std::vector<ColumnFamilyHandle*> handles;
DB* db;
Status s = DB::Open(DBOptions(), "/tmp/mydb", column_families, &handles, &db);
// 在不同列族中操作
db->Put(WriteOptions(), handles[1], "user:1", "alice");
db->Put(WriteOptions(), handles[2], "post:1", "hello world");
1.2.3 事务支持
LevelDB限制: 不支持事务,只能通过WriteBatch实现有限的原子操作。
RocksDB优势: 提供完整的ACID事务支持,包括乐观事务和悲观事务。
// 位置:include/rocksdb/utilities/transaction_db.h
class TransactionDB : public StackableDB {
public:
// 创建事务数据库
static Status Open(const Options& options,
const TransactionDBOptions& txn_db_options,
const std::string& dbname,
TransactionDB** dbptr);
// 开始事务
virtual Transaction* BeginTransaction(
const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions(),
Transaction* old_txn = nullptr) = 0;
};
// 事务接口
class Transaction {
public:
virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) = 0;
virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key) = 0;
// 提交事务
virtual Status Commit() = 0;
// 回滚事务
virtual Status Rollback() = 0;
};
事务使用示例:
// 乐观事务示例
TransactionDB* txn_db;
TransactionDBOptions txn_options;
Status s = TransactionDB::Open(Options(), txn_options, "/tmp/txndb", &txn_db);
Transaction* txn = txn_db->BeginTransaction(WriteOptions());
txn->Put("key1", "value1");
txn->Put("key2", "value2");
// 提交事务
Status commit_status = txn->Commit();
if (commit_status.ok()) {
// 事务成功
} else {
// 处理冲突或错误
txn->Rollback();
}
delete txn;
1.2.4 备份和恢复
LevelDB限制: 不提供内置的备份机制。
RocksDB优势: 提供完整的备份和恢复功能,支持增量备份。
// 位置:include/rocksdb/utilities/backup_engine.h
class BackupEngine {
public:
// 创建备份引擎
static Status Open(Env* db_env, const BackupableDBOptions& options,
BackupEngine** backup_engine_ptr);
// 创建备份
virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false,
std::function<void()> progress_callback = [](){}) = 0;
// 恢复备份
virtual Status RestoreDBFromBackup(BackupID backup_id,
const std::string& db_dir,
const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) = 0;
// 获取备份信息
virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) = 0;
};
备份使用示例:
// 创建备份
BackupEngine* backup_engine;
BackupableDBOptions backup_options("/tmp/backup");
Status s = BackupEngine::Open(Env::Default(), backup_options, &backup_engine);
// 执行备份
s = backup_engine->CreateNewBackup(db);
// 恢复备份
s = backup_engine->RestoreDBFromBackup(1, "/tmp/restore", "/tmp/restore");
1.2.5 压缩算法支持
LevelDB限制: 只支持Snappy压缩。
RocksDB优势: 支持多种压缩算法,可以为不同层级配置不同的压缩算法。
// 位置:include/rocksdb/options.h
enum CompressionType : unsigned char {
kNoCompression = 0x0,
kSnappyCompression = 0x1,
kZlibCompression = 0x2,
kBZip2Compression = 0x3,
kLZ4Compression = 0x4,
kLZ4HCCompression = 0x5,
kXpressCompression = 0x6,
kZSTD = 0x7,
kZSTDNotFinalCompression = 0x40,
kDisableCompressionOption = 0xFF,
};
struct ColumnFamilyOptions {
// 默认压缩算法
CompressionType compression = kSnappyCompression;
// 每层级的压缩算法配置
std::vector<CompressionType> compression_per_level;
// 压缩选项
CompressionOptions compression_opts;
};
压缩配置示例:
Options options;
// 为不同层级配置不同压缩算法
options.compression_per_level.resize(options.num_levels);
for (int i = 0; i < 2; ++i) {
options.compression_per_level[i] = kNoCompression; // Level 0-1 不压缩
}
for (int i = 2; i < 4; ++i) {
options.compression_per_level[i] = kLZ4Compression; // Level 2-3 使用LZ4
}
for (int i = 4; i < options.num_levels; ++i) {
options.compression_per_level[i] = kZSTD; // Level 4+ 使用ZSTD
}
1.2.6 统计和监控
LevelDB限制: 提供的统计信息非常有限。
RocksDB优势: 提供详细的性能统计和监控功能。
// 位置:include/rocksdb/statistics.h
class Statistics {
public:
virtual uint64_t getTickerCount(uint32_t tickerType) const = 0;
virtual void histogramData(uint32_t type, HistogramData* const data) const = 0;
virtual std::string getHistogramString(uint32_t type) const = 0;
virtual void recordTick(uint32_t tickerType, uint64_t count = 1) = 0;
virtual void setTickerCount(uint32_t tickerType, uint64_t count) = 0;
virtual uint64_t getAndResetTickerCount(uint32_t tickerType) = 0;
virtual void measureTime(uint32_t histogramType, uint64_t time) = 0;
};
// 统计项枚举
enum Tickers : uint32_t {
BLOCK_CACHE_MISS = 0,
BLOCK_CACHE_HIT,
BLOCK_CACHE_ADD,
BYTES_READ,
BYTES_WRITTEN,
COMPACT_READ_BYTES,
COMPACT_WRITE_BYTES,
NUMBER_KEYS_WRITTEN,
NUMBER_KEYS_READ,
// ... 更多统计项
};
统计使用示例:
Options options;
options.statistics = CreateDBStatistics();
// 获取统计信息
uint64_t cache_hits = options.statistics->getTickerCount(BLOCK_CACHE_HIT);
uint64_t cache_misses = options.statistics->getTickerCount(BLOCK_CACHE_MISS);
uint64_t bytes_read = options.statistics->getTickerCount(BYTES_READ);
// 输出统计报告
std::string stats = options.statistics->ToString();
printf("RocksDB Statistics:\n%s\n", stats.c_str());
1.2.7 高级缓存功能
LevelDB限制: 只有简单的LRU块缓存。
RocksDB优势: 提供多种缓存实现和二级缓存支持。
// 位置:include/rocksdb/cache.h
// 创建不同类型的缓存
std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits = -1,
bool strict_capacity_limit = false,
double high_pri_pool_ratio = 0.5);
std::shared_ptr<Cache> NewClockCache(size_t capacity, int num_shard_bits = -1,
bool strict_capacity_limit = false);
// 二级缓存支持
class SecondaryCache {
public:
virtual Status Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) = 0;
virtual std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& key, const Cache::CacheItemHelper* helper,
Cache::CreateContext* create_context, bool wait, bool& is_in_sec_cache) = 0;
};
高级缓存配置:
// 配置分层缓存
BlockBasedTableOptions table_options;
// 主缓存(内存)
table_options.block_cache = NewLRUCache(1024 * 1024 * 1024); // 1GB
// 二级缓存(可以是压缩缓存或持久化缓存)
LRUCacheOptions cache_opts(512 * 1024 * 1024); // 512MB
cache_opts.secondary_cache = NewCompressedSecondaryCache(256 * 1024 * 1024);
table_options.block_cache = NewLRUCache(cache_opts);
// 缓存索引和过滤器
table_options.cache_index_and_filter_blocks = true;
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1.2.8 写入优化
LevelDB限制: 写入性能在高并发场景下受限。
RocksDB优势: 提供多种写入优化机制。
// 位置:include/rocksdb/options.h
struct WriteOptions {
// RocksDB独有的写入优化选项
bool low_pri = false; // 低优先级写入
bool no_slowdown = false; // 不允许写入减速
bool ignore_missing_column_families = false; // 忽略不存在的列族
bool disable_wal = false; // 禁用WAL(提高性能但降低持久性)
uint64_t sync_timeout = 0; // 同步超时
// 批量写入提示
size_t protection_bytes_per_key = 0; // 每个键的保护字节数
bool memtable_insert_hint_per_batch = false; // 批量插入提示
};
// 写入控制器
class WriteController {
public:
// 控制写入速度
void set_delayed_write_rate(uint64_t delayed_write_rate);
uint64_t delayed_write_rate() const;
// 检查是否需要延迟写入
bool NeedsDelay() const;
bool IsStopped() const;
};
1.2.9 Merge操作支持
LevelDB限制: 不支持Merge操作,只能通过Get-Modify-Put模式实现。
RocksDB优势: 原生支持Merge操作,避免读-修改-写的竞争条件。
// 位置:include/rocksdb/merge_operator.h
class MergeOperator {
public:
// 完整合并:将现有值与操作数合并
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const = 0;
// 部分合并:合并多个操作数
virtual bool PartialMerge(const Slice& key, const Slice& left_operand,
const Slice& right_operand, std::string* new_value,
Logger* logger) const = 0;
virtual const char* Name() const = 0;
};
// 使用示例:计数器Merge操作
class CounterMergeOperator : public MergeOperator {
public:
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
int64_t sum = 0;
if (merge_in.existing_value) {
sum = std::stoll(merge_in.existing_value->ToString());
}
for (const Slice& operand : merge_in.operand_list) {
sum += std::stoll(operand.ToString());
}
merge_out->new_value = std::to_string(sum);
return true;
}
const char* Name() const override { return "CounterMergeOperator"; }
};
Merge操作使用:
Options options;
options.merge_operator.reset(new CounterMergeOperator);
DB* db;
DB::Open(options, "/tmp/mergedb", &db);
// 使用Merge操作实现原子计数
db->Merge(WriteOptions(), "counter", "1");
db->Merge(WriteOptions(), "counter", "5");
db->Merge(WriteOptions(), "counter", "3");
std::string value;
db->Get(ReadOptions(), "counter", &value); // 结果为 "9"
1.2.10 TTL(Time To Live)支持
LevelDB限制: 不支持TTL,需要应用层实现过期逻辑。
RocksDB优势: 提供内置的TTL支持。
// 位置:include/rocksdb/utilities/db_ttl.h
class DBWithTTL : public StackableDB {
public:
// 打开带TTL的数据库
static Status Open(const Options& options, const std::string& dbname,
DBWithTTL** dbptr, int32_t ttl = 0, bool read_only = false);
// 带TTL的Put操作
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& val, int32_t ttl) = 0;
// 压缩时过滤过期数据
virtual Status CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) override = 0;
};
TTL使用示例:
DBWithTTL* db_ttl;
Status s = DBWithTTL::Open(Options(), "/tmp/ttldb", &db_ttl, 3600); // 1小时TTL
// 写入带TTL的数据
db_ttl->Put(WriteOptions(), "session:123", "user_data", 1800); // 30分钟TTL
// 数据会在TTL过期后自动删除
1.2.11 总结:RocksDB的核心优势
通过以上对比分析,可以看出RocksDB相对于LevelDB的主要优势:
- 更好的并发性能:多线程压缩和刷新,支持更高的写入吞吐量
- 更丰富的功能:列族、事务、备份、TTL等企业级功能
- 更灵活的配置:多种压缩算法、缓存策略、写入优化选项
- 更完善的监控:详细的统计信息和性能指标
- 更强的扩展性:插件化架构,支持自定义组件
- 更好的生产就绪性:备份恢复、错误处理、运维工具
这些优势使得RocksDB成为现代高性能应用的首选存储引擎,特别适合:
- 高并发写入场景
- 需要事务支持的应用
- 多租户系统(通过列族隔离)
- 需要详细监控的生产环境
- 对存储成本敏感的场景(通过分层压缩优化)
1.3 基本使用方法
1.3.1 简单的数据库操作
#include "rocksdb/db.h"
#include "rocksdb/options.h"
using namespace ROCKSDB_NAMESPACE;
int main() {
DB* db;
Options options;
options.create_if_missing = true;
// 打开数据库
Status s = DB::Open(options, "/tmp/testdb", &db);
assert(s.ok());
// 写入数据
s = db->Put(WriteOptions(), "key1", "value1");
assert(s.ok());
// 读取数据
std::string value;
s = db->Get(ReadOptions(), "key1", &value);
assert(s.ok());
assert(value == "value1");
// 删除数据
s = db->Delete(WriteOptions(), "key1");
assert(s.ok());
delete db;
return 0;
}
1.3.2 列族使用
#include "rocksdb/db.h"
#include "rocksdb/options.h"
int main() {
DB* db;
std::vector<ColumnFamilyDescriptor> column_families;
std::vector<ColumnFamilyHandle*> handles;
// 设置列族
column_families.push_back(ColumnFamilyDescriptor(
kDefaultColumnFamilyName, ColumnFamilyOptions()));
column_families.push_back(ColumnFamilyDescriptor(
"new_cf", ColumnFamilyOptions()));
DBOptions db_options;
db_options.create_if_missing = true;
// 打开带列族的数据库
Status s = DB::Open(db_options, "/tmp/testdb", column_families, &handles, &db);
assert(s.ok());
// 在指定列族中操作
s = db->Put(WriteOptions(), handles[1], "key1", "value1");
assert(s.ok());
std::string value;
s = db->Get(ReadOptions(), handles[1], "key1", &value);
assert(s.ok());
// 清理资源
for (auto handle : handles) {
delete handle;
}
delete db;
return 0;
}
1.4 配置选项详解
1.4.1 DBOptions 主要参数
DBOptions db_options;
db_options.create_if_missing = true; // 数据库不存在时自动创建
db_options.error_if_exists = false; // 数据库存在时不报错
db_options.paranoid_checks = true; // 启用严格检查
db_options.max_open_files = 1000; // 最大打开文件数
db_options.max_background_jobs = 4; // 后台任务数量
db_options.use_fsync = false; // 使用fsync同步
db_options.db_log_dir = "/tmp/rocksdb_logs"; // 日志目录
1.4.2 ColumnFamilyOptions 主要参数
ColumnFamilyOptions cf_options;
cf_options.write_buffer_size = 64 * 1024 * 1024; // 写缓冲区大小 (64MB)
cf_options.max_write_buffer_number = 3; // 最大写缓冲区数量
cf_options.target_file_size_base = 64 * 1024 * 1024; // 目标文件大小 (64MB)
cf_options.max_bytes_for_level_base = 256 * 1024 * 1024; // Level 1最大大小 (256MB)
cf_options.level0_file_num_compaction_trigger = 4; // Level 0触发压缩的文件数
cf_options.level0_slowdown_writes_trigger = 20; // Level 0减慢写入的文件数
cf_options.level0_stop_writes_trigger = 36; // Level 0停止写入的文件数
cf_options.compression = kSnappyCompression; // 压缩算法
2. 核心API深入分析
2.1 DB类核心接口
RocksDB的核心接口定义在 include/rocksdb/db.h
中的 DB
类:
class DB {
public:
// 打开数据库的静态方法
static Status Open(const Options& options, const std::string& name,
std::unique_ptr<DB>* dbptr);
// 带列族的打开方法
static Status Open(const DBOptions& db_options, const std::string& name,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
std::unique_ptr<DB>* dbptr);
// 基本读写操作
virtual Status Put(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) = 0;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) = 0;
virtual Status Delete(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key) = 0;
// 批量操作
virtual Status Write(const WriteOptions& options, WriteBatch* batch) = 0;
// 迭代器
virtual Iterator* NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) = 0;
// 快照
virtual const Snapshot* GetSnapshot() = 0;
virtual void ReleaseSnapshot(const Snapshot* snapshot) = 0;
// 压缩操作
virtual Status CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) = 0;
// 刷新操作
virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) = 0;
};
2.2 DB::Open 调用链路分析
2.2.1 DB::Open 静态方法实现
// 位置:db/db_impl/db_impl_open.cc:2197-2222
Status DB::Open(const Options& options, const std::string& dbname,
std::unique_ptr<DB>* dbptr) {
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
if (db_options.persist_stats_to_disk) {
column_families.emplace_back(kPersistentStatsColumnFamilyName, cf_options);
}
std::vector<ColumnFamilyHandle*> handles;
Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
if (s.ok()) {
if (db_options.persist_stats_to_disk) {
assert(handles.size() == 2);
} else {
assert(handles.size() == 1);
}
// 删除默认列族句柄,因为DBImpl总是持有对默认列族的引用
if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
delete handles[1];
}
delete handles[0];
}
return s;
}
功能说明:
- 将简单的Options转换为DBOptions和ColumnFamilyOptions
- 创建默认列族描述符
- 如果启用了统计信息持久化,添加统计信息列族
- 调用重载的Open方法
- 清理临时创建的列族句柄
2.2.2 DB::Open 重载方法(核心实现)
// 位置:db/db_impl/db_impl_open.cc:2224-2240
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
std::unique_ptr<DB>* dbptr) {
const bool kSeqPerBatch = true;
const bool kBatchPerTxn = true;
ThreadStatusUtil::SetEnableTracking(db_options.enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_DBOPEN);
bool can_retry = false;
Status s;
do {
s = DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
!kSeqPerBatch, kBatchPerTxn, can_retry, &can_retry);
} while (!s.ok() && can_retry);
ThreadStatusUtil::ResetThreadStatus();
return s;
}
功能说明:
- 设置线程状态跟踪
- 实现重试机制,处理可恢复的错误
- 调用DBImpl::Open进行实际的数据库打开操作
2.2.3 DBImpl::Open 核心实现
// 位置:db/db_impl/db_impl_open.cc:2375-2706
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
std::unique_ptr<DB>* dbptr, const bool seq_per_batch,
const bool batch_per_txn, const bool is_retry,
bool* can_retry) {
const WriteOptions write_options(Env::IOActivity::kDBOpen);
const ReadOptions read_options(Env::IOActivity::kDBOpen);
// 1. 验证选项
Status s = ValidateOptionsByTable(db_options, column_families);
if (!s.ok()) return s;
s = ValidateOptions(db_options, column_families);
if (!s.ok()) return s;
// 2. 创建DBImpl实例
auto impl = std::make_unique<DBImpl>(db_options, dbname, seq_per_batch,
batch_per_txn);
// 3. 创建必要的目录
s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
if (s.ok()) {
std::vector<std::string> paths;
for (auto& db_path : impl->immutable_db_options_.db_paths) {
paths.emplace_back(db_path.path);
}
for (auto& cf : column_families) {
for (auto& cf_path : cf.options.cf_paths) {
paths.emplace_back(cf_path.path);
}
}
for (const auto& path : paths) {
s = impl->env_->CreateDirIfMissing(path);
if (!s.ok()) break;
}
}
// 4. 启用自动恢复(单路径情况下)
if (paths.size() <= 1) {
impl->error_handler_.EnableAutoRecovery();
}
// 5. 数据库恢复过程
RecoveryContext recovery_ctx;
impl->options_mutex_.Lock();
impl->mutex_.Lock();
uint64_t recovered_seq(kMaxSequenceNumber);
s = impl->Recover(column_families, false /* read_only */,
false /* error_if_wal_file_exists */,
false /* error_if_data_exists_in_wals */, is_retry,
&recovered_seq, &recovery_ctx, can_retry);
// 6. 创建新的WAL文件
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
log::Writer* new_log = nullptr;
const size_t preallocate_block_size =
impl->GetWalPreallocateBlockSize(max_write_buffer_size);
s = impl->CreateWAL(write_options, new_log_number, 0 /*recycle_log_number*/,
preallocate_block_size,
PredecessorWALInfo() /* predecessor_wal_info */,
&new_log);
if (s.ok()) {
InstrumentedMutexLock wl(&impl->log_write_mutex_);
impl->logfile_number_ = new_log_number;
assert(new_log != nullptr);
impl->logs_.emplace_back(new_log_number, new_log);
}
}
// 7. 启动后台任务
if (s.ok()) {
s = impl->StartPeriodicTaskScheduler();
}
if (s.ok()) {
s = impl->RegisterRecordSeqnoTimeWorker();
}
// 8. 返回结果
impl->options_mutex_.Unlock();
if (s.ok()) {
*dbptr = std::move(impl);
} else {
for (auto* h : *handles) {
delete h;
}
handles->clear();
}
return s;
}
关键步骤说明:
- 选项验证:验证数据库选项和列族选项的有效性
- 实例创建:创建DBImpl实例,初始化基本组件
- 目录创建:创建数据库、WAL和列族所需的目录
- 错误恢复:在单路径情况下启用自动错误恢复
- 数据库恢复:恢复MANIFEST文件、SST文件和WAL文件
- WAL创建:创建新的WAL文件用于后续写入
- 后台任务:启动周期性任务调度器和序列号记录工作器
- 资源管理:成功时返回数据库实例,失败时清理资源
2.2.4 DBImpl::Recover 恢复过程
// 位置:db/db_impl/db_impl_open.cc:412-850
Status DBImpl::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
bool is_retry, uint64_t* recovered_seq, RecoveryContext* recovery_ctx,
bool* can_retry) {
mutex_.AssertHeld();
// 1. 初始化恢复上下文
bool is_new_db = false;
assert(db_lock_ == nullptr);
if (!read_only) {
Status lock_status = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!lock_status.ok()) {
return lock_status;
}
}
// 2. 恢复MANIFEST和SST文件
Status s;
bool missing_table_file = false;
if (!immutable_db_options_.best_efforts_recovery) {
Status desc_status;
s = versions_->Recover(column_families, read_only, &db_id_,
/*no_error_if_files_missing=*/false, is_retry,
&desc_status);
// 处理重试逻辑
if (can_retry) {
if (!is_retry &&
(desc_status.IsCorruption() || s.IsNotFound() || s.IsCorruption()) &&
CheckFSFeatureSupport(fs_.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
*can_retry = true;
ROCKS_LOG_ERROR(
immutable_db_options_.info_log,
"Possible corruption detected while replaying MANIFEST %s, %s. "
"Will be retried.",
desc_status.ToString().c_str(), s.ToString().c_str());
} else {
*can_retry = false;
}
}
} else {
// 最佳努力恢复模式
assert(!files_in_dbname.empty());
s = versions_->TryRecover(column_families, read_only, files_in_dbname,
&db_id_, &missing_table_file);
if (s.ok()) {
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
}
}
// 3. 恢复WAL文件
if (s.ok() && !read_only) {
std::map<uint64_t, std::string> wal_files;
s = GetSortedWalFiles(wal_files);
if (!s.ok()) {
return s;
}
if (!wal_files.empty()) {
// 按生成顺序恢复WAL文件
std::vector<uint64_t> wals;
wals.reserve(wal_files.size());
for (const auto& wal_file : wal_files) {
wals.push_back(wal_file.first);
}
std::sort(wals.begin(), wals.end());
bool corrupted_wal_found = false;
s = RecoverLogFiles(wals, &next_sequence, read_only, is_retry,
&corrupted_wal_found, recovery_ctx);
if (corrupted_wal_found && recovered_seq != nullptr) {
*recovered_seq = next_sequence;
}
if (!s.ok()) {
// 恢复失败时清理内存表
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->CreateNewMemtable(kMaxSequenceNumber);
}
}
}
}
return s;
}
恢复过程关键步骤:
- 文件锁定:获取数据库文件锁,防止多进程同时访问
- MANIFEST恢复:恢复版本信息和SST文件元数据
- WAL恢复:按顺序重放WAL文件中的操作
- 错误处理:支持重试机制和最佳努力恢复模式
2.3 Put/Get/Delete 操作分析
2.3.1 Put操作调用链路
Put操作的调用层次:
DB::Put() -> WriteBatch::Put() -> DBImpl::Write() -> DBImpl::WriteImpl()
DB::Put 实现(基类默认实现)
// 位置:db/db_impl/db_impl_write.cc:2778-2790
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
// 预分配WriteBatch大小,保守估计
// 8字节头部,4字节计数,1字节类型
// 为key长度和value长度各分配11个额外字节
WriteBatch batch(key.size() + value.size() + 24, 0 /* max_bytes */,
opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
Status s = batch.Put(column_family, key, value);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
功能说明:
- 创建WriteBatch对象,预分配合适的内存大小
- 将Put操作添加到WriteBatch中
- 调用Write方法执行批量写入
DBImpl::Put 实现(派生类实现)
// 位置:db/db_impl/db_impl_write.cc:23-30
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
return DB::Put(o, column_family, key, val);
}
功能说明:
- 验证列族是否支持时间戳(如果列族配置了时间戳但调用时未提供)
- 调用基类的Put方法
DBImpl::WriteImpl 核心写入逻辑
// 位置:db/db_impl/db_impl_write.cc:1574-1582(函数签名)
Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
UserWriteCallback* user_write_cb = nullptr,
uint64_t* wal_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false, uint64_t* seq_used = nullptr,
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr,
PostMemTableCallback* post_memtable_callback = nullptr,
std::shared_ptr<WriteBatchWithIndex> wbwi = nullptr);
WriteImpl关键步骤:
- 写入队列管理:使用写入队列控制并发写入
- WAL写入:将操作记录到Write-Ahead Log
- Memtable写入:将数据写入内存表
- 序列号分配:为每个操作分配唯一的序列号
- 回调处理:执行用户定义的回调函数
2.3.2 Get操作调用链路
Get操作的调用层次:
DB::Get() -> DBImpl::Get() -> DBImpl::GetImpl() -> 查找Memtable -> 查找SST文件
DBImpl::Get 实现
// 位置:db/db_impl/db_impl.cc:2168-2186
Status DBImpl::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) {
assert(value != nullptr);
value->Reset();
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
Status s = GetImpl(read_options, column_family, key, value, timestamp);
return s;
}
DBImpl::GetImpl 核心读取逻辑
// 位置:db/db_impl/db_impl.cc:2331-2464
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions get_impl_options) {
assert(get_impl_options.column_family);
// 1. 时间戳验证
if (read_options.timestamp) {
const Status s = FailIfTsMismatchCf(get_impl_options.column_family,
*(read_options.timestamp));
if (!s.ok()) return s;
}
// 2. 获取SuperVersion(包含当前版本的所有组件)
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
get_impl_options.column_family);
auto cfd = cfh->cfd();
SuperVersion* sv = GetAndRefSuperVersion(cfd);
// 3. 确定快照序列号
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
if (read_options.timestamp != nullptr) {
return Status::InvalidArgument(
"timestamp should not be specified with snapshot");
}
snapshot = read_options.snapshot->GetSequenceNumber();
} else {
snapshot = GetLastPublishedSequence();
if (read_options.timestamp != nullptr) {
const Status s = AssignTimestamp(read_options, &snapshot);
if (!s.ok()) {
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
}
}
}
// 4. 创建查找键
LookupKey lkey(key, snapshot, read_options.timestamp);
// 5. 首先在Memtable中查找
bool skip_memtable = (read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
Status s;
if (!skip_memtable) {
// 在活跃Memtable中查找
if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(),
get_impl_options.columns, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */, get_impl_options.callback)) {
done = true;
get_impl_options.value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, get_impl_options.value->GetSelf(),
get_impl_options.columns, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
get_impl_options.callback)) {
// 在不可变Memtable中查找
done = true;
get_impl_options.value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
}
}
// 6. 如果在Memtable中未找到,则在SST文件中查找
if (!done && (s.ok() || s.IsMergeInProgress())) {
PERF_TIMER_GUARD(get_from_output_files_time);
PinnedIteratorsManager pinned_iters_mgr;
sv->current->Get(read_options, lkey, get_impl_options.value,
get_impl_options.columns, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, &pinned_iters_mgr,
/*value_found*/ nullptr, /*key_exists*/ nullptr,
/*seq*/ nullptr, get_impl_options.callback,
/*is_blob*/ nullptr, /*do_merge=*/get_impl_options.get_value);
RecordTick(stats_, MEMTABLE_MISS);
}
// 7. 清理资源
ReturnAndCleanupSuperVersion(cfd, sv);
// 8. 统计信息更新
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = get_impl_options.value ? get_impl_options.value->size() : 0;
RecordTick(stats_, BYTES_READ, size);
RecordInHistogram(stats_, BYTES_PER_READ, size);
return s;
}
GetImpl关键步骤:
- 时间戳验证:确保时间戳与列族配置匹配
- SuperVersion获取:获取当前数据库状态的一致性视图
- 快照确定:确定读取操作的时间点
- Memtable查找:按顺序查找活跃和不可变Memtable
- SST文件查找:如果Memtable中未找到,则查找持久化的SST文件
- 资源清理:释放SuperVersion引用
- 统计更新:更新性能统计信息
2.3.3 Delete操作调用链路
Delete操作的调用层次:
DB::Delete() -> WriteBatch::Delete() -> DBImpl::Write() -> DBImpl::WriteImpl()
DB::Delete 实现
// 位置:db/db_impl/db_impl_write.cc:2845-2860
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) {
WriteBatch batch(key.size() + 24, 0 /* max_bytes */,
opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
Status s = batch.Delete(column_family, key);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
功能说明:
- 创建WriteBatch并添加Delete操作
- Delete操作实际上是写入一个删除标记(tombstone)
- 真正的数据删除发生在压缩过程中
DBImpl::Delete 实现
// 位置:db/db_impl/db_impl_write.cc(类似Put的实现)
Status DBImpl::Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key) {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
return DB::Delete(options, column_family, key);
}
Delete操作特点:
- 逻辑删除:不立即物理删除数据,而是写入删除标记
- 延迟删除:真正的数据清理在压缩时进行
- 版本控制:保持数据的多版本特性,支持快照读取
2.4 WriteBatch 批量操作机制
2.4.1 WriteBatch 结构设计
class WriteBatch {
private:
std::string rep_; // 序列化的操作记录
size_t content_flags_; // 内容标志位
public:
// 添加Put操作
Status Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value);
// 添加Delete操作
Status Delete(ColumnFamilyHandle* column_family, const Slice& key);
// 添加Merge操作
Status Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value);
// 清空批次
void Clear();
// 获取操作数量
int Count() const;
// 获取数据大小
size_t GetDataSize() const;
};
WriteBatch优势:
- 原子性:批次中的所有操作要么全部成功,要么全部失败
- 性能优化:减少WAL写入次数和锁竞争
- 内存效率:紧凑的二进制格式存储操作
3. 整体架构设计
3.1 RocksDB 整体架构图
┌─────────────────────────────────────────────────────────────────┐
│ RocksDB 整体架构 │
├─────────────────────────────────────────────────────────────────┤
│ 应用层 API │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Put │ │ Get │ │ Delete │ │Iterator │ │Snapshot │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 核心引擎层 (DBImpl) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ WriteBatch │ │ VersionSet │ │ ColumnFamily │ │
│ │ WriteQueue │ │ SuperVersion │ │ Management │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 存储层 │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Memtable │ │ Immutable │ │ WAL │ │
│ │ (SkipList) │ │ Memtables │ │ (Log Files) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ SST Files │ │ Cache │ │ Compaction │ │
│ │ (Level 0-N) │ │ (Block Cache) │ │ Engine │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 基础设施层 │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ FileSystem │ │ Environment │ │ Statistics │ │
│ │ Abstraction │ │ Abstraction │ │ & Monitoring │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
3.2 LSM-Tree 架构原理
写入流程:
┌─────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │───▶│ WriteBatch │───▶│ WAL │───▶│ Memtable │
│ Write │ │ (内存缓冲) │ │ (持久化日志) │ │ (内存表) │
└─────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Immutable │
│ Memtable │
└─────────────┘
│
▼ Flush
┌─────────────┐
│ Level 0 │
│ SST Files │
└─────────────┘
│
▼ Compaction
┌─────────────┐ │ ┌─────────────┐
│ Level 1 │◀──┘ │ Level 2 │
│ SST Files │───────▶│ SST Files │
└─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Level N │
│ SST Files │
└─────────────┘
读取流程:
┌─────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │───▶│ Memtable │───▶│ Immutable │───▶│ Level 0 │
│ Read │ │ 查找 │ │ Memtables │ │ SST Files │
└─────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │
▼ 未找到 ▼ 未找到 ▼ 未找到
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 返回 │ │ 继续查找 │ │ Level 1-N │
│ 结果 │ │ │ │ SST Files │
└─────────────┘ └─────────────┘ └─────────────┘
3.3 核心组件交互时序图
3.3.1 写入操作时序图
Client DBImpl WriteBatch WAL Memtable Background
│ │ │ │ │ │
│ Put() │ │ │ │ │
├────────▶│ │ │ │ │
│ │ Put() │ │ │ │
│ ├──────────▶│ │ │ │
│ │ │Add Op │ │ │
│ │ ├─────────┤ │ │
│ │ Write() │ │ │ │
│ ├───────────┤ │ │ │
│ │ │ Write │ │ │
│ │ ├────────▶│ │ │
│ │ │ │ Sync │ │
│ │ │ ├───────────┤ │
│ │ │ │ │ │
│ │ │ Insert │ │ │
│ │ ├─────────┼──────────▶│ │
│ │ │ │ │ │
│ │ │ │ │ Trigger │
│ │ │ │ │ Flush │
│ │ │ │ ├──────────▶│
│ │ OK │ │ │ │
│◀────────┤ │ │ │ │
│ │ │ │ │ │
3.3.2 读取操作时序图
Client DBImpl SuperVersion Memtable ImmMemtable SST Files
│ │ │ │ │ │
│ Get() │ │ │ │ │
├────────▶│ │ │ │ │
│ │GetAndRef │ │ │ │
│ │SuperVer │ │ │ │
│ ├──────────▶│ │ │ │
│ │ │ Current │ │ │
│ │◀──────────┤ Version │ │ │
│ │ │ │ │ │
│ │ Get() │ │ │ │
│ ├───────────┼──────────▶│ │ │
│ │ │ │ Found? │ │
│ │ │ ├───────────┤ │
│ │ │ │ Not Found │ │
│ │ │ │ │ │
│ │ Get() │ │ │ │
│ ├───────────┼───────────┼──────────▶│ │
│ │ │ │ │ Found? │
│ │ │ │ ├───────────┤
│ │ │ │ │ Not Found │
│ │ │ │ │ │
│ │ Get() │ │ │ │
│ ├───────────┼───────────┼───────────┼──────────▶│
│ │ │ │ │ │ Search
│ │ │ │ │ │ Levels
│ │ │ │ │ ├─────────┐
│ │ │ │ │ │ │
│ │ │ │ │ │◀────────┘
│ │ Value │ │ │ │
│◀────────┤ │ │ │ │
│ │ │ │ │ │
3.4 模块间依赖关系
┌─────────────────────────────────────────────────────────────────┐
│ 模块依赖关系图 │
│ │
│ ┌─────────────┐ │
│ │ DB API │ (对外接口层) │
│ └─────┬───────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ DBImpl │───▶│ VersionSet │───▶│ColumnFamily│ │
│ │ (核心实现) │ │ (版本管理) │ │ (列族管理) │ │
│ └─────┬───────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ WriteQueue │ │SuperVersion │ │ Memtable │ │
│ │ (写入队列) │ │ (版本快照) │ │ (内存表) │ │
│ └─────────────┘ └─────────────┘ └─────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ WAL │ │ SST Files │ │ Cache │ │
│ │ (日志文件) │ │ (存储文件) │ │ (缓存系统) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ FileSystem │ │ Compaction │ │ Statistics │ │
│ │ (文件系统) │ │ (压缩引擎) │ │ (统计监控) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
3.5 关键数据流向
3.5.1 写入数据流
用户数据 → WriteBatch → WAL → Memtable → Immutable Memtable → SST Files
│ │ │ │ │ │
│ │ │ │ │ └─ 持久化存储
│ │ │ │ └─ 等待刷新到磁盘
│ │ │ └─ 内存中的有序结构
│ │ └─ 崩溃恢复保证
│ └─ 批量操作优化
└─ 应用程序接口
3.5.2 读取数据流
用户查询 → Memtable → Immutable Memtables → Level 0 → Level 1-N
│ │ │ │ │
│ │ │ │ └─ 按层级查找
│ │ │ └─ 最新的SST文件
│ │ └─ 准备刷新的数据
│ └─ 最新写入的数据
└─ 应用程序查询
3.6 并发控制机制
┌─────────────────────────────────────────────────────────────────┐
│ 并发控制架构 │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Write Queue │ │ DB Mutex │ │ SuperVersion│ │
│ │ (写队列) │ │ (全局锁) │ │ (RCU机制) │ │
│ └─────┬───────┘ └─────┬───────┘ └─────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Leader │ │ Memtable │ │ Readers │ │
│ │ Follower │ │ Switch │ │ (无锁读取) │ │
│ │ 模式 │ │ (原子操作) │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
并发控制特点:
- 写入串行化:通过写队列确保写入操作的顺序性
- 读取并行化:使用SuperVersion的RCU机制实现无锁读取
- 最小锁粒度:只在必要时使用全局锁,大部分操作无锁化
- Leader-Follower模式:批量写入优化,减少锁竞争
4. DB模块详细分析
4.1 DB模块架构图
┌─────────────────────────────────────────────────────────────────┐
│ DB模块架构 │
├─────────────────────────────────────────────────────────────────┤
│ 接口层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ DB │ │ ColumnFamily│ │ Iterator │ │ Snapshot │ │
│ │ (抽象基类) │ │ Handle │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 实现层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ DBImpl │ │ VersionSet │ │SuperVersion │ │WriteQueue │ │
│ │ (核心实现) │ │ (版本管理) │ │ (版本快照) │ │ (写入队列) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 存储管理层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ColumnFamily│ │ Memtable │ │ WAL │ │ Compaction │ │
│ │ Data │ │ Management │ │ Management │ │ Manager │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
4.2 DBImpl 核心类分析
4.2.1 DBImpl 类结构
// 位置:db/db_impl/db_impl.h
class DBImpl : public DB {
private:
// 核心组件
std::unique_ptr<VersionSet> versions_; // 版本管理
std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
InstrumentedMutex mutex_; // 主要互斥锁
std::atomic<bool> shutting_down_; // 关闭标志
// 写入相关
InstrumentedMutex log_write_mutex_; // WAL写入锁
std::deque<std::pair<uint64_t, log::Writer*>> logs_; // WAL文件列表
WriteController write_controller_; // 写入控制器
// 后台任务
std::unique_ptr<ThreadPoolImpl> bg_compaction_pool_; // 压缩线程池
std::unique_ptr<ThreadPoolImpl> bg_flush_pool_; // 刷新线程池
// 缓存和统计
std::shared_ptr<Cache> table_cache_; // 表缓存
Statistics* stats_; // 统计信息
// 错误处理
ErrorHandler error_handler_; // 错误处理器
public:
// 构造函数
DBImpl(const DBOptions& options, const std::string& dbname,
const bool seq_per_batch = false, const bool batch_per_txn = true,
bool read_only = false);
// 核心API实现
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
std::string* timestamp = nullptr) override;
Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key) override;
// 批量操作
Status Write(const WriteOptions& options, WriteBatch* batch) override;
// 迭代器
Iterator* NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override;
// 快照管理
const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;
// 压缩和刷新
Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) override;
Status CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) override;
};
4.2.2 关键方法实现分析
WriteImpl - 核心写入实现
// 位置:db/db_impl/db_impl_write.cc
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
UserWriteCallback* user_write_cb,
uint64_t* wal_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used,
size_t batch_cnt,
PreReleaseCallback* pre_release_callback,
PostMemTableCallback* post_memtable_callback,
std::shared_ptr<WriteBatchWithIndex> wbwi) {
// 1. 写入队列管理 - Leader/Follower模式
WriteThread::Writer w(write_options, my_batch, callback, wal_used,
disable_memtable, batch_cnt, pre_release_callback,
post_memtable_callback);
if (!write_options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL);
}
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);
write_thread_.JoinBatchGroup(&w);
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// 并行写入Memtable
assert(w.ShouldWriteToMemtable());
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_, write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this, true /*concurrent_memtable_writes*/,
seq_per_batch_, w.batch_cnt, batch_per_txn_, write_options.memtable_insert_hint_per_batch);
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// 完成并行写入
MemTableInsertStatusCheck(w.status);
versions_->SetLastSequence(w.write_group->last_sequence);
write_thread_.ExitAsBatchGroupFollower(&w);
}
return w.status;
}
if (w.state == WriteThread::STATE_COMPLETED) {
// 作为Follower完成
return w.status;
}
// 作为Leader执行写入
assert(w.state == WriteThread::STATE_GROUP_LEADER);
// 2. WAL写入
if (!write_options.disableWAL) {
WriteBatch* merged_batch = nullptr;
if (w.write_group->size == 1 && w.write_group->leader.batch->Count() > 0) {
merged_batch = w.write_group->leader.batch;
} else {
// 合并多个WriteBatch
merged_batch = &tmp_batch_;
for (auto writer : *(w.write_group)) {
if (writer->batch != nullptr) {
Status s = WriteBatchInternal::Append(merged_batch, writer->batch);
if (!s.ok()) {
write_thread_.ExitAsBatchGroupLeader(w.write_group, s);
return s;
}
}
}
}
// 写入WAL
Status s = WriteToWAL(*merged_batch, log_writer, wal_used, log_ref);
if (!s.ok()) {
write_thread_.ExitAsBatchGroupLeader(w.write_group, s);
return s;
}
}
// 3. 写入Memtable
if (!disable_memtable) {
Status s = WriteBatchInternal::InsertInto(
w.write_group, w.sequence, column_family_memtables_.get(),
&flush_scheduler_, &trim_history_scheduler_,
write_options.ignore_missing_column_families, 0, this,
false /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
batch_per_txn_, write_options.memtable_insert_hint_per_batch);
if (!s.ok()) {
write_thread_.ExitAsBatchGroupLeader(w.write_group, s);
return s;
}
}
// 4. 更新序列号并完成
versions_->SetLastSequence(w.write_group->last_sequence);
write_thread_.ExitAsBatchGroupLeader(w.write_group, Status::OK());
return Status::OK();
}
WriteImpl关键特性:
- Leader-Follower模式:减少锁竞争,提高并发性能
- 批量合并:将多个WriteBatch合并为一个,减少WAL写入次数
- 并行Memtable写入:支持多线程并行写入不同的Memtable
- 原子性保证:WAL写入成功后才写入Memtable,确保崩溃恢复的一致性
4.3 VersionSet 版本管理
4.3.1 VersionSet 架构
// 位置:db/version_set.h
class VersionSet {
private:
std::unique_ptr<ColumnFamilySet> column_family_set_; // 列族集合
Env* const env_; // 环境抽象
const std::string dbname_; // 数据库名称
const ImmutableDBOptions* const db_options_; // 不可变选项
std::atomic<uint64_t> next_file_number_; // 下一个文件号
std::atomic<uint64_t> last_sequence_; // 最后序列号
std::atomic<uint64_t> last_allocated_sequence_; // 最后分配序列号
// MANIFEST文件管理
std::unique_ptr<log::Writer> descriptor_log_; // MANIFEST写入器
uint64_t manifest_file_number_; // MANIFEST文件号
public:
// 恢复数据库状态
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only, std::string* db_id, bool* no_error_if_files_missing,
bool is_retry, Status* desc_status);
// 创建新版本
void LogAndApply(ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options,
const ReadOptions& read_options,
const WriteOptions& write_options, VersionEdit* edit,
InstrumentedMutex* mu, FSDirectory* dir_contains_current_file,
bool new_descriptor_log = false,
const ColumnFamilyOptions* new_cf_options = nullptr);
// 获取当前版本
Version* current() const;
// 分配文件号
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
// 序列号管理
SequenceNumber LastSequence() const { return last_sequence_.load(); }
void SetLastSequence(SequenceNumber s) { last_sequence_.store(s); }
};
4.3.2 SuperVersion 机制
// 位置:db/column_family.h
struct SuperVersion {
// 当前版本的组件
MemTable* mem; // 当前Memtable
MemTableListVersion* imm; // 不可变Memtable列表
Version* current; // 当前Version
MutableCFOptions mutable_cf_options; // 可变选项
// 引用计数(RCU机制)
std::atomic<uint32_t> refs;
SuperVersion() = default;
~SuperVersion();
// 原子引用操作
SuperVersion* Ref();
bool Unref();
// 清理资源
void Cleanup();
// 初始化
void Init(ColumnFamilyData* cfd, MemTable* new_mem,
MemTableListVersion* new_imm, Version* new_current);
};
SuperVersion特点:
- RCU机制:读取无锁,写入时创建新版本
- 原子切换:保证读取操作的一致性视图
- 自动清理:引用计数为0时自动清理资源
4.4 DB模块时序图
4.4.1 数据库打开时序图
Client DBImpl VersionSet ColumnFamily Memtable WAL
│ │ │ │ │ │
│ Open() │ │ │ │ │
├─────────▶│ │ │ │ │
│ │ Recover() │ │ │ │
│ ├───────────▶│ │ │ │
│ │ │LoadManifest│ │ │
│ │ ├────────────┤ │ │
│ │ │ │ Create │ │
│ │ │ │ Default CF │ │
│ │ │ ├───────────▶│ │
│ │ │ │ │ Create │
│ │ │ │ │ Memtable │
│ │ │ │ ├──────────┤
│ │ │RecoverWAL │ │ │
│ │ ├────────────┼────────────┼─────────▶│
│ │ │ │ │ │ Replay
│ │ │ │ │ │ Records
│ │ │ │ │ ├─────────┐
│ │ │ │ │ │ │
│ │ │ │ │ │◀────────┘
│ │ CreateWAL │ │ │ │
│ ├────────────┼────────────┼────────────┼─────────▶│
│ │ │ │ │ │ Create
│ │ │ │ │ │ New WAL
│ │ OK │ │ │ │
│◀─────────┤ │ │ │ │
│ │ │ │ │ │
4.4.2 写入操作时序图
Client DBImpl WriteQueue WAL Memtable Background
│ │ │ │ │ │
│ Put() │ │ │ │ │
├────────▶│ │ │ │ │
│ │JoinBatch │ │ │ │
│ │Group │ │ │ │
│ ├──────────▶│ │ │ │
│ │ │Leader │ │ │
│ │ │Selected │ │ │
│ │ ├─────────┤ │ │
│ │WriteToWAL │ │ │ │
│ ├───────────┼────────▶│ │ │
│ │ │ │Sync │ │
│ │ │ ├───────────┤ │
│ │WriteToMem │ │ │ │
│ ├───────────┼─────────┼──────────▶│ │
│ │ │ │ │Insert │
│ │ │ │ ├───────────┤
│ │ │ │ │ │
│ │ │ │ │CheckFlush │
│ │ │ │ ├──────────▶│
│ │ExitLeader │ │ │ │
│ ├──────────▶│ │ │ │
│ │ OK │ │ │ │
│◀────────┤ │ │ │ │
│ │ │ │ │ │
5. Cache模块详细分析
5.1 Cache模块架构图
┌─────────────────────────────────────────────────────────────────┐
│ Cache模块架构 │
├─────────────────────────────────────────────────────────────────┤
│ 接口层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Cache │ │ Handle │ │SecondaryCache│ │CacheItemHelper│ │
│ │ (抽象基类) │ │ (缓存句柄) │ │ (二级缓存) │ │ (辅助工具) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 实现层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ LRUCache │ │ClockCache │ │ShardedCache │ │ BlockCache │ │
│ │ (LRU缓存) │ │(时钟缓存) │ │ (分片缓存) │ │ (块缓存) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 存储层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ HashTable │ │ LRU List │ │ Clock Hand │ │MemoryPool │ │
│ │ (哈希表) │ │ (LRU链表) │ │ (时钟指针) │ │ (内存池) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
5.2 LRUCache 实现分析
5.2.1 LRUHandle 数据结构
// 位置:cache/lru_cache.h:50-106
struct LRUHandle : public Cache::Handle {
Cache::ObjectPtr value; // 缓存的值
const Cache::CacheItemHelper* helper; // 辅助函数
LRUHandle* next_hash; // 哈希表链表指针
LRUHandle* next; // LRU链表下一个节点
LRUHandle* prev; // LRU链表前一个节点
size_t total_charge; // 总占用空间
size_t key_length; // 键长度
uint32_t hash; // 键的哈希值
uint32_t refs; // 引用计数
// 可变标志位
uint8_t m_flags;
enum MFlags : uint8_t {
M_IN_CACHE = (1 << 0), // 是否在缓存中
M_HAS_HIT = (1 << 1), // 是否被访问过
M_IN_HIGH_PRI_POOL = (1 << 2), // 是否在高优先级池中
M_IN_LOW_PRI_POOL = (1 << 3), // 是否在低优先级池中
};
// 不可变标志位
uint8_t im_flags;
enum ImFlags : uint8_t {
IM_IS_HIGH_PRI = (1 << 0), // 高优先级条目
IM_IS_LOW_PRI = (1 << 1), // 低优先级条目
IM_IS_STANDALONE = (1 << 2), // 独立条目(不插入缓存)
};
char key_data[1]; // 键数据(变长)
Slice key() const { return Slice(key_data, key_length); }
void Ref() { refs++; } // 增加引用
bool Unref() { // 减少引用
assert(refs > 0);
refs--;
return refs == 0;
}
bool HasRefs() const { return refs > 0; }
};
5.2.2 LRUCacheShard 分片实现
// 位置:cache/lru_cache.h:200-441
class LRUCacheShard final : public CacheShard {
private:
// LRU链表管理
LRUHandle lru_; // LRU链表头节点
LRUHandle* lru_low_pri_; // 低优先级池头指针
LRUHandle* lru_bottom_pri_; // 底部优先级池头指针
// 哈希表
LRUHandleTable table_; // 哈希表
// 内存使用统计
size_t usage_; // 缓存使用的内存
size_t lru_usage_; // LRU链表使用的内存
// 并发控制
mutable DMutex mutex_; // 分布式互斥锁
// 容量控制
size_t capacity_; // 缓存容量
bool strict_capacity_limit_; // 严格容量限制
double high_pri_pool_ratio_; // 高优先级池比例
double low_pri_pool_ratio_; // 低优先级池比例
public:
// 查找操作
Cache::Handle* Lookup(const Slice& key, uint32_t hash,
const Cache::CacheItemHelper* helper,
Cache::CreateContext* create_context,
Cache::Priority priority,
Statistics* stats) override;
// 插入操作
Status Insert(const Slice& key, uint32_t hash, Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::Handle** handle, Cache::Priority priority) override;
// 释放操作
bool Release(Cache::Handle* handle, bool useful,
bool erase_if_last_ref) override;
// 删除操作
bool Erase(const Slice& key, uint32_t hash) override;
private:
// LRU链表操作
void LRU_Remove(LRUHandle* e); // 从LRU链表移除
void LRU_Insert(LRUHandle* e); // 插入到LRU链表
// 淘汰操作
void EvictFromLRU(size_t charge, autovector<LRUHandle*>* deleted);
// 内存管理
bool FinishErase(LRUHandle* e) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void FreeEntry(LRUHandle* e);
};
5.2.3 关键操作实现
Lookup 操作
Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash,
const Cache::CacheItemHelper* helper,
Cache::CreateContext* create_context,
Cache::Priority priority,
Statistics* stats) {
DMutexLock l(&mutex_);
LRUHandle* e = table_.Lookup(key, hash);
if (e != nullptr) {
assert(e->InCache());
if (!e->HasRefs()) {
// 如果没有外部引用,从LRU链表中移除
LRU_Remove(e);
}
e->Ref(); // 增加引用计数
e->SetHit(); // 标记为被访问
}
return reinterpret_cast<Cache::Handle*>(e);
}
Insert 操作
Status LRUCacheShard::Insert(const Slice& key, uint32_t hash,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper,
size_t charge, Cache::Handle** handle,
Cache::Priority priority) {
// 创建新的LRUHandle
LRUHandle* e = reinterpret_cast<LRUHandle*>(
new char[sizeof(LRUHandle) - 1 + key.size()]);
e->value = value;
e->helper = helper;
e->key_length = key.size();
e->hash = hash;
e->refs = 0;
e->next = e->prev = nullptr;
e->SetInCache(true);
e->SetPriority(priority);
memcpy(e->key_data, key.data(), key.size());
DMutexLock l(&mutex_);
// 检查是否需要淘汰
autovector<LRUHandle*> last_reference_list;
size_t total_charge = usage_ + charge;
if (capacity_ > 0 && total_charge > capacity_ && strict_capacity_limit_) {
// 严格容量限制下,拒绝插入
e->SetInCache(false);
delete[] reinterpret_cast<char*>(e);
return Status::MemoryLimit("Insert failed due to LRU cache being full.");
}
// 淘汰旧条目
while (usage_ + charge > capacity_ && lru_.next != &lru_) {
LRUHandle* old = lru_.next;
assert(old->InCache());
assert(!old->HasRefs());
LRU_Remove(old);
table_.Remove(old->key(), old->hash);
old->SetInCache(false);
usage_ -= old->total_charge;
last_reference_list.push_back(old);
}
// 插入新条目
if (table_.Insert(e) == nullptr) {
// 插入成功
usage_ += charge;
if (handle == nullptr) {
LRU_Insert(e); // 插入到LRU链表
} else {
e->Ref(); // 增加引用计数
*handle = reinterpret_cast<Cache::Handle*>(e);
}
} else {
// 键已存在,替换旧值
LRUHandle* old = table_.Insert(e);
usage_ = usage_ - old->total_charge + charge;
if (old->HasRefs()) {
old->SetInCache(false);
table_.Remove(old->key(), old->hash);
last_reference_list.push_back(old);
} else {
LRU_Remove(old);
last_reference_list.push_back(old);
}
if (handle == nullptr) {
LRU_Insert(e);
} else {
e->Ref();
*handle = reinterpret_cast<Cache::Handle*>(e);
}
}
// 清理被淘汰的条目
for (auto entry : last_reference_list) {
entry->Free(table_.GetAllocator());
}
return Status::OK();
}
5.3 HyperClockCache 实现分析
5.3.1 Clock算法原理
// 位置:cache/clock_cache.h:34-55
// HyperClockCache是LRUCache的替代方案,专为BlockBasedTableOptions::block_cache优化
//
// 优势:
// - 无锁/无等待,在高并发下效率更高
// - 固定版本(estimated_entry_charge > 0)完全无锁/无等待
// - 自动版本(estimated_entry_charge = 0)在某些插入或删除操作中有少量等待
// - 针对热路径读取优化,大多数Lookup()和所有Release()都是单个原子加法操作
// - 插入时的淘汰完全并行
// - 使用CLOCK淘汰算法的泛化+老化变体,在某些情况下可能优于LRU
//
// 成本:
// - FixedHyperClockCache - 哈希表不可调整大小(为了无锁效率),容量不能动态改变
// - 依赖估计的平均值(块)大小来确定哈希表大小
5.3.2 Clock Hand 机制
// Clock算法使用环形缓冲区和时钟指针
class ClockHandle {
std::atomic<uint64_t> meta_; // 元数据(引用计数、标志位等)
// 元数据位域布局
static constexpr uint8_t kStateShift = 0;
static constexpr uint64_t kStateMask = 0x3ULL;
static constexpr uint8_t kRefsShift = 2;
static constexpr uint64_t kRefsMask = 0x3FFFFFULL; // 22位引用计数
enum HandleState : uint8_t {
kEmpty = 0, // 空槽位
kConstruction = 1, // 构造中
kVisible = 2, // 可见状态
kTombstone = 3, // 墓碑状态
};
public:
// 原子操作
bool TryRef(); // 尝试增加引用
void Ref(); // 增加引用
bool Unref(); // 减少引用
// Clock算法相关
bool IsUsed() const; // 是否被使用(用于Clock算法)
void SetUsed(); // 设置使用标记
void ClearUsed(); // 清除使用标记
};
class HyperClockTable {
ClockHandle* array_; // 环形数组
size_t length_bits_; // 数组长度的位数
std::atomic<size_t> clock_pointer_; // 时钟指针
public:
// Clock算法淘汰
ClockHandle* ClockUpdate(ClockHandle* h);
ClockHandle* ClockNext();
// 查找空槽位进行插入
ClockHandle* DoClockEvict();
};
5.4 Cache模块时序图
5.4.1 缓存查找时序图
Client Cache Shard HashTable LRUList Handle
│ │ │ │ │ │
│Lookup() │ │ │ │ │
├────────▶│ │ │ │ │
│ │GetShard │ │ │ │
│ ├────────▶│ │ │ │
│ │ │Lookup() │ │ │
│ │ ├──────────▶│ │ │
│ │ │ │Find │ │
│ │ │ ├───────────┤ │
│ │ │ │Found │ │
│ │ │ │ │ │
│ │ │LRU_Remove │ │ │
│ │ ├───────────┼──────────▶│ │
│ │ │ │ │Remove │
│ │ │ │ │from LRU │
│ │ │ │ ├─────────┤
│ │ │Ref() │ │ │
│ │ ├───────────┼───────────┼────────▶│
│ │ │ │ │ │Increase
│ │ │ │ │ │RefCount
│ │Handle │ │ │ │
│◀────────┤ │ │ │ │
│ │ │ │ │ │
5.4.2 缓存插入时序图
Client Cache Shard HashTable LRUList Eviction
│ │ │ │ │ │
│Insert() │ │ │ │ │
├────────▶│ │ │ │ │
│ │GetShard │ │ │ │
│ ├────────▶│ │ │ │
│ │ │CheckSpace │ │ │
│ │ ├───────────┤ │ │
│ │ │NeedEvict │ │ │
│ │ │ │ │ │
│ │ │EvictLRU │ │ │
│ │ ├───────────┼───────────┼────────▶│
│ │ │ │ │ │SelectVictim
│ │ │ │ │ ├─────────────┐
│ │ │ │ │ │ │
│ │ │ │ │ │◀────────────┘
│ │ │Insert() │ │ │
│ │ ├──────────▶│ │ │
│ │ │ │AddEntry │ │
│ │ │ ├───────────┤ │
│ │ │LRU_Insert │ │ │
│ │ ├───────────┼──────────▶│ │
│ │ │ │ │AddToLRU │
│ │ │ │ ├─────────┤
│ │ OK │ │ │ │
│◀────────┤ │ │ │ │
│ │ │ │ │ │
6. Table模块详细分析
6.1 Table模块架构图
┌─────────────────────────────────────────────────────────────────┐
│ Table模块架构 │
├─────────────────────────────────────────────────────────────────┤
│ 接口层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │TableReader │ │TableBuilder │ │TableFactory │ │IteratorBase│ │
│ │ (表读取器) │ │ (表构建器) │ │ (表工厂) │ │ (迭代器) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 实现层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │BlockBased │ │ PlainTable │ │ CuckooTable│ │PartitionedFilter│ │
│ │ Table │ │ │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 存储层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Block │ │ Index │ │ Filter │ │ Compression │ │
│ │ (数据块) │ │ (索引块) │ │ (过滤器) │ │ (压缩算法) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
7. Memtable模块详细分析
7.1 Memtable模块架构图
┌─────────────────────────────────────────────────────────────────┐
│ Memtable模块架构 │
├─────────────────────────────────────────────────────────────────┤
│ 接口层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ MemTable │ │MemTableRep │ │MemTableList │ │MemTableIterator│ │
│ │ (内存表) │ │ (表示层) │ │ (表列表) │ │ (迭代器) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 实现层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ SkipListRep │ │HashSkipList │ │HashLinkList │ │ VectorRep │ │
│ │ (跳表实现) │ │ Rep │ │ Rep │ │ (向量实现) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 存储层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ SkipList │ │ HashTable │ │ LinkedList │ │ Arena │ │
│ │ (跳表) │ │ (哈希表) │ │ (链表) │ │ (内存分配器) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
8. 关键数据结构与继承关系
8.1 核心类继承关系图
┌─────────────────────────────────────────────────────────────────┐
│ 核心类继承关系 │
│ │
│ ┌─────────────┐ │
│ │ DB │ (抽象基类) │
│ └─────┬───────┘ │
│ │ │
│ ├─ DBImpl (主要实现) │
│ ├─ DBImplReadOnly (只读实现) │
│ ├─ DBImplSecondary (从库实现) │
│ └─ CompactedDBImpl (压缩实现) │
│ │
│ ┌─────────────┐ │
│ │ Cache │ (抽象基类) │
│ └─────┬───────┘ │
│ │ │
│ ├─ ShardedCache (分片缓存基类) │
│ │ ├─ LRUCache (LRU缓存实现) │
│ │ └─ HyperClockCache (时钟缓存实现) │
│ └─ SecondaryCache (二级缓存) │
│ │
│ ┌─────────────┐ │
│ │MemTableRep │ (内存表表示抽象基类) │
│ └─────┬───────┘ │
│ │ │
│ ├─ SkipListRep (跳表实现) │
│ ├─ HashSkipListRep (哈希跳表实现) │
│ ├─ HashLinkListRep (哈希链表实现) │
│ └─ VectorRep (向量实现) │
│ │
└─────────────────────────────────────────────────────────────────┘
9. 实战经验与最佳实践
9.1 性能优化建议
9.1.1 写入性能优化
1. WriteBatch 批量写入
// 推荐:使用WriteBatch批量写入
WriteBatch batch;
for (int i = 0; i < 1000; i++) {
batch.Put("key" + std::to_string(i), "value" + std::to_string(i));
}
db->Write(WriteOptions(), &batch);
2. 调整Memtable大小
Options options;
options.write_buffer_size = 128 * 1024 * 1024; // 128MB
options.max_write_buffer_number = 6; // 最多6个写缓冲区
9.1.2 读取性能优化
1. 合理配置Block Cache
std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024 * 1024);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.cache_index_and_filter_blocks = true;
2. 使用Bloom Filter
table_options.filter_policy.reset(NewBloomFilterPolicy(10));
9.2 生产环境配置建议
Options GetProductionOptions() {
Options options;
options.create_if_missing = true;
options.write_buffer_size = 128 * 1024 * 1024; // 128MB
options.max_write_buffer_number = 6;
options.max_background_jobs = 8;
// 缓存配置
std::shared_ptr<Cache> cache = NewLRUCache(2 * 1024 * 1024 * 1024);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.filter_policy.reset(NewBloomFilterPolicy(10));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
return options;
}
这份完整的RocksDB源码剖析文档涵盖了从基础使用到深入实现的各个方面,包括详细的代码分析、架构图、时序图以及实战经验。希望能帮助您深入理解RocksDB的设计原理和最佳实践。