概述

LangGraph是一个专为构建多智能体应用而设计的Python框架,基于图计算模型实现复杂AI工作流的编排和执行。本手册将从基础安装到高级应用全面介绍LangGraph的使用方法。

1. 快速开始

1.1 安装配置

基础安装

# 安装核心包
pip install langgraph

# 安装检查点支持
pip install langgraph-checkpoint-postgres  # PostgreSQL支持
pip install langgraph-checkpoint-sqlite    # SQLite支持

# 安装预构建组件
pip install langgraph-prebuilt

# 安装CLI工具
pip install langgraph-cli

开发环境配置

# 创建项目目录
mkdir my-langgraph-app
cd my-langgraph-app

# 初始化项目
langgraph init

# 安装开发依赖
pip install -r requirements-dev.txt

环境变量配置

# .env文件
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
DATABASE_URL=postgresql://user:password@localhost/langgraph
LANGSMITH_API_KEY=your_langsmith_api_key
LANGSMITH_TRACING=true

1.2 项目结构

my-langgraph-app/
├── langgraph.json          # 项目配置文件
├── .env                    # 环境变量
├── requirements.txt        # 依赖包
├── src/
   ├── __init__.py
   ├── graph.py           # 主图定义
   ├── nodes/             # 节点实现
      ├── __init__.py
      ├── agent.py
      └── tools.py
   ├── state.py           # 状态定义
   └── utils.py           # 工具函数
├── tests/                 # 测试文件
└── docs/                  # 文档

1.3 配置文件详解

langgraph.json配置

{
  "graphs": {
    "main_workflow": "./src/graph.py:workflow",
    "chat_agent": "./src/agents/chat.py:agent_graph"
  },
  "python_version": "3.11",
  "env": "./.env",
  "dependencies": ["."],
  "dockerfile_lines": [
    "RUN apt-get update && apt-get install -y git",
    "RUN pip install --no-cache-dir torch"
  ],
  "deployment": {
    "platform": "langgraph-cloud",
    "scaling": {
      "min_instances": 1,
      "max_instances": 10,
      "auto_scaling_metric": "cpu_utilization",
      "target_utilization": 70
    },
    "monitoring": {
      "enable_tracing": true,
      "log_level": "INFO",
      "metrics_collection": ["latency", "throughput", "error_rate"]
    }
  }
}

2. 核心概念

2.1 状态管理

状态定义

from typing import TypedDict, Annotated, List
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    """智能体状态定义
    
    状态是图中所有节点共享的数据结构,支持:
    - 类型安全:基于TypedDict的强类型定义
    - 累积操作:通过Annotated定义数据累积方式
    - 版本控制:自动跟踪状态变化历史
    """
    
    # 消息历史(自动累积)
    messages: Annotated[List[BaseMessage], add_messages]
    
    # 当前任务
    current_task: str
    
    # 执行结果
    result: str
    
    # 工具调用历史
    tool_calls: List[dict]
    
    # 错误信息
    error: str | None
    
    # 迭代计数
    iteration_count: int

# 状态操作示例
def update_state_example():
    """状态更新示例"""
    from langgraph.graph import StateGraph
    
    def process_node(state: AgentState) -> AgentState:
        """处理节点:展示状态更新模式"""
        return {
            "current_task": "processing_data",
            "iteration_count": state.get("iteration_count", 0) + 1,
            "result": f"Processed at iteration {state.get('iteration_count', 0) + 1}"
        }
    
    # 创建图
    graph = StateGraph(AgentState)
    graph.add_node("process", process_node)
    graph.set_entry_point("process")
    graph.set_finish_point("process")
    
    return graph.compile()

2.2 图构建

基础图构建

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool

# 定义工具
@tool
def search_web(query: str) -> str:
    """搜索网络信息"""
    return f"搜索结果:{query}"

@tool  
def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        result = eval(expression)
        return f"计算结果:{result}"
    except:
        return "计算错误"

def build_basic_graph():
    """构建基础图示例"""
    
    def agent_node(state: AgentState) -> AgentState:
        """智能体节点:决策和工具调用"""
        from langchain_openai import ChatOpenAI
        
        llm = ChatOpenAI(model="gpt-4")
        tools = [search_web, calculate]
        llm_with_tools = llm.bind_tools(tools)
        
        # 调用LLM
        response = llm_with_tools.invoke(state["messages"])
        
        return {"messages": [response]}
    
    def should_continue(state: AgentState) -> str:
        """条件判断:是否继续执行"""
        last_message = state["messages"][-1]
        
        # 检查是否有工具调用
        if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
            return "tools"
        else:
            return END
    
    # 创建图
    graph = StateGraph(AgentState)
    
    # 添加节点
    graph.add_node("agent", agent_node)
    graph.add_node("tools", ToolNode([search_web, calculate]))
    
    # 设置入口
    graph.set_entry_point("agent")
    
    # 添加条件边
    graph.add_conditional_edges(
        "agent",
        should_continue,
        {
            "tools": "tools",
            END: END
        }
    )
    
    # 工具执行后返回智能体
    graph.add_edge("tools", "agent")
    
    return graph.compile()

高级图模式

from langgraph.types import Command, Send
from typing import List

