1. 预构建组件概述

预构建组件(Prebuilt)是LangGraph提供的高级API层,为常见的AI应用模式提供开箱即用的解决方案。这些组件基于核心框架构建,封装了最佳实践,大大简化了开发流程。

1.1 组件架构

graph TB
    subgraph "用户接口层"
        USER[用户代码]
        EXAMPLES[示例应用]
    end
    
    subgraph "预构建API层"
        REACT[create_react_agent]
        TOOLNODE[ToolNode]
        CONDITIONS[tools_condition]
        INTERRUPT[interrupt]
        VALIDATOR[ToolValidator]
    end
    
    subgraph "核心框架层"
        GRAPH[StateGraph]
        PREGEL[Pregel引擎]
        CHANNELS[通道系统]
    end
    
    USER --> REACT
    USER --> TOOLNODE
    EXAMPLES --> REACT
    
    REACT --> GRAPH
    TOOLNODE --> GRAPH
    CONDITIONS --> GRAPH
    
    GRAPH --> PREGEL
    GRAPH --> CHANNELS
    
    style REACT fill:#f96,stroke:#333,stroke-width:2px
    style TOOLNODE fill:#69f,stroke:#333,stroke-width:2px

1.2 核心组件列表

组件功能用途
create_react_agentReAct Agent构建器创建工具调用型智能体
ToolNode工具执行节点执行LLM工具调用
tools_condition工具条件路由根据工具调用情况路由
ToolValidator工具验证器验证工具调用结果
interrupt中断处理人机交互中断点

2. create_react_agent 详解

2.1 核心实现

create_react_agent 是最重要的预构建组件,实现了ReAct(Reasoning and Acting)模式的智能体。

def create_react_agent(
    model: Union[
        str,
        LanguageModelLike,
        Callable[[StateSchema, Runtime[ContextT]], BaseChatModel],
        # ... 其他模型类型
    ],
    tools: Union[Sequence[Union[BaseTool, Callable, dict[str, Any]]], ToolNode],
    *,
    prompt: Optional[Prompt] = None,
    response_format: Optional[Union[StructuredResponseSchema, tuple[str, StructuredResponseSchema]]] = None,
    pre_model_hook: Optional[RunnableLike] = None,
    post_model_hook: Optional[RunnableLike] = None,
    state_schema: Optional[StateSchemaType] = None,
    context_schema: Optional[Type[Any]] = None,
    checkpointer: Optional[Checkpointer] = None,
    store: Optional[BaseStore] = None,
    interrupt_before: Optional[list[str]] = None,
    interrupt_after: Optional[list[str]] = None,
    debug: bool = False,
    version: Literal["v1", "v2"] = "v2",
    name: Optional[str] = None,
) -> CompiledStateGraph:
    """
    创建ReAct智能体图
    
    ReAct模式工作流:
    1. 接收用户输入
    2. LLM推理并决定是否需要工具
    3. 如果需要工具,执行工具调用
    4. 将工具结果反馈给LLM
    5. 重复直到得到最终答案
    """
    
    # 1. 参数验证和预处理
    if isinstance(model, str):
        model = _get_model_from_string(model)
    
    # 2. 处理工具
    if not isinstance(tools, ToolNode):
        tool_node = ToolNode(tools, handle_tool_errors=True)
    else:
        tool_node = tools
    
    # 3. 确定状态模式
    if state_schema is None:
        state_schema = AgentState
    
    # 4. 构建图
    return _build_react_graph(
        model=model,
        tool_node=tool_node,
        prompt=prompt,
        response_format=response_format,
        pre_model_hook=pre_model_hook,
        post_model_hook=post_model_hook,
        state_schema=state_schema,
        context_schema=context_schema,
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        debug=debug,
        version=version,
        name=name,
    )

2.2 图结构构建

2.2.1 ReAct Graph 构建逻辑

