MongoDB-12-复制模块-API

1. API总览

复制模块是MongoDB副本集功能的核心实现,提供了数据复制、一致性保证、故障转移等关键功能。主要API接口分为以下几个层次:

  • 复制协调器层: ReplicationCoordinator,管理副本集的整体状态和协调
  • 存储接口层: StorageInterface,提供复制相关的存储操作
  • 初始同步层: InitialSyncer、DatabaseCloner、CollectionCloner,处理新节点的初始数据同步
  • Oplog处理层: OplogApplier、OplogFetcher,处理操作日志的获取和应用
  • 选举投票层: TopologyCoordinator,处理副本集选举逻辑

2. 核心API接口

2.1 ReplicationCoordinator - 复制协调器

基本信息

  • 名称: ReplicationCoordinator
  • 协议/方法: C++ 抽象基类,复制系统的主要接口
  • 幂等性: 根据具体操作而定

请求结构体

// 复制协调器核心接口
class ReplicationCoordinator : public SyncSourceSelector {
public:
    // 启动复制系统
    virtual void startup(OperationContext* opCtx,
                        StorageEngine* storageEngine) = 0;

    // 关闭复制系统
    virtual void shutdown(OperationContext* opCtx,
                         BSONObjBuilder* shutdownTimeElapsedBuilder) = 0;

    // 获取复制模式
    virtual ReplicationMode getReplicationMode() const = 0;

    // 设置副本集配置
    virtual Status processReplSetReconfig(OperationContext* opCtx,
                                        const ReplSetReconfigArgs& args,
                                        BSONObjBuilder* resultObj) = 0;

    // 等待写关注
    virtual StatusAndDuration awaitReplication(
        OperationContext* opCtx,
        const OpTime& opTime,
        const WriteConcernOptions& writeConcern) = 0;

    // 步进为主节点
    virtual StatusWith<OpTime> stepUp(OperationContext* opCtx,
                                     const BSONObj& obj,
                                     bool force,
                                     bool skipDryRun) = 0;

    // 步退为从节点
    virtual Status stepDown(OperationContext* opCtx,
                          bool force,
                          const Milliseconds& waitTime,
                          const Milliseconds& stepdownTime) = 0;
};
字段 类型 必填 默认 约束 说明
opCtx OperationContext* - 非空 操作上下文
storageEngine StorageEngine* - 非空 存储引擎指针
args ReplSetReconfigArgs& - - 副本集重配置参数
writeConcern WriteConcernOptions& - - 写关注选项
opTime OpTime& - - 操作时间戳
force bool false - 是否强制执行

响应结构体

// 状态和持续时间结构
struct StatusAndDuration {
    Status status;          // 操作状态
    Milliseconds duration;  // 操作持续时间
    
    StatusAndDuration(const Status& stat, Milliseconds ms) 
        : status(stat), duration(ms) {}
};

// 复制模式枚举
enum class ReplicationMode {
    kNone,        // 无复制
    kPrimary,     // 主节点模式
    kSecondary,   // 从节点模式
    kInitialSync  // 初始同步模式
};
字段 类型 必填 默认 约束 说明
status Status - - 操作执行状态
duration Milliseconds - >= 0 操作耗时

入口函数与关键代码

