1. API概述

RocksDB提供了丰富的API接口,主要分为以下几个层次:

  • C++ API: 核心的C++接口,提供完整功能
  • C API: C语言绑定接口,便于其他语言调用
  • Java API: Java语言绑定
  • Python API: Python语言绑定(由社区第三方项目提供,如python-rocksdb,并非RocksDB官方维护)

本文档主要分析C++ API的核心接口及其实现。

2. 核心API接口分析

2.1 数据库打开接口 - DB::Open

2.1.1 接口定义

位置: include/rocksdb/db.h

// Basic open overload: opens a database with a single Options object
// (default column family only).
// @param options: database configuration options (all settings)
// @param name: database path, i.e. where the database files are stored
// @param dbptr: [out] receives the opened DB instance (smart pointer)
// @return: operation status; OK on success, otherwise the failure reason
static Status Open(const Options& options, 
                   const std::string& name,
                   std::unique_ptr<DB>* dbptr);

// Multi-column-family open overload: opens a database that has several
// column families.
// @param db_options: database-wide options
// @param name: database path
// @param column_families: descriptors (name + options) of the column families
//                         to open
// @param handles: [out] column family handles for the opened column families
// @param dbptr: [out] receives the opened DB instance
// @return: operation status
static Status Open(const DBOptions& db_options, 
                   const std::string& name,
                   const std::vector<ColumnFamilyDescriptor>& column_families,
                   std::vector<ColumnFamilyHandle*>* handles,
                   std::unique_ptr<DB>* dbptr);

2.1.2 实现入口函数

位置: db/db_impl/db_impl_open.cc:2197

// Single-column-family convenience overload of DB::Open.
// Splits `options` into its DB-wide and column-family parts and opens the
// default column family. When persist_stats_to_disk is set, it also opens the
// persistent-stats column family with the same options. The work is delegated
// to the multi-column-family overload. On success the temporary handles it
// returned are deleted, because DBImpl keeps its own references to these
// column families.
Status DB::Open(const Options& options, const std::string& dbname,
                std::unique_ptr<DB>* dbptr) {
  const DBOptions db_options(options);
  const ColumnFamilyOptions cf_options(options);
  const bool with_stats_cf = db_options.persist_stats_to_disk;

  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
  if (with_stats_cf) {
    column_families.emplace_back(kPersistentStatsColumnFamilyName, cf_options);
  }

  std::vector<ColumnFamilyHandle*> handles;
  Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
  if (!s.ok()) {
    return s;
  }

  // One handle per requested column family: [0] default, [1] stats (optional).
  assert(handles.size() == (with_stats_cf ? 2u : 1u));
  if (with_stats_cf && handles[1] != nullptr) {
    delete handles[1];
  }
  delete handles[0];
  return s;
}

2.1.3 核心实现函数

位置: db/db_impl/db_impl_open.cc:2224

// Multi-column-family DB::Open: entry point into DBImpl::Open.
// Enables thread-status tracking for the duration of the open. It then calls
// DBImpl::Open repeatedly for as long as the call fails AND DBImpl::Open has
// reported (through its last out-parameter) that another attempt is allowed.
// The retry flag is passed by value as the "is this a retry" argument, so the
// first attempt receives `false`.
// Note: seq_per_batch is passed as !kSeqPerBatch (i.e. false) and
// batch_per_txn as kBatchPerTxn (true).
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
                const std::vector<ColumnFamilyDescriptor>& column_families,
                std::vector<ColumnFamilyHandle*>* handles,
                std::unique_ptr<DB>* dbptr) {
  const bool kSeqPerBatch = true;
  const bool kBatchPerTxn = true;

  ThreadStatusUtil::SetEnableTracking(db_options.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_DBOPEN);

  Status s;
  for (bool retry = false;;) {
    s = DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
                     !kSeqPerBatch, kBatchPerTxn, retry, &retry);
    if (s.ok() || !retry) {
      break;
    }
  }

  ThreadStatusUtil::ResetThreadStatus();
  return s;
}

2.1.4 DBImpl::Open实现

位置: db/db_impl/db_impl_open.cc:2380

