本文按 server → raft/node → transport → apply/MVCC → watch/lease → backend 提交 的顺序梳理 Put
的完整路径,并结合工程经验给出可落地的调优与排障要点。
1)写入入口与入队前置检查
1
| func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
|
具体处理落在:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex {
return nil, errors.ErrTooManyRequests // 保护性背压:apply 追不动 commit
}
r.Header = &pb.RequestHeader{ID: s.reqIDGen.Next()} // 生成 requestID
data, err := r.Marshal()
if err != nil { return nil, err }
if len(data) > int(s.Cfg.MaxRequestBytes) {
return nil, errors.ErrRequestTooLarge
}
id := r.ID
if id == 0 { id = r.Header.ID }
ch := s.w.Register(id) // 用于等待“此提案”的应用结果
err = s.r.Propose(cctx, data) // 提案(MsgProp)
select {
case x := <-ch: // 阻塞等待 apply 结果
return x.(*apply2.Result), nil
}
}
|
要点
2)Raft 提案与 Leader 追加
2.1 提案入队
1
2
3
4
5
6
7
8
9
| func (n *node) Propose(ctx context.Context, data []byte) error {
return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
// node.run 循环:
case pm := <-propc:
m := pm.m
m.From = r.id
_ = r.Step(m) // 统一进入 r.Step()
|
2.2 Leader 处理 MsgProp
1
2
3
4
5
6
7
8
9
10
11
12
13
| func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
if r.leadTransferee != None {
return ErrProposalDropped // 正在移交领导权,丢弃提案
}
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend() // 广播复制
}
return nil
}
|
2.3 追加 Entry
1
2
3
4
5
6
7
8
9
| func (r *raft) appendEntry(es ...pb.Entry) bool {
li := r.raftLog.lastIndex()
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
}
_ = r.raftLog.append(es...) // 仅追加到内存 raftLog
return true
}
|
3)复制:Append 流控、补发、快照
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() { return false }
lastIndex, nextIndex := pr.Next-1, pr.Next
lastTerm, _ := r.raftLog.term(lastIndex)
var ents []pb.Entry
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, _ = r.raftLog.entries(nextIndex, r.maxMsgSize) // 裁剪到最大消息
}
if len(ents) == 0 && !sendIfEmpty { return false }
_ = pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex)
r.send(pb.Message{
To: to, Type: pb.MsgApp,
Index: lastIndex, LogTerm: lastTerm,
Entries: ents, Commit: r.raftLog.committed,
})
return true
}
|
要点
Inflights
限流:each follower 有独立滑动窗口,满了则仅发心跳/空 MsgApp
保活,等 ack 清窗。- 缺失日志 → 发快照:若 follower 的
nextIndex
早于本地已 compact 的索引,转而发送快照。 Progress
维护每个 follower 的 Match/Next/State
,失败重试走回探测/对齐流程。
4)Ready 环与 WAL 持久化、网络发送
node 与 raftNode 的主循环会根据 HasReady()
取出需要处理的 Ready{...}
:
rd.Messages
:待发给 peer 的 raft 消息(Append/Heartbeat 等)。rd.Entries
:需要持久化到 WAL 的新增日志。rd.CommittedEntries
:已经“多数派提交”,等待 应用 的日志。rd.HardState/Snapshot
:元状态/快照。
典型处理(伪):
1
2
3
4
5
6
7
| case rd := <-r.Ready():
if islead {
r.transport.Send(r.processMessages(rd.Messages)) // 发送网络消息
}
_ = r.storage.Save(rd.HardState, rd.Entries) // WAL 持久化
// ...
r.Advance()
|
说明:etcd 对 “发送与保存” 做了管线化,但在对客户端应答前必须WAL 持久化 + 应用,Raft 安全性不受影响。
5)提交与应用:Committed → Apply
当 follower AppendResp 达到多数派后,leader 可能推进 commitIndex
:
1
2
3
4
5
6
| case pb.MsgAppResp:
pr.RecentActive = true
if r.maybeCommit() {
releasePendingReadIndexMessages(r)
r.bcastAppend() // 通知新的 commitIndex
}
|
随后 HasReady()
会携带 CommittedEntries
,经 raftNode
转发给 server 的 applyAll
:
1
2
| case ap := <-s.r.apply():
sched.Schedule(NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) }))
|
applyAll
里根据 Entry.Type
分发到 v3 applier,例如:
1
2
3
| func (a *applierV3backend) Put(ctx context.Context, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return mvcctxn.Put(ctx, a.lg, a.lessor, a.kv, p) // 进入 MVCC
}
|
6)MVCC 与租约:索引更新、值落盘、事件派发
6.1 写事务
1
2
3
4
5
6
7
8
9
10
11
| func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.LockInsideApply()
return newMetricsTxnWrite(&storeTxnWrite{
storeTxnCommon: storeTxnCommon{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
})
}
|
6.2 Put 细节(版本与租约)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| func (tw *storeTxnWrite) put(key, val []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev // CreateRevision 默认用本次 rev
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.Main // 已存在:沿用 created_rev
}
idxRev := Revision{Main: rev, Sub: int64(len(tw.changes))}
kv := mvccpb.KeyValue{
Key: key, Value: val,
CreateRevision: c, ModRevision: rev, Version: ver+1,
Lease: int64(leaseID),
}
d, _ := kv.Marshal()
tw.tx.UnsafeSeqPut(schema.Key, RevToBytes(idxRev, nil), d) // backend/bolt
tw.s.kvindex.Put(key, idxRev) // 索引更新
tw.changes = append(tw.changes, kv) // 用于 watch 事件
// 处理租约 attach/detach(oldLease → leaseID)
}
|
kvindex
维护 key → 多版本 revision 的倒排索引;真正值存于 backend(bolt)schema.Key
桶,键是 main/sub
编码的 revision。Version
自增;ModRevision
设置为 rev
;首次创建时 CreateRevision=rev
,覆盖写沿用原 CreateRevision
。
6.3 Watch 通知与缓慢观察者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| func (tw *watchableStoreTxnWrite) End() {
changes := tw.Changes()
if len(changes) == 0 { tw.TxnWrite.End(); return }
rev := tw.Rev() + 1
evs := make([]mvccpb.Event, len(changes))
for i, ch := range changes {
evs[i].Kv = &changes[i]
evs[i].Type = ternary(ch.CreateRevision == 0, mvccpb.DELETE, mvccpb.PUT)
if ch.CreateRevision == 0 { evs[i].Kv.ModRevision = rev }
}
tw.s.mu.Lock()
tw.s.notify(rev, evs) // 发送给 watchers;慢 watcher 迁入 victims 队列
tw.TxnWrite.End() // 释放并进入 batchTx 提交流程
tw.s.mu.Unlock()
}
|
慢 watcher 会被标记为 victim
,转入独立队列异步补发,避免阻塞主线。
7)BatchTx 提交与读写缓冲回写
1
2
3
4
5
6
7
8
9
10
11
12
| func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
t.backend.readTx.Lock()
t.buf.writeback(&t.backend.readTx.buf) // 写缓冲回写到读缓冲:读可见
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
t.commit(false) // 触发 bolt 事务提交(含 fsync)
}
}
t.batchTx.Unlock()
}
|
- 写事务结束时先把写缓冲回写到读缓冲,从而 读事务立即可见 最新数据(无须等到磁盘提交)。
- 达到门限触发 bolt 事务提交(WAL/fsync);门限可控,平衡吞吐与持久化开销。
8)链路小结(Put 到返回)
- 入口:背压/大小校验 → 注册回调通道 →
Propose(MsgProp)
; - Leader:
appendEntry
到 raftLog → bcastAppend
复制; - 复制:按 follower
Progress
发送 Append(流控/补发/快照); - 多数派提交:推进
commitIndex
→ CommittedEntries
下发; - Apply:applierV3 执行 MVCC 写、索引更新、租约处理、watch 通知;
- BatchTx:回写读缓冲、按门限提交 bolt;唤醒入口处等待的请求,返回结果。
9)实战经验与调优建议
9.1 写入大小与事务设计
- 值尽量小(建议 ≤ 1MB;更大请拆分或外部存储),降低复制/快照/WAL 压力。
- 批量写(Txn)要控制每次 key 数与总字节,减少一次性巨型 Entry;合理分页提交。
9.2 背压与延迟治理
9.3 复制稳定性
9.4 backend 膨胀与读放大
- 配置 自动压缩(
--auto-compaction-retention
)与 定期 defrag,减少历史版本与碎片。 - 热点前缀大量写入导致 cursor 扫描放大:合理设计 key 前缀与分桶;对“批量扫描”配合分页。
9.5 Watch 流控
- 下游消费慢会被标记为 victim:使用有界缓冲和幂等重放,或将不敏感场景转为主动拉取。
- 多租户/多订阅高并发时,拆分 watch 前缀,减少单前缀 fan-out 成本。
9.6 Leader 变更与提案丢弃
- 领导权移交期间
MsgProp
会被丢弃:客户端要重试幂等;谨慎在高写压时主动触发 transfer-leadership
。
9.7 配额与保护
- 设置
--quota-backend-bytes
,避免 backend 过大阻塞提交; - 关注
backend size
接近配额时写入会受限,提前扩容或做数据分层。
10)排查清单(常用指标)
- 复制/共识:
etcd_server_proposals_pending
, etcd_server_proposals_committed
, ..._applied
- 磁盘:
etcd_disk_wal_fsync_duration_seconds
, etcd_disk_backend_commit_duration_seconds
- 后端:
etcd_mvcc_db_total_size_in_bytes
, etcd_debugging_mvcc_db_compaction_keys_total
- 快照:
etcd_debugging_snapshot_save_total_durations_seconds
- 网络:
etcd_network_peer_round_trip_time_seconds
结语
写入路径的性能瓶颈大多不在 “Raft 算法” 本身,而在 复制稳定性 与 后端 IO。把握背压阈值、写入大小、快照/压缩策略与磁盘能力,才能既保证线性一致,又拿到稳定的吞吐与尾延迟。
创建时间: 2025年06月08日
本文由 tommie blog 原创发布