// 复制协调器实现
class ReplicationCoordinatorImpl : public ReplicationCoordinator {
public:
    // 等待写关注实现
    StatusAndDuration awaitReplication(
        OperationContext* opCtx,
        const OpTime& opTime,
        const WriteConcernOptions& writeConcern) override {
        
        auto start = Date_t::now();
        
        // 1) 验证写关注参数
        Status wcStatus = _validateWriteConcern(writeConcern);
        if (!wcStatus.isOK()) {
            return StatusAndDuration(wcStatus, Milliseconds(0));
        }
        
        // 2) 获取当前复制状态
        stdx::unique_lock<Latch> lock(_mutex);
        if (_memberState.secondary() && writeConcern.wMode == "majority") {
            // 从节点不能满足majority写关注
            return StatusAndDuration(
                Status(ErrorCodes::NotWritablePrimary, "Not primary"),
                Milliseconds(Date_t::now() - start)
            );
        }
        
        // 3) 检查opTime是否已经被足够多的节点确认
        if (_isOpTimeReplicatedEnough(opTime, writeConcern)) {
            return StatusAndDuration(Status::OK(), 
                                   Milliseconds(Date_t::now() - start));
        }
        
        // 4) 等待复制完成
        OpTime lastCommittedOpTime = _getLastCommittedOpTime();
        if (opTime <= lastCommittedOpTime) {
            return StatusAndDuration(Status::OK(),
                                   Milliseconds(Date_t::now() - start));
        }
        
        // 5) 注册等待器
        auto waiter = std::make_shared<WaiterInfo>();
        waiter->opTime = opTime;
        waiter->writeConcern = writeConcern;
        waiter->promise = std::make_unique<Promise<void>>();
        
        _replicationWaiterList.add(waiter);
        lock.unlock();
        
        // 6) 等待信号或超时
        auto future = waiter->promise->getFuture();
        Status waitStatus = Status::OK();
        
        if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout) {
            waitStatus = future.waitFor(opCtx, Milliseconds(writeConcern.wTimeout));
        } else {
            waitStatus = future.getNoThrow(opCtx);
        }
        
        auto duration = Milliseconds(Date_t::now() - start);
        return StatusAndDuration(waitStatus, duration);
    }
    
    // 步进为主节点
    StatusWith<OpTime> stepUp(OperationContext* opCtx,
                             const BSONObj& obj,
                             bool force,
                             bool skipDryRun) override {
        
        // 1) 检查当前状态
        stdx::unique_lock<Latch> lk(_mutex);
        if (_memberState.primary()) {
            return Status(ErrorCodes::AlreadyInitialized, "Already primary");
        }
        
        if (!_memberState.secondary()) {
            return Status(ErrorCodes::NotSecondary, "Not in secondary state");
        }
        
        // 2) 检查是否满足选举条件
        if (!force && !_topCoord->checkIfElectable()) {
            return Status(ErrorCodes::NotElectable, "Node is not electable");
        }
        
        // 3) 开始选举过程
        _topCoord->processWinElection(_getLastCommittedOpTime());
        
        // 4) 设置为主节点状态
        _memberState = MemberState::RS_PRIMARY;
        
        // 5) 通知状态变更
        _externalState->signalOplogWaiters();
        
        // 6) 返回当前opTime
        OpTime electionOpTime = _getMyLastAppliedOpTime();
        lk.unlock();
        
        // 7) 记录选举日志
        LOGV2(21434, "Election succeeded, became primary");
        
        return electionOpTime;
    }

private:
    // 验证写关注
    Status _validateWriteConcern(const WriteConcernOptions& writeConcern) {
        if (writeConcern.wNumNodes < 0) {
            return Status(ErrorCodes::BadValue, "Invalid w value");
        }
        
        if (writeConcern.wTimeout < 0) {
            return Status(ErrorCodes::BadValue, "Invalid wtimeout value");
        }
        
        return Status::OK();
    }
    
    // 检查opTime是否被足够多节点确认
    bool _isOpTimeReplicatedEnough(const OpTime& opTime,
                                  const WriteConcernOptions& writeConcern) {
        
        if (writeConcern.wMode == "majority") {
            return opTime <= _getLastCommittedOpTime();
        }
        
        int confirmedNodes = 0;
        for (const auto& memberData : _memberData) {
            if (memberData.getLastAppliedOpTime() >= opTime) {
                confirmedNodes++;
            }
        }
        
        return confirmedNodes >= writeConcern.wNumNodes;
    }
};

时序图(写关注等待)

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant Primary as 主节点
    participant Secondary1 as 从节点1
    participant Secondary2 as 从节点2
    participant RC as ReplicationCoordinator
    
    Client->>Primary: insert(doc, {w:2, wtimeout:5000})
    Primary->>Primary: 写入本地oplog
    Primary->>RC: awaitReplication(opTime, w:2)
    
    Note over Primary: 复制到从节点
    Primary->>Secondary1: replicate oplog entry
    Primary->>Secondary2: replicate oplog entry
    
    Secondary1->>Secondary1: 应用oplog条目
    Secondary1->>Primary: confirm replication
    
    Secondary2->>Secondary2: 应用oplog条目  
    Secondary2->>Primary: confirm replication
    
    Note over RC: 检查写关注满足条件
    RC->>RC: checkReplicationStatus(w:2)
    RC-->>Primary: replication confirmed
    
    Primary-->>Client: {ok:1, writeConcern: {w:2, wtimeout:5000}}