def _build_react_graph(
    model: LanguageModelLike,
    tool_node: ToolNode,
    **kwargs: Any,
) -> CompiledStateGraph:
    """
    构建ReAct智能体图结构
    
    图结构:
    START -> [pre_hook] -> agent -> tools_condition -> tools -> agent
                                 \-> structured_response -> END
    """
    
    # 1. 创建状态图
    graph = StateGraph(state_schema, context_schema=context_schema)
    
    # 2. 添加前置钩子(可选)
    if pre_model_hook:
        graph.add_node("pre_hook", pre_model_hook)
        graph.add_edge(START, "pre_hook")
        model_input = "pre_hook"
    else:
        model_input = START
    
    # 3. 添加智能体节点
    agent_node = _create_agent_node(model, prompt)
    graph.add_node("agent", agent_node)
    graph.add_edge(model_input, "agent")
    
    # 4. 添加后置钩子(可选,仅v2版本)
    if post_model_hook and version == "v2":
        graph.add_node("post_hook", post_model_hook)
        graph.add_edge("agent", "post_hook")
        condition_input = "post_hook"
    else:
        condition_input = "agent"
    
    # 5. 添加工具节点
    if tool_node.tools:  # 只有工具时才添加
        graph.add_node("tools", tool_node)
        graph.add_conditional_edges(
            condition_input,
            tools_condition,
            {"tools": "tools", END: END}
        )
        graph.add_edge("tools", "agent")
    else:
        graph.add_edge(condition_input, END)
    
    # 6. 添加结构化响应处理(可选)
    if response_format:
        structured_node = _create_structured_response_node(
            model, response_format
        )
        graph.add_node("structured_response", structured_node)
        # 修改条件边路由
        graph.add_conditional_edges(
            condition_input,
            _structured_tools_condition,
            {
                "tools": "tools",
                "structured": "structured_response",
                END: END
            }
        )
        graph.add_edge("structured_response", END)
    
    # 7. 编译图
    return graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        debug=debug,
        name=name or "ReActAgent",
    )

2.2.2 智能体节点实现

def _create_agent_node(
    model: LanguageModelLike,
    prompt: Optional[Prompt] = None,
) -> Callable:
    """
    创建智能体节点函数
    
    智能体职责:
    1. 处理消息历史
    2. 应用提示词
    3. 调用语言模型
    4. 返回模型响应
    """
    
    def agent_node(state: AgentState, runtime: Runtime) -> dict:
        """
        智能体节点实现
        
        输入:当前状态(包含消息历史)
        输出:模型响应(AI消息)
        """
        
        # 1. 获取消息列表
        messages = state.get("messages", [])
        
        # 2. 处理特殊消息键
        llm_input_messages = state.get("llm_input_messages")
        if llm_input_messages is not None:
            # 使用自定义LLM输入消息
            input_messages = llm_input_messages
        else:
            input_messages = messages
        
        # 3. 应用提示词
        if prompt:
            if isinstance(prompt, str):
                # 字符串提示词转为系统消息
                system_msg = SystemMessage(content=prompt)
                input_messages = [system_msg] + input_messages
            elif callable(prompt):
                # 可调用提示词
                prompt_result = prompt(state, runtime)
                if isinstance(prompt_result, str):
                    system_msg = SystemMessage(content=prompt_result)
                    input_messages = [system_msg] + input_messages
                elif isinstance(prompt_result, list):
                    input_messages = prompt_result + input_messages
        
        # 4. 检查剩余步数(防止无限循环)
        remaining_steps = state.get("remaining_steps", float("inf"))
        if remaining_steps < 2:
            # 步数不足,返回错误消息
            return {
                "messages": [
                    AIMessage(
                        content="Sorry, need more steps to process this request.",
                        id=str(uuid4()),
                    )
                ]
            }
        
        # 5. 获取模型实例
        if callable(model) and not isinstance(model, Runnable):
            # 动态模型选择
            model_instance = model(state, runtime)
        else:
            model_instance = model
        
        # 6. 调用模型
        try:
            response = model_instance.invoke(input_messages, runtime.config)
            
            # 7. 处理响应
            if not isinstance(response, AIMessage):
                # 转换为AI消息
                response = AIMessage(content=str(response), id=str(uuid4()))
            
            return {
                "messages": [response],
                "remaining_steps": remaining_steps - 1,
            }
            
        except Exception as e:
            # 错误处理
            error_msg = AIMessage(
                content=f"Error occurred: {str(e)}",
                id=str(uuid4()),
            )
            return {
                "messages": [error_msg],
                "remaining_steps": remaining_steps - 1,
            }
    
    return agent_node

