LangChain-03-LanguageModels模块

模块概览

职责与定位

LanguageModels模块提供了与大语言模型交互的核心抽象接口,是LangChain框架中连接各种模型提供商(OpenAI、Anthropic、Google等)的桥梁层。该模块定义了两大类模型接口:

  • Chat Models(聊天模型): 使用消息序列作为输入和输出的现代LLM接口
  • LLMs(文本模型): 传统的字符串输入输出接口,主要用于兼容旧模型

模块核心职责:

  • 定义统一的模型调用接口(generate/generate_prompt)
  • 提供缓存机制减少重复API调用
  • 实现流式输出支持
  • 集成追踪和回调系统
  • 提供工具调用(Tool/Function Calling)能力
  • 支持结构化输出(with_structured_output)

输入输出

输入类型:

LanguageModelInput = PromptValue | str | Sequence[MessageLikeRepresentation]
  • PromptValue: Prompt模板的输出对象
  • str: 纯文本字符串(会自动转换为消息)
  • Sequence[MessageLikeRepresentation]: 消息列表(HumanMessage, AIMessage等)

输出类型:

LanguageModelOutput = BaseMessage | str
  • BaseMessage: Chat Models返回AIMessage
  • str: LLMs返回纯文本字符串

上下游依赖

依赖:

  • langchain_core.messages: 消息类型体系
  • langchain_core.outputs: 输出封装(LLMResult, ChatGeneration等)
  • langchain_core.caches: 缓存系统
  • langchain_core.callbacks: 回调管理器
  • langchain_core.runnables: Runnable基类

被依赖:

  • langchain_openai, langchain_anthropic等Partner包实现具体模型
  • langchain.chains, langchain.agents等高层抽象使用模型
  • 用户应用代码直接调用模型

生命周期

  1. 初始化: 创建模型实例,配置API密钥、模型名称、温度等参数
  2. 配置: 通过bind()绑定参数,或configurable_fields()设置动态配置
  3. 调用: 使用invoke/batch/stream等方法执行推理
  4. 缓存: 可选地缓存结果避免重复调用
  5. 销毁: 自动垃圾回收,无需手动清理

整体服务架构图

架构层次全景图

flowchart TB
    subgraph UserLayer["用户层"]
        USER[应用代码]
        CHAIN[Chains/Agents]
    end

    subgraph InterfaceLayer["接口层"]
        BASE_LM[BaseLanguageModel<br/>统一抽象接口]
        RUNNABLE[RunnableSerializable<br/>可组合执行单元]
    end

    subgraph ModelLayer["模型层"]
        BASE_CHAT[BaseChatModel<br/>聊天模型]
        BASE_LLM[BaseLLM<br/>文本模型]
        SIMPLE_CHAT[SimpleChatModel]
        LLM[LLM]
    end

    subgraph ImplementationLayer["实现层"]
        OPENAI[ChatOpenAI]
        ANTHROPIC[ChatAnthropic]
        GEMINI[ChatGemini]
        OLLAMA[ChatOllama]
    end

    subgraph InfrastructureLayer["基础设施层"]
        CACHE[Caches<br/>缓存系统]
        CALLBACK[Callbacks<br/>回调系统]
        MESSAGES[Messages<br/>消息体系]
        OUTPUTS[Outputs<br/>输出封装]
        PROMPTS[Prompts<br/>提示模板]
        TOOLS_MOD[Tools<br/>工具系统]
        PARSERS[Output Parsers<br/>输出解析器]
    end

    subgraph ExternalLayer["外部服务层"]
        OPENAI_API[OpenAI API]
        ANTHROPIC_API[Anthropic API]
        GOOGLE_API[Google API]
        LOCAL_API[本地模型服务]
    end

    USER --> CHAIN
    CHAIN --> BASE_LM
    USER --> BASE_LM

    BASE_LM -.继承.-> RUNNABLE
    BASE_LM --> BASE_CHAT
    BASE_LM --> BASE_LLM

    BASE_CHAT --> SIMPLE_CHAT
    BASE_LLM --> LLM

    BASE_CHAT -.实现.-> OPENAI
    BASE_CHAT -.实现.-> ANTHROPIC
    BASE_CHAT -.实现.-> GEMINI
    BASE_CHAT -.实现.-> OLLAMA

    BASE_CHAT -.依赖.-> CACHE
    BASE_CHAT -.依赖.-> CALLBACK
    BASE_CHAT -.依赖.-> MESSAGES
    BASE_CHAT -.依赖.-> OUTPUTS
    BASE_CHAT -.依赖.-> PROMPTS
    BASE_CHAT -.依赖.-> TOOLS_MOD
    BASE_CHAT -.依赖.-> PARSERS

    OPENAI --> OPENAI_API
    ANTHROPIC --> ANTHROPIC_API
    GEMINI --> GOOGLE_API
    OLLAMA --> LOCAL_API

    style USER fill:#e1f5ff
    style BASE_LM fill:#fff4e1
    style BASE_CHAT fill:#ffe1e1
    style OPENAI fill:#e8f5e9
    style CACHE fill:#f0e1ff
    style CALLBACK fill:#f0e1ff

模块交互关系图

flowchart LR
    subgraph Core["核心模型模块"]
        BCM[BaseChatModel]
    end

    subgraph Input["输入处理"]
        PROMPT_VAL[PromptValue]
        CONV[convert_to_messages]
        NORM[_normalize_messages]
    end

    subgraph Cache["缓存模块"]
        CACHE_RESOLVE[_resolve_cache]
        CACHE_LOOKUP[lookup]
        CACHE_UPDATE[update]
    end

    subgraph Callback["回调模块"]
        CB_MGR[CallbackManager]
        CB_START[on_chat_model_start]
        CB_TOKEN[on_llm_new_token]
        CB_END[on_llm_end]
        CB_ERROR[on_llm_error]
    end

    subgraph Generation["生成模块"]
        GEN[_generate]
        GEN_CACHE[_generate_with_cache]
        STREAM[_stream]
    end

    subgraph Output["输出处理"]
        CHAT_RES[ChatResult]
        CHAT_GEN[ChatGeneration]
        AI_MSG[AIMessage]
        LLM_RES[LLMResult]
    end

    subgraph External["外部API"]
        API[Provider API]
    end

    BCM --> PROMPT_VAL
    PROMPT_VAL --> CONV
    CONV --> NORM

    BCM --> CACHE_RESOLVE
    CACHE_RESOLVE --> CACHE_LOOKUP
    CACHE_RESOLVE --> CACHE_UPDATE

    BCM --> CB_MGR
    CB_MGR --> CB_START
    CB_MGR --> CB_TOKEN
    CB_MGR --> CB_END
    CB_MGR --> CB_ERROR

    NORM --> GEN_CACHE
    GEN_CACHE --> CACHE_LOOKUP
    GEN_CACHE --> GEN
    GEN_CACHE --> STREAM
    GEN --> API
    STREAM --> API

    GEN --> CHAT_RES
    CHAT_RES --> CHAT_GEN
    CHAT_GEN --> AI_MSG
    CHAT_RES --> CACHE_UPDATE

    AI_MSG --> LLM_RES

架构说明

1. 层次划分

用户层:

  • 应用代码直接调用模型或通过Chains/Agents间接使用
  • 面向业务逻辑,无需关心底层实现细节

接口层:

  • BaseLanguageModel: 定义统一的语言模型接口
  • RunnableSerializable: 提供可组合、可序列化的执行单元能力
  • 支持链式调用、批处理、流式输出等

模型层:

  • BaseChatModel: 聊天模型抽象,定义消息输入输出规范
  • BaseLLM: 文本补全模型抽象,定义字符串输入输出规范
  • SimpleChatModel/LLM: 简化实现基类,降低自定义模型开发难度

实现层:

  • 各Provider的具体实现(ChatOpenAI, ChatAnthropic等)
  • 实现_generate_stream核心方法
  • 处理Provider特定的API调用和错误处理

基础设施层:

  • 缓存系统: 减少重复API调用,节省成本
  • 回调系统: 提供可观测性,支持日志、监控、追踪
  • 消息体系: 统一的消息格式,支持多模态内容
  • 输出封装: 标准化输出结构,包含元数据
  • 提示模板: 支持动态生成输入
  • 工具系统: 支持Function Calling
  • 输出解析器: 结构化输出解析

外部服务层:

  • 各LLM Provider的API服务
  • 本地部署的模型服务(Ollama, vLLM等)

2. 核心抽象

BaseLanguageModel:

  • 所有语言模型的最顶层抽象
  • 继承自RunnableSerializable,具备Runnable所有能力(invoke, batch, stream, pipe等)
  • 定义generate_prompt抽象方法作为核心接口
  • 提供缓存、回调、追踪基础设施
  • 支持配置化(configurable_fields, configurable_alternatives)

BaseChatModel:

  • 聊天模型的抽象基类
  • 输入: LanguageModelInput = str | list[BaseMessage] | PromptValue
  • 输出: AIMessage
  • 必须实现_generate(messages, stop, run_manager, **kwargs) -> ChatResult方法
  • 可选实现_stream方法支持流式输出
  • 可选实现_agenerate_astream方法支持异步

BaseLLM:

  • 传统文本模型的抽象基类
  • 输入: str或自动转换的消息列表
  • 输出: str
  • 必须实现_generate(prompts, stop, run_manager, **kwargs) -> LLMResult方法

3. 简化基类

SimpleChatModel:

  • 为简单聊天模型提供的便利基类
  • 只需实现_call(messages, stop, run_manager, **kwargs) -> str方法
  • 自动处理AIMessage包装和ChatResult构造
  • 适合快速原型开发或简单模型接入