异常/回退与性能要点

异常处理:

  • NotWritablePrimary:非主节点时的写关注错误
  • WriteConcernFailed:写关注超时或失败
  • NodeNotFound:副本集成员不存在
  • InterruptedAtShutdown:关闭过程中的中断

性能优化:

  • 并行复制:多线程同时向多个从节点复制
  • 批量应用:批量应用oplog条目减少锁竞争
  • 写关注缓存:缓存写关注等待器减少内存分配

2.2 StorageInterface - 存储接口

基本信息

  • 名称: StorageInterface
  • 协议/方法: 复制系统与存储层的抽象接口
  • 幂等性: 根据操作类型而定

请求结构体

// 存储接口核心方法
class StorageInterface {
public:
    // 插入文档
    virtual Status insertDocument(OperationContext* opCtx,
                                const NamespaceString& nss,
                                const TimestampedBSONObj& doc,
                                long long term) = 0;

    // 批量插入文档
    virtual Status insertDocuments(OperationContext* opCtx,
                                 const NamespaceString& nss,
                                 const std::vector<InsertStatement>& docs) = 0;

    // 创建oplog
    virtual Status createOplog(OperationContext* opCtx,
                              const NamespaceString& nss) = 0;

    // 查找单个文档
    virtual StatusWith<BSONObj> findSingleton(OperationContext* opCtx,
                                             const NamespaceString& nss) = 0;

    // 删除集合中的所有文档
    virtual Status deleteByFilter(OperationContext* opCtx,
                                const NamespaceString& nss,
                                const BSONObj& filter) = 0;

    // 获取集合大小
    virtual StatusWith<size_t> getCollectionSize(OperationContext* opCtx,
                                                const NamespaceString& nss) = 0;
};

// 时间戳化的BSON对象
struct TimestampedBSONObj {
    BSONObj obj;        // BSON文档
    Timestamp timestamp; // 时间戳
    
    TimestampedBSONObj(BSONObj o, Timestamp ts) 
        : obj(std::move(o)), timestamp(ts) {}
};

入口函数与关键代码

// 存储接口实现
class StorageInterfaceImpl : public StorageInterface {
public:
    // 插入文档实现
    Status insertDocument(OperationContext* opCtx,
                         const NamespaceString& nss,
                         const TimestampedBSONObj& doc,
                         long long term) override {
        
        // 1) 获取集合
        AutoGetCollection autoColl(opCtx, nss, MODE_IX);
        Collection* collection = autoColl.getCollection();
        
        if (!collection) {
            return Status(ErrorCodes::NamespaceNotFound,
                         str::stream() << "Collection " << nss << " does not exist");
        }
        
        // 2) 设置时间戳
        if (!doc.timestamp.isNull()) {
            opCtx->recoveryUnit()->setTimestamp(doc.timestamp);
        }
        
        // 3) 执行插入
        WriteUnitOfWork wuow(opCtx);
        
        InsertStatement insertStmt;
        insertStmt.doc = doc.obj;
        
        Status insertStatus = collection->insertDocument(
            opCtx, insertStmt, nullptr /* opDebug */);
        
        if (!insertStatus.isOK()) {
            return insertStatus;
        }
        
        // 4) 提交事务
        wuow.commit();
        return Status::OK();
    }
    
    // 创建oplog实现
    Status createOplog(OperationContext* opCtx,
                      const NamespaceString& nss) override {
        
        // 1) 检查oplog是否已存在
        AutoGetDb autoDb(opCtx, nss.db(), MODE_X);
        Database* db = autoDb.getDb();
        
        if (db && db->getCollection(opCtx, nss)) {
            return Status(ErrorCodes::NamespaceExists,
                         str::stream() << "Oplog " << nss << " already exists");
        }
        
        // 2) 创建数据库(如果不存在)
        if (!db) {
            db = DatabaseHolder::get(opCtx)->openDb(opCtx, nss.db());
        }
        
        // 3) 设置oplog集合选项
        CollectionOptions options;
        options.capped = true;
        options.cappedSize = 192 * 1024 * 1024;  // 192MB默认大小
        options.cappedMaxDocs = -1;  // 无文档数量限制
        
        // 4) 创建oplog集合
        WriteUnitOfWork wuow(opCtx);
        
        Status createStatus = db->userCreateNS(opCtx, nss, options);
        if (!createStatus.isOK()) {
            return createStatus;
        }
        
        wuow.commit();
        
        LOGV2(21435, "Created oplog collection", "namespace"_attr = nss);
        return Status::OK();
    }
    
