etcd 源码笔记:Watch 机制(server/watchableStore)与实战要点

面向 watchServerserverWatchStreamwatchableStore 的完整链路,梳理 创建/取消/进度/事件派发/补偿 的关键路径与工程实践。


1)gRPC 入口与双循环

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
    sws := serverWatchStream{
        sg:         ws.sg,
        watchable:  ws.watchable,              // *watchableStore
        gRPCStream: stream,                    // 客户端双向流
        watchStream: ws.watchable.NewWatchStream(), // 该连接上的 watcher 集合
        ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen), // 控制面下行
    }
    go sws.sendLoop() // 负责“下发”:事件/控制回应
    go sws.recvLoop() // 负责“上行”:创建/取消/进度请求
    return nil
}
  • 一个 gRPC 连接可承载多个 WatchserverWatchStream.watchStream 维护该连接所有 watcher。
  • **控制面(ctrlStream)数据面(watchStream.Chan)**分离:创建/取消/进度回应走 ctrlStream,变更事件走 watchStream.Chan()

2)创建/取消/进度:recvLoop

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (sws *serverWatchStream) recvLoop() error {
    for {
        req, err := sws.gRPCStream.Recv()
        switch uv := req.RequestUnion.(type) {
        case *pb.WatchRequest_CreateRequest:
            creq := uv.CreateRequest
            wsrev := sws.watchStream.Rev()
            rev := creq.StartRevision
            if rev == 0 { rev = wsrev + 1 } // 默认从当前快照之后开始

            id, err := sws.watchStream.Watch(
                mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...,
            )
            wr := &pb.WatchResponse{
                Header:   sws.newResponseHeader(wsrev),
                WatchId:  int64(id),
                Created:  true,
                Canceled: err != nil, // 启动失败(如压缩点之前)
                // CompactRevision 由底层上报(ErrCompacted 时携带)
            }
            select {
            case sws.ctrlStream <- wr:
            case <-sws.closec:
                return nil
            }

        case *pb.WatchRequest_CancelRequest:
            if uv.CancelRequest != nil {
                id := uv.CancelRequest.WatchId
                if err := sws.watchStream.Cancel(mvcc.WatchID(id)); err == nil {
                    sws.ctrlStream <- &pb.WatchResponse{
                        Header:   sws.newResponseHeader(sws.watchStream.Rev()),
                        WatchId:  id,
                        Canceled: true,
                    }
                    // 清理该 watcher 的进度/碎片跟踪
                    sws.mu.Lock()
                    delete(sws.progress, mvcc.WatchID(id))
                    delete(sws.prevKV,  mvcc.WatchID(id))
                    delete(sws.fragment, mvcc.WatchID(id))
                    sws.mu.Unlock()
                }
            }

        case *pb.WatchRequest_ProgressRequest:
            // 客户端请求“进度通知”,便于推进 header.revision 以避免长期无事件导致的卡滞/被压实
            sws.enqueueProgressResponse()
        }
    }
}

关键点

  • StartRevision=0 → 从“当前快照之后”开始(wsrev+1)。
  • StartRevision <= compactionRev,底层会返回 ErrCompacted,服务端通过 Canceled=true + CompactRevision=... 告知客户端需以 compactRev+1 重新创建
  • ProgressRequest 用于显式进度回应(无事件时也下发 header.revision),配合客户端 WithProgressNotify 使用。

3)注册 watcher:watchStream → watchableStore

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
    w, cancel := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
    ws.cancels[id]  = cancel
    ws.watchers[id] = w
    return id, nil
}

func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
    wa := &watcher{key: key, end: end, minRev: startRev, id: id, ch: ch, fcs: fcs}
    synced := startRev > s.store.currentRev || startRev == 0
    if synced { s.synced.add(wa) } else { s.unsynced.add(wa) }
    return wa, func() { s.cancelWatcher(wa) }
}
  • watcher.minRev 指定最小需要的 revision

    • synced:从当下开始持续接收新事件;
    • unsynced:需先补历史(从 minRev..currentRev 扫描 backend)。

