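LangGraph-08-SDK-Python Module: Comprehensive Documentation

0. Module Overview

0.1 Module Responsibilities

The LangGraph SDK Python module is the official Python client library for the LangGraph Platform REST API and provides a complete interface for interacting with a LangGraph server. It hides the underlying HTTP communication behind a concise Python API that supports both asynchronous and synchronous calling styles.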

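0.2 Module Inputs and Outputs

Inputs

  • LangGraph API server address and authentication credentials
  • Client request parameters (Assistants, Threads, Runs, etc.)
  • Streaming configuration and callback functions

Outputs

  • Structured API response objects
  • Iterators over streamed data
  • Errors and exceptions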

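0.3 Upstream and Downstream Dependencies

Dependencies

  • httpx: modern HTTP client library with both async and sync APIs
  • orjson: high-performance JSON serialization library
  • LangGraph API Server: the upstream service

Downstream consumers

  • Python application developers
  • LangGraph client applications
  • Third-party integrations
  • Automation scripts and tools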

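0.4 Lifecycle

  1. Initialization: create a client instance and configure the server address and authentication
  2. Connection setup: open HTTP connections to the LangGraph API
  3. API calls: perform resource operations (CRUD, streaming)
  4. Cleanup: close connections and release resources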
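The four lifecycle stages map naturally onto an async context manager. The sketch below is a stdlib-only illustration, not the SDK's code: `DemoClient` and `FakeTransport` are hypothetical stand-ins for the client and its pooled HTTP transport, used only to show that cleanup runs when the `async with` block exits.

```python
import asyncio
from typing import Optional


class FakeTransport:
    """Hypothetical stand-in for a pooled HTTP connection; performs no network I/O."""

    def __init__(self) -> None:
        self.open = True

    async def request(self, method: str, path: str) -> dict:
        if not self.open:
            raise RuntimeError("transport is closed")
        return {"method": method, "path": path}

    async def aclose(self) -> None:
        self.open = False


class DemoClient:
    """Hypothetical client walking through the four lifecycle stages."""

    def __init__(self, base_url: str) -> None:
        # 1) Initialization: store configuration only, no I/O yet
        self.base_url = base_url
        self.transport: Optional[FakeTransport] = None

    async def __aenter__(self) -> "DemoClient":
        # 2) Connection setup
        self.transport = FakeTransport()
        return self

    async def get(self, path: str) -> dict:
        # 3) API call
        if self.transport is None:
            raise RuntimeError("use the client inside 'async with'")
        return await self.transport.request("GET", path)

    async def __aexit__(self, *exc_info: object) -> None:
        # 4) Cleanup: runs on normal exit and when the body raises
        if self.transport is not None:
            await self.transport.aclose()


async def main() -> tuple:
    client = DemoClient("http://localhost:8123")
    async with client:
        result = await client.get("/threads/t-1")
    return result, client.transport.open
```

Whatever client you use, tying its close step to a context manager (or `try`/`finally`) like this is what guarantees stage 4 happens even when a call fails.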

0.5 Module Architecture Diagram

flowchart TD
    A[Client Application] --> B[SDK Entry Points]
    B --> C[get_client]
    B --> D[get_sync_client]
    
    C --> E[LangGraphClient]
    D --> F[SyncLangGraphClient]
    
    E --> G[HTTP Client - Async]
    F --> H[HTTP Client - Sync]
    
    G --> I[Resource Managers]
    H --> I
    
    I --> J[Assistants]
    I --> K[Threads]
    I --> L[Runs]
    I --> M[Crons]
    I --> N[Store]
    
    O[Authentication] --> P[Auth Module]
    P --> Q[Custom Handlers]
    P --> R[Token Validation]
    
    S[Schema Definitions] --> T[Request Types]
    S --> U[Response Types]
    S --> V[Stream Types]
    
    W[Error Handling] --> X[HTTP Exceptions]
    W --> Y[API Errors]
    
    Z[LangGraph API Server] <--> G
    Z <--> H

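Architecture notes

  • Client Factories: the get_client() and get_sync_client() factory functions
  • Core Clients: asynchronous and synchronous versions of the main client class
  • Resource Managers: a dedicated manager for each resource type
  • Auth Module: a flexible authentication and authorization system
  • Schema Layer: typed request/response definitions
  • Error Handling: unified error handling and exception types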
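The factory/resource-manager split can be pictured as one shared transport handed to several thin resource objects. In this stdlib sketch `HttpLayer`, `ResourceClient`, `Client` and `make_client` are hypothetical names, not SDK classes; the point is that every manager reuses the same transport (and therefore the same connection pool) instead of opening its own.

```python
from dataclasses import dataclass, field


@dataclass
class HttpLayer:
    """Hypothetical shared transport; records requests instead of sending them."""
    base_url: str
    sent: list = field(default_factory=list)

    def request(self, method: str, path: str) -> str:
        self.sent.append((method, path))
        return f"{method} {self.base_url}{path}"


@dataclass
class ResourceClient:
    """Hypothetical resource manager: owns a URL prefix, delegates I/O to the shared transport."""
    http: HttpLayer
    prefix: str

    def get(self, resource_id: str) -> str:
        return self.http.request("GET", f"{self.prefix}/{resource_id}")


@dataclass
class Client:
    """Hypothetical top-level client exposing one manager per resource."""
    assistants: ResourceClient
    threads: ResourceClient


def make_client(base_url: str) -> Client:
    # Factory: build the transport once, then inject it into every manager
    http = HttpLayer(base_url)
    return Client(
        assistants=ResourceClient(http, "/assistants"),
        threads=ResourceClient(http, "/threads"),
    )
```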

1. Key Data Structures and UML

1.1 Core Client Classes

from __future__ import annotations  # annotations stay unevaluated, so the outline imports cleanly

import httpx


class LangGraphClient:
    """Asynchronous LangGraph client"""
    
    # Resource managers
    assistants: AssistantsClient
    threads: ThreadsClient
    runs: RunsClient
    crons: CronsClient
    store: StoreClient
    
    # HTTP client
    http: httpx.AsyncClient
    
    # Configuration
    api_url: str
    timeout: httpx.Timeout
    headers: dict[str, str]

class SyncLangGraphClient:
    """Synchronous LangGraph client"""
    
    # Resource managers
    assistants: SyncAssistantsClient
    threads: SyncThreadsClient
    runs: SyncRunsClient
    crons: SyncCronsClient
    store: SyncStoreClient
    
    # HTTP client
    http: httpx.Client
    
    # Configuration
    api_url: str
    timeout: httpx.Timeout
    headers: dict[str, str]

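1.2 Authentication Data Structures

from __future__ import annotations

from typing import Any, Optional


class Auth:
    """Authentication and authorization management system"""
    
    on: _On                           # Entry point for authorization handlers
    _handlers: dict[tuple[str, str], list[Handler]]  # Handler registry keyed by (resource, action)
    _authenticate_handler: Optional[Authenticator]   # Authentication handler
    
    types: types                      # Type definitions module
    exceptions: exceptions            # Exception definitions module

class AuthContext:
    """Authentication context"""
    
    user: User                        # User information
    path: str                        # Request path
    method: str                      # HTTP method
    headers: dict[str, str]          # Request headers
    params: dict[str, Any]           # Request parameters

class User:
    """User information"""
    
    identity: str                    # User identifier
    permissions: list[str]           # User permissions
    metadata: dict[str, Any]         # Additional metadata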
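The `_handlers` mapping keyed by `(resource, action)` suggests a most-specific-first lookup. The sketch below assumes a `"*"` wildcard convention for resource-wide and global handlers; that convention, `register`, and `find_handlers` are hypothetical illustrations of the idea, not the SDK's internals.

```python
from typing import Callable

# Hypothetical handler type: receives a context dict, returns allow/deny
Handler = Callable[[dict], bool]

_handlers: dict[tuple[str, str], list[Handler]] = {}


def register(resource: str, action: str, handler: Handler) -> None:
    _handlers.setdefault((resource, action), []).append(handler)


def find_handlers(resource: str, action: str) -> list[Handler]:
    """Return handlers for the most specific key that has any registered."""
    for key in ((resource, action), (resource, "*"), ("*", "*")):
        if key in _handlers:
            return _handlers[key]
    return []


register("*", "*", lambda ctx: False)                         # global default: deny
register("threads", "*", lambda ctx: ctx["user"] == "alice")  # any thread action
register("threads", "create", lambda ctx: True)               # creating threads is open
```

With this ordering a specific rule overrides a broader one instead of being combined with it; whether a real deployment should combine or override is a policy choice.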

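1.3 Resource Object Definitions

from __future__ import annotations

from typing import Any


class Assistant:
    """Assistant resource"""
    
    assistant_id: str                # Assistant ID
    graph_id: str                   # Graph ID
    config: dict[str, Any]          # Configuration
    created_at: str                 # Creation timestamp
    updated_at: str                 # Last-update timestamp
    metadata: dict[str, Any]        # Metadata

class Thread:
    """Thread resource"""
    
    thread_id: str                  # Thread ID
    created_at: str                 # Creation timestamp
    updated_at: str                 # Last-update timestamp
    metadata: dict[str, Any]        # Metadata

class Run:
    """Run resource"""
    
    run_id: str                     # Run ID
    thread_id: str                  # Thread ID
    assistant_id: str               # Assistant ID
    status: RunStatus               # Run status
    created_at: str                 # Creation timestamp
    updated_at: str                 # Last-update timestamp
    metadata: dict[str, Any]        # Metadata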
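Because responses arrive as JSON, a lightweight way to give these shapes static types is a `TypedDict` plus a `Literal` for `RunStatus`. Two assumptions here: the status values listed are our reading of the platform's run states (confirm them against the API reference), and `parse_run` is a hypothetical helper that checks the status at runtime, since a `TypedDict` performs no validation of its own.

```python
import json
from typing import Any, Literal, TypedDict, get_args

# Assumed set of run states; verify before relying on it
RunStatus = Literal["pending", "running", "error", "success", "timeout", "interrupted"]


class Run(TypedDict):
    run_id: str
    thread_id: str
    assistant_id: str
    status: RunStatus
    created_at: str
    updated_at: str
    metadata: dict[str, Any]


def parse_run(raw: str) -> Run:
    """Decode a JSON body and reject unknown status values."""
    data = json.loads(raw)
    if data.get("status") not in get_args(RunStatus):
        raise ValueError(f"unexpected run status: {data.get('status')!r}")
    return data  # a TypedDict is a plain dict at runtime
```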

1.4 Class Diagram

classDiagram
    class LangGraphClient {
        +assistants: AssistantsClient
        +threads: ThreadsClient
        +runs: RunsClient
        +crons: CronsClient
        +store: StoreClient
        +http: httpx.AsyncClient
        +get(path: str) Any
        +post(path: str, json: Any) Any
        +stream(path: str, json: Any) AsyncIterator
    }
    
    class SyncLangGraphClient {
        +assistants: SyncAssistantsClient
        +threads: SyncThreadsClient
        +runs: SyncRunsClient
        +crons: SyncCronsClient
        +store: SyncStoreClient
        +http: httpx.Client
        +get(path: str) Any
        +post(path: str, json: Any) Any
        +stream(path: str, json: Any) Iterator
    }
    
    class AssistantsClient {
        +create(graph_id: str, config: dict) Assistant
        +get(assistant_id: str) Assistant
        +update(assistant_id: str, config: dict) Assistant
        +delete(assistant_id: str) None
        +search(metadata: dict) List[Assistant]
    }
    
    class ThreadsClient {
        +create(metadata: dict) Thread
        +get(thread_id: str) Thread
        +update(thread_id: str, metadata: dict) Thread
        +delete(thread_id: str) None
        +get_state(thread_id: str) State
        +update_state(thread_id: str, values: dict) dict
    }
    
    class RunsClient {
        +create(thread_id: str, assistant_id: str, input: dict) Run
        +get(thread_id: str, run_id: str) Run
        +cancel(thread_id: str, run_id: str) None
        +stream(thread_id: str, assistant_id: str, input: dict) AsyncIterator
        +join(thread_id: str, run_id: str) dict
    }
    
    class Auth {
        +on: _On
        +authenticate(handler: Authenticator) Authenticator
        +_handlers: dict[tuple[str, str], list[Handler]]
        +_authenticate_handler: Authenticator
    }
    
    class AuthContext {
        +user: User
        +path: str
        +method: str
        +headers: dict[str, str]
        +params: dict[str, Any]
    }
    
    LangGraphClient --> AssistantsClient
    LangGraphClient --> ThreadsClient
    LangGraphClient --> RunsClient
    SyncLangGraphClient --> AssistantsClient
    SyncLangGraphClient --> ThreadsClient
    SyncLangGraphClient --> RunsClient
    
    Auth --> AuthContext
    AuthContext --> User
    
    AssistantsClient --> Assistant
    ThreadsClient --> Thread
    RunsClient --> Run
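Each resource-client method in the diagram is a thin wrapper over one REST route. The mapping below is our understanding of the routes behind a few of these methods, meant as an orientation aid; treat the exact paths as assumptions and confirm them in the LangGraph Platform API reference. `ROUTES` and `route_for` are hypothetical helpers.

```python
# (resource, method) -> (HTTP verb, path template); assumed mapping, verify before use
ROUTES: dict[tuple[str, str], tuple[str, str]] = {
    ("assistants", "create"): ("POST", "/assistants"),
    ("assistants", "get"): ("GET", "/assistants/{assistant_id}"),
    ("assistants", "update"): ("PATCH", "/assistants/{assistant_id}"),
    ("assistants", "delete"): ("DELETE", "/assistants/{assistant_id}"),
    ("assistants", "search"): ("POST", "/assistants/search"),
    ("threads", "create"): ("POST", "/threads"),
    ("threads", "get"): ("GET", "/threads/{thread_id}"),
    ("threads", "get_state"): ("GET", "/threads/{thread_id}/state"),
    ("threads", "update_state"): ("POST", "/threads/{thread_id}/state"),
    ("runs", "create"): ("POST", "/threads/{thread_id}/runs"),
    ("runs", "get"): ("GET", "/threads/{thread_id}/runs/{run_id}"),
    ("runs", "cancel"): ("POST", "/threads/{thread_id}/runs/{run_id}/cancel"),
    ("runs", "stream"): ("POST", "/threads/{thread_id}/runs/stream"),
}


def route_for(resource: str, method: str, **ids: str) -> tuple[str, str]:
    """Resolve a client method call to (verb, concrete path)."""
    verb, template = ROUTES[(resource, method)]
    return verb, template.format(**ids)
```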

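2. Public API List and Specifications

2.1 Client Factory Functions

2.1.1 get_client()

Basic information

  • Name: get_client
  • Invocation: function call, e.g. get_client(url="http://localhost:8123", api_key=None)
  • Idempotency: no (each call creates a new client instance)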

Function signature

def get_client(
    url: Optional[str] = None,
    api_key: Optional[str] = None,
    timeout: Optional[httpx.Timeout] = None,
    headers: Optional[dict[str, str]] = None
) -> LangGraphClient:
    """Create an asynchronous LangGraph client"""

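Parameters (all optional)

  • url (str, default: the LANGGRAPH_API_URL environment variable, else http://localhost:8123): LangGraph API server address
  • api_key (str, default: the LANGGRAPH_API_KEY environment variable, else None): API key
  • timeout (httpx.Timeout, default 30 seconds): request timeout
  • headers (dict, default {}): custom request headers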
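These defaults follow a fixed precedence: an explicit argument wins, then an environment variable, then a built-in fallback. The pure function below mirrors the resolution steps of the get_client implementation in this section so the precedence can be tested without a network; `resolve_config` is a hypothetical helper, and the environment can be passed in as a plain dict.

```python
import os
from typing import Mapping, Optional

DEFAULT_URL = "http://localhost:8123"
DEFAULT_TIMEOUT_S = 30.0


def resolve_config(
    url: Optional[str] = None,
    api_key: Optional[str] = None,
    timeout: Optional[float] = None,
    headers: Optional[dict] = None,
    env: Optional[Mapping[str, str]] = None,
) -> dict:
    """Precedence: explicit argument > environment variable > built-in default."""
    env = os.environ if env is None else env
    return {
        "url": url if url is not None else env.get("LANGGRAPH_API_URL", DEFAULT_URL),
        "api_key": api_key if api_key is not None else env.get("LANGGRAPH_API_KEY"),
        "timeout": timeout if timeout is not None else DEFAULT_TIMEOUT_S,
        # Copy so the caller's dict is never mutated downstream
        "headers": dict(headers or {}),
    }
```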

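Core implementation (simplified)

from __future__ import annotations

import os
from typing import Optional

import httpx
import langgraph_sdk

# LangGraphClient: the client class outlined in section 1.1 (its constructor is elided there)


def get_client(
    url: Optional[str] = None,
    api_key: Optional[str] = None,
    timeout: Optional[httpx.Timeout] = None,
    headers: Optional[dict[str, str]] = None
) -> LangGraphClient:
    """Create an asynchronous client instance"""
    
    # 1) Resolve defaults: explicit argument, then environment variable, then built-in value
    if url is None:
        url = os.environ.get("LANGGRAPH_API_URL", "http://localhost:8123")
    
    if api_key is None:
        api_key = os.environ.get("LANGGRAPH_API_KEY")
    
    if timeout is None:
        timeout = httpx.Timeout(30.0)
    
    # 2) Build request headers
    default_headers = {
        "User-Agent": f"langgraph-sdk-python/{langgraph_sdk.__version__}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    if api_key:
        # The published SDK sends the key in an x-api-key header, not as a Bearer token
        default_headers["x-api-key"] = api_key
    
    if headers:
        # Caller-supplied headers override the defaults above
        default_headers.update(headers)
    
    # 3) Create the underlying HTTP client
    http_client = httpx.AsyncClient(
        base_url=url,
        timeout=timeout,
        headers=default_headers,
        follow_redirects=True
    )
    
    # 4) Wrap it in a LangGraph client
    return LangGraphClient(
        http=http_client,
        api_url=url,
        timeout=timeout,
        headers=default_headers
    )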

2.2 Core Resource APIs

2.2.1 Assistants Management

Creating an assistant

async def create(
    self,
    graph_id: str,
    config: Optional[dict[str, Any]] = None,
    metadata: Optional[dict[str, Any]] = None,
    if_exists: Literal["raise", "do_nothing"] = "raise"  # behavior when the assistant already exists
) -> Assistant:
    """Create a new assistant"""
    
    # 1) Build the request body
    request_data = {
        "graph_id": graph_id,
        "config": config or {},
        "metadata": metadata or {},
        "if_exists": if_exists
    }
    
    # 2) Send the API request
    response = await self.http.post("/assistants", json=request_data)
    response.raise_for_status()  # raise httpx.HTTPStatusError on 4xx/5xx responses
    
    # 3) Parse the response
    return Assistant(**response.json())

2.2.2 Streaming Run Processing

Streaming a run

async def stream(
    self,
    thread_id: str,
    assistant_id: str,
    input: Optional[dict[str, Any]] = None,
    metadata: Optional[dict[str, Any]] = None,
    config: Optional[dict[str, Any]] = None,
    stream_mode: Literal["values", "messages", "updates", "events"] = "values"
) -> AsyncIterator[StreamPart]:
    """Execute a run and stream its output"""
    
    # 1) Build the request body
    request_data = {
        "assistant_id": assistant_id,
        "input": input,
        "metadata": metadata or {},
        "config": config or {},
        "stream_mode": stream_mode
    }
    
    # 2) Open a streaming request
    async with self.http.stream(
        "POST",
        f"/threads/{thread_id}/runs/stream",
        json=request_data,
        headers={"Accept": "text/event-stream"}  # SSE media type
    ) as response:
        response.raise_for_status()  # the status is known before the body is read
        
        # 3) Parse the SSE stream line by line
        async for line in response.aiter_lines():
            if line.startswith("data: "):
                data = line[6:]  # strip the "data: " prefix
                if data == "[DONE]":
                    break
                
                # Omitted: JSON decode error handling
                json_data = orjson.loads(data)
                yield StreamPart(**json_data)
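The loop above only inspects `data:` lines. In the Server-Sent Events format an event may also carry an `event:` field, can span several `data:` lines (joined with newlines), and is dispatched at a blank line; lines starting with `:` are comments such as keep-alives. The stdlib sketch below assembles complete `(event, data)` pairs that way. `SSEEvent` and `parse_sse` are hypothetical names, and the sample event names in the test are illustrative.

```python
import json
from typing import Any, Iterable, Iterator, NamedTuple, Optional


class SSEEvent(NamedTuple):
    event: str
    data: Any


def parse_sse(lines: Iterable[str]) -> Iterator[SSEEvent]:
    """Group SSE lines into events; a blank line dispatches the pending event."""
    event: Optional[str] = None
    data_lines: list = []
    for raw in lines:
        line = raw.rstrip("\r")
        if line == "":
            if data_lines:
                # "message" is the SSE default event type when none was given
                yield SSEEvent(event or "message", json.loads("\n".join(data_lines)))
            event, data_lines = None, []
        elif line.startswith(":"):
            continue  # comment / keep-alive
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space is not part of the value
            if field == "event":
                event = value
            elif field == "data":
                data_lines.append(value)
    # An event still pending at end of input was never terminated and is dropped
```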

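3. Core Algorithms and Flows

3.1 HTTP Retry Mechanism

  • Purpose: make network requests reliable by automatically retrying transient failures
  • Input: an HTTP request function and a retry policy
  • Output: a successful response, or the final exception once retries are exhausted
  • Complexity: O(n) attempts, where n is the maximum number of retries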

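from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

import httpx


class RetryManager:
    """HTTP request retry manager"""
    
    # Statuses treated as transient (an assumption; tune for your server)
    RETRYABLE_STATUS_CODES = frozenset({429, 502, 503, 504})
    
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        backoff_factor: float = 2.0
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor
    
    async def execute_with_retry(
        self,
        request_func: Callable[..., Awaitable[httpx.Response]],
        *args,
        **kwargs
    ) -> httpx.Response:
        """Execute a request, retrying transient failures"""
        
        last_exception: Exception | None = None
        
        for attempt in range(self.max_retries + 1):
            try:
                # 1) Execute the request
                response = await request_func(*args, **kwargs)
            except (httpx.ConnectError, httpx.TimeoutException) as e:
                # Network-level failures are retryable; any other exception propagates unchanged
                last_exception = e
                if attempt < self.max_retries:
                    # Omitted: logging
                    await asyncio.sleep(self._calculate_delay(attempt))
                    continue
                break
            
            # 2) Retry transient HTTP statuses while attempts remain
            if self._should_retry(response, attempt) and attempt < self.max_retries:
                await asyncio.sleep(self._calculate_delay(attempt))
                continue
            
            return response
        
        # Every attempt failed with a network error
        raise last_exception or RuntimeError("request failed after retries")
    
    def _should_retry(self, response: httpx.Response, attempt: int) -> bool:
        """Decide whether a response status is worth retrying"""
        return response.status_code in self.RETRYABLE_STATUS_CODES
    
    def _calculate_delay(self, attempt: int) -> float:
        """Compute the retry delay (exponential backoff)"""
        delay = self.base_delay * (self.backoff_factor ** attempt)
        return min(delay, self.max_delay)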
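A compact, dependency-free version of the same policy makes the backoff schedule easy to check. `retry_async`, `backoff_delay` and `TransientError` are hypothetical names; `sleep` is injectable so the delays can be recorded instead of actually waited on. With `base_delay=1.0`, `backoff_factor=2.0` and `max_delay=60.0`, attempts 0 through 6 yield delays of 1, 2, 4, 8, 16, 32 and 60 seconds, the last one capped.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical stand-in for connection errors and timeouts."""


def backoff_delay(attempt: int, base: float = 1.0, factor: float = 2.0, cap: float = 60.0) -> float:
    """Exponential backoff: base * factor**attempt, never more than cap."""
    return min(base * factor ** attempt, cap)


async def retry_async(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call func, retrying TransientError up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            await sleep(backoff_delay(attempt))
    raise ValueError("max_retries must be >= 0")


async def demo() -> tuple:
    """Usage: a call that fails twice before succeeding."""
    calls = 0
    slept = []

    async def flaky() -> str:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise TransientError("temporary failure")
        return "ok"

    async def record_sleep(delay: float) -> None:
        slept.append(delay)

    result = await retry_async(flaky, sleep=record_sleep)
    return result, calls, slept
```

Production retry loops often add random jitter to each delay so that many clients do not retry in lockstep; it is left out here to keep the schedule deterministic.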

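3.2 Stream Data Processing Algorithm

  • Purpose: process a Server-Sent Events (SSE) stream efficiently for real-time delivery
  • Input: a streaming HTTP response
  • Output: a sequence of structured stream events
  • Complexity: O(m), where m is the number of events in the stream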

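from __future__ import annotations

import codecs
from typing import AsyncIterator, Optional

import httpx
import orjson

# StreamPart: the stream event type used in section 2.2.2


class StreamProcessor:
    """Streaming data processor"""
    
    def __init__(self, buffer_size: int = 8192):
        self.buffer_size = buffer_size
    
    async def process_stream(
        self,
        response: httpx.Response
    ) -> AsyncIterator[StreamPart]:
        """Process an SSE streaming response"""
        
        # Per-stream state, so one processor can be reused across responses
        line_buffer = ""
        # An incremental decoder keeps multi-byte UTF-8 characters intact across chunk boundaries
        decoder = codecs.getincrementaldecoder("utf-8")()
        
        async for chunk in response.aiter_bytes(self.buffer_size):
            # 1) Decode bytes to text
            line_buffer += decoder.decode(chunk)
            
            # 2) Split off complete lines
            while "\n" in line_buffer:
                line, line_buffer = line_buffer.split("\n", 1)
                
                # 3) Process a single line (strip() also removes a trailing "\r")
                event = self._process_line(line.strip())
                if event:
                    yield event
        
        # 4) Flush a final line that was not newline-terminated
        line_buffer += decoder.decode(b"", final=True)
        event = self._process_line(line_buffer.strip())
        if event:
            yield event
    
    def _process_line(self, line: str) -> Optional[StreamPart]:
        """Process a single line of SSE data"""
        
        # 1) Skip blank lines and comments
        if not line or line.startswith(":"):
            return None
        
        # 2) Parse the SSE data field
        if line.startswith("data: "):
            data = line[6:]  # strip the "data: " prefix
            
            # 3) Handle the end-of-stream marker
            if data == "[DONE]":
                return None
            
            # 4) Decode the JSON payload
            try:
                json_data = orjson.loads(data)
            except orjson.JSONDecodeError:
                # Omitted: error logging
                return None
            return StreamPart(**json_data)
        
        return None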
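Decoding each network chunk independently with `chunk.decode("utf-8")` fails whenever a multi-byte character is split across two chunks, which is common when model output contains CJK text. The standard library's incremental decoder carries the partial bytes over to the next chunk. The demonstration below splits the UTF-8 encoding of a sample SSE line containing two 3-byte characters at every possible byte boundary; `decode_chunks` is a hypothetical helper.

```python
import codecs
from typing import Iterable


def decode_chunks(chunks: Iterable[bytes]) -> str:
    """Join byte chunks into text without breaking multi-byte UTF-8 characters."""
    decoder = codecs.getincrementaldecoder("utf-8")()
    text = "".join(decoder.decode(chunk) for chunk in chunks)
    # final=True raises if the stream ended in the middle of a character
    return text + decoder.decode(b"", final=True)


payload = 'data: {"text": "你好"}\n'.encode("utf-8")
expected = payload.decode("utf-8")
incremental_ok = 0
naive_failures = 0
for cut in range(1, len(payload)):
    head, tail = payload[:cut], payload[cut:]
    if decode_chunks([head, tail]) == expected:
        incremental_ok += 1
    try:
        # Naive approach: decode each chunk on its own
        _ = head.decode("utf-8") + tail.decode("utf-8")
    except UnicodeDecodeError:
        naive_failures += 1
```

The incremental decoder succeeds at every cut, while the naive approach fails at the four cuts that land inside a character. The decoder must live for the whole response, which is why it belongs to the per-stream state rather than to each chunk.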

4. Module-Level Architecture and Sequence Diagrams

4.1 Client Architecture Diagram

flowchart LR
    subgraph "Client Application"
        A[Application Code] --> B[SDK Import]
    end
    
    subgraph "SDK Entry Layer"
        B --> C[get_client]
        B --> D[get_sync_client]
        B --> E[Auth]
    end
    
    subgraph "Client Layer"
        C --> F[LangGraphClient]
        D --> G[SyncLangGraphClient]
        
        F --> H[HTTP Transport - Async]
        G --> I[HTTP Transport - Sync]
    end
    
    subgraph "Resource Management Layer"
        F --> J[AssistantsClient]
        F --> K[ThreadsClient]
        F --> L[RunsClient]
        F --> M[CronsClient]
        F --> N[StoreClient]
        
        G --> O[SyncAssistantsClient]
        G --> P[SyncThreadsClient]
        G --> Q[SyncRunsClient]
        G --> R[SyncCronsClient]
        G --> S[SyncStoreClient]
    end
    
    subgraph "Protocol Layer"
        H --> T[HTTP/1.1 & HTTP/2]
        I --> T
        T --> U[JSON Serialization]
        T --> V[SSE Processing]
        T --> W[Error Handling]
    end
    
    subgraph "LangGraph Platform"
        X[API Gateway] --> Y[Core Services]
        Y --> Z[Graph Runtime]
        Y --> AA[Persistence Layer]
    end
    
    T <--> X

4.2 Authentication and Authorization Sequence Diagram

sequenceDiagram
    autonumber
    participant Client as Client App
    participant SDK as LangGraph SDK
    participant Auth as Auth Module
    participant Server as LangGraph API
    
    Note over Client,Server: 1. Client initialization
    Client->>SDK: get_client(api_key="token")
    SDK->>SDK: Set authentication headers
    SDK-->>Client: LangGraphClient instance
    
    Note over Client,Server: 2. Server-side auth configuration
    Server->>Auth: Load auth configuration
    Auth->>Auth: Register @auth.authenticate handler
    Auth->>Auth: Register @auth.on.* authorization handlers
    
    Note over Client,Server: 3. API request flow
    Client->>SDK: client.assistants.create(...)
    SDK->>Server: POST /assistants (with auth headers)
    
    Server->>Auth: Run authentication handler
    Auth->>Auth: verify_token(authorization_header)
    Auth-->>Server: Return user info or raise an exception
    
    Server->>Auth: Run authorization check
    Auth->>Auth: find_handler("assistants", "create")
    Auth->>Auth: execute_handler(ctx, request_data)
    Auth-->>Server: Return authorization result
    
    alt Authenticated and authorized
        Server->>Server: Process business logic
        Server-->>SDK: 200 OK + response body
        SDK-->>Client: Assistant object
    else Authentication failed
        Server-->>SDK: 401 Unauthorized
        SDK->>SDK: raise AuthenticationError
        SDK-->>Client: Exception raised
    else Authorization failed
        Server-->>SDK: 403 Forbidden
        SDK->>SDK: raise AuthorizationError
        SDK-->>Client: Exception raised
    end
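The server-side branch of this diagram reduces to two checks in a fixed order: authentication decides 401, authorization decides 403. The stdlib sketch below simulates only that decision path; `TOKENS`, `POLICY`, `authenticate` and `handle` are hypothetical, and a real deployment would express these steps through the `@auth.authenticate` and `@auth.on.*` handlers named in the diagram.

```python
from typing import Optional

# Hypothetical token store and per-(resource, action) allow-list
TOKENS = {"token-alice": "alice", "token-bob": "bob"}
POLICY = {("assistants", "create"): {"alice"}}


def authenticate(authorization: Optional[str]) -> Optional[str]:
    """Return the user identity for a 'Bearer <token>' header, else None."""
    if not authorization or not authorization.startswith("Bearer "):
        return None
    return TOKENS.get(authorization[len("Bearer "):])


def handle(authorization: Optional[str], resource: str, action: str) -> int:
    """Return the HTTP status the server would send back."""
    user = authenticate(authorization)
    if user is None:
        return 401  # authentication failed: we do not know who this is
    if user not in POLICY.get((resource, action), set()):
        return 403  # authenticated, but not allowed (default deny)
    return 200
```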

4.3 Streaming Sequence Diagram

sequenceDiagram
    autonumber
    participant Client as Client App
    participant SDK as SDK Client
    participant HTTP as HTTP Client
    participant Server as LangGraph API
    participant Runtime as Graph Runtime
    
    Note over Client,Runtime: Streaming run request
    Client->>SDK: async for chunk in client.runs.stream(...)
    SDK->>HTTP: POST /threads/{id}/runs/stream
    HTTP->>Server: Open SSE connection
    Server-->>HTTP: 200 OK + text/event-stream
    
    Note over Client,Runtime: Streaming data transfer
    Server->>Runtime: Start graph execution
    Runtime->>Runtime: Execute node 1
    Runtime->>Server: Emit node output
    Server->>HTTP: data: {"type": "node", "data": {...}}
    HTTP->>SDK: Receive SSE data
    SDK->>SDK: Parse JSON data
    SDK->>Client: yield StreamPart(type="node", data=...)
    
    Runtime->>Runtime: Execute node 2
    Runtime->>Server: Emit node output
    Server->>HTTP: data: {"type": "node", "data": {...}}
    HTTP->>SDK: Receive SSE data
    SDK->>SDK: Parse JSON data
    SDK->>Client: yield StreamPart(type="node", data=...)
    
    Runtime->>Runtime: Graph execution complete
    Runtime->>Server: Send completion signal
    Server->>HTTP: data: [DONE]
    HTTP->>SDK: Receive end marker
    SDK->>SDK: Close the stream iterator
    SDK-->>Client: Iteration ends

5. Exception Handling and Best Practices

5.1 Exception Hierarchy

from typing import Optional


class LangGraphError(Exception):
    """Base exception for the LangGraph SDK"""
    
    def __init__(self, message: str, details: Optional[dict] = None):
        super().__init__(message)
        self.message = message
        self.details = details or {}

class HTTPException(LangGraphError):
    """HTTP-related exception"""
    
    def __init__(
        self,
        message: str,
        status_code: int,
        headers: Optional[dict] = None,
        details: Optional[dict] = None
    ):
        super().__init__(message, details)
        self.status_code = status_code
        self.headers = headers or {}

class AuthenticationError(HTTPException):
    """Authentication failure (HTTP 401)"""
    pass

class AuthorizationError(HTTPException):
    """Authorization failure (HTTP 403)"""
    pass

class RateLimitError(HTTPException):
    """Rate-limit exception (HTTP 429)"""
    
    def __init__(
        self,
        message: str,
        retry_after: Optional[int] = None,
        status_code: int = 429,
        **kwargs
    ):
        # HTTPException requires status_code, so pass the 429 default through
        super().__init__(message, status_code, **kwargs)
        self.retry_after = retry_after  # seconds to wait, from the Retry-After header
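A hierarchy like this is usually paired with a single function that turns an HTTP status into the matching exception. To stay self-contained the sketch repeats a trimmed copy of the classes above; `error_from_response` is a hypothetical helper, and header names are assumed to be lower-case. Retry-After may carry either a number of seconds or an HTTP date; only the seconds form is parsed here.

```python
from typing import Optional


class LangGraphError(Exception):
    pass


class HTTPException(LangGraphError):
    def __init__(self, message: str, status_code: int):
        super().__init__(message)
        self.status_code = status_code


class AuthenticationError(HTTPException): ...
class AuthorizationError(HTTPException): ...


class RateLimitError(HTTPException):
    def __init__(self, message: str, retry_after: Optional[int] = None):
        super().__init__(message, 429)
        self.retry_after = retry_after


def error_from_response(status_code: int, headers: dict, message: str) -> Optional[HTTPException]:
    """Map an HTTP status to the matching exception; None for non-error codes."""
    if status_code < 400:
        return None
    if status_code == 401:
        return AuthenticationError(message, status_code)
    if status_code == 403:
        return AuthorizationError(message, status_code)
    if status_code == 429:
        retry_after = headers.get("retry-after", "")
        return RateLimitError(message, int(retry_after) if retry_after.isdigit() else None)
    return HTTPException(message, status_code)
```

Because every subclass derives from `HTTPException`, callers can catch the broad type and branch on `status_code`, or catch `RateLimitError` specifically to honor `retry_after`.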

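5.2 Performance Optimization Recommendations

from __future__ import annotations

import httpx

# LangGraphClient: the client class outlined in section 1.1


class PerformanceOptimizer:
    """Performance tuning helpers"""
    
    @staticmethod
    def create_optimized_client(
        url: str,
        api_key: str,
        max_connections: int = 100,
        max_keepalive_connections: int = 20
    ) -> LangGraphClient:
        """Create a client tuned for throughput"""
        
        # 1) Tune connection-pool limits
        limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive_connections
        )
        
        # 2) Set explicit timeouts
        timeout = httpx.Timeout(
            connect=5.0,    # connect timeout
            read=30.0,      # read timeout (raise it for long-running streamed runs)
            write=10.0,     # write timeout
            pool=5.0        # wait for a free pooled connection
        )
        
        # 3) Enable HTTP/2 (requires the optional h2 package: pip install "httpx[http2]").
        #    httpx already sends Accept-Encoding for gzip/deflate by default.
        http_client = httpx.AsyncClient(
            base_url=url,
            limits=limits,
            timeout=timeout,
            http2=True,
            # The auth scheme depends on how the deployment is configured
            headers={"Authorization": f"Bearer {api_key}"}
        )
        
        return LangGraphClient(http=http_client)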
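Pool limits cap how many connections exist, but when an application fans out many runs at once it is often clearer to bound concurrency at the call site as well. This stdlib sketch uses `asyncio.Semaphore`; `bounded_gather` and `fake_request` are hypothetical, with `fake_request` standing in for an SDK call, and the limit used in the test is arbitrary.

```python
import asyncio


async def bounded_gather(n_tasks: int, limit: int) -> tuple:
    """Run n_tasks fake requests with at most `limit` in flight; report the peak."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_request(i: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated network latency
            in_flight -= 1
            return i * i

    # gather preserves input order in its results
    results = await asyncio.gather(*(fake_request(i) for i in range(n_tasks)))
    return list(results), peak
```

Sharing one client across such tasks, rather than creating one per task, is what lets them reuse the pooled keep-alive connections configured above.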

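6. Summary

As the official client for LangGraph Platform, the LangGraph SDK Python module provides a complete, easy-to-use, high-performance Python API. Its main characteristics are:

6.1 Core Strengths

  • Type safety: type annotations throughout the request and response schemas
  • Async support: native async/await programming model
  • Streaming: efficient real-time data delivery
  • Authentication and authorization: a flexible security model

6.2 Architectural Characteristics

  • Layered design: clear separation of client, resource, and protocol layers
  • Error handling: a structured exception hierarchy and error-recovery mechanisms
  • Performance: connection pooling, retries, batching, and similar optimizations
  • Extensibility: support for custom authentication and middleware

6.3 Application Value

The SDK gives Python developers a standardized interface to LangGraph Platform and substantially reduces the complexity of building client applications, covering use cases from simple scripts to complex enterprise systems. A uniform API design and thorough error handling help keep those applications stable and maintainable.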