Consul 源码剖析 - Session 会话管理模块

1. 模块概览

1.1 职责定义

Session 模块提供分布式会话管理功能,是实现分布式锁和 Leader 选举的基础。Session 通过 TTL 和健康检查机制保证会话有效性,会话失效时自动释放关联的 KV 锁。

核心职责

  • 会话创建与生命周期管理
  • TTL 超时检测(心跳续期)
  • 健康检查关联(基于 Check 的会话)
  • 会话失效处理(自动释放锁)
  • Lock Delay 机制(防止锁立即重新获取)

1.2 输入与输出

输入

  • Session 创建请求(TTL、健康检查列表、Behavior)
  • Session 续期请求(Renew)
  • Session 销毁请求(Destroy)

输出

  • Session ID(UUID)
  • Session 详细信息(TTL、Checks、CreateIndex)
  • Session 列表

1.3 上下游依赖

上游调用方

  • KV Store 模块(Lock/Unlock 操作)
  • HTTP API(/v1/session/*
  • SDK(分布式锁、Leader 选举)

下游依赖

  • State Store(Session 数据存储)
  • Health Check 模块(监听健康检查状态)
  • Raft(写操作通过共识)
  • Session Timers(TTL 超时管理)

2. 模块架构图

flowchart TB
    subgraph "API 入口"
        HTTP[HTTP API<br>/v1/session/*]
        SDK[SDK Lock]
    end
    
    subgraph "Session RPC 端点"
        Create[Create<br>创建会话]
        Renew[Renew<br>续期会话]
        Destroy[Destroy<br>销毁会话]
        Get[Get<br>查询会话]
        List[List<br>列出会话]
    end
    
    subgraph "会话管理"
        TTLManager[TTL Manager<br>定时器管理]
        CheckWatcher[Check Watcher<br>健康检查监听]
        Invalidator[Invalidator<br>失效处理]
    end
    
    subgraph "数据存储"
        StateStore[State Store<br>sessions 表]
        Raft[Raft 共识]
    end
    
    subgraph "关联模块"
        HealthCheck[Health Check 模块]
        KVStore[KV Store 模块]
    end
    
    HTTP --> Create
    SDK --> Create
    
    Create --> Raft
    Renew --> TTLManager
    Destroy --> Raft
    
    Raft --> StateStore
    
    TTLManager --> Invalidator
    CheckWatcher --> HealthCheck
    CheckWatcher --> Invalidator
    
    Invalidator --> Raft
    Invalidator --> KVStore
    
    StateStore --> Get
    StateStore --> List

2.1 架构说明

组件职责

  1. Create RPC

    • 创建 Session,分配 UUID
    • 设置 TTL 或关联健康检查
    • 通过 Raft 持久化
  2. TTL Manager

    • 管理 TTL Session 的定时器
    • 超时后触发 Invalidator
  3. Check Watcher

    • 监听健康检查状态变化
    • Critical 状态触发 Invalidator
  4. Invalidator

    • 处理 Session 失效
    • 根据 Behavior 删除或释放锁
    • 设置 Lock Delay

关键决策点

  • TTL vs 健康检查:TTL 需定期续期,健康检查自动监听
  • Behavior:release(释放锁,默认)vs delete(删除 KV)
  • Lock Delay:默认 15 秒,防止惊群

边界条件

  • TTL 最小值 10 秒
  • 单节点最大 Session 数量无限制(受内存限制)
  • Session 失效时自动释放所有关联的 KV 锁

3. 核心数据结构

3.1 Session 定义

type Session struct {
    ID        string           // Session UUID
    Name      string           // Session 名称(可选)
    Node      string           // 所属节点
    Checks    []types.CheckID  // 关联的健康检查列表
    LockDelay time.Duration    // Lock Delay 时间
    Behavior  string           // 失效行为:"release" 或 "delete"
    TTL       string           // TTL(如 "10s"),与 Checks 互斥
    CreateIndex uint64         // 创建时的 Raft Index
    ModifyIndex uint64         // 最后修改的 Raft Index
}

字段说明

字段 类型 必填 说明 约束
ID string Session UUID 自动生成
Name string 会话名称 用于标识
Node string 所属节点名称 必须存在
Checks []types.CheckID 条件 健康检查列表 与 TTL 互斥,空表示节点检查
LockDelay time.Duration Lock Delay 默认 15s
Behavior string 失效行为 “release” 或 “delete”,默认 “release”
TTL string 条件 TTL 时间 与 Checks 互斥,最小 10s

4. 核心 API 详细规格

4.1 Create - 创建 Session

4.1.1 基本信息

  • 名称Session.Create
  • HTTP 映射PUT /v1/session/create
  • 幂等性:否(每次创建新 Session)

4.1.2 请求结构体

{
  "Name": "my-session",
  "Node": "node1",
  "Checks": ["service:web", "serfHealth"],
  "LockDelay": "15s",
  "Behavior": "release",
  "TTL": ""
}

字段表

字段 类型 必填 默认 说明
Name string "" Session 名称
Node string 当前节点 所属节点
Checks []string [“serfHealth”] 健康检查 ID 列表
LockDelay string “15s” Lock Delay 时间
Behavior string “release” “release” 或 “delete”
TTL string 条件 "" TTL 时间(如 “30s”)

4.1.3 响应结构体

{
  "ID": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
}

4.1.4 核心代码

func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
    // 1. 转发到 Leader
    if done, err := s.srv.ForwardRPC("Session.Apply", args, reply); done {
        return err
    }
    
    // 2. 验证请求
    if args.Session.Node == "" {
        args.Session.Node = args.Session.Node
    }
    
    // 验证 TTL
    if args.Session.TTL != "" {
        ttl, err := time.ParseDuration(args.Session.TTL)
        if err != nil {
            return fmt.Errorf("Invalid TTL: %v", err)
        }
        if ttl < 10*time.Second {
            return fmt.Errorf("TTL must be >= 10s")
        }
    }
    
    // 验证 Behavior
    switch args.Session.Behavior {
    case "release", "delete", "":
        // 合法值
    default:
        return fmt.Errorf("Invalid Behavior: %s", args.Session.Behavior)
    }
    
    // 3. 生成 Session ID
    if args.Op == structs.SessionCreate {
        args.Session.ID, _ = uuid.GenerateUUID()
    }
    
    // 4. Raft Apply
    resp, err := s.srv.raftApply(structs.SessionRequestType, args)
    if err != nil {
        return err
    }
    
    if respErr, ok := resp.(error); ok {
        return respErr
    }
    
    // 5. 启动 TTL 定时器(如果是 TTL Session)
    if args.Session.TTL != "" {
        s.srv.sessionTimers.SetTimer(args.Session.ID, ttl)
    }
    
    *reply = args.Session.ID
    return nil
}

4.2 Renew - 续期 Session

4.2.1 基本信息

  • HTTP 映射PUT /v1/session/renew/:id
  • 幂等性:是

4.2.2 核心代码

func (s *Session) Renew(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error {
    // 1. 读取 Session
    state := s.srv.fsm.State()
    _, session, err := state.SessionGet(nil, args.SessionID, nil)
    if err != nil || session == nil {
        return fmt.Errorf("Session not found")
    }
    
    // 2. 验证 TTL
    if session.TTL == "" {
        return fmt.Errorf("Session does not have TTL")
    }
    
    // 3. 重置 TTL 定时器
    ttl, _ := time.ParseDuration(session.TTL)
    s.srv.sessionTimers.ResetTimer(args.SessionID, ttl)
    
    reply.Sessions = structs.Sessions{session}
    return nil
}

4.3 Destroy - 销毁 Session

4.3.1 基本信息

  • HTTP 映射PUT /v1/session/destroy/:id
  • 幂等性:是

4.3.2 核心代码

func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
    // Destroy 操作
    if args.Op == structs.SessionDestroy {
        // 1. Raft Apply
        resp, err := s.srv.raftApply(structs.SessionRequestType, args)
        if err != nil {
            return err
        }
        
        // 2. 停止 TTL 定时器
        s.srv.sessionTimers.StopTimer(args.Session.ID)
        
        *reply = "true"
        return nil
    }
    
    return fmt.Errorf("Invalid operation")
}

5. 关键功能实现分析

5.1 Session 失效处理

5.1.1 TTL 超时触发

type SessionTimers struct {
    timers map[string]*time.Timer
    mu     sync.Mutex
}

func (st *SessionTimers) SetTimer(sessionID string, ttl time.Duration) {
    st.mu.Lock()
    defer st.mu.Unlock()
    
    // 创建定时器
    timer := time.AfterFunc(ttl, func() {
        // TTL 超时,触发 Invalidator
        st.invalidateSession(sessionID)
    })
    
    st.timers[sessionID] = timer
}

func (st *SessionTimers) invalidateSession(sessionID string) {
    // 销毁 Session(通过 Raft)
    req := &structs.SessionRequest{
        Op: structs.SessionDestroy,
        Session: structs.Session{
            ID: sessionID,
        },
    }
    
    st.srv.raftApply(structs.SessionRequestType, req)
}

5.1.2 健康检查触发

// 健康检查状态变更时触发
func (s *Server) handleCheckUpdate(checkID types.CheckID, status string) {
    if status != api.HealthCritical {
        return
    }
    
    // 查询关联的 Sessions
    state := s.fsm.State()
    _, sessions, _ := state.SessionList(nil, nil)
    
    for _, session := range sessions {
        for _, check := range session.Checks {
            if check == checkID {
                // 健康检查失败,销毁 Session
                s.invalidateSession(session.ID)
            }
        }
    }
}

5.1.3 Session 失效后处理

// FSM Apply Session 销毁时
func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} {
    var req structs.SessionRequest
    decodeSessionRequest(buf, &req)
    
    if req.Op == structs.SessionDestroy {
        session, _ := c.state.SessionGet(nil, req.Session.ID, nil)
        if session == nil {
            return nil
        }
        
        // 1. 删除 Session
        c.state.SessionDestroy(index, req.Session.ID, nil)
        
        // 2. 根据 Behavior 处理关联的 KV
        if session.Behavior == "release" {
            // 释放锁(清除 Session 字段)
            c.releaseLocksForSession(index, req.Session.ID)
        } else if session.Behavior == "delete" {
            // 删除 KV
            c.deleteKVsForSession(index, req.Session.ID)
        }
        
        // 3. 设置 Lock Delay
        c.state.KVSSetLockDelay(req.Session.ID, session.LockDelay)
        
        return nil
    }
    
    return nil
}

func (c *FSM) releaseLocksForSession(index uint64, sessionID string) {
    // 查询所有被此 Session 锁定的 KV
    kvs, _ := c.state.KVSListKeys(nil, "", nil)
    
    for _, kv := range kvs {
        if kv.Session == sessionID {
            // 清除 Session 字段
            kv.Session = ""
            kv.ModifyIndex = index
            c.state.KVSSet(index, kv)
        }
    }
}

6. 使用示例

6.1 创建 TTL Session

# 创建 30 秒 TTL Session
curl -X PUT http://localhost:8500/v1/session/create \
  -d '{
    "Name": "my-lock",
    "TTL": "30s",
    "Behavior": "release"
  }'

# 响应:
# {"ID":"adf4238a-882b-9ddc-4a9d-5b6758e4159e"}

6.2 创建基于健康检查的 Session

# 创建 Session,关联服务健康检查
curl -X PUT http://localhost:8500/v1/session/create \
  -d '{
    "Name": "db-leader",
    "Checks": ["service:postgres", "serfHealth"],
    "Behavior": "release"
  }'

6.3 续期 Session

# 续期 Session(重置 TTL 计时器)
curl -X PUT http://localhost:8500/v1/session/renew/adf4238a-882b-9ddc-4a9d-5b6758e4159e

6.4 销毁 Session

# 销毁 Session(释放所有锁)
curl -X PUT http://localhost:8500/v1/session/destroy/adf4238a-882b-9ddc-4a9d-5b6758e4159e

7. 最佳实践

7.1 TTL Session 使用

优点

  • 无需额外健康检查
  • 适用于短期锁

缺点

  • 需要定期续期(应用负担)
  • 网络分区时可能误删

推荐场景

  • 临时任务锁
  • 短期 Leader 选举

7.2 健康检查 Session 使用

优点

  • 自动监听健康状态
  • 无需手动续期

缺点

  • 依赖健康检查准确性
  • 延迟可能较大

推荐场景

  • 长期 Leader 选举
  • 服务级分布式锁

7.3 Behavior 选择

  • release(默认):释放锁但保留 KV,适用于锁场景
  • delete:删除 KV,适用于临时数据

7.4 Lock Delay 设置

  • 默认 15 秒,防止锁释放后立即重新获取
  • 高频锁场景可降低到 5 秒
  • 低频锁场景可提高到 30 秒