    // 批量插入文档
    Status insertDocuments(OperationContext* opCtx,
                          const NamespaceString& nss,
                          const std::vector<InsertStatement>& docs) override {
        
        if (docs.empty()) {
            return Status::OK();
        }
        
        // 1) 获取集合
        AutoGetCollection autoColl(opCtx, nss, MODE_IX);
        Collection* collection = autoColl.getCollection();
        
        if (!collection) {
            return Status(ErrorCodes::NamespaceNotFound,
                         str::stream() << "Collection " << nss << " not found");
        }
        
        // 2) 批量插入
        WriteUnitOfWork wuow(opCtx);
        
        for (const auto& insertStmt : docs) {
            Status insertStatus = collection->insertDocument(
                opCtx, insertStmt, nullptr);
            
            if (!insertStatus.isOK()) {
                return insertStatus;
            }
        }
        
        wuow.commit();
        return Status::OK();
    }
};

2.3 InitialSyncer - 初始同步器

基本信息

  • 名称: InitialSyncer
  • 协议/方法: 新节点加入副本集时的数据同步接口
  • 幂等性: 支持中断后重新开始

请求结构体

// 初始同步器接口
class InitialSyncer {
public:
    // 开始初始同步
    virtual StatusWith<OpTimeAndWallTime> startup(
        OperationContext* opCtx,
        std::uint32_t initialSyncMaxAttempts) = 0;

    // 关闭初始同步
    virtual void shutdown() = 0;

    // 获取同步状态
    virtual InitialSyncState getState() const = 0;

    // 获取统计信息
    virtual BSONObj getInitialSyncProgress() const = 0;
};

// 初始同步状态枚举
enum class InitialSyncState {
    kInactive,          // 未激活
    kRunning,           // 运行中
    kComplete,          // 完成
    kFailed             // 失败
};

// 初始同步参数
struct InitialSyncOptions {
    HostAndPort syncSource;     // 同步源
    OpTime beginTimestamp;      // 开始时间戳
    bool allowRollback;         // 是否允许回滚
    std::uint32_t maxRetries;   // 最大重试次数
};

入口函数与关键代码

// 初始同步器实现
class InitialSyncerImpl : public InitialSyncer {
public:
    // 启动初始同步
    StatusWith<OpTimeAndWallTime> startup(
        OperationContext* opCtx,
        std::uint32_t initialSyncMaxAttempts) override {
        
        stdx::lock_guard<Latch> lock(_mutex);
        
        // 1) 检查当前状态
        if (_state != InitialSyncState::kInactive) {
            return Status(ErrorCodes::IllegalOperation,
                         "Initial sync already in progress");
        }
        
        _state = InitialSyncState::kRunning;
        _attempts = 0;
        _maxAttempts = initialSyncMaxAttempts;
        
        // 2) 开始异步同步过程
        auto future = _startSync(opCtx);
        
        // 3) 等待同步完成
        return future.getNoThrow(opCtx);
    }
    
    // 获取初始同步进度
    BSONObj getInitialSyncProgress() const override {
        stdx::lock_guard<Latch> lock(_mutex);
        
        BSONObjBuilder bob;
        bob.append("state", _getStateString(_state));
        bob.append("attempts", static_cast<int>(_attempts));
        bob.append("maxAttempts", static_cast<int>(_maxAttempts));
        
        if (_currentPhase) {
            bob.append("currentPhase", *_currentPhase);
        }
        
        if (_totalBytesToClone > 0) {
            bob.append("totalBytesToClone", static_cast<long long>(_totalBytesToClone));
            bob.append("totalBytesCloned", static_cast<long long>(_totalBytesCloned));
            
            double progress = static_cast<double>(_totalBytesCloned) / _totalBytesToClone;
            bob.append("progress", progress);
        }
        
        return bob.obj();
    }

private:
    // 开始同步过程
    Future<OpTimeAndWallTime> _startSync(OperationContext* opCtx) {
        return _executor->submit([this, opCtx] {
            return _runInitialSync(opCtx);
        });
    }
    
