LangGraph-07: checkpoint-sqlite Module Comprehensive Documentation

0. Module Overview

0.1 Module Responsibilities

The checkpoint-sqlite module is the SQLite implementation of the LangGraph checkpoint base interface, providing lightweight checkpoint storage backed by a SQLite database. It subclasses and implements BaseCheckpointSaver, offering LangGraph applications a simple, easy-to-use state-persistence solution that is particularly well suited to development, testing, and small-scale deployments.

0.2 Module Inputs and Outputs

Inputs

  • SQLite database connection (sqlite3.Connection or aiosqlite.Connection)
  • Checkpoint data structures
  • RunnableConfig configuration objects
  • Serializer (SerializerProtocol)

Outputs

  • Persisted checkpoint data
  • Checkpoint history
  • CheckpointTuple objects
  • Write-operation records

0.3 Upstream and Downstream Dependencies

Dependencies

  • sqlite3: standard-library SQLite driver (sync version)
  • aiosqlite: async SQLite driver (async version)
  • sqlite_vec: vector-search extension (optional)
  • langgraph.checkpoint.base: base interface definitions
  • langgraph.checkpoint.serde: serialization protocol

Downstream consumers

  • LangGraph applications in development and test environments
  • Agent systems in small-scale production environments
  • State storage on the local filesystem

0.4 Lifecycle

  1. Initialization: establish the SQLite connection and configure the serializer
  2. Setup: create the required database tables and indexes
  3. Runtime: store, query, and manage checkpoints
  4. Cleanup: close the database connection and release file locks
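
The four stages above can be sketched with nothing but the standard-library sqlite3 driver; the table layout and values below are illustrative, not the module's real schema:

```python
import os
import sqlite3
import tempfile

# 1) Initialization: open a file-backed database (path is illustrative)
path = os.path.join(tempfile.mkdtemp(), "checkpoints.db")
conn = sqlite3.connect(path, check_same_thread=False)

# Enable WAL mode (file-backed DBs report "wal"; in-memory DBs report "memory")
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]

# 2) Setup: create the schema (IF NOT EXISTS makes this idempotent)
conn.execute(
    "CREATE TABLE IF NOT EXISTS checkpoints ("
    "thread_id TEXT NOT NULL, checkpoint_ns TEXT NOT NULL DEFAULT '', "
    "checkpoint_id TEXT NOT NULL, checkpoint BLOB, "
    "PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id))"
)

# 3) Runtime: store and read back one checkpoint row
conn.execute(
    "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?, ?)",
    ("thread-1", "", "ckpt-1", b"payload"),
)
row = conn.execute("SELECT checkpoint FROM checkpoints").fetchone()

# 4) Cleanup: close the connection, releasing file locks
conn.close()
```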

0.5 Module Architecture Diagram

flowchart TD
    A[SqliteSaver] --> B[BaseCheckpointSaver]
    C[AsyncSqliteSaver] --> B
    
    A --> D[sqlite3.Connection]
    C --> E[aiosqlite.Connection]
    
    A --> F[Sync Operations]
    C --> G[Async Operations]
    
    F --> H[SQL Execution]
    G --> H
    
    H --> I[Database Files]
    I --> J[checkpoints.db]
    I --> K[WAL Files]
    
    L[SqliteCache] --> M[BaseCache]
    L --> N[Cache Table]
    
    O[SqliteStore] --> P[BaseStore]
    O --> Q[Vector Search]
    O --> R[TTL Management]
    
    S[Database Schema] --> T[checkpoints Table]
    S --> U[writes Table]
    S --> V[store Table]
    S --> W[store_vectors Table]

Architecture notes

  • SqliteSaver: synchronous SQLite checkpoint saver
  • AsyncSqliteSaver: asynchronous SQLite checkpoint saver
  • SqliteCache: SQLite-backed cache implementation
  • SqliteStore: key-value store with vector-search support
  • WAL mode: uses Write-Ahead Logging to improve concurrency

1. Key Data Structures and UML

1.1 Core Data Structures

class SqliteSaver(BaseCheckpointSaver[str]):
    """SQLite checkpoint saver."""
    
    conn: sqlite3.Connection      # SQLite connection
    is_setup: bool                # setup-state flag
    lock: threading.RLock         # thread lock (sync version)

class AsyncSqliteSaver(BaseCheckpointSaver[str]):
    """Async SQLite checkpoint saver."""
    
    conn: aiosqlite.Connection    # async SQLite connection
    is_setup: bool                # setup-state flag
    lock: asyncio.Lock            # async lock

class SqliteCache(BaseCache[ValueT]):
    """SQLite cache implementation."""
    
    _conn: sqlite3.Connection     # SQLite connection
    _lock: threading.RLock        # thread lock
    serde: SerializerProtocol     # serializer

class SqliteStore(BaseStore):
    """SQLite store implementation."""
    
    conn: sqlite3.Connection      # SQLite connection
    lock: threading.RLock         # thread lock
    index_config: IndexConfig     # index configuration
    ttl_config: TTLConfig         # TTL configuration

1.2 Database Table Schema

-- Main checkpoint table
CREATE TABLE IF NOT EXISTS checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    parent_checkpoint_id TEXT,
    type TEXT,
    checkpoint BLOB,
    metadata BLOB,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);

-- Write-record table
CREATE TABLE IF NOT EXISTS writes (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    idx INTEGER NOT NULL,
    channel TEXT NOT NULL,
    type TEXT,
    value BLOB,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);

-- Cache table
CREATE TABLE IF NOT EXISTS cache (
    ns TEXT,
    key TEXT,
    expiry REAL,
    encoding TEXT NOT NULL,
    val BLOB NOT NULL,
    PRIMARY KEY (ns, key)
);

-- Store table
CREATE TABLE IF NOT EXISTS store (
    prefix TEXT NOT NULL,
    key TEXT NOT NULL,
    value TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP,
    ttl_minutes REAL,
    PRIMARY KEY (prefix, key)
);

-- Vector store table
CREATE TABLE IF NOT EXISTS store_vectors (
    prefix TEXT NOT NULL,
    key TEXT NOT NULL,
    field_name TEXT NOT NULL,
    embedding BLOB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (prefix, key, field_name),
    FOREIGN KEY (prefix, key) REFERENCES store(prefix, key) ON DELETE CASCADE
);

1.3 Class Diagram

classDiagram
    class BaseCheckpointSaver {
        <<abstract>>
        +serde: SerializerProtocol
        +get(config) CheckpointTuple
        +put(config, checkpoint, metadata) RunnableConfig
        +list(config) Iterator[CheckpointTuple]
        +put_writes(config, writes) None
    }
    
    class SqliteSaver {
        +conn: sqlite3.Connection
        +is_setup: bool
        +lock: threading.RLock
        +from_conn_string(conn_string) SqliteSaver
        +setup() None
        +list(config, filter, before, limit) Iterator[CheckpointTuple]
        +get_tuple(config) CheckpointTuple
        +put(config, checkpoint, metadata, new_versions) RunnableConfig
        +put_writes(config, writes, task_id) None
    }
    
    class AsyncSqliteSaver {
        +conn: aiosqlite.Connection
        +is_setup: bool
        +lock: asyncio.Lock
        +from_conn_string(conn_string) AsyncSqliteSaver
        +asetup() None
        +alist(config, filter, before, limit) AsyncIterator[CheckpointTuple]
        +aget_tuple(config) CheckpointTuple
        +aput(config, checkpoint, metadata, new_versions) RunnableConfig
        +aput_writes(config, writes, task_id) None
    }
    
    class BaseCache {
        <<abstract>>
        +serde: SerializerProtocol
        +get(keys) dict[FullKey, ValueT]
        +put(keys_and_values) None
        +delete(keys) None
    }
    
    class SqliteCache {
        +_conn: sqlite3.Connection
        +_lock: threading.RLock
        +get(keys) dict[FullKey, ValueT]
        +put(keys_and_values) None
        +delete(keys) None
        +clear() None
    }
    
    class BaseStore {
        <<abstract>>
        +get(namespace, key) Item
        +put(namespace, key, value) None
        +delete(namespace, key) None
        +search(namespace, query) List[SearchItem]
    }
    
    class SqliteStore {
        +conn: sqlite3.Connection
        +lock: threading.RLock
        +index_config: IndexConfig
        +ttl_config: TTLConfig
        +setup() None
        +get(namespace, key) Item
        +put(namespace, key, value, index) None
        +delete(namespace, key) None
        +search(namespace, query) List[SearchItem]
    }
    
    BaseCheckpointSaver <|-- SqliteSaver
    BaseCheckpointSaver <|-- AsyncSqliteSaver
    BaseCache <|-- SqliteCache
    BaseStore <|-- SqliteStore
    
    SqliteSaver --> "1" sqlite3.Connection : uses
    AsyncSqliteSaver --> "1" aiosqlite.Connection : uses
    SqliteCache --> "1" sqlite3.Connection : uses
    SqliteStore --> "1" sqlite3.Connection : uses

2. External API List and Specifications

2.1 Core Checkpoint APIs

2.1.1 from_conn_string() class method

Basic information

  • Name: from_conn_string
  • Protocol: class-method call SqliteSaver.from_conn_string(conn_string)
  • Idempotent: no (each call creates a new connection and instance)

Method signature

@classmethod
@contextmanager
def from_conn_string(cls, conn_string: str) -> Iterator[SqliteSaver]:
    """Create a SqliteSaver instance from a connection string."""

Parameter description

Parameter    Type  Required  Default  Description
conn_string  str   yes       -        SQLite connection string, e.g. ":memory:" or a file path

Core implementation

@classmethod
@contextmanager
def from_conn_string(cls, conn_string: str) -> Iterator[SqliteSaver]:
    """Create an instance from a connection string."""
    
    # 1) Open the SQLite connection
    conn = sqlite3.connect(
        conn_string,
        check_same_thread=False,  # allow multi-threaded access
        isolation_level=None      # autocommit mode
    )
    
    try:
        # 2) Enable WAL mode for better concurrency
        conn.execute("PRAGMA journal_mode=WAL")
        
        # 3) Create the SqliteSaver instance
        saver = cls(conn)
        
        # 4) Initialize the database schema automatically
        saver.setup()
        
        yield saver
        
    finally:
        # 5) Make sure the connection is closed
        conn.close()
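
The connect/yield/close pattern above can be reproduced standalone with the standard library; open_saver_conn below is a hypothetical stand-in for the real class method:

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator

@contextmanager
def open_saver_conn(conn_string: str) -> Iterator[sqlite3.Connection]:
    # open in autocommit mode (isolation_level=None), as the saver does
    conn = sqlite3.connect(conn_string, check_same_thread=False, isolation_level=None)
    try:
        conn.execute("PRAGMA journal_mode=WAL")  # in-memory DBs fall back to "memory"
        yield conn
    finally:
        conn.close()  # the connection is always released, even on error

with open_saver_conn(":memory:") as conn:
    value = conn.execute("SELECT 1").fetchone()[0]
```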

2.1.2 setup()

Basic information

  • Name: setup
  • Protocol: method call checkpointer.setup()
  • Idempotent: yes (safe to call repeatedly)

Core implementation

def setup(self) -> None:
    """Create the database table schema."""
    
    if self.is_setup:
        return
    
    # Run the database initialization script
    self.conn.executescript("""
        PRAGMA journal_mode=WAL;
        
        CREATE TABLE IF NOT EXISTS checkpoints (
            thread_id TEXT NOT NULL,
            checkpoint_ns TEXT NOT NULL DEFAULT '',
            checkpoint_id TEXT NOT NULL,
            parent_checkpoint_id TEXT,
            type TEXT,
            checkpoint BLOB,
            metadata BLOB,
            PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
        );
        
        CREATE TABLE IF NOT EXISTS writes (
            thread_id TEXT NOT NULL,
            checkpoint_ns TEXT NOT NULL DEFAULT '',
            checkpoint_id TEXT NOT NULL,
            task_id TEXT NOT NULL,
            idx INTEGER NOT NULL,
            channel TEXT NOT NULL,
            type TEXT,
            value BLOB,
            PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
        );
    """)
    
    self.is_setup = True
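
Because the script only uses CREATE TABLE IF NOT EXISTS, running it more than once is harmless; a minimal sketch of that idempotency:

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executescript(SCHEMA)  # second run succeeds without error

# exactly one table named "checkpoints" exists afterwards
tables = [r[0] for r in conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table' AND name='checkpoints'")]
```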

2.1.3 put()

Basic information

  • Name: put
  • Protocol: method call checkpointer.put(config, checkpoint, metadata, new_versions)
  • Idempotent: no (each call writes a new record)

Core implementation

def put(
    self,
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig:
    """Store a checkpoint."""
    
    # 1) Extract configuration values
    configurable = config.get("configurable", {})
    thread_id = configurable["thread_id"]
    checkpoint_ns = configurable.get("checkpoint_ns", "")
    checkpoint_id = checkpoint["id"]
    
    # 2) Serialize the data
    serialized_checkpoint = self.serde.dumps_typed(checkpoint)
    serialized_metadata = self.serde.dumps_typed(metadata)
    
    # 3) Insert into the database
    with self.cursor() as cur:
        parent_checkpoint_id = configurable.get("checkpoint_id")
        
        cur.execute("""
            INSERT OR REPLACE INTO checkpoints 
            (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, 
             type, checkpoint, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            thread_id,
            checkpoint_ns,
            checkpoint_id,
            parent_checkpoint_id,
            serialized_checkpoint[0],  # type
            serialized_checkpoint[1],  # data
            serialized_metadata[1]     # metadata
        ))
    
    # 4) Return the updated config
    return {
        **config,
        "configurable": {
            **configurable,
            "checkpoint_id": checkpoint_id,
        }
    }

2.1.4 get_tuple()

Basic information

  • Name: get_tuple
  • Protocol: method call checkpointer.get_tuple(config)
  • Idempotent: yes (read-only operation)

Core implementation

def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
    """Fetch a checkpoint tuple."""
    
    # 1) Parse the config
    configurable = config.get("configurable", {})
    thread_id = configurable["thread_id"]
    checkpoint_ns = configurable.get("checkpoint_ns", "")
    checkpoint_id = configurable.get("checkpoint_id")
    
    # 2) Build and run the query
    with self.cursor(transaction=False) as cur:
        if checkpoint_id:
            # look up a specific checkpoint
            cur.execute("""
                SELECT * FROM checkpoints 
                WHERE thread_id = ? AND checkpoint_ns = ? AND checkpoint_id = ?
            """, (thread_id, checkpoint_ns, checkpoint_id))
        else:
            # look up the latest checkpoint
            cur.execute("""
                SELECT * FROM checkpoints 
                WHERE thread_id = ? AND checkpoint_ns = ?
                ORDER BY checkpoint_id DESC LIMIT 1
            """, (thread_id, checkpoint_ns))
        
        row = cur.fetchone()
        if not row:
            return None
        
        # 3) Fetch the associated write records
        cur.execute("""
            SELECT task_id, idx, channel, type, value FROM writes
            WHERE thread_id = ? AND checkpoint_ns = ? AND checkpoint_id = ?
            ORDER BY task_id, idx
        """, (thread_id, checkpoint_ns, row[2]))  # row[2] is checkpoint_id
        
        writes = cur.fetchall()
    
    # 4) Deserialize the data
    checkpoint = self.serde.loads_typed((row[4], row[5]))  # type, checkpoint
    metadata = self.serde.loads_typed(("json", row[6]))    # metadata
    
    # 5) Convert the write records
    pending_writes = []
    for write in writes:
        task_id, idx, channel, type_, value = write
        deserialized_value = self.serde.loads_typed((type_, value))
        pending_writes.append((task_id, channel, deserialized_value))
    
    # 6) Build the parent config
    parent_config = None
    if row[3]:  # parent_checkpoint_id
        parent_config = {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": row[3]
            }
        }
    
    return CheckpointTuple(
        config={
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": row[2]
            }
        },
        checkpoint=checkpoint,
        metadata=metadata,
        parent_config=parent_config,
        pending_writes=pending_writes
    )
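
The latest-checkpoint branch depends on checkpoint_id values sorting lexicographically in creation order (LangGraph generates sortable IDs for this reason). A sketch with hand-made sortable IDs:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checkpoints (thread_id TEXT, checkpoint_ns TEXT, checkpoint_id TEXT)"
)
# IDs are assumed lexicographically sortable (zero-padded here for simplicity)
for cid in ["0001", "0002", "0003"]:
    conn.execute("INSERT INTO checkpoints VALUES (?, ?, ?)", ("t1", "", cid))

# same query shape as the "latest checkpoint" branch above
latest = conn.execute(
    "SELECT checkpoint_id FROM checkpoints "
    "WHERE thread_id = ? AND checkpoint_ns = ? "
    "ORDER BY checkpoint_id DESC LIMIT 1",
    ("t1", ""),
).fetchone()[0]
```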

2.2 Async APIs

2.2.1 AsyncSqliteSaver core methods

Key characteristics of the async version

  • Uses aiosqlite for truly asynchronous database operations
  • Supports async context managers
  • Provides async methods such as aput, aget_tuple, and alist

class AsyncSqliteSaver(BaseCheckpointSaver[str]):
    """Async SQLite checkpoint saver."""
    
    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        """Store a checkpoint asynchronously."""
        
        # 1) Serialize the data
        serialized_checkpoint = self.serde.dumps_typed(checkpoint)
        serialized_metadata = self.serde.dumps_typed(metadata)
        
        # 2) Perform the database write under the async lock
        async with self.lock:
            configurable = config.get("configurable", {})
            await self.conn.execute("""
                INSERT OR REPLACE INTO checkpoints 
                (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, 
                 type, checkpoint, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                configurable["thread_id"],
                configurable.get("checkpoint_ns", ""),
                checkpoint["id"],
                configurable.get("checkpoint_id"),
                serialized_checkpoint[0],
                serialized_checkpoint[1],
                serialized_metadata[1]
            ))
            
            await self.conn.commit()
        
        return {
            **config,
            "configurable": {
                **configurable,
                "checkpoint_id": checkpoint["id"],
            }
        }
    
    async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
        """Fetch a checkpoint tuple asynchronously."""
        
        configurable = config.get("configurable", {})
        thread_id = configurable["thread_id"]
        checkpoint_ns = configurable.get("checkpoint_ns", "")
        checkpoint_id = configurable.get("checkpoint_id")
        
        async with self.conn.execute(
            """SELECT * FROM checkpoints 
               WHERE thread_id = ? AND checkpoint_ns = ? 
               AND checkpoint_id = ?""" if checkpoint_id else
            """SELECT * FROM checkpoints 
               WHERE thread_id = ? AND checkpoint_ns = ?
               ORDER BY checkpoint_id DESC LIMIT 1""",
            (thread_id, checkpoint_ns, checkpoint_id) if checkpoint_id else
            (thread_id, checkpoint_ns)
        ) as cursor:
            row = await cursor.fetchone()
            if not row:
                return None
        
        # Fetch write records and build the full checkpoint tuple
        # (implementation mirrors the sync version)
        ...

3. Core Algorithms and Flows

3.1 WAL Mode Optimization

Purpose: use SQLite's Write-Ahead Logging mode to improve concurrency. Input: a database connection. Output: a connection configured for WAL mode. Complexity: O(1).

class WALModeManager:
    """WAL mode manager."""
    
    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        self.wal_enabled = False
    
    def enable_wal_mode(self) -> bool:
        """Enable WAL mode."""
        try:
            # 1) Switch to WAL mode
            cursor = self.conn.cursor()
            cursor.execute("PRAGMA journal_mode=WAL")
            result = cursor.fetchone()
            
            if result and result[0].upper() == 'WAL':
                self.wal_enabled = True
                
                # 2) Tune WAL-related parameters
                cursor.execute("PRAGMA synchronous=NORMAL")   # balance performance and durability
                cursor.execute("PRAGMA cache_size=10000")     # larger page cache
                cursor.execute("PRAGMA temp_store=memory")    # keep temp tables in memory
                cursor.execute("PRAGMA mmap_size=268435456")  # enable memory-mapped I/O
                
                return True
            else:
                return False
                
        except sqlite3.Error as e:
            logger.error(f"Failed to enable WAL mode: {e}")
            return False
    
    def get_wal_info(self) -> dict:
        """Return WAL mode information."""
        if not self.wal_enabled:
            return {"enabled": False}
        
        try:
            cursor = self.conn.cursor()
            
            # WAL file information
            cursor.execute("PRAGMA wal_checkpoint(PASSIVE)")
            checkpoint_result = cursor.fetchone()
            
            # other statistics
            cursor.execute("PRAGMA wal_autocheckpoint")
            autocheckpoint = cursor.fetchone()
            
            return {
                "enabled": True,
                "checkpoint_result": checkpoint_result,
                "autocheckpoint_pages": autocheckpoint[0] if autocheckpoint else None
            }
            
        except sqlite3.Error as e:
            logger.error(f"Failed to read WAL info: {e}")
            return {"enabled": True, "error": str(e)}

3.2 Transaction Management

Purpose: provide flexible transaction control for both read-only and read-write operations. Input: a flag indicating whether a transaction is needed. Output: a transaction-aware cursor context manager. Complexity: O(1).

@contextmanager
def cursor(self, transaction: bool = True) -> Iterator[sqlite3.Cursor]:
    """Cursor context manager with transaction handling."""
    
    cursor = self.conn.cursor()
    try:
        if transaction:
            # 1) Start an explicit transaction
            cursor.execute("BEGIN IMMEDIATE")
            
        yield cursor
        
        if transaction:
            # 2) Commit the transaction
            cursor.execute("COMMIT")
            
    except Exception:
        if transaction:
            # 3) Roll back on error
            try:
                cursor.execute("ROLLBACK")
            except sqlite3.Error:
                pass  # ignore rollback failures
        
        # re-raise the original exception with its traceback intact
        raise
    finally:
        # 4) Always close the cursor
        cursor.close()
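
The rollback path can be exercised in isolation: a write made inside a failed transaction never becomes visible. A minimal stdlib sketch mirroring the cursor() manager:

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit; explicit BEGIN below
conn.execute("CREATE TABLE writes (idx INTEGER PRIMARY KEY, channel TEXT)")

cur = conn.cursor()
try:
    cur.execute("BEGIN IMMEDIATE")
    cur.execute("INSERT INTO writes VALUES (0, 'messages')")
    raise RuntimeError("simulated failure mid-transaction")
except RuntimeError:
    cur.execute("ROLLBACK")  # the insert above is undone
finally:
    cur.close()

count = conn.execute("SELECT COUNT(*) FROM writes").fetchone()[0]
```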

3.3 Batch Write Optimization

Purpose: optimize the performance of multiple write operations by reducing transaction overhead. Input: a list of write records. Output: batch execution results. Complexity: O(n), where n is the number of write records.

class BatchWriteOptimizer:
    """Batch write optimizer."""
    
    def __init__(self, saver: SqliteSaver, batch_size: int = 100):
        self.saver = saver
        self.batch_size = batch_size
    
    def batch_put_writes(
        self, 
        write_operations: List[WriteOperation]
    ) -> List[bool]:
        """Execute write operations in batches."""
        
        results = []
        
        # 1) Split into batch-sized groups
        for i in range(0, len(write_operations), self.batch_size):
            batch = write_operations[i:i + self.batch_size]
            batch_result = self._execute_write_batch(batch)
            results.extend(batch_result)
        
        return results
    
    def _execute_write_batch(self, batch: List[WriteOperation]) -> List[bool]:
        """Execute a single batch of writes."""
        
        try:
            with self.saver.cursor(transaction=True) as cur:
                batch_data = []
                
                # 2) Prepare the batch rows
                for write_op in batch:
                    for idx, (channel, value) in enumerate(write_op.writes):
                        serialized_value = self.saver.serde.dumps_typed(value)
                        
                        batch_data.append((
                            write_op.thread_id,
                            write_op.checkpoint_ns,
                            write_op.checkpoint_id,
                            write_op.task_id,
                            idx,
                            channel,
                            serialized_value[0],  # type
                            serialized_value[1]   # value
                        ))
                
                # 3) Run the bulk insert
                cur.executemany("""
                    INSERT OR REPLACE INTO writes 
                    (thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, value)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, batch_data)
                
                return [True] * len(batch)
                
        except sqlite3.Error as e:
            logger.error(f"Batch write failed: {e}")
            return [False] * len(batch)
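
The heart of the batching idea is one transaction plus one executemany call; it can be sketched with the stdlib driver alone (schema simplified):

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE writes (task_id TEXT, idx INTEGER, channel TEXT, value BLOB, "
    "PRIMARY KEY (task_id, idx))"
)

rows = [("task-1", i, "messages", b"v%d" % i) for i in range(5)]

cur = conn.cursor()
cur.execute("BEGIN IMMEDIATE")  # one transaction for the whole batch
cur.executemany("INSERT OR REPLACE INTO writes VALUES (?, ?, ?, ?)", rows)
cur.execute("COMMIT")
cur.close()

count = conn.execute("SELECT COUNT(*) FROM writes").fetchone()[0]
```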

4. Module Architecture and Sequence Diagrams

4.1 SQLite Filesystem Architecture

flowchart LR
    subgraph "Application Layer"
        A[LangGraph App] --> B[SqliteSaver]
        A --> C[AsyncSqliteSaver]
    end
    
    subgraph "SQLite Layer"
        B --> D[sqlite3.Connection]
        C --> E[aiosqlite.Connection]
        
        D --> F[Transaction Manager]
        E --> G[Async Transaction Manager]
        
        F --> H[SQL Executor]
        G --> H
    end
    
    subgraph "File System"
        H --> I[Database File]
        H --> J[WAL File]
        H --> K[SHM File]
        
        I --> L[checkpoints.db]
        J --> M[checkpoints.db-wal]
        K --> N[checkpoints.db-shm]
    end
    
    subgraph "Storage Components"
        O[SqliteCache] --> P[cache.db]
        Q[SqliteStore] --> R[store.db]
        Q --> S[Vector Extension]
    end

4.2 Synchronous Operation Sequence

sequenceDiagram
    autonumber
    participant App as Application
    participant Saver as SqliteSaver
    participant Conn as sqlite3.Connection
    participant File as Database File
    participant WAL as WAL File
    
    Note over App,WAL: Initialization
    App->>Saver: SqliteSaver.from_conn_string("checkpoints.db")
    Saver->>Conn: sqlite3.connect()
    Conn->>File: open/create database file
    File-->>Conn: file ready
    Saver->>Conn: PRAGMA journal_mode=WAL
    Conn->>WAL: create WAL file
    WAL-->>Conn: WAL mode enabled
    Saver->>Saver: setup() - create table schema
    
    Note over App,WAL: Write path
    App->>Saver: put(config, checkpoint, metadata, versions)
    Saver->>Saver: serialize data
    Saver->>Conn: BEGIN IMMEDIATE
    Conn->>WAL: start transactional write
    Saver->>Conn: INSERT INTO checkpoints
    Conn->>WAL: append to WAL file
    WAL-->>Conn: write complete
    Saver->>Conn: COMMIT
    Conn->>WAL: commit transaction
    Conn->>File: periodic checkpoint
    Saver-->>App: return updated config
    
    Note over App,WAL: Read path
    App->>Saver: get_tuple(config)
    Saver->>Conn: SELECT FROM checkpoints
    Conn->>File: read main database file
    Conn->>WAL: read latest data from WAL
    WAL-->>Conn: merge data
    File-->>Conn: return full data
    Saver->>Saver: deserialize data
    Saver-->>App: return CheckpointTuple

4.3 Asynchronous Operation Sequence

sequenceDiagram
    autonumber
    participant App as Async Application
    participant ASaver as AsyncSqliteSaver
    participant AConn as aiosqlite.Connection
    participant File as Database File
    
    App->>ASaver: AsyncSqliteSaver.from_conn_string("checkpoints.db")
    ASaver->>AConn: aiosqlite.connect()
    AConn->>File: open database file asynchronously
    File-->>AConn: file ready
    ASaver->>AConn: await conn.execute("PRAGMA journal_mode=WAL")
    AConn-->>ASaver: WAL mode enabled
    
    App->>ASaver: await aput(config, checkpoint, metadata, versions)
    ASaver->>ASaver: serialize data
    ASaver->>AConn: async with lock:
    ASaver->>AConn: await conn.execute(INSERT...)
    AConn->>File: async write
    File-->>AConn: write complete
    ASaver->>AConn: await conn.commit()
    AConn-->>ASaver: transaction committed
    ASaver-->>App: return result
    
    App->>ASaver: await aget_tuple(config)
    ASaver->>AConn: async with conn.execute(SELECT...)
    AConn->>File: async query
    File-->>AConn: query result
    ASaver->>ASaver: deserialize data
    ASaver-->>App: return CheckpointTuple

5. Error Handling and Performance Optimization

5.1 Handling Database Lock Errors

class DatabaseLockManager:
    """Database lock manager."""
    
    def __init__(self, max_retries: int = 5, base_delay: float = 0.1):
        self.max_retries = max_retries
        self.base_delay = base_delay
    
    def with_retry(self, operation_name: str, operation):
        """Run `operation` (a zero-argument callable), retrying on lock errors.
        
        Note: a @contextmanager cannot re-run its `with` body after an
        exception, so retries must wrap a callable instead.
        """
        last_exception = None
        
        for attempt in range(self.max_retries):
            try:
                return operation()
                
            except sqlite3.OperationalError as e:
                if "database is locked" in str(e).lower():
                    last_exception = e
                    if attempt < self.max_retries - 1:
                        # exponential backoff with jitter
                        delay = self.base_delay * (2 ** attempt) + random.uniform(0, 0.1)
                        time.sleep(delay)
                        logger.warning(f"{operation_name}: database locked, retry {attempt + 1}/{self.max_retries}")
                    continue
                else:
                    # non-lock errors propagate immediately
                    raise
        
        # every retry failed
        raise sqlite3.OperationalError(f"{operation_name} failed after {self.max_retries} retries: {last_exception}")

# Usage example
lock_manager = DatabaseLockManager()

def safe_put_checkpoint(self, config, checkpoint, metadata, new_versions):
    """Store a checkpoint with lock retries."""
    return lock_manager.with_retry(
        "put_checkpoint",
        lambda: self._unsafe_put_checkpoint(config, checkpoint, metadata, new_versions))
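
The backoff schedule can be inspected without a real lock; backoff_delays below is a hypothetical helper that isolates the base_delay * 2**attempt plus jitter computation:

```python
import random

def backoff_delays(max_retries=5, base_delay=0.1, jitter=0.1, seed=42):
    """Exponential backoff with jitter, one delay per retry attempt."""
    rng = random.Random(seed)  # seeded only for reproducibility in this sketch
    return [base_delay * (2 ** attempt) + rng.uniform(0, jitter)
            for attempt in range(max_retries - 1)]

delays = backoff_delays()
```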

5.2 Memory Optimization Strategies

class MemoryOptimizer:
    """Memory usage optimizer."""
    
    def __init__(self, saver: SqliteSaver):
        self.saver = saver
    
    def optimize_connection(self):
        """Tune the connection configuration."""
        with self.saver.cursor(transaction=False) as cur:
            # 1) Set a reasonable cache size (negative value means KiB; ~10MB here)
            cur.execute("PRAGMA cache_size=-10000")
            
            # 2) Keep temporary storage in memory
            cur.execute("PRAGMA temp_store=memory")
            
            # 3) Enable memory-mapped I/O (256MB)
            cur.execute("PRAGMA mmap_size=268435456")
            
            # 4) Tune the page size
            cur.execute("PRAGMA page_size=4096")
    
    def streaming_list(
        self, 
        config: RunnableConfig,
        batch_size: int = 50
    ) -> Iterator[CheckpointTuple]:
        """List checkpoints as a stream to keep memory usage bounded."""
        
        configurable = config.get("configurable", {})
        thread_id = configurable["thread_id"]
        checkpoint_ns = configurable.get("checkpoint_ns", "")
        
        offset = 0
        while True:
            with self.saver.cursor(transaction=False) as cur:
                cur.execute("""
                    SELECT * FROM checkpoints 
                    WHERE thread_id = ? AND checkpoint_ns = ?
                    ORDER BY checkpoint_id DESC
                    LIMIT ? OFFSET ?
                """, (thread_id, checkpoint_ns, batch_size, offset))
                
                rows = cur.fetchall()
                if not rows:
                    break
                
                # process rows one at a time so memory can be reclaimed
                for row in rows:
                    checkpoint_tuple = self._build_checkpoint_tuple(row)
                    yield checkpoint_tuple
                    
                offset += len(rows)
                
                # a short batch means we reached the end
                if len(rows) < batch_size:
                    break
    
    def vacuum_database(self):
        """Compact the database and reclaim space."""
        try:
            with self.saver.cursor(transaction=False) as cur:
                # 1) Refresh query-planner statistics
                cur.execute("ANALYZE")
                
                # 2) Run VACUUM to reclaim space
                cur.execute("VACUUM")
                
                # 3) Rebuild indexes
                cur.execute("REINDEX")
                
            logger.info("Database maintenance complete")
            
        except sqlite3.Error as e:
            logger.error(f"Database maintenance failed: {e}")
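
The LIMIT/OFFSET pagination used by streaming_list can be sketched on its own (simplified schema, hand-made sortable IDs):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checkpoints (thread_id TEXT, checkpoint_id TEXT)")
conn.executemany("INSERT INTO checkpoints VALUES (?, ?)",
                 [("t1", f"{i:04d}") for i in range(7)])

def stream_ids(conn, thread_id, batch_size=3):
    """Yield checkpoint ids newest-first, one small batch at a time."""
    offset = 0
    while True:
        rows = conn.execute(
            "SELECT checkpoint_id FROM checkpoints WHERE thread_id = ? "
            "ORDER BY checkpoint_id DESC LIMIT ? OFFSET ?",
            (thread_id, batch_size, offset),
        ).fetchall()
        if not rows:
            break
        for (cid,) in rows:
            yield cid
        offset += len(rows)
        if len(rows) < batch_size:  # a short batch means we reached the end
            break

ids = list(stream_ids(conn, "t1"))
```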

5.3 Performance Monitoring

class PerformanceMonitor:
    """Performance monitor."""
    
    def __init__(self, saver: SqliteSaver):
        self.saver = saver
        self.metrics = defaultdict(list)
    
    def measure_operation(self, operation_name: str):
        """Decorator that measures operation latency."""
        def decorator(func):
            def wrapper(*args, **kwargs):
                start_time = time.time()
                success = False  # set before the call so `finally` always sees it
                try:
                    result = func(*args, **kwargs)
                    success = True
                    return result
                finally:
                    end_time = time.time()
                    duration = end_time - start_time
                    
                    self.metrics[operation_name].append({
                        "duration": duration,
                        "success": success,
                        "timestamp": start_time
                    })
            return wrapper
        return decorator
    
    def get_performance_stats(self) -> dict:
        """Aggregate performance statistics."""
        stats = {}
        
        for operation, measurements in self.metrics.items():
            if not measurements:
                continue
                
            durations = [m["duration"] for m in measurements]
            success_count = sum(1 for m in measurements if m["success"])
            
            stats[operation] = {
                "count": len(measurements),
                "success_rate": success_count / len(measurements),
                "avg_duration": sum(durations) / len(durations),
                "min_duration": min(durations),
                "max_duration": max(durations),
                "p95_duration": sorted(durations)[int(len(durations) * 0.95)] if durations else 0
            }
        
        return stats
    
    def get_database_stats(self) -> dict:
        """Collect database statistics."""
        try:
            with self.saver.cursor(transaction=False) as cur:
                stats = {}
                
                # database size
                cur.execute("PRAGMA page_count")
                page_count = cur.fetchone()[0]
                cur.execute("PRAGMA page_size")
                page_size = cur.fetchone()[0]
                stats["database_size_bytes"] = page_count * page_size
                
                # table row counts
                cur.execute("SELECT COUNT(*) FROM checkpoints")
                stats["checkpoint_count"] = cur.fetchone()[0]
                
                cur.execute("SELECT COUNT(*) FROM writes")
                stats["writes_count"] = cur.fetchone()[0]
                
                # WAL information
                cur.execute("PRAGMA wal_checkpoint(PASSIVE)")
                wal_info = cur.fetchone()
                stats["wal_frames"] = wal_info[1] if wal_info else 0
                
                return stats
                
        except sqlite3.Error as e:
            logger.error(f"Failed to collect database statistics: {e}")
            return {"error": str(e)}

6. Summary

As LangGraph's lightweight persistence solution, the checkpoint-sqlite module has the following characteristics:

6.1 Core Strengths

  • Zero-configuration deployment: no separate database server required
  • Portable files: the database file is easy to back up and migrate
  • Developer friendly: supports in-memory mode (:memory:) for testing
  • Async support: a complete asynchronous implementation

6.2 Architectural Highlights

  • WAL mode: better concurrent read/write performance
  • Transaction safety: thorough transaction management and error handling
  • Memory efficiency: streaming reads and batched writes
  • Vector search: integrates the sqlite-vec extension for semantic search

6.3 Suitable Scenarios

  • Development and test environments: rapid prototyping and unit tests
  • Small-scale applications: personal projects and small-team apps
  • Edge computing: embedded devices and offline applications
  • Data analysis: local data-processing and analysis tools

6.4 Limitations and Caveats

  • Concurrency: SQLite's write concurrency is limited (one writer at a time)
  • File locking: "database is locked" errors are possible under contention
  • Scalability: not suited to large-scale, high-concurrency workloads
  • Network shares: not recommended on network filesystems

The module is an ideal solution for LangGraph applications that need simple, reliable state storage, especially in single-machine deployments and development/testing scenarios.