LLM:

  • 为简单文本模型提供的便利基类
  • 只需实现_call(prompt, stop, run_manager, **kwargs) -> str方法
  • 自动处理Generation和LLMResult包装

4. 功能特性

缓存机制:

  • 基于输入消息和模型参数的缓存键
  • 支持内存缓存(InMemoryCache)、Redis缓存、SQLite缓存等
  • 通过cache属性配置: True(全局缓存), False(禁用), None(默认), BaseCache实例(指定缓存)
  • 缓存查询在API调用前,缓存写入在API调用后
  • 流式输出不使用缓存

流式输出:

  • 实现_stream(messages, stop, run_manager, **kwargs) -> Iterator[ChatGenerationChunk]方法
  • 每个chunk触发on_llm_new_token回调
  • 自动聚合chunk为完整消息
  • 异步版本_astream默认委托给同步_stream
  • 支持通过disable_streaming属性控制流式行为

工具调用(Function Calling):

  • bind_tools(tools, tool_choice, **kwargs)绑定工具列表
  • 自动转换工具定义为OpenAI格式(或Provider特定格式)
  • 模型输出包含tool_calls字段,描述需要调用的工具和参数
  • 支持tool_choice: "auto"(自动决定), "any"(强制调用), "none"(禁止调用), 或指定工具名

结构化输出:

  • with_structured_output(schema, method, include_raw, **kwargs)强制输出特定schema
  • method="function_calling": 使用工具调用机制(强约束)
  • method="json_mode": 使用JSON模式+提示词(弱约束)
  • 自动解析和验证输出为Pydantic模型或dict
  • include_raw=True时返回原始AIMessage和解析结果

速率限制:

  • rate_limiter属性支持配置速率限制器
  • 在缓存查询后、API调用前执行速率限制
  • 支持同步和异步速率限制

5. 模块交互模式

输入处理流程:

  1. _convert_input: 将各种输入格式转换为PromptValue
  2. to_messages(): PromptValue转换为消息列表
  3. _normalize_messages: 规范化消息格式(处理多模态内容)
  4. _format_for_tracing: 格式化消息用于追踪

缓存交互流程:

  1. _resolve_cache: 解析缓存配置(实例/全局/禁用)
  2. _get_llm_string: 生成缓存键(包含模型参数)
  3. cache.lookup(prompt, llm_string): 查询缓存
  4. 缓存命中: 直接返回缓存结果
  5. 缓存未命中: 调用_generate生成结果
  6. cache.update(prompt, llm_string, generations): 更新缓存

回调交互流程:

  1. CallbackManager.configure: 创建回调管理器
  2. on_chat_model_start: 模型调用开始前触发
  3. on_llm_new_token: 流式输出每个token触发
  4. on_llm_end: 模型调用成功结束触发
  5. on_llm_error: 模型调用失败触发

生成执行流程:

  1. 检查是否应使用流式(_should_stream)
  2. 流式模式: 调用_stream生成器,逐chunk返回
  3. 非流式模式: 调用_generate一次性返回完整结果
  4. 子类实现_generate/_stream调用外部API
  5. 包装结果为ChatResult/LLMResult

核心数据结构

classDiagram
    class BaseLanguageModel~Input, Output~ {
        <<abstract>>
        +cache: BaseCache|bool|None
        +callbacks: Callbacks
        +tags: list[str]|None
        +metadata: dict|None
        +generate_prompt(prompts: list[PromptValue]) LLMResult*
        +invoke(input: LanguageModelInput) LanguageModelOutput
        +batch(inputs: list[Input]) list[Output]
        +stream(input: Input) Iterator[Output]
    }

    class BaseChatModel {
        <<abstract>>
        +_generate(messages: list[BaseMessage]) ChatResult*
        +_stream(messages: list[BaseMessage]) Iterator[ChatGenerationChunk]
        +bind_tools(tools: list[Tool]) BaseChatModel
        +with_structured_output(schema: Type[T]) Runnable[Input, T]
        +_generate_with_cache(...) ChatResult
    }

    class SimpleChatModel {
        <<abstract>>
        +_call(messages: list[BaseMessage], stop: list[str]) str*
    }

    class BaseLLM {
        <<abstract>>
        +_generate(prompts: list[str]) LLMResult*
        +_stream(prompt: str) Iterator[GenerationChunk]
    }

    class LLM {
        <<abstract>>
        +_call(prompt: str, stop: list[str]) str*
    }

    class LLMResult {
        +generations: list[list[Generation]]
        +llm_output: dict|None
        +run: list[RunInfo]|None
    }

    class ChatResult {
        +generations: list[ChatGeneration]
        +llm_output: dict|None
    }

    class ChatGeneration {
        +message: AIMessage
        +generation_info: dict|None
    }

    class AIMessage {
        +content: str|list
        +additional_kwargs: dict
        +response_metadata: dict
        +tool_calls: list[ToolCall]
        +usage_metadata: UsageMetadata
    }

    BaseLanguageModel <|-- BaseChatModel
    BaseLanguageModel <|-- BaseLLM
    BaseChatModel <|-- SimpleChatModel
    BaseLLM <|-- LLM

    BaseChatModel --> ChatResult : returns
    ChatResult --> ChatGeneration : contains
    ChatGeneration --> AIMessage : contains
    BaseLLM --> LLMResult : returns

数据结构说明

BaseLanguageModel字段

字段 类型 必填 默认值 说明
cache BaseCache|bool|None None 缓存配置: True使用全局缓存, False禁用, BaseCache实例使用指定缓存
callbacks Callbacks None 回调处理器列表
tags list[str]|None None 追踪标签
metadata dict|None None 追踪元数据
verbose bool False 是否打印详细信息
custom_get_token_ids Callable None 自定义token计数函数

LLMResult结构

字段 类型 说明
generations list[list[Generation]] 每个输入的多个生成结果(n>1时)
llm_output dict|None 模型返回的元数据(token使用量等)
run list[RunInfo]|None 运行信息(废弃)

ChatGeneration结构

字段 类型 说明
message AIMessage 生成的AI消息
generation_info dict|None 生成信息(如finish_reason)

AIMessage关键字段

字段 类型 说明
content str|list[ContentBlock] 消息内容,可以是文本或多模态块
additional_kwargs dict 模型特定的额外字段(废弃,使用response_metadata)
response_metadata dict 响应元数据(model_name, finish_reason等)
tool_calls list[ToolCall] 模型请求的工具调用列表
usage_metadata UsageMetadata Token使用情况
id str 消息唯一ID

UsageMetadata结构

class UsageMetadata(BaseModel):
    input_tokens: int  # 输入token数
    output_tokens: int  # 输出token数
    total_tokens: int  # 总token数

完整调用链路分析

调用路径全景时序图

sequenceDiagram
    autonumber
    participant App as 应用代码
    participant Runnable as Runnable接口层
    participant BCM as BaseChatModel
    participant Input as 输入处理
    participant Cache as 缓存层
    participant CB as 回调层
    participant GenCache as _generate_with_cache
    participant RateLimit as 速率限制
    participant Stream as _stream/_generate
    participant Impl as 实现层(ChatOpenAI)
    participant API as 外部API
    participant Output as 输出处理

    App->>Runnable: invoke(input, config)
    Note over Runnable: Runnable统一入口<br/>支持配置、重试、Fallback

    Runnable->>BCM: invoke(input, config)
    Note over BCM: BaseChatModel<br/>核心调度逻辑

    BCM->>Input: _convert_input(input)
    Input->>Input: 规范化为PromptValue
    Input->>Input: to_messages()
    Input->>Input: _normalize_messages()
    Input-->>BCM: 标准化消息列表

    BCM->>BCM: generate_prompt([ChatPromptValue])
    BCM->>BCM: generate(messages_list)

    BCM->>CB: CallbackManager.configure()
    CB->>CB: on_chat_model_start(model, messages)
    CB-->>BCM: run_manager

    BCM->>GenCache: _generate_with_cache(messages)

    GenCache->>Cache: _resolve_cache()
    Cache-->>GenCache: llm_cache

    alt 缓存已启用
        GenCache->>Cache: _get_llm_string(params)
        Cache-->>GenCache: llm_string
        GenCache->>Cache: cache.lookup(prompt, llm_string)

        alt 缓存命中
            Cache-->>GenCache: cached_generations
            GenCache-->>BCM: ChatResult(cached)
            BCM->>CB: on_llm_end(result)
            BCM-->>App: AIMessage
            Note over BCM,App: 缓存命中,直接返回<br/>耗时<10ms
        end
    end

    Note over GenCache: 缓存未命中,继续执行

    GenCache->>RateLimit: rate_limiter.acquire()
    RateLimit-->>GenCache: 继续

    alt 应使用流式输出
        GenCache->>Stream: _stream(messages)
        Stream->>Impl: 子类实现_stream
        Impl->>API: HTTP Stream请求

        loop 每个token
            API-->>Impl: chunk_data
            Impl->>Impl: 解析chunk
            Impl-->>Stream: ChatGenerationChunk
            Stream->>CB: on_llm_new_token(token, chunk)
            Stream->>Stream: 聚合chunks
        end

        API-->>Impl: [DONE]
        Impl-->>Stream: 流结束
        Stream->>Output: generate_from_stream(chunks)
        Output-->>GenCache: ChatResult

    else 非流式输出
        GenCache->>Stream: _generate(messages)
        Stream->>Impl: 子类实现_generate
        Impl->>Impl: 构造API请求
        Impl->>API: HTTP请求
        Note over API: LLM推理<br/>耗时1-10秒
        API-->>Impl: 完整响应
        Impl->>Impl: 解析响应
        Impl->>Output: 构造ChatResult
        Output-->>Stream: ChatResult
        Stream-->>GenCache: ChatResult
    end

    GenCache->>Output: 添加response_metadata
    GenCache->>Output: 设置message.id

    alt 缓存已启用
        GenCache->>Cache: cache.update(prompt, llm_string, generations)
        Cache-->>GenCache: 更新完成
    end

    GenCache-->>BCM: ChatResult
    BCM->>CB: on_llm_end(result)
    BCM->>Output: 提取generations[0][0].message
    Output-->>BCM: AIMessage
    BCM-->>Runnable: AIMessage
    Runnable-->>App: AIMessage