4)下发:sendLoop 合并控制面与事件面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (sws *serverWatchStream) sendLoop() {
    for {
        select {
        case wresp, ok := <-sws.watchStream.Chan(): // 事件面
            wr := &pb.WatchResponse{
                Header:          sws.newResponseHeader(wresp.Revision),
                WatchId:         int64(wresp.WatchID),
                Events:          wresp.Events,
                CompactRevision: wresp.CompactRevision, // 压缩提示
                Canceled:        wresp.Canceled,
            }
            _ = sws.gRPCStream.Send(wr)

        case c, ok := <-sws.ctrlStream: // 控制面:创建/取消/进度回应
            _ = sws.gRPCStream.Send(c)
        }
    }
}
  • 事件发送失败(Send 出错)通常意味着客户端断流/网络异常,连接会被关闭,底层 watcher 被取消,需客户端重建。
  • 大量事件可能触发碎片化发送fragment 跟踪),保证单个响应不超过 gRPC/网络限制。

5)事件产生:写路径触发 notify

写事务在 watchableStore.TxnWrite.End() 中将变更转换为 Event,随后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
    victim := make(watcherBatch)
    for w, eb := range newWatcherBatch(&s.synced, evs) {
        if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
            // ok
        } else {
            // 客户端慢:迁入 victim,主线不阻塞
            w.victim = true
            victim[w] = eb
            s.synced.delete(w)
        }
    }
    s.addVictim(victim)
}
  • 仅从 synced直接消费最新事件;发送失败则转入 victim(补偿队列),避免阻塞主线。

6)补历史与补偿:unsynced / victim

6.1 syncWatchersLoop:补历史(unsynced)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (s *watchableStore) syncWatchersLoop() {
    t := time.NewTicker(100 * time.Millisecond)
    defer t.Stop()
    for {
        _ = s.syncWatchers() // 每轮从 unsynced 拿一批,读后端填充事件
        <-t.C
    }
}

func (s *watchableStore) syncWatchers() int {
    wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)

    // 扫描 backend(bolt):[minRev, curRev]
    minBytes := RevToBytes(Revision{Main: minRev}, nil)
    maxBytes := RevToBytes(Revision{Main: curRev + 1}, nil)
    tx := s.store.b.ReadTx(); tx.RLock()
    revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
    tx.RUnlock()

    evs := kvsToEvents(s.store.lg, wg, revs, vs) // 转换为事件
    wb  := newWatcherBatch(wg, evs)

    victims := make(watcherBatch)
    for w := range wg.watchers {
        w.minRev = curRev + 1 // 默认同步至最新
        if eb, ok := wb[w]; ok {
            if eb.moreRev != 0 { w.minRev = eb.moreRev } // 仍有剩余,留在 unsynced
            if !w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
                w.victim = true
                victims[w] = eb
            }
        }
        if w.victim {
            // 留到 victim 补偿
        } else if w.minRev <= curRev {
            // 仍未追平:继续留在 unsynced
        } else {
            // 追平:迁入 synced
            s.synced.add(w)
            s.unsynced.delete(w)
        }
    }
    s.addVictim(victims)
    return len(wg.watchers)
}
  • unsynced 用于从历史版本回放事件,直到与 currentRev 对齐。
  • 若历史跨度越过 compactionRev,将触发 ErrCompacted:该 watcher 会被取消(Canceled=true, CompactRevision=...),客户端需compactRev+1 重新建表

6.2 syncVictimsLoop:补偿(victim)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (s *watchableStore) syncVictimsLoop() {
    for {
        for s.moveVictims() != 0 { /* 尽量清空 */ }
        select {
        case <-tickc:
        case <-s.victimc: return
        }
    }
}

func (s *watchableStore) moveVictims() (moved int) {
    victims := s.victims; s.victims = nil
    var still watcherBatch

    for _, wb := range victims {
        // 尝试把缓存的事件重新下发(无需再扫 backend)
        for w, eb := range wb {
            rev := w.minRev - 1
            if !w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
                if still == nil { still = make(watcherBatch) }
                still[w] = eb
            }
        }
        // 发送成功者:按 minRev 与 currentRev 决定回到 unsynced 或 synced
        curRev := s.store.currentRev
        for w, eb := range wb {
            if still != nil && still[w] != nil { continue }
            w.victim = false
            if eb.moreRev != 0 { w.minRev = eb.moreRev }
            if w.minRev <= curRev { s.unsynced.add(w) } else { s.synced.add(w) }
        }
    }
    s.victims = append(s.victims, still)
    return moved
}
  • victim 专为慢客户端设计,优先重放已缓存事件,减少后端压力。
  • 多轮仍发送失败,继续留在 victim,避免拖垮整体。

