Consul 源码剖析 - Catalog 服务目录模块

1. 模块概览

1.1 职责定义

Catalog 模块是 Consul 的服务注册与发现核心,维护整个集群的服务目录。该模块负责管理节点(Node)、服务(Service)及其元数据,提供服务发现查询接口。

核心职责

  • 节点注册与注销(Node Registration/Deregistration)
  • 服务注册与注销(Service Registration/Deregistration)
  • 服务查询与过滤(按名称、标签、健康状态)
  • 服务元数据管理(Tags、Meta、Address、Port等)
  • 节点列表查询
  • 数据中心内服务目录维护
  • 与 Health Check 模块集成(仅返回健康服务)

1.2 输入与输出

输入

  • 节点和服务注册请求(通常由 Agent 发起)
  • 服务查询请求(DNS、HTTP、PreparedQuery)
  • 健康检查状态更新(Health 模块)

输出

  • 服务实例列表(包含 IP、Port、Tags、Meta)
  • 节点列表
  • 服务名称列表
  • 服务详细信息(ServiceNode结构)

1.3 上下游依赖

上游调用方

  • Agent(服务注册、节点注册)
  • DNS 接口(服务发现查询)
  • HTTP API(/v1/catalog/*)
  • PreparedQuery(高级查询)
  • Internal 模块(内部查询)

下游依赖

  • State Store(服务和节点数据存储)
  • Raft(注册/注销操作通过共识复制)
  • Health Check 模块(健康状态过滤)
  • ACL 模块(权限校验)

1.4 生命周期

初始化阶段

  • Server 启动时自动注册自身为节点
  • 加载 State Store 中的历史服务数据
  • 启动 Anti-Entropy 同步任务(Agent 端)

运行时阶段

  • 接收注册/注销请求
  • 通过 Raft Apply 写入 State Store
  • 响应查询请求(支持 Blocking Query)
  • 周期性同步 Agent 本地状态到 Catalog(Anti-Entropy)

关闭阶段

  • Agent 关闭时注销本地服务和节点(如配置 leave_on_terminate=true

2. 模块架构图

flowchart TB
    subgraph "服务注册入口"
        AgentAPI[Agent HTTP API<br>/v1/agent/service/register]
        CatalogAPI[Catalog HTTP API<br>/v1/catalog/register]
    end
    
    subgraph "Catalog RPC 端点"
        Register[Register<br>注册节点/服务]
        Deregister[Deregister<br>注销节点/服务]
        ListServices[ListServices<br>列出所有服务]
        ServiceNodes[ServiceNodes<br>查询服务实例]
        NodeServices[NodeServices<br>查询节点服务]
    end
    
    subgraph "查询过滤"
        TagFilter[标签过滤]
        HealthFilter[健康状态过滤]
        ACLFilter[ACL 权限过滤]
        ExprFilter[表达式过滤]
    end
    
    subgraph "数据存储"
        StateStore[State Store<br>Memdb]
        NodesTable[nodes 表]
        ServicesTable[services 表]
        ChecksTable[checks 表]
    end
    
    subgraph "数据同步"
        AntiEntropy[Anti-Entropy<br>Agent 本地同步]
        Raft[Raft 共识]
    end
    
    AgentAPI --> Register
    CatalogAPI --> Register
    
    Register --> ACLFilter
    Deregister --> ACLFilter
    ServiceNodes --> ACLFilter
    
    ACLFilter --> Raft
    Raft --> StateStore
    
    StateStore --> NodesTable
    StateStore --> ServicesTable
    StateStore --> ChecksTable
    
    ServiceNodes --> TagFilter
    ServiceNodes --> HealthFilter
    ServiceNodes --> ExprFilter
    
    TagFilter --> StateStore
    HealthFilter --> ChecksTable
    ExprFilter --> StateStore
    
    AntiEntropy --> Register
    AntiEntropy --> Deregister

2.1 架构说明

组件职责

  1. Register RPC

    • 作用:注册节点和服务到 Catalog
    • 边界:必须在 Leader 执行,通过 Raft 复制
    • 状态:写入 State Store 的 nodes、services、checks 表
  2. ServiceNodes RPC

    • 作用:查询指定服务的所有实例
    • 边界:支持标签过滤、健康状态过滤、ACL 过滤
    • 状态:从 State Store 读取,支持 Blocking Query
  3. Anti-Entropy

    • 作用:Agent 本地状态与 Catalog 的定期同步
    • 边界:每 60 秒(默认)全量对比,修复差异
    • 状态:无状态,基于差异计算触发 Register/Deregister

关键决策点

  • 注册位置:服务可以通过 Agent API 或直接通过 Catalog API 注册,Agent API 更常用
  • 健康过滤:ServiceNodes 查询可选择过滤不健康实例(filter="Checks.Status != critical"
  • Blocking Query:客户端可阻塞等待服务列表变化,减少轮询开销

边界条件

  • 并发注册:同一节点的多个服务可并发注册,通过 Raft 串行化
  • 注销幂等:重复注销不报错
  • 服务 ID 唯一性:单节点内 ServiceID 必须唯一
  • 超时:注册/注销操作超时 60 秒(RPC 超时)

异常处理

  • 节点不存在:注册服务时自动创建节点
  • 服务不存在:注销时忽略不存在的服务
  • 健康检查缺失:服务可不关联健康检查(默认视为健康)
  • ACL 拒绝:返回权限错误,操作不执行

性能要点

  • 查询缓存:DNS 和 HTTP 查询支持客户端缓存(TTL)
  • 索引优化:State Store 按服务名、节点名、标签建立索引
  • Blocking Query:减少无效轮询,P95 延迟 < 10ms
  • 批量查询:PreparedQuery 支持批量查询多个服务

3. 关键数据结构

3.1 核心数据结构类图

classDiagram
    class Node {
        +string ID
        +string Node
        +string Address
        +string Datacenter
        +map~string,string~ TaggedAddresses
        +map~string,string~ Meta
        +uint64 CreateIndex
        +uint64 ModifyIndex
    }
    
    class NodeService {
        +string ID
        +string Service
        +[]string Tags
        +map~string,string~ Meta
        +string Address
        +int Port
        +ServiceConnect Connect
        +int Weights
        +string Kind
        +ServiceProxy Proxy
        +uint64 CreateIndex
        +uint64 ModifyIndex
    }
    
    class ServiceNode {
        +string ID
        +string Node
        +string Address
        +string ServiceID
        +string ServiceName
        +[]string ServiceTags
        +string ServiceAddress
        +int ServicePort
        +map~string,string~ ServiceMeta
        +uint64 CreateIndex
        +uint64 ModifyIndex
    }
    
    class RegisterRequest {
        +string Datacenter
        +types.NodeID ID
        +string Node
        +string Address
        +map~string,string~ TaggedAddresses
        +map~string,string~ NodeMeta
        +NodeService Service
        +HealthCheck Check
        +[]HealthCheck Checks
        +bool SkipNodeUpdate
        +string Token
    }
    
    class DeregisterRequest {
        +string Datacenter
        +string Node
        +string ServiceID
        +string CheckID
        +string Token
    }
    
    class ServiceSpecificRequest {
        +string Datacenter
        +string ServiceName
        +[]string ServiceTags
        +string ServiceAddress
        +bool TagFilter
        +bool Connect
        +string PeerName
        +QueryOptions
    }
    
    class IndexedServiceNodes {
        +ServiceNodes ServiceNodes
        +QueryMeta
    }
    
    Node "1" --> "*" NodeService : has services
    ServiceNode --> Node : references
    ServiceNode --> NodeService : references
    RegisterRequest --> Node : creates/updates
    RegisterRequest --> NodeService : creates/updates
    ServiceSpecificRequest --> IndexedServiceNodes : queries

3.2 数据结构详细说明

3.2.1 RegisterRequest(注册请求)

字段说明

字段 类型 必填 说明 约束
Datacenter string 目标数据中心 默认本地 DC
ID types.NodeID 节点 UUID 可选,首次注册生成
Node string 节点名称 唯一标识节点
Address string 节点 IP 地址 必须可路由
TaggedAddresses map[string]string 带标签的地址 如 “lan”: “10.0.1.10”, “wan”: “203.0.113.1”
NodeMeta map[string]string 节点元数据 键值对,用于过滤
Service *NodeService 要注册的服务 可选,仅注册节点时为 nil
Check *HealthCheck 单个健康检查 已废弃,使用 Checks
Checks []HealthCheck 健康检查列表 可关联服务或节点
SkipNodeUpdate bool 是否跳过节点更新 true 时仅更新服务

3.2.2 NodeService(服务定义)

字段说明

字段 类型 必填 说明 约束
ID string 服务实例 ID 节点内唯一
Service string 服务名称 服务发现的键
Tags []string 服务标签 用于分类和过滤
Meta map[string]string 服务元数据 键值对
Address string 服务地址 默认使用节点地址
Port int 服务端口 1-65535
Kind string 服务类型 “connect-proxy”, “mesh-gateway” 等
Connect ServiceConnect Connect 配置 服务网格设置
Proxy ServiceProxy 代理配置 Sidecar 配置
Weights int 权重 DNS 轮询权重

3.2.3 ServiceNode(服务实例)

字段说明

ServiceNode 是查询结果的核心结构,包含节点和服务的组合信息。

字段 类型 说明
ID string 节点 ID
Node string 节点名称
Address string 节点地址
Datacenter string 数据中心
ServiceID string 服务实例 ID
ServiceName string 服务名称
ServiceTags []string 服务标签
ServiceAddress string 服务地址(如与节点不同)
ServicePort int 服务端口
ServiceMeta map[string]string 服务元数据
ServiceWeights Weights 服务权重

4. 核心 API 详细规格

4.1 Register - 注册节点和服务

4.1.1 基本信息

  • 名称Catalog.Register
  • 协议:RPC 方法
  • HTTP 映射PUT /v1/catalog/register
  • 幂等性:是(相同节点/服务覆盖)

4.1.2 请求结构体

type RegisterRequest struct {
    Datacenter      string                 // 目标数据中心
    ID              types.NodeID           // 节点 UUID
    Node            string                 // 节点名称
    Address         string                 // 节点 IP
    TaggedAddresses map[string]string      // LAN/WAN 地址
    NodeMeta        map[string]string      // 节点元数据
    Service         *NodeService           // 要注册的服务
    Check           *HealthCheck           // 单个健康检查(废弃)
    Checks          HealthChecks           // 健康检查列表
    SkipNodeUpdate  bool                   // 是否跳过节点更新
    Token           string                 // ACL Token
    WriteRequest                           // 包含 Namespace 等
}

字段表

字段 类型 必填 默认 约束 说明
Node string - 1-256 字符 节点唯一名称
Address string - 有效 IP 地址 节点可路由地址
Service *NodeService nil - 省略时仅注册节点
Service.ID string 条件 同 Service - 服务时必填,节点内唯一
Service.Service string 条件 - 1-256 字符 服务名称,服务时必填
Service.Port int 条件 0 1-65535 服务端口,服务时必填
Checks []HealthCheck [] - 健康检查列表

4.1.3 响应结构体

// 返回空结构体,操作成功无返回值
type struct{} 

4.1.4 入口函数与核心代码

func (c *Catalog) Register(
    args *structs.RegisterRequest,
    reply *struct{},
) error {
    // 1. 转发到 Leader
    if done, err := c.srv.ForwardRPC("Catalog.Register", args, reply); done {
        return err
    }
    
    // 2. ACL 权限校验
    authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
    if err != nil {
        return err
    }
    
    // 3. 验证请求字段
    if err := nodePreApply(args.Node, string(args.ID)); err != nil {
        return err
    }
    if args.Address == "" && !args.SkipNodeUpdate {
        return fmt.Errorf("Must provide address if SkipNodeUpdate is not set")
    }
    
    // 4. 验证服务字段
    if args.Service != nil {
        if err := servicePreApply(args.Service, authz, args.Service.FillAuthzContext); err != nil {
            return err
        }
    }
    
    // 5. 规范化健康检查
    if args.Check != nil {
        args.Checks = append(args.Checks, args.Check)
        args.Check = nil
    }
    for _, check := range args.Checks {
        if check.Node == "" {
            check.Node = args.Node
        }
        checkPreApply(check)
    }
    
    // 6. ACL 权限验证
    state := c.srv.fsm.State()
    _, ns, err := state.NodeServices(nil, args.Node, &args.EnterpriseMeta, args.PeerName)
    if err := vetRegisterWithACL(authz, args, ns); err != nil {
        return err
    }
    
    // 7. Raft Apply
    _, err = c.srv.raftApply(structs.RegisterRequestType, args)
    return err
}

核心逻辑

  1. 步骤 1-2:转发到 Leader,解析 ACL Token
  2. 步骤 3-5:验证节点名称、地址、服务字段、健康检查字段
  3. 步骤 6:根据 ACL 策略验证权限:
    • 注册节点需要 node:write 权限
    • 注册服务需要 service:write 权限
    • 关联健康检查需要对应资源的 write 权限
  4. 步骤 7:通过 Raft Apply 持久化,FSM 处理时:
    • 创建或更新 nodes 表记录
    • 创建或更新 services 表记录
    • 创建或更新 checks 表记录
    • 更新索引(按节点名、服务名、标签)

4.1.5 调用链路

HTTP PUT /v1/catalog/register
agent/http/catalog.go: catalogRegister()
agent/rpc.go: RPC("Catalog.Register", ...)
agent/consul/catalog_endpoint.go: Catalog.Register()
agent/consul/server.go: raftApply(RegisterRequestType, ...)
agent/consul/fsm/fsm.go: FSM.Apply()
agent/consul/fsm/commands.go: applyRegister()
agent/consul/state/catalog.go: EnsureRegistration()
State Store 更新 nodes, services, checks 表

4.1.6 时序图

sequenceDiagram
    autonumber
    participant Agent as Consul Agent
    participant HTTP as HTTP API
    participant RPC as Catalog RPC
    participant ACL as ACL Resolver
    participant Leader as Raft Leader
    participant FSM as FSM
    participant State as State Store

    Note over Agent,State: 服务注册流程
    
    Agent->>HTTP: PUT /v1/catalog/register<br>{"Node": "node1", "Service": {...}}
    HTTP->>RPC: Catalog.Register(request)
    
    alt 非 Leader
        RPC->>Leader: 转发到 Leader
    end
    
    RPC->>ACL: ResolveToken(token)
    ACL-->>RPC: Authorizer
    
    RPC->>RPC: 验证节点字段<br>验证服务字段<br>验证健康检查字段
    
    RPC->>State: NodeServices(node)
    State-->>RPC: 现有服务列表
    
    RPC->>ACL: 检查 node:write 和 service:write 权限
    ACL-->>RPC: Allow / Deny
    
    alt 权限不足
        RPC-->>Agent: 403 Permission Denied
    end
    
    RPC->>Leader: raftApply(RegisterRequestType, request)
    Leader->>FSM: Apply Raft Log
    
    FSM->>State: EnsureRegistration(node, service, checks)
    State->>State: 更新 nodes 表<br>更新 services 表<br>更新 checks 表<br>更新索引
    State-->>FSM: 成功
    
    FSM-->>Leader: 成功
    Leader-->>RPC: 成功
    RPC-->>HTTP: 成功
    HTTP-->>Agent: 200 OK
    
    Note over State: 服务现在可被查询

4.2 ServiceNodes - 查询服务实例

4.2.1 基本信息

  • 名称Catalog.ServiceNodes
  • 协议:RPC 方法
  • HTTP 映射GET /v1/catalog/service/:service
  • 幂等性:是(只读操作)

4.2.2 请求结构体

type ServiceSpecificRequest struct {
    Datacenter     string        // 目标数据中心
    ServiceName    string        // 服务名称
    ServiceTags    []string      // 标签过滤(废弃,使用 Filter)
    ServiceTag     string        // 单个标签(废弃)
    ServiceAddress string        // 服务地址过滤
    TagFilter      bool          // 是否启用标签过滤
    Connect        bool          // 是否查询 Connect 代理
    PeerName       string        // Peering 对等连接名称
    QueryOptions               // 包含 Filter、Consistency、MinQueryIndex 等
}

字段表

字段 类型 必填 说明
ServiceName string 要查询的服务名称
Connect bool true 时查询该服务的 Connect 代理
Filter string bexpr 表达式过滤(如 “ServiceMeta.version == v1.0”)
MinQueryIndex uint64 Blocking Query,等待索引变化

4.2.3 响应结构体

type IndexedServiceNodes struct {
    ServiceNodes ServiceNodes   // 服务实例列表
    QueryMeta                   // 包含 Index、LastContact 等
}

type ServiceNode struct {
    ID                  string            // 节点 ID
    Node                string            // 节点名称
    Address             string            // 节点地址
    Datacenter          string            // 数据中心
    TaggedAddresses     map[string]string // LAN/WAN 地址
    NodeMeta            map[string]string // 节点元数据
    ServiceID           string            // 服务实例 ID
    ServiceName         string            // 服务名称
    ServiceTags         []string          // 服务标签
    ServiceAddress      string            // 服务地址
    ServicePort         int               // 服务端口
    ServiceMeta         map[string]string // 服务元数据
    ServiceWeights      Weights           // 权重
    ServiceKind         string            // 服务类型
    ServiceProxy        ServiceProxy      // 代理配置
    Checks              HealthChecks      // 健康检查列表
    CreateIndex         uint64            // 创建索引
    ModifyIndex         uint64            // 修改索引
}

4.2.4 入口函数与核心代码

func (c *Catalog) ServiceNodes(
    args *structs.ServiceSpecificRequest,
    reply *structs.IndexedServiceNodes,
) error {
    // 1. 转发到合适的 Server(可以是 Follower)
    if done, err := c.srv.ForwardRPC("Catalog.ServiceNodes", args, reply); done {
        return err
    }
    
    // 2. 确定查询函数
    var f func(memdb.WatchSet, *state.Store) (uint64, structs.ServiceNodes, error)
    
    if args.Connect {
        // 查询 Connect 代理
        f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) {
            return s.ConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
        }
    } else {
        // 普通服务查询
        f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) {
            if args.ServiceAddress != "" {
                return s.ServiceAddressNodes(ws, args.ServiceAddress, &args.EnterpriseMeta, args.PeerName)
            }
            if args.TagFilter {
                return s.ServiceTagNodes(ws, args.ServiceName, args.ServiceTags, &args.EnterpriseMeta, args.PeerName)
            }
            return s.ServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
        }
    }
    
    // 3. ACL 权限校验
    authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
    if err != nil {
        return err
    }
    
    // 4. 准备 bexpr 过滤器
    filter, err := bexpr.CreateFilter(args.Filter, nil, []*structs.ServiceNode{})
    if err != nil {
        return err
    }
    
    // 5. Blocking Query
    return c.srv.blockingQuery(
        &args.QueryOptions,
        &reply.QueryMeta,
        func(ws memdb.WatchSet, state *state.Store) error {
            // 执行查询
            index, serviceNodes, err := f(ws, state)
            if err != nil {
                return err
            }
            
            reply.Index = index
            reply.ServiceNodes = serviceNodes
            
            // 6. ACL 过滤
            c.srv.filterACL(args.Token, reply)
            
            // 7. bexpr 表达式过滤
            if filter != nil {
                raw, err := filter.Execute(reply.ServiceNodes)
                if err != nil {
                    return err
                }
                reply.ServiceNodes = raw.(structs.ServiceNodes)
            }
            
            // 8. 按距离排序(如提供 Source)
            if args.Source.Node != "" {
                return c.srv.sortNodesByDistanceFrom(args.Source, reply.ServiceNodes)
            }
            
            return nil
        },
    )
}

核心逻辑

  1. 步骤 1-2:确定查询类型(普通服务 vs Connect 代理 vs 标签过滤)
  2. 步骤 3-4:解析 ACL Token,准备表达式过滤器
  3. 步骤 5:进入 Blocking Query 循环,支持长轮询
  4. 步骤 6:执行 State Store 查询,获取服务实例列表
  5. 步骤 7:ACL 过滤(移除无权限的服务)
  6. 步骤 8:bexpr 表达式过滤(如 ServiceMeta.version == "v1.0"
  7. 步骤 9:按网络距离排序(基于 Coordinate 模块)

4.2.5 时序图

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant HTTP as HTTP API
    participant RPC as Catalog RPC
    participant ACL as ACL Resolver
    participant State as State Store
    participant Coordinate as Coordinate 模块

    Client->>HTTP: GET /v1/catalog/service/web?filter=...
    HTTP->>RPC: Catalog.ServiceNodes(request)
    
    RPC->>ACL: ResolveToken(token)
    ACL-->>RPC: Authorizer
    
    RPC->>RPC: 准备查询函数<br>准备 bexpr 过滤器
    
    loop Blocking Query
        RPC->>State: ServiceNodes(ws, "web", ...)
        State->>State: 查询 services 表<br>关联 nodes 表<br>关联 checks 表
        State-->>RPC: ServiceNodes[] + Index
        
        RPC->>ACL: 过滤无权限服务
        ACL-->>RPC: 过滤后列表
        
        RPC->>RPC: 应用 bexpr 表达式过滤
        
        alt 有 Source 参数
            RPC->>Coordinate: sortNodesByDistanceFrom(source, nodes)
            Coordinate-->>RPC: 按距离排序的列表
        end
        
        alt Index 变化或超时
            break 退出循环
                RPC-->>HTTP: ServiceNodes[] + Index
            end
        end
        
        Note over RPC,State: 等待 Index 变化(最长 5 分钟)
    end
    
    HTTP-->>Client: 200 OK + JSON<br>[{"Node": "node1", "ServicePort": 8080, ...}]

4.3 ListServices - 列出所有服务

4.3.1 基本信息

  • 名称Catalog.ListServices
  • 协议:RPC 方法
  • HTTP 映射GET /v1/catalog/services
  • 幂等性:是

4.3.2 请求结构体

type DCSpecificRequest struct {
    Datacenter string        // 目标数据中心
    PeerName   string        // Peering 对等连接名称
    acl.EnterpriseMeta
    QueryOptions             // Filter, MinQueryIndex 等
}

4.3.3 响应结构体

type IndexedServices struct {
    Services map[string][]string   // 服务名 -> 标签列表
    QueryMeta
}

示例响应

{
  "Services": {
    "consul": [],
    "web": ["v1", "production"],
    "api": ["v2", "staging"],
    "redis": ["cache"]
  },
  "Index": 12345
}

4.3.4 核心代码

func (c *Catalog) ListServices(
    args *structs.DCSpecificRequest,
    reply *structs.IndexedServices,
) error {
    // 1. 转发(可以在 Follower 执行)
    if done, err := c.srv.ForwardRPC("Catalog.ListServices", args, reply); done {
        return err
    }
    
    // 2. ACL 权限校验
    authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
    if err != nil {
        return err
    }
    
    // 3. 准备过滤器
    filter, err := bexpr.CreateFilter(args.Filter, nil, []*structs.ServiceNode{})
    if err != nil {
        return err
    }
    
    // 4. Blocking Query
    return c.srv.blockingQuery(
        &args.QueryOptions,
        &reply.QueryMeta,
        func(ws memdb.WatchSet, state *state.Store) error {
            var (
                index    uint64
                services structs.Services
                err      error
            )
            
            // 查询所有服务
            index, services, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName)
            if err != nil {
                return err
            }
            
            reply.Index, reply.Services = index, services
            
            // ACL 过滤(移除无权限的服务)
            c.srv.filterACL(args.Token, reply)
            
            // bexpr 过滤
            if filter != nil {
                // ListServices 不支持复杂过滤,仅支持按服务名
            }
            
            return nil
        },
    )
}

4.4 Deregister - 注销节点/服务

4.4.1 基本信息

  • 名称Catalog.Deregister
  • 协议:RPC 方法
  • HTTP 映射PUT /v1/catalog/deregister
  • 幂等性:是(重复注销成功)

4.4.2 请求结构体

type DeregisterRequest struct {
    Datacenter string           // 目标数据中心
    Node       string           // 节点名称
    ServiceID  string           // 要注销的服务 ID(可选)
    CheckID    types.CheckID    // 要注销的健康检查 ID(可选)
    PeerName   string           // Peering 名称
    acl.EnterpriseMeta
    WriteRequest
}

字段表

字段 类型 必填 说明
Node string 节点名称
ServiceID string 省略时注销整个节点
CheckID types.CheckID 省略时注销服务的所有检查

注销逻辑

  • 仅提供 Node:注销节点及其所有服务和检查
  • 提供 Node + ServiceID:仅注销指定服务及其检查
  • 提供 Node + CheckID:仅注销指定检查

4.4.3 核心代码

func (c *Catalog) Deregister(
    args *structs.DeregisterRequest,
    reply *struct{},
) error {
    // 1. 转发到 Leader
    if done, err := c.srv.ForwardRPC("Catalog.Deregister", args, reply); done {
        return err
    }
    
    // 2. ACL 权限校验
    authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
    if err != nil {
        return err
    }
    
    // 3. 验证权限
    state := c.srv.fsm.State()
    _, ns, err := state.NodeServices(nil, args.Node, &args.EnterpriseMeta, args.PeerName)
    if err != nil {
        return err
    }
    
    // 注销节点需要 node:write
    if authz.NodeWrite(args.Node, nil) != acl.Allow {
        return acl.ErrPermissionDenied
    }
    
    // 注销服务需要 service:write
    if args.ServiceID != "" && ns != nil {
        if svc := ns.Services[args.ServiceID]; svc != nil {
            if authz.ServiceWrite(svc.Service, nil) != acl.Allow {
                return acl.ErrPermissionDenied
            }
        }
    }
    
    // 4. Raft Apply
    _, err = c.srv.raftApply(structs.DeregisterRequestType, args)
    return err
}

5. 关键功能实现分析

5.1 Anti-Entropy 状态同步

5.1.1 功能概述

Anti-Entropy 是 Agent 端的后台任务,定期将本地状态(服务、健康检查)与 Catalog 同步,修复不一致性。

触发场景

  • 网络分区恢复后
  • Agent 重启后
  • 服务注册/注销失败后

同步周期:默认 60 秒(sync_coordinate_interval_min

5.1.2 核心逻辑

func (a *Agent) syncChanges() {
    // 1. 读取 Catalog 中的节点服务列表
    remote, err := a.delegate.GetNodeServices(a.config.NodeName)
    if err != nil {
        return
    }
    
    // 2. 对比本地服务
    local := a.State.Services()
    
    // 3. 计算差异
    toRegister := []*structs.NodeService{}
    toDeregister := []string{}
    
    for id, localService := range local {
        remoteService := remote[id]
        if remoteService == nil {
            // 本地存在,远程不存在 -> 注册
            toRegister = append(toRegister, localService)
        } else if !localService.IsSame(remoteService) {
            // 本地与远程不一致 -> 重新注册
            toRegister = append(toRegister, localService)
        }
    }
    
    for id := range remote {
        if local[id] == nil {
            // 远程存在,本地不存在 -> 注销
            toDeregister = append(toDeregister, id)
        }
    }
    
    // 4. 批量注册
    for _, svc := range toRegister {
        req := &structs.RegisterRequest{
            Node:    a.config.NodeName,
            Address: a.config.AdvertiseAddr,
            Service: svc,
        }
        a.RPC("Catalog.Register", req, &struct{}{})
    }
    
    // 5. 批量注销
    for _, id := range toDeregister {
        req := &structs.DeregisterRequest{
            Node:      a.config.NodeName,
            ServiceID: id,
        }
        a.RPC("Catalog.Deregister", req, &struct{}{})
    }
}

6. 配置与最佳实践

6.1 关键配置项

配置项 默认值 说明 调优建议
sync_coordinate_interval_min 15s 最小同步间隔 默认合理
anti_entropy_interval 60s Anti-Entropy 周期 网络不稳定时可降低到 30s
catalog_batch_read 1024 批量查询大小 默认合理

6.2 最佳实践

1. 服务注册

  • 优先使用 Agent API(/v1/agent/service/register)而非 Catalog API
  • ServiceID 建议包含节点信息(如 web-node1),避免冲突
  • 为每个服务配置健康检查,确保故障快速检测

2. 服务查询

  • 使用 Blocking Query 减少轮询开销
  • 使用 bexpr 过滤器按元数据过滤(如 ServiceMeta.version == "v2"
  • 使用 Connect 查询获取服务网格代理

3. 标签规范

  • 标签用于分类和路由(如 production, v1, canary
  • 元数据用于扩展信息(如 version: 1.2.3, region: us-west

6.3 性能指标

典型延迟

操作 P50 P95 P99
服务注册 20ms 40ms 80ms
服务查询(本地缓存) 2ms 5ms 10ms
服务查询(Blocking Query) 1-60s 60s 300s

吞吐量

  • 注册 TPS:100-200(受 Raft 限制)
  • 查询 QPS:10,000+(Follower 可分担)