etcd 对外 API 深入分析与调用链路
etcd 通过 gRPC 对外提供服务,主要包含 KV、Watch、Lease、Cluster、Auth、Maintenance 等服务。本文将深入分析每个 API 的实现细节和完整调用链路。
1. gRPC 服务概览
1.1 服务定义
etcd 在 `api/etcdserverpb/rpc.proto` 中定义了以下 gRPC 服务:
// KV service — key/value reads, writes, deletes, transactions, and compaction.
service KV {
rpc Range(RangeRequest) returns (RangeResponse);
rpc Put(PutRequest) returns (PutResponse);
rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse);
rpc Txn(TxnRequest) returns (TxnResponse);
rpc Compact(CompactionRequest) returns (CompactionResponse);
}
// Watch service — observe key changes over a bidirectional stream.
service Watch {
rpc Watch(stream WatchRequest) returns (stream WatchResponse);
}
// Lease service — lease lifecycle management (grant, revoke, keep-alive, introspection).
service Lease {
rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse);
rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse);
rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse);
rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse);
rpc LeaseLeases(LeaseLeasesRequest) returns (LeaseLeasesResponse);
}
// Cluster service — cluster membership management.
service Cluster {
rpc MemberAdd(MemberAddRequest) returns (MemberAddResponse);
rpc MemberRemove(MemberRemoveRequest) returns (MemberRemoveResponse);
rpc MemberUpdate(MemberUpdateRequest) returns (MemberUpdateResponse);
rpc MemberList(MemberListRequest) returns (MemberListResponse);
rpc MemberPromote(MemberPromoteRequest) returns (MemberPromoteResponse);
}
// Auth service — authentication and authorization.
// NOTE(review): this listing appears abridged (no UserGet/UserList/RoleGet/... RPCs);
// confirm against the full rpc.proto.
service Auth {
rpc AuthEnable(AuthEnableRequest) returns (AuthEnableResponse);
rpc AuthDisable(AuthDisableRequest) returns (AuthDisableResponse);
rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse);
rpc UserAdd(AuthUserAddRequest) returns (AuthUserAddResponse);
rpc UserDelete(AuthUserDeleteRequest) returns (AuthUserDeleteResponse);
rpc RoleAdd(AuthRoleAddRequest) returns (AuthRoleAddResponse);
rpc RoleDelete(AuthRoleDeleteRequest) returns (AuthRoleDeleteResponse);
}
// Maintenance service — maintenance operations (alarms, status, defragment,
// hashes, snapshot streaming, leader transfer).
service Maintenance {
rpc Alarm(AlarmRequest) returns (AlarmResponse);
rpc Status(StatusRequest) returns (StatusResponse);
rpc Defragment(DefragmentRequest) returns (DefragmentResponse);
rpc Hash(HashRequest) returns (HashResponse);
rpc HashKV(HashKVRequest) returns (HashKVResponse);
rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse);
rpc MoveLeader(MoveLeaderRequest) returns (MoveLeaderResponse);
}
1.2 gRPC 服务器初始化
// server/etcdserver/api/v3rpc/grpc.go
// Server builds the gRPC server that fronts an EtcdServer: it wires the codec,
// optional TLS credentials, Prometheus metrics, unary/stream interceptor
// chains, and message-size limits, then registers every etcd service
// implementation on the resulting server.
// NOTE(review): the `interceptor` parameter is not referenced in this excerpt — confirm its use.
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
// TLS configuration (only when a tls.Config was supplied).
if tls != nil {
opts = append(opts, grpc.Creds(credentials.NewTransportCredential(tls)))
}
// Per-RPC Prometheus metrics.
// NOTE(review): prometheus.Register's error is discarded here — confirm
// duplicate registration cannot occur.
serverMetrics := grpc_prometheus.NewServerMetrics()
prometheus.Register(serverMetrics)
// Interceptor chains; order matters: logging first, then business checks,
// then metrics.
chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
newLogUnaryInterceptor(s), // logging interceptor
newUnaryInterceptor(s), // business interceptor
serverMetrics.UnaryServerInterceptor(), // metrics interceptor
}
chainStreamInterceptors := []grpc.StreamServerInterceptor{
newStreamInterceptor(s), // stream interceptor
serverMetrics.StreamServerInterceptor(),
}
opts = append(opts, grpc.ChainUnaryInterceptor(chainUnaryInterceptors...))
opts = append(opts, grpc.ChainStreamInterceptor(chainStreamInterceptors...))
// Message size limits.
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytesWithOverhead())))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))
grpcServer := grpc.NewServer(append(opts, gopts...)...)
// Register service implementations; KV and Lease are wrapped with quota
// enforcement (NewQuota*Server).
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
return grpcServer
}
2. KV 服务详细分析
2.1 Range API - 读取键值
API 入口函数
// server/etcdserver/api/v3rpc/key.go
// Range is the gRPC entry point for reads: validate the request, delegate to
// the EtcdServer layer, convert internal errors to gRPC status errors, and
// fill server-side header fields on success.
func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
// 1. Validate the request (non-empty key, known sort options).
if err := checkRangeRequest(r); err != nil {
return nil, err
}
// 2. Delegate to EtcdServer.Range, which handles consistency and auth.
resp, err := s.kv.Range(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
// 3. Fill in server-side response header fields.
s.hdr.fill(resp.Header)
return resp, nil
}
// checkRangeRequest validates a RangeRequest before it is served: the key
// must be non-empty and both sort enums must name known values.
func checkRangeRequest(r *pb.RangeRequest) error {
	if len(r.Key) == 0 {
		return rpctypes.ErrGRPCEmptyKey
	}
	_, orderOK := pb.RangeRequest_SortOrder_name[int32(r.SortOrder)]
	_, targetOK := pb.RangeRequest_SortTarget_name[int32(r.SortTarget)]
	if !orderOK || !targetOK {
		return rpctypes.ErrGRPCInvalidSortOption
	}
	return nil
}
EtcdServer 层处理
// server/etcdserver/v3_server.go
// Range serves a read request: it opens a trace span, builds an auth-check
// closure and a local-read closure, and hands both to doSerialize.
// NOTE(review): the full implementation waits on linearizableReadNotify first
// when r.Serializable is false; that branch is not visible in this excerpt —
// confirm against v3_server.go.
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
var span trace.Span
ctx, span = traceutil.Tracer.Start(ctx, "range", trace.WithAttributes(
attribute.String("range_begin", string(r.GetKey())),
attribute.String("range_end", string(r.GetRangeEnd())),
))
defer span.End()
var resp *pb.RangeResponse
var err error
// Permission check: is the caller allowed to read this key range?
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
// Local read closure, executed by doSerialize after the auth check.
get := func() {
resp, err = s.applyV3Base.Range(ctx, nil, r)
}
// Run the auth check and the read.
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
}
return resp, err
}
一致性处理逻辑
// server/etcdserver/v3_server.go
// doSerialize runs a serialized (local) read: it resolves the caller's auth
// identity from ctx, runs the permission check chk, executes get() against
// the local store, and finally rejects the result if the auth store revision
// advanced while the request was being served (stale token).
//
// Consistency selection does not happen here: callers needing a linearizable
// read must wait on linearizableReadNotify before invoking doSerialize.
// (The previous version referenced an undefined variable `r` and never
// executed get() on the linearizable path.)
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) (err error) {
	ai, err := s.AuthInfoFromCtx(ctx)
	if err != nil {
		return err
	}
	if ai == nil {
		// chk expects a non-nil AuthInfo; use an empty identity when auth
		// is disabled or the request carries no credentials.
		ai = &auth.AuthInfo{}
	}
	if err = chk(ai); err != nil {
		return err
	}
	// Fetch the response for the serialized request from the local store.
	get()
	// Empty credentials carry Revision 0; otherwise reject if the auth store
	// was updated while this request was being handled.
	if ai.Revision != 0 && ai.Revision != s.authStore.Revision() {
		return auth.ErrAuthOldRevision
	}
	return nil
}
// Linearizable-read notification.
// linearizableReadNotify asks the read loop to confirm a ReadIndex and blocks
// until that round completes, the caller's context ends, or the server stops.
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
// Snapshot the notifier for the current read round under the read lock.
s.readMu.RLock()
nc := s.readNotifier
s.readMu.RUnlock()
// Signal the read loop; non-blocking because a single pending signal
// covers every waiter of this round.
select {
case s.readwaitc <- struct{}{}:
default:
}
// Wait for the read-index round to finish, or for cancellation/shutdown.
select {
case <-nc.c:
return nc.err
case <-ctx.Done():
return ctx.Err()
case <-s.done:
return ErrStopped
}
}
MVCC 层执行
// server/etcdserver/txn/range.go
// Range executes a read-only range query inside a fresh concurrent-mode read
// transaction; the transaction is ended before returning.
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
trace := traceutil.New("range",
traceutil.Field{Key: "range_begin", Value: string(r.Key)},
traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
)
// Open a read transaction in concurrent mode.
// NOTE(review): presumably this mode avoids blocking writers — confirm in mvcc.
txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace)
defer txnRead.End()
return executeRange(ctx, lg, txnRead, r)
}
// executeRange runs a range query against an already-open read transaction
// and assembles the RangeResponse (revision header, count, More flag, and —
// unless CountOnly — the key/value pairs).
// NOTE(review): only Limit/Rev/Count are forwarded into RangeOptions here;
// sort and min/max-revision filters from the request are not applied in this
// excerpt — confirm against the full implementation.
func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
trace := traceutil.Get(ctx)
// Build the range options from the request.
ro := mvcc.RangeOptions{
Limit: r.Limit,
Rev: r.Revision,
Count: r.CountOnly,
}
// Execute the range query; mkGteRange normalizes the end-of-range key.
rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, trace, err
}
// Build the response. More is set when the store holds more matching keys
// than were returned (e.g. due to Limit).
resp := &pb.RangeResponse{
Header: &pb.ResponseHeader{
Revision: rr.Rev,
},
Count: int64(rr.Count),
More: rr.Count > len(rr.KVs),
}
// Copy out the key/value pairs unless the caller only wants the count.
if !r.CountOnly {
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
resp.Kvs[i] = &rr.KVs[i]
}
}
return resp, trace, nil
}
2.2 Put API - 写入键值
API 入口函数
// server/etcdserver/api/v3rpc/key.go
// Put is the gRPC entry point for writes: validate the request, delegate to
// the EtcdServer layer (which routes through Raft), convert internal errors
// to gRPC status errors, and fill server-side header fields on success.
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
// 1. Validate the request.
if err := checkPutRequest(r); err != nil {
return nil, err
}
// 2. Delegate to EtcdServer.Put.
resp, err := s.kv.Put(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
// 3. Fill in server-side response header fields.
s.hdr.fill(resp.Header)
return resp, nil
}
// checkPutRequest validates a PutRequest before it reaches consensus: the
// key must be non-empty, and each Ignore* flag is mutually exclusive with
// supplying the corresponding field.
func checkPutRequest(r *pb.PutRequest) error {
	switch {
	case len(r.Key) == 0:
		return rpctypes.ErrGRPCEmptyKey
	case r.IgnoreValue && len(r.Value) != 0:
		return rpctypes.ErrGRPCValueProvided
	case r.IgnoreLease && r.Lease != 0:
		return rpctypes.ErrGRPCLeaseProvided
	default:
		return nil
	}
}
EtcdServer 层处理
// server/etcdserver/v3_server.go
// Put routes a write through Raft consensus and returns the applied result.
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
var span trace.Span
ctx, span = traceutil.Tracer.Start(ctx, "put", trace.WithAttributes(
attribute.String("key", string(r.GetKey())),
))
defer span.End()
// Stash the start time in the context.
// NOTE(review): presumably read by downstream apply tracing — confirm StartTimeKey usage.
ctx = context.WithValue(ctx, traceutil.StartTimeKey{}, time.Now())
// Writes must go through a Raft proposal; the response is produced by the
// apply layer once the entry commits.
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
if err != nil {
return nil, err
}
return resp.(*pb.PutResponse), nil
}
Raft 提案处理
// server/etcdserver/server.go
// raftRequest submits an internal request through Raft consensus and waits
// for its applied result. Thin wrapper kept for call-site symmetry.
func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
return s.raftRequestOnce(ctx, r)
}
// raftRequestOnce submits one internal Raft request and unwraps the applied
// result: a proposal/consensus error, an application-level error carried in
// the result, or the response message itself.
func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
	res, err := s.processInternalRaftRequestOnce(ctx, r)
	switch {
	case err != nil:
		return nil, err
	case res.Err != nil:
		return nil, res.Err
	default:
		return res.Resp, nil
	}
}
// processInternalRaftRequestOnce proposes r through Raft and blocks until the
// entry has been applied, the request times out, or the server stops.
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply.Result, error) {
// 1. Backpressure: refuse new proposals when apply lags too far behind commit.
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex {
return nil, errors.ErrTooManyRequests
}
// 2. Assign a unique request ID, later used to match the applied result.
r.Header = &pb.RequestHeader{ID: s.reqIDGen.Next()}
// 3. Serialize the request for the Raft log.
data, err := r.Marshal()
if err != nil {
return nil, err
}
// 4. Enforce the request size limit.
if len(data) > int(s.Cfg.MaxRequestBytes) {
return nil, errors.ErrRequestTooLarge
}
// 5. Register a wait channel keyed by the request ID. r.ID is only set for
// some requests; otherwise fall back to the generated header ID.
id := r.ID
if id == 0 {
id = r.Header.ID
}
ch := s.w.Register(id)
// 6. Submit the Raft proposal under a request-scoped timeout.
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
start := time.Now()
err = s.r.Propose(cctx, data)
if err != nil {
proposalsFailed.Inc()
s.w.Trigger(id, nil) // GC wait
return nil, err
}
proposalsPending.Inc()
defer proposalsPending.Dec()
// 7. Wait for the apply result; on timeout/stop, trigger the waiter so its
// registration is garbage-collected.
select {
case x := <-ch:
return x.(*apply.Result), nil
case <-cctx.Done():
proposalsFailed.Inc()
s.w.Trigger(id, nil) // GC wait
return nil, s.parseProposeCtxErr(cctx.Err(), start)
case <-s.stopping:
return nil, ErrStopped
}
}
Apply 层执行
// server/etcdserver/apply/apply.go
// Put applies a committed PutRequest by delegating to the MVCC txn helper
// with the applier's logger, lessor, and KV store.
func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(context.TODO(), a.options.Logger, a.options.Lessor, a.options.KV, p)
}
MVCC 层执行
// server/etcdserver/txn/put.go
// Put writes one key/value pair in its own write transaction. A lease named
// by the request must exist; if PrevKv is set, the prior value is read first
// so it can be echoed in the response.
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
trace := traceutil.New("put",
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: p.Size()},
)
val, leaseID := p.Value, lease.LeaseID(p.Lease)
// Reject writes that attach a non-existent lease.
if leaseID != lease.NoLease {
if l := lessor.Lookup(leaseID); l == nil {
return nil, trace, lease.ErrLeaseNotFound
}
}
// Open the write transaction; ended before returning.
txn := kv.Write(trace)
defer txn.End()
var rr *mvcc.RangeResult
var err error
// Read the previous value first when the caller asked for it.
if p.PrevKv {
rr, err = txn.Range(ctx, p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, trace, err
}
}
// Perform the put; rev is the new store revision.
rev := txn.Put(p.Key, val, leaseID)
// Build the response, attaching the previous value if one existed.
resp := &pb.PutResponse{Header: &pb.ResponseHeader{Revision: rev}}
if p.PrevKv && rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
return resp, trace, nil
}
2.3 DeleteRange API - 删除键值
API 入口函数
// server/etcdserver/api/v3rpc/key.go
// DeleteRange is the gRPC entry point for deletes: validate, delegate to the
// EtcdServer layer, convert internal errors, and fill header fields.
func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
// 1. Validate the request.
if err := checkDeleteRequest(r); err != nil {
return nil, err
}
// 2. Delegate to EtcdServer.DeleteRange (routed through Raft).
resp, err := s.kv.DeleteRange(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
// 3. Fill in server-side response header fields.
s.hdr.fill(resp.Header)
return resp, nil
}
MVCC 层执行
// server/etcdserver/txn/delete.go
// DeleteRange deletes a key range in its own write transaction and returns
// the response together with the operation trace.
func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
trace := traceutil.New("delete_range",
traceutil.Field{Key: "range_begin", Value: string(dr.Key)},
traceutil.Field{Key: "range_end", Value: string(dr.RangeEnd)},
)
// Open the write transaction; ended before returning.
txn := kv.Write(trace)
defer txn.End()
// Perform the delete inside the transaction.
resp, err := deleteRange(ctx, txn, dr)
if err != nil {
return nil, trace, err
}
return resp, trace, nil
}
// deleteRange performs the delete against an already-open write transaction.
// When PrevKv is requested, the soon-to-be-deleted pairs are read first and
// echoed back in the response.
func deleteRange(ctx context.Context, txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
	end := mkGteRange(dr.RangeEnd)
	// Capture the current values first when the caller wants them back.
	var prev *mvcc.RangeResult
	if dr.PrevKv {
		r, err := txn.Range(ctx, dr.Key, end, mvcc.RangeOptions{})
		if err != nil {
			return nil, err
		}
		prev = r
	}
	// Delete and record how many keys were removed plus the new revision.
	deleted, rev := txn.DeleteRange(dr.Key, end)
	resp := &pb.DeleteRangeResponse{
		Header:  &pb.ResponseHeader{Revision: rev},
		Deleted: deleted,
	}
	if dr.PrevKv && prev != nil {
		resp.PrevKvs = make([]*mvccpb.KeyValue, len(prev.KVs))
		for i := range prev.KVs {
			resp.PrevKvs[i] = &prev.KVs[i]
		}
	}
	return resp, nil
}
2.4 Txn API - 事务操作
API 入口函数
// server/etcdserver/api/v3rpc/key.go
// Txn is the gRPC entry point for transactions: validate shape and size
// limits, reject overlapping operations within each branch, then delegate.
func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
// 1. Validate the request against the configured max operation count.
if err := checkTxnRequest(r, int(s.maxTxnOps)); err != nil {
return nil, err
}
// 2. Reject overlapping key intervals within each branch.
// NOTE(review): exact overlap semantics live in checkIntervals — confirm there.
if _, _, err := checkIntervals(r.Success); err != nil {
return nil, err
}
if _, _, err := checkIntervals(r.Failure); err != nil {
return nil, err
}
// 3. Delegate to EtcdServer.Txn.
resp, err := s.kv.Txn(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
// 4. Fill in server-side response header fields.
s.hdr.fill(resp.Header)
return resp, nil
}
MVCC 层执行
// server/etcdserver/txn/txn.go
// Txn evaluates the transaction's compare predicates and executes either the
// Success or Failure operation list atomically within a single MVCC
// transaction.
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
	trace := traceutil.New("txn")
	isWrite := !isReadonlyTxnRequest(rt)
	// Even a read-only request flows through the TxnWrite interface so that
	// executeTxn can treat both cases uniformly. kv.Read yields only a
	// TxnRead, so it must be adapted with mvcc.NewReadOnlyTxnWrite — the
	// previous version assigned the TxnRead directly, which does not satisfy
	// mvcc.TxnWrite.
	var txn mvcc.TxnWrite
	if isWrite {
		if txnModeWriteWithSharedBuffer {
			txn = kv.Write(trace)
		} else {
			txn = kv.WriteWithoutBuffer(trace)
		}
	} else {
		txn = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.ConcurrentReadTxMode, trace))
	}
	defer txn.End()
	// Pre-build the response shells for whichever branch ends up running.
	txnResp, _ := newTxnResp(rt, len(rt.Success), len(rt.Failure))
	// Evaluate the compare predicates to pick the success/failure path.
	txnPath := compareToPath(txn, rt)
	// Execute the chosen operation list.
	if _, err := executeTxn(ctx, lg, txn, rt, txnPath, txnResp); err != nil {
		return nil, trace, err
	}
	rev := txn.Rev()
	if len(txnResp.Responses) != 0 {
		txnResp.Header.Revision = rev
	}
	return txnResp, trace, nil
}
// executeTxn applies the chosen branch (Success when txnPath[0] is true,
// Failure otherwise) against txnWrite, filling the pre-allocated response
// slots in tresp. It returns the number of nested transactions executed.
func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
	// The trace travels in the context; the previous version referenced an
	// undefined `trace` variable here.
	trace := traceutil.Get(ctx)
	reqs := rt.Success
	if !txnPath[0] {
		reqs = rt.Failure
	}
	for i, req := range reqs {
		respi := tresp.Responses[i].Response
		switch tv := req.Request.(type) {
		case *pb.RequestOp_RequestRange:
			// executeRange returns (resp, trace, err); the previous version
			// destructured only two values, which does not compile.
			resp, _, err := executeRange(ctx, lg, txnWrite, tv.RequestRange)
			if err != nil {
				return 0, fmt.Errorf("applyTxn: failed Range: %w", err)
			}
			respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
		case *pb.RequestOp_RequestPut:
			prevKV, err := getPrevKV(trace, txnWrite, tv.RequestPut)
			if err != nil {
				return 0, fmt.Errorf("applyTxn: failed to get prevKV on put: %w", err)
			}
			resp := put(ctx, txnWrite, tv.RequestPut, prevKV)
			respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
		case *pb.RequestOp_RequestDeleteRange:
			resp, err := deleteRange(ctx, txnWrite, tv.RequestDeleteRange)
			if err != nil {
				return 0, fmt.Errorf("applyTxn: failed DeleteRange: %w", err)
			}
			respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
		case *pb.RequestOp_RequestTxn:
			// NOTE(review): passing txnWrite where Txn expects a mvcc.KV looks
			// type-incorrect; the full implementation recurses via executeTxn
			// with a nested txnPath instead — confirm against txn.go.
			resp, _, err := Txn(ctx, lg, tv.RequestTxn, true, txnWrite, nil)
			if err != nil {
				return 0, fmt.Errorf("applyTxn: failed Txn: %w", err)
			}
			respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn = resp
			txns++
		}
	}
	return txns, nil
}
3. Watch 服务详细分析
3.1 Watch API - 监听变化
API 入口函数
// server/etcdserver/api/v3rpc/watch.go
// Watch implements the bidirectional Watch RPC. It starts a send loop
// (events/control messages out to the gRPC stream) and a receive loop
// (create/cancel/progress requests in from the client), then blocks until
// either side fails or the stream's context ends.
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
	sws := serverWatchStream{
		sg:          ws.sg,
		watchable:   ws.watchable,
		gRPCStream:  stream,
		watchStream: ws.watchable.NewWatchStream(),
		ctrlStream:  make(chan *pb.WatchResponse, ctrlStreamBufLen),
		progress:    make(map[mvcc.WatchID]bool),
		prevKV:      make(map[mvcc.WatchID]bool),
		fragment:    make(map[mvcc.WatchID]bool),
		closec:      make(chan struct{}),
	}
	sws.wg.Add(1)
	go func() {
		sws.sendLoop()
		sws.wg.Done()
	}()
	errc := make(chan error, 1)
	go func() {
		if rerr := sws.recvLoop(); rerr != nil {
			if isClientCtxErr(stream.Context().Err(), rerr) {
				sws.lg.Debug("failed to receive watch request from gRPC stream", zap.Error(rerr))
			} else {
				sws.lg.Warn("failed to receive watch request from gRPC stream", zap.Error(rerr))
				streamFailures.WithLabelValues("receive", "watch").Inc()
			}
			errc <- rerr
		}
	}()
	// Propagate the terminating error instead of dropping it: the previous
	// version declared `err` without using it (a compile error) and always
	// returned nil, hiding receive-loop failures from the gRPC layer.
	var err error
	select {
	case err = <-errc:
		close(sws.closec)
	case <-stream.Context().Done():
		err = stream.Context().Err()
		close(sws.closec)
	}
	// Wait for the send loop to drain and exit before returning.
	sws.wg.Wait()
	return err
}
接收循环处理
// server/etcdserver/api/v3rpc/watch.go
// recvLoop reads WatchRequests from the gRPC stream until EOF or error and
// dispatches them: create a watcher, cancel a watcher, or request a progress
// notification. Control responses are forwarded to sendLoop via ctrlStream.
func (sws *serverWatchStream) recvLoop() error {
for {
req, err := sws.gRPCStream.Recv()
// EOF means the client closed its send side; exit cleanly.
if err == io.EOF {
return nil
}
if err != nil {
return err
}
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
if uv.CreateRequest == nil {
break
}
creq := uv.CreateRequest
if len(creq.Key) == 0 {
// Reject creation on an empty key: answer with a canceled watch.
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: creq.WatchId,
Canceled: true,
CancelReason: "etcdserver: no key given",
}
continue
}
// Create the watcher. StartRevision 0 means "from the next revision".
filters := FiltersFromRequest(creq)
wsrev := sws.watchStream.Rev()
rev := creq.StartRevision
if rev == 0 {
rev = wsrev + 1
}
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
if err == nil {
// Record per-watch options under the lock shared with sendLoop.
sws.mu.Lock()
if creq.ProgressNotify {
sws.progress[id] = true
}
if creq.PrevKv {
sws.prevKV[id] = true
}
if creq.Fragment {
sws.fragment[id] = true
}
sws.mu.Unlock()
}
// Send the creation response; Canceled is set when Watch failed.
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
Created: true,
Canceled: err != nil,
}
select {
case sws.ctrlStream <- wr:
case <-sws.closec:
return nil
}
case *pb.WatchRequest_CancelRequest:
if uv.CancelRequest != nil {
id := uv.CancelRequest.WatchId
err := sws.watchStream.Cancel(mvcc.WatchID(id))
if err == nil {
// NOTE(review): this send does not select on closec like the
// create path does — confirm it cannot block on shutdown.
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: id,
Canceled: true,
}
// Drop the per-watch option state for the canceled watcher.
sws.mu.Lock()
delete(sws.progress, mvcc.WatchID(id))
delete(sws.prevKV, mvcc.WatchID(id))
delete(sws.fragment, mvcc.WatchID(id))
sws.mu.Unlock()
}
}
case *pb.WatchRequest_ProgressRequest:
if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1, // response is not associated with any WatchId
}
}
default:
// Unknown request type: ignore and keep receiving.
continue
}
}
}
发送循环处理
// server/etcdserver/api/v3rpc/watch.go
// sendLoop multiplexes three inputs onto the gRPC stream: watch events from
// the MVCC watch stream, control responses from recvLoop, and periodic
// progress notifications. Event responses for a watch whose creation has not
// yet been announced to the client are buffered in `pending` until the
// creation response goes out.
func (sws *serverWatchStream) sendLoop() {
// watch ids that are currently active
ids := make(map[mvcc.WatchID]struct{})
// watch responses pending on a watch id creation message
pending := make(map[mvcc.WatchID][]*pb.WatchResponse)
interval := GetProgressReportInterval()
progressTicker := time.NewTicker(interval)
defer func() {
progressTicker.Stop()
// drain the chan to clean up pending goroutines
for ws := range sws.watchStream.Chan() {
mvcc.ReportEventReceived(len(ws.Events))
}
for _, wrs := range pending {
for _, wr := range wrs {
mvcc.ReportEventReceived(len(wr.Events))
}
}
}()
for {
select {
case wresp, ok := <-sws.watchStream.Chan():
if !ok {
return
}
// Handle a batch of watch events for one watcher.
mvcc.ReportEventReceived(len(wresp.Events))
if _, hasId := ids[wresp.WatchID]; !hasId {
// buffer if id not yet announced
wrs := append(pending[wresp.WatchID], sws.createWatchResponse(wresp, sws.prevKV[wresp.WatchID]))
pending[wresp.WatchID] = wrs
continue
}
// NOTE(review): Send errors are ignored throughout this excerpt —
// confirm how the full implementation terminates on send failure.
wr := sws.createWatchResponse(wresp, sws.prevKV[wresp.WatchID])
if sws.fragment[wresp.WatchID] {
sws.sendFragments(wr)
} else {
sws.gRPCStream.Send(wr)
}
case c, ok := <-sws.ctrlStream:
if !ok {
return
}
if c.Created {
// Mark the watch as announced.
ids[mvcc.WatchID(c.WatchId)] = struct{}{}
// Flush any event responses buffered before the announcement.
wrs, ok := pending[mvcc.WatchID(c.WatchId)]
if ok {
delete(pending, mvcc.WatchID(c.WatchId))
for _, wr := range wrs {
sws.gRPCStream.Send(wr)
}
}
} else if c.Canceled {
delete(ids, mvcc.WatchID(c.WatchId))
delete(pending, mvcc.WatchID(c.WatchId))
}
sws.gRPCStream.Send(c)
case <-progressTicker.C:
sws.sendProgressNotify()
case <-sws.closec:
return
}
}
}
4. 调用链路时序图
4.1 Range 调用链路
sequenceDiagram
participant Client
participant gRPC
participant kvServer
participant EtcdServer
participant AuthStore
participant ReadNotifier
participant MVCC
participant Backend
Client->>gRPC: Range Request
gRPC->>kvServer: Range(ctx, req)
kvServer->>kvServer: checkRangeRequest()
kvServer->>EtcdServer: Range(ctx, req)
alt Linearizable Read
EtcdServer->>AuthStore: IsRangePermitted()
EtcdServer->>ReadNotifier: linearizableReadNotify()
ReadNotifier->>EtcdServer: ReadIndex confirmed
end
EtcdServer->>MVCC: Range(ctx, req)
MVCC->>MVCC: Read(ConcurrentReadTxMode)
MVCC->>Backend: UnsafeRange()
Backend-->>MVCC: Key-Values
MVCC-->>EtcdServer: RangeResult
EtcdServer-->>kvServer: RangeResponse
kvServer->>kvServer: fill header
kvServer-->>gRPC: RangeResponse
gRPC-->>Client: Response
4.2 Put 调用链路
sequenceDiagram
participant Client
participant gRPC
participant kvServer
participant EtcdServer
participant RaftNode
participant Applier
participant MVCC
participant Watch
participant Backend
Client->>gRPC: Put Request
gRPC->>kvServer: Put(ctx, req)
kvServer->>kvServer: checkPutRequest()
kvServer->>EtcdServer: Put(ctx, req)
EtcdServer->>EtcdServer: raftRequest()
EtcdServer->>EtcdServer: processInternalRaftRequestOnce()
EtcdServer->>RaftNode: Propose(data)
Note over RaftNode: Raft Consensus
RaftNode->>RaftNode: Append & Replicate
RaftNode->>RaftNode: Commit when majority
RaftNode->>Applier: Apply(entry)
Applier->>MVCC: Put(key, value, lease)
MVCC->>Backend: BatchTx.Put()
MVCC->>Watch: Notify(events)
Backend-->>MVCC: Committed
MVCC-->>Applier: Applied
Applier-->>EtcdServer: Result
EtcdServer-->>kvServer: PutResponse
kvServer->>kvServer: fill header
kvServer-->>gRPC: PutResponse
gRPC-->>Client: Response
4.3 Watch 调用链路
sequenceDiagram
participant Client
participant gRPC
participant WatchServer
participant WatchStream
participant WatchableStore
participant MVCC
Client->>gRPC: Watch Stream
gRPC->>WatchServer: Watch(stream)
WatchServer->>WatchServer: create serverWatchStream
WatchServer->>WatchServer: start sendLoop & recvLoop
Client->>gRPC: CreateRequest
gRPC->>WatchServer: recvLoop receives
WatchServer->>WatchStream: Watch(key, startRev)
WatchStream->>WatchableStore: watch(key, startRev)
WatchableStore->>WatchableStore: add to synced/unsynced
WatchServer-->>Client: Created Response
Note over MVCC: On data changes
MVCC->>WatchableStore: notify(events)
WatchableStore->>WatchableStore: match watchers
WatchableStore-->>WatchStream: events
WatchStream-->>WatchServer: sendLoop receives
WatchServer-->>Client: WatchResponse with events
Client->>gRPC: CancelRequest
gRPC->>WatchServer: recvLoop receives
WatchServer->>WatchStream: Cancel(watchId)
WatchStream->>WatchableStore: cancel watcher
WatchServer-->>Client: Canceled Response
5. 关键数据结构
5.1 请求响应结构
// RangeRequest describes a range query.
type RangeRequest struct {
Key []byte // key to query (start of range)
RangeEnd []byte // end of range (exclusive)
Limit int64 // maximum number of results
Revision int64 // read at this store revision (0 = latest)
SortOrder SortOrder // sort direction
SortTarget SortTarget // field to sort by
Serializable bool // serve from local store without linearizability
KeysOnly bool // return keys only
CountOnly bool // return only the match count
MinModRevision int64 // minimum modification revision filter
MaxModRevision int64 // maximum modification revision filter
MinCreateRevision int64 // minimum creation revision filter
MaxCreateRevision int64 // maximum creation revision filter
}
// PutRequest describes a write.
type PutRequest struct {
Key []byte // key
Value []byte // value
Lease int64 // attached lease ID (0 = none)
PrevKv bool // return the previous key/value pair
IgnoreValue bool // keep the current value (must not supply Value)
IgnoreLease bool // keep the current lease (must not supply Lease)
}
// WatchRequest is the client-to-server message of the Watch stream; it is a
// union of create/cancel/progress requests.
type WatchRequest struct {
RequestUnion isWatchRequest_RequestUnion
}
// WatchRequest_CreateRequest is the union arm carrying a creation request.
type WatchRequest_CreateRequest struct {
CreateRequest *WatchCreateRequest
}
// WatchCreateRequest asks the server to create a new watcher.
type WatchCreateRequest struct {
Key []byte // key (or start of range) to watch
RangeEnd []byte // end of watched range (exclusive)
StartRevision int64 // first revision to deliver (0 = from next)
ProgressNotify bool // request periodic progress notifications
Filters []WatchCreateRequest_FilterType // event filters
PrevKv bool // include previous key/value in events
WatchId int64 // client-chosen watch ID
Fragment bool // allow fragmenting large responses
}
5.2 内部处理结构
// InternalRaftRequest is the union of all request types replicated through
// Raft; exactly one operation field is set per request.
type InternalRaftRequest struct {
Header *RequestHeader
ID uint64
// KV operations
Range *RangeRequest
Put *PutRequest
DeleteRange *DeleteRangeRequest
Txn *TxnRequest
Compaction *CompactionRequest
// Lease operations
LeaseGrant *LeaseGrantRequest
LeaseRevoke *LeaseRevokeRequest
// Auth operations
AuthEnable *AuthEnableRequest
AuthDisable *AuthDisableRequest
Authenticate *AuthenticateRequest
// ... further operations omitted in this excerpt
}
// serverWatchStream handles one Watch gRPC stream: it bridges the gRPC
// bidirectional stream and the MVCC watch stream, tracking per-watch options.
type serverWatchStream struct {
lg *zap.Logger
clusterID types.ID
memberID types.ID
maxRequestBytes int
sg etcdserver.RaftStatusGetter
watchable mvcc.WatchableKV
ag AuthGetter
gRPCStream pb.Watch_WatchServer
watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse
// Per-watch option state, shared between recvLoop and sendLoop.
mu sync.RWMutex
progress map[mvcc.WatchID]bool
prevKV map[mvcc.WatchID]bool
fragment map[mvcc.WatchID]bool
closec chan struct{}
wg sync.WaitGroup
}
6. 总结
etcd 的 API 设计具有以下特点:
- 分层架构:gRPC API → EtcdServer → Raft/MVCC → Backend,职责清晰
- 一致性保证:通过 ReadIndex 机制保证线性一致性读
- 事务支持:完整的 ACID 事务语义
- 流式处理:Watch 和 Lease KeepAlive 使用双向流
- 错误处理:完善的错误码和错误处理机制
- 性能优化:批量操作、并发读、写缓冲等优化手段
这种设计使得 etcd 能够提供高性能、强一致性的分布式键值存储服务,满足各种分布式系统的需求。