// Core open routine behind DB::Open (excerpt; the recovery steps are elided).
// Steps:
//  - Validate the options and reset the outputs.
//  - Create the DBImpl and make sure the WAL, data and column-family
//    directories exist.
//  - With options_mutex_ and mutex_ held, recover the DB (elided) and then
//    start the periodic background tasks.
// On success, ownership of the DBImpl moves into *dbptr. On failure, any
// column family handles already created are deleted and *dbptr stays null.
// @param seq_per_batch / batch_per_txn: passed through to the DBImpl constructor
// @param is_retry: false on the first attempt; true when DB::Open calls again
//                  after a failed attempt that set *can_retry
// @param can_retry: [out] whether the caller may retry after a failure
//                   (set by the elided recovery code)
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
                    const std::vector<ColumnFamilyDescriptor>& column_families,
                    std::vector<ColumnFamilyHandle*>* handles,
                    std::unique_ptr<DB>* dbptr,
                    bool seq_per_batch, bool batch_per_txn, bool is_retry,
                    bool* can_retry) {
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
  const ReadOptions read_options(Env::IOActivity::kDBOpen);

  // 1. Validate the options (table-specific checks first).
  Status s = ValidateOptionsByTable(db_options, column_families);
  if (!s.ok()) return s;

  s = ValidateOptions(db_options, column_families);
  if (!s.ok()) return s;

  // 2. Initialize the output parameters.
  *dbptr = nullptr;
  assert(handles);
  handles->clear();

  // 3. Largest write buffer across all column families, plus the combined
  //    seqno-to-time preservation settings (consumed by the elided code).
  size_t max_write_buffer_size = 0;
  MinAndMaxPreserveSeconds preserve_info;
  for (const auto& cf : column_families) {
    max_write_buffer_size =
        std::max(max_write_buffer_size, cf.options.write_buffer_size);
    preserve_info.Combine(cf.options);
  }

  // 4. Create the DBImpl instance; it is destroyed automatically on any early
  //    return below.
  auto impl = std::make_unique<DBImpl>(db_options, dbname, seq_per_batch,
                                       batch_per_txn);

  // 5. No info log means logger creation failed; report that status.
  if (!impl->immutable_db_options_.info_log) {
    return impl->init_logger_creation_s_;
  }

  // 6. Create the WAL directory, then every DB path and column family path.
  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
  if (s.ok()) {
    std::vector<std::string> paths;
    for (auto& db_path : impl->immutable_db_options_.db_paths) {
      paths.emplace_back(db_path.path);
    }
    for (auto& cf : column_families) {
      for (auto& cf_path : cf.options.cf_paths) {
        paths.emplace_back(cf_path.path);
      }
    }

    for (const auto& path : paths) {
      s = impl->env_->CreateDirIfMissing(path);
      if (!s.ok()) break;
    }

    // Automatic error recovery is only enabled when at most one data path is
    // configured (upstream: NoSpace recovery only handles the single-path case).
    if (paths.size() <= 1) {
      impl->error_handler_.EnableAutoRecovery();
    }
  }

  // 7. Create the WAL archive directory.
  if (s.ok()) {
    s = impl->CreateArchivalDirectory();
  }

  if (!s.ok()) return s;

  // 8. Remember whether the WAL lives in the DB directory.
  impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();

  // 9. Take the locks for the recovery phase.
  RecoveryContext recovery_ctx;
  impl->options_mutex_.Lock();
  impl->mutex_.Lock();

  // 10. Handle create_if_missing / error_if_exists and recover the DB.
  //     These locals are used by the elided recovery code.
  uint64_t recovered_seq(kMaxSequenceNumber);
  bool save_current_options = false;
  bool log_dir_synced = false;

  // ... detailed recovery elided: WAL recovery, Manifest recovery, column
  //     family handle creation, etc. (performed while holding mutex_).

  // Release the DB mutex taken in step 9 before continuing. Otherwise it
  // would stay locked for the lifetime of the DB, or while `impl` is
  // destroyed on failure. options_mutex_ stays held until the background
  // tasks are registered.
  impl->mutex_.Unlock();

  // 11. Start the background tasks.
  if (s.ok()) {
    s = impl->StartPeriodicTaskScheduler();
  }
  if (s.ok()) {
    s = impl->RegisterRecordSeqnoTimeWorker();
  }

  impl->options_mutex_.Unlock();

  // 12. Publish the result, or clean up on failure.
  if (s.ok()) {
    *dbptr = std::move(impl);
  } else {
    // Delete any column family handles created before the failure.
    for (auto* h : *handles) {
      delete h;
    }
    handles->clear();
  }

  return s;
}

2.1.5 调用链路时序图

sequenceDiagram
    participant App as 应用程序
    participant DB as DB::Open
    participant DBImpl as DBImpl::Open
    participant Env as Environment
    participant WAL as WAL Recovery
    participant Manifest as Manifest Recovery
    participant CF as Column Family
    
    App->>DB: Open(options, dbname, dbptr)
    DB->>DB: 创建列族描述符
    DB->>DBImpl: Open(db_options, column_families, ...)
    
    DBImpl->>DBImpl: ValidateOptions()
    DBImpl->>DBImpl: new DBImpl()
    DBImpl->>Env: CreateDirIfMissing()
    Env-->>DBImpl: OK
    
    DBImpl->>DBImpl: 获取数据库锁
    DBImpl->>Manifest: VersionSet::Recover()
    Manifest-->>DBImpl: 恢复LSM版本(SST文件列表、列族元数据)
    DBImpl->>WAL: RecoverLogFiles()
    WAL-->>DBImpl: WAL重放至MemTable完成
    
    DBImpl->>CF: 创建列族句柄
    CF-->>DBImpl: 列族句柄
    
    DBImpl->>DBImpl: 启动后台任务
    DBImpl-->>DB: DBImpl实例
    DB-->>App: 数据库实例

2.2 写入接口 - Put/Delete/Merge

2.2.1 Put接口定义

位置: include/rocksdb/db.h

// Writes `key` -> `value` into the default column family.
// This is a thin forwarder to the column-family overload, using
// DefaultColumnFamily() as the target.
// @param options: write options (sync, WAL behavior, ...)
// @param key: user key
// @param value: data associated with the key
// @return: operation status
virtual Status Put(const WriteOptions& options, const Slice& key,
                   const Slice& value) {
  ColumnFamilyHandle* const default_cf = DefaultColumnFamily();
  return Put(options, default_cf, key, value);
}

// Column-family Put: writes `key` -> `value` into the given column family.
// @param options: write options
// @param column_family: target column family handle
// @param key: key
// @param value: value
// @return: operation status
virtual Status Put(const WriteOptions& options,
                   ColumnFamilyHandle* column_family, 
                   const Slice& key,
                   const Slice& value) = 0;

// Put with a user-defined timestamp.
// @param options: write options
// @param column_family: column family handle
// @param key: key
// @param ts: user-defined timestamp
// @param value: value
// @return: operation status
virtual Status Put(const WriteOptions& options,
                   ColumnFamilyHandle* column_family, 
                   const Slice& key,
                   const Slice& ts, 
                   const Slice& value) = 0;

2.2.2 DBImpl::Put实现

位置: db/db_impl/db_impl.h:212