def build_advanced_graph():
    """构建高级图:支持并行执行和动态路由"""
    
    def coordinator_node(state: AgentState) -> Command[str]:
        """协调器节点:智能路由决策"""
        task = state.get("current_task", "")
        
        if "search" in task.lower():
            return Command(
                goto="search_specialist",
                update={"current_task": f"Routing to search: {task}"}
            )
        elif "calculate" in task.lower():
            return Command(
                goto="math_specialist", 
                update={"current_task": f"Routing to math: {task}"}
            )
        else:
            return Command(
                goto="general_agent",
                update={"current_task": f"General processing: {task}"}
            )
    
    def parallel_dispatcher(state: AgentState) -> List[Send]:
        """并行分发器:将任务分发到多个工作节点"""
        tasks = state.get("subtasks", [])
        
        return [
            Send("worker", {"task": task, "worker_id": i})
            for i, task in enumerate(tasks)
        ]
    
    def search_specialist(state: AgentState) -> AgentState:
        """搜索专家节点"""
        task = state["current_task"]
        result = f"搜索专家处理:{task}"
        return {"result": result, "current_task": "search_completed"}
    
    def math_specialist(state: AgentState) -> AgentState:
        """数学专家节点"""
        task = state["current_task"]
        result = f"数学专家处理:{task}"
        return {"result": result, "current_task": "math_completed"}
    
    def worker_node(state: dict) -> AgentState:
        """工作节点:处理并行任务"""
        task = state["task"]
        worker_id = state["worker_id"]
        
        return {
            "result": f"Worker {worker_id} completed: {task}",
            "worker_id": worker_id
        }
    
    # 构建图
    graph = StateGraph(AgentState)
    
    # 添加节点
    graph.add_node("coordinator", coordinator_node)
    graph.add_node("dispatcher", parallel_dispatcher)
    graph.add_node("search_specialist", search_specialist)
    graph.add_node("math_specialist", math_specialist)
    graph.add_node("worker", worker_node)
    
    # 设置路由
    graph.set_entry_point("coordinator")
    
    # 条件路由
    graph.add_conditional_edges(
        "coordinator",
        lambda state: state["current_task"].split(":")[0].split()[-1],
        {
            "search": "search_specialist",
            "math": "math_specialist", 
            "processing": "dispatcher"
        }
    )
    
    # 并行执行
    graph.add_edge("dispatcher", "worker")
    
    return graph.compile()

2.3 检查点系统

检查点配置

from langgraph.checkpoint.postgres import PostgresCheckpointSaver
from langgraph.checkpoint.sqlite import SqliteCheckpointSaver
from langgraph.checkpoint.memory import InMemorySaver

def setup_checkpointing():
    """设置检查点系统"""
    
    # 生产环境:PostgreSQL
    def create_postgres_checkpointer():
        return PostgresCheckpointSaver.from_conn_string(
            "postgresql://user:password@localhost/langgraph"
        )
    
    # 开发环境:SQLite
    def create_sqlite_checkpointer():
        return SqliteCheckpointSaver.from_conn_string("checkpoints.db")
    
    # 测试环境:内存
    def create_memory_checkpointer():
        return InMemorySaver()
    
    # 根据环境选择
    import os
    env = os.getenv("ENVIRONMENT", "development")
    
    if env == "production":
        return create_postgres_checkpointer()
    elif env == "development":
        return create_sqlite_checkpointer()
    else:
        return create_memory_checkpointer()

def use_checkpointing_example():
    """检查点使用示例"""
    
    # 创建带检查点的图
    checkpointer = setup_checkpointing()
    graph = build_basic_graph()
    app = graph.compile(checkpointer=checkpointer)
    
    # 配置线程ID
    config = {"configurable": {"thread_id": "conversation-1"}}
    
    # 执行并自动保存检查点
    result = app.invoke(
        {"messages": [("user", "计算 2+2")]},
        config=config
    )
    
    # 从检查点恢复执行
    continued_result = app.invoke(
        {"messages": [("user", "再计算 3+3")]},
        config=config  # 相同thread_id会从上次状态继续
    )
    
    # 查看执行历史
    history = app.get_state_history(config)
    for state in history:
        print(f"Step {state.metadata['step']}: {state.values}")
    
    return result, continued_result

3. API深入分析

3.1 StateGraph核心API

创建和配置

class StateGraph:
    """状态图核心类
    
    StateGraph是LangGraph的核心API,负责:
    - 图结构定义和管理
    - 节点和边的添加
    - 执行流程控制
    - 状态传播机制
    """
    
    def __init__(self, state_schema: Type[TypedDict]):
        """初始化状态图
        
        Args:
            state_schema: 状态模式定义,必须是TypedDict子类
        """
        self.state_schema = state_schema
        self.nodes = {}
        self.edges = {}
        self.conditional_edges = {}
        self.entry_point = None
        self.finish_point = None
    
    def add_node(self, name: str, action: Callable) -> "StateGraph":
        """添加节点
        
        Args:
            name: 节点名称,必须唯一
            action: 节点执行函数,签名为 (state) -> state_update
            
        Returns:
            StateGraph: 支持链式调用
        """
        if name in self.nodes:
            raise ValueError(f"Node '{name}' already exists")
        
        self.nodes[name] = PregelNode(
            name=name,
            action=action,
            input_channels=self._get_input_channels(action),
            output_channels=self._get_output_channels(action)
        )
        return self
    
    def add_edge(self, start: str, end: str) -> "StateGraph":
        """添加无条件边
        
        Args:
            start: 起始节点名称
            end: 结束节点名称或END常量
            
        Returns:
            StateGraph: 支持链式调用
        """
        if start not in self.nodes:
            raise ValueError(f"Start node '{start}' not found")
        
        if end != END and end not in self.nodes:
            raise ValueError(f"End node '{end}' not found")
        
        if start not in self.edges:
            self.edges[start] = []
        self.edges[start].append(end)
        return self
    
    def add_conditional_edges(
        self,
        start: str,
        condition: Callable,
        condition_map: Dict[str, str]
    ) -> "StateGraph":
        """添加条件边
        
        Args:
            start: 起始节点名称
            condition: 条件函数,签名为 (state) -> str
            condition_map: 条件值到目标节点的映射
            
        Returns:
            StateGraph: 支持链式调用
        """
        if start not in self.nodes:
            raise ValueError(f"Start node '{start}' not found")
        
        # 验证条件映射中的目标节点
        for condition_value, target in condition_map.items():
            if target != END and target not in self.nodes:
                raise ValueError(f"Target node '{target}' not found")
        
        self.conditional_edges[start] = ConditionalEdge(
            condition=condition,
            condition_map=condition_map
        )
        return self
    
    def set_entry_point(self, name: str) -> "StateGraph":
        """设置入口点
        
        Args:
            name: 入口节点名称
            
        Returns:
            StateGraph: 支持链式调用
        """
        if name not in self.nodes:
            raise ValueError(f"Entry node '{name}' not found")
        
        self.entry_point = name
        return self
    
    def set_finish_point(self, name: str) -> "StateGraph":
        """设置结束点
        
        Args:
            name: 结束节点名称
            
        Returns:
            StateGraph: 支持链式调用
        """
        if name not in self.nodes:
            raise ValueError(f"Finish node '{name}' not found")
        
        self.finish_point = name
        return self
    
    def compile(
        self,
        checkpointer: BaseCheckpointSaver = None,
        interrupt_before: List[str] = None,
        interrupt_after: List[str] = None,
        debug: bool = False
    ) -> CompiledStateGraph:
        """编译图为可执行对象
        
        Args:
            checkpointer: 检查点保存器
            interrupt_before: 在这些节点前中断
            interrupt_after: 在这些节点后中断
            debug: 是否启用调试模式
            
        Returns:
            CompiledStateGraph: 编译后的可执行图
        """
        # 验证图结构
        self._validate_graph()
        
        # 创建通道系统
        channels = self._create_channels()
        
        # 创建Pregel执行器
        pregel = Pregel(
            nodes=self.nodes,
            channels=channels,
            input_channels=self._get_input_channels(),
            output_channels=self._get_output_channels(),
            stream_channels=self._get_stream_channels(),
            checkpointer=checkpointer,
            interrupt_before=interrupt_before or [],
            interrupt_after=interrupt_after or [],
            debug=debug
        )
        
        return CompiledStateGraph(pregel)

