LangGraph-08-SDK-Python模块综合文档
0. 模块概览
0.1 模块职责
LangGraph SDK Python 模块是 LangGraph Platform REST API 的官方 Python 客户端库,提供了与 LangGraph 服务端进行交互的完整接口。它抽象了底层的 HTTP 通信细节,为开发者提供了简洁易用的 Python API,支持异步和同步两种调用模式。
0.2 模块输入输出
输入:
- LangGraph API 服务器地址和认证信息
- 客户端请求参数(Assistants、Threads、Runs 等)
- 流式处理配置和回调函数
输出:
- 结构化的 API 响应对象
- 流式数据迭代器
- 错误和异常信息
0.3 上下游依赖
依赖关系:
- httpx: 现代异步 HTTP 客户端库
- orjson: 高性能 JSON 序列化库
- LangGraph API Server: 上游服务依赖
下游使用方:
- Python 应用开发者
- LangGraph 客户端应用
- 第三方集成系统
- 自动化脚本和工具
0.4 生命周期
- 初始化:创建客户端实例,配置服务器地址和认证
- 连接建立:建立与 LangGraph API 的 HTTP 连接
- API 调用:执行各种资源操作(CRUD、流式处理)
- 资源清理:关闭连接,释放资源
0.5 模块架构图
flowchart TD
A[Client Application] --> B[SDK Entry Points]
B --> C[get_client]
B --> D[get_sync_client]
C --> E[LangGraphClient]
D --> F[SyncLangGraphClient]
E --> G[HTTP Client - Async]
F --> H[HTTP Client - Sync]
G --> I[Resource Managers]
H --> I
I --> J[Assistants]
I --> K[Threads]
I --> L[Runs]
I --> M[Crons]
I --> N[Store]
O[Authentication] --> P[Auth Module]
P --> Q[Custom Handlers]
P --> R[Token Validation]
S[Schema Definitions] --> T[Request Types]
S --> U[Response Types]
S --> V[Stream Types]
W[Error Handling] --> X[HTTP Exceptions]
W --> Y[API Errors]
Z[LangGraph API Server] <--> G
Z <--> H
架构说明:
- Client Factories:
get_client()和get_sync_client()工厂函数 - Core Clients: 异步和同步版本的主客户端类
- Resource Managers: 各种资源的专用管理器
- Auth Module: 灵活的认证和授权系统
- Schema Layer: 类型安全的请求/响应定义
- Error Handling: 统一的错误处理和异常机制
1. 关键数据结构与UML
1.1 核心客户端类
class LangGraphClient:
"""异步 LangGraph 客户端"""
# 资源管理器
assistants: AssistantsClient
threads: ThreadsClient
runs: RunsClient
crons: CronsClient
store: StoreClient
# HTTP 客户端
http: httpx.AsyncClient
# 配置
api_url: str
timeout: httpx.Timeout
headers: dict[str, str]
class SyncLangGraphClient:
"""同步 LangGraph 客户端"""
# 资源管理器
assistants: SyncAssistantsClient
threads: SyncThreadsClient
runs: SyncRunsClient
crons: SyncCronsClient
store: SyncStoreClient
# HTTP 客户端
http: httpx.Client
# 配置
api_url: str
timeout: httpx.Timeout
headers: dict[str, str]
1.2 认证系统数据结构
class Auth:
"""认证和授权管理系统"""
on: _On # 授权处理器入口
_handlers: dict[tuple[str, str], list[Handler]] # 处理器映射
_authenticate_handler: Optional[Authenticator] # 认证处理器
types: types # 类型定义模块
exceptions: exceptions # 异常定义模块
class AuthContext:
"""认证上下文"""
user: User # 用户信息
path: str # 请求路径
method: str # HTTP 方法
headers: dict[str, str] # 请求头
params: dict[str, Any] # 请求参数
class User:
"""用户信息"""
identity: str # 用户标识
permissions: list[str] # 用户权限
metadata: dict[str, Any] # 额外元数据
1.3 资源对象定义
class Assistant:
"""助手资源"""
assistant_id: str # 助手ID
graph_id: str # 图ID
config: dict[str, Any] # 配置信息
created_at: str # 创建时间
updated_at: str # 更新时间
metadata: dict[str, Any] # 元数据
class Thread:
"""线程资源"""
thread_id: str # 线程ID
created_at: str # 创建时间
updated_at: str # 更新时间
metadata: dict[str, Any] # 元数据
class Run:
"""运行资源"""
run_id: str # 运行ID
thread_id: str # 线程ID
assistant_id: str # 助手ID
status: RunStatus # 运行状态
created_at: str # 创建时间
updated_at: str # 更新时间
metadata: dict[str, Any] # 元数据
1.4 类图关系
classDiagram
class LangGraphClient {
+assistants: AssistantsClient
+threads: ThreadsClient
+runs: RunsClient
+crons: CronsClient
+store: StoreClient
+http: httpx.AsyncClient
+get(path: str) Any
+post(path: str, json: Any) Any
+stream(path: str, json: Any) AsyncIterator
}
class SyncLangGraphClient {
+assistants: SyncAssistantsClient
+threads: SyncThreadsClient
+runs: SyncRunsClient
+crons: SyncCronsClient
+store: SyncStoreClient
+http: httpx.Client
+get(path: str) Any
+post(path: str, json: Any) Any
+stream(path: str, json: Any) Iterator
}
class AssistantsClient {
+create(graph_id: str, config: dict) Assistant
+get(assistant_id: str) Assistant
+update(assistant_id: str, config: dict) Assistant
+delete(assistant_id: str) None
+search(metadata: dict) List[Assistant]
}
class ThreadsClient {
+create(metadata: dict) Thread
+get(thread_id: str) Thread
+update(thread_id: str, metadata: dict) Thread
+delete(thread_id: str) None
+get_state(thread_id: str) State
+update_state(thread_id: str, values: dict) dict
}
class RunsClient {
+create(thread_id: str, assistant_id: str, input: dict) Run
+get(thread_id: str, run_id: str) Run
+cancel(thread_id: str, run_id: str) None
+stream(thread_id: str, assistant_id: str, input: dict) AsyncIterator
+wait(thread_id: str, run_id: str) Run
}
class Auth {
+on: _On
+authenticate(handler: Authenticator) Authenticator
+_handlers: dict[tuple[str, str], list[Handler]]
+_authenticate_handler: Authenticator
}
class AuthContext {
+user: User
+path: str
+method: str
+headers: dict[str, str]
+params: dict[str, Any]
}
LangGraphClient --> AssistantsClient
LangGraphClient --> ThreadsClient
LangGraphClient --> RunsClient
SyncLangGraphClient --> AssistantsClient
SyncLangGraphClient --> ThreadsClient
SyncLangGraphClient --> RunsClient
Auth --> AuthContext
AuthContext --> User
AssistantsClient --> Assistant
ThreadsClient --> Thread
RunsClient --> Run
2. 对外API列表与规格
2.1 客户端工厂函数
2.1.1 get_client()
基本信息:
- 名称:
get_client - 协议:函数调用
get_client(url="http://localhost:8123", api_key=None) - 幂等性:否(每次创建新客户端实例)
函数签名:
def get_client(
url: Optional[str] = None,
api_key: Optional[str] = None,
timeout: Optional[httpx.Timeout] = None,
headers: Optional[dict[str, str]] = None
) -> LangGraphClient:
"""创建异步 LangGraph 客户端"""
参数说明:
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| url | str | 否 | http://localhost:8123 | LangGraph API 服务器地址 |
| api_key | str | 否 | None | API 密钥 |
| timeout | httpx.Timeout | 否 | 30秒 | 请求超时设置 |
| headers | dict | 否 | {} | 自定义请求头 |
核心实现:
def get_client(
url: Optional[str] = None,
api_key: Optional[str] = None,
timeout: Optional[httpx.Timeout] = None,
headers: Optional[dict[str, str]] = None
) -> LangGraphClient:
"""创建异步客户端实例"""
# 1) 默认配置处理
if url is None:
url = os.environ.get("LANGGRAPH_API_URL", "http://localhost:8123")
if api_key is None:
api_key = os.environ.get("LANGGRAPH_API_KEY")
if timeout is None:
timeout = httpx.Timeout(30.0)
# 2) 构建请求头
default_headers = {
"User-Agent": f"langgraph-sdk-python/{langgraph_sdk.__version__}",
"Content-Type": "application/json",
"Accept": "application/json"
}
if api_key:
default_headers["Authorization"] = f"Bearer {api_key}"
if headers:
default_headers.update(headers)
# 3) 创建 HTTP 客户端
http_client = httpx.AsyncClient(
base_url=url,
timeout=timeout,
headers=default_headers,
follow_redirects=True
)
# 4) 创建 LangGraph 客户端
return LangGraphClient(
http=http_client,
api_url=url,
timeout=timeout,
headers=default_headers
)
2.2 核心资源API
2.2.1 Assistants 管理
创建助手:
async def create(
self,
graph_id: str,
config: Optional[dict[str, Any]] = None,
metadata: Optional[dict[str, Any]] = None,
if_exists: Literal["raise", "ignore", "replace"] = "raise"
) -> Assistant:
"""创建新助手"""
# 1) 构建请求数据
request_data = {
"graph_id": graph_id,
"config": config or {},
"metadata": metadata or {},
"if_exists": if_exists
}
# 2) 发送 API 请求
response = await self.http.post("/assistants", json=request_data)
# 省略:错误处理和状态检查
# 3) 解析响应
return Assistant(**response.json())
2.2.2 流式运行处理
流式运行:
async def stream(
self,
thread_id: str,
assistant_id: str,
input: Optional[dict[str, Any]] = None,
metadata: Optional[dict[str, Any]] = None,
config: Optional[dict[str, Any]] = None,
stream_mode: Literal["values", "messages", "updates", "events"] = "values"
) -> AsyncIterator[StreamPart]:
"""流式执行运行"""
# 1) 构建请求数据
request_data = {
"assistant_id": assistant_id,
"input": input,
"metadata": metadata or {},
"config": config or {},
"stream_mode": stream_mode
}
# 2) 发送流式请求
async with self.http.stream(
"POST",
f"/threads/{thread_id}/runs/stream",
json=request_data,
headers={"Accept": "text/plain"}
) as response:
# 省略:错误处理
# 3) 解析流式数据
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:] # 移除 "data: " 前缀
if data == "[DONE]":
break
# 省略:JSON解析和错误处理
json_data = orjson.loads(data)
yield StreamPart(**json_data)
3. 核心算法/流程剖析
3.1 HTTP 重试机制
目的:提供可靠的网络请求处理,自动重试临时性失败 输入:HTTP 请求配置和重试策略 输出:成功的响应或最终失败异常 复杂度:O(n),其中 n 为最大重试次数
class RetryManager:
"""HTTP 请求重试管理器"""
def __init__(
self,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
backoff_factor: float = 2.0
):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.backoff_factor = backoff_factor
async def execute_with_retry(
self,
request_func: Callable,
*args,
**kwargs
) -> httpx.Response:
"""带重试的请求执行"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
# 1) 执行请求
response = await request_func(*args, **kwargs)
# 2) 检查是否需要重试
if self._should_retry(response, attempt):
if attempt < self.max_retries:
delay = self._calculate_delay(attempt)
await asyncio.sleep(delay)
continue
return response
except (httpx.ConnectError, httpx.TimeoutException) as e:
last_exception = e
if attempt < self.max_retries:
delay = self._calculate_delay(attempt)
# 省略:日志记录
await asyncio.sleep(delay)
continue
except Exception as e:
# 不可重试的异常直接抛出
raise e
# 所有重试都失败
raise last_exception or Exception("请求重试失败")
def _calculate_delay(self, attempt: int) -> float:
"""计算重试延迟时间(指数退避)"""
delay = self.base_delay * (self.backoff_factor ** attempt)
return min(delay, self.max_delay)
3.2 流式数据处理算法
目的:高效处理服务器端事件流(SSE),支持实时数据传输 输入:HTTP流响应 输出:结构化的流事件序列 复杂度:O(m),其中 m 为流中的事件数量
class StreamProcessor:
"""流式数据处理器"""
def __init__(self, buffer_size: int = 8192):
self.buffer_size = buffer_size
self.line_buffer = ""
async def process_stream(
self,
response: httpx.Response
) -> AsyncIterator[StreamPart]:
"""处理 SSE 流式响应"""
async for chunk in response.aiter_bytes(self.buffer_size):
# 1) 将字节转换为字符串
chunk_str = chunk.decode('utf-8')
self.line_buffer += chunk_str
# 2) 按行分割处理
while '\n' in self.line_buffer:
line, self.line_buffer = self.line_buffer.split('\n', 1)
# 3) 处理单行数据
event = self._process_line(line.strip())
if event:
yield event
def _process_line(self, line: str) -> Optional[StreamPart]:
"""处理单行 SSE 数据"""
# 1) 跳过空行和注释
if not line or line.startswith(':'):
return None
# 2) 解析 SSE 格式
if line.startswith('data: '):
data = line[6:] # 移除 "data: " 前缀
# 3) 处理结束标记
if data == '[DONE]':
return None
# 4) 解析 JSON 数据
try:
json_data = orjson.loads(data)
return self._create_stream_part(json_data)
except orjson.JSONDecodeError:
# 省略:错误日志
return None
return None
4. 模块级架构图与时序图
4.1 客户端架构图
flowchart LR
subgraph "Client Application"
A[Application Code] --> B[SDK Import]
end
subgraph "SDK Entry Layer"
B --> C[get_client]
B --> D[get_sync_client]
B --> E[Auth]
end
subgraph "Client Layer"
C --> F[LangGraphClient]
D --> G[SyncLangGraphClient]
F --> H[HTTP Transport - Async]
G --> I[HTTP Transport - Sync]
end
subgraph "Resource Management Layer"
F --> J[AssistantsClient]
F --> K[ThreadsClient]
F --> L[RunsClient]
F --> M[CronsClient]
F --> N[StoreClient]
G --> O[SyncAssistantsClient]
G --> P[SyncThreadsClient]
G --> Q[SyncRunsClient]
G --> R[SyncCronsClient]
G --> S[SyncStoreClient]
end
subgraph "Protocol Layer"
H --> T[HTTP/1.1 & HTTP/2]
I --> T
T --> U[JSON Serialization]
T --> V[SSE Processing]
T --> W[Error Handling]
end
subgraph "LangGraph Platform"
X[API Gateway] --> Y[Core Services]
Y --> Z[Graph Runtime]
Y --> AA[Persistence Layer]
end
T <--> X
4.2 认证授权时序图
sequenceDiagram
autonumber
participant Client as Client App
participant SDK as LangGraph SDK
participant Auth as Auth Module
participant Server as LangGraph API
Note over Client,Server: 1. 客户端初始化
Client->>SDK: get_client(api_key="token")
SDK->>SDK: 设置认证头
SDK-->>Client: LangGraphClient 实例
Note over Client,Server: 2. 服务端认证配置
Server->>Auth: 加载认证配置
Auth->>Auth: 注册 @auth.authenticate 处理器
Auth->>Auth: 注册 @auth.on.* 授权处理器
Note over Client,Server: 3. API 请求流程
Client->>SDK: client.assistants.create(...)
SDK->>Server: POST /assistants (with Authorization header)
Server->>Auth: 执行认证处理器
Auth->>Auth: verify_token(authorization_header)
Auth-->>Server: 返回用户信息或抛出异常
Server->>Auth: 执行授权检查
Auth->>Auth: find_handler("assistants", "create")
Auth->>Auth: execute_handler(ctx, request_data)
Auth-->>Server: 返回授权结果
alt 认证授权成功
Server->>Server: 处理业务逻辑
Server-->>SDK: 200 OK + 响应数据
SDK-->>Client: Assistant 对象
else 认证失败
Server-->>SDK: 401 Unauthorized
SDK->>SDK: raise UnauthorizedError
SDK-->>Client: 抛出异常
else 授权失败
Server-->>SDK: 403 Forbidden
SDK->>SDK: raise ForbiddenError
SDK-->>Client: 抛出异常
end
4.3 流式处理时序图
sequenceDiagram
autonumber
participant Client as Client App
participant SDK as SDK Client
participant HTTP as HTTP Client
participant Server as LangGraph API
participant Runtime as Graph Runtime
Note over Client,Runtime: 流式运行请求
Client->>SDK: async for chunk in client.runs.stream(...)
SDK->>HTTP: POST /threads/{id}/runs/stream
HTTP->>Server: 建立 SSE 连接
Server-->>HTTP: 200 OK + text/plain
Note over Client,Runtime: 流式数据传输
Server->>Runtime: 启动图执行
Runtime->>Runtime: 执行节点 1
Runtime->>Server: 发送节点输出
Server->>HTTP: data: {"type": "node", "data": {...}}
HTTP->>SDK: 接收 SSE 数据
SDK->>SDK: 解析 JSON 数据
SDK->>Client: yield StreamPart(type="node", data=...)
Runtime->>Runtime: 执行节点 2
Runtime->>Server: 发送节点输出
Server->>HTTP: data: {"type": "node", "data": {...}}
HTTP->>SDK: 接收 SSE 数据
SDK->>SDK: 解析 JSON 数据
SDK->>Client: yield StreamPart(type="node", data=...)
Runtime->>Runtime: 图执行完成
Runtime->>Server: 发送完成信号
Server->>HTTP: data: [DONE]
HTTP->>SDK: 接收结束标记
SDK->>SDK: 关闭流迭代器
SDK-->>Client: 迭代结束
5. 异常处理与最佳实践
5.1 异常层次结构
class LangGraphError(Exception):
"""LangGraph SDK 基础异常"""
def __init__(self, message: str, details: Optional[dict] = None):
super().__init__(message)
self.message = message
self.details = details or {}
class HTTPException(LangGraphError):
"""HTTP 相关异常"""
def __init__(
self,
message: str,
status_code: int,
headers: Optional[dict] = None,
details: Optional[dict] = None
):
super().__init__(message, details)
self.status_code = status_code
self.headers = headers or {}
class AuthenticationError(HTTPException):
"""认证异常"""
pass
class AuthorizationError(HTTPException):
"""授权异常"""
pass
class RateLimitError(HTTPException):
"""速率限制异常"""
def __init__(
self,
message: str,
retry_after: Optional[int] = None,
**kwargs
):
super().__init__(message, **kwargs)
self.retry_after = retry_after
5.2 性能优化建议
class PerformanceOptimizer:
"""性能优化工具"""
@staticmethod
def create_optimized_client(
url: str,
api_key: str,
max_connections: int = 100,
max_keepalive_connections: int = 20
) -> LangGraphClient:
"""创建优化的客户端"""
# 1) 优化连接池设置
limits = httpx.Limits(
max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections
)
# 2) 设置合理的超时
timeout = httpx.Timeout(
connect=5.0, # 连接超时
read=30.0, # 读取超时
write=10.0, # 写入超时
pool=5.0 # 连接池超时
)
# 3) 启用 HTTP/2 和压缩
http_client = httpx.AsyncClient(
base_url=url,
limits=limits,
timeout=timeout,
http2=True,
headers={
"Authorization": f"Bearer {api_key}",
"Accept-Encoding": "gzip, deflate"
}
)
return LangGraphClient(http=http_client)
6. 总结
LangGraph SDK Python 模块作为 LangGraph Platform 的官方客户端,提供了完整、易用、高性能的 Python API。其主要特点包括:
6.1 核心优势
- 类型安全:完整的类型注解和运行时验证
- 异步支持:原生支持 async/await 编程模式
- 流式处理:高效的实时数据传输能力
- 认证授权:灵活强大的安全机制
6.2 架构特点
- 分层设计:清晰的客户端、资源、协议分层
- 错误处理:完善的异常层次和错误恢复机制
- 性能优化:连接池、重试、批处理等优化策略
- 扩展性:支持自定义认证和中间件
6.3 应用价值
该 SDK 为 Python 开发者提供了与 LangGraph Platform 交互的标准化接口,大大简化了客户端应用的开发复杂度,支持从简单的脚本工具到复杂的企业级应用等各种使用场景。通过统一的 API 设计和完善的错误处理,确保了应用的稳定性和可维护性。