// DBImpl's overrides of the two column-family Put overloads declared in DB
// (without and with a user-defined timestamp).
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
           const Slice& key, const Slice& value) override;
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
           const Slice& key, const Slice& ts, const Slice& value) override;

位置: db/db_impl/db_impl_write.cc:2786

// Default DB::Put: wraps the single key/value into a one-entry WriteBatch and
// submits it through Write().
// The batch is created with:
//  - no reserved bytes and no size limit;
//  - the caller's protection_bytes_per_key;
//  - the default column family's timestamp size.
// If adding the entry to the batch fails, that status is returned and nothing
// is written.
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  ColumnFamilyHandle* default_cf = DefaultColumnFamily();
  assert(default_cf);
  const Comparator* const ucmp = default_cf->GetComparator();
  assert(ucmp);
  const size_t default_ts_sz = ucmp->timestamp_size();

  WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
                   opt.protection_bytes_per_key, default_ts_sz);
  const Status add_status = batch.Put(column_family, key, value);
  return add_status.ok() ? Write(opt, &batch) : add_status;
}

2.2.3 核心写入实现 - DBImpl::WriteImpl

位置: db/db_impl/db_impl_write.cc

// Core write path of DBImpl (simplified excerpt, non-pipelined mode).
// The writer joins the write queue. A group leader writes the whole group to
// the WAL and to the memtables and then releases its followers. Followers
// simply pick up the result.
// @param write_options: write options (sync, disableWAL, ...)
// @param my_batch: batch to apply
// @param callback: user write callback
// @param log_used: [out, optional] WAL number used by this write
// @param log_ref: WAL reference, passed to the writer
// @param disable_memtable: skip memtable insertion for this writer's batch
// @param seq_used: [out, optional] sequence number assigned to this batch
// @param batch_cnt: number of sub-batches, passed to the writer
// @param pre_release_callback: passed to the writer
// @param write_after_commit_callback: passed to the writer
// @return: final status of this write
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch,
                         UserWriteCallback* callback,
                         uint64_t* log_used,
                         uint64_t log_ref,
                         bool disable_memtable,
                         uint64_t* seq_used,
                         size_t batch_cnt,
                         PreReleaseCallback* pre_release_callback,
                         WriteAfterCommitCallback* write_after_commit_callback) {
  // 1. Reject invalid option combinations.
  if (write_options.sync && write_options.disableWAL) {
    return Status::InvalidArgument("Sync writes has to enable WAL.");
  }

  if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_write is not compatible with concurrent_prepare");
  }

  // 2. Describe this write as a Writer in the write queue.
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, batch_cnt, pre_release_callback,
                        write_after_commit_callback);

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
  }

  StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

  // 3. Join the queue. This returns once this writer's state is one of the
  //    states handled below (group leader, parallel memtable writer, or
  //    completed by another leader).
  write_thread_.JoinBatchGroup(&w);

  // 4. Group leader: write the whole group, release the followers, and return.
  if (w.state == WriteThread::STATE_GROUP_LEADER) {
    WriteContext write_context;
    WriteThread::WriteGroup write_group;
    write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

    // NOTE(review): sequence-number allocation for the group is not shown in
    // this excerpt. Upstream assigns versions_->LastSequence() + 1 onward to
    // the writers and publishes the new last sequence after the memtable
    // write.
    Status status;

    // disableWAL skips only the WAL append.
    if (!write_options.disableWAL) {
      status = WriteToWAL(write_group, log_used, &write_context);
    }

    // The memtable must still be written when the WAL is disabled; otherwise
    // the data would be silently dropped. A per-writer disable_memtable must
    // not gate the whole group.
    // NOTE(review): assumes WriteToMemTable skips writers whose
    // ShouldWriteToMemTable() is false, as upstream's
    // WriteBatchInternal::InsertInto does.
    if (status.ok()) {
      status = WriteToMemTable(write_group, &write_context, seq_used);
    }

    // Propagates `status` to the followers and marks them completed. The
    // leader's own state is not set to STATE_COMPLETED (upstream), so the
    // leader returns directly instead of falling through.
    write_thread_.ExitAsBatchGroupLeader(write_group, status);
    if (status.ok()) {
      status = w.FinalStatus();
    }
    return status;
  }

  // 5. Parallel memtable writer: insert this writer's own batch.
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    if (w.ShouldWriteToMemTable()) {
      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      w.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          &trim_history_scheduler_, write_options.ignore_missing_column_families,
          0 /*recovery_log_number*/, this, true /*concurrent_memtable_writes*/,
          seq_per_batch_, w.batch_cnt, batch_per_txn_,
          write_options.memtable_insert_hint_per_batch);
    }

    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      // The last parallel writer to finish publishes the group's sequence
      // number and exits the batch group on the leader's behalf.
      // ExitAsMemTableWriter belongs to the pipelined-write path.
      versions_->SetLastSequence(w.write_group->last_sequence);
      MemTableInsertStatusCheck(w.status);
      write_thread_.ExitAsBatchGroupFollower(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
  }

  // 6. Followers (and finished parallel writers): the write is complete.
  assert(w.state == WriteThread::STATE_COMPLETED);
  if (log_used != nullptr) {
    *log_used = w.log_used;
  }
  if (seq_used != nullptr) {
    *seq_used = w.sequence;
  }
  return w.FinalStatus();
}

2.2.4 写入操作时序图