    // 执行初始同步主逻辑
    StatusWith<OpTimeAndWallTime> _runInitialSync(OperationContext* opCtx) {
        
        while (_attempts < _maxAttempts) {
            _attempts++;
            
            try {
                // 1) 选择同步源
                HostAndPort syncSource = _chooseSyncSource(opCtx);
                if (syncSource.empty()) {
                    return Status(ErrorCodes::InitialSyncFailure,
                                "No suitable sync source found");
                }
                
                // 2) 连接到同步源
                auto clientStatus = _connectToSyncSource(syncSource);
                if (!clientStatus.isOK()) {
                    continue;  // 重试
                }
                
                // 3) 获取开始时间点
                _currentPhase = "getting initial sync timestamp";
                auto beginTimestamp = _getBeginTimestamp(clientStatus.getValue());
                
                // 4) 克隆所有数据库
                _currentPhase = "cloning databases";
                Status cloneStatus = _cloneDatabases(opCtx, clientStatus.getValue());
                if (!cloneStatus.isOK()) {
                    continue;  // 重试
                }
                
                // 5) 应用oplog
                _currentPhase = "applying oplog";
                auto finalOpTime = _applyOplog(opCtx, clientStatus.getValue(), beginTimestamp);
                if (!finalOpTime.isOK()) {
                    continue;  // 重试
                }
                
                // 6) 初始同步完成
                _state = InitialSyncState::kComplete;
                LOGV2(21436, "Initial sync completed successfully",
                     "finalOpTime"_attr = finalOpTime.getValue());
                
                return finalOpTime.getValue();
                
            } catch (const DBException& ex) {
                LOGV2_WARNING(21437, "Initial sync attempt failed",
                             "attempt"_attr = _attempts,
                             "error"_attr = ex.toString());
                
                if (_attempts >= _maxAttempts) {
                    _state = InitialSyncState::kFailed;
                    return ex.toStatus();
                }
                
                // 等待重试间隔
                sleepFor(Seconds(1));
            }
        }
        
        _state = InitialSyncState::kFailed;
        return Status(ErrorCodes::InitialSyncFailure,
                     "Exceeded maximum initial sync attempts");
    }
    
    // 克隆数据库
    Status _cloneDatabases(OperationContext* opCtx, DBClientConnection* client) {
        
        // 1) 获取数据库列表
        auto dbListResult = client->runCommand(
            OpMsgRequest::fromDBAndBody("admin", BSON("listDatabases" << 1)));
        
        if (!dbListResult.isOK()) {
            return dbListResult.getStatus();
        }
        
        BSONObj dbListObj = dbListResult.getValue();
        BSONElement databases = dbListObj["databases"];
        
        // 2) 克隆每个数据库
        for (auto&& dbElement : databases.Obj()) {
            BSONObj dbInfo = dbElement.Obj();
            std::string dbName = dbInfo["name"].String();
            
            // 跳过系统数据库
            if (dbName == "admin" || dbName == "local" || dbName == "config") {
                continue;
            }
            
            // 3) 克隆数据库
            Status cloneDbStatus = _cloneDatabase(opCtx, client, dbName);
            if (!cloneDbStatus.isOK()) {
                return cloneDbStatus;
            }
        }
        
        return Status::OK();
    }
};

3. Oplog处理API

3.1 OplogApplier - Oplog应用器

基本信息

  • 名称: OplogApplier
  • 协议/方法: 处理oplog条目应用的接口
  • 幂等性: 支持幂等应用(重复应用相同oplog条目)

请求结构体

// Oplog应用器接口
class OplogApplier {
public:
    // 应用oplog条目批次
    virtual StatusWith<OpTime> applyOplogBatch(
        OperationContext* opCtx,
        const std::vector<OplogEntry>& entries) = 0;

    // 获取应用统计信息
    virtual OplogApplierStats getStats() const = 0;

    // 设置应用模式
    virtual void setApplyMode(ApplyMode mode) = 0;
};