7)错误与边界行为

  • 压缩(compaction)StartRevision <= compactRev → 创建即 Canceled 并携带 CompactRevision处理:客户端以 CompactRevision+1 重新创建。
  • 分片/碎片化响应:事件量过大时,服务端拆分多个 WatchResponse 下发;客户端需按顺序合并。
  • 连接中断/背压:gRPC Send 失败会关闭流并取消所有 watcher。客户端要重连并按上次已处理的 revision 恢复
  • 过滤器FilterFunc 可按需要过滤 PUT/DELETE/某些字段,减小下行体积。
  • 进度通知:长时间无事件可用 WithProgressNotify 或显式发 ProgressRequest,服务端会下发仅含 header 的响应,用于推进水位

8)实战经验与优化建议

8.1 主题划分与前缀设计

  • 将不同业务/租户拆分前缀独立 Watch 流,避免单前缀风暴导致 fan-out、victim 泛滥。
  • 对需要大范围扫描的消费方,使用紧凑前缀 + 分页,减少一次性事件堆积。

8.2 慢消费者治理

  • 客户端快速消费异步处理,不要在回调内做阻塞 I/O。必要时做本地队列+幂等处理。

  • 关注被迁入 victim 的比率;若持续偏高,考虑:

    • 增加客户端并发/处理线程;
    • 拆分 watch 粒度;
    • 降低单事件负载(删除 PrevKV、裁剪 value)。
  • 服务端层面:合理设置 ctrlStream/发送缓冲,避免阻塞 sendLoop。

8.3 与压缩共存

  • 启用自动压缩(按保留窗口/时长),客户端必须实现重试从 compactRev+1 的逻辑;
  • 周期性发送 进度通知,帮助消费者尽快推进水位,降低被压缩命中的概率。

8.4 连接与恢复

  • 将 Watch 放在长连接上,断线按最后处理的 revision 恢复
  • 结合 WithRequireLeader 与合理重试策略,避免 leader 变更期间的抖动放大。

8.5 监控与排障

  • 核心指标建议观测:

    • 发送失败/重试:victim 队列长度、迁入迁出速率;
    • 延迟:从写入到 watch 收到的时间分布(端到端时延);
    • 后台扫描压力syncWatchersLoop 每轮处理数量、backend 读放大;
    • 压缩命中:Canceled+CompactRevision 的频度。
  • 典型问题定位:

    • 客户端积压 → victim 增长 → sendLoop Send 报错;
    • 后端膨胀/历史版本多 → unsynced 扫描耗时长;
    • 事件过大/过多 → 触发碎片化,客户端未正确合并导致乱序/丢失。

9)流程回顾(时序)

  1. 客户端 CreateRequest(key, startRev)recvLoop 处理 → 注册到 watchableStore(synced/unsynced)。
  2. 写事务提交触发 notify:从 synced 直接推事件;发送失败迁入 victim
  3. syncWatchersLoop 周期性从 unsynced 拉取历史并推送至追平。
  4. syncVictimsLoop 尝试重发缓存事件,将恢复者回归 unsynced/synced
  5. 客户端可随时 CancelRequestProgressRequest;服务端通过 ctrlStream 回应。

小结

  • Watch 通过 synced / unsynced / victim 三组协同,分别覆盖实时推送、历史补齐、慢消费者补偿,在不阻塞主线的前提下尽量保证不丢事件
  • 工程上把重心放在:前缀/主题拆分、快消费、进度推进、压缩重试、可观测性。把这些做好,Watch 才能在高写入/高订阅的场景下长期稳定运行。

创建时间: 2025年6月17日

本文由 tommie blog 原创发布