sequenceDiagram
    participant App as 应用程序
    participant DB as DBImpl
    participant WT as WriteThread
    participant WAL as WAL Writer
    participant MT as MemTable
    participant BG as Background
    
    App->>DB: Put(key, value)
    DB->>DB: 创建WriteBatch
    DB->>WT: JoinBatchGroup()
    
    alt 成为组长
        WT->>DB: 执行写入
        DB->>WAL: WriteToWAL()
        WAL-->>DB: WAL写入完成
        DB->>MT: WriteToMemTable()
        MT-->>DB: MemTable写入完成
        DB->>WT: ExitAsBatchGroupLeader()
    else 跟随者
        WT->>WT: 等待组长完成
    end
    
    WT-->>DB: 写入完成
    DB-->>App: Status::OK
    
    Note over MT,BG: MemTable写满后,由后续写入的预处理阶段(PreprocessWrite)切换MemTable并调度后台Flush
    DB->>BG: ScheduleFlushes() / MaybeScheduleFlushOrCompaction()
    BG->>BG: BackgroundFlush()

2.3 读取接口 - Get

2.3.1 Get接口定义

位置: include/rocksdb/db.h

// Reads `key` from the default column family.
// This is a thin forwarder to the column-family overload, using
// DefaultColumnFamily() as the source.
// @param options: read options (snapshot, caching, ...)
// @param key: key to look up
// @param value: [out] receives the value when found
// @return: OK if found; NotFound if the key does not exist; other codes on error
virtual Status Get(const ReadOptions& options, const Slice& key,
                   std::string* value) {
  ColumnFamilyHandle* const default_cf = DefaultColumnFamily();
  return Get(options, default_cf, key, value);
}

// Reads `key` from `column_family` into a std::string.
// The std::string is wrapped in a PinnableSlice so the lookup can write
// directly into *value. If the result was instead pinned to memory owned
// elsewhere, it is copied into *value afterwards.
// @param options: read options
// @param column_family: target column family handle
// @param key: key to look up
// @param value: [out] receives the value; must not be null
// @return: operation status (NotFound when the key does not exist)
virtual Status Get(const ReadOptions& options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   std::string* value) {
  assert(value != nullptr);
  PinnableSlice pinnable_val(value);
  assert(!pinnable_val.IsPinned());
  // Call the (PinnableSlice*, timestamp) overload explicitly. No 4-argument
  // PinnableSlice overload is declared alongside these, so the shorter call
  // had no matching target. A null timestamp pointer requests no timestamp
  // output; upstream's own 4-argument overload forwards nullptr the same way.
  auto s = Get(options, column_family, key, &pinnable_val,
               /*timestamp=*/nullptr);
  if (s.ok() && pinnable_val.IsPinned()) {
    value->assign(pinnable_val.data(), pinnable_val.size());
  }  // otherwise the value was already written into *value
  return s;
}

// PinnableSlice variant: lets the result reference existing memory instead of
// copying it, avoiding an unnecessary copy.
// @param options: read options
// @param column_family: column family handle
// @param key: key
// @param pinnable_val: [out] value slice that may be pinned instead of copied
// @param timestamp: [out] receives the timestamp (when enabled)
// @return: operation status
virtual Status Get(const ReadOptions& options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* pinnable_val, std::string* timestamp) = 0;

2.3.2 DBImpl::Get实现

位置: db/db_impl/db_impl.h:264

// DBImpl's override of the PinnableSlice + timestamp Get overload.
Status Get(const ReadOptions& _read_options,
           ColumnFamilyHandle* column_family, const Slice& key,
           PinnableSlice* value, std::string* timestamp) override;

位置: db/db_impl/db_impl.cc

// DBImpl's point-lookup entry point.
// Clears the caller's PinnableSlice and records the lookup in the tracer if
// one is attached. It then delegates to GetImpl with value retrieval enabled.
// @param read_options: read options (snapshot, read tier, ...)
// @param column_family: column family to read from
// @param key: key to look up
// @param pinnable_val: [out] receives the value; must not be null
// @param timestamp: [out, optional] receives the timestamp when enabled
// @return: OK, NotFound, or an error status from GetImpl
Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* pinnable_val, std::string* timestamp) {
  assert(pinnable_val != nullptr);
  // Release any previous pin and clear the visible result, so a reused
  // PinnableSlice cannot carry over a value from an earlier lookup.
  pinnable_val->Reset();

  if (tracer_) {
    // tracer_ is re-checked under trace_mutex_, presumably because tracing
    // can be stopped concurrently.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->Get(column_family, key);
    }
  }

  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = pinnable_val;
  get_impl_options.timestamp = timestamp;
  get_impl_options.get_value = true;

  return GetImpl(read_options, key, get_impl_options);
}

2.3.3 核心读取实现 - DBImpl::GetImpl

位置: db/db_impl/db_impl.cc

