A Deep Dive into the etcd Server Module
EtcdServer is etcd's core module: it coordinates the subsystems, handles client requests, and manages cluster state. This article examines the Server module's architecture, its startup flow, and the key pieces of its implementation.
1. Server Module Architecture
1.1 Overall Architecture Diagram
graph TB
subgraph "EtcdServer Core"
ES[EtcdServer]
CONFIG[ServerConfig]
BOOTSTRAP[Bootstrap]
end
subgraph "Request Processing"
GRPC[gRPC Handlers]
INTERCEPTOR[Interceptors]
APPLIER[UberApplier]
DISPATCHER[Request Dispatcher]
end
subgraph "Consensus Layer"
RAFT[RaftNode]
TRANSPORT[RaftTransport]
WAL[Write-Ahead Log]
SNAPSHOT[Snapshotter]
end
subgraph "Storage Layer"
MVCC[MVCC Store]
BACKEND[Backend/BoltDB]
LESSOR[Lessor]
AUTH[AuthStore]
end
subgraph "Cluster Management"
CLUSTER[RaftCluster]
MEMBERSHIP[Membership]
STATS[Statistics]
end
subgraph "Monitoring & Control"
METRICS[Metrics]
HEALTH[Health Check]
ALARM[Alarm Store]
NOTIFIER[Notifiers]
end
ES --> CONFIG
ES --> BOOTSTRAP
ES --> GRPC
ES --> RAFT
ES --> MVCC
ES --> CLUSTER
ES --> METRICS
GRPC --> INTERCEPTOR
INTERCEPTOR --> APPLIER
APPLIER --> DISPATCHER
RAFT --> TRANSPORT
RAFT --> WAL
RAFT --> SNAPSHOT
MVCC --> BACKEND
MVCC --> LESSOR
MVCC --> AUTH
CLUSTER --> MEMBERSHIP
CLUSTER --> STATS
METRICS --> HEALTH
METRICS --> ALARM
METRICS --> NOTIFIER
1.2 Core Data Structures
// server/etcdserver/server.go
type EtcdServer struct {
// Atomically updated fields; atomic.Int64/Uint64 guarantee 64-bit alignment
inflightSnapshots atomic.Int64 // number of snapshots currently in flight
appliedIndex atomic.Uint64 // index of the last applied log entry
committedIndex atomic.Uint64 // index of the last committed log entry
term atomic.Uint64 // current raft term
lead atomic.Uint64 // ID of the current leader
// Consistent-index management
consistIndex cindex.ConsistentIndexer
// Raft node
r raftNode
// Channels and synchronization
readych chan struct{} // closed once the server is ready to serve
readwaitc chan struct{} // signals a pending linearizable read
readNotifier *notifier // notifies blocked readers
stop chan struct{} // stop signal
stopping chan struct{} // closed while the server is shutting down
done chan struct{} // closed when shutdown has completed
leaderChanged *notify.Notifier // leader-change notifications
// Configuration and logging
Cfg config.ServerConfig
lgMu *sync.RWMutex
lg *zap.Logger
// Wait mechanisms
w wait.Wait // matches proposals with their apply results
applyWait wait.WaitTime // waits until the applied index reaches a target
// Cluster management
memberID types.ID
attributes membership.Attributes
cluster *membership.RaftCluster
// Storage
v2store v2store.Store
snapshotter *snap.Snapshotter
be backend.Backend
lessor lease.Lessor
kv mvcc.ConsistentWatchableKV
authStore auth.AuthStore
// Applier
uberApply apply.UberApplier
// Statistics and monitoring
stats *stats.ServerStats
lstats *stats.LeaderStats
// Other components
compactor v3compactor.Compactor
peerRt http.RoundTripper
reqIDGen *idutil.Generator
AccessController *AccessController
}
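The rest of the server never touches the atomic fields directly; they are wrapped in small accessor methods (getAppliedIndex, setLead, and so on, which appear again in run() below). A minimal sketch of that pattern, assuming the atomic.Uint64 fields above:

// Accessors for the atomically updated progress fields. Go's atomic.Uint64
// type guarantees 64-bit alignment by itself, and Load/Store are atomic.
func (s *EtcdServer) getAppliedIndex() uint64 { return s.appliedIndex.Load() }
func (s *EtcdServer) setAppliedIndex(v uint64) { s.appliedIndex.Store(v) }
func (s *EtcdServer) getCommittedIndex() uint64 { return s.committedIndex.Load() }
func (s *EtcdServer) setCommittedIndex(v uint64) { s.committedIndex.Store(v) }
func (s *EtcdServer) getLead() uint64 { return s.lead.Load() }
func (s *EtcdServer) setLead(v uint64) { s.lead.Store(v) }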
2. Startup Flow
2.1 Startup Sequence Diagram
sequenceDiagram
participant Main
participant Bootstrap
participant EtcdServer
participant RaftNode
participant Backend
participant MVCC
participant Lessor
participant AuthStore
participant Compactor
Main->>Bootstrap: bootstrap(cfg)
Bootstrap->>Bootstrap: TouchDirAll(dataDir)
Bootstrap->>Backend: bootstrapBackend()
Bootstrap->>Bootstrap: bootstrapWAL()
Bootstrap->>Bootstrap: bootstrapCluster()
Bootstrap->>Bootstrap: bootstrapStorage()
Bootstrap->>Bootstrap: bootstrapRaft()
Bootstrap-->>Main: bootstrappedServer
Main->>EtcdServer: NewServer(cfg)
EtcdServer->>EtcdServer: initialize fields
EtcdServer->>Lessor: NewLessor()
EtcdServer->>MVCC: NewStore()
EtcdServer->>AuthStore: NewAuthStore()
EtcdServer->>Compactor: NewCompactor()
EtcdServer-->>Main: EtcdServer
Main->>EtcdServer: start()
EtcdServer->>EtcdServer: initialize channels
EtcdServer->>RaftNode: start()
EtcdServer->>EtcdServer: run() // main loop
Note over EtcdServer: Server ready to serve
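Everything in this sequence runs behind the embed package when etcd is started programmatically. For reference, a minimal sketch of launching a single-node server in-process (directory and timeout values are illustrative):

// Minimal embedded-etcd startup; embed.StartEtcd drives bootstrap(),
// NewServer(), and start() internally.
package main

import (
    "log"
    "time"

    "go.etcd.io/etcd/server/v3/embed"
)

func main() {
    cfg := embed.NewConfig()
    cfg.Dir = "default.etcd" // data directory (illustrative)

    e, err := embed.StartEtcd(cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer e.Close()

    select {
    case <-e.Server.ReadyNotify(): // backed by the readych channel shown above
        log.Println("embedded etcd is ready")
    case <-time.After(60 * time.Second):
        e.Server.Stop() // trigger a shutdown if startup hangs
        log.Fatal("embedded etcd took too long to start")
    }
    log.Fatal(<-e.Err())
}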
2.2 The Bootstrap Process
// server/etcdserver/bootstrap.go
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
// 1. Check and create the data directories
if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
return nil, fmt.Errorf("cannot access data directory: %w", terr)
}
if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
return nil, fmt.Errorf("cannot access member directory: %w", terr)
}
// 2. Initialize the snapshot manager
ss := bootstrapSnapshot(cfg)
// 3. Initialize the Raft HTTP transport
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
if err != nil {
return nil, err
}
// 4. Check whether a WAL already exists
haveWAL := wal.Exist(cfg.WALDir())
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
// 5. Initialize the backend store
backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
if err != nil {
return nil, err
}
// 6. Initialize the WAL
var bwal *bootstrappedWAL
if haveWAL {
if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
return nil, fmt.Errorf("cannot write to WAL directory: %w", err)
}
cfg.Logger.Info("Bootstrapping WAL from snapshot")
bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot, backend.ci)
}
// 7. Initialize the cluster
cfg.Logger.Info("bootstrapping cluster")
cluster, err := bootstrapCluster(cfg, bwal, prt)
if err != nil {
backend.Close()
return nil, err
}
// 8. Initialize storage
cfg.Logger.Info("bootstrapping storage")
s := bootstrapStorage(cfg, st, backend, bwal, cluster)
if err = cluster.Finalize(cfg, s); err != nil {
backend.Close()
return nil, err
}
// 9. Initialize Raft
cfg.Logger.Info("bootstrapping raft")
raft := bootstrapRaft(cfg, cluster, s.wal)
return &bootstrappedServer{
prt: prt,
ss: ss,
storage: s,
cluster: cluster,
raft: raft,
}, nil
}
2.3 Server Initialization
// server/etcdserver/server.go
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
// 1. Run bootstrap
b, err := bootstrap(cfg)
if err != nil {
cfg.Logger.Error("bootstrap failed", zap.Error(err))
return nil, err
}
cfg.Logger.Info("bootstrap successfully")
defer func() {
if err != nil {
b.Close()
}
}()
// 2. Create the statistics objects
sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String())
lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
// 3. Initialize the EtcdServer struct
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
srv = &EtcdServer{
readych: make(chan struct{}),
Cfg: cfg,
lgMu: new(sync.RWMutex),
lg: cfg.Logger,
errorc: make(chan error, 1),
v2store: b.storage.st,
snapshotter: b.ss,
r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
memberID: b.cluster.nodeID,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.cluster.cl,
stats: sstats,
lstats: lstats,
peerRt: b.prt,
reqIDGen: idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: b.storage.backend.ci,
firstCommitInTerm: notify.NewNotifier(),
clusterVersionChanged: notify.NewNotifier(),
}
// 4. Wire up the backend store
srv.be = b.storage.backend.be
srv.beHooks = b.storage.backend.beHooks
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
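// Worked example: with TickMs=100 and ElectionTicks=10, heartbeat is 100ms
// and minTTL = (3*10/2) * 100ms = 1.5s, so no lease can expire in less time
// than a leader election needs to complete.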
// 5. Initialize the lease manager
srv.lessor = lease.NewLessor(
srv.Logger(),
srv.be,
srv.cluster,
lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
CheckpointPersist: cfg.LeaseCheckpointPersist,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
// 6. Initialize the MVCC store
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
})
// 7. Initialize the auth store
srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, srv.kv, cfg.BcryptCost)
// 8. Initialize the compactor
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
if err != nil {
return nil, err
}
srv.compactor.Run()
}
// 9. Initialize the applier
srv.uberApply = apply.NewUberApplier(
apply.ApplierOptions{
Logger: srv.Logger(),
KV: srv.kv,
Lessor: srv.lessor,
AuthStore: srv.authStore,
AlarmStore: srv.alarmStore,
Quota: srv.Cfg.QuotaBackendBytes,
WarningApplyDuration: cfg.WarningApplyDuration,
TxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
})
return srv, nil
}
3. Request Processing
3.1 Request Processing Architecture
graph TB
subgraph "Request Flow"
CLIENT[Client Request]
GRPC[gRPC Handler]
INTERCEPTOR[Interceptors]
SERVER[EtcdServer]
RAFT[Raft Propose]
APPLY[Apply]
RESPONSE[Response]
end
subgraph "Interceptor Chain"
LOG[Log Interceptor]
AUTH[Auth Interceptor]
METRICS[Metrics Interceptor]
QUOTA[Quota Interceptor]
end
subgraph "Apply Chain"
UBER[UberApplier]
CORRUPT[CorruptApplier]
CAPPED[CappedApplier]
AUTHAPP[AuthApplier]
BACKEND[BackendApplier]
end
CLIENT --> GRPC
GRPC --> LOG
LOG --> AUTH
AUTH --> METRICS
METRICS --> QUOTA
QUOTA --> SERVER
SERVER --> RAFT
RAFT --> UBER
UBER --> CORRUPT
CORRUPT --> CAPPED
CAPPED --> AUTHAPP
AUTHAPP --> BACKEND
BACKEND --> RESPONSE
3.2 Raft Request Handling
// server/etcdserver/server.go
func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
return s.raftRequestOnce(ctx, r)
}
func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
result, err := s.processInternalRaftRequestOnce(ctx, r)
if err != nil {
return nil, err
}
if result.Err != nil {
return nil, result.Err
}
return result.Resp, nil
}
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply.Result, error) {
// 1. Backpressure check: reject when commit outruns apply by too much
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex {
return nil, errors.ErrTooManyRequests
}
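// maxGapBetweenApplyAndCommitIndex is a package constant (5000 in the
// upstream source): once the applier falls more than that many entries
// behind the committed index, new proposals are rejected for backpressure.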
// 2. Generate a request ID
r.Header = &pb.RequestHeader{ID: s.reqIDGen.Next()}
// 3. Marshal the request
data, err := r.Marshal()
if err != nil {
return nil, err
}
// 4. Enforce the request size limit
if len(data) > int(s.Cfg.MaxRequestBytes) {
return nil, errors.ErrRequestTooLarge
}
// 5. Register a wait channel keyed by the request ID
id := r.ID
if id == 0 {
id = r.Header.ID
}
ch := s.w.Register(id)
// 6. Submit the Raft proposal
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
start := time.Now()
err = s.r.Propose(cctx, data)
if err != nil {
proposalsFailed.Inc()
s.w.Trigger(id, nil) // GC wait
return nil, err
}
proposalsPending.Inc()
defer proposalsPending.Dec()
// 7. Wait for the apply result
select {
case x := <-ch:
return x.(*apply.Result), nil
case <-cctx.Done():
proposalsFailed.Inc()
s.w.Trigger(id, nil) // GC wait
return nil, s.parseProposeCtxErr(cctx.Err(), start)
case <-s.stopping:
return nil, ErrStopped
}
}
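Steps 5 and 7 are tied together by the wait.Wait utility: Register creates a one-shot channel keyed by the request ID, and the apply path calls Trigger with the same ID once the entry has gone through the applier. A simplified re-implementation of that contract (the real one lives in pkg/wait; this sketch omits its internal sharding):

// wait matches an asynchronous apply result to the goroutine that proposed
// the request. Simplified sketch of etcd's pkg/wait.
package wait

import "sync"

type Wait struct {
    mu sync.Mutex
    m  map[uint64]chan any
}

func New() *Wait { return &Wait{m: make(map[uint64]chan any)} }

// Register returns a channel that receives the value passed to Trigger(id).
func (w *Wait) Register(id uint64) <-chan any {
    w.mu.Lock()
    defer w.mu.Unlock()
    ch := make(chan any, 1)
    w.m[id] = ch
    return ch
}

// Trigger delivers the apply result (or nil, to garbage-collect the waiter
// after a failed proposal) and removes the registration.
func (w *Wait) Trigger(id uint64, x any) {
    w.mu.Lock()
    ch := w.m[id]
    delete(w.m, id)
    w.mu.Unlock()
    if ch != nil {
        ch <- x
        close(ch)
    }
}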
3.3 The Main Run Loop
// server/etcdserver/server.go
func (s *EtcdServer) run() {
lg := s.Logger()
// Fetch the latest snapshot from Raft storage
sn, err := s.r.raftStorage.Snapshot()
if err != nil {
lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
}
// Create the FIFO scheduler
sched := schedule.NewFIFOScheduler(lg)
// Raft ready handler
rh := &raftReadyHandler{
getLead: func() (lead uint64) { return s.getLead() },
updateLead: func(lead uint64) { s.setLead(lead) },
updateLeadership: func(newLeader bool) {
if !s.isLeader() {
// Stepping down: stop leader-only work
if s.lessor != nil {
s.lessor.Demote()
}
if s.compactor != nil {
s.compactor.Pause()
}
} else {
// Stepping up: resume leader-only work
if newLeader {
t := time.Now()
s.leadTimeMu.Lock()
s.leadElectedTime = t
s.leadTimeMu.Unlock()
}
if s.compactor != nil {
s.compactor.Resume()
}
}
if newLeader {
s.leaderChanged.Notify()
}
if s.stats != nil {
s.stats.BecomeLeader()
}
},
updateCommittedIndex: func(ci uint64) {
cci := s.getCommittedIndex()
if ci > cci {
s.setCommittedIndex(ci)
}
},
}
// Start the Raft node
s.r.start(rh)
// Initialize the progress state
ep := etcdProgress{
confState: sn.Metadata.ConfState,
diskSnapshotIndex: sn.Metadata.Index,
memorySnapshotIndex: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
}
defer func() {
s.wgMu.Lock()
close(s.stopping)
s.wgMu.Unlock()
s.cancel()
sched.Stop()
s.wg.Wait()
s.r.stop()
s.Cleanup()
close(s.done)
}()
// Obtain the expired-lease channel
var expiredLeaseC <-chan []*lease.Lease
if s.lessor != nil {
expiredLeaseC = s.lessor.ExpiredLeasesC()
}
// Main event loop
for {
select {
case ap := <-s.r.apply():
// Schedule application of committed Raft entries
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(schedule.NewJob("server_applyAll", f))
case leases := <-expiredLeaseC:
// Revoke expired leases
s.revokeExpiredLeases(leases)
case err := <-s.errorc:
// A fatal error occurred; shut down
lg.Warn("server error", zap.Error(err))
lg.Warn("data-dir used by this member must be removed")
return
case <-getSyncC():
// Periodic sync of v2 store TTL keys
if s.v2store.HasTTLKeys() {
s.sync(s.Cfg.ReqTimeout())
}
case <-s.stop:
return
}
}
}
4. The Apply Mechanism in Detail
4.1 UberApplier Architecture
graph TB
subgraph "UberApplier Chain"
UBER[UberApplier]
CORRUPT[CorruptApplier]
CAPPED[CappedApplier]
AUTH[AuthApplier]
QUOTA[QuotaApplier]
BACKEND[BackendApplier]
end
subgraph "Request Types"
PUT[Put Request]
GET[Range Request]
DEL[Delete Request]
TXN[Txn Request]
LEASE[Lease Request]
AUTHREQ[Auth Request]
end
UBER --> CORRUPT
CORRUPT --> CAPPED
CAPPED --> AUTH
AUTH --> QUOTA
QUOTA --> BACKEND
PUT --> UBER
GET --> UBER
DEL --> UBER
TXN --> UBER
LEASE --> UBER
AUTHREQ --> UBER
4.2 UberApplier Implementation
// server/etcdserver/apply/uber_applier.go
type UberApplier interface {
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result
}
type uberApplier struct {
lg *zap.Logger
alarmStore *v3alarm.AlarmStore
warningApplyDuration time.Duration
// The applier currently in effect (takes active alarms into account)
applyV3 applierV3
// The base applier chain
applyV3base applierV3
}
func NewUberApplier(opts ApplierOptions) UberApplier {
applyV3base := newApplierV3(opts)
ua := &uberApplier{
lg: opts.Logger,
alarmStore: opts.AlarmStore,
warningApplyDuration: opts.WarningApplyDuration,
applyV3: applyV3base,
applyV3base: applyV3base,
}
ua.restoreAlarms()
return ua
}
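// For orientation: newApplierV3 composes the decorator chain that Apply
// runs through. A sketch of the composition (constructor names follow the
// upstream layout but are abridged here):
//
//	func newApplierV3(opts ApplierOptions) applierV3 {
//		base := &applierV3backend{options: opts} // talks to MVCC and the lessor
//		return newAuthApplierV3(opts.AuthStore, // permission checks
//			newQuotaApplierV3(opts.Logger, opts.Quota, base), // backend quota
//			opts.Lessor)
//	}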
func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
// Run the applier chain: CorruptApplier -> CappedApplier -> Auth -> Quota -> Backend,
// then dispatch to the concrete handler: Put, Range, DeleteRange, and so on
return a.applyV3.Apply(r, shouldApplyV3, a.dispatch)
}
// dispatch routes a request to its concrete handler method
func (a *uberApplier) dispatch(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
op := "unknown"
ar := &Result{}
defer func(start time.Time) {
success := ar.Err == nil || errors.Is(ar.Err, mvcc.ErrCompacted)
txn.ApplySecObserve(v3Version, op, success, time.Since(start))
txn.WarnOfExpensiveRequest(a.lg, a.warningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.Resp, ar.Err)
if !success {
txn.WarnOfFailedRequest(a.lg, start, &pb.InternalRaftStringer{Request: r}, ar.Resp, ar.Err)
}
}(time.Now())
// Cluster-level requests first
switch {
case r.ClusterVersionSet != nil:
op = "ClusterVersionSet"
a.applyV3.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
return ar
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet"
a.applyV3.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
return ar
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet"
a.applyV3.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
return ar
}
if !shouldApplyV3 {
return nil
}
// V3 API requests
switch {
case r.Range != nil:
op = "Range"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Range(r.Range)
case r.Put != nil:
op = "Put"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(r.Put)
case r.DeleteRange != nil:
op = "DeleteRange"
ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(r.DeleteRange)
case r.Txn != nil:
op = "Txn"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Txn(r.Txn)
case r.Compaction != nil:
op = "Compaction"
ar.Resp, ar.Physc, ar.Trace, ar.Err = a.applyV3.Compaction(r.Compaction)
case r.LeaseGrant != nil:
op = "LeaseGrant"
ar.Resp, ar.Err = a.applyV3.LeaseGrant(r.LeaseGrant)
case r.LeaseRevoke != nil:
op = "LeaseRevoke"
ar.Resp, ar.Err = a.applyV3.LeaseRevoke(r.LeaseRevoke)
case r.LeaseCheckpoint != nil:
op = "LeaseCheckpoint"
ar.Resp, ar.Err = a.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
case r.Alarm != nil:
op = "Alarm"
ar.Resp, ar.Err = a.Alarm(r.Alarm)
case r.Authenticate != nil:
op = "Authenticate"
ar.Resp, ar.Err = a.applyV3.Authenticate(r.Authenticate)
case r.AuthEnable != nil:
op = "AuthEnable"
ar.Resp, ar.Err = a.applyV3.AuthEnable()
case r.AuthDisable != nil:
op = "AuthDisable"
ar.Resp, ar.Err = a.applyV3.AuthDisable()
// ... other auth-related requests
default:
a.lg.Panic("not implemented", zap.Stringer("request", r))
}
return ar
}
4.3 The Applier Chain
// server/etcdserver/apply/apply.go
type applierV3 interface {
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result
Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error)
DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)
// Lease operations
LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)
// Auth operations
Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error)
AuthEnable() (*pb.AuthEnableResponse, error)
AuthDisable() (*pb.AuthDisableResponse, error)
// ... other methods
}
// The backend applier - the bottom of the chain
type applierV3backend struct {
options ApplierOptions
}
func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(context.TODO(), a.options.Logger, a.options.Lessor, a.options.KV, p)
}
func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return mvcctxn.Range(context.TODO(), a.options.Logger, a.options.KV, r)
}
func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return mvcctxn.DeleteRange(context.TODO(), a.options.Logger, a.options.KV, dr)
}
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(context.TODO(), a.options.Logger, rt, a.options.TxnModeWriteWithSharedBuffer, a.options.KV, a.options.Lessor)
}
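Each decorator in the chain embeds the next applier and overrides only the methods it cares about. As an illustration, a minimal sketch of the "capped" stage, which rejects mutations while a NOSPACE alarm is active and delegates everything else (the shape follows the upstream applierV3Capped; details are abridged):

// applierV3Capped rejects mutating requests once the backend quota has been
// exceeded; all other methods fall through to the embedded applier.
type applierV3Capped struct {
    applierV3 // embedding: every method not overridden is delegated
}

func newApplierV3Capped(base applierV3) applierV3 {
    return &applierV3Capped{applierV3: base}
}

func (a *applierV3Capped) Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
    return nil, nil, errors.ErrNoSpace
}

func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
    if isTxnReadonly(r) { // helper that scans the txn ops; read-only txns still pass
        return a.applierV3.Txn(r)
    }
    return nil, nil, errors.ErrNoSpace
}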
5. Linearizable Reads
5.1 The ReadIndex Mechanism
// server/etcdserver/v3_server.go
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
s.readMu.RLock()
nc := s.readNotifier
s.readMu.RUnlock()
// Signal the read loop; non-blocking, since one pending signal suffices
select {
case s.readwaitc <- struct{}{}:
default:
}
// Wait for the read index to be confirmed
select {
case <-nc.c:
return nc.err
case <-ctx.Done():
return ctx.Err()
case <-s.done:
return ErrStopped
}
}
// The linearizable read loop
func (s *EtcdServer) linearizableReadLoop() {
for {
requestId := s.reqIDGen.Next()
leaderChangedNotifier := s.leaderChanged.Receive()
select {
case <-leaderChangedNotifier:
continue // leader changed; start over
case <-s.readwaitc:
case <-s.stopping:
return
}
// Obtain the confirmed read index from the leader
confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
if err != nil {
s.readMu.Lock()
s.readNotifier.notify(err)
s.readMu.Unlock()
continue
}
// Wait until the applied index catches up to the read index
appliedIndex := s.getAppliedIndex()
if appliedIndex < confirmedIndex {
select {
case <-s.applyWait.Wait(confirmedIndex):
case <-s.stopping:
return
}
}
// Unblock the pending read requests
s.readMu.Lock()
s.readNotifier.notify(nil)
s.readMu.Unlock()
}
}
func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
err := s.sendReadIndex(requestId)
if err != nil {
return 0, err
}
for {
select {
case rs := <-s.r.readStateC:
if bytes.Equal(rs.RequestCtx, uint64ToBigEndianBytes(requestId)) {
return rs.Index, nil
}
case <-leaderChangedNotifier:
return 0, ErrLeaderChanged
case <-s.stopping:
return 0, ErrStopped
}
}
}
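Seen from the client, this machinery is exactly what separates the two read modes: a default Get blocks in linearizableReadNotify until ReadIndex confirms, while a serializable Get answers from local state immediately. A small clientv3 sketch (the endpoint is illustrative):

package main

import (
    "context"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"}, // illustrative endpoint
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    ctx := context.Background()

    // Linearizable read (the default): goes through ReadIndex and waits
    // for the applied index to catch up.
    if _, err := cli.Get(ctx, "foo"); err != nil {
        log.Fatal(err)
    }

    // Serializable read: served from the local replica without a ReadIndex
    // round-trip, so it can return slightly stale data.
    if _, err := cli.Get(ctx, "foo", clientv3.WithSerializable()); err != nil {
        log.Fatal(err)
    }
}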
6. Key Monitoring Metrics
6.1 Performance Metrics
// server/etcdserver/metrics.go
var (
// Proposal metrics
proposalsCommitted = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "proposals_committed_total",
Help: "The total number of consensus proposals committed.",
})
proposalsApplied = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "proposals_applied_total",
Help: "The total number of consensus proposals applied.",
})
proposalsPending = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "proposals_pending",
Help: "The current number of pending proposals to commit.",
})
proposalsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "proposals_failed_total",
Help: "The total number of failed proposals seen.",
})
// Leader metrics
leaderChanges = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "leader_changes_seen_total",
Help: "The number of leader changes seen.",
})
// ReadIndex metrics
readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "read_indexes_failed_total",
Help: "The total number of failed read indexes seen.",
})
// Slow-apply metrics
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_apply_total",
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
})
)
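These collectors are registered with the default Prometheus registry when the package initializes, and etcd exposes them via its --listen-metrics-urls flag. For experimentation, a minimal sketch of serving such a registry over HTTP (the port is illustrative):

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    // promhttp.Handler serves every collector registered with the default
    // registry, e.g. the etcd_server_* counters and gauges defined above.
    http.Handle("/metrics", promhttp.Handler())
    log.Fatal(http.ListenAndServe(":2381", nil)) // illustrative port
}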
6.2 Health Checks
// server/etcdserver/server.go
func (s *EtcdServer) IsHealthy() bool {
if s.Leader() == raft.None {
return false
}
if len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
return false
}
return true
}
func (s *EtcdServer) CheckHealth(ctx context.Context) error {
if s.Leader() == raft.None {
return errors.New("etcd cluster unavailable or unhealthy")
}
// Probe health with a simple serializable range request
_, err := s.Range(ctx, &pb.RangeRequest{
Key: []byte("health"),
Serializable: true,
})
return err
}
7. Error Handling and Recovery
7.1 Error Types
// server/etcdserver/errors/errors.go
var (
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrValueProvided = errors.New("etcdserver: value is provided")
ErrLeaseProvided = errors.New("etcdserver: lease is provided")
ErrTxnIDMismatch = errors.New("etcdserver: txn id mismatch")
ErrPermissionDenied = errors.New("etcdserver: permission denied")
ErrRoleNotFound = errors.New("etcdserver: role not found")
ErrUserNotFound = errors.New("etcdserver: user not found")
ErrAuthFailed = errors.New("etcdserver: authentication failed")
ErrAuthNotEnabled = errors.New("etcdserver: authentication is not enabled")
ErrInvalidAuthToken = errors.New("etcdserver: invalid auth token")
ErrInvalidAuthMgmt = errors.New("etcdserver: invalid auth management")
)
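At the gRPC boundary these sentinels are translated into status errors, with client-side mirrors in go.etcd.io/etcd/api/v3/v3rpc/rpctypes. A hedged sketch of a caller retrying the transient ones; this is illustrative, not etcd's built-in retry policy:

import (
    "context"
    "errors"
    "time"

    "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
    clientv3 "go.etcd.io/etcd/client/v3"
)

// putWithRetry retries conditions that usually heal on their own, such as a
// temporarily missing leader or apply backpressure (ErrTooManyRequests).
func putWithRetry(ctx context.Context, cli *clientv3.Client, k, v string) error {
    var err error
    for attempt := 0; attempt < 5; attempt++ {
        if _, err = cli.Put(ctx, k, v); err == nil {
            return nil
        }
        if errors.Is(err, rpctypes.ErrNoLeader) || errors.Is(err, rpctypes.ErrTooManyRequests) {
            time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond) // simple linear backoff
            continue
        }
        return err // not retryable
    }
    return err
}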
7.2 Shutdown and Cleanup
// server/etcdserver/server.go
func (s *EtcdServer) Cleanup() {
// Stop the lease manager and close the KV store before the backend,
// since both hold references into it
if s.lessor != nil {
s.lessor.Stop()
}
if s.kv != nil {
s.kv.Close()
}
// Close the auth store
if s.authStore != nil {
s.authStore.Close()
}
// Close the backend store
if s.be != nil {
s.be.Close()
}
// Stop the compactor
if s.compactor != nil {
s.compactor.Stop()
}
}
func (s *EtcdServer) Stop() {
select {
case s.stop <- struct{}{}:
case <-s.done:
return
}
<-s.done
}
8. Summary
The EtcdServer module is the heart of etcd. Its main characteristics:
- Layered architecture: a clean separation of concerns from the gRPC interface down to the storage layer
- Request processing: a complete pipeline built from interceptor and applier chains
- Consistency guarantees: linearizable reads via the ReadIndex mechanism
- Error handling: well-defined error types plus cleanup and recovery paths
- Observability: rich metrics and health checks
- High availability: leader election and failure recovery
Together these let EtcdServer handle client requests efficiently and reliably while preserving data consistency and system availability.