执行API

class CompiledStateGraph:
    """编译后的状态图
    
    提供多种执行模式:
    - invoke: 同步执行
    - ainvoke: 异步执行  
    - stream: 流式执行
    - astream: 异步流式执行
    """
    
    def __init__(self, pregel: Pregel):
        self.pregel = pregel
    
    def invoke(
        self,
        input: Dict[str, Any],
        config: RunnableConfig = None,
        **kwargs
    ) -> Dict[str, Any]:
        """同步执行图
        
        Args:
            input: 输入状态
            config: 运行配置,包含thread_id等
            **kwargs: 额外参数
            
        Returns:
            Dict[str, Any]: 最终状态
            
        Example:
            >>> graph = build_basic_graph()
            >>> result = graph.invoke(
            ...     {"messages": [("user", "Hello")]},
            ...     {"configurable": {"thread_id": "thread-1"}}
            ... )
        """
        config = config or {}
        
        # 执行图并返回最终状态
        final_state = None
        for state in self.pregel.stream(input, config, **kwargs):
            final_state = state
        
        return final_state
    
    async def ainvoke(
        self,
        input: Dict[str, Any], 
        config: RunnableConfig = None,
        **kwargs
    ) -> Dict[str, Any]:
        """异步执行图
        
        Args:
            input: 输入状态
            config: 运行配置
            **kwargs: 额外参数
            
        Returns:
            Dict[str, Any]: 最终状态
        """
        config = config or {}
        
        final_state = None
        async for state in self.pregel.astream(input, config, **kwargs):
            final_state = state
        
        return final_state
    
    def stream(
        self,
        input: Dict[str, Any],
        config: RunnableConfig = None,
        *,
        stream_mode: str = "values",
        **kwargs
    ) -> Iterator[Dict[str, Any]]:
        """流式执行图
        
        Args:
            input: 输入状态
            config: 运行配置
            stream_mode: 流模式 ("values", "updates", "debug")
            **kwargs: 额外参数
            
        Yields:
            Dict[str, Any]: 中间状态或更新
            
        Example:
            >>> for state in graph.stream(input, config):
            ...     print(f"Current state: {state}")
        """
        yield from self.pregel.stream(
            input, 
            config, 
            stream_mode=stream_mode,
            **kwargs
        )
    
    async def astream(
        self,
        input: Dict[str, Any],
        config: RunnableConfig = None,
        *,
        stream_mode: str = "values", 
        **kwargs
    ) -> AsyncIterator[Dict[str, Any]]:
        """异步流式执行图
        
        Args:
            input: 输入状态
            config: 运行配置
            stream_mode: 流模式
            **kwargs: 额外参数
            
        Yields:
            Dict[str, Any]: 中间状态或更新
        """
        async for state in self.pregel.astream(
            input,
            config,
            stream_mode=stream_mode, 
            **kwargs
        ):
            yield state
    
    def get_state(self, config: RunnableConfig) -> StateSnapshot:
        """获取当前状态快照
        
        Args:
            config: 运行配置,必须包含thread_id
            
        Returns:
            StateSnapshot: 状态快照
        """
        return self.pregel.get_state(config)
    
    def get_state_history(
        self,
        config: RunnableConfig,
        *,
        filter: Dict[str, Any] = None,
        before: RunnableConfig = None,
        limit: int = None
    ) -> Iterator[StateSnapshot]:
        """获取状态历史
        
        Args:
            config: 运行配置
            filter: 过滤条件
            before: 获取此配置之前的状态
            limit: 限制返回数量
            
        Yields:
            StateSnapshot: 历史状态快照
        """
        yield from self.pregel.get_state_history(
            config,
            filter=filter,
            before=before,
            limit=limit
        )
    
    def update_state(
        self,
        config: RunnableConfig,
        values: Dict[str, Any],
        as_node: str = None
    ) -> RunnableConfig:
        """更新状态
        
        Args:
            config: 运行配置
            values: 要更新的状态值
            as_node: 以哪个节点的身份更新
            
        Returns:
            RunnableConfig: 更新后的配置
        """
        return self.pregel.update_state(config, values, as_node)

3.2 Pregel执行引擎API

