Consul Source Code Analysis - Hands-On Guide
1. Quick Start
1.1 Setting Up a Local Development Environment
1.1.1 Single-Node Dev Mode
Startup commands:
# Start in dev mode (data is not persisted)
consul agent -dev
# Specify node name and datacenter
consul agent -dev \
-node=dev-node \
-datacenter=dc1 \
-ui \
-client=0.0.0.0
# Open the UI
open http://localhost:8500/ui
Characteristics of dev mode:
- Runs as a single server; no cluster required
- Data lives in memory and is lost on restart
- All features enabled (KV, Catalog, UI, etc.)
- Binds to localhost by default; use the -client flag to allow external access
1.1.2 Three-Node Cluster Setup
Server configuration file (server1.hcl):
# Datacenter name
datacenter = "dc1"
# Data directory
data_dir = "/opt/consul/data"
# Log level
log_level = "INFO"
# Server mode
server = true
# Expected number of servers (used for automatic bootstrap)
bootstrap_expect = 3
# Bind address (private IP)
bind_addr = "10.0.1.10"
# Client access address
client_addr = "0.0.0.0"
# Join the cluster automatically (static IPs here; DNS names or cloud metadata also work)
retry_join = ["10.0.1.10", "10.0.1.11", "10.0.1.12"]
# Enable the UI
ui_config {
enabled = true
}
# Performance tuning
performance {
raft_multiplier = 1
}
# Telemetry
telemetry {
prometheus_retention_time = "24h"
disable_hostname = true
}
Start the cluster:
# Server 1
consul agent -config-file=/etc/consul.d/server1.hcl
# Server 2 (change bind_addr to 10.0.1.11)
consul agent -config-file=/etc/consul.d/server2.hcl
# Server 3 (change bind_addr to 10.0.1.12)
consul agent -config-file=/etc/consul.d/server3.hcl
Verify cluster state:
# List cluster members
consul members
# Show Raft peers and the current leader
consul operator raft list-peers
# Confirm a leader has been elected
consul operator raft list-peers | grep leader
2. Service Registration and Discovery in Practice
2.1 Scenario 1: Registering a Microservice (HTTP API)
2.1.1 Registering a Web Service
Service definition (web-service.json):
{
"ID": "web-1",
"Name": "web",
"Tags": ["v1", "production"],
"Address": "10.0.1.20",
"Port": 8080,
"Meta": {
"version": "1.2.3",
"git_commit": "abc123",
"region": "us-west"
},
"Check": {
"HTTP": "http://10.0.1.20:8080/health",
"Interval": "10s",
"Timeout": "5s",
"DeregisterCriticalServiceAfter": "90s"
}
}
Registration commands:
# Register via the agent API
curl -X PUT \
-d @web-service.json \
http://localhost:8500/v1/agent/service/register
# Verify the registration
curl http://localhost:8500/v1/catalog/service/web | jq
2.1.2 Registering a Database Service (Multiple Health Checks)
{
"ID": "db-1",
"Name": "postgres",
"Tags": ["primary", "production"],
"Address": "10.0.1.30",
"Port": 5432,
"Checks": [
{
"Name": "TCP Check",
"TCP": "10.0.1.30:5432",
"Interval": "10s",
"Timeout": "3s"
},
{
"Name": "Script Check",
"Args": ["/usr/local/bin/check-db.sh"],
"Interval": "30s"
}
]
}
2.2 Scenario 2: Service Discovery (DNS)
2.2.1 Querying Services over DNS
Query all healthy instances:
# Query a service (returns healthy instances by default)
dig @localhost -p 8600 web.service.consul
# Query by tag
dig @localhost -p 8600 v1.web.service.consul
# Query SRV records (includes the port)
dig @localhost -p 8600 web.service.consul SRV
Example response:
;; ANSWER SECTION:
web.service.consul. 0 IN A 10.0.1.20
web.service.consul. 0 IN A 10.0.1.21
2.2.2 Integrating DNS Discovery in an Application
Go example (custom net.Resolver pointed at Consul):
package main
import (
"context"
"fmt"
"net"
"time"
)
func discoverService(serviceName string) ([]string, error) {
// Route DNS queries through the local Consul agent
resolver := &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
d := net.Dialer{
Timeout: 5 * time.Second,
}
return d.DialContext(ctx, "udp", "localhost:8600")
},
}
// Look up the service under the .service.consul domain
ctx := context.Background()
addrs, err := resolver.LookupHost(ctx, serviceName+".service.consul")
if err != nil {
return nil, err
}
return addrs, nil
}
func main() {
addrs, err := discoverService("web")
if err != nil {
panic(err)
}
fmt.Printf("Found %d instances: %v\n", len(addrs), addrs)
}
2.3 Scenario 3: Service Discovery (HTTP API + Load Balancing)
Using the Consul Go SDK:
package main
import (
"fmt"
"math/rand"
"github.com/hashicorp/consul/api"
)
type ServiceDiscovery struct {
client *api.Client
}
func NewServiceDiscovery(consulAddr string) (*ServiceDiscovery, error) {
config := api.DefaultConfig()
config.Address = consulAddr
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
return &ServiceDiscovery{client: client}, nil
}
// GetHealthyInstance returns a healthy instance (client-side load balancing)
func (sd *ServiceDiscovery) GetHealthyInstance(serviceName string) (*api.ServiceEntry, error) {
// Query for passing instances only
services, _, err := sd.client.Health().Service(serviceName, "", true, nil)
if err != nil {
return nil, err
}
if len(services) == 0 {
return nil, fmt.Errorf("no healthy instances found for service: %s", serviceName)
}
// Pick a random instance (naive load balancing)
idx := rand.Intn(len(services))
return services[idx], nil
}
// CallService resolves a healthy instance and calls it
func (sd *ServiceDiscovery) CallService(serviceName, path string) error {
instance, err := sd.GetHealthyInstance(serviceName)
if err != nil {
return err
}
url := fmt.Sprintf("http://%s:%d%s",
instance.Service.Address,
instance.Service.Port,
path,
)
fmt.Printf("Calling: %s\n", url)
// HTTP call logic...
return nil
}
func main() {
sd, _ := NewServiceDiscovery("localhost:8500")
sd.CallService("web", "/api/users")
}
3. Configuration Management in Practice (KV Store)
3.1 Scenario 1: Dynamic Configuration
3.1.1 Storing Application Configuration
Configuration layout:
config/
├── app/
│ ├── database/host
│ ├── database/port
│ ├── database/username
│ ├── database/password (encrypted)
│ ├── cache/ttl
│ └── cache/max_size
└── feature_flags/
├── new_ui_enabled
└── experimental_api_enabled
Write configuration values:
# Store database configuration
consul kv put config/app/database/host "db.example.com"
consul kv put config/app/database/port "5432"
# Store an encrypted password (encrypted at the application layer)
echo "encrypted_password" | consul kv put config/app/database/password -
# Store a JSON blob
consul kv put config/app/cache - <<EOF
{
"ttl": 3600,
"max_size": 1048576,
"eviction_policy": "lru"
}
EOF
3.1.2 Reading Configuration in the Application (Watch Mode)
Go example (automatic configuration reload):
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
)
type AppConfig struct {
Database struct {
Host string
Port string
Username string
Password string
}
Cache struct {
TTL int `json:"ttl"`
MaxSize int `json:"max_size"`
EvictionPolicy string `json:"eviction_policy"`
}
}
var currentConfig AppConfig
// loadConfig reads the current configuration from the KV store
func loadConfig(client *api.Client) error {
kv := client.KV()
// Read database configuration (Get returns nil for a missing key)
host, _, err := kv.Get("config/app/database/host", nil)
if err != nil || host == nil {
return fmt.Errorf("missing config/app/database/host: %v", err)
}
port, _, err := kv.Get("config/app/database/port", nil)
if err != nil || port == nil {
return fmt.Errorf("missing config/app/database/port: %v", err)
}
currentConfig.Database.Host = string(host.Value)
currentConfig.Database.Port = string(port.Value)
// Read cache configuration (JSON)
cacheKV, _, err := kv.Get("config/app/cache", nil)
if err != nil || cacheKV == nil {
return fmt.Errorf("missing config/app/cache: %v", err)
}
if err := json.Unmarshal(cacheKV.Value, &currentConfig.Cache); err != nil {
return err
}
log.Printf("Config loaded: %+v\n", currentConfig)
return nil
}
// watchConfig blocks, invoking the handler whenever keys under the prefix change
func watchConfig(consulAddr string) {
params := map[string]interface{}{
"type": "keyprefix",
"prefix": "config/app/",
}
plan, err := watch.Parse(params)
if err != nil {
log.Fatal(err)
}
plan.Handler = func(idx uint64, data interface{}) {
kvPairs, ok := data.(api.KVPairs)
if !ok {
return
}
log.Printf("Config changed! Index: %d, Keys: %d\n", idx, len(kvPairs))
// Reload the configuration
client, _ := api.NewClient(api.DefaultConfig())
loadConfig(client)
}
if err := plan.Run(consulAddr); err != nil {
log.Fatal(err)
}
}
func main() {
client, _ := api.NewClient(api.DefaultConfig())
// Initial load
loadConfig(client)
// Watch for configuration changes
go watchConfig("localhost:8500")
// Main application loop
for {
fmt.Printf("Running with TTL: %d\n", currentConfig.Cache.TTL)
time.Sleep(10 * time.Second)
}
}
3.2 Scenario 2: Distributed Locks (Preventing Concurrent Access)
3.2.1 Implementing a Distributed Lock
Go implementation:
package main
import (
"fmt"
"log"
"time"
"github.com/hashicorp/consul/api"
)
type DistributedLock struct {
client *api.Client
session string
key string
lockChan <-chan struct{}
}
// NewDistributedLock creates a lock backed by a Consul session
func NewDistributedLock(client *api.Client, key string) (*DistributedLock, error) {
// Create a session
session, _, err := client.Session().Create(&api.SessionEntry{
Name: "distributed-lock",
TTL: "10s",
Behavior: "delete", // delete the lock key automatically when the session is invalidated
}, nil)
if err != nil {
return nil, err
}
return &DistributedLock{
client: client,
session: session,
key: key,
}, nil
}
// Lock blocks until the lock is acquired
func (dl *DistributedLock) Lock() error {
kv := dl.client.KV()
for {
// Try to acquire the lock
p := &api.KVPair{
Key: dl.key,
Value: []byte(fmt.Sprintf("locked by session: %s", dl.session)),
Session: dl.session,
}
acquired, _, err := kv.Acquire(p, nil)
if err != nil {
return err
}
if acquired {
log.Printf("Lock acquired on key: %s\n", dl.key)
// Start session renewal in the background
go dl.renewSession()
return nil
}
// Lock is held by someone else; wait and retry
log.Printf("Lock busy, retrying...\n")
time.Sleep(1 * time.Second)
}
}
// Unlock releases the lock and destroys the session
func (dl *DistributedLock) Unlock() error {
kv := dl.client.KV()
p := &api.KVPair{
Key: dl.key,
Session: dl.session,
}
released, _, err := kv.Release(p, nil)
if err != nil {
return err
}
if !released {
return fmt.Errorf("failed to release lock")
}
// Destroy the session
_, err = dl.client.Session().Destroy(dl.session, nil)
log.Printf("Lock released on key: %s\n", dl.key)
return err
}
// renewSession keeps the session alive so its TTL does not expire while the lock is held
func (dl *DistributedLock) renewSession() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
_, _, err := dl.client.Session().Renew(dl.session, nil)
if err != nil {
log.Printf("Failed to renew session: %v\n", err)
return
}
}
}
// Usage example
func main() {
client, _ := api.NewClient(api.DefaultConfig())
lock, err := NewDistributedLock(client, "locks/critical-section")
if err != nil {
log.Fatal(err)
}
// Acquire the lock
if err := lock.Lock(); err != nil {
log.Fatal(err)
}
defer lock.Unlock()
// Execute the critical section
fmt.Println("Executing critical section...")
time.Sleep(5 * time.Second)
fmt.Println("Critical section completed.")
}
3.3 Scenario 3: Leader Election
Leader election using the Consul Go SDK:
package main
import (
"fmt"
"log"
"time"
"github.com/hashicorp/consul/api"
)
func runLeaderElection(client *api.Client, serviceID string) {
lockKey := "service/leader-election/" + serviceID
// Create a lock (a session is created internally)
lock, err := client.LockKey(lockKey)
if err != nil {
log.Fatal(err)
}
// Acquire the lock (blocks)
leaderCh, err := lock.Lock(nil)
if err != nil {
log.Fatal(err)
}
if leaderCh == nil {
log.Fatal("Failed to acquire leadership")
}
log.Printf("I am the leader for service: %s\n", serviceID)
// Leader loop
for {
select {
case <-leaderCh:
// Leadership lost (session expired or lock stolen)
log.Println("Lost leadership, exiting...")
return
default:
// Perform leader-only work
fmt.Println("Executing leader tasks...")
time.Sleep(5 * time.Second)
}
}
}
func main() {
client, _ := api.NewClient(api.DefaultConfig())
// Run this on several instances; only one becomes the leader
runLeaderElection(client, "my-service")
}
4. Service Mesh in Practice (Connect)
4.1 Scenario 1: Deploying a Sidecar Proxy
4.1.1 Registering a Service with a Sidecar
Service definition (web-with-sidecar.json):
{
"name": "web",
"port": 8080,
"connect": {
"sidecar_service": {
"port": 20000,
"proxy": {
"upstreams": [
{
"destination_name": "api",
"local_bind_port": 9090
},
{
"destination_name": "postgres",
"local_bind_port": 5432
}
]
}
}
},
"checks": [
{
"http": "http://localhost:8080/health",
"interval": "10s"
}
]
}
Register the service and start the sidecar:
# 1. Register the service
consul services register web-with-sidecar.json
# 2. Start the Envoy sidecar
consul connect envoy -sidecar-for web \
-admin-bind localhost:19000 \
-- -l debug
# 3. Verify the sidecar is running
curl http://localhost:19000/stats
4.1.2 Calling Upstream Services from the Application
The application code needs no changes; it calls upstreams through local ports:
package main
import (
"fmt"
"io"
"net/http"
)
func main() {
// Call the api service (forwarded through the local sidecar)
resp, err := http.Get("http://localhost:9090/users")
if err != nil {
panic(err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
fmt.Println(string(body))
}
Traffic path:
Web App -> localhost:9090 -> Web Sidecar (Envoy) ->
mTLS encryption -> API Sidecar (Envoy) -> API Service
4.2 Scenario 2: Controlling Access with Service Intentions
4.2.1 Configuring Service-to-Service Access Policies
Allow web to call api:
# Create an intention (service-level authorization)
consul intention create -allow web api
# List intentions
consul intention list
# Show the detailed rule
consul intention get web api
Deny web access to postgres (forcing traffic through api):
consul intention create -deny web postgres
4.2.2 L7 Traffic Control (HTTP Path Based)
Intention configuration (intention-api.hcl):
Kind = "service-intentions"
Name = "api"
Sources = [
{
Name = "web"
Action = "allow"
Permissions = [
{
Action = "allow"
HTTP {
PathPrefix = "/api/v1/"
}
},
{
Action = "deny"
HTTP {
PathPrefix = "/admin/"
}
}
]
}
]
Apply the configuration:
consul config write intention-api.hcl
4.3 Scenario 3: Traffic Routing and Canary Releases
4.3.1 Configuring Service Routing
Service router configuration (service-router.hcl):
Kind = "service-router"
Name = "api"
Routes = [
{
Match {
HTTP {
Header = [
{
Name = "x-version"
Exact = "v2"
}
]
}
}
Destination {
Service = "api"
ServiceSubset = "v2"
}
},
{
Match {
HTTP {
PathPrefix = "/beta/"
}
}
Destination {
Service = "api"
ServiceSubset = "canary"
}
}
]
Service splitter configuration (traffic percentages, service-splitter.hcl):
Kind = "service-splitter"
Name = "api"
Splits = [
{
Weight = 90
ServiceSubset = "v1"
},
{
Weight = 10
ServiceSubset = "v2"
}
]
Service resolver configuration (subset definitions, service-resolver.hcl):
Kind = "service-resolver"
Name = "api"
Subsets = {
"v1" = {
Filter = "Service.Meta.version == v1"
}
"v2" = {
Filter = "Service.Meta.version == v2"
}
"canary" = {
Filter = "Service.Tags contains canary"
}
}
Apply the configuration:
consul config write service-resolver.hcl
consul config write service-splitter.hcl
consul config write service-router.hcl
5. ACL Best Practices
5.1 Scenario 1: Initializing the ACL System
5.1.1 Bootstrapping ACLs
# 1. Enable ACLs (configuration change)
# acl.hcl
acl {
enabled = true
default_policy = "deny"
enable_token_persistence = true
}
# 2. Restart the servers
systemctl restart consul
# 3. Bootstrap (run exactly once)
consul acl bootstrap
# Output:
# AccessorID: xxxx-xxxx-xxxx-xxxx
# SecretID: yyyy-yyyy-yyyy-yyyy # save this token
# Description: Bootstrap Token (Global Management)
# Local: false
# Create Time: 2024-10-26 10:00:00
# 4. Export the token for subsequent commands
export CONSUL_HTTP_TOKEN=yyyy-yyyy-yyyy-yyyy
5.2 Scenario 2: Creating Application Tokens
5.2.1 Creating a Token for the Web Service
Create a policy (web-policy.hcl):
# Web service policy
service "web" {
policy = "write"
# Intention permissions for Connect are granted inside the service rule
intentions = "read"
}
# Web sidecar policy
service "web-sidecar-proxy" {
policy = "write"
}
# Read other services (for service discovery)
service_prefix "" {
policy = "read"
}
# Read node information
node_prefix "" {
policy = "read"
}
# Read KV configuration
key_prefix "config/web/" {
policy = "read"
}
Create the policy and token:
# 1. Create the policy
consul acl policy create \
-name web-service-policy \
-rules @web-policy.hcl
# 2. Create the token
consul acl token create \
-description "Token for web service" \
-policy-name web-service-policy
# Output:
# AccessorID: aaaa-aaaa-aaaa-aaaa
# SecretID: bbbb-bbbb-bbbb-bbbb # configure this in the web service
5.2.2 Creating a Read-Only Token for Operators
Policy (operator-read-policy.hcl):
# Read-only access to all services
service_prefix "" {
policy = "read"
}
# Read-only access to all nodes
node_prefix "" {
policy = "read"
}
# Read-only KV access
key_prefix "" {
policy = "read"
}
# Read-only ACL access
acl = "read"
# Read-only operator endpoints
operator = "read"
Create the token:
consul acl policy create \
-name operator-readonly \
-rules @operator-read-policy.hcl
consul acl token create \
-description "Read-only token for operators" \
-policy-name operator-readonly
5.3 Scenario 3: Kubernetes Integration (Auth Method)
5.3.1 Configuring the Kubernetes Auth Method
Create the auth method:
# Fetch Kubernetes API server details
K8S_HOST=$(kubectl config view --raw -o jsonpath='{.clusters[0].cluster.server}')
K8S_CA_CERT=$(kubectl config view --raw -o jsonpath='{.clusters[0].cluster.certificate-authority-data}' | base64 -d)
K8S_JWT=$(kubectl get secret consul-auth -o jsonpath='{.data.token}' | base64 -d)
# Create the auth method
consul acl auth-method create \
-name k8s-auth \
-type kubernetes \
-description "Kubernetes Auth Method" \
-kubernetes-host "$K8S_HOST" \
-kubernetes-ca-cert "$K8S_CA_CERT" \
-kubernetes-service-account-jwt "$K8S_JWT"
Create a binding rule:
# Issue service tokens for service accounts in the default namespace
consul acl binding-rule create \
-method k8s-auth \
-bind-type service \
-bind-name '${serviceaccount.name}' \
-selector 'serviceaccount.namespace==default'
5.3.2 Pods Acquiring Tokens Automatically
Kubernetes Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
name: web
spec:
template:
metadata:
annotations:
"consul.hashicorp.com/connect-inject": "true"
spec:
serviceAccountName: web
initContainers:
- name: consul-login
image: hashicorp/consul:latest
command:
- /bin/sh
- -c
- |
# Log in to Consul with the Kubernetes service account token;
# -token-sink-file writes the acquired token to a file, not stdout
K8S_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
consul login \
-method k8s-auth \
-bearer-token "$K8S_TOKEN" \
-token-sink-file /consul/token
echo "Consul token written to /consul/token"
volumeMounts:
- name: consul-token
mountPath: /consul
containers:
- name: web
image: my-web-app:latest
env:
- name: CONSUL_HTTP_TOKEN
valueFrom:
secretKeyRef:
name: consul-token
key: token
volumes:
- name: consul-token
emptyDir: {}
6. Monitoring and Troubleshooting
6.1 Key Metrics
6.1.1 Collecting Metrics with Prometheus
Consul configuration:
telemetry {
prometheus_retention_time = "24h"
disable_hostname = true
}
Prometheus configuration (prometheus.yml):
scrape_configs:
- job_name: 'consul'
static_configs:
- targets: ['localhost:8500']
metrics_path: '/v1/agent/metrics'
params:
format: ['prometheus']
Key metrics:
| Metric | Description | Alert threshold |
|---|---|---|
| consul_raft_apply | Raft apply latency | P99 > 100ms |
| consul_raft_commitTime | Raft commit latency | P99 > 50ms |
| consul_raft_leader | Whether this node is the leader | 0 means not leader |
| consul_serf_member_flap | Member flap count | > 5/hour |
| consul_catalog_service_query | Service query QPS | - |
| consul_kvs_apply | KV write latency | P99 > 50ms |
| consul_acl_resolveToken | Token resolution latency | P99 > 10ms |
6.2 Common Failure Scenarios
6.2.1 Frequent Leader Elections
Symptoms:
- Logs repeatedly show "becoming leader" and "lost leadership"
- Write failure rate increases
Diagnosis:
# 1. Check Raft state
consul operator raft list-peers
# 2. Inspect leader-change log entries
journalctl -u consul | grep "becoming leader"
# 3. Check network latency
# Run between each pair of servers
ping -c 100 <other-server-ip> | tail -1
# 4. Check CPU/memory load
top -p $(pidof consul)
Remedies:
- Increase raft_multiplier (makes Raft timeouts less sensitive)
- Improve the network (reduce latency)
- Add server resources (CPU/memory)
6.2.2 Service Discovery Returns Nothing
Symptoms:
- DNS queries return NXDOMAIN
- The HTTP API returns an empty list
Diagnosis:
# 1. Check whether the service is registered
consul catalog services
# 2. Check health check status
consul catalog service <service-name>
# 3. Check ACL permissions
curl -H "X-Consul-Token: $TOKEN" \
http://localhost:8500/v1/catalog/service/<service-name>
# 4. Inspect agent logs
tail -f /var/log/consul.log | grep "service.*register"
Remedies:
- Confirm the service registered successfully
- Fix failing health checks (inspect the check output)
- Configure the correct ACL token
6.2.3 Frequent KV CAS Failures
Symptoms:
- Application logs show CAS operations returning false
- Configuration updates fail
Diagnosis:
# Check the key's current ModifyIndex
curl http://localhost:8500/v1/kv/<key> | jq '.[].ModifyIndex'
# Enable verbose logging
consul monitor -log-level=debug | grep "KV.Apply"
Remedies:
- Execute the CAS immediately after reading the latest ModifyIndex
- Add CAS retry logic
- Reduce concurrent write pressure
7. Production Deployment Checklist
7.1 Pre-Deployment Checklist
- Hardware
  - Servers: 4 CPU cores, 8GB RAM, 100GB SSD
  - Clients: 2 CPU cores, 2GB RAM
  - Network latency between servers < 10ms
- Software configuration
  - Server count: 3 or 5
  - Data directory on a dedicated disk partition
  - Log output to persistent storage
  - Firewall: open ports 8300-8302, 8500, 8600
- Security
  - Enable TLS (gossip, RPC, HTTP)
  - Enable ACLs with default_policy = "deny"
  - Configure a backup strategy (regular snapshots)
  - Restrict HTTP API access (firewall or ACLs)
- Monitoring and alerting
  - Set up Prometheus metric collection
  - Build Grafana dashboards
  - Define alert rules (leader changes, latency, error rates)
  - Integrate log aggregation (ELK, Loki)
7.2 Operational Best Practices
Regular backups:
# Run daily at 02:00
0 2 * * * /usr/local/bin/consul snapshot save /backup/consul-$(date +\%Y\%m\%d).snap
Upgrade strategy:
- Roll servers one at a time (keep an odd node count)
- Upgrade order: follower → follower → leader (last)
- Verify cluster health after each upgrade
Capacity planning:
- Single DC: up to about 5,000 clients and 50,000 service instances
- Multiple DCs: manage each DC independently and federate over WAN gossip