概述

Kafka 在 KRaft(Kafka Raft)模式下以内置的一致性层替代 ZooKeeper,负责元数据日志复制、选举与快照。本文聚焦实现细节,补充关键函数核心代码、跨层调用链、时序图、类结构图与相似内容合并说明,使用中性技术表述。

1. 组件架构总览

graph TB
  subgraph KRaft
    RM[KafkaRaftManager]
    QM[QuorumController]
    QR[RaftClient]
    S[Scheduler]
  end
  subgraph Storage
    ML[MetadataLog]
    SL[SnapshotManager]
  end
  subgraph Network
    SR[RaftRequestSender]
    H[RequestHandler]
  end

  RM --> QM
  QM --> QR
  QR --> ML
  ML --> SL
  QR --> SR
  SR --> H

2. 关键函数核心代码与说明(精要)

// Raft 客户端追加元数据条目(摘要)
public CompletableFuture<OffsetAndEpoch> appendAsLeader(List<Record> records, long timeoutMs) {
  if (!role.isLeader()) return CompletableFuture.failedFuture(new NotLeaderException("not leader"));
  CompletableFuture<OffsetAndEpoch> f = new CompletableFuture<>();
  long startTime = time.milliseconds();
  OffsetAndEpoch appended = metadataLog.append(records);
  inflightAppends.put(appended.offset(), f);
  // 触发复制
  requestSender.broadcastAppend(appended);
  scheduler.schedule("append-timeout", () -> {
    if (!f.isDone()) f.completeExceptionally(new TimeoutException("replication timeout"));
  }, timeoutMs);
  return f;
}
  • 功能:作为 Leader 将记录写入元数据日志并发起复制,返回提交位置的未来结果。
  • 边界:领导权变更、超时、磁盘异常与重复提交的处理。
// 选举与角色转换(摘要)
void onElectionTimeout() {
  if (!role.isLeader()) {
    currentTerm += 1;
    votedFor = localId;
    role = Role.CANDIDATE;
    requestSender.broadcastVoteRequest(currentTerm, lastLogOffset(), lastLogTerm());
  }
}
  • 功能:在超时未收到心跳时发起选举,转换为候选者并广播投票请求。
// 处理来自Leader的AppendEntries(摘要)
AppendResponse handleAppend(AppendRequest req) {
  if (req.term() < currentTerm) return AppendResponse.reject(currentTerm);
  if (log.mismatch(req.prevOffset(), req.prevTerm())) return AppendResponse.reject(currentTerm);
  log.truncateFrom(req.prevOffset()+1);
  log.append(req.entries());
  commitUntil(req.leaderCommit());
  lastLeaderHeartbeatTimeMs = time.milliseconds();
  role = Role.FOLLOWER;
  currentTerm = req.term();
  return AppendResponse.accept(currentTerm, log.lastOffset());
}
  • 功能:校验前置匹配、截断冲突并追加新条目,推进提交点,保持为 Follower。
// 提交并应用(摘要)
private void commitUntil(long commitOffset) {
  while (lastApplied < Math.min(commitOffset, log.lastOffset())) {
    lastApplied += 1;
    MetadataRecord rec = log.read(lastApplied);
    stateMachine.apply(rec);
  }
}
  • 功能:将提交的元数据记录应用到状态机(控制器元数据)。
// 快照触发(摘要)
void maybeSnapshot() {
  if (log.sizeInBytes() > snapshotThresholdBytes) {
    Snapshot s = snapshotter.create(lastApplied, stateMachine.snapshot());
    snapshotStore.persist(s);
    log.compactUpTo(s.lastIncludedOffset());
  }
}
  • 功能:根据阈值生成与持久化快照,并进行日志压缩。

3. 调用链(跨层)

flowchart LR
  CTRL[QuorumController.apply] --> RC[RaftClient.appendAsLeader]
  RC --> ML[MetadataLog.append]
  RC --> RS[RaftRequestSender.broadcastAppend]
  RS --> F1[Follower.handleAppend]
  F1 --> ML2[MetadataLog.append]
  F1 --> COM[commitUntil]
  COM --> SM[StateMachine.apply]

4. 时序图(选举与复制)

sequenceDiagram
  participant L as Leader
  participant F1 as Follower-1
  participant F2 as Follower-2
  participant LOG as MetadataLog
  participant SM as StateMachine

  Note over L,F2: 正常复制
  L->>LOG: append(records)
  L->>F1: AppendEntries(term, prev, entries)
  L->>F2: AppendEntries(term, prev, entries)
  F1-->>L: AppendAck(lastOffset)
  F2-->>L: AppendAck(lastOffset)
  L->>L: advanceCommitIndex()
  L->>SM: apply(committed)

  Note over L,F2: 选举超时
  F1->>F1: election timeout
  F1->>F2: RequestVote(term+1)
  F1->>L: RequestVote(term+1)
  F2-->>F1: Vote(granted)
  L-->>F1: Vote(reject or grant)
  F1->>F1: becomeLeaderIfMajority()

5. 类结构图与继承关系(简化)

classDiagram
  class KafkaRaftManager
  class QuorumController
  class RaftClient
  class MetadataLog
  class SnapshotManager
  class StateMachine
  class RaftRequestSender

  KafkaRaftManager --> QuorumController
  QuorumController --> RaftClient
  RaftClient --> MetadataLog
  MetadataLog --> SnapshotManager
  RaftClient --> RaftRequestSender
  QuorumController --> StateMachine