class Pregel:
    """Pregel执行引擎
    
    基于Google Pregel模型的图计算引擎,特点:
    - 超步执行模型
    - 并行节点处理
    - 消息传递机制
    - 状态一致性保证
    """
    
    def __init__(
        self,
        nodes: Dict[str, PregelNode],
        channels: Dict[str, BaseChannel], 
        input_channels: List[str],
        output_channels: List[str],
        stream_channels: List[str] = None,
        checkpointer: BaseCheckpointSaver = None,
        interrupt_before: List[str] = None,
        interrupt_after: List[str] = None,
        debug: bool = False
    ):
        """初始化Pregel执行引擎
        
        Args:
            nodes: 节点映射
            channels: 通道映射
            input_channels: 输入通道列表
            output_channels: 输出通道列表
            stream_channels: 流式输出通道
            checkpointer: 检查点保存器
            interrupt_before: 中断前节点列表
            interrupt_after: 中断后节点列表
            debug: 调试模式
        """
        self.nodes = nodes
        self.channels = channels
        self.input_channels = input_channels
        self.output_channels = output_channels
        self.stream_channels = stream_channels or []
        self.checkpointer = checkpointer
        self.interrupt_before = set(interrupt_before or [])
        self.interrupt_after = set(interrupt_after or [])
        self.debug = debug
    
    def stream(
        self,
        input: Dict[str, Any],
        config: RunnableConfig = None,
        *,
        stream_mode: str = "values",
        **kwargs
    ) -> Iterator[Dict[str, Any]]:
        """流式执行核心方法
        
        Args:
            input: 输入数据
            config: 运行配置
            stream_mode: 流模式
            **kwargs: 额外参数
            
        Yields:
            Dict[str, Any]: 执行结果
        """
        config = config or {}
        
        # 准备初始状态
        checkpoint = self._prepare_initial_checkpoint(input, config)
        
        # 执行超步循环
        step = 0
        while True:
            # 1. 计划阶段:确定活跃节点
            tasks = self._plan_step(checkpoint, config)
            
            if not tasks:
                break  # 没有更多任务,执行完成
            
            # 2. 检查中断点
            if self._should_interrupt_before(tasks):
                yield self._create_interrupt_state(checkpoint, tasks, "before")
                break
            
            # 3. 执行阶段:并行执行节点
            step_results = self._execute_step(tasks, checkpoint, config)
            
            # 4. 更新阶段:应用状态更新
            checkpoint = self._update_checkpoint(
                checkpoint, 
                step_results, 
                config
            )
            
            # 5. 保存检查点
            if self.checkpointer:
                self.checkpointer.put(
                    config,
                    checkpoint,
                    self._create_metadata(step, "loop"),
                    self._get_channel_versions(checkpoint)
                )
            
            # 6. 检查中断点
            if self._should_interrupt_after(tasks):
                yield self._create_interrupt_state(checkpoint, tasks, "after")
                break
            
            # 7. 输出中间结果
            if stream_mode == "values":
                yield self._extract_values(checkpoint)
            elif stream_mode == "updates":
                yield self._extract_updates(step_results)
            elif stream_mode == "debug":
                yield self._create_debug_info(checkpoint, step_results)
            
            step += 1
        
        # 输出最终结果
        if stream_mode == "values":
            yield self._extract_values(checkpoint)
    
    def _plan_step(
        self, 
        checkpoint: Checkpoint, 
        config: RunnableConfig
    ) -> List[PregelTask]:
        """计划执行步骤
        
        Args:
            checkpoint: 当前检查点
            config: 运行配置
            
        Returns:
            List[PregelTask]: 待执行任务列表
        """
        tasks = []
        
        # 检查每个节点是否需要执行
        for node_name, node in self.nodes.items():
            if self._should_execute_node(node, checkpoint):
                task = PregelTask(
                    name=node_name,
                    node=node,
                    input_state=self._prepare_node_input(node, checkpoint),
                    config=config
                )
                tasks.append(task)
        
        return tasks
    
    def _execute_step(
        self,
        tasks: List[PregelTask],
        checkpoint: Checkpoint,
        config: RunnableConfig
    ) -> Dict[str, Any]:
        """执行步骤中的所有任务
        
        Args:
            tasks: 任务列表
            checkpoint: 当前检查点
            config: 运行配置
            
        Returns:
            Dict[str, Any]: 执行结果
        """
        import concurrent.futures
        
        results = {}
        
        # 并行执行任务
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # 提交所有任务
            future_to_task = {
                executor.submit(self._execute_task, task): task
                for task in tasks
            }
            
            # 收集结果
            for future in concurrent.futures.as_completed(future_to_task):
                task = future_to_task[future]
                try:
                    result = future.result()
                    results[task.name] = result
                except Exception as e:
                    if self.debug:
                        print(f"Task {task.name} failed: {e}")
                    results[task.name] = PregelTaskError(task.name, e)
        
        return results
    
    def _execute_task(self, task: PregelTask) -> Any:
        """执行单个任务
        
        Args:
            task: 要执行的任务
            
        Returns:
            Any: 任务执行结果
        """
        try:
            # 调用节点函数
            if asyncio.iscoroutinefunction(task.node.action):
                # 异步节点
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                try:
                    result = loop.run_until_complete(
                        task.node.action(task.input_state)
                    )
                finally:
                    loop.close()
            else:
                # 同步节点
                result = task.node.action(task.input_state)
            
            return result
            
        except Exception as e:
            return PregelTaskError(task.name, e)
    
    def _update_checkpoint(
        self,
        checkpoint: Checkpoint,
        step_results: Dict[str, Any],
        config: RunnableConfig
    ) -> Checkpoint:
        """更新检查点状态
        
        Args:
            checkpoint: 当前检查点
            step_results: 步骤执行结果
            config: 运行配置
            
        Returns:
            Checkpoint: 更新后的检查点
        """
        # 复制当前检查点
        new_checkpoint = copy_checkpoint(checkpoint)
        
        # 应用每个节点的更新
        for node_name, result in step_results.items():
            if isinstance(result, PregelTaskError):
                continue  # 跳过错误结果
            
            # 更新通道值
            self._apply_node_updates(new_checkpoint, node_name, result)
        
        # 更新时间戳和版本
        new_checkpoint["ts"] = datetime.now(timezone.utc).isoformat()
        new_checkpoint["id"] = str(uuid6())
        
        return new_checkpoint

3.3 检查点API详解

