Consul 源码剖析 - Catalog 服务目录模块
1. 模块概览
1.1 职责定义
Catalog 模块是 Consul 的服务注册与发现核心,维护整个集群的服务目录。该模块负责管理节点(Node)、服务(Service)及其元数据,提供服务发现查询接口。
核心职责:
- 节点注册与注销(Node Registration/Deregistration)
- 服务注册与注销(Service Registration/Deregistration)
- 服务查询与过滤(按名称、标签、健康状态)
- 服务元数据管理(Tags、Meta、Address、Port等)
- 节点列表查询
- 数据中心内服务目录维护
- 与 Health Check 模块集成(仅返回健康服务)
1.2 输入与输出
输入:
- 节点和服务注册请求(通常由 Agent 发起)
- 服务查询请求(DNS、HTTP、PreparedQuery)
- 健康检查状态更新(Health 模块)
输出:
- 服务实例列表(包含 IP、Port、Tags、Meta)
- 节点列表
- 服务名称列表
- 服务详细信息(ServiceNode结构)
1.3 上下游依赖
上游调用方:
- Agent(服务注册、节点注册)
- DNS 接口(服务发现查询)
- HTTP API(/v1/catalog/*)
- PreparedQuery(高级查询)
- Internal 模块(内部查询)
下游依赖:
- State Store(服务和节点数据存储)
- Raft(注册/注销操作通过共识复制)
- Health Check 模块(健康状态过滤)
- ACL 模块(权限校验)
1.4 生命周期
初始化阶段:
- Server 启动时自动注册自身为节点
- 加载 State Store 中的历史服务数据
- 启动 Anti-Entropy 同步任务(Agent 端)
运行时阶段:
- 接收注册/注销请求
- 通过 Raft Apply 写入 State Store
- 响应查询请求(支持 Blocking Query)
- 周期性同步 Agent 本地状态到 Catalog(Anti-Entropy)
关闭阶段:
- Agent 关闭时注销本地服务和节点(如配置
leave_on_terminate=true)
2. 模块架构图
flowchart TB
subgraph "服务注册入口"
AgentAPI[Agent HTTP API<br>/v1/agent/service/register]
CatalogAPI[Catalog HTTP API<br>/v1/catalog/register]
end
subgraph "Catalog RPC 端点"
Register[Register<br>注册节点/服务]
Deregister[Deregister<br>注销节点/服务]
ListServices[ListServices<br>列出所有服务]
ServiceNodes[ServiceNodes<br>查询服务实例]
NodeServices[NodeServices<br>查询节点服务]
end
subgraph "查询过滤"
TagFilter[标签过滤]
HealthFilter[健康状态过滤]
ACLFilter[ACL 权限过滤]
ExprFilter[表达式过滤]
end
subgraph "数据存储"
StateStore[State Store<br>Memdb]
NodesTable[nodes 表]
ServicesTable[services 表]
ChecksTable[checks 表]
end
subgraph "数据同步"
AntiEntropy[Anti-Entropy<br>Agent 本地同步]
Raft[Raft 共识]
end
AgentAPI --> Register
CatalogAPI --> Register
Register --> ACLFilter
Deregister --> ACLFilter
ServiceNodes --> ACLFilter
ACLFilter --> Raft
Raft --> StateStore
StateStore --> NodesTable
StateStore --> ServicesTable
StateStore --> ChecksTable
ServiceNodes --> TagFilter
ServiceNodes --> HealthFilter
ServiceNodes --> ExprFilter
TagFilter --> StateStore
HealthFilter --> ChecksTable
ExprFilter --> StateStore
AntiEntropy --> Register
AntiEntropy --> Deregister
2.1 架构说明
组件职责:
-
Register RPC:
- 作用:注册节点和服务到 Catalog
- 边界:必须在 Leader 执行,通过 Raft 复制
- 状态:写入 State Store 的 nodes、services、checks 表
-
ServiceNodes RPC:
- 作用:查询指定服务的所有实例
- 边界:支持标签过滤、健康状态过滤、ACL 过滤
- 状态:从 State Store 读取,支持 Blocking Query
-
Anti-Entropy:
- 作用:Agent 本地状态与 Catalog 的定期同步
- 边界:每 60 秒(默认)全量对比,修复差异
- 状态:无状态,基于差异计算触发 Register/Deregister
关键决策点:
- 注册位置:服务可以通过 Agent API 或直接通过 Catalog API 注册,Agent API 更常用
- 健康过滤:ServiceNodes 查询可选择过滤不健康实例(
filter="Checks.Status != critical") - Blocking Query:客户端可阻塞等待服务列表变化,减少轮询开销
边界条件:
- 并发注册:同一节点的多个服务可并发注册,通过 Raft 串行化
- 注销幂等:重复注销不报错
- 服务 ID 唯一性:单节点内 ServiceID 必须唯一
- 超时:注册/注销操作超时 60 秒(RPC 超时)
异常处理:
- 节点不存在:注册服务时自动创建节点
- 服务不存在:注销时忽略不存在的服务
- 健康检查缺失:服务可不关联健康检查(默认视为健康)
- ACL 拒绝:返回权限错误,操作不执行
性能要点:
- 查询缓存:DNS 和 HTTP 查询支持客户端缓存(TTL)
- 索引优化:State Store 按服务名、节点名、标签建立索引
- Blocking Query:减少无效轮询,P95 延迟 < 10ms
- 批量查询:PreparedQuery 支持批量查询多个服务
3. 关键数据结构
3.1 核心数据结构类图
classDiagram
class Node {
+string ID
+string Node
+string Address
+string Datacenter
+map~string,string~ TaggedAddresses
+map~string,string~ Meta
+uint64 CreateIndex
+uint64 ModifyIndex
}
class NodeService {
+string ID
+string Service
+[]string Tags
+map~string,string~ Meta
+string Address
+int Port
+ServiceConnect Connect
+int Weights
+string Kind
+ServiceProxy Proxy
+uint64 CreateIndex
+uint64 ModifyIndex
}
class ServiceNode {
+string ID
+string Node
+string Address
+string ServiceID
+string ServiceName
+[]string ServiceTags
+string ServiceAddress
+int ServicePort
+map~string,string~ ServiceMeta
+uint64 CreateIndex
+uint64 ModifyIndex
}
class RegisterRequest {
+string Datacenter
+types.NodeID ID
+string Node
+string Address
+map~string,string~ TaggedAddresses
+map~string,string~ NodeMeta
+NodeService Service
+HealthCheck Check
+[]HealthCheck Checks
+bool SkipNodeUpdate
+string Token
}
class DeregisterRequest {
+string Datacenter
+string Node
+string ServiceID
+string CheckID
+string Token
}
class ServiceSpecificRequest {
+string Datacenter
+string ServiceName
+[]string ServiceTags
+string ServiceAddress
+bool TagFilter
+bool Connect
+string PeerName
+QueryOptions
}
class IndexedServiceNodes {
+ServiceNodes ServiceNodes
+QueryMeta
}
Node "1" --> "*" NodeService : has services
ServiceNode --> Node : references
ServiceNode --> NodeService : references
RegisterRequest --> Node : creates/updates
RegisterRequest --> NodeService : creates/updates
ServiceSpecificRequest --> IndexedServiceNodes : queries
3.2 数据结构详细说明
3.2.1 RegisterRequest(注册请求)
字段说明:
| 字段 | 类型 | 必填 | 说明 | 约束 |
|---|---|---|---|---|
| Datacenter | string | 是 | 目标数据中心 | 默认本地 DC |
| ID | types.NodeID | 否 | 节点 UUID | 可选,首次注册生成 |
| Node | string | 是 | 节点名称 | 唯一标识节点 |
| Address | string | 是 | 节点 IP 地址 | 必须可路由 |
| TaggedAddresses | map[string]string | 否 | 带标签的地址 | 如 “lan”: “10.0.1.10”, “wan”: “203.0.113.1” |
| NodeMeta | map[string]string | 否 | 节点元数据 | 键值对,用于过滤 |
| Service | *NodeService | 否 | 要注册的服务 | 可选,仅注册节点时为 nil |
| Check | *HealthCheck | 否 | 单个健康检查 | 已废弃,使用 Checks |
| Checks | []HealthCheck | 否 | 健康检查列表 | 可关联服务或节点 |
| SkipNodeUpdate | bool | 否 | 是否跳过节点更新 | true 时仅更新服务 |
3.2.2 NodeService(服务定义)
字段说明:
| 字段 | 类型 | 必填 | 说明 | 约束 |
|---|---|---|---|---|
| ID | string | 是 | 服务实例 ID | 节点内唯一 |
| Service | string | 是 | 服务名称 | 服务发现的键 |
| Tags | []string | 否 | 服务标签 | 用于分类和过滤 |
| Meta | map[string]string | 否 | 服务元数据 | 键值对 |
| Address | string | 否 | 服务地址 | 默认使用节点地址 |
| Port | int | 是 | 服务端口 | 1-65535 |
| Kind | string | 否 | 服务类型 | “connect-proxy”, “mesh-gateway” 等 |
| Connect | ServiceConnect | 否 | Connect 配置 | 服务网格设置 |
| Proxy | ServiceProxy | 否 | 代理配置 | Sidecar 配置 |
| Weights | int | 否 | 权重 | DNS 轮询权重 |
3.2.3 ServiceNode(服务实例)
字段说明:
ServiceNode 是查询结果的核心结构,包含节点和服务的组合信息。
| 字段 | 类型 | 说明 |
|---|---|---|
| ID | string | 节点 ID |
| Node | string | 节点名称 |
| Address | string | 节点地址 |
| Datacenter | string | 数据中心 |
| ServiceID | string | 服务实例 ID |
| ServiceName | string | 服务名称 |
| ServiceTags | []string | 服务标签 |
| ServiceAddress | string | 服务地址(如与节点不同) |
| ServicePort | int | 服务端口 |
| ServiceMeta | map[string]string | 服务元数据 |
| ServiceWeights | Weights | 服务权重 |
4. 核心 API 详细规格
4.1 Register - 注册节点和服务
4.1.1 基本信息
- 名称:
Catalog.Register - 协议:RPC 方法
- HTTP 映射:
PUT /v1/catalog/register - 幂等性:是(相同节点/服务覆盖)
4.1.2 请求结构体
type RegisterRequest struct {
Datacenter string // 目标数据中心
ID types.NodeID // 节点 UUID
Node string // 节点名称
Address string // 节点 IP
TaggedAddresses map[string]string // LAN/WAN 地址
NodeMeta map[string]string // 节点元数据
Service *NodeService // 要注册的服务
Check *HealthCheck // 单个健康检查(废弃)
Checks HealthChecks // 健康检查列表
SkipNodeUpdate bool // 是否跳过节点更新
Token string // ACL Token
WriteRequest // 包含 Namespace 等
}
字段表:
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| Node | string | 是 | - | 1-256 字符 | 节点唯一名称 |
| Address | string | 是 | - | 有效 IP 地址 | 节点可路由地址 |
| Service | *NodeService | 否 | nil | - | 省略时仅注册节点 |
| Service.ID | string | 条件 | 同 Service | - | 服务时必填,节点内唯一 |
| Service.Service | string | 条件 | - | 1-256 字符 | 服务名称,服务时必填 |
| Service.Port | int | 条件 | 0 | 1-65535 | 服务端口,服务时必填 |
| Checks | []HealthCheck | 否 | [] | - | 健康检查列表 |
4.1.3 响应结构体
// 返回空结构体,操作成功无返回值
type struct{}
4.1.4 入口函数与核心代码
func (c *Catalog) Register(
args *structs.RegisterRequest,
reply *struct{},
) error {
// 1. 转发到 Leader
if done, err := c.srv.ForwardRPC("Catalog.Register", args, reply); done {
return err
}
// 2. ACL 权限校验
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
// 3. 验证请求字段
if err := nodePreApply(args.Node, string(args.ID)); err != nil {
return err
}
if args.Address == "" && !args.SkipNodeUpdate {
return fmt.Errorf("Must provide address if SkipNodeUpdate is not set")
}
// 4. 验证服务字段
if args.Service != nil {
if err := servicePreApply(args.Service, authz, args.Service.FillAuthzContext); err != nil {
return err
}
}
// 5. 规范化健康检查
if args.Check != nil {
args.Checks = append(args.Checks, args.Check)
args.Check = nil
}
for _, check := range args.Checks {
if check.Node == "" {
check.Node = args.Node
}
checkPreApply(check)
}
// 6. ACL 权限验证
state := c.srv.fsm.State()
_, ns, err := state.NodeServices(nil, args.Node, &args.EnterpriseMeta, args.PeerName)
if err := vetRegisterWithACL(authz, args, ns); err != nil {
return err
}
// 7. Raft Apply
_, err = c.srv.raftApply(structs.RegisterRequestType, args)
return err
}
核心逻辑:
- 步骤 1-2:转发到 Leader,解析 ACL Token
- 步骤 3-5:验证节点名称、地址、服务字段、健康检查字段
- 步骤 6:根据 ACL 策略验证权限:
- 注册节点需要
node:write权限 - 注册服务需要
service:write权限 - 关联健康检查需要对应资源的
write权限
- 注册节点需要
- 步骤 7:通过 Raft Apply 持久化,FSM 处理时:
- 创建或更新 nodes 表记录
- 创建或更新 services 表记录
- 创建或更新 checks 表记录
- 更新索引(按节点名、服务名、标签)
4.1.5 调用链路
HTTP PUT /v1/catalog/register
↓
agent/http/catalog.go: catalogRegister()
↓
agent/rpc.go: RPC("Catalog.Register", ...)
↓
agent/consul/catalog_endpoint.go: Catalog.Register()
↓
agent/consul/server.go: raftApply(RegisterRequestType, ...)
↓
agent/consul/fsm/fsm.go: FSM.Apply()
↓
agent/consul/fsm/commands.go: applyRegister()
↓
agent/consul/state/catalog.go: EnsureRegistration()
↓
State Store 更新 nodes, services, checks 表
4.1.6 时序图
sequenceDiagram
autonumber
participant Agent as Consul Agent
participant HTTP as HTTP API
participant RPC as Catalog RPC
participant ACL as ACL Resolver
participant Leader as Raft Leader
participant FSM as FSM
participant State as State Store
Note over Agent,State: 服务注册流程
Agent->>HTTP: PUT /v1/catalog/register<br>{"Node": "node1", "Service": {...}}
HTTP->>RPC: Catalog.Register(request)
alt 非 Leader
RPC->>Leader: 转发到 Leader
end
RPC->>ACL: ResolveToken(token)
ACL-->>RPC: Authorizer
RPC->>RPC: 验证节点字段<br>验证服务字段<br>验证健康检查字段
RPC->>State: NodeServices(node)
State-->>RPC: 现有服务列表
RPC->>ACL: 检查 node:write 和 service:write 权限
ACL-->>RPC: Allow / Deny
alt 权限不足
RPC-->>Agent: 403 Permission Denied
end
RPC->>Leader: raftApply(RegisterRequestType, request)
Leader->>FSM: Apply Raft Log
FSM->>State: EnsureRegistration(node, service, checks)
State->>State: 更新 nodes 表<br>更新 services 表<br>更新 checks 表<br>更新索引
State-->>FSM: 成功
FSM-->>Leader: 成功
Leader-->>RPC: 成功
RPC-->>HTTP: 成功
HTTP-->>Agent: 200 OK
Note over State: 服务现在可被查询
4.2 ServiceNodes - 查询服务实例
4.2.1 基本信息
- 名称:
Catalog.ServiceNodes - 协议:RPC 方法
- HTTP 映射:
GET /v1/catalog/service/:service - 幂等性:是(只读操作)
4.2.2 请求结构体
type ServiceSpecificRequest struct {
Datacenter string // 目标数据中心
ServiceName string // 服务名称
ServiceTags []string // 标签过滤(废弃,使用 Filter)
ServiceTag string // 单个标签(废弃)
ServiceAddress string // 服务地址过滤
TagFilter bool // 是否启用标签过滤
Connect bool // 是否查询 Connect 代理
PeerName string // Peering 对等连接名称
QueryOptions // 包含 Filter、Consistency、MinQueryIndex 等
}
字段表:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| ServiceName | string | 是 | 要查询的服务名称 |
| Connect | bool | 否 | true 时查询该服务的 Connect 代理 |
| Filter | string | 否 | bexpr 表达式过滤(如 “ServiceMeta.version == v1.0”) |
| MinQueryIndex | uint64 | 否 | Blocking Query,等待索引变化 |
4.2.3 响应结构体
type IndexedServiceNodes struct {
ServiceNodes ServiceNodes // 服务实例列表
QueryMeta // 包含 Index、LastContact 等
}
type ServiceNode struct {
ID string // 节点 ID
Node string // 节点名称
Address string // 节点地址
Datacenter string // 数据中心
TaggedAddresses map[string]string // LAN/WAN 地址
NodeMeta map[string]string // 节点元数据
ServiceID string // 服务实例 ID
ServiceName string // 服务名称
ServiceTags []string // 服务标签
ServiceAddress string // 服务地址
ServicePort int // 服务端口
ServiceMeta map[string]string // 服务元数据
ServiceWeights Weights // 权重
ServiceKind string // 服务类型
ServiceProxy ServiceProxy // 代理配置
Checks HealthChecks // 健康检查列表
CreateIndex uint64 // 创建索引
ModifyIndex uint64 // 修改索引
}
4.2.4 入口函数与核心代码
func (c *Catalog) ServiceNodes(
args *structs.ServiceSpecificRequest,
reply *structs.IndexedServiceNodes,
) error {
// 1. 转发到合适的 Server(可以是 Follower)
if done, err := c.srv.ForwardRPC("Catalog.ServiceNodes", args, reply); done {
return err
}
// 2. 确定查询函数
var f func(memdb.WatchSet, *state.Store) (uint64, structs.ServiceNodes, error)
if args.Connect {
// 查询 Connect 代理
f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) {
return s.ConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}
} else {
// 普通服务查询
f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) {
if args.ServiceAddress != "" {
return s.ServiceAddressNodes(ws, args.ServiceAddress, &args.EnterpriseMeta, args.PeerName)
}
if args.TagFilter {
return s.ServiceTagNodes(ws, args.ServiceName, args.ServiceTags, &args.EnterpriseMeta, args.PeerName)
}
return s.ServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}
}
// 3. ACL 权限校验
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
// 4. 准备 bexpr 过滤器
filter, err := bexpr.CreateFilter(args.Filter, nil, []*structs.ServiceNode{})
if err != nil {
return err
}
// 5. Blocking Query
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
// 执行查询
index, serviceNodes, err := f(ws, state)
if err != nil {
return err
}
reply.Index = index
reply.ServiceNodes = serviceNodes
// 6. ACL 过滤
c.srv.filterACL(args.Token, reply)
// 7. bexpr 表达式过滤
if filter != nil {
raw, err := filter.Execute(reply.ServiceNodes)
if err != nil {
return err
}
reply.ServiceNodes = raw.(structs.ServiceNodes)
}
// 8. 按距离排序(如提供 Source)
if args.Source.Node != "" {
return c.srv.sortNodesByDistanceFrom(args.Source, reply.ServiceNodes)
}
return nil
},
)
}
核心逻辑:
- 步骤 1-2:确定查询类型(普通服务 vs Connect 代理 vs 标签过滤)
- 步骤 3-4:解析 ACL Token,准备表达式过滤器
- 步骤 5:进入 Blocking Query 循环,支持长轮询
- 步骤 6:执行 State Store 查询,获取服务实例列表
- 步骤 7:ACL 过滤(移除无权限的服务)
- 步骤 8:bexpr 表达式过滤(如
ServiceMeta.version == "v1.0") - 步骤 9:按网络距离排序(基于 Coordinate 模块)
4.2.5 时序图
sequenceDiagram
autonumber
participant Client as 客户端
participant HTTP as HTTP API
participant RPC as Catalog RPC
participant ACL as ACL Resolver
participant State as State Store
participant Coordinate as Coordinate 模块
Client->>HTTP: GET /v1/catalog/service/web?filter=...
HTTP->>RPC: Catalog.ServiceNodes(request)
RPC->>ACL: ResolveToken(token)
ACL-->>RPC: Authorizer
RPC->>RPC: 准备查询函数<br>准备 bexpr 过滤器
loop Blocking Query
RPC->>State: ServiceNodes(ws, "web", ...)
State->>State: 查询 services 表<br>关联 nodes 表<br>关联 checks 表
State-->>RPC: ServiceNodes[] + Index
RPC->>ACL: 过滤无权限服务
ACL-->>RPC: 过滤后列表
RPC->>RPC: 应用 bexpr 表达式过滤
alt 有 Source 参数
RPC->>Coordinate: sortNodesByDistanceFrom(source, nodes)
Coordinate-->>RPC: 按距离排序的列表
end
alt Index 变化或超时
break 退出循环
RPC-->>HTTP: ServiceNodes[] + Index
end
end
Note over RPC,State: 等待 Index 变化(最长 5 分钟)
end
HTTP-->>Client: 200 OK + JSON<br>[{"Node": "node1", "ServicePort": 8080, ...}]
4.3 ListServices - 列出所有服务
4.3.1 基本信息
- 名称:
Catalog.ListServices - 协议:RPC 方法
- HTTP 映射:
GET /v1/catalog/services - 幂等性:是
4.3.2 请求结构体
type DCSpecificRequest struct {
Datacenter string // 目标数据中心
PeerName string // Peering 对等连接名称
acl.EnterpriseMeta
QueryOptions // Filter, MinQueryIndex 等
}
4.3.3 响应结构体
type IndexedServices struct {
Services map[string][]string // 服务名 -> 标签列表
QueryMeta
}
示例响应:
{
"Services": {
"consul": [],
"web": ["v1", "production"],
"api": ["v2", "staging"],
"redis": ["cache"]
},
"Index": 12345
}
4.3.4 核心代码
func (c *Catalog) ListServices(
args *structs.DCSpecificRequest,
reply *structs.IndexedServices,
) error {
// 1. 转发(可以在 Follower 执行)
if done, err := c.srv.ForwardRPC("Catalog.ListServices", args, reply); done {
return err
}
// 2. ACL 权限校验
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
// 3. 准备过滤器
filter, err := bexpr.CreateFilter(args.Filter, nil, []*structs.ServiceNode{})
if err != nil {
return err
}
// 4. Blocking Query
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var (
index uint64
services structs.Services
err error
)
// 查询所有服务
index, services, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName)
if err != nil {
return err
}
reply.Index, reply.Services = index, services
// ACL 过滤(移除无权限的服务)
c.srv.filterACL(args.Token, reply)
// bexpr 过滤
if filter != nil {
// ListServices 不支持复杂过滤,仅支持按服务名
}
return nil
},
)
}
4.4 Deregister - 注销节点/服务
4.4.1 基本信息
- 名称:
Catalog.Deregister - 协议:RPC 方法
- HTTP 映射:
PUT /v1/catalog/deregister - 幂等性:是(重复注销成功)
4.4.2 请求结构体
type DeregisterRequest struct {
Datacenter string // 目标数据中心
Node string // 节点名称
ServiceID string // 要注销的服务 ID(可选)
CheckID types.CheckID // 要注销的健康检查 ID(可选)
PeerName string // Peering 名称
acl.EnterpriseMeta
WriteRequest
}
字段表:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| Node | string | 是 | 节点名称 |
| ServiceID | string | 否 | 省略时注销整个节点 |
| CheckID | types.CheckID | 否 | 省略时注销服务的所有检查 |
注销逻辑:
- 仅提供
Node:注销节点及其所有服务和检查 - 提供
Node + ServiceID:仅注销指定服务及其检查 - 提供
Node + CheckID:仅注销指定检查
4.4.3 核心代码
func (c *Catalog) Deregister(
args *structs.DeregisterRequest,
reply *struct{},
) error {
// 1. 转发到 Leader
if done, err := c.srv.ForwardRPC("Catalog.Deregister", args, reply); done {
return err
}
// 2. ACL 权限校验
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
// 3. 验证权限
state := c.srv.fsm.State()
_, ns, err := state.NodeServices(nil, args.Node, &args.EnterpriseMeta, args.PeerName)
if err != nil {
return err
}
// 注销节点需要 node:write
if authz.NodeWrite(args.Node, nil) != acl.Allow {
return acl.ErrPermissionDenied
}
// 注销服务需要 service:write
if args.ServiceID != "" && ns != nil {
if svc := ns.Services[args.ServiceID]; svc != nil {
if authz.ServiceWrite(svc.Service, nil) != acl.Allow {
return acl.ErrPermissionDenied
}
}
}
// 4. Raft Apply
_, err = c.srv.raftApply(structs.DeregisterRequestType, args)
return err
}
5. 关键功能实现分析
5.1 Anti-Entropy 状态同步
5.1.1 功能概述
Anti-Entropy 是 Agent 端的后台任务,定期将本地状态(服务、健康检查)与 Catalog 同步,修复不一致性。
触发场景:
- 网络分区恢复后
- Agent 重启后
- 服务注册/注销失败后
同步周期:默认 60 秒(sync_coordinate_interval_min)
5.1.2 核心逻辑
func (a *Agent) syncChanges() {
// 1. 读取 Catalog 中的节点服务列表
remote, err := a.delegate.GetNodeServices(a.config.NodeName)
if err != nil {
return
}
// 2. 对比本地服务
local := a.State.Services()
// 3. 计算差异
toRegister := []*structs.NodeService{}
toDeregister := []string{}
for id, localService := range local {
remoteService := remote[id]
if remoteService == nil {
// 本地存在,远程不存在 -> 注册
toRegister = append(toRegister, localService)
} else if !localService.IsSame(remoteService) {
// 本地与远程不一致 -> 重新注册
toRegister = append(toRegister, localService)
}
}
for id := range remote {
if local[id] == nil {
// 远程存在,本地不存在 -> 注销
toDeregister = append(toDeregister, id)
}
}
// 4. 批量注册
for _, svc := range toRegister {
req := &structs.RegisterRequest{
Node: a.config.NodeName,
Address: a.config.AdvertiseAddr,
Service: svc,
}
a.RPC("Catalog.Register", req, &struct{}{})
}
// 5. 批量注销
for _, id := range toDeregister {
req := &structs.DeregisterRequest{
Node: a.config.NodeName,
ServiceID: id,
}
a.RPC("Catalog.Deregister", req, &struct{}{})
}
}
6. 配置与最佳实践
6.1 关键配置项
| 配置项 | 默认值 | 说明 | 调优建议 |
|---|---|---|---|
sync_coordinate_interval_min |
15s | 最小同步间隔 | 默认合理 |
anti_entropy_interval |
60s | Anti-Entropy 周期 | 网络不稳定时可降低到 30s |
catalog_batch_read |
1024 | 批量查询大小 | 默认合理 |
6.2 最佳实践
1. 服务注册:
- 优先使用 Agent API(
/v1/agent/service/register)而非 Catalog API - ServiceID 建议包含节点信息(如
web-node1),避免冲突 - 为每个服务配置健康检查,确保故障快速检测
2. 服务查询:
- 使用 Blocking Query 减少轮询开销
- 使用 bexpr 过滤器按元数据过滤(如
ServiceMeta.version == "v2") - 使用 Connect 查询获取服务网格代理
3. 标签规范:
- 标签用于分类和路由(如
production,v1,canary) - 元数据用于扩展信息(如
version: 1.2.3,region: us-west)
6.3 性能指标
典型延迟:
| 操作 | P50 | P95 | P99 |
|---|---|---|---|
| 服务注册 | 20ms | 40ms | 80ms |
| 服务查询(本地缓存) | 2ms | 5ms | 10ms |
| 服务查询(Blocking Query) | 1-60s | 60s | 300s |
吞吐量:
- 注册 TPS:100-200(受 Raft 限制)
- 查询 QPS:10,000+(Follower 可分担)