调用链路详细说明

阶段1: Runnable接口层 (1-2)

目的: 提供统一的调用接口和通用能力

关键代码:

# Runnable基类提供的能力
class RunnableSerializable:
    def invoke(self, input, config=None, **kwargs):
        # 1) 处理配置(重试、fallback、超时等)
        config = ensure_config(config)

        # 2) 应用配置化字段
        bound = self._bind_config(config)

        # 3) 委托给具体实现
        return bound._invoke(input, config, **kwargs)

关键点:

  • 统一的配置管理: 回调、标签、元数据、超时、重试
  • 支持运行时配置化: configurable_fields()定义的字段可动态修改
  • 支持Fallback链: with_fallbacks()定义的备用模型
  • 支持重试策略: with_retry()定义的重试逻辑

阶段2: 输入规范化 (3-7)

目的: 将多样化的输入格式统一为标准消息列表

关键代码:

def _convert_input(self, model_input: LanguageModelInput) -> PromptValue:
    # 1) PromptValue直接返回
    if isinstance(model_input, PromptValue):
        return model_input

    # 2) 字符串包装为StringPromptValue
    if isinstance(model_input, str):
        return StringPromptValue(text=model_input)

    # 3) 消息列表包装为ChatPromptValue
    if isinstance(model_input, Sequence):
        messages = convert_to_messages(model_input)  # 支持dict/tuple转换
        return ChatPromptValue(messages=messages)

    raise ValueError(f"Invalid input type {type(model_input)}")

def to_messages(self) -> list[BaseMessage]:
    # ChatPromptValue: 返回消息列表
    # StringPromptValue: 包装为[HumanMessage(content=text)]
    ...

def _normalize_messages(messages: list[BaseMessage]) -> list[BaseMessage]:
    """规范化消息格式"""
    # 1) 处理多模态内容(图片、音频、文件)
    # 2) 展开MessageLike对象
    # 3) 验证消息顺序和角色
    ...

def _format_for_tracing(messages: list[BaseMessage]) -> list[BaseMessage]:
    """格式化用于追踪的消息"""
    # 1) 转换图片块为OpenAI格式(向后兼容)
    # 2) 为content block添加type字段
    # 3) 浅拷贝消息避免修改原始数据
    ...

数据流转换:

"你好"
  → StringPromptValue(text="你好")
  → [HumanMessage(content="你好")]
  → 规范化消息
  → 追踪格式化

[{"role": "user", "content": "你好"}]
  → convert_to_messages
  → [HumanMessage(content="你好")]
  → 规范化消息

[HumanMessage("你好"), AIMessage("你好!"), HumanMessage("天气如何?")]
  → 直接使用
  → 规范化消息

阶段3: 回调管理器创建 (8-10)

目的: 建立可观测性基础设施

关键代码:

def generate(self, messages: list[list[BaseMessage]], stop=None, callbacks=None, **kwargs):
    # 1) 获取LangSmith追踪参数
    params = self._get_invocation_params(stop=stop, **kwargs)
    ls_params = self._get_ls_params(stop=stop, **kwargs)

    # 2) 合并元数据
    inheritable_metadata = {
        **(metadata or {}),
        **ls_params,  # ls_provider, ls_model_name, ls_temperature等
    }

    # 3) 创建回调管理器
    callback_manager = CallbackManager.configure(
        callbacks,           # 本次调用的回调
        self.callbacks,      # 模型实例的回调
        self.verbose,        # 是否打印详细信息
        tags,                # 标签
        self.tags,           # 模型实例标签
        inheritable_metadata, # 元数据
        self.metadata,       # 模型实例元数据
    )

    # 4) 触发开始回调
    run_managers = callback_manager.on_chat_model_start(
        self._serialized,           # 模型序列化信息
        messages_to_trace,          # 格式化后的消息
        invocation_params=params,   # 调用参数
        options=options,            # 选项
        name=run_name,              # 运行名称
        run_id=run_id,              # 运行ID
        batch_size=len(messages),   # 批次大小
    )

    return run_managers

回调事件时间线:

t0: on_chat_model_start(model, messages, params)
t1-t9: [流式模式] on_llm_new_token(token, chunk) × N次
t10: on_llm_end(result)  或  on_llm_error(error)

回调用途:

  • 日志记录: 记录模型调用参数和结果
  • 成本追踪: 统计token使用量和费用
  • 性能监控: 测量延迟、吞吐量
  • LangSmith追踪: 自动上传到LangSmith平台
  • 自定义逻辑: 用户自定义回调处理器

阶段4: 缓存查询 (11-20)

目的: 避免重复API调用,降低成本和延迟

关键代码:

def _generate_with_cache(self, messages, stop=None, run_manager=None, **kwargs):
    # 1) 解析缓存配置
    llm_cache = self.cache if isinstance(self.cache, BaseCache) else get_llm_cache()
    check_cache = self.cache or self.cache is None

    if check_cache and llm_cache:
        # 2) 生成缓存键
        llm_string = self._get_llm_string(stop=stop, **kwargs)
        # llm_string示例: '{"model":"gpt-4o","temperature":0.7}---[("stop",["\\n"])]'

        prompt = dumps(messages)  # 序列化消息列表

        # 3) 查询缓存
        cache_val = llm_cache.lookup(prompt, llm_string)

        if isinstance(cache_val, list):
            # 缓存命中!转换并返回
            converted = self._convert_cached_generations(cache_val)
            return ChatResult(generations=converted)

    # 缓存未命中,继续执行生成逻辑
    ...

缓存键构成:

cache_key = hash(prompt + llm_string)

prompt = dumps([
    HumanMessage(content="什么是量子计算?")
])
# → "{"type":"human","data":{"content":"什么是量子计算?"}}"

llm_string = JSON({
    "model": "gpt-4o",
    "temperature": 0.7,
    "max_tokens": null,
    ...  # 所有_identifying_params
}) + "---" + str([("stop", ["\\n"])])

缓存命中效果:

  • 延迟: 从1-10秒降至 <10ms (100-1000倍提升)
  • 成本: 从$0.001-0.01降至 $0 (完全节省)
  • Token使用: usage_metadata中total_cost置为0

阶段5: 速率限制 (21-22)

目的: 控制API调用频率,避免触发速率限制

关键代码:

# 在缓存查询后,API调用前执行
if self.rate_limiter:
    self.rate_limiter.acquire(blocking=True)

# 使用示例
from langchain_core.rate_limiters import InMemoryRateLimiter

rate_limiter = InMemoryRateLimiter(
    requests_per_second=10,  # 每秒最多10次请求
    check_every_n_seconds=0.1,  # 每0.1秒检查一次
)

model = ChatOpenAI(
    model="gpt-4o",
    rate_limiter=rate_limiter,
)

速率限制策略:

  • 令牌桶算法: 固定速率补充令牌
  • 滑动窗口: 统计最近N秒内的请求数
  • 优先级队列: 不同优先级的请求分开限速
  • 自适应限速: 根据API错误(429)动态调整

阶段6: 决策流式vs非流式 (23-47)

目的: 根据配置和上下文决定使用流式还是非流式输出

关键代码:

def _should_stream(self, *, async_api: bool, run_manager=None, **kwargs) -> bool:
    # 1) 检查是否实现了_stream
    sync_not_implemented = type(self)._stream == BaseChatModel._stream
    async_not_implemented = type(self)._astream == BaseChatModel._astream

    if (not async_api) and sync_not_implemented:
        return False
    if async_api and async_not_implemented and sync_not_implemented:
        return False

    # 2) 检查实例级禁用
    if self.disable_streaming is True:
        return False
    if self.disable_streaming == "tool_calling" and kwargs.get("tools"):
        return False  # 工具调用模式下禁用流式

    # 3) 检查运行时参数
    if "stream" in kwargs:
        return kwargs["stream"]

    # 4) 检查模型字段配置
    if "streaming" in self.model_fields_set:
        streaming_value = getattr(self, "streaming", None)
        if isinstance(streaming_value, bool):
            return streaming_value

    # 5) 检查回调处理器
    handlers = run_manager.handlers if run_manager else []
    return any(isinstance(h, _StreamingCallbackHandler) for h in handlers)

决策流程图:

是否实现_stream?
  ├─   使用非流式
  └─   disable_streaming?
          ├─ True  使用非流式
          ├─ "tool_calling" + tools存在  使用非流式
          └─ False  kwargs["stream"]?
                     ├─ True  使用流式
                     ├─ False  使用非流式
                     └─ 未设置  streaming字段?
                                 ├─ True  使用流式
                                 ├─ False  使用非流式
                                 └─ 未设置  StreamingCallback?
                                             ├─   使用流式
                                             └─   使用非流式

阶段7: 流式输出路径 (24-38)

目的: 逐token生成,降低首字节延迟,改善用户体验