2.3 状态模式定义

2.3.1 AgentState 定义

class AgentState(TypedDict):
    """
    ReAct智能体的状态模式
    
    状态字段:
    - messages: 对话消息列表(必需)
    - remaining_steps: 剩余执行步数(可选)
    - llm_input_messages: 自定义LLM输入(可选)
    """
    
    # 消息列表,使用add_messages reducer进行合并
    messages: Annotated[list[AnyMessage], add_messages]
    
    # 剩余步数,用于防止无限循环
    remaining_steps: NotRequired[int]
    
    # 自定义LLM输入消息(不会更新主消息列表)
    llm_input_messages: NotRequired[list[AnyMessage]]

# Pydantic版本(用于更严格的类型检查)
class AgentStatePydantic(BaseModel):
    """AgentState的Pydantic版本"""
    
    messages: Annotated[list[AnyMessage], add_messages]
    remaining_steps: Optional[int] = None
    llm_input_messages: Optional[list[AnyMessage]] = None

2.4 模型管理机制

2.4.1 静态和动态模型支持

def _get_model(
    model: Union[str, LanguageModelLike, Callable],
    state: AgentState,
    runtime: Runtime,
) -> BaseChatModel:
    """
    获取模型实例,支持静态和动态模型
    
    模型类型:
    1. 字符串:模型标识符(如"openai:gpt-4")
    2. 模型实例:直接使用
    3. 可调用对象:动态模型选择函数
    """
    
    if isinstance(model, str):
        # 字符串模型标识符
        return _create_model_from_string(model)
    
    elif hasattr(model, "invoke"):
        # 已实例化的模型
        return model
    
    elif callable(model):
        # 动态模型选择函数
        try:
            # 检查函数签名
            sig = inspect.signature(model)
            params = list(sig.parameters.keys())
            
            if len(params) >= 2:
                # 函数需要state和runtime参数
                result = model(state, runtime)
            elif len(params) == 1:
                # 只需要state参数
                result = model(state)
            else:
                # 无参数函数
                result = model()
            
            if not hasattr(result, "invoke"):
                raise TypeError(f"Dynamic model function must return a callable model")
            
            return result
            
        except Exception as e:
            raise ValueError(f"Error calling dynamic model function: {e}")
    
    else:
        raise TypeError(f"Unsupported model type: {type(model)}")

def _create_model_from_string(model_str: str) -> BaseChatModel:
    """从字符串标识符创建模型实例"""
    
    if ":" not in model_str:
        raise ValueError(f"Invalid model string format: {model_str}")
    
    provider, model_name = model_str.split(":", 1)
    
    # 根据提供商创建模型
    if provider == "openai":
        from langchain_openai import ChatOpenAI
        return ChatOpenAI(model=model_name)
    
    elif provider == "anthropic":
        from langchain_anthropic import ChatAnthropic
        return ChatAnthropic(model=model_name)
    
    elif provider == "groq":
        from langchain_groq import ChatGroq
        return ChatGroq(model=model_name)
    
    else:
        raise ValueError(f"Unsupported model provider: {provider}")

2.4.2 工具绑定检查

def _should_bind_tools(
    model: BaseChatModel,
    tools: list[Any],
) -> bool:
    """
    检查是否需要为模型绑定工具
    
    检查逻辑:
    1. 模型是否支持工具调用
    2. 是否已经绑定了工具
    3. 工具列表是否与已绑定工具匹配
    """
    
    # 1. 检查模型是否支持工具调用
    if not hasattr(model, "bind_tools"):
        return False
    
    # 2. 检查是否已绑定工具
    if hasattr(model, "kwargs") and "tools" in model.kwargs:
        bound_tools = model.kwargs["tools"]
        
        # 3. 比较工具列表
        if len(bound_tools) == len(tools):
            # 简单比较工具名称
            bound_names = {
                getattr(tool, "name", str(tool))
                for tool in bound_tools
            }
            new_names = {
                getattr(tool, "name", str(tool))
                for tool in tools
            }
            
            return bound_names != new_names
    
    return True

3. ToolNode 工具执行节点

3.1 ToolNode 核心实现

class ToolNode:
    """
    工具节点:执行LLM工具调用的专用节点
    
    特性:
    1. 并行执行多个工具调用
    2. 错误处理和恢复
    3. 工具参数注入
    4. 结果格式化
    """
    
    tools: dict[str, Any]                          # 工具字典
    handle_tool_errors: bool                       # 是否处理工具错误
    handled_types: tuple[type, ...] | None         # 处理的消息类型
    
    def __init__(
        self,
        tools: Sequence[Union[BaseTool, Callable, dict[str, Any]]],
        *,
        name: str = "tools",
        tags: list[str] | None = None,
        handle_tool_errors: bool = True,
        handled_types: tuple[type, ...] | None = None,
    ) -> None:
        """
        初始化工具节点
        
        参数:
        - tools: 工具列表(支持多种格式)
        - handle_tool_errors: 是否捕获和处理工具错误
        - handled_types: 要处理的消息类型
        """
        
        # 1. 标准化工具格式
        self.tools = {}
        for tool in tools:
            if isinstance(tool, dict):
                # 字典格式工具
                self.tools[tool["name"]] = tool
            elif hasattr(tool, "name"):
                # 有名称属性的工具
                self.tools[tool.name] = tool
            else:
                # 可调用对象
                name = getattr(tool, "__name__", str(tool))
                self.tools[name] = tool
        
        self.handle_tool_errors = handle_tool_errors
        self.name = name
        self.tags = tags or []
        
        # 2. 推断处理的消息类型
        if handled_types is None:
            self.handled_types = _infer_handled_types(self.tools)
        else:
            self.handled_types = handled_types
    
    def __call__(
        self, 
        state: Union[list[AnyMessage], dict[str, Any], Any]
    ) -> dict[str, list[ToolMessage]]:
        """
        执行工具调用
        
        执行流程:
        1. 提取工具调用
        2. 并行执行工具
        3. 格式化结果
        4. 返回工具消息
        """
        
        # 1. 提取消息和工具调用
        if isinstance(state, list):
            messages = state
        elif isinstance(state, dict):
            messages = state.get("messages", [])
        else:
            # 尝试获取messages属性
            messages = getattr(state, "messages", [])
        
        # 2. 找到最后的AI消息中的工具调用
        tool_calls = []
        for message in reversed(messages):
            if isinstance(message, AIMessage) and message.tool_calls:
                tool_calls = message.tool_calls
                break
        
        if not tool_calls:
            return {"messages": []}
        
        # 3. 执行工具调用
        results = self._execute_tools(tool_calls, state)
        
        return {"messages": results}
    
    def _execute_tools(
        self,
        tool_calls: list[ToolCall],
        state: Any,
    ) -> list[ToolMessage]:
        """
        执行工具调用列表
        
        支持:
        1. 并行执行
        2. 错误处理
        3. 参数注入
        """
        
        results = []
        
        # 并行执行工具调用
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(self._execute_single_tool, call, state): call
                for call in tool_calls
            }
            
            for future in concurrent.futures.as_completed(futures):
                tool_call = futures[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    if self.handle_tool_errors:
                        # 创建错误消息
                        error_result = ToolMessage(
                            content=f"Tool execution error: {str(e)}",
                            tool_call_id=tool_call["id"],
                            name=tool_call["name"],
                            id=str(uuid4()),
                        )
                        results.append(error_result)
                    else:
                        raise
        
        return results
    
    def _execute_single_tool(
        self,
        tool_call: ToolCall,
        state: Any,
    ) -> ToolMessage:
        """
        执行单个工具调用
        
        处理步骤:
        1. 获取工具实例
        2. 准备参数(包括注入参数)
        3. 执行工具
        4. 格式化结果
        """
        
        tool_name = tool_call["name"]
        tool_args = tool_call.get("args", {})
        tool_id = tool_call["id"]
        
        # 1. 获取工具
        if tool_name not in self.tools:
            raise ValueError(f"Unknown tool: {tool_name}")
        
        tool = self.tools[tool_name]
        
        # 2. 准备参数
        final_args = dict(tool_args)
        
        # 3. 处理参数注入
        if hasattr(tool, "__annotations__"):
            injected_args = _get_state_args(tool, state)
            final_args.update(injected_args)
        
        # 4. 执行工具
        if isinstance(tool, BaseTool):
            # LangChain工具
            result = tool.invoke(final_args)
        elif callable(tool):
            # 可调用对象
            result = tool(**final_args)
        else:
            # 字典格式工具
            func = tool.get("function")
            if func:
                result = func(**final_args)
            else:
                raise ValueError(f"Tool {tool_name} is not executable")
        
        # 5. 创建工具消息
        return ToolMessage(
            content=str(result),
            tool_call_id=tool_id,
            name=tool_name,
            id=str(uuid4()),
        )

3.2 参数注入机制

3.2.1 状态注入

class InjectedState:
    """状态注入标记类"""
    pass

class InjectedStore:
    """存储注入标记类"""
    
    def __init__(self, namespace: Optional[tuple[str, ...]] = None):
        self.namespace = namespace

def _get_state_args(tool: Callable, state: Any) -> dict[str, Any]:
    """
    获取需要注入的状态参数
    
    支持的注入类型:
    1. InjectedState: 注入完整状态
    2. InjectedStore: 注入存储实例
    3. InjectedToolCallId: 注入工具调用ID
    """
    
    injected = {}
    
    # 获取函数签名
    sig = inspect.signature(tool)
    type_hints = get_type_hints(tool, include_extras=True)
    
    for param_name, param in sig.parameters.items():
        param_type = type_hints.get(param_name)
        
        if param_type:
            # 检查是否为注解类型
            origin = get_origin(param_type)
            args = get_args(param_type)
            
            if origin is Annotated and len(args) >= 2:
                annotation = args[1]
                
                if isinstance(annotation, type) and issubclass(annotation, InjectedState):
                    # 注入状态
                    injected[param_name] = state
                
                elif isinstance(annotation, InjectedStore):
                    # 注入存储
                    if hasattr(state, "store") and state.store:
                        injected[param_name] = state.store
                    else:
                        raise ValueError("Store not available for injection")
                
                elif isinstance(annotation, type) and issubclass(annotation, InjectedToolCallId):
                    # 注入工具调用ID(需要在调用时设置)
                    pass
    
    return injected

3.2.2 工具错误处理

def _handle_tool_error(
    error: Exception,
    tool_call: ToolCall,
    handle_tool_errors: bool = True,
) -> ToolMessage:
    """
    处理工具执行错误
    
    错误处理策略:
    1. ToolException: 返回工具定义的错误消息
    2. 其他异常: 返回通用错误消息
    3. 不处理错误: 重新抛出异常
    """
    
    if not handle_tool_errors:
        raise error
    
    if isinstance(error, ToolException):
        # 工具特定错误
        error_content = str(error)
    else:
        # 通用错误
        error_content = f"Error: {error.__class__.__name__}: {str(error)}"
    
    return ToolMessage(
        content=error_content,
        tool_call_id=tool_call["id"],
        name=tool_call["name"],
        id=str(uuid4()),
        additional_kwargs={"error": True},
    )

4. 条件路由函数

4.1 tools_condition 实现

def tools_condition(
    state: Union[list[AnyMessage], dict[str, Any], Any]
) -> Literal["tools", "__end__"]:
    """
    工具条件路由函数
    
    路由逻辑:
    1. 检查最后的AI消息是否有工具调用
    2. 有工具调用 -> 路由到"tools"节点
    3. 无工具调用 -> 路由到END
    """
    
    # 1. 提取消息列表
    if isinstance(state, list):
        messages = state
    elif isinstance(state, dict):
        messages = state.get("messages", [])
    else:
        messages = getattr(state, "messages", [])
    
    # 2. 检查最后的消息
    if not messages:
        return END
    
    last_message = messages[-1]
    
    # 3. 检查是否为AI消息且有工具调用
    if isinstance(last_message, AIMessage) and last_message.tool_calls:
        return "tools"
    
    return END

def _structured_tools_condition(
    state: Union[list[AnyMessage], dict[str, Any], Any]
) -> Literal["tools", "structured", "__end__"]:
    """
    支持结构化响应的工具条件路由
    
    路由逻辑:
    1. 有工具调用 -> "tools"
    2. 无工具调用但需要结构化响应 -> "structured"  
    3. 其他 -> END
    """
    
    # 提取消息
    messages = _extract_messages(state)
    if not messages:
        return END
    
    last_message = messages[-1]
    
    # 检查工具调用
    if isinstance(last_message, AIMessage) and last_message.tool_calls:
        return "tools"
    
    # 检查是否需要结构化响应
    if _should_generate_structured_response(state):
        return "structured"
    
    return END

4.2 自定义条件函数

def create_custom_condition(
    *,
    tools_key: str = "tools",
    end_key: str = END,
    message_key: str = "messages",
    check_func: Optional[Callable[[AnyMessage], bool]] = None,
) -> Callable:
    """
    创建自定义条件路由函数
    
    参数:
    - tools_key: 工具节点键名
    - end_key: 结束节点键名  
    - message_key: 消息键名
    - check_func: 自定义检查函数
    """
    
    def condition(state: Any) -> str:
        # 提取消息
        messages = state.get(message_key, []) if isinstance(state, dict) else []
        
        if not messages:
            return end_key
        
        last_message = messages[-1]
        
        # 使用自定义检查函数
        if check_func:
            if check_func(last_message):
                return tools_key
            else:
                return end_key
        
        # 默认检查工具调用
        if isinstance(last_message, AIMessage) and last_message.tool_calls:
            return tools_key
        
        return end_key
    
    return condition

5. 中断和验证组件

5.1 中断处理

def interrupt(
    reason: str,
    *,
    rollback: bool = False,
    when: Optional[Callable[[Any], bool]] = None,
) -> Callable:
    """
    创建中断处理节点
    
    参数:
    - reason: 中断原因
    - rollback: 是否回滚状态
    - when: 中断条件函数
    """
    
    def interrupt_node(state: Any) -> dict:
        """中断节点实现"""
        
        # 检查中断条件
        if when and not when(state):
            return {}  # 不中断,继续执行
        
        # 创建中断信号
        interrupt_signal = Interrupt(
            reason=reason,
            rollback=rollback,
            state=state,
        )
        
        raise GraphInterrupt(interrupt_signal)
    
    return interrupt_node

class Interrupt:
    """中断信号类"""
    
    def __init__(
        self,
        reason: str,
        rollback: bool = False,
        state: Any = None,
    ):
        self.reason = reason
        self.rollback = rollback
        self.state = state
        self.timestamp = datetime.now()

5.2 工具验证器

class ToolValidator:
    """
    工具验证器:验证工具调用和结果
    """
    
    def __init__(
        self,
        *,
        validate_input: bool = True,
        validate_output: bool = True,
        schema_validation: bool = True,
        custom_validators: Optional[dict[str, Callable]] = None,
    ):
        self.validate_input = validate_input
        self.validate_output = validate_output
        self.schema_validation = schema_validation
        self.custom_validators = custom_validators or {}
    
    def validate_tool_call(
        self,
        tool_call: ToolCall,
        tool_schema: Optional[dict] = None,
    ) -> tuple[bool, Optional[str]]:
        """
        验证工具调用
        
        验证内容:
        1. 工具名称是否存在
        2. 参数格式是否正确
        3. 必需参数是否提供
        4. 参数类型是否匹配
        """
        
        tool_name = tool_call.get("name")
        tool_args = tool_call.get("args", {})
        
        # 1. 检查工具名称
        if not tool_name:
            return False, "Tool name is missing"
        
        # 2. 模式验证
        if self.schema_validation and tool_schema:
            try:
                self._validate_against_schema(tool_args, tool_schema)
            except ValidationError as e:
                return False, f"Schema validation failed: {e}"
        
        # 3. 自定义验证
        if tool_name in self.custom_validators:
            try:
                is_valid = self.custom_validators[tool_name](tool_call)
                if not is_valid:
                    return False, f"Custom validation failed for {tool_name}"
            except Exception as e:
                return False, f"Custom validation error: {e}"
        
        return True, None
    
    def validate_tool_result(
        self,
        result: Any,
        tool_name: str,
        expected_schema: Optional[dict] = None,
    ) -> tuple[bool, Optional[str]]:
        """
        验证工具执行结果
        
        验证内容:
        1. 结果格式
        2. 结果类型
        3. 结果内容
        """
        
        # 1. 基础验证
        if result is None:
            return False, "Tool result is None"
        
        # 2. 模式验证
        if expected_schema:
            try:
                self._validate_against_schema(result, expected_schema)
            except ValidationError as e:
                return False, f"Result schema validation failed: {e}"
        
        # 3. 自定义结果验证
        validator_name = f"{tool_name}_result"
        if validator_name in self.custom_validators:
            try:
                is_valid = self.custom_validators[validator_name](result)
                if not is_valid:
                    return False, f"Custom result validation failed for {tool_name}"
            except Exception as e:
                return False, f"Custom result validation error: {e}"
        
        return True, None
    
    def _validate_against_schema(self, data: Any, schema: dict) -> None:
        """根据JSON Schema验证数据"""
        from jsonschema import validate, ValidationError as JsonSchemaError
        
        try:
            validate(instance=data, schema=schema)
        except JsonSchemaError as e:
            raise ValidationError(str(e))

class ValidationError(Exception):
    """验证错误异常"""
    pass

6. 使用示例

6.1 基础ReAct Agent

from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent

# 定义工具
def get_weather(city: str) -> str:
    """获取指定城市的天气信息"""
    return f"It's sunny in {city} with 22°C"

def search_web(query: str) -> str:
    """搜索网络内容"""
    return f"Search results for: {query}"

# 创建智能体
model = ChatAnthropic(model="claude-3-sonnet-20240229")
tools = [get_weather, search_web]

agent = create_react_agent(
    model=model,
    tools=tools,
    prompt="You are a helpful assistant. Use tools when needed.",
)

# 运行智能体
result = agent.invoke({
    "messages": [{"role": "user", "content": "What's the weather in Beijing?"}]
})

print(result["messages"][-1].content)

6.2 带状态注入的工具

from typing import Annotated
from langgraph.prebuilt.tool_node import InjectedState, InjectedStore
from langgraph.store.memory import InMemoryStore

def search_with_context(
    query: str,
    state: Annotated[dict, InjectedState],
    store: Annotated[InMemoryStore, InjectedStore()],
) -> str:
    """
    带上下文的搜索工具
    
    可以访问当前状态和存储
    """
    
    # 从状态获取上下文
    messages = state.get("messages", [])
    context = f"Previous messages: {len(messages)}"
    
    # 从存储获取历史信息
    history = store.get(("search_history",), "queries") or []
    
    # 执行搜索
    result = f"Searching '{query}' with context: {context}"
    
    # 保存查询历史
    history.append(query)
    store.put(("search_history",), "queries", history)
    
    return result

# 使用带存储的智能体
store = InMemoryStore()
agent = create_react_agent(
    model=model,
    tools=[search_with_context],
    store=store,
)

6.3 自定义条件路由

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, create_custom_condition

# 自定义检查函数
def should_use_tools(message: AnyMessage) -> bool:
    """自定义工具使用条件"""
    if isinstance(message, AIMessage):
        # 检查消息内容是否包含特定关键词
        content = message.content.lower()
        if "search" in content or "weather" in content:
            return True
        # 或者检查工具调用
        return bool(message.tool_calls)
    return False

# 创建自定义条件函数
custom_condition = create_custom_condition(
    tools_key="tools",
    end_key=END,
    check_func=should_use_tools,
)

# 手动构建图
graph = StateGraph(MessagesState)
graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(tools))

graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", custom_condition)
graph.add_edge("tools", "agent")

compiled = graph.compile()

7. 总结

预构建组件为LangGraph提供了强大而易用的高级API:

7.1 核心优势

  1. 开箱即用:无需深入了解底层机制即可构建复杂应用
  2. 最佳实践:集成了经过验证的设计模式和优化策略
  3. 高度可定制:提供丰富的配置选项满足不同需求
  4. 类型安全:完整的类型提示确保开发体验
  5. 错误恢复:内置错误处理和恢复机制

7.2 设计特点

  1. 组合式架构:组件可以独立使用或组合使用
  2. 扩展性:支持自定义工具、条件和处理逻辑
  3. 性能优化:并行执行和智能缓存提升性能
  4. 调试友好:丰富的日志和调试信息

这些预构建组件大大降低了构建AI应用的门槛,让开发者能够专注于业务逻辑而不是底层实现细节。在下一部分中,我们将分析检查点系统的实现原理。