// Core point-lookup implementation (simplified excerpt; only the `value`
// output is shown, not the merge_operands path).
// Lookup order: active memtable -> immutable memtables -> SST files (via the
// current Version). All three are read from ONE referenced SuperVersion, so
// they form a consistent view.
// @param read_options: snapshot, read tier, etc.
// @param key: user key to look up
// @param get_impl_options: column family, output value/timestamp, callbacks
// @return: OK with the value, NotFound, or an error status
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
                       GetImplOptions& get_impl_options) {
  assert(get_impl_options.value != nullptr ||
         get_impl_options.merge_operands != nullptr);

  StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto cfh = static_cast<ColumnFamilyHandleImpl*>(
      get_impl_options.column_family);
  auto cfd = cfh->cfd();

  // 1. Reference the SuperVersion once. Every lookup below uses this `sv`,
  //    and it is released exactly once on every return path.
  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  // 2. Choose the snapshot AFTER referencing the SuperVersion. Per upstream,
  //    taking it first would let a flush/compaction in between drop data
  //    that the snapshot should still see.
  SequenceNumber snapshot;
  if (read_options.snapshot != nullptr) {
    snapshot = read_options.snapshot->GetSequenceNumber();
  } else {
    snapshot = versions_->LastSequence();
  }
  PERF_TIMER_STOP(get_snapshot_time);

  // 3. Internal lookup key: user key + snapshot sequence (+ timestamp).
  LookupKey lkey(key, snapshot, read_options.timestamp);

  // 4. State shared by all lookups. Merge operands accumulate in
  //    merge_context, and range tombstones are tracked in range_del_agg.
  MergeContext merge_context;
  RangeDelAggregator range_del_agg(&cfd->internal_comparator(), snapshot);
  SequenceNumber seq = kMaxSequenceNumber;  // out-param of the memtable lookups

  Status s;
  bool done = false;

  const bool skip_memtable =
      read_options.read_tier == kPersistedTier &&
      has_unpersisted_data_.load(std::memory_order_relaxed);

  // 5. Memtables: active first, then immutable. Whether a range tombstone
  //    covers this key is decided inside these lookups, which receive
  //    &range_del_agg. A merely non-empty aggregator does NOT mean the key
  //    was deleted.
  if (!skip_memtable) {
    PERF_TIMER_GUARD(get_from_memtable_time);
    if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
                     &merge_context, &range_del_agg, &seq, read_options,
                     get_impl_options.callback,
                     get_impl_options.is_blob_index)) {
      done = true;
      get_impl_options.value->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    } else if ((s.ok() || s.IsMergeInProgress()) &&
               sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
                            &merge_context, &range_del_agg, &seq,
                            read_options, get_impl_options.callback,
                            get_impl_options.is_blob_index)) {
      done = true;
      get_impl_options.value->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    }
  }

  // A memtable lookup that ended in a real error (e.g. corruption) stops here.
  if (!done && !s.ok() && !s.IsMergeInProgress()) {
    ReturnAndCleanupSuperVersion(cfd, sv);
    return s;
  }

  // 6. SST files through the current Version. The result is reported through
  //    the &s out-parameter; upstream Version::Get returns void, so it is
  //    not also assigned to `s`.
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(read_options, lkey, get_impl_options.value,
                     get_impl_options.timestamp, &s, &merge_context,
                     &range_del_agg, get_impl_options.callback,
                     get_impl_options.is_blob_index,
                     get_impl_options.value_found);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  // 7. Merge operands collected in merge_context were passed to every lookup
  //    above. Upstream resolves them inside those lookups, so there is no
  //    separate merge step here.

  // 8. Statistics. There is no PinSelf() here: a memtable hit was already
  //    self-pinned above, and an SST hit may be pinned to a block
  //    (PinSelf() asserts !pinned_ upstream).
  if (s.ok()) {
    RecordTick(stats_, NUMBER_KEYS_READ);
    const size_t size = get_impl_options.value->size();
    RecordTick(stats_, BYTES_READ, size);
    RecordInHistogram(stats_, BYTES_PER_READ, size);
  }
  // NUMBER_DB_SEEK counts iterator seek/next/prev calls, so it is not
  // recorded for point lookups.

  ReturnAndCleanupSuperVersion(cfd, sv);
  return s;
}

2.3.4 读取操作时序图

sequenceDiagram
    participant App as 应用程序
    participant DB as DBImpl
    participant MT as MemTable
    participant IMT as Immutable MemTable
    participant SST as SST Files
    participant Cache as Block Cache
    
    App->>DB: Get(key)
    DB->>DB: 创建LookupKey
    DB->>DB: 获取快照序列号
    
    DB->>MT: 查找活跃MemTable
    alt 在MemTable中找到
        MT-->>DB: value + OK
        DB-->>App: value
    else MemTable中未找到
        DB->>IMT: 查找不可变MemTable
        alt 在Immutable MemTable中找到
            IMT-->>DB: value + OK
            DB-->>App: value
        else Immutable MemTable中未找到
            DB->>SST: 查找SST文件
            SST->>Cache: 查找块缓存
            alt 缓存命中
                Cache-->>SST: block_data
            else 缓存未命中
                SST->>SST: 从磁盘读取
                SST->>Cache: 缓存块数据
            end
            SST-->>DB: value/NotFound
            DB-->>App: value/NotFound
        end
    end

2.4 批量操作接口 - WriteBatch

2.4.1 WriteBatch接口定义

位置: include/rocksdb/write_batch.h

// WriteBatch类:用于批量写入操作
// 支持原子性地执行多个写入操作
class WriteBatch : public WriteBatchBase {
 public:
  // 构造函数
  // @param reserved_bytes: 预留字节数,用于优化内存分配
  // @param max_bytes: 最大字节数限制
  // @param protection_bytes_per_key: 每个键的保护字节数
  // @param default_cf_ts_sz: 默认列族时间戳大小
  explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0,
                      size_t protection_bytes_per_key = 0,
                      size_t default_cf_ts_sz = 0);

  // 添加Put操作到批次
  // @param column_family: 目标列族
  // @param key: 键
  // @param value: 值
  // @return: 操作状态
  Status Put(ColumnFamilyHandle* column_family, const Slice& key,
             const Slice& value) override;

  // 添加Delete操作到批次
  // @param column_family: 目标列族
  // @param key: 要删除的键
  // @return: 操作状态
  Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;

  // 添加Merge操作到批次
  // @param column_family: 目标列族
  // @param key: 键
  // @param value: 要合并的值
  // @return: 操作状态
  Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
               const Slice& value) override;

  // 清空批次内容
  void Clear() override;

  // 获取批次中的操作数量
  int Count() const override;

  // 获取批次的数据大小
  size_t GetDataSize() const override;

  // 检查批次是否有Put操作
  bool HasPut() const override;

  // 检查批次是否有Delete操作
  bool HasDelete() const override;

  // 检查批次是否有Merge操作
  bool HasMerge() const override;
};