// 应用模式枚举
enum class ApplyMode {
    kInitialSync,       // 初始同步模式
    kSecondary,         // 从节点模式  
    kRecovering        // 恢复模式
};

// Oplog条目结构
class OplogEntry {
private:
    BSONObj _raw;              // 原始BSON
    OpTime _opTime;            // 操作时间
    OpTypeEnum _opType;        // 操作类型
    NamespaceString _nss;      // 命名空间
    BSONObj _o;                // 操作对象
    boost::optional<BSONObj> _o2;  // 更新条件(update操作)
    
public:
    OpTime getOpTime() const { return _opTime; }
    OpTypeEnum getOpType() const { return _opType; }
    NamespaceString getNss() const { return _nss; }
    BSONObj getObject() const { return _o; }
    boost::optional<BSONObj> getObject2() const { return _o2; }
};

时序图(Oplog应用)

sequenceDiagram
    autonumber
    participant Primary as 主节点
    participant Secondary as 从节点
    participant OF as OplogFetcher
    participant OA as OplogApplier
    participant Storage as 存储引擎
    
    Primary->>Primary: 执行写操作
    Primary->>Primary: 写入本地oplog
    
    Note over Secondary: 获取oplog条目
    Secondary->>OF: fetchNextBatch()
    OF->>Primary: query oplog collection
    Primary-->>OF: oplog entries batch
    OF-->>Secondary: entries[]
    
    Note over Secondary: 应用oplog条目
    Secondary->>OA: applyOplogBatch(entries)
    
    loop 应用每个oplog条目
        OA->>OA: validateOplogEntry(entry)
        
        alt insert操作
            OA->>Storage: insertDocument(doc)
        else update操作
            OA->>Storage: updateDocument(query, update)
        else delete操作
            OA->>Storage: deleteDocument(query)
        else command操作
            OA->>Storage: executeCommand(cmd)
        end
        
        Storage-->>OA: operation result
    end
    
    OA->>Storage: updateLastAppliedOpTime(opTime)
    OA-->>Secondary: batch applied successfully
    
    Secondary->>Secondary: signalOplogWaiters()

4. 副本集配置API

4.1 ReplSetConfig - 副本集配置

基本信息

  • 名称: ReplSetConfig
  • 协议/方法: 副本集配置管理接口
  • 幂等性: 配置更新操作支持版本控制

请求结构体

// 副本集配置类
class ReplSetConfig {
private:
    BSONObj _raw;                           // 原始配置
    std::string _replSetName;               // 副本集名称
    ConfigVersionAndTerm _versionAndTerm;   // 配置版本和任期
    std::vector<MemberConfig> _members;     // 成员配置列表
    WriteConcernOptions _defaultWriteConcern; // 默认写关注
    
public:
    // 获取副本集名称
    const std::string& getReplSetName() const { return _replSetName; }
    
    // 获取配置版本
    long long getConfigVersion() const { return _versionAndTerm.version; }
    
    // 获取成员数量
    int getNumMembers() const { return _members.size(); }
    
    // 获取指定成员配置
    const MemberConfig& getMemberAt(size_t i) const { return _members[i]; }
    
    // 查找成员
    const MemberConfig* findMemberByHostAndPort(const HostAndPort& hap) const;
    
    // 验证配置
    Status validate() const;
    
    // 检查是否为有效的大多数
    bool isValidMajority(const std::set<int>& memberIds) const;
};

// 成员配置
class MemberConfig {
private:
    int _id;                    // 成员ID
    HostAndPort _host;          // 主机地址
    int _priority;              // 优先级
    Seconds _slaveDelay;        // 从节点延迟
    bool _hidden;               // 是否隐藏
    bool _buildIndexes;         // 是否构建索引
    int _votes;                 // 投票数
    std::set<std::string> _tags; // 标签集合
    
public:
    int getId() const { return _id; }
    const HostAndPort& getHostAndPort() const { return _host; }
    int getPriority() const { return _priority; }
    bool isVoter() const { return _votes > 0; }
    bool isHidden() const { return _hidden; }
    bool shouldBuildIndexes() const { return _buildIndexes; }
};

5. 选举投票API

5.1 TopologyCoordinator - 拓扑协调器

基本信息

  • 名称: TopologyCoordinator
  • 协议/方法: 处理副本集拓扑和选举逻辑
  • 幂等性: 选举过程具有确定性结果

