Consul 源码剖析 - Session 会话管理模块
1. 模块概览
1.1 职责定义
Session 模块提供分布式会话管理功能,是实现分布式锁和 Leader 选举的基础。Session 通过 TTL 和健康检查机制保证会话有效性,会话失效时自动释放关联的 KV 锁。
核心职责:
- 会话创建与生命周期管理
- TTL 超时检测(心跳续期)
- 健康检查关联(基于 Check 的会话)
- 会话失效处理(自动释放锁)
- Lock Delay 机制(防止锁立即重新获取)
1.2 输入与输出
输入:
- Session 创建请求(TTL、健康检查列表、Behavior)
- Session 续期请求(Renew)
- Session 销毁请求(Destroy)
输出:
- Session ID(UUID)
- Session 详细信息(TTL、Checks、CreateIndex)
- Session 列表
1.3 上下游依赖
上游调用方:
- KV Store 模块(Lock/Unlock 操作)
- HTTP API(
/v1/session/*) - SDK(分布式锁、Leader 选举)
下游依赖:
- State Store(Session 数据存储)
- Health Check 模块(监听健康检查状态)
- Raft(写操作通过共识)
- Session Timers(TTL 超时管理)
2. 模块架构图
flowchart TB
subgraph "API 入口"
HTTP[HTTP API<br>/v1/session/*]
SDK[SDK Lock]
end
subgraph "Session RPC 端点"
Create[Create<br>创建会话]
Renew[Renew<br>续期会话]
Destroy[Destroy<br>销毁会话]
Get[Get<br>查询会话]
List[List<br>列出会话]
end
subgraph "会话管理"
TTLManager[TTL Manager<br>定时器管理]
CheckWatcher[Check Watcher<br>健康检查监听]
Invalidator[Invalidator<br>失效处理]
end
subgraph "数据存储"
StateStore[State Store<br>sessions 表]
Raft[Raft 共识]
end
subgraph "关联模块"
HealthCheck[Health Check 模块]
KVStore[KV Store 模块]
end
HTTP --> Create
SDK --> Create
Create --> Raft
Renew --> TTLManager
Destroy --> Raft
Raft --> StateStore
TTLManager --> Invalidator
CheckWatcher --> HealthCheck
CheckWatcher --> Invalidator
Invalidator --> Raft
Invalidator --> KVStore
StateStore --> Get
StateStore --> List
2.1 架构说明
组件职责:
-
Create RPC:
- 创建 Session,分配 UUID
- 设置 TTL 或关联健康检查
- 通过 Raft 持久化
-
TTL Manager:
- 管理 TTL Session 的定时器
- 超时后触发 Invalidator
-
Check Watcher:
- 监听健康检查状态变化
- Critical 状态触发 Invalidator
-
Invalidator:
- 处理 Session 失效
- 根据 Behavior 删除或释放锁
- 设置 Lock Delay
关键决策点:
- TTL vs 健康检查:TTL 需定期续期,健康检查自动监听
- Behavior:
release(释放锁,默认)vsdelete(删除 KV) - Lock Delay:默认 15 秒,防止惊群
边界条件:
- TTL 最小值 10 秒
- 单节点最大 Session 数量无限制(受内存限制)
- Session 失效时自动释放所有关联的 KV 锁
3. 核心数据结构
3.1 Session 定义
type Session struct {
ID string // Session UUID
Name string // Session 名称(可选)
Node string // 所属节点
Checks []types.CheckID // 关联的健康检查列表
LockDelay time.Duration // Lock Delay 时间
Behavior string // 失效行为:"release" 或 "delete"
TTL string // TTL(如 "10s"),与 Checks 互斥
CreateIndex uint64 // 创建时的 Raft Index
ModifyIndex uint64 // 最后修改的 Raft Index
}
字段说明:
| 字段 | 类型 | 必填 | 说明 | 约束 |
|---|---|---|---|---|
| ID | string | 是 | Session UUID | 自动生成 |
| Name | string | 否 | 会话名称 | 用于标识 |
| Node | string | 是 | 所属节点名称 | 必须存在 |
| Checks | []types.CheckID | 条件 | 健康检查列表 | 与 TTL 互斥,空表示节点检查 |
| LockDelay | time.Duration | 否 | Lock Delay | 默认 15s |
| Behavior | string | 否 | 失效行为 | “release” 或 “delete”,默认 “release” |
| TTL | string | 条件 | TTL 时间 | 与 Checks 互斥,最小 10s |
4. 核心 API 详细规格
4.1 Create - 创建 Session
4.1.1 基本信息
- 名称:
Session.Create - HTTP 映射:
PUT /v1/session/create - 幂等性:否(每次创建新 Session)
4.1.2 请求结构体
{
"Name": "my-session",
"Node": "node1",
"Checks": ["service:web", "serfHealth"],
"LockDelay": "15s",
"Behavior": "release",
"TTL": ""
}
字段表:
| 字段 | 类型 | 必填 | 默认 | 说明 |
|---|---|---|---|---|
| Name | string | 否 | "" | Session 名称 |
| Node | string | 否 | 当前节点 | 所属节点 |
| Checks | []string | 否 | [“serfHealth”] | 健康检查 ID 列表 |
| LockDelay | string | 否 | “15s” | Lock Delay 时间 |
| Behavior | string | 否 | “release” | “release” 或 “delete” |
| TTL | string | 条件 | "" | TTL 时间(如 “30s”) |
4.1.3 响应结构体
{
"ID": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
}
4.1.4 核心代码
func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
// 1. 转发到 Leader
if done, err := s.srv.ForwardRPC("Session.Apply", args, reply); done {
return err
}
// 2. 验证请求
if args.Session.Node == "" {
args.Session.Node = args.Session.Node
}
// 验证 TTL
if args.Session.TTL != "" {
ttl, err := time.ParseDuration(args.Session.TTL)
if err != nil {
return fmt.Errorf("Invalid TTL: %v", err)
}
if ttl < 10*time.Second {
return fmt.Errorf("TTL must be >= 10s")
}
}
// 验证 Behavior
switch args.Session.Behavior {
case "release", "delete", "":
// 合法值
default:
return fmt.Errorf("Invalid Behavior: %s", args.Session.Behavior)
}
// 3. 生成 Session ID
if args.Op == structs.SessionCreate {
args.Session.ID, _ = uuid.GenerateUUID()
}
// 4. Raft Apply
resp, err := s.srv.raftApply(structs.SessionRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// 5. 启动 TTL 定时器(如果是 TTL Session)
if args.Session.TTL != "" {
s.srv.sessionTimers.SetTimer(args.Session.ID, ttl)
}
*reply = args.Session.ID
return nil
}
4.2 Renew - 续期 Session
4.2.1 基本信息
- HTTP 映射:
PUT /v1/session/renew/:id - 幂等性:是
4.2.2 核心代码
func (s *Session) Renew(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error {
// 1. 读取 Session
state := s.srv.fsm.State()
_, session, err := state.SessionGet(nil, args.SessionID, nil)
if err != nil || session == nil {
return fmt.Errorf("Session not found")
}
// 2. 验证 TTL
if session.TTL == "" {
return fmt.Errorf("Session does not have TTL")
}
// 3. 重置 TTL 定时器
ttl, _ := time.ParseDuration(session.TTL)
s.srv.sessionTimers.ResetTimer(args.SessionID, ttl)
reply.Sessions = structs.Sessions{session}
return nil
}
4.3 Destroy - 销毁 Session
4.3.1 基本信息
- HTTP 映射:
PUT /v1/session/destroy/:id - 幂等性:是
4.3.2 核心代码
func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
// Destroy 操作
if args.Op == structs.SessionDestroy {
// 1. Raft Apply
resp, err := s.srv.raftApply(structs.SessionRequestType, args)
if err != nil {
return err
}
// 2. 停止 TTL 定时器
s.srv.sessionTimers.StopTimer(args.Session.ID)
*reply = "true"
return nil
}
return fmt.Errorf("Invalid operation")
}
5. 关键功能实现分析
5.1 Session 失效处理
5.1.1 TTL 超时触发
type SessionTimers struct {
timers map[string]*time.Timer
mu sync.Mutex
}
func (st *SessionTimers) SetTimer(sessionID string, ttl time.Duration) {
st.mu.Lock()
defer st.mu.Unlock()
// 创建定时器
timer := time.AfterFunc(ttl, func() {
// TTL 超时,触发 Invalidator
st.invalidateSession(sessionID)
})
st.timers[sessionID] = timer
}
func (st *SessionTimers) invalidateSession(sessionID string) {
// 销毁 Session(通过 Raft)
req := &structs.SessionRequest{
Op: structs.SessionDestroy,
Session: structs.Session{
ID: sessionID,
},
}
st.srv.raftApply(structs.SessionRequestType, req)
}
5.1.2 健康检查触发
// 健康检查状态变更时触发
func (s *Server) handleCheckUpdate(checkID types.CheckID, status string) {
if status != api.HealthCritical {
return
}
// 查询关联的 Sessions
state := s.fsm.State()
_, sessions, _ := state.SessionList(nil, nil)
for _, session := range sessions {
for _, check := range session.Checks {
if check == checkID {
// 健康检查失败,销毁 Session
s.invalidateSession(session.ID)
}
}
}
}
5.1.3 Session 失效后处理
// FSM Apply Session 销毁时
func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} {
var req structs.SessionRequest
decodeSessionRequest(buf, &req)
if req.Op == structs.SessionDestroy {
session, _ := c.state.SessionGet(nil, req.Session.ID, nil)
if session == nil {
return nil
}
// 1. 删除 Session
c.state.SessionDestroy(index, req.Session.ID, nil)
// 2. 根据 Behavior 处理关联的 KV
if session.Behavior == "release" {
// 释放锁(清除 Session 字段)
c.releaseLocksForSession(index, req.Session.ID)
} else if session.Behavior == "delete" {
// 删除 KV
c.deleteKVsForSession(index, req.Session.ID)
}
// 3. 设置 Lock Delay
c.state.KVSSetLockDelay(req.Session.ID, session.LockDelay)
return nil
}
return nil
}
func (c *FSM) releaseLocksForSession(index uint64, sessionID string) {
// 查询所有被此 Session 锁定的 KV
kvs, _ := c.state.KVSListKeys(nil, "", nil)
for _, kv := range kvs {
if kv.Session == sessionID {
// 清除 Session 字段
kv.Session = ""
kv.ModifyIndex = index
c.state.KVSSet(index, kv)
}
}
}
6. 使用示例
6.1 创建 TTL Session
# 创建 30 秒 TTL Session
curl -X PUT http://localhost:8500/v1/session/create \
-d '{
"Name": "my-lock",
"TTL": "30s",
"Behavior": "release"
}'
# 响应:
# {"ID":"adf4238a-882b-9ddc-4a9d-5b6758e4159e"}
6.2 创建基于健康检查的 Session
# 创建 Session,关联服务健康检查
curl -X PUT http://localhost:8500/v1/session/create \
-d '{
"Name": "db-leader",
"Checks": ["service:postgres", "serfHealth"],
"Behavior": "release"
}'
6.3 续期 Session
# 续期 Session(重置 TTL 计时器)
curl -X PUT http://localhost:8500/v1/session/renew/adf4238a-882b-9ddc-4a9d-5b6758e4159e
6.4 销毁 Session
# 销毁 Session(释放所有锁)
curl -X PUT http://localhost:8500/v1/session/destroy/adf4238a-882b-9ddc-4a9d-5b6758e4159e
7. 最佳实践
7.1 TTL Session 使用
优点:
- 无需额外健康检查
- 适用于短期锁
缺点:
- 需要定期续期(应用负担)
- 网络分区时可能误删
推荐场景:
- 临时任务锁
- 短期 Leader 选举
7.2 健康检查 Session 使用
优点:
- 自动监听健康状态
- 无需手动续期
缺点:
- 依赖健康检查准确性
- 延迟可能较大
推荐场景:
- 长期 Leader 选举
- 服务级分布式锁
7.3 Behavior 选择
- release(默认):释放锁但保留 KV,适用于锁场景
- delete:删除 KV,适用于临时数据
7.4 Lock Delay 设置
- 默认 15 秒,防止锁释放后立即重新获取
- 高频锁场景可降低到 5 秒
- 低频锁场景可提高到 30 秒