2.4.2 WriteBatch实现示例

位置: db/write_batch.cc

// Appends a Put(key -> value) record for `column_family` to the batch.
// Record layout written below:
//   tag byte | [varint32 cf_id, non-default CFs only] |
//   varint32 key length | key bytes | varint32 value length | value bytes
// The tag tells the decoder whether a column family id follows:
//  - kTypeValue for the default column family (cf_id 0);
//  - kTypeColumnFamilyValue otherwise.
// Writing kTypeValue followed by a cf_id would make the record undecodable.
// @return:
//  - InvalidArgument if the key or value length does not fit in 32 bits;
//  - MemoryLimit if max_bytes_ is set and would be exceeded;
//  - otherwise OK.
Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  // 1. Resolve the column family id (0 = default column family).
  uint32_t cf_id = GetColumnFamilyID(column_family);

  // 2. Lengths are encoded as varint32, so they must fit in 32 bits.
  if (key.size() > port::kMaxUint32) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > port::kMaxUint32) {
    return Status::InvalidArgument("value is too large");
  }

  // 3. Exact number of bytes this record adds to rep_.
  //    - The batch header is already counted once in GetDataSize(); it is
  //      not repeated per record.
  //    - The batch stores no per-key sequence number.
  //    (Assumes GetDataSize() is the serialized size, as upstream's
  //    rep_.size().)
  auto varint32_length = [](uint64_t v) {
    size_t n = 1;
    while (v >= 128) {
      v >>= 7;
      ++n;
    }
    return n;
  };
  const size_t key_size = key.size();
  const size_t value_size = value.size();
  const size_t record_size =
      1 /* tag */ + (cf_id != 0 ? varint32_length(cf_id) : 0) +
      varint32_length(key_size) + key_size +
      varint32_length(value_size) + value_size;

  // 4. Enforce the optional batch size limit (0 = unlimited).
  if (max_bytes_ > 0 && GetDataSize() + record_size > max_bytes_) {
    return Status::MemoryLimit("Write batch size limit exceeded");
  }

  // 5. Append the record.
  WriteBatchInternal::SetCount(this, Count() + 1);

  if (cf_id == 0) {
    rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&rep_, cf_id);
  }

  PutVarint32(&rep_, static_cast<uint32_t>(key_size));
  rep_.append(key.data(), key_size);

  PutVarint32(&rep_, static_cast<uint32_t>(value_size));
  rep_.append(value.data(), value_size);
  // NOTE(review): upstream also sets the HAS_PUT content flag here (read by
  // HasPut()); not shown in this excerpt.

  // 6. Record per-key protection info when enabled.
  if (prot_info_ != nullptr) {
    prot_info_->entries_.emplace_back(ProtectionInfo64()
        .ProtectKVO(key, value, kTypeValue)
        .ProtectC(cf_id));
  }

  return Status::OK();
}

2.5 迭代器接口 - Iterator

2.5.1 Iterator接口定义

位置: include/rocksdb/iterator.h

// Abstract base class for iterating over the key/value pairs of a database.
// key() and value() may only be used while Valid() is true.
class Iterator : public Cleanable {
 public:
  Iterator() = default;
  virtual ~Iterator() = default;

  // @return: true if the iterator is positioned at a valid key/value pair
  virtual bool Valid() const = 0;

  // Positions at the first key.
  virtual void SeekToFirst() = 0;

  // Positions at the last key.
  virtual void SeekToLast() = 0;

  // Positions at the first key that is >= target.
  // @param target: target key
  virtual void Seek(const Slice& target) = 0;

  // Positions at the last key that is <= target, i.e. at or before target.
  // An exact match is NOT skipped.
  // @param target: target key
  virtual void SeekForPrev(const Slice& target) = 0;

  // Moves to the next key.
  virtual void Next() = 0;

  // Moves to the previous key.
  virtual void Prev() = 0;

  // @return: the current key; only valid while Valid() is true
  virtual Slice key() const = 0;

  // @return: the current value; only valid while Valid() is true
  virtual Slice value() const = 0;

  // @return: iterator status; OK means no error has occurred
  virtual Status status() const = 0;

  // If supported, re-binds the iterator to the latest database state. This is
  // a way to see newer data, not to keep a snapshot. Upstream invalidates the
  // iterator, so re-seek after calling. The default returns NotSupported.
  virtual Status Refresh() { return Status::NotSupported("Refresh"); }

  // Looks up an iterator property.
  // @param prop_name: property name
  // @param prop: [out] property value
  // @return: whether the property was obtained; the default knows no
  //          properties and returns false
  // NOTE(review): upstream declares this as returning Status — confirm.
  virtual bool GetProperty(std::string prop_name, std::string* prop) {
    return false;
  }
};

2.5.2 创建迭代器

位置: include/rocksdb/db.h

// Excerpt of DB's iterator-creation API.
class DB {
 public:
  // Creates an iterator over the default column family.
  // @param options: read options
  // @return: a heap-allocated iterator owned by the caller, who must delete
  //          it when done (upstream requires this before the DB is deleted)
  virtual Iterator* NewIterator(const ReadOptions& options) {
    ColumnFamilyHandle* const default_cf = DefaultColumnFamily();
    return NewIterator(options, default_cf);
  }

  // Creates an iterator over `column_family`.
  // @param options: read options
  // @param column_family: column family to iterate
  // @return: iterator pointer owned by the caller
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) = 0;

  // Creates iterators over several column families.
  // @param options: read options
  // @param column_families: column family handles
  // @param iterators: [out] receives the created iterators (presumably one
  //                   per column family, in order — confirm)
  // @return: operation status
  virtual Status NewIterators(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_families,
      std::vector<Iterator*>* iterators) = 0;
};

