LangGraph-07: checkpoint-sqlite Module Comprehensive Documentation
0. Module Overview
0.1 Responsibilities
The checkpoint-sqlite module is the SQLite implementation of the LangGraph checkpoint base interface, providing lightweight checkpoint storage on top of a SQLite database. It inherits from and implements the BaseCheckpointSaver interface, giving LangGraph applications a simple, easy-to-use state persistence solution that is particularly well suited to development, testing, and small-scale deployments.
0.2 Inputs and Outputs
Inputs:
- SQLite database connection (sqlite3.Connection or aiosqlite.Connection)
- Checkpoint data structures
- RunnableConfig configuration objects
- Serializer (SerializerProtocol)
Outputs:
- Persisted checkpoint data
- Checkpoint history
- CheckpointTuple objects
- Pending write records
0.3 Upstream and Downstream Dependencies
Dependencies:
- sqlite3: standard-library SQLite driver (synchronous version)
- aiosqlite: asynchronous SQLite driver (asynchronous version)
- sqlite_vec: vector search extension (optional)
- langgraph.checkpoint.base: base interface definitions
- langgraph.checkpoint.serde: serialization protocol
Downstream consumers:
- LangGraph applications in development and test environments
- Small-scale production agent systems
- State storage on the local file system
0.4 Lifecycle
- Initialization: open the SQLite connection and configure the serializer
- Setup: create the required database tables and indexes
- Runtime: store, query, and manage checkpoints
- Cleanup: close the database connection and release file locks (illustrated by the sketch below)
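The following minimal sketch walks through that lifecycle with the synchronous saver. It is illustrative only: the compiled `graph` object and its input are assumptions, while SqliteSaver, setup(), and the connection handling follow the interfaces described later in this document.

import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# Initialization: open the connection and create the saver.
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
saver = SqliteSaver(conn)

# Setup: create the tables and indexes (idempotent).
saver.setup()

# Runtime: hand the saver to a compiled graph as its checkpointer.
app = graph.compile(checkpointer=saver)  # `graph` is an assumed StateGraph builder
app.invoke({"messages": ["hi"]}, config={"configurable": {"thread_id": "thread-1"}})

# Cleanup: close the connection and release the file locks.
conn.close()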
0.5 Module Architecture Diagram
flowchart TD
A[SqliteSaver] --> B[BaseCheckpointSaver]
C[AsyncSqliteSaver] --> B
A --> D[sqlite3.Connection]
C --> E[aiosqlite.Connection]
A --> F[Sync Operations]
C --> G[Async Operations]
F --> H[SQL Execution]
G --> H
H --> I[Database Files]
I --> J[checkpoints.db]
I --> K[WAL Files]
L[SqliteCache] --> M[BaseCache]
L --> N[Cache Table]
O[SqliteStore] --> P[BaseStore]
O --> Q[Vector Search]
O --> R[TTL Management]
S[Database Schema] --> T[checkpoints Table]
S --> U[writes Table]
S --> V[store Table]
S --> W[store_vectors Table]
Architecture notes:
- SqliteSaver: synchronous SQLite checkpoint saver
- AsyncSqliteSaver: asynchronous SQLite checkpoint saver
- SqliteCache: cache implementation backed by SQLite
- SqliteStore: key-value storage with vector search support
- WAL mode: Write-Ahead Logging is used to improve concurrent performance
1. Key Data Structures and UML
1.1 Core Data Structures
class SqliteSaver(BaseCheckpointSaver[str]):
    """SQLite checkpoint saver."""
    conn: sqlite3.Connection    # SQLite connection
    is_setup: bool              # whether setup() has run
    lock: threading.RLock       # thread lock (sync version)

class AsyncSqliteSaver(BaseCheckpointSaver[str]):
    """Asynchronous SQLite checkpoint saver."""
    conn: aiosqlite.Connection  # async SQLite connection
    is_setup: bool              # whether setup has run
    lock: asyncio.Lock          # async lock

class SqliteCache(BaseCache[ValueT]):
    """SQLite cache implementation."""
    _conn: sqlite3.Connection   # SQLite connection
    _lock: threading.RLock      # thread lock
    serde: SerializerProtocol   # serializer

class SqliteStore(BaseStore):
    """SQLite store implementation."""
    conn: sqlite3.Connection    # SQLite connection
    lock: threading.RLock       # thread lock
    index_config: IndexConfig   # index configuration
    ttl_config: TTLConfig       # TTL configuration
1.2 Database Table Schema
-- Main checkpoint table
CREATE TABLE IF NOT EXISTS checkpoints (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
checkpoint_id TEXT NOT NULL,
parent_checkpoint_id TEXT,
type TEXT,
checkpoint BLOB,
metadata BLOB,
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
-- Pending writes table
CREATE TABLE IF NOT EXISTS writes (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
checkpoint_id TEXT NOT NULL,
task_id TEXT NOT NULL,
idx INTEGER NOT NULL,
channel TEXT NOT NULL,
type TEXT,
value BLOB,
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
-- Cache table
CREATE TABLE IF NOT EXISTS cache (
ns TEXT,
key TEXT,
expiry REAL,
encoding TEXT NOT NULL,
val BLOB NOT NULL,
PRIMARY KEY (ns, key)
);
-- Key-value store table
CREATE TABLE IF NOT EXISTS store (
prefix TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP,
ttl_minutes REAL,
PRIMARY KEY (prefix, key)
);
-- Vector store table
CREATE TABLE IF NOT EXISTS store_vectors (
prefix TEXT NOT NULL,
key TEXT NOT NULL,
field_name TEXT NOT NULL,
embedding BLOB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (prefix, key, field_name),
FOREIGN KEY (prefix, key) REFERENCES store(prefix, key) ON DELETE CASCADE
);
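To make the schema concrete, here is a hedged sketch that inspects a checkpoint database directly with the standard sqlite3 module. The file name is an assumption, and the query only uses the columns defined above; checkpoint_id values sort chronologically, which is why MAX() picks the latest one, just like the ORDER BY checkpoint_id DESC queries shown later in this document.

import sqlite3

conn = sqlite3.connect("checkpoints.db")  # any database created by SqliteSaver
rows = conn.execute(
    """
    SELECT thread_id, checkpoint_ns, COUNT(*) AS n, MAX(checkpoint_id) AS latest_id
    FROM checkpoints
    GROUP BY thread_id, checkpoint_ns
    """
).fetchall()
for thread_id, ns, n, latest_id in rows:
    print(f"thread={thread_id!r} ns={ns!r}: {n} checkpoints, latest={latest_id}")
conn.close()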
1.3 Class Diagram
classDiagram
class BaseCheckpointSaver {
<<abstract>>
+serde: SerializerProtocol
+get(config) CheckpointTuple
+put(config, checkpoint, metadata) RunnableConfig
+list(config) Iterator[CheckpointTuple]
+put_writes(config, writes) None
}
class SqliteSaver {
+conn: sqlite3.Connection
+is_setup: bool
+lock: threading.RLock
+from_conn_string(conn_string) SqliteSaver
+setup() None
+list(config, filter, before, limit) Iterator[CheckpointTuple]
+get_tuple(config) CheckpointTuple
+put(config, checkpoint, metadata, new_versions) RunnableConfig
+put_writes(config, writes, task_id) None
}
class AsyncSqliteSaver {
+conn: aiosqlite.Connection
+is_setup: bool
+lock: asyncio.Lock
+from_conn_string(conn_string) AsyncSqliteSaver
+asetup() None
+alist(config, filter, before, limit) AsyncIterator[CheckpointTuple]
+aget_tuple(config) CheckpointTuple
+aput(config, checkpoint, metadata, new_versions) RunnableConfig
+aput_writes(config, writes, task_id) None
}
class BaseCache {
<<abstract>>
+serde: SerializerProtocol
+get(keys) dict[FullKey, ValueT]
+put(keys_and_values) None
+delete(keys) None
}
class SqliteCache {
+_conn: sqlite3.Connection
+_lock: threading.RLock
+get(keys) dict[FullKey, ValueT]
+put(keys_and_values) None
+delete(keys) None
+clear() None
}
class BaseStore {
<<abstract>>
+get(namespace, key) Item
+put(namespace, key, value) None
+delete(namespace, key) None
+search(namespace, query) List[SearchItem]
}
class SqliteStore {
+conn: sqlite3.Connection
+lock: threading.RLock
+index_config: IndexConfig
+ttl_config: TTLConfig
+setup() None
+get(namespace, key) Item
+put(namespace, key, value, index) None
+delete(namespace, key) None
+search(namespace, query) List[SearchItem]
}
BaseCheckpointSaver <|-- SqliteSaver
BaseCheckpointSaver <|-- AsyncSqliteSaver
BaseCache <|-- SqliteCache
BaseStore <|-- SqliteStore
SqliteSaver --> "1" sqlite3.Connection : uses
AsyncSqliteSaver --> "1" aiosqlite.Connection : uses
SqliteCache --> "1" sqlite3.Connection : uses
SqliteStore --> "1" sqlite3.Connection : uses
2. Public API List and Specifications
2.1 Core Checkpoint APIs
2.1.1 from_conn_string() class method
Basic information:
- Name: from_conn_string
- Protocol: class method call, SqliteSaver.from_conn_string(conn_string)
- Idempotency: no (each call creates a new connection and instance)
Method signature:
@classmethod
@contextmanager
def from_conn_string(cls, conn_string: str) -> Iterator[SqliteSaver]:
    """Create a SqliteSaver instance from a connection string."""
Parameter description:
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| conn_string | str | Yes | - | SQLite connection string, e.g. ":memory:" or a file path |
Core implementation:
@classmethod
@contextmanager
def from_conn_string(cls, conn_string: str) -> Iterator[SqliteSaver]:
    """Create an instance from a connection string."""
    # 1) Open the SQLite connection
    conn = sqlite3.connect(
        conn_string,
        check_same_thread=False,  # allow access from multiple threads
        isolation_level=None,     # autocommit mode
    )
    try:
        # 2) Enable WAL mode for better concurrency
        conn.execute("PRAGMA journal_mode=WAL")
        # 3) Create the SqliteSaver instance
        saver = cls(conn)
        # 4) Create the database tables automatically
        saver.setup()
        yield saver
    finally:
        # 5) Make sure the connection is closed
        conn.close()
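A short usage sketch of this context manager (the values here are illustrative). Because the connection is closed when the context exits, the saver must only be used inside the with block:

from langgraph.checkpoint.sqlite import SqliteSaver

# In-memory database: nothing touches disk, convenient for tests.
with SqliteSaver.from_conn_string(":memory:") as saver:
    config = {"configurable": {"thread_id": "thread-1"}}
    print(saver.get_tuple(config))  # None until a checkpoint has been stored

# File-backed database: checkpoints.db (plus -wal/-shm files in WAL mode) is created on disk.
with SqliteSaver.from_conn_string("checkpoints.db") as saver:
    ...  # use the saver here; the underlying connection is closed when the block exits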
2.1.2 setup()
Basic information:
- Name: setup
- Protocol: method call, checkpointer.setup()
- Idempotency: yes (safe to call repeatedly)
Core implementation:
def setup(self) -> None:
    """Create the database tables."""
    if self.is_setup:
        return
    # Run the database initialization script
self.conn.executescript("""
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS checkpoints (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
checkpoint_id TEXT NOT NULL,
parent_checkpoint_id TEXT,
type TEXT,
checkpoint BLOB,
metadata BLOB,
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
CREATE TABLE IF NOT EXISTS writes (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
checkpoint_id TEXT NOT NULL,
task_id TEXT NOT NULL,
idx INTEGER NOT NULL,
channel TEXT NOT NULL,
type TEXT,
value BLOB,
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
""")
self.is_setup = True
2.1.3 put()
Basic information:
- Name: put
- Protocol: method call, checkpointer.put(config, checkpoint, metadata, new_versions)
- Idempotency: no (each call writes a new checkpoint record)
Core implementation:
def put(
    self,
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig:
    """Store a checkpoint."""
    # 1) Extract identifiers from the config
    configurable = config.get("configurable", {})
    thread_id = configurable["thread_id"]
    checkpoint_ns = configurable.get("checkpoint_ns", "")
    checkpoint_id = checkpoint["id"]
    # 2) Serialize the data
    serialized_checkpoint = self.serde.dumps_typed(checkpoint)
    serialized_metadata = self.serde.dumps_typed(metadata)
    # 3) Insert the row (the checkpoint_id of the incoming config becomes the parent)
    with self.cursor() as cur:
        parent_checkpoint_id = configurable.get("checkpoint_id")
        cur.execute("""
            INSERT OR REPLACE INTO checkpoints
            (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id,
             type, checkpoint, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            thread_id,
            checkpoint_ns,
            checkpoint_id,
            parent_checkpoint_id,
            serialized_checkpoint[0],  # type
            serialized_checkpoint[1],  # data
            serialized_metadata[1],    # metadata payload (read back as JSON in get_tuple)
        ))
    # 4) Return the updated config
    return {
        **config,
        "configurable": {
            **configurable,
            "checkpoint_id": checkpoint_id,
        },
    }
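A hedged usage sketch for put(), assuming a saver created as in 2.1.1. empty_checkpoint comes from langgraph.checkpoint.base and returns a blank Checkpoint with a fresh id; the metadata values are illustrative.

from langgraph.checkpoint.base import empty_checkpoint

config = {"configurable": {"thread_id": "thread-1", "checkpoint_ns": ""}}
checkpoint = empty_checkpoint()  # blank Checkpoint dict with a new id
metadata = {"source": "input", "step": -1, "writes": None, "parents": {}}

next_config = saver.put(config, checkpoint, metadata, new_versions={})
print(next_config["configurable"]["checkpoint_id"])  # id of the checkpoint just stored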
2.1.4 get_tuple()
Basic information:
- Name: get_tuple
- Protocol: method call, checkpointer.get_tuple(config)
- Idempotency: yes (read-only operation)
Core implementation:
def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
    """Fetch a checkpoint tuple."""
    # 1) Parse the config
    configurable = config.get("configurable", {})
    thread_id = configurable["thread_id"]
    checkpoint_ns = configurable.get("checkpoint_ns", "")
    checkpoint_id = configurable.get("checkpoint_id")
    # 2) Build and run the query
    with self.cursor(transaction=False) as cur:
        if checkpoint_id:
            # Look up a specific checkpoint
            cur.execute("""
                SELECT * FROM checkpoints
                WHERE thread_id = ? AND checkpoint_ns = ? AND checkpoint_id = ?
            """, (thread_id, checkpoint_ns, checkpoint_id))
        else:
            # Look up the latest checkpoint
            cur.execute("""
                SELECT * FROM checkpoints
                WHERE thread_id = ? AND checkpoint_ns = ?
                ORDER BY checkpoint_id DESC LIMIT 1
            """, (thread_id, checkpoint_ns))
        row = cur.fetchone()
        if not row:
            return None
        # 3) Fetch the pending writes recorded for this checkpoint
        cur.execute("""
            SELECT task_id, idx, channel, type, value FROM writes
            WHERE thread_id = ? AND checkpoint_ns = ? AND checkpoint_id = ?
            ORDER BY task_id, idx
        """, (thread_id, checkpoint_ns, row[2]))  # row[2] is checkpoint_id
        writes = cur.fetchall()
    # 4) Deserialize the data (the metadata column holds a JSON payload, see put())
    checkpoint = self.serde.loads_typed((row[4], row[5]))  # type, checkpoint
    metadata = self.serde.loads_typed(("json", row[6]))    # metadata
    # 5) Deserialize the pending writes
    pending_writes = []
    for task_id, idx, channel, type_, value in writes:
        deserialized_value = self.serde.loads_typed((type_, value))
        pending_writes.append((task_id, channel, deserialized_value))
    # 6) Build the parent config, if any
    parent_config = None
    if row[3]:  # parent_checkpoint_id
        parent_config = {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": row[3],
            }
        }
    return CheckpointTuple(
        config={
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": row[2],
            }
        },
        checkpoint=checkpoint,
        metadata=metadata,
        parent_config=parent_config,
        pending_writes=pending_writes,
    )
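Continuing the hedged example from 2.1.3, the checkpoint can be read back either as "latest for the thread" or by explicit id:

# Without a checkpoint_id, the most recent checkpoint for the thread is returned.
tup = saver.get_tuple({"configurable": {"thread_id": "thread-1", "checkpoint_ns": ""}})
if tup is not None:
    print(tup.checkpoint["id"], tup.metadata, tup.pending_writes)

# With a checkpoint_id, that specific checkpoint is returned (useful for time travel).
tup_again = saver.get_tuple(next_config)  # next_config was returned by put() above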
2.2 Asynchronous APIs
2.2.1 AsyncSqliteSaver core methods
Key characteristics of the async version:
- Uses aiosqlite for genuinely asynchronous database operations
- Supports use as an async context manager
- Provides async methods such as aput, aget_tuple, and alist
class AsyncSqliteSaver(BaseCheckpointSaver[str]):
"""异步 SQLite 检查点保存器"""
async def aput(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: ChannelVersions,
) -> RunnableConfig:
"""异步存储检查点"""
# 1) 序列化数据
serialized_checkpoint = self.serde.dumps_typed(checkpoint)
serialized_metadata = self.serde.dumps_typed(metadata)
        # 2) Perform the write while holding the async lock
async with self.lock:
configurable = config.get("configurable", {})
await self.conn.execute("""
INSERT OR REPLACE INTO checkpoints
(thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id,
type, checkpoint, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
configurable["thread_id"],
configurable.get("checkpoint_ns", ""),
checkpoint["id"],
configurable.get("checkpoint_id"),
serialized_checkpoint[0],
serialized_checkpoint[1],
serialized_metadata[1]
))
await self.conn.commit()
return {
**config,
"configurable": {
**configurable,
"checkpoint_id": checkpoint["id"],
}
}
async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
"""异步获取检查点元组"""
configurable = config.get("configurable", {})
thread_id = configurable["thread_id"]
checkpoint_ns = configurable.get("checkpoint_ns", "")
checkpoint_id = configurable.get("checkpoint_id")
async with self.conn.execute(
"""SELECT * FROM checkpoints
WHERE thread_id = ? AND checkpoint_ns = ?
AND checkpoint_id = ?""" if checkpoint_id else
"""SELECT * FROM checkpoints
WHERE thread_id = ? AND checkpoint_ns = ?
ORDER BY checkpoint_id DESC LIMIT 1""",
(thread_id, checkpoint_ns, checkpoint_id) if checkpoint_id else
(thread_id, checkpoint_ns)
) as cursor:
row = await cursor.fetchone()
if not row:
return None
            # Fetch the pending writes and build the full checkpoint tuple
            # (details mirror the synchronous version)
...
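A hedged usage sketch for the async saver. AsyncSqliteSaver.from_conn_string is used as an async context manager; the compiled `graph` object and its input are assumptions.

import asyncio
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async def main() -> None:
    async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as saver:
        config = {"configurable": {"thread_id": "thread-1"}}
        app = graph.compile(checkpointer=saver)  # `graph` is an assumed StateGraph builder
        await app.ainvoke({"messages": ["hello"]}, config=config)
        tup = await saver.aget_tuple(config)
        print(tup.checkpoint["id"] if tup else None)

asyncio.run(main())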
3. Core Algorithms and Flows
3.1 WAL Mode Optimization
Purpose: use SQLite's Write-Ahead Logging mode to improve concurrent performance.
Input: a database connection.
Output: a connection configured for WAL mode.
Complexity: O(1).
import logging
import sqlite3

logger = logging.getLogger(__name__)  # shared by the illustrative snippets in sections 3 and 5

class WALModeManager:
    """Manage WAL mode for a SQLite connection."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        self.wal_enabled = False

    def enable_wal_mode(self) -> bool:
        """Enable WAL mode and tune related settings."""
        try:
            # 1) Switch the journal mode to WAL
            cursor = self.conn.cursor()
            cursor.execute("PRAGMA journal_mode=WAL")
            result = cursor.fetchone()
            if result and result[0].upper() == "WAL":
                self.wal_enabled = True
                # 2) Tune WAL-related parameters
                cursor.execute("PRAGMA synchronous=NORMAL")   # balance performance and durability
                cursor.execute("PRAGMA cache_size=10000")     # larger page cache
                cursor.execute("PRAGMA temp_store=memory")    # keep temporary tables in memory
                cursor.execute("PRAGMA mmap_size=268435456")  # enable memory-mapped I/O (256 MB)
                return True
            else:
                return False
        except sqlite3.Error as e:
            logger.error(f"Failed to enable WAL mode: {e}")
            return False

    def get_wal_info(self) -> dict:
        """Return information about the current WAL state."""
        if not self.wal_enabled:
            return {"enabled": False}
        try:
            cursor = self.conn.cursor()
            # Run a passive checkpoint and capture its result
            cursor.execute("PRAGMA wal_checkpoint(PASSIVE)")
            checkpoint_result = cursor.fetchone()
            # Read the auto-checkpoint threshold
            cursor.execute("PRAGMA wal_autocheckpoint")
            autocheckpoint = cursor.fetchone()
            return {
                "enabled": True,
                "checkpoint_result": checkpoint_result,
                "autocheckpoint_pages": autocheckpoint[0] if autocheckpoint else None,
            }
        except sqlite3.Error as e:
            logger.error(f"Failed to read WAL info: {e}")
            return {"enabled": True, "error": str(e)}
3.2 Transaction Management
Purpose: provide flexible transaction control for read-only and read-write operations.
Input: a flag indicating whether a transaction is needed.
Output: a cursor context manager.
Complexity: O(1).
@contextmanager
def cursor(self, transaction: bool = True) -> Iterator[sqlite3.Cursor]:
    """Cursor context manager with optional transaction handling."""
    cursor = self.conn.cursor()
    try:
        if transaction:
            # 1) Start an explicit write transaction
            cursor.execute("BEGIN IMMEDIATE")
        yield cursor
        if transaction:
            # 2) Commit on success
            cursor.execute("COMMIT")
    except Exception:
        if transaction:
            # 3) Roll back on error
            try:
                cursor.execute("ROLLBACK")
            except sqlite3.Error:
                pass  # ignore rollback failures
        # Re-raise the original exception
        raise
    finally:
        # 4) Always close the cursor
        cursor.close()
3.3 Batch Write Optimization
Purpose: reduce transaction overhead when persisting many write operations.
Input: a list of write operations.
Output: per-operation success flags.
Complexity: O(n), where n is the number of write records.
class BatchWriteOptimizer:
    """Group write operations into batches to reduce transaction overhead."""

    # WriteOperation is an assumed container with thread_id, checkpoint_ns,
    # checkpoint_id, task_id and a `writes` list of (channel, value) pairs.

    def __init__(self, saver: SqliteSaver, batch_size: int = 100):
        self.saver = saver
        self.batch_size = batch_size

    def batch_put_writes(
        self,
        write_operations: list[WriteOperation],
    ) -> list[bool]:
        """Execute write operations in batches."""
        results = []
        # 1) Split the operations into batches
        for i in range(0, len(write_operations), self.batch_size):
            batch = write_operations[i:i + self.batch_size]
            results.extend(self._execute_write_batch(batch))
        return results

    def _execute_write_batch(self, batch: list[WriteOperation]) -> list[bool]:
        """Execute one batch inside a single transaction."""
        try:
            with self.saver.cursor(transaction=True) as cur:
                batch_data = []
                # 2) Prepare the rows for the whole batch
                for write_op in batch:
                    for idx, (channel, value) in enumerate(write_op.writes):
                        serialized_value = self.saver.serde.dumps_typed(value)
                        batch_data.append((
                            write_op.thread_id,
                            write_op.checkpoint_ns,
                            write_op.checkpoint_id,
                            write_op.task_id,
                            idx,
                            channel,
                            serialized_value[0],  # type
                            serialized_value[1],  # value
                        ))
                # 3) Insert all rows at once
                cur.executemany("""
                    INSERT OR REPLACE INTO writes
                    (thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, value)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, batch_data)
            return [True] * len(batch)
        except sqlite3.Error as e:
            logger.error(f"Batch write failed: {e}")
            return [False] * len(batch)
4. Module-Level Architecture and Sequence Diagrams
4.1 SQLite File-System Architecture
flowchart LR
subgraph "Application Layer"
A[LangGraph App] --> B[SqliteSaver]
A --> C[AsyncSqliteSaver]
end
subgraph "SQLite Layer"
B --> D[sqlite3.Connection]
C --> E[aiosqlite.Connection]
D --> F[Transaction Manager]
E --> G[Async Transaction Manager]
F --> H[SQL Executor]
G --> H
end
subgraph "File System"
H --> I[Database File]
H --> J[WAL File]
H --> K[SHM File]
I --> L[checkpoints.db]
J --> M[checkpoints.db-wal]
K --> N[checkpoints.db-shm]
end
subgraph "Storage Components"
O[SqliteCache] --> P[cache.db]
Q[SqliteStore] --> R[store.db]
Q --> S[Vector Extension]
end
4.2 Synchronous Operation Sequence Diagram
sequenceDiagram
autonumber
participant App as Application
participant Saver as SqliteSaver
participant Conn as sqlite3.Connection
participant File as Database File
participant WAL as WAL File
Note over App,WAL: Initialization
App->>Saver: SqliteSaver.from_conn_string("checkpoints.db")
Saver->>Conn: sqlite3.connect()
Conn->>File: Open/create database file
File-->>Conn: File ready
Saver->>Conn: PRAGMA journal_mode=WAL
Conn->>WAL: Create WAL file
WAL-->>Conn: WAL mode enabled
Saver->>Saver: setup() - create tables
Note over App,WAL: Write path
App->>Saver: put(config, checkpoint, metadata, versions)
Saver->>Saver: Serialize data
Saver->>Conn: BEGIN IMMEDIATE
Conn->>WAL: Start transactional write
Saver->>Conn: INSERT INTO checkpoints
Conn->>WAL: Append to WAL file
WAL-->>Conn: Write complete
Saver->>Conn: COMMIT
Conn->>WAL: Commit transaction
Conn->>File: Periodic checkpointing into the main file
Saver-->>App: Return updated config
Note over App,WAL: Read path
App->>Saver: get_tuple(config)
Saver->>Conn: SELECT FROM checkpoints
Conn->>File: Read main database file
Conn->>WAL: Read latest data from WAL
WAL-->>Conn: Merge data
File-->>Conn: Return combined result
Saver->>Saver: Deserialize data
Saver-->>App: Return CheckpointTuple
4.3 Asynchronous Operation Sequence Diagram
sequenceDiagram
autonumber
participant App as Async Application
participant ASaver as AsyncSqliteSaver
participant AConn as aiosqlite.Connection
participant File as Database File
App->>ASaver: AsyncSqliteSaver.from_conn_string("checkpoints.db")
ASaver->>AConn: aiosqlite.connect()
AConn->>File: Open database file asynchronously
File-->>AConn: File ready
ASaver->>AConn: await conn.execute("PRAGMA journal_mode=WAL")
AConn-->>ASaver: WAL mode enabled
App->>ASaver: await aput(config, checkpoint, metadata, versions)
ASaver->>ASaver: Serialize data
ASaver->>AConn: async with lock:
ASaver->>AConn: await conn.execute(INSERT...)
AConn->>File: Asynchronous write
File-->>AConn: Write complete
ASaver->>AConn: await conn.commit()
AConn-->>ASaver: Transaction committed
ASaver-->>App: Return result
App->>ASaver: await aget_tuple(config)
ASaver->>AConn: async with conn.execute(SELECT...)
AConn->>File: Asynchronous query
File-->>AConn: Query result
ASaver->>ASaver: Deserialize data
ASaver-->>App: Return CheckpointTuple
5. Error Handling and Performance Optimization
5.1 Handling Database Lock Errors
import logging
import random
import sqlite3
import time
from typing import Callable, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")

class DatabaseLockManager:
    """Retry helper for "database is locked" errors."""

    def __init__(self, max_retries: int = 5, base_delay: float = 0.1):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def with_retry(self, operation_name: str, fn: Callable[[], T]) -> T:
        """Run fn, retrying with exponential backoff while the database is locked.

        Note: a @contextmanager cannot re-run its with-body, so the work to be
        retried is passed in as a callable instead.
        """
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                return fn()
            except sqlite3.OperationalError as e:
                if "database is locked" not in str(e).lower():
                    # Non-lock errors are raised immediately
                    raise
                last_exception = e
                if attempt < self.max_retries - 1:
                    # Exponential backoff with a little jitter
                    delay = self.base_delay * (2 ** attempt) + random.uniform(0, 0.1)
                    time.sleep(delay)
                    logger.warning(
                        f"{operation_name}: database locked, retrying {attempt + 1}/{self.max_retries}"
                    )
        # All retries exhausted
        raise sqlite3.OperationalError(
            f"{operation_name} failed after {self.max_retries} retries: {last_exception}"
        )

# Usage example
lock_manager = DatabaseLockManager()

def safe_put_checkpoint(self, config, checkpoint, metadata, new_versions):
    """Store a checkpoint with retry protection against lock errors."""
    return lock_manager.with_retry(
        "put_checkpoint",
        lambda: self._unsafe_put_checkpoint(config, checkpoint, metadata, new_versions),
    )
5.2 Memory Optimization Strategies
class MemoryOptimizer:
    """Helpers for keeping SQLite memory usage under control."""

    def __init__(self, saver: SqliteSaver):
        self.saver = saver

    def optimize_connection(self):
        """Tune connection-level settings."""
        with self.saver.cursor(transaction=False) as cur:
            # 1) Set a reasonable page-cache size (about 10 MB)
            cur.execute("PRAGMA cache_size=-10000")
            # 2) Keep temporary tables in memory
            cur.execute("PRAGMA temp_store=memory")
            # 3) Enable memory-mapped I/O (256 MB)
            cur.execute("PRAGMA mmap_size=268435456")
            # 4) Set the page size (takes effect only for a new database or after VACUUM)
            cur.execute("PRAGMA page_size=4096")

    def streaming_list(
        self,
        config: RunnableConfig,
        batch_size: int = 50,
    ) -> Iterator[CheckpointTuple]:
        """List checkpoints in batches to avoid holding everything in memory."""
        configurable = config.get("configurable", {})
        thread_id = configurable["thread_id"]
        checkpoint_ns = configurable.get("checkpoint_ns", "")
        offset = 0
        while True:
            with self.saver.cursor(transaction=False) as cur:
                cur.execute("""
                    SELECT * FROM checkpoints
                    WHERE thread_id = ? AND checkpoint_ns = ?
                    ORDER BY checkpoint_id DESC
                    LIMIT ? OFFSET ?
                """, (thread_id, checkpoint_ns, batch_size, offset))
                rows = cur.fetchall()
            if not rows:
                break
            # Yield one tuple at a time so processed rows can be released
            for row in rows:
                yield self._build_checkpoint_tuple(row)  # helper mirroring get_tuple's row handling
            offset += len(rows)
            # A short batch means the end has been reached
            if len(rows) < batch_size:
                break

    def vacuum_database(self):
        """Compact the database and reclaim free space."""
        try:
            with self.saver.cursor(transaction=False) as cur:
                # 1) Refresh query-planner statistics
                cur.execute("ANALYZE")
                # 2) Rebuild the database file to reclaim space
                cur.execute("VACUUM")
                # 3) Rebuild the indexes
                cur.execute("REINDEX")
            logger.info("Database maintenance completed")
        except sqlite3.Error as e:
            logger.error(f"Database maintenance failed: {e}")
5.3 Performance Monitoring
import time
from collections import defaultdict
from functools import wraps

class PerformanceMonitor:
    """Collect simple timing metrics for saver operations."""

    def __init__(self, saver: SqliteSaver):
        self.saver = saver
        self.metrics = defaultdict(list)

    def measure_operation(self, operation_name: str):
        """Decorator that records the duration and success of an operation."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                success = False
                try:
                    result = func(*args, **kwargs)
                    success = True
                    return result
                finally:
                    duration = time.time() - start_time
                    self.metrics[operation_name].append({
                        "duration": duration,
                        "success": success,
                        "timestamp": start_time,
                    })
            return wrapper
        return decorator

    def get_performance_stats(self) -> dict:
        """Aggregate the collected measurements."""
        stats = {}
        for operation, measurements in self.metrics.items():
            if not measurements:
                continue
            durations = [m["duration"] for m in measurements]
            success_count = sum(1 for m in measurements if m["success"])
            stats[operation] = {
                "count": len(measurements),
                "success_rate": success_count / len(measurements),
                "avg_duration": sum(durations) / len(durations),
                "min_duration": min(durations),
                "max_duration": max(durations),
                "p95_duration": sorted(durations)[int(len(durations) * 0.95)] if durations else 0,
            }
        return stats

    def get_database_stats(self) -> dict:
        """Collect size and row-count statistics from the database."""
        try:
            with self.saver.cursor(transaction=False) as cur:
                stats = {}
                # Database size = page count * page size
                cur.execute("PRAGMA page_count")
                page_count = cur.fetchone()[0]
                cur.execute("PRAGMA page_size")
                page_size = cur.fetchone()[0]
                stats["database_size_bytes"] = page_count * page_size
                # Row counts per table
                cur.execute("SELECT COUNT(*) FROM checkpoints")
                stats["checkpoint_count"] = cur.fetchone()[0]
                cur.execute("SELECT COUNT(*) FROM writes")
                stats["writes_count"] = cur.fetchone()[0]
                # WAL status
                cur.execute("PRAGMA wal_checkpoint(PASSIVE)")
                wal_info = cur.fetchone()
                stats["wal_frames"] = wal_info[1] if wal_info else 0
                return stats
        except sqlite3.Error as e:
            logger.error(f"Failed to collect database statistics: {e}")
            return {"error": str(e)}
6. Summary
As LangGraph's lightweight persistence option, the checkpoint-sqlite module has the following characteristics:
6.1 Core Strengths
- Zero-configuration deployment: no separate database server is required
- Portable files: the database file is easy to back up and migrate
- Developer friendly: supports the in-memory mode (:memory:) for tests
- Async support: a complete asynchronous implementation
6.2 Architectural Features
- WAL mode: better concurrent read/write performance
- Transaction safety: solid transaction management and error handling
- Memory efficiency: streaming reads and batched writes
- Vector search: optional semantic search via the sqlite-vec extension
6.3 Typical Use Cases
- Development and test environments: rapid prototyping and unit tests
- Small-scale applications: personal projects and small-team deployments
- Edge computing: embedded devices and offline applications
- Data analysis: local data processing and analysis tools
6.4 Limitations and Caveats
- Concurrency: SQLite's write concurrency is limited
- File locking: "database is locked" errors can occur under contention (see the mitigation sketch after this list)
- Scalability: not suited to large-scale, highly concurrent workloads
- Network shares: not recommended on network file systems
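A hedged mitigation sketch for the locking caveat: both settings shown are standard SQLite options (a busy timeout so writers wait briefly instead of failing immediately, plus WAL mode), applied to the connection before it is handed to SqliteSaver; the values are illustrative.

import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# `timeout` makes sqlite3 wait up to 30 s for a lock instead of raising immediately.
conn = sqlite3.connect("checkpoints.db", check_same_thread=False, timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL")    # readers no longer block the single writer
conn.execute("PRAGMA busy_timeout=30000")  # the same idea at the SQLite level, in milliseconds
saver = SqliteSaver(conn)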
The module is an ideal solution for LangGraph applications that need simple, reliable state storage, and is especially well suited to single-machine deployments and development/testing scenarios.