class BaseCheckpointSaver:
    """检查点保存器基类API
    
    提供统一的检查点管理接口:
    - 状态持久化
    - 历史查询
    - 版本管理
    - 恢复机制
    """
    
    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions
    ) -> RunnableConfig:
        """保存检查点
        
        Args:
            config: 运行配置
            checkpoint: 检查点数据
            metadata: 元数据
            new_versions: 新版本信息
            
        Returns:
            RunnableConfig: 更新后的配置
        """
        raise NotImplementedError
    
    def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
        """获取检查点元组
        
        Args:
            config: 运行配置
            
        Returns:
            CheckpointTuple | None: 检查点元组或None
        """
        raise NotImplementedError
    
    def list(
        self,
        config: RunnableConfig | None,
        *,
        filter: Dict[str, Any] | None = None,
        before: RunnableConfig | None = None,
        limit: int | None = None
    ) -> Iterator[CheckpointTuple]:
        """列出检查点
        
        Args:
            config: 基础配置
            filter: 过滤条件
            before: 获取此配置之前的检查点
            limit: 限制数量
            
        Yields:
            CheckpointTuple: 检查点元组
        """
        raise NotImplementedError

# PostgreSQL实现示例
class PostgresCheckpointSaver(BaseCheckpointSaver):
    """PostgreSQL检查点保存器实现"""
    
    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint, 
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions
    ) -> RunnableConfig:
        """保存检查点到PostgreSQL"""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
        checkpoint_id = checkpoint["id"]
        
        # 序列化数据
        checkpoint_data = self.serde.dumps(checkpoint)
        metadata_data = self.serde.dumps(metadata)
        
        with self._cursor() as cur:
            # 插入或更新检查点
            cur.execute("""
                INSERT INTO checkpoints 
                (thread_id, checkpoint_ns, checkpoint_id, checkpoint, metadata)
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id)
                DO UPDATE SET 
                    checkpoint = EXCLUDED.checkpoint,
                    metadata = EXCLUDED.metadata
            """, (
                thread_id,
                checkpoint_ns,
                checkpoint_id, 
                checkpoint_data,
                metadata_data
            ))
        
        return {
            **config,
            "configurable": {
                **config["configurable"],
                "checkpoint_id": checkpoint_id
            }
        }

4. 实战案例

4.1 智能客服系统

def build_customer_service_agent():
    """构建智能客服系统"""
    
    from langchain_openai import ChatOpenAI
    from langchain_core.tools import tool
    
    # 定义工具
    @tool
    def query_order_status(order_id: str) -> str:
        """查询订单状态"""
        # 模拟数据库查询
        orders = {
            "12345": "已发货,预计明天到达",
            "67890": "正在处理中",
            "11111": "已完成"
        }
        return orders.get(order_id, "订单不存在")
    
    @tool
    def create_refund_request(order_id: str, reason: str) -> str:
        """创建退款申请"""
        return f"已为订单 {order_id} 创建退款申请,原因:{reason}"
    
    @tool
    def escalate_to_human(issue: str) -> str:
        """升级到人工客服"""
        return f"已将问题升级到人工客服:{issue}"
    
    class CustomerServiceState(TypedDict):
        messages: Annotated[List[BaseMessage], add_messages]
        customer_info: dict
        issue_category: str
        resolution_status: str
        escalation_needed: bool
    
    def classify_issue(state: CustomerServiceState) -> CustomerServiceState:
        """问题分类节点"""
        llm = ChatOpenAI(model="gpt-4")
        
        classification_prompt = """
        根据客户消息,将问题分类为以下类别之一:
        - order_inquiry: 订单查询
        - refund_request: 退款申请  
        - technical_support: 技术支持
        - complaint: 投诉
        - other: 其他
        
        客户消息:{message}
        
        只返回分类结果。
        """
        
        last_message = state["messages"][-1].content
        response = llm.invoke(classification_prompt.format(message=last_message))
        
        return {
            "issue_category": response.content.strip(),
            "resolution_status": "classified"
        }
    
    def handle_order_inquiry(state: CustomerServiceState) -> CustomerServiceState:
        """处理订单查询"""
        llm = ChatOpenAI(model="gpt-4")
        tools = [query_order_status]
        llm_with_tools = llm.bind_tools(tools)
        
        response = llm_with_tools.invoke(state["messages"])
        return {"messages": [response]}
    
    def handle_refund_request(state: CustomerServiceState) -> CustomerServiceState:
        """处理退款申请"""
        llm = ChatOpenAI(model="gpt-4")
        tools = [create_refund_request]
        llm_with_tools = llm.bind_tools(tools)
        
        response = llm_with_tools.invoke(state["messages"])
        return {"messages": [response]}
    
    def check_escalation_needed(state: CustomerServiceState) -> str:
        """检查是否需要升级"""
        issue_category = state.get("issue_category", "")
        
        # 复杂问题自动升级
        if issue_category in ["complaint", "technical_support"]:
            return "escalate"
        
        # 检查客户满意度
        last_message = state["messages"][-1].content
        if any(word in last_message.lower() for word in ["不满意", "投诉", "经理"]):
            return "escalate"
        
        return "resolve"
    
    def escalate_issue(state: CustomerServiceState) -> CustomerServiceState:
        """升级问题"""
        issue = state.get("issue_category", "未分类问题")
        result = escalate_to_human(issue)
        
        return {
            "messages": [AIMessage(content=result)],
            "escalation_needed": True,
            "resolution_status": "escalated"
        }
    
    def finalize_resolution(state: CustomerServiceState) -> CustomerServiceState:
        """完成问题解决"""
        return {
            "resolution_status": "resolved",
            "messages": [AIMessage(content="问题已解决,还有其他需要帮助的吗?")]
        }
    
    # 构建图
    graph = StateGraph(CustomerServiceState)
    
    # 添加节点
    graph.add_node("classify", classify_issue)
    graph.add_node("handle_order", handle_order_inquiry)
    graph.add_node("handle_refund", handle_refund_request)
    graph.add_node("escalate", escalate_issue)
    graph.add_node("finalize", finalize_resolution)
    graph.add_node("tools", ToolNode([query_order_status, create_refund_request]))
    
    # 设置流程
    graph.set_entry_point("classify")
    
    # 根据问题类型路由
    graph.add_conditional_edges(
        "classify",
        lambda state: state["issue_category"],
        {
            "order_inquiry": "handle_order",
            "refund_request": "handle_refund",
            "complaint": "escalate",
            "technical_support": "escalate",
            "other": "escalate"
        }
    )
    
    # 工具调用处理
    def route_after_llm(state: CustomerServiceState) -> str:
        last_message = state["messages"][-1]
        if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
            return "tools"
        else:
            return "check_escalation"
    
    graph.add_conditional_edges("handle_order", route_after_llm)
    graph.add_conditional_edges("handle_refund", route_after_llm)
    
    # 工具执行后检查升级
    graph.add_edge("tools", "check_escalation")
    
    # 升级检查
    graph.add_conditional_edges(
        "check_escalation",
        check_escalation_needed,
        {
            "escalate": "escalate",
            "resolve": "finalize"
        }
    )
    
    return graph.compile()