入口函数与关键代码

// 拓扑协调器核心方法
class TopologyCoordinator {
public:
    // 处理选举胜利
    void processWinElection(OpTime electionOpTime) {
        
        // 1) 设置为主节点状态
        _role = Role::kLeader;
        _leaderTerm++;
        _lastCommittedOpTime = electionOpTime;
        
        // 2) 重置所有成员状态
        for (auto& memberData : _memberData) {
            memberData.setLastAppliedOpTime(OpTime());
            memberData.setLastDurableOpTime(OpTime());
        }
        
        // 3) 记录选举时间
        _electionTime = _now();
        
        LOGV2(21438, "Election won, became primary",
             "term"_attr = _leaderTerm,
             "electionTime"_attr = _electionTime);
    }
    
    // 处理心跳响应
    HeartbeatResponseAction processHeartbeatResponse(
        Date_t now,
        Milliseconds networkRoundTripTime,
        const HostAndPort& target,
        const StatusWith<ReplSetHeartbeatResponse>& hbResponse) {
        
        // 1) 查找目标成员
        int targetIndex = _findMemberIndexByHostAndPort(target);
        if (targetIndex < 0) {
            return HeartbeatResponseAction::makeNoAction();
        }
        
        MemberData& member = _memberData[targetIndex];
        
        // 2) 处理心跳失败
        if (!hbResponse.isOK()) {
            member.setDownValues(now, hbResponse.getStatus().reason());
            return _checkForElectionHandoff();
        }
        
        // 3) 处理心跳成功
        const ReplSetHeartbeatResponse& response = hbResponse.getValue();
        
        member.setUpValues(
            now,
            response.getState(),
            response.getElectionTime(),
            response.getAppliedOpTime(),
            response.getDurableOpTime()
        );
        
        // 4) 检查是否需要重新选举
        if (_role == Role::kFollower && _shouldStartElection(now)) {
            return HeartbeatResponseAction::makeElectAction();
        }
        
        return HeartbeatResponseAction::makeNoAction();
    }
    
    // 检查是否应该开始选举
    bool _shouldStartElection(Date_t now) const {
        
        // 1) 检查是否在选举超时窗口内
        if (now < _electionTimeoutWhen) {
            return false;
        }
        
        // 2) 检查是否有足够的投票数
        int votingMembersCount = 0;
        int availableVotingMembersCount = 0;
        
        for (const auto& memberData : _memberData) {
            if (memberData.getConfigIndex() >= 0) {
                const MemberConfig& memberConfig = 
                    _rsConfig.getMemberAt(memberData.getConfigIndex());
                
                if (memberConfig.isVoter()) {
                    votingMembersCount++;
                    
                    if (memberData.getState().readable() || 
                        memberData.isSelf()) {
                        availableVotingMembersCount++;
                    }
                }
            }
        }
        
        // 3) 需要大多数节点可用才能选举
        int majorityVotes = votingMembersCount / 2 + 1;
        return availableVotingMembersCount >= majorityVotes;
    }
};

6. 错误处理与监控

6.1 常见错误码

错误码 含义 处理策略
NotWritablePrimary 非主节点写入 重新选择主节点或等待选举
WriteConcernFailed 写关注失败 检查副本集状态和网络连接
InitialSyncFailure 初始同步失败 重新选择同步源,清理本地数据
OplogStartMissing Oplog起始点缺失 执行初始同步重建数据
CursorNotFound 游标不存在 重新创建游标继续同步
HostUnreachable 主机不可达 更新副本集配置或修复网络

6.2 性能监控指标

// 复制统计信息
struct ReplicationStats {
    // Oplog相关
    long long oplogSizeMB;          // Oplog大小
    OpTime lastAppliedOpTime;       // 最后应用的OpTime
    OpTime lastDurableOpTime;       // 最后持久化的OpTime
    
    // 同步相关
    long long syncSourceLagMillis;  // 同步源延迟
    long long appliedOpsPerSecond;  // 每秒应用操作数
    long long networBytesPerSecond; // 每秒网络字节数
    
    // 选举相关
    long long electionTerm;         // 选举任期
    Date_t electionTime;           // 选举时间
    int votingMembers;             // 投票成员数
    int healthyMembers;            // 健康成员数
};