3. API使用示例

3.1 基础使用示例

#include <iostream>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/write_batch.h"

using namespace ROCKSDB_NAMESPACE;

// Basic RocksDB usage, in order:
//  1. open the database (creating it if needed);
//  2. Put and Get;
//  3. an atomic WriteBatch;
//  4. a full iteration;
//  5. close.
int main() {
  // 1. Configure database options.
  Options options;
  options.create_if_missing = true;                  // create the DB if it does not exist
  options.write_buffer_size = 64 * 1024 * 1024;      // 64MB write buffer
  options.max_write_buffer_number = 3;               // at most 3 write buffers
  options.target_file_size_base = 64 * 1024 * 1024;  // 64MB target file size

  // 2. Open the database. std::unique_ptr<DB> matches the
  //    std::unique_ptr<DB>* overload of DB::Open and closes the database
  //    automatically when it is reset or goes out of scope.
  std::unique_ptr<DB> db;
  Status s = DB::Open(options, "/tmp/testdb", &db);
  if (!s.ok()) {
    std::cerr << "无法打开数据库: " << s.ToString() << std::endl;
    return -1;
  }

  // 3. Write a key. sync = true makes the write durable before returning.
  WriteOptions write_opts;
  write_opts.sync = true;

  s = db->Put(write_opts, "key1", "value1");
  if (!s.ok()) {
    std::cerr << "写入失败: " << s.ToString() << std::endl;
  }

  // 4. Read it back. NotFound is a normal outcome, not an error.
  ReadOptions read_opts;
  std::string value;
  s = db->Get(read_opts, "key1", &value);
  if (s.ok()) {
    std::cout << "key1 = " << value << std::endl;
  } else if (s.IsNotFound()) {
    std::cout << "key1 未找到" << std::endl;
  } else {
    std::cerr << "读取失败: " << s.ToString() << std::endl;
  }

  // 5. Batch: all three updates are applied atomically.
  WriteBatch batch;
  batch.Put("key2", "value2");
  batch.Put("key3", "value3");
  batch.Delete("key1");

  s = db->Write(write_opts, &batch);
  if (!s.ok()) {
    std::cerr << "批量写入失败: " << s.ToString() << std::endl;
  }

  // 6. Iterate over all keys. The iterator lives in its own scope, so it is
  //    destroyed before the database is closed.
  {
    std::unique_ptr<Iterator> it(db->NewIterator(read_opts));
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      std::cout << it->key().ToString() << " = "
                << it->value().ToString() << std::endl;
    }

    // Valid() == false means either end-of-data or an error; status() tells
    // them apart.
    if (!it->status().ok()) {
      std::cerr << "迭代器错误: " << it->status().ToString() << std::endl;
    }
  }

  // 7. Close the database.
  db.reset();
  return 0;
}

3.2 列族使用示例

#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

using namespace ROCKSDB_NAMESPACE;

// Column family usage, in order:
//  1. open a database with three column families (creating missing ones);
//  2. write to and read from specific column families;
//  3. release the handles through the database before closing it.
int main() {
  // 1. Database-wide options.
  DBOptions db_options;
  db_options.create_if_missing = true;
  db_options.create_missing_column_families = true;  // create "users"/"logs" if absent

  // 2. Column family descriptors. The handles returned by DB::Open follow
  //    this order: [0] default, [1] users, [2] logs.
  std::vector<ColumnFamilyDescriptor> column_families;

  // Default column family.
  ColumnFamilyOptions default_cf_options;
  column_families.emplace_back(kDefaultColumnFamilyName, default_cf_options);

  // User data column family.
  ColumnFamilyOptions user_cf_options;
  user_cf_options.write_buffer_size = 32 * 1024 * 1024;  // 32MB
  column_families.emplace_back("users", user_cf_options);

  // Log data column family. LZ4 must be compiled into the RocksDB build;
  // upstream rejects unsupported compression types when opening.
  ColumnFamilyOptions log_cf_options;
  log_cf_options.write_buffer_size = 16 * 1024 * 1024;  // 16MB
  log_cf_options.compression = kLZ4Compression;
  column_families.emplace_back("logs", log_cf_options);

  // 3. Open the database.
  std::vector<ColumnFamilyHandle*> handles;
  std::unique_ptr<DB> db;
  Status s = DB::Open(db_options, "/tmp/multi_cf_db",
                      column_families, &handles, &db);
  if (!s.ok()) {
    std::cerr << "打开数据库失败: " << s.ToString() << std::endl;
    return -1;
  }

  // 4. Column family handles (handles[0] is the default column family).
  ColumnFamilyHandle* users_cf = handles[1];
  ColumnFamilyHandle* logs_cf = handles[2];

  // 5. Write to different column families; stop at the first failure.
  WriteOptions write_opts;

  // User data.
  s = db->Put(write_opts, users_cf, "user:1001",
              R"({"name":"Alice","age":25})");
  if (s.ok()) {
    s = db->Put(write_opts, users_cf, "user:1002",
                R"({"name":"Bob","age":30})");
  }

  // Log data.
  if (s.ok()) {
    s = db->Put(write_opts, logs_cf, "log:20231201:001",
                "User Alice logged in");
  }
  if (s.ok()) {
    s = db->Put(write_opts, logs_cf, "log:20231201:002",
                "User Bob updated profile");
  }
  if (!s.ok()) {
    std::cerr << "写入失败: " << s.ToString() << std::endl;
  }

  // 6. Read from different column families.
  ReadOptions read_opts;
  std::string value;

  // User data.
  s = db->Get(read_opts, users_cf, "user:1001", &value);
  if (s.ok()) {
    std::cout << "用户1001: " << value << std::endl;
  }

  // Log data.
  s = db->Get(read_opts, logs_cf, "log:20231201:001", &value);
  if (s.ok()) {
    std::cout << "日志: " << value << std::endl;
  }

  // 7. Release the column family handles through the database (the
  //    documented way to close them), then close the database.
  for (ColumnFamilyHandle* handle : handles) {
    s = db->DestroyColumnFamilyHandle(handle);
    if (!s.ok()) {
      std::cerr << "释放列族句柄失败: " << s.ToString() << std::endl;
    }
  }
  handles.clear();
  db.reset();

  return 0;
}