4.2 代码生成与审查系统

def build_code_review_system():
    """构建代码生成与审查系统"""
    
    class CodeReviewState(TypedDict):
        messages: Annotated[List[BaseMessage], add_messages]
        code_request: str
        generated_code: str
        review_feedback: List[str]
        test_results: dict
        approval_status: str
        iteration_count: int
    
    def code_generator(state: CodeReviewState) -> CodeReviewState:
        """代码生成节点"""
        from langchain_openai import ChatOpenAI
        
        llm = ChatOpenAI(model="gpt-4", temperature=0.1)
        
        # 构建代码生成提示
        request = state.get("code_request", "")
        feedback = state.get("review_feedback", [])
        
        if feedback:
            prompt = f"""
            根据以下需求和反馈,改进代码:
            
            需求:{request}
            
            之前的反馈:
            {chr(10).join(f"- {fb}" for fb in feedback)}
            
            请生成改进后的Python代码,包含适当的注释和错误处理。
            """
        else:
            prompt = f"""
            根据以下需求生成Python代码:
            
            需求:{request}
            
            要求:
            1. 代码结构清晰,有适当注释
            2. 包含错误处理
            3. 遵循PEP 8规范
            4. 包含基本的单元测试
            """
        
        response = llm.invoke(prompt)
        
        return {
            "generated_code": response.content,
            "messages": [response],
            "iteration_count": state.get("iteration_count", 0) + 1
        }
    
    def code_reviewer(state: CodeReviewState) -> CodeReviewState:
        """代码审查节点"""
        from langchain_openai import ChatOpenAI
        
        llm = ChatOpenAI(model="gpt-4", temperature=0.2)
        
        code = state.get("generated_code", "")
        
        review_prompt = f"""
        请审查以下Python代码,从以下方面给出反馈:
        
        1. 代码质量和可读性
        2. 错误处理
        3. 性能优化
        4. 安全性
        5. 测试覆盖度
        
        代码:
        ```python
        {code}
        ```
        
        请提供具体的改进建议,如果代码质量良好,请说明"APPROVED"。
        """
        
        response = llm.invoke(review_prompt)
        review_content = response.content
        
        # 解析审查结果
        if "APPROVED" in review_content.upper():
            approval_status = "approved"
            feedback = []
        else:
            approval_status = "needs_improvement"
            # 提取反馈点
            feedback = [
                line.strip() 
                for line in review_content.split('\n') 
                if line.strip() and not line.startswith('#')
            ]
        
        return {
            "review_feedback": feedback,
            "approval_status": approval_status,
            "messages": [response]
        }
    
    def code_tester(state: CodeReviewState) -> CodeReviewState:
        """代码测试节点"""
        code = state.get("generated_code", "")
        
        # 简单的代码测试(实际应用中会更复杂)
        test_results = {
            "syntax_check": True,
            "basic_tests": True,
            "coverage": 85.0
        }
        
        try:
            # 语法检查
            compile(code, '<string>', 'exec')
            test_results["syntax_check"] = True
        except SyntaxError as e:
            test_results["syntax_check"] = False
            test_results["syntax_error"] = str(e)
        
        return {
            "test_results": test_results,
            "messages": [AIMessage(content=f"测试结果:{test_results}")]
        }
    
    def should_continue_iteration(state: CodeReviewState) -> str:
        """判断是否继续迭代"""
        approval_status = state.get("approval_status", "")
        iteration_count = state.get("iteration_count", 0)
        test_results = state.get("test_results", {})
        
        # 检查是否通过审查和测试
        if (approval_status == "approved" and 
            test_results.get("syntax_check", False)):
            return "finalize"
        
        # 检查迭代次数限制
        if iteration_count >= 3:
            return "finalize"  # 超过最大迭代次数
        
        return "generate"  # 继续迭代
    
    def finalize_code(state: CodeReviewState) -> CodeReviewState:
        """完成代码生成"""
        code = state.get("generated_code", "")
        approval_status = state.get("approval_status", "")
        
        if approval_status == "approved":
            message = "代码已通过审查,可以使用。"
        else:
            message = "代码已达到最大迭代次数,请手动审查。"
        
        return {
            "messages": [AIMessage(content=f"{message}\n\n最终代码:\n```python\n{code}\n```")]
        }
    
    # 构建图
    graph = StateGraph(CodeReviewState)
    
    # 添加节点
    graph.add_node("generate", code_generator)
    graph.add_node("review", code_reviewer)
    graph.add_node("test", code_tester)
    graph.add_node("finalize", finalize_code)
    
    # 设置流程
    graph.set_entry_point("generate")
    graph.add_edge("generate", "review")
    graph.add_edge("review", "test")
    
    # 条件路由
    graph.add_conditional_edges(
        "test",
        should_continue_iteration,
        {
            "generate": "generate",
            "finalize": "finalize"
        }
    )
    
    return graph.compile()

5. 最佳实践

5.1 性能优化

并行执行优化