关键代码:

# BaseChatModel中的流式逻辑
def _generate_with_cache(self, messages, stop=None, run_manager=None, **kwargs):
    if self._should_stream(async_api=False, run_manager=run_manager, **kwargs):
        chunks: list[ChatGenerationChunk] = []
        run_id = f"{LC_ID_PREFIX}-{run_manager.run_id}" if run_manager else None

        # 调用子类实现的_stream
        for chunk in self._stream(messages, stop=stop, **kwargs):
            # 1) 设置响应元数据
            chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)

            # 2) 处理output_version
            if self.output_version == "v1":
                chunk.message = _update_message_content_to_blocks(chunk.message, "v1")

            # 3) 设置消息ID
            if run_manager and chunk.message.id is None:
                chunk.message.id = run_id

            # 4) 触发token回调
            if run_manager:
                run_manager.on_llm_new_token(
                    cast("str", chunk.message.content),
                    chunk=chunk
                )

            # 5) 收集chunk
            chunks.append(chunk)

        # 6) 从chunks生成最终结果
        result = generate_from_stream(iter(chunks))
        return result

子类实现示例(ChatOpenAI):

def _stream(self, messages, stop=None, run_manager=None, **kwargs):
    # 1) 构造OpenAI API请求
    request_params = self._prepare_request(messages, stop, stream=True, **kwargs)

    # 2) 发起流式请求
    response = self.client.chat.completions.create(
        **request_params,
        stream=True,
    )

    # 3) 逐chunk处理
    for chunk in response:
        if not chunk.choices:
            continue

        choice = chunk.choices[0]

        # 4) 提取增量内容
        delta = choice.delta
        content = delta.content or ""
        tool_call_chunks = self._extract_tool_call_chunks(delta)

        # 5) 构造消息chunk
        message_chunk = AIMessageChunk(
            content=content,
            tool_call_chunks=tool_call_chunks,
        )

        # 6) 包装为ChatGenerationChunk
        generation_chunk = ChatGenerationChunk(
            message=message_chunk,
            generation_info={
                "finish_reason": choice.finish_reason,
                "model": chunk.model,
            },
        )

        yield generation_chunk

流式chunk聚合:

def generate_from_stream(stream: Iterator[ChatGenerationChunk]) -> ChatResult:
    # 1) 读取第一个chunk
    generation = next(stream, None)

    if generation:
        # 2) 累加所有chunk
        for chunk in stream:
            generation += chunk  # AIMessageChunk.__add__实现累加

        # 3) 转换为完整消息
        message = message_chunk_to_message(generation.message)

        return ChatResult(
            generations=[ChatGeneration(
                message=message,
                generation_info=generation.generation_info,
            )]
        )

流式性能特性:

  • TTFT (Time To First Token): 200-800ms
  • 后续token间隔: 20-100ms
  • 总耗时: 与非流式相同或略高(多次网络往返)
  • 用户感知延迟: 显著降低

阶段8: 非流式输出路径 (39-44)

目的: 一次性生成完整响应

关键代码:

# 非流式路径
if inspect.signature(self._generate).parameters.get("run_manager"):
    result = self._generate(
        messages,
        stop=stop,
        run_manager=run_manager,
        **kwargs
    )
else:
    # 旧版本不支持run_manager参数
    result = self._generate(messages, stop=stop, **kwargs)

子类实现示例(ChatOpenAI):

def _generate(self, messages, stop=None, run_manager=None, **kwargs):
    # 1) 构造API请求
    request_params = self._prepare_request(messages, stop, stream=False, **kwargs)

    # 2) 同步调用API
    response = self.client.chat.completions.create(**request_params)

    # 3) 解析响应
    choice = response.choices[0]
    message = AIMessage(
        content=choice.message.content or "",
        tool_calls=self._extract_tool_calls(choice.message),
        response_metadata={
            "model": response.model,
            "finish_reason": choice.finish_reason,
            "system_fingerprint": response.system_fingerprint,
        },
        usage_metadata=UsageMetadata(
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            total_tokens=response.usage.total_tokens,
        ),
    )

    # 4) 包装为ChatResult
    generation = ChatGeneration(
        message=message,
        generation_info={
            "finish_reason": choice.finish_reason,
        },
    )

    return ChatResult(
        generations=[generation],
        llm_output={"model": response.model},
    )

阶段9: 输出后处理 (45-47)

目的: 标准化输出格式,添加元数据

关键代码:

# 1) 处理output_version
if self.output_version == "v1":
    for generation in result.generations:
        generation.message = _update_message_content_to_blocks(
            generation.message, "v1"
        )

# 2) 添加消息ID
for idx, generation in enumerate(result.generations):
    if run_manager and generation.message.id is None:
        generation.message.id = f"{LC_ID_PREFIX}-{run_manager.run_id}-{idx}"

    # 3) 合并响应元数据
    generation.message.response_metadata = _gen_info_and_msg_metadata(generation)

# 4) 特殊处理单个generation的情况
if len(result.generations) == 1 and result.llm_output is not None:
    result.generations[0].message.response_metadata = {
        **result.llm_output,
        **result.generations[0].message.response_metadata,
    }

阶段10: 缓存更新 (48-51)

目的: 存储新生成的结果供后续使用

关键代码:

if check_cache and llm_cache:
    llm_cache.update(prompt, llm_string, result.generations)

缓存更新时机:

  • 仅在成功生成后更新
  • 错误响应不缓存
  • 流式和非流式都会缓存最终结果
  • 异步版本使用await llm_cache.aupdate()

阶段11: 回调触发和结果返回 (52-56)

目的: 通知观察者并返回最终结果

关键代码:

# 触发结束回调
run_manager.on_llm_end(LLMResult(generations=generations, llm_output=llm_output))

# 提取AIMessage
message = cast("AIMessage", result.generations[0][0].message)

# 返回给用户
return message

批处理调用链路

sequenceDiagram
    autonumber
    participant App
    participant BCM as BaseChatModel
    participant Gen as generate()
    participant Cache
    participant Batch as 批处理逻辑
    participant Worker as 工作线程池

    App->>BCM: batch([input1, input2, input3], config)
    BCM->>Gen: generate([msg1, msg2, msg3])

    Gen->>Cache: 批量查询缓存
    Cache-->>Gen: [cached1, miss, cached3]

    Gen->>Batch: 只处理miss的输入

    par 并发执行
        Batch->>Worker: _generate_with_cache(msg2)
        Worker->>Worker: API调用
        Worker-->>Batch: result2
    end

    Batch->>Cache: 更新缓存
    Batch->>Gen: 合并结果[cached1, result2, cached3]
    Gen-->>BCM: LLMResult
    BCM-->>App: [AIMessage1, AIMessage2, AIMessage3]

批处理关键代码:

def generate(self, messages: list[list[BaseMessage]], stop=None, callbacks=None, **kwargs):
    # 1) 缓存批量查询
    existing_prompts, llm_string, missing_prompt_idxs, missing_prompts = \
        get_prompts(params, [dumps(m) for m in messages], self.cache)

    # 2) 只生成缓存未命中的部分
    if len(missing_prompts) > 0:
        run_managers = [
            callback_managers[idx].on_llm_start(...)
            for idx in missing_prompt_idxs
        ]

        # 批量调用_generate (子类可实现真正的批处理)
        new_results = self._generate_helper(
            missing_prompts,
            stop,
            run_managers,
            **kwargs,
        )

        # 3) 更新缓存
        update_cache(self.cache, existing_prompts, llm_string,
                     missing_prompt_idxs, new_results, prompts)

    # 4) 合并结果
    generations = [existing_prompts[i] for i in range(len(prompts))]
    return LLMResult(generations=generations, llm_output=llm_output)

批处理优化:

  • 缓存命中的输入跳过API调用
  • 子类可重写_generate实现真正的批量API调用
  • 默认实现串行调用每个输入
  • 支持通过max_concurrency控制并发度

异步调用链路

sequenceDiagram
    autonumber
    participant App
    participant BCM as BaseChatModel
    participant AGen as agenerate()
    participant Executor as AsyncExecutor
    participant API as 异步API

    App->>BCM: await ainvoke(input)
    BCM->>AGen: await agenerate([messages])

    alt 子类实现了_agenerate
        AGen->>API: await self._agenerate(messages)
        API-->>AGen: result
    else 只实现了_generate
        AGen->>Executor: run_in_executor(self._generate, messages)
        Note over Executor: 在线程池中执行同步代码
        Executor-->>AGen: result
    end

    AGen-->>BCM: LLMResult
    BCM-->>App: AIMessage

异步实现策略:

  1. 原生异步: 子类实现_agenerate_astream
  2. 线程池回退: 使用run_in_executor包装同步方法
  3. 性能差异: 原生异步避免线程切换开销

核心API详解

API-1: BaseChatModel.invoke

基本信息

  • 名称: invoke
  • 方法签名: def invoke(self, input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs) -> AIMessage
  • 幂等性: 非幂等(相同输入可能产生不同输出)

功能说明

同步调用聊天模型,将输入消息转换为AI响应。支持缓存、追踪、流式输出等特性。

请求结构体

# invoke方法参数
input: LanguageModelInput  # str | list[BaseMessage] | PromptValue
config: RunnableConfig | None  # 可选配置
stop: list[str] | None  # 停止词
**kwargs: Any  # 传递给模型的额外参数(如temperature)
参数 类型 必填 默认值 说明
input LanguageModelInput - 输入消息,自动转换为消息列表
config RunnableConfig|None None 运行时配置
stop list[str]|None None 停止词序列
**kwargs Any {} 额外参数(如temperature, max_tokens)