3.3 事务使用示例

#include <iostream>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

using namespace ROCKSDB_NAMESPACE;

// Transaction usage: atomically transfer 100 from account:alice to
// account:bob using a (pessimistic) TransactionDB.
int main() {
  // 1. Configure the transaction database.
  Options options;
  options.create_if_missing = true;

  TransactionDBOptions txn_db_options;

  // 2. Open the transaction database; ownership moves into a unique_ptr.
  TransactionDB* raw_txn_db = nullptr;
  Status s = TransactionDB::Open(options, txn_db_options,
                                 "/tmp/txn_db", &raw_txn_db);
  if (!s.ok()) {
    std::cerr << "打开事务数据库失败: " << s.ToString() << std::endl;
    return -1;
  }
  std::unique_ptr<TransactionDB> txn_db(raw_txn_db);

  // 3. Begin a transaction. It is declared after txn_db, so it is destroyed
  //    first.
  WriteOptions write_opts;
  ReadOptions read_opts;
  TransactionOptions txn_opts;

  std::unique_ptr<Transaction> txn(
      txn_db->BeginTransaction(write_opts, txn_opts));

  // 4. Read both balances with GetForUpdate.
  //    - GetForUpdate locks each key for this transaction, so no other
  //      transaction can change a balance between this read and the Commit
  //      below. A plain Get takes no lock, which allows lost updates in a
  //      read-modify-write.
  //    - A missing account counts as 0.
  //    - Any other read error aborts the transfer.
  auto read_balance = [&](const char* key, int* balance) {
    std::string value;
    Status rs = txn->GetForUpdate(read_opts, key, &value);
    if (rs.ok()) {
      *balance = std::stoi(value);
    } else if (rs.IsNotFound()) {
      *balance = 0;
      rs = Status::OK();
    }
    return rs;
  };

  int alice_balance = 0;
  int bob_balance = 0;
  s = read_balance("account:alice", &alice_balance);
  if (s.ok()) {
    s = read_balance("account:bob", &bob_balance);
  }
  if (!s.ok()) {
    std::cerr << "读取失败: " << s.ToString() << std::endl;
    txn->Rollback();
    return -1;
  }

  // Transfer: Alice sends 100 to Bob.
  if (alice_balance >= 100) {
    alice_balance -= 100;
    bob_balance += 100;

    s = txn->Put("account:alice", std::to_string(alice_balance));
    if (s.ok()) {
      s = txn->Put("account:bob", std::to_string(bob_balance));
    }

    if (s.ok()) {
      // 5. Commit the transaction.
      s = txn->Commit();
      if (s.ok()) {
        std::cout << "转账成功" << std::endl;
      } else {
        std::cerr << "提交失败: " << s.ToString() << std::endl;
      }
    } else {
      std::cerr << "写入失败: " << s.ToString() << std::endl;
      txn->Rollback();
    }
  } else {
    std::cout << "余额不足" << std::endl;
    txn->Rollback();
  }

  // 6. Resources are released by the unique_ptrs: txn first, then txn_db.
  return 0;
}

4. API调用链路总结

4.1 写入路径调用链

应用程序
  ↓ Put(key, value)
DB::Put()
  ↓ 构造WriteBatch
DBImpl::Write()
  ↓
DBImpl::WriteImpl()
  ↓ 加入写入队列
WriteThread::JoinBatchGroup()
  ↓ 组长代表整个写入组执行写入
  ├─ DBImpl::WriteGroupToWAL()           // WAL写入
  │   └─ log::Writer::AddRecord()
  └─ WriteBatchInternal::InsertInto()    // MemTable写入
      └─ MemTable::Add()

4.2 读取路径调用链

应用程序
  ↓ Get(key)
DBImpl::Get()
  ↓
DBImpl::GetImpl()                        // 引用SuperVersion、确定快照序列号、创建LookupKey
  ├─ MemTable::Get()                     // 活跃MemTable查找
  ├─ MemTableListVersion::Get()          // 不可变MemTable查找
  └─ Version::Get()                      // SST文件查找
      └─ TableCache::Get()
          └─ BlockBasedTable::Get()
              └─ Block Cache查找(Cache::Lookup)

4.3 数据库打开调用链

应用程序
  ↓ DB::Open(options, name, dbptr)
DB::Open()(多列族版本)
  ↓ 单列族版本先构造列族描述符再转调
DBImpl::Open()
  ├─ ValidateOptions()                   // 验证选项
  ├─ new DBImpl()                        // 创建实例
  ├─ DBImpl::Recover()
  │   ├─ VersionSet::Recover()           // 先恢复Manifest(LSM版本信息)
  │   └─ RecoverLogFiles()               // 再重放WAL到MemTable
  └─ StartPeriodicTaskScheduler()        // 启动后台周期任务

这些API接口构成了RocksDB的核心功能,通过合理使用这些接口,可以构建高性能的存储应用。每个接口都经过精心设计,既保证了功能的完整性,又提供了良好的性能特征。