def optimize_parallel_execution():
    """并行执行优化示例"""
    
    from langgraph.types import Send
    
    def parallel_processor(state: dict) -> List[Send]:
        """并行处理器:将任务分发到多个工作节点"""
        tasks = state.get("tasks", [])
        
        # 根据任务类型分组
        task_groups = defaultdict(list)
        for task in tasks:
            task_type = task.get("type", "default")
            task_groups[task_type].append(task)
        
        sends = []
        for task_type, group_tasks in task_groups.items():
            # 为每个任务组创建Send
            for i, task in enumerate(group_tasks):
                sends.append(Send(
                    f"worker_{task_type}",
                    {
                        "task": task,
                        "task_id": f"{task_type}_{i}",
                        "group_size": len(group_tasks)
                    }
                ))
        
        return sends
    
    def cpu_intensive_worker(state: dict) -> dict:
        """CPU密集型工作节点"""
        import time
        task = state["task"]
        
        # 模拟CPU密集型任务
        start_time = time.time()
        result = sum(i * i for i in range(task.get("iterations", 1000)))
        duration = time.time() - start_time
        
        return {
            "result": result,
            "duration": duration,
            "task_id": state["task_id"]
        }
    
    def io_intensive_worker(state: dict) -> dict:
        """IO密集型工作节点"""
        import asyncio
        import aiohttp
        
        async def fetch_data():
            # 模拟异步IO操作
            await asyncio.sleep(0.1)
            return {"data": "fetched"}
        
        # 在同步上下文中运行异步操作
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(fetch_data())
        finally:
            loop.close()
        
        return {
            "result": result,
            "task_id": state["task_id"]
        }

内存优化

def optimize_memory_usage():
    """内存使用优化"""
    
    class MemoryOptimizedState(TypedDict):
        # 使用生成器减少内存占用
        data_stream: Iterator[dict]
        # 只保留必要的历史信息
        recent_messages: Annotated[List[BaseMessage], add_messages]
        # 使用弱引用避免循环引用
        cache_refs: dict
    
    def memory_efficient_processor(state: MemoryOptimizedState) -> MemoryOptimizedState:
        """内存高效的处理节点"""
        import gc
        import weakref
        
        # 处理数据流,避免一次性加载所有数据
        processed_count = 0
        for data_chunk in state.get("data_stream", []):
            # 处理单个数据块
            process_chunk(data_chunk)
            processed_count += 1
            
            # 定期触发垃圾回收
            if processed_count % 100 == 0:
                gc.collect()
        
        # 清理临时变量
        del processed_count
        
        return {
            "processing_completed": True,
            "cache_refs": {}  # 清空缓存引用
        }
    
    def process_chunk(chunk: dict):
        """处理单个数据块"""
        # 实际的数据处理逻辑
        pass

5.2 错误处理

健壮的错误处理

def implement_robust_error_handling():
    """实现健壮的错误处理"""
    
    from enum import Enum
    
    class ErrorSeverity(Enum):
        LOW = "low"
        MEDIUM = "medium"
        HIGH = "high"
        CRITICAL = "critical"
    
    class ErrorHandlingState(TypedDict):
        messages: Annotated[List[BaseMessage], add_messages]
        errors: List[dict]
        retry_count: int
        fallback_used: bool
        error_severity: str
    
    def resilient_node(state: ErrorHandlingState) -> ErrorHandlingState:
        """具有弹性的节点实现"""
        max_retries = 3
        retry_count = state.get("retry_count", 0)
        
        try:
            # 主要处理逻辑
            result = risky_operation(state)
            
            # 重置错误状态
            return {
                "result": result,
                "retry_count": 0,
                "fallback_used": False
            }
            
        except ConnectionError as e:
            # 网络错误 - 可重试
            if retry_count < max_retries:
                return {
                    "retry_count": retry_count + 1,
                    "errors": state.get("errors", []) + [{
                        "type": "ConnectionError",
                        "message": str(e),
                        "severity": ErrorSeverity.MEDIUM.value,
                        "retry_attempt": retry_count + 1
                    }]
                }
            else:
                # 超过重试次数,使用降级方案
                return use_fallback_strategy(state, e)
        
        except ValueError as e:
            # 数据错误 - 不可重试
            return {
                "errors": state.get("errors", []) + [{
                    "type": "ValueError", 
                    "message": str(e),
                    "severity": ErrorSeverity.HIGH.value,
                    "recoverable": False
                }],
                "error_severity": ErrorSeverity.HIGH.value
            }
        
        except Exception as e:
            # 未知错误 - 记录并降级
            return {
                "errors": state.get("errors", []) + [{
                    "type": "UnknownError",
                    "message": str(e),
                    "severity": ErrorSeverity.CRITICAL.value,
                    "stack_trace": traceback.format_exc()
                }],
                "error_severity": ErrorSeverity.CRITICAL.value,
                "fallback_used": True
            }
    
    def use_fallback_strategy(state: ErrorHandlingState, error: Exception) -> ErrorHandlingState:
        """使用降级策略"""
        # 实现降级逻辑
        fallback_result = "使用缓存数据或默认响应"
        
        return {
            "result": fallback_result,
            "fallback_used": True,
            "errors": state.get("errors", []) + [{
                "type": "FallbackUsed",
                "message": f"使用降级策略: {str(error)}",
                "severity": ErrorSeverity.MEDIUM.value
            }]
        }
    
    def risky_operation(state: ErrorHandlingState) -> str:
        """可能失败的操作"""
        import random
        
        if random.random() < 0.3:  # 30%失败率
            raise ConnectionError("网络连接失败")
        
        return "操作成功"

5.3 监控和调试

全面的监控系统

def implement_comprehensive_monitoring():
    """实现全面的监控系统"""
    
    import time
    import logging
    from dataclasses import dataclass
    from typing import Dict, List
    
    @dataclass
    class PerformanceMetrics:
        node_name: str
        execution_time: float
        memory_usage: int
        success: bool
        error_message: str = None
    
    class MonitoringState(TypedDict):
        messages: Annotated[List[BaseMessage], add_messages]
        metrics: List[PerformanceMetrics]
        start_time: float
        trace_id: str
    
    def monitored_node(state: MonitoringState) -> MonitoringState:
        """带监控的节点"""
        import psutil
        import uuid
        
        node_name = "monitored_node"
        start_time = time.time()
        start_memory = psutil.Process().memory_info().rss
        
        # 生成追踪ID
        trace_id = state.get("trace_id") or str(uuid.uuid4())
        
        try:
            # 节点逻辑
            result = perform_business_logic(state)
            
            # 记录成功指标
            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss
            
            metrics = PerformanceMetrics(
                node_name=node_name,
                execution_time=end_time - start_time,
                memory_usage=end_memory - start_memory,
                success=True
            )
            
            # 记录日志
            logging.info(
                f"Node {node_name} completed successfully",
                extra={
                    "trace_id": trace_id,
                    "execution_time": metrics.execution_time,
                    "memory_delta": metrics.memory_usage
                }
            )
            
            return {
                "result": result,
                "metrics": state.get("metrics", []) + [metrics],
                "trace_id": trace_id
            }
            
        except Exception as e:
            # 记录失败指标
            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss
            
            metrics = PerformanceMetrics(
                node_name=node_name,
                execution_time=end_time - start_time,
                memory_usage=end_memory - start_memory,
                success=False,
                error_message=str(e)
            )
            
            # 记录错误日志
            logging.error(
                f"Node {node_name} failed",
                extra={
                    "trace_id": trace_id,
                    "error": str(e),
                    "execution_time": metrics.execution_time
                },
                exc_info=True
            )
            
            return {
                "metrics": state.get("metrics", []) + [metrics],
                "trace_id": trace_id,
                "error": str(e)
            }
    
    def performance_analyzer(state: MonitoringState) -> MonitoringState:
        """性能分析节点"""
        metrics = state.get("metrics", [])
        
        if not metrics:
            return state
        
        # 分析性能指标
        total_time = sum(m.execution_time for m in metrics)
        avg_time = total_time / len(metrics)
        max_time = max(m.execution_time for m in metrics)
        success_rate = sum(1 for m in metrics if m.success) / len(metrics)
        
        analysis = {
            "total_execution_time": total_time,
            "average_execution_time": avg_time,
            "max_execution_time": max_time,
            "success_rate": success_rate,
            "total_nodes": len(metrics)
        }
        
        # 性能告警
        if avg_time > 5.0:  # 平均执行时间超过5秒
            logging.warning(
                "High average execution time detected",
                extra={"analysis": analysis, "trace_id": state.get("trace_id")}
            )
        
        if success_rate < 0.95:  # 成功率低于95%
            logging.warning(
                "Low success rate detected",
                extra={"analysis": analysis, "trace_id": state.get("trace_id")}
            )
        
        return {
            "performance_analysis": analysis,
            "messages": [AIMessage(content=f"性能分析完成:{analysis}")]
        }
    
    def perform_business_logic(state: MonitoringState) -> str:
        """业务逻辑示例"""
        import random
        import time
        
        # 模拟不同的执行时间
        time.sleep(random.uniform(0.1, 2.0))
        
        # 模拟偶发错误
        if random.random() < 0.1:
            raise Exception("随机业务错误")
        
        return "业务逻辑执行成功"

5.4 部署和扩展

生产环境部署配置

def production_deployment_config():
    """生产环境部署配置"""
    
    # Docker配置
    dockerfile_content = """
FROM python:3.11-slim

# 安装系统依赖
RUN apt-get update && apt-get install -y \\
    git \\
    curl \\
    && rm -rf /var/lib/apt/lists/*

# 设置工作目录
WORKDIR /app

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 设置环境变量
ENV PYTHONPATH=/app
ENV ENVIRONMENT=production

# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\
    CMD curl -f http://localhost:8000/health || exit 1

# 启动应用
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
"""
    
    # Kubernetes部署配置
    k8s_deployment = """
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-app
  labels:
    app: langgraph-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph-app
  template:
    metadata:
      labels:
        app: langgraph-app
    spec:
      containers:
      - name: langgraph-app
        image: langgraph-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: database-url
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
spec:
  selector:
    app: langgraph-app
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer
"""
    
    # 监控配置
    monitoring_config = """
# Prometheus配置
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'langgraph-app'
    static_configs:
      - targets: ['langgraph-service:80']
    metrics_path: /metrics
    scrape_interval: 10s

# Grafana仪表盘配置
dashboard_config = {
    "dashboard": {
        "title": "LangGraph应用监控",
        "panels": [
            {
                "title": "请求率",
                "type": "graph",
                "targets": [
                    {
                        "expr": "rate(http_requests_total[5m])",
                        "legendFormat": "{{method}} {{status}}"
                    }
                ]
            },
            {
                "title": "响应时间",
                "type": "graph", 
                "targets": [
                    {
                        "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
                        "legendFormat": "95th percentile"
                    }
                ]
            },
            {
                "title": "错误率",
                "type": "singlestat",
                "targets": [
                    {
                        "expr": "rate(http_requests_total{status=~\"5..\"}[5m]) / rate(http_requests_total[5m])",
                        "legendFormat": "Error Rate"
                    }
                ]
            }
        ]
    }
}
"""
    
    return {
        "dockerfile": dockerfile_content,
        "k8s_deployment": k8s_deployment,
        "monitoring": monitoring_config
    }

6. 总结

LangGraph框架提供了强大而灵活的多智能体应用开发能力。通过本手册的学习,您应该能够:

  1. 掌握核心概念:理解状态管理、图构建、检查点系统等核心概念
  2. 熟练使用API:掌握StateGraph、Pregel等核心API的使用方法
  3. 构建复杂应用:能够设计和实现多智能体协作系统
  4. 优化性能:了解性能优化、错误处理、监控等最佳实践
  5. 生产部署:具备生产环境部署和运维的能力

6.1 学习路径建议

  1. 基础阶段:熟悉基本概念和简单图构建
  2. 进阶阶段:学习条件路由、并行执行、检查点系统
  3. 高级阶段:掌握多智能体协作、性能优化、监控调试
  4. 专家阶段:深入源码、自定义扩展、生产优化

6.2 常见问题解答

Q: 如何选择合适的检查点保存器? A: 开发测试使用InMemorySaver,生产环境推荐PostgresCheckpointSaver。

Q: 如何处理长时间运行的任务? A: 使用检查点系统保存中间状态,支持任务暂停和恢复。

Q: 如何优化大规模并发性能? A: 使用Send机制实现真正的并行执行,配合资源池管理。

Q: 如何实现人工介入? A: 使用interrupt_before/interrupt_after参数在指定节点暂停执行。

通过持续实践和深入学习,您将能够充分发挥LangGraph的强大能力,构建出色的多智能体应用系统。



tommie blog