响应结构体

# invoke返回AIMessage
class AIMessage(BaseMessage):
    content: str | list[ContentBlock]  # 生成的内容
    response_metadata: dict  # 响应元数据
    tool_calls: list[ToolCall]  # 工具调用请求
    usage_metadata: UsageMetadata  # Token使用情况
字段 类型 说明
content str|list 生成的文本或多模态内容
response_metadata dict 包含model_name, finish_reason, system_fingerprint等
tool_calls list[ToolCall] 模型请求调用的工具列表
usage_metadata UsageMetadata input_tokens, output_tokens, total_tokens

入口函数与关键代码

class BaseChatModel(BaseLanguageModel[LanguageModelInput, AIMessage]):
    def invoke(
        self,
        input: LanguageModelInput,
        config: RunnableConfig | None = None,
        *,
        stop: list[str] | None = None,
        **kwargs: Any,
    ) -> AIMessage:
        config = ensure_config(config)

        # 1) 转换输入为消息列表
        messages = self._convert_input(input).to_messages()

        # 2) 调用generate_prompt并提取第一个结果
        result = self.generate_prompt(
            [ChatPromptValue(messages=messages)],
            stop=stop,
            callbacks=config.get("callbacks"),
            tags=config.get("tags"),
            metadata=config.get("metadata"),
            run_name=config.get("run_name"),
            **kwargs,
        )

        # 3) 返回第一个生成的消息
        return cast(AIMessage, result.generations[0][0].message)

    def generate_prompt(
        self,
        prompts: list[PromptValue],
        stop: list[str] | None = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> LLMResult:
        # 4) 转换PromptValue为消息列表
        messages_list = [p.to_messages() for p in prompts]

        # 5) 调用generate
        return self.generate(messages_list, stop=stop, callbacks=callbacks, **kwargs)

    def generate(
        self,
        messages: list[list[BaseMessage]],
        stop: list[str] | None = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> LLMResult:
        # 6) 创建回调管理器
        callback_manager = CallbackManager.configure(
            callbacks,
            self.callbacks,
            self.tags,
            self.metadata,
            self.verbose,
        )
        run_manager = callback_manager.on_chat_model_start(
            dumpd(self),
            messages,
        )

        # 7) 批量生成(带缓存)
        results = []
        for m in messages:
            result = self._generate_with_cache(
                m,
                stop=stop,
                run_manager=run_manager,
                **kwargs,
            )
            results.append(result)

        # 8) 合并结果
        llm_output = self._combine_llm_outputs([r.llm_output for r in results])
        generations = [r.generations for r in results]

        run_manager.on_llm_end(LLMResult(generations=generations, llm_output=llm_output))

        return LLMResult(generations=generations, llm_output=llm_output)

    def _generate_with_cache(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> ChatResult:
        # 9) 检查缓存
        llm_cache = _resolve_cache(cache=self.cache)
        if llm_cache:
            llm_string = self._get_llm_string(stop=stop, **kwargs)
            prompt = dumps(messages)
            cache_val = llm_cache.lookup(prompt, llm_string)
            if cache_val:
                return ChatResult(generations=cache_val)

        # 10) 缓存未命中,调用实际生成
        result = self._generate(messages, stop=stop, run_manager=run_manager, **kwargs)

        # 11) 写入缓存
        if llm_cache:
            llm_cache.update(prompt, llm_string, result.generations)

        return result

    @abstractmethod
    def _generate(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> ChatResult:
        """子类必须实现的核心生成方法"""
        ...

代码说明:

  1. 将输入规范化为消息列表
  2. 通过generate_prompt调用generate
  3. generate创建回调管理器并触发on_chat_model_start
  4. _generate_with_cache检查缓存,未命中则调用_generate
  5. _generate是子类实现的核心方法,实际调用模型API
  6. 结果经过缓存写入后返回
  7. 触发on_llm_end回调

调用链与上层函数

# 用户代码
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
response = model.invoke("讲一个笑话")

# 调用栈:
# 1. BaseChatModel.invoke
#    ├─ 2. _convert_input: 将str转换为[HumanMessage]
#    ├─ 3. generate_prompt: 包装为PromptValue
#    │    └─ 4. generate: 批量生成(此处只有1个)
#    │         └─ 5. _generate_with_cache: 检查缓存
#    │              ├─ 6. llm_cache.lookup: 查询缓存
#    │              └─ 7. ChatOpenAI._generate: 实际API调用
#    │                   ├─ 8. openai.chat.completions.create: OpenAI API
#    │                   └─ 9. _create_chat_result: 包装为ChatResult
#    └─ 10. 提取generations[0][0].message返回

# 在链中使用
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话")
chain = prompt | model

result = chain.invoke({"topic": "程序员"})
# prompt.invoke → 生成消息列表 → model.invoke → AIMessage

时序图

sequenceDiagram
    autonumber
    participant User as 用户代码
    participant Model as BaseChatModel
    participant Cache as LLMCache
    participant CB as CallbackManager
    participant Impl as ChatOpenAI
    participant API as OpenAI API

    User->>Model: invoke("讲一个笑话")
    Model->>Model: _convert_input → [HumanMessage]
    Model->>CB: on_chat_model_start(messages)

    Model->>Cache: lookup(messages, llm_string)
    Cache-->>Model: None (缓存未命中)

    Model->>Impl: _generate(messages)
    Impl->>Impl: 构造API请求参数
    Impl->>API: POST /v1/chat/completions
    Note over API: 模型推理<br/>耗时1-5秒
    API-->>Impl: {choices: [...], usage: {...}}

    Impl->>Impl: _create_chat_result
    Impl-->>Model: ChatResult(generations=[...])

    Model->>Cache: update(messages, llm_string, generations)
    Model->>CB: on_llm_end(result)

    Model-->>User: AIMessage(content="为什么...")

时序图说明:

图意: 展示聊天模型invoke方法从输入转换、缓存查询、API调用到返回响应的完整流程。

边界条件:

  • 缓存命中时跳过API调用,直接返回缓存结果
  • 回调异常不影响主流程(已捕获)
  • API超时由HTTP客户端控制,通常60-120秒
  • 并发调用共享缓存,需考虑缓存一致性

异常与回退:

  • API错误(如503)会抛出异常,可通过with_retry处理
  • 速率限制(429)通常由Provider实现自动重试
  • 缓存读写失败静默忽略(记录警告日志)

性能要点:

  • API调用占总耗时90%+,是主要瓶颈
  • 缓存可将重复请求响应时间降至<10ms
  • 回调系统异步执行,不阻塞主流程
  • Token计数在API响应中返回,无额外开销

API-2: BaseChatModel.stream

基本信息

  • 名称: stream
  • 方法签名: def stream(self, input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs) -> Iterator[AIMessageChunk]
  • 幂等性: 非幂等

功能说明

流式生成AI响应,逐个token返回,降低首字节延迟。

请求结构体

同invoke,参数相同。

响应结构体

# stream返回Iterator[AIMessageChunk]
class AIMessageChunk(BaseMessageChunk):
    content: str | list  # 增量内容
    tool_call_chunks: list[ToolCallChunk]  # 增量工具调用

入口函数与关键代码

class BaseChatModel(BaseLanguageModel):
    def stream(
        self,
        input: LanguageModelInput,
        config: RunnableConfig | None = None,
        *,
        stop: list[str] | None = None,
        **kwargs: Any,
    ) -> Iterator[AIMessageChunk]:
        config = ensure_config(config)
        messages = self._convert_input(input).to_messages()

        # 调用_stream_with_aggregation
        iterator = self._stream_with_aggregation(
            messages,
            stop=stop,
            config=config,
            **kwargs,
        )

        for chunk in iterator:
            yield chunk

    def _stream_with_aggregation(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        config: RunnableConfig | None = None,
        **kwargs: Any,
    ) -> Iterator[AIMessageChunk]:
        # 1) 创建回调管理器
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_chat_model_start(
            dumpd(self),
            [messages],
        )

        # 2) 调用_stream生成器
        generation: ChatGenerationChunk | None = None
        for chunk in self._stream(messages, stop=stop, run_manager=run_manager, **kwargs):
            # 3) 聚合chunk
            if generation is None:
                generation = chunk
            else:
                generation += chunk

            # 4) 触发on_llm_new_token回调
            if run_manager and isinstance(chunk.message.content, str):
                run_manager.on_llm_new_token(
                    chunk.message.content,
                    chunk=chunk,
                )

            # 5) yield消息chunk
            yield chunk.message

        # 6) 结束回调
        if run_manager and generation:
            run_manager.on_llm_end(ChatResult(generations=[generation]))

    def _stream(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        """子类可选实现流式生成

        默认实现:调用_generate并yield整个结果
        """
        result = self._generate(messages, stop=stop, run_manager=run_manager, **kwargs)
        yield ChatGenerationChunk(
            message=AIMessageChunk(content=result.generations[0].message.content)
        )

代码说明:

  1. stream调用_stream_with_aggregation处理流式逻辑
  2. _stream_with_aggregation聚合chunk并触发回调
  3. _stream是可选实现的生成器方法,默认非流式
  4. 子类重写_stream实现真正的流式输出
  5. 每个chunk触发on_llm_new_token回调

调用链与上层函数

# 流式输出示例
model = ChatOpenAI(model="gpt-4o", streaming=True)

for chunk in model.stream("讲一个长故事"):
    print(chunk.content, end="", flush=True)

# 在Web应用中流式响应
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    def generate():
        for chunk in model.stream(request.messages):
            yield f"data: {json.dumps({'content': chunk.content})}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

时序图

sequenceDiagram
    autonumber
    participant User
    participant Model as BaseChatModel
    participant Stream as _stream生成器
    participant API as OpenAI Stream API

    User->>Model: stream("讲故事")
    Model->>Stream: _stream(messages)
    Stream->>API: POST /v1/chat/completions<br/>(stream=True)

    loop 每个token
        API-->>Stream: chunk1: "从"
        Stream->>Stream: on_llm_new_token
        Stream-->>Model: AIMessageChunk("从")
        Model-->>User: yield chunk
        Note over User: 立即显示"从"

        API-->>Stream: chunk2: "前"
        Stream-->>Model: AIMessageChunk("前")
        Model-->>User: yield chunk
        Note over User: 立即显示"前"
    end

    API-->>Stream: [DONE]
    Stream->>Stream: on_llm_end
    Stream-->>Model: 生成器结束

时序图说明:

图意: 展示流式输出如何逐token返回,实现低延迟的用户体验。

边界条件:

  • 流式模式不使用缓存(缓存需要完整响应)
  • 客户端断开连接时,生成器自动停止
  • 某些模型不支持流式,会退化为非流式

性能要点:

  • 首token延迟(Time To First Token, TTFT): 通常200-800ms
  • 后续token延迟: 20-100ms per token
  • 总耗时与非流式相同或略高(多次网络往返)
  • 用户感知延迟显著降低

API-3: BaseChatModel.bind_tools

基本信息

  • 名称: bind_tools
  • 方法签名: def bind_tools(self, tools: Sequence[dict | type | Callable | BaseTool], *, tool_choice: str | dict | None = None, **kwargs) -> Runnable[LanguageModelInput, AIMessage]
  • 幂等性: 幂等(返回新的绑定Runnable)

功能说明

将工具绑定到模型,使模型能够请求调用工具(Function Calling)。返回新的Runnable而不修改原模型。

请求结构体

tools: Sequence[dict | type | Callable | BaseTool]  # 工具列表
tool_choice: str | dict | None  # 工具选择策略: "auto" | "any" | "none" | {"name": "tool_name"}
**kwargs: Any  # 额外参数
参数 类型 必填 默认值 说明
tools Sequence - 工具列表,支持多种格式
tool_choice str|dict|None None 工具选择策略

响应结构体

Runnable[LanguageModelInput, AIMessage]  # 绑定了工具的Runnable

入口函数与关键代码

class BaseChatModel(BaseLanguageModel):
    def bind_tools(
        self,
        tools: Sequence[dict | type | Callable | BaseTool],
        *,
        tool_choice: str | dict | None = None,
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, AIMessage]:
        # 1) 转换工具为标准格式
        formatted_tools = [self._convert_tool(tool) for tool in tools]

        # 2) 构造tools参数(模型特定格式)
        # 对于OpenAI: {"tools": [...], "tool_choice": ...}
        # 对于Anthropic: {"tools": [...]}
        tools_kwargs = self._bind_tools_kwargs(
            formatted_tools,
            tool_choice=tool_choice,
        )

        # 3) 使用bind绑定参数
        return self.bind(**tools_kwargs, **kwargs)

    def _convert_tool(self, tool: dict | type | Callable | BaseTool) -> dict:
        """将工具转换为OpenAI工具格式"""
        if isinstance(tool, dict):
            return tool
        elif isinstance(tool, type) and is_basemodel_subclass(tool):
            # Pydantic模型 → JSON Schema
            return convert_to_openai_tool(tool)
        elif isinstance(tool, BaseTool):
            # LangChain Tool → OpenAI工具
            return convert_to_openai_tool(tool)
        elif callable(tool):
            # 函数 → OpenAI工具
            return convert_to_openai_tool(tool)
        else:
            msg = f"Unsupported tool type: {type(tool)}"
            raise ValueError(msg)

    def _bind_tools_kwargs(
        self,
        tools: list[dict],
        tool_choice: str | dict | None = None,
    ) -> dict:
        """子类重写以适配模型特定格式"""
        # OpenAI格式
        kwargs = {"tools": tools}
        if tool_choice:
            kwargs["tool_choice"] = tool_choice
        return kwargs

代码说明:

  1. 将各种格式工具转换为统一的字典格式
  2. _bind_tools_kwargs生成模型特定的参数
  3. 使用bind方法绑定参数,返回新Runnable
  4. 绑定后的模型在调用时自动包含工具信息

调用链与上层函数

from langchain_core.tools import tool

# 定义工具
@tool
def get_weather(location: str) -> str:
    """获取指定地点的天气信息

    Args:
        location: 地点名称,如"北京"
    """
    # 实际实现...
    return f"{location}的天气是晴天,25度"

@tool
def search_web(query: str) -> str:
    """搜索互联网

    Args:
        query: 搜索查询
    """
    # 实际实现...
    return f"搜索结果: {query}"

# 绑定工具到模型
model = ChatOpenAI(model="gpt-4o")
model_with_tools = model.bind_tools([get_weather, search_web])

# 调用
response = model_with_tools.invoke("北京天气怎么样?")
print(response.tool_calls)
# [{'name': 'get_weather', 'args': {'location': '北京'}, 'id': 'call_123'}]

# Agent模式:自动执行工具
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(model, [get_weather, search_web])
result = agent.invoke({"messages": [("user", "北京天气怎么样?")]})

时序图

sequenceDiagram
    autonumber
    participant User
    participant ModelWithTools as 绑定工具的模型
    participant API as LLM API

    User->>ModelWithTools: invoke("北京天气?")
    ModelWithTools->>API: POST /chat/completions<br/>{messages:[...], tools:[...]}

    Note over API: 模型分析输入<br/>决定调用工具

    API-->>ModelWithTools: AIMessage(<br/>  tool_calls=[{<br/>    name: "get_weather",<br/>    args: {"location": "北京"}<br/>  }]<br/>)

    ModelWithTools-->>User: AIMessage (包含tool_calls)

    Note over User: 用户代码执行工具<br/>并将结果发回

    User->>ModelWithTools: invoke([<br/>  HumanMessage("北京天气?"),<br/>  AIMessage(tool_calls=[...]),<br/>  ToolMessage("晴天25度")<br/>])

    ModelWithTools->>API: POST /chat/completions
    API-->>ModelWithTools: AIMessage("北京今天晴天...")
    ModelWithTools-->>User: 最终答案

时序图说明:

图意: 展示工具调用的完整流程,从模型请求工具到执行工具再到最终答案。

边界条件:

  • 不是所有模型都支持工具调用(如GPT-3.5-turbo-instruct)
  • tool_choice=“any"强制模型必须调用工具
  • tool_choice=“none"禁止工具调用(即使绑定了工具)

异常与回退:

  • 工具执行失败应返回ToolMessage说明错误
  • 模型可能请求不存在的工具(需验证)
  • 工具参数可能不符合schema(需验证)

API-4: BaseChatModel.with_structured_output

基本信息

  • 名称: with_structured_output
  • 方法签名: def with_structured_output(self, schema: type[T] | dict, *, method: Literal["function_calling", "json_mode"] = "function_calling", include_raw: bool = False, **kwargs) -> Runnable[LanguageModelInput, T]
  • 幂等性: 幂等

功能说明

强制模型输出特定Pydantic模型格式,自动解析和验证输出。

请求结构体

schema: type[T] | dict  # Pydantic模型类或JSON Schema
method: Literal["function_calling", "json_mode"]  # 实现方法
include_raw: bool  # 是否包含原始AIMessage
**kwargs: Any  # 额外参数
参数 类型 必填 默认值 说明
schema type[T]|dict - 输出schema
method str “function_calling” 实现方法
include_raw bool False 是否返回原始消息

响应结构体

Runnable[LanguageModelInput, T]  # 输出为schema指定的类型T
# 或 Runnable[LanguageModelInput, dict[str, T | AIMessage]] 当include_raw=True

入口函数与关键代码

class BaseChatModel(BaseLanguageModel):
    def with_structured_output(
        self,
        schema: type[T] | dict,
        *,
        method: Literal["function_calling", "json_mode"] = "function_calling",
        include_raw: bool = False,
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, T]:
        # 1) 根据method选择实现策略
        if method == "function_calling":
            # 使用bind_tools + 解析器
            if isinstance(schema, type):
                # Pydantic模型
                tool_name = schema.__name__
                tool = convert_to_openai_tool(schema)
                llm = self.bind_tools([tool], tool_choice=tool_name, **kwargs)
                output_parser = PydanticToolsParser(tools=[schema], first_tool_only=True)
            else:
                # JSON Schema字典
                llm = self.bind_tools([schema], tool_choice=schema["name"], **kwargs)
                output_parser = JsonOutputKeyToolsParser(key_name=schema["name"], first_tool_only=True)

        elif method == "json_mode":
            # 使用JSON模式 + 提示词约束
            llm = self.bind(response_format={"type": "json_object"}, **kwargs)
            if isinstance(schema, type):
                output_parser = JsonOutputParser(pydantic_object=schema)
            else:
                output_parser = JsonOutputParser()

        else:
            msg = f"Unsupported method: {method}"
            raise ValueError(msg)

        # 2) 构建链
        if include_raw:
            # 返回 {"raw": AIMessage, "parsed": T}
            return RunnableMap({
                "raw": llm,
                "parsed": llm | output_parser,
            })
        else:
            # 只返回解析后的T
            return llm | output_parser

代码说明:

  1. function_calling方法使用工具调用实现强约束
  2. json_mode方法使用JSON模式+提示词实现弱约束
  3. 链式组合模型和输出解析器
  4. include_raw时返回原始消息和解析结果

调用链与上层函数

from pydantic import BaseModel, Field

# 定义输出schema
class Joke(BaseModel):
    setup: str = Field(description="笑话的铺垫")
    punchline: str = Field(description="笑点/包袱")
    rating: int = Field(description="有趣程度1-10")

# 创建结构化输出模型
model = ChatOpenAI(model="gpt-4o")
structured_model = model.with_structured_output(Joke)

# 调用,自动解析为Joke对象
joke = structured_model.invoke("讲一个程序员的笑话")
print(f"铺垫: {joke.setup}")
print(f"包袱: {joke.punchline}")
print(f"评分: {joke.rating}")

# 获取原始消息
structured_model_with_raw = model.with_structured_output(Joke, include_raw=True)
result = structured_model_with_raw.invoke("讲一个笑话")
print(result["parsed"])  # Joke对象
print(result["raw"])     # AIMessage

时序图

sequenceDiagram
    autonumber
    participant User
    participant Chain as structured_model
    participant LLM as bind_tools(schema)
    participant Parser as PydanticParser

    User->>Chain: invoke("讲笑话")
    Chain->>LLM: invoke(input)
    LLM->>LLM: 生成响应<br/>(tool_calls格式)
    LLM-->>Chain: AIMessage(tool_calls=[{<br/>  name: "Joke",<br/>  args: {<br/>    setup: "...",<br/>    punchline: "...",<br/>    rating: 8<br/>  }<br/>}])

    Chain->>Parser: parse(AIMessage)
    Parser->>Parser: 提取tool_calls[0].args
    Parser->>Parser: 验证Pydantic模型
    Parser-->>Chain: Joke(setup="...", punchline="...", rating=8)

    Chain-->>User: Joke对象

时序图说明:

图意: 展示结构化输出如何通过工具调用+解析器强制输出特定格式。

边界条件:

  • 模型不保证100%遵循schema(可能生成不符合的内容)
  • 解析失败会抛出OutputParserException
  • json_mode比function_calling约束更弱但更灵活

性能要点:

  • function_calling额外overhead: ~5-10ms(工具参数构造)
  • 解析和验证overhead: ~1-5ms
  • 相比自由文本,token使用量略高(tool_calls格式)

典型使用场景与时序图

场景1: 简单问答

model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke("什么是量子计算?")
print(response.content)
sequenceDiagram
    User->>Model: invoke("什么是量子计算?")
    Model->>OpenAI API: POST /chat/completions
    OpenAI API-->>Model: AIMessage("量子计算是...")
    Model-->>User: "量子计算是..."

场景2: 多轮对话

messages = [
    HumanMessage("我叫Alice"),
    AIMessage("你好Alice,有什么可以帮你的?"),
    HumanMessage("我叫什么名字?"),
]

response = model.invoke(messages)
# AIMessage("你叫Alice")
sequenceDiagram
    User->>Model: invoke([<br/>  HumanMessage("我叫Alice"),<br/>  AIMessage("你好..."),<br/>  HumanMessage("我叫什么?")<br/>])
    Model->>OpenAI API: POST (包含完整对话历史)
    Note over OpenAI API: 模型根据上下文<br/>记住Alice
    OpenAI API-->>Model: AIMessage("你叫Alice")
    Model-->>User: "你叫Alice"

场景3: Agent工具调用

# 定义工具
@tool
def calculator(expression: str) -> float:
    """计算数学表达式"""
    return eval(expression)

# 绑定工具
model_with_tools = model.bind_tools([calculator])

# Agent循环
messages = [HumanMessage("123 * 456等于多少?")]

while True:
    response = model_with_tools.invoke(messages)
    messages.append(response)

    if not response.tool_calls:
        # 没有工具调用,结束
        print(response.content)
        break

    # 执行工具
    for tool_call in response.tool_calls:
        result = calculator.invoke(tool_call["args"])
        messages.append(ToolMessage(content=str(result), tool_call_id=tool_call["id"]))
sequenceDiagram
    autonumber
    participant User
    participant Agent
    participant Model
    participant Tool as Calculator

    User->>Agent: "123 * 456等于多少?"

    Agent->>Model: invoke(messages)
    Model-->>Agent: AIMessage(tool_calls=[{name: "calculator", args: {"expression": "123 * 456"}}])

    Agent->>Tool: calculator.invoke({"expression": "123 * 456"})
    Tool-->>Agent: 56088

    Agent->>Model: invoke(messages + ToolMessage("56088"))
    Model-->>Agent: AIMessage("123 * 456等于56088")

    Agent-->>User: "123 * 456等于56088"

关键配置与最佳实践

缓存配置

from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache

# 1. 全局内存缓存
set_llm_cache(InMemoryCache())

model = ChatOpenAI(model="gpt-4o")
model.invoke("你好")  # 调用API
model.invoke("你好")  # 从缓存返回,0延迟

# 2. Redis缓存
from langchain_community.cache import RedisCache
import redis

redis_client = redis.Redis(host="localhost", port=6379)
set_llm_cache(RedisCache(redis_=redis_client))

# 3. 针对特定模型禁用缓存
model_no_cache = ChatOpenAI(model="gpt-4o", cache=False)

流式输出最佳实践

# 1. SSE(Server-Sent Events)流式响应
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat")
async def chat(message: str):
    async def event_generator():
        async for chunk in model.astream(message):
            yield f"data: {json.dumps({'content': chunk.content})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

# 2. WebSocket流式
from fastapi import WebSocket

@app.websocket("/ws/chat")
async def chat_ws(websocket: WebSocket):
    await websocket.accept()
    message = await websocket.receive_text()

    async for chunk in model.astream(message):
        await websocket.send_json({"content": chunk.content})

    await websocket.close()

工具调用最佳实践

# 1. 工具描述清晰
@tool
def search_database(query: str, limit: int = 10) -> list[dict]:
    """在数据库中搜索相关记录

    Args:
        query: 搜索关键词,支持模糊匹配
        limit: 返回结果数量上限,默认10,最大100

    Returns:
        包含匹配记录的列表,每条记录是一个字典
    """
    ...

# 2. 处理工具调用错误
def execute_tool_call(tool_call, tools_map):
    try:
        tool = tools_map[tool_call["name"]]
        result = tool.invoke(tool_call["args"])
        return ToolMessage(
            content=str(result),
            tool_call_id=tool_call["id"],
        )
    except Exception as e:
        return ToolMessage(
            content=f"工具执行失败: {str(e)}",
            tool_call_id=tool_call["id"],
            is_error=True,
        )

# 3. 并行执行多个工具调用
import asyncio

async def execute_tool_calls_parallel(tool_calls, tools_map):
    tasks = [
        tools_map[tc["name"]].ainvoke(tc["args"])
        for tc in tool_calls
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    messages = []
    for tc, result in zip(tool_calls, results):
        if isinstance(result, Exception):
            content = f"错误: {result}"
        else:
            content = str(result)
        messages.append(ToolMessage(content=content, tool_call_id=tc["id"]))

    return messages

结构化输出最佳实践

from pydantic import BaseModel, Field, field_validator

# 1. 详细的Field描述
class UserProfile(BaseModel):
    """用户画像"""

    name: str = Field(description="用户姓名")
    age: int = Field(description="年龄,必须18-120之间", ge=18, le=120)
    interests: list[str] = Field(description="兴趣爱好列表,至少1个")
    personality: str = Field(description="性格特征,简短描述")

    @field_validator("interests")
    def validate_interests(cls, v):
        if len(v) == 0:
            raise ValueError("至少需要1个兴趣爱好")
        return v

# 2. 处理解析失败
from langchain_core.exceptions import OutputParserException

structured_model = model.with_structured_output(UserProfile)

try:
    result = structured_model.invoke("分析用户: 张三, 25岁, 喜欢编程")
except OutputParserException as e:
    print(f"解析失败: {e}")
    # 回退策略:使用自由文本输出
    fallback_result = model.invoke("分析用户...")

# 3. 链式处理
extraction_chain = (
    {"text": RunnablePassthrough()}
    | ChatPromptTemplate.from_template("从以下文本提取用户信息:\n{text}")
    | model.with_structured_output(UserProfile)
)

user = extraction_chain.invoke("我是Alice,30岁,喜欢阅读和旅行")

错误处理与重试

from openai import APIError, RateLimitError, Timeout

# 1. 重试配置
model_with_retry = model.with_retry(
    retry_if_exception_type=(APIError, RateLimitError, Timeout),
    stop_after_attempt=5,
    wait_exponential_jitter=True,
)

# 2. Fallback到备用模型
primary = ChatOpenAI(model="gpt-4o").with_retry(stop_after_attempt=3)
fallback = ChatOpenAI(model="gpt-4o-mini")

robust_model = primary.with_fallbacks([fallback])

# 3. 超时控制
result = model.invoke(
    "长任务...",
    config={"timeout": 30}  # 30秒超时
)

成本优化

# 1. 使用缓存减少重复调用
set_llm_cache(RedisCache(redis_client))

# 2. 选择合适的模型
cheap_model = ChatOpenAI(model="gpt-4o-mini")  # 便宜但能力稍弱
expensive_model = ChatOpenAI(model="gpt-4o")   # 昂贵但能力强

# 简单任务用便宜模型
result = cheap_model.invoke("今天星期几?")

# 复杂任务用昂贵模型
result = expensive_model.invoke("写一篇论文...")

# 3. 监控Token使用
result = model.invoke("...")
if result.usage_metadata:
    cost = (
        result.usage_metadata.input_tokens * 0.03 / 1000 +
        result.usage_metadata.output_tokens * 0.06 / 1000
    )
    print(f"本次调用花费: ${cost:.4f}")

性能优化指南

调用性能优化

1. 启用缓存

from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache

# 全局启用内存缓存
set_llm_cache(InMemoryCache())

model = ChatOpenAI(model="gpt-4o")
result = model.invoke("什么是量子计算?")  # 1-5秒
result = model.invoke("什么是量子计算?")  # <10ms (缓存命中)

性能提升:

  • 延迟: 100-1000倍提升
  • 成本: 100%节省
  • 适用场景: 重复查询、测试环境、演示

2. 使用流式输出

model = ChatOpenAI(model="gpt-4o", streaming=True)

# 用户立即看到输出
for chunk in model.stream("写一篇文章"):
    print(chunk.content, end="", flush=True)

性能提升:

  • 首字节延迟: 从3-5秒降至200-800ms
  • 用户体验: 显著改善
  • 总耗时: 相同或略高

3. 批处理

# 批量调用,充分利用缓存
inputs = ["问题1", "问题2", "问题3"]
results = model.batch(inputs)

# 支持并发
results = model.batch(inputs, config={"max_concurrency": 3})

性能提升:

  • 缓存利用率更高
  • 减少回调开销
  • 部分Provider支持批量API

4. 异步并发

import asyncio

async def process_queries(queries):
    tasks = [model.ainvoke(q) for q in queries]
    return await asyncio.gather(*tasks)

# 10个查询并发执行
results = await process_queries([f"问题{i}" for i in range(10)])

性能提升:

  • 10个请求从50秒降至5秒(10倍提升)
  • CPU利用率更高
  • 适合I/O密集型任务

成本优化策略

1. 模型选择

# 简单任务使用便宜模型
cheap_model = ChatOpenAI(model="gpt-4o-mini")  # $0.15/1M tokens
expensive_model = ChatOpenAI(model="gpt-4o")   # $5/1M tokens

# 路由策略
def route_query(query: str):
    if is_simple(query):
        return cheap_model.invoke(query)
    else:
        return expensive_model.invoke(query)

2. Prompt优化

# 减少输入token
# ❌ 长提示
prompt = "请详细分析以下文本,包括主题、情感、关键词...(500 words)"

# ✅ 精简提示
prompt = "分析文本: 主题、情感、关键词"

# 减少输出token
model.invoke(prompt, max_tokens=100)  # 限制输出长度

3. 缓存策略

from langchain_community.cache import RedisCache
import redis

# 持久化缓存,多实例共享
redis_client = redis.Redis(host="localhost", port=6379)
set_llm_cache(RedisCache(redis_=redis_client))

4. 成本监控

from langchain_core.callbacks import BaseCallbackHandler

class CostTracker(BaseCallbackHandler):
    def __init__(self):
        self.total_tokens = 0
        self.total_cost = 0.0

    def on_llm_end(self, response, **kwargs):
        if response.llm_output and "token_usage" in response.llm_output:
            usage = response.llm_output["token_usage"]
            self.total_tokens += usage.get("total_tokens", 0)

            # GPT-4o价格: input $5/1M, output $15/1M
            input_cost = usage.get("prompt_tokens", 0) * 5 / 1_000_000
            output_cost = usage.get("completion_tokens", 0) * 15 / 1_000_000
            self.total_cost += input_cost + output_cost

            print(f"本次调用: {usage.get('total_tokens', 0)} tokens, ${input_cost + output_cost:.4f}")
            print(f"累计: {self.total_tokens} tokens, ${self.total_cost:.2f}")

tracker = CostTracker()
model = ChatOpenAI(model="gpt-4o", callbacks=[tracker])

故障排查指南

常见错误1: 速率限制(429 Too Many Requests)

症状:

openai.RateLimitError: Rate limit reached for gpt-4o in organization...

解决方案:

from langchain_core.rate_limiters import InMemoryRateLimiter

# 方案1: 添加速率限制器
rate_limiter = InMemoryRateLimiter(
    requests_per_second=10,
    check_every_n_seconds=0.1,
)
model = ChatOpenAI(model="gpt-4o", rate_limiter=rate_limiter)

# 方案2: 添加重试机制
from tenacity import retry, stop_after_attempt, wait_exponential

model_with_retry = model.with_retry(
    retry_if_exception_type=(openai.RateLimitError,),
    stop_after_attempt=5,
    wait_exponential_jitter=True,
)

# 方案3: 使用Fallback模型
fallback_model = ChatOpenAI(model="gpt-4o-mini")
robust_model = model.with_fallbacks([fallback_model])

常见错误2: 超时(Timeout)

症状:

openai.APITimeoutError: Request timed out

解决方案:

# 方案1: 增加超时时间
model = ChatOpenAI(model="gpt-4o", request_timeout=120)

# 方案2: 使用流式输出(更快获得首字节)
for chunk in model.stream(input):
    print(chunk.content, end="")

# 方案3: 在config中设置超时
model.invoke(input, config={"timeout": 60})

常见错误3: 缓存问题

症状:

ValueError: Asked to cache, but no cache found at `langchain.cache`.

解决方案:

# 方案1: 显式禁用缓存
model = ChatOpenAI(model="gpt-4o", cache=False)

# 方案2: 设置全局缓存
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache
set_llm_cache(InMemoryCache())

# 方案3: 使用实例级缓存
from langchain_core.caches import InMemoryCache
cache = InMemoryCache()
model = ChatOpenAI(model="gpt-4o", cache=cache)

常见错误4: 工具调用解析失败

症状:

OutputParserException: Failed to parse tool calls from response

解决方案:

# 方案1: 使用include_raw获取原始响应
structured_model = model.with_structured_output(
    schema=MySchema,
    include_raw=True,
)

result = structured_model.invoke(input)
if "parsing_error" in result and result["parsing_error"]:
    print(f"解析失败: {result['parsing_error']}")
    print(f"原始响应: {result['raw']}")

# 方案2: 添加重试逻辑
from langchain_core.runnables import RunnablePassthrough

def retry_on_parse_error(input_data):
    for i in range(3):
        try:
            return structured_model.invoke(input_data["input"])
        except OutputParserException as e:
            if i == 2:  # 最后一次尝试
                raise
            # 添加更明确的提示
            input_data["input"] += "\n请严格按照指定的JSON格式返回"
            continue

chain = {"input": RunnablePassthrough()} | retry_on_parse_error

常见错误5: 回调异常

症状:

Error in callback handler: ...

解决方案:

# 回调中的异常不会中断主流程,但会记录警告
# 确保回调处理器正确实现

class SafeCallback(BaseCallbackHandler):
    def on_llm_start(self, serialized, prompts, **kwargs):
        try:
            # 回调逻辑
            self.log(serialized, prompts)
        except Exception as e:
            # 不要让回调异常传播
            logger.warning(f"Callback error: {e}")

调试技巧

1. 启用详细日志

import logging

# 启用LangChain日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("langchain")
logger.setLevel(logging.DEBUG)

# 启用模型verbose模式
model = ChatOpenAI(model="gpt-4o", verbose=True)

2. 使用LangSmith追踪

import os

# 配置LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"

# 自动追踪所有调用
model = ChatOpenAI(model="gpt-4o")
result = model.invoke("测试")

# 在LangSmith Dashboard中查看完整追踪

3. 自定义回调追踪

class DebugCallback(BaseCallbackHandler):
    def on_chat_model_start(self, serialized, messages, **kwargs):
        print(f"[START] Model: {serialized.get('name')}")
        print(f"[START] Messages: {messages}")
        print(f"[START] Params: {kwargs}")

    def on_llm_end(self, response, **kwargs):
        print(f"[END] Generations: {len(response.generations)}")
        if response.llm_output:
            print(f"[END] Usage: {response.llm_output.get('token_usage')}")

    def on_llm_error(self, error, **kwargs):
        print(f"[ERROR] {type(error).__name__}: {error}")

model = ChatOpenAI(model="gpt-4o", callbacks=[DebugCallback()])

4. 检查中间状态

# 在链中插入检查点
from langchain_core.runnables import RunnableLambda

def inspect(x):
    print(f"中间值: {x}")
    return x

chain = (
    {"input": RunnablePassthrough()}
    | RunnableLambda(inspect)
    | model
    | RunnableLambda(inspect)
)

result = chain.invoke("测试")

最佳实践总结

DO ✅

  1. 使用缓存: 测试和开发环境必备
  2. 流式输出: 改善用户体验
  3. 批处理: 多个输入一起处理
  4. 异步并发: I/O密集型任务
  5. 监控成本: 生产环境必须监控token使用
  6. 错误处理: 添加重试和fallback
  7. 类型提示: 所有模型配置使用类型提示
  8. 清晰的Prompt: 精简、明确的提示
  9. 合理的超时: 设置合理的超时时间
  10. 日志和追踪: 生产环境启用LangSmith

DON’T ❌

  1. 不要在循环中同步调用: 使用batch或异步
  2. 不要忽略缓存: 重复查询浪费成本
  3. 不要使用过长的Prompt: 控制token使用
  4. 不要忽略错误: 添加错误处理
  5. 不要在生产环境使用verbose: 影响性能
  6. 不要硬编码API密钥: 使用环境变量
  7. 不要忽略速率限制: 添加速率限制器
  8. 不要滥用昂贵模型: 简单任务用便宜模型
  9. 不要忽略流式的价值: 长输出必须流式
  10. 不要跳过测试: 充分测试错误场景