LangChain-10: Callbacks Module

Module Overview

Responsibilities and Positioning

The Callbacks module provides LangChain's observability and monitoring infrastructure. Through its callback mechanism, developers can inject custom logic at key points such as LLM calls, tool execution, and chain execution, enabling logging, performance monitoring, and debug tracing.

Core responsibilities:

  • Event dispatch: defines a standardized callback event interface covering the LLM/Chain/Tool/Retriever lifecycle
  • Callback management: provides CallbackManager to manage multiple callback handlers uniformly
  • Run tracking: assigns a unique run_id to every invocation via RunManager, with parent-child run association
  • Async support: provides a complete asynchronous callback mechanism for async/await scenarios
  • LangSmith integration: ships LangChainTracer for automatic reporting to the LangSmith platform
  • Streaming: supports real-time streaming output via on_llm_new_token
  • Error isolation: callback errors do not affect the main flow, optionally controlled by raise_error

Inputs and Outputs

Callback event taxonomy (see the sketch after the table for the on_custom_event hook):

| Event | Fired when | Main parameters | Return |
|-------|------------|-----------------|--------|
| on_llm_start | before an LLM call | serialized, prompts, run_id | None |
| on_llm_new_token | on each streamed token | token, chunk, run_id | None |
| on_llm_end | after an LLM call | response, run_id | None |
| on_llm_error | on an LLM exception | error, run_id | None |
| on_chat_model_start | before a ChatModel call | serialized, messages, run_id | None |
| on_chain_start | before chain execution | serialized, inputs, run_id | None |
| on_chain_end | after chain execution | outputs, run_id | None |
| on_chain_error | on a chain exception | error, run_id | None |
| on_tool_start | before a tool call | serialized, input_str, run_id | None |
| on_tool_end | after a tool call | output, run_id | None |
| on_tool_error | on a tool exception | error, run_id | None |
| on_retriever_start | when retrieval starts | serialized, query, run_id | None |
| on_retriever_end | when retrieval completes | documents, run_id | None |
| on_agent_action | on an Agent action decision | action, run_id | None |
| on_agent_finish | when an Agent finishes | finish, run_id | None |
| on_text | on intermediate text output | text, run_id | None |
| on_retry | on a retry | retry_state, run_id | None |
| on_custom_event | on a custom event | name, data, run_id | None |
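
Most events in this table fire automatically, but on_custom_event is driven by user code. A minimal sketch, assuming dispatch_custom_event (exposed by langchain_core.callbacks.manager in recent versions) and that the dispatch happens inside a running Runnable:

```python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.callbacks.manager import dispatch_custom_event
from langchain_core.runnables import RunnableLambda

class ProgressHandler(BaseCallbackHandler):
    def on_custom_event(self, name, data, *, run_id, **kwargs):
        # Receives events emitted via dispatch_custom_event within the run
        print(f"[{name}] {data} (run_id={run_id})")

def slow_step(text: str) -> str:
    dispatch_custom_event("progress", {"stage": "halfway"})  # must run inside a Runnable
    return text.upper()

RunnableLambda(slow_step).invoke("hello", config={"callbacks": [ProgressHandler()]})
```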

Upstream and Downstream Dependencies

Dependencies:

  • typing: type definitions
  • abc: abstract base classes
  • uuid: unique run identifiers
  • asyncio: asynchronous event handling
  • concurrent.futures.ThreadPoolExecutor: running async callbacks from synchronous code
  • langsmith.run_helpers: LangSmith tracing context
  • langchain_core.tracers: built-in tracers (LangChainTracer/ConsoleCallbackHandler)

Depended on by:

  • langchain_core.runnables: every Runnable integrates callbacks through _call_with_config
  • langchain_core.language_models: BaseChatModel.generate fires on_chat_model_start
  • langchain_core.tools: BaseTool._run fires tool callbacks through its RunManager
  • langchain_classic.agents: AgentExecutor traces multi-step reasoning through callbacks
  • langchain_core.retrievers: BaseRetriever fires on_retriever_start/end
Overall Module Architecture

flowchart TB
    subgraph Application["Application Layer"]
        USER[User Code]
        RUNNABLE[Runnable<br/>prompt\|model\|parser]
        LLM[BaseChatModel]
        TOOL[BaseTool]
        RETRIEVER[BaseRetriever]
    end

    subgraph CallbackCore["Callbacks Core Layer"]
        subgraph Base["Base Classes"]
            BASE_CALLBACK[BaseCallbackHandler<br/>handler base class<br/>----------<br/>ignore_llm/chain/agent<br/>raise_error/run_inline]
            BASE_MANAGER[BaseCallbackManager<br/>manager base class<br/>----------<br/>handlers/tags/metadata<br/>add_handler/merge]
        end

        subgraph Managers["Manager Layer"]
            CALLBACK_MANAGER[CallbackManager<br/>sync manager<br/>----------<br/>configure/on_*_start]
            ASYNC_MANAGER[AsyncCallbackManager<br/>async manager<br/>----------<br/>await on_*_start]
        end

        subgraph RunManagers["Run Manager Layer"]
            PARENT_RUN[ParentRunManager<br/>parent run manager<br/>----------<br/>get_child]
            LLM_RUN[CallbackManagerForLLMRun<br/>----------<br/>on_llm_new_token/end/error]
            CHAIN_RUN[CallbackManagerForChainRun<br/>----------<br/>on_chain_end/error<br/>on_text]
            TOOL_RUN[CallbackManagerForToolRun<br/>----------<br/>on_tool_end/error]
            RETRIEVER_RUN[CallbackManagerForRetrieverRun<br/>----------<br/>on_retriever_end/error]
        end
    end

    subgraph Handlers["Handler Implementation Layer"]
        STDOUT[StdOutCallbackHandler<br/>console output]
        FILE[FileCallbackHandler<br/>file logging]
        STREAMING[StreamingStdOutCallbackHandler<br/>streaming stdout]
        LANGSMITH[LangChainTracer<br/>LangSmith tracing]
        CONSOLE[ConsoleCallbackHandler<br/>debug output]
        CUSTOM[CustomHandler<br/>user-defined]
    end

    subgraph Config["Configuration"]
        RCONFIG["RunnableConfig<br/>----------<br/>callbacks: list[Handler]<br/>tags/metadata/run_id"]
    end

    %% Application-layer calls
    USER -->|invoke| RUNNABLE
    USER -->|generate| LLM
    USER -->|run| TOOL
    USER -->|get_relevant_documents| RETRIEVER
    USER -.->|config| RCONFIG

    %% Runnable integration
    RUNNABLE -->|_call_with_config| CALLBACK_MANAGER
    LLM -->|generate| CALLBACK_MANAGER
    TOOL -->|_run| CALLBACK_MANAGER
    RETRIEVER -->|_get_relevant_documents| CALLBACK_MANAGER

    %% Config flow
    RCONFIG -.->|get_callback_manager_for_config| CALLBACK_MANAGER

    %% Manager inheritance
    CALLBACK_MANAGER -->|inherits| BASE_MANAGER
    ASYNC_MANAGER -->|inherits| BASE_MANAGER

    %% Run manager creation
    CALLBACK_MANAGER -->|on_llm_start| LLM_RUN
    CALLBACK_MANAGER -->|on_chain_start| PARENT_RUN
    CALLBACK_MANAGER -->|on_tool_start| TOOL_RUN
    CALLBACK_MANAGER -->|on_retriever_start| RETRIEVER_RUN

    PARENT_RUN -->|get_child| CHAIN_RUN

    %% Event dispatch
    LLM_RUN -.->|handle_event| BASE_CALLBACK
    CHAIN_RUN -.->|handle_event| BASE_CALLBACK
    TOOL_RUN -.->|handle_event| BASE_CALLBACK
    RETRIEVER_RUN -.->|handle_event| BASE_CALLBACK

    %% Handler implementations
    STDOUT -->|inherits| BASE_CALLBACK
    FILE -->|inherits| BASE_CALLBACK
    STREAMING -->|inherits| BASE_CALLBACK
    LANGSMITH -->|inherits| BASE_CALLBACK
    CONSOLE -->|inherits| BASE_CALLBACK
    CUSTOM -->|inherits| BASE_CALLBACK

    %% Handler registration
    CALLBACK_MANAGER ---|registers| STDOUT
    CALLBACK_MANAGER ---|registers| FILE
    CALLBACK_MANAGER ---|registers| STREAMING
    CALLBACK_MANAGER ---|registers| LANGSMITH
    CALLBACK_MANAGER ---|registers| CONSOLE
    CALLBACK_MANAGER ---|registers| CUSTOM

    style BASE_CALLBACK fill:#e1f5ff
    style CALLBACK_MANAGER fill:#fff4e1
    style LANGSMITH fill:#e8f5e9
    style RCONFIG fill:#ffe1f5
    style USER fill:#f0f0f0

Architecture Layers Explained

1. Application Layer

The application layer is where user code meets LangChain components:

  • User Code: business code that invokes LangChain components via invoke/generate/run
  • Runnable: LangChain's core abstraction, the base for every executable component (chains/models/tools, etc.)
  • BaseChatModel: chat model base class, wrapping LLM calls
  • BaseTool: tool base class, wrapping external API/function calls
  • BaseRetriever: retriever base class, wrapping vector store queries

Key points:

  • Every component accepts a RunnableConfig parameter that carries the callback configuration
  • Users inject custom callbacks via config={"callbacks": [handler]}
  • Components fire the corresponding lifecycle events automatically

2. Callbacks Core Layer

The core layer provides the full implementation of the callback mechanism:

Base classes (Base):

  • BaseCallbackHandler: defines 20+ callback methods (on_llm_start/on_chain_end, etc.)
    • ignore_*: suppresses specific event families
    • raise_error: controls whether callback exceptions propagate
    • run_inline: controls the execution strategy for async callbacks
  • BaseCallbackManager: manages multiple handlers, with inheritance/merging support
    • handlers: the handler list for the current run
    • inheritable_handlers: handlers passed down to child runs
    • tags/metadata: tag and metadata propagation

Manager layer (Managers):

  • CallbackManager: the synchronous callback manager
    • configure(): creates a manager from configuration (merging global/local callbacks)
    • on_*_start(): fires start events and returns a RunManager
  • AsyncCallbackManager: the asynchronous callback manager
    • every method returns an Awaitable, supporting async/await
    • runs multiple handlers concurrently with asyncio.gather

Run manager layer (Run Managers):

  • ParentRunManager: a parent run manager that can spawn child managers
    • get_child(): creates a child CallbackManager (inheriting handlers/tags)
  • CallbackManagerForLLMRun: the LLM-specific run manager
    • on_llm_new_token(): streaming token callback
    • on_llm_end(): LLM completion callback
  • CallbackManagerForChainRun: the chain-specific run manager
    • on_chain_end(): chain completion callback
    • on_text(): intermediate text output
  • CallbackManagerForToolRun: the tool-specific run manager
  • CallbackManagerForRetrieverRun: the retriever-specific run manager

Key mechanisms:

  • Run ID tracking: every run gets a UUID, with parent-child run association
  • Event dispatch: handle_event() iterates over the handlers and invokes the matching method
  • Exception isolation: every handler call is wrapped in try/except; errors do not affect the main flow
  • Sync-async bridging: in synchronous contexts, async handlers are executed via a ThreadPoolExecutor

3. Handler Implementation Layer

The handler layer provides concrete callback implementations:

Built-in handlers:

  • StdOutCallbackHandler: prints key events to stdout (for debugging)
  • StreamingStdOutCallbackHandler: prints LLM streaming tokens in real time
  • FileCallbackHandler: writes events to a log file
  • LangChainTracer: reports to the LangSmith platform (production tracing)
  • ConsoleCallbackHandler: formatted console output (development debugging)

Custom handlers:

  • subclass BaseCallbackHandler
  • override the event methods you care about
  • pass them in via callbacks=[handler]

4. Configuration Layer

RunnableConfig is the carrier of callback configuration:

{
    "callbacks": [handler1, handler2],  # handler list
    "tags": ["production", "v2"],       # tags (for filtering/grouping)
    "metadata": {"user_id": "123"},     # metadata (extra context)
    "run_id": uuid4(),                   # optional: explicit run ID
    "run_name": "custom_chain",          # optional: run name
}

Config merge strategy (a sketch of the merge behavior follows this list):

  1. Global config: the LANGCHAIN_TRACING_V2 environment variable / the tracing_v2_enabled() context manager
  2. Component config: ChatOpenAI(callbacks=[...])
  3. Call config: model.invoke(..., config={"callbacks": [...]})
  4. Priority: call config > component config > global config
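
A minimal sketch of the merge: callbacks registered at different levels are combined rather than replaced, so handlers from both levels fire for the same run (ChatOpenAI stands in for any component here, and both handlers are plain built-ins):

```python
from langchain_core.callbacks import StdOutCallbackHandler
from langchain_openai import ChatOpenAI

component_handler = StdOutCallbackHandler()  # component-level callback
call_handler = StdOutCallbackHandler()       # call-level callback

model = ChatOpenAI(callbacks=[component_handler])

# CallbackManager.configure() merges the component-level (inheritable)
# callbacks with the call-level callbacks, so both handlers see this run.
model.invoke("hello", config={"callbacks": [call_handler]})
```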

Architecture Flow Summary

User call → RunnableConfig → CallbackManager.configure()
CallbackManager.on_*_start() → creates a RunManager (assigns run_id)
Business logic runs → RunManager.on_*_event() → handle_event()
Iterate over handlers → call handler.on_*() → catch exceptions
RunManager.on_*_end/error() → tracing completes

Callback Flow in Detail

1. Initialization phase

# user code
handler = MyCallbackHandler()
config = {"callbacks": [handler], "tags": ["production"]}

# internal flow
callback_manager = CallbackManager.configure(
    callbacks=config["callbacks"],
    inheritable_tags=config["tags"]
)

2. Execution phase

# Runnable._call_with_config()
run_manager = callback_manager.on_chain_start(
    serialized={"name": "RunnableSequence"},
    inputs=input_data,
    run_id=uuid.uuid4()
)

Inside on_chain_start:

  1. handler.on_chain_start() is called for every handler
  2. sync handlers are invoked directly; async handlers are submitted to the event loop
  3. exceptions are caught and logged as warnings without interrupting the main flow
  4. a CallbackManagerForChainRun (holding the run_id) is returned

3. Event-firing phase

# during business-logic execution
run_manager.on_text("Processing step 1...")
run_manager.on_llm_new_token("Hello")

# child runs
child_manager = run_manager.get_child()
child_run = child_manager.on_llm_start(...)

4. Completion phase

# normal completion
run_manager.on_chain_end(outputs)

# exceptional completion
except Exception as e:
    run_manager.on_chain_error(e)
    raise

Run Manager Responsibility Boundaries

A hands-on sketch follows the table.

| Manager | Created at | Lifetime | Dedicated methods | Use case |
|---------|-----------|----------|-------------------|----------|
| CallbackManager | configuration time | the whole invocation | on_*_start | creating RunManagers |
| ParentRunManager | on_chain_start | a single chain run | get_child | spawning child managers |
| CallbackManagerForLLMRun | on_llm_start | a single LLM call | on_llm_new_token | streaming output |
| CallbackManagerForChainRun | on_chain_start | a single chain run | on_text | intermediate output |
| CallbackManagerForToolRun | on_tool_start | a single tool call | - | tool execution |
| CallbackManagerForRetrieverRun | on_retriever_start | a single retrieval | - | vector retrieval |
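
These boundaries can be observed by driving a manager by hand; a minimal sketch with a placeholder serialized payload and inputs (normally Runnable._call_with_config does all of this for you):

```python
import uuid

from langchain_core.callbacks import CallbackManager, StdOutCallbackHandler

cm = CallbackManager.configure(inheritable_callbacks=[StdOutCallbackHandler()])

# on_chain_start fires the start event and returns a CallbackManagerForChainRun
run_manager = cm.on_chain_start({"name": "demo"}, {"x": 1}, run_id=uuid.uuid4())

child = run_manager.get_child("seq:step:1")  # child manager with parent_run_id set
run_manager.on_chain_end({"y": 2})           # completes the run
```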

Module Interaction Sequence Diagrams

Sequence Diagram 1: Full Runnable Chain Execution Flow

sequenceDiagram
    autonumber
    participant U as User
    participant R as Runnable<br/>(Chain)
    participant CM as CallbackManager
    participant PRM as ParentRunManager
    participant H1 as Handler1<br/>(StdOut)
    participant H2 as Handler2<br/>(LangSmith)
    participant Step1 as Step1<br/>(Prompt)
    participant Step2 as Step2<br/>(ChatModel)
    participant LLMRM as LLMRunManager

    %% Configuration phase
    U->>R: invoke(input, config={callbacks:[H1, H2]})
    activate R
    R->>R: ensure_config(config)
    R->>CM: CallbackManager.configure(callbacks, tags, metadata)
    activate CM
    CM->>CM: _get_trace_callbacks()
    Note over CM: check LANGCHAIN_TRACING_V2<br/>auto-add LangChainTracer
    CM->>CM: add_handler(H1)<br/>add_handler(H2)
    CM-->>R: callback_manager
    deactivate CM

    %% Chain start
    R->>CM: on_chain_start(serialized, input, run_id)
    activate CM
    CM->>CM: handle_event(handlers, "on_chain_start", ...)
    CM->>H1: on_chain_start(serialized, input, run_id=<uuid1>)
    H1-->>CM: None
    CM->>H2: on_chain_start(serialized, input, run_id=<uuid1>)
    H2-->>CM: None (reports to LangSmith)
    CM->>PRM: new ParentRunManager(run_id=<uuid1>, handlers=[H1,H2])
    CM-->>R: parent_run_manager
    deactivate CM

    %% Sub-step 1: Prompt
    R->>PRM: get_child("seq:step:1")
    activate PRM
    PRM->>CM: new CallbackManager(parent_run_id=<uuid1>)
    PRM-->>R: child_manager
    deactivate PRM

    R->>Step1: invoke(input, config={callbacks: child_manager})
    activate Step1
    Step1->>CM: on_chain_start({"name": "PromptTemplate"}, input, run_id=<uuid2>)
    CM->>H1: on_chain_start(..., run_id=<uuid2>, parent_run_id=<uuid1>)
    CM->>H2: on_chain_start(..., run_id=<uuid2>, parent_run_id=<uuid1>)
    Step1->>Step1: format_prompt(input)
    Step1->>CM: on_chain_end({"output": formatted_prompt}, run_id=<uuid2>)
    CM->>H1: on_chain_end(..., run_id=<uuid2>)
    CM->>H2: on_chain_end(..., run_id=<uuid2>)
    Step1-->>R: formatted_prompt
    deactivate Step1

    %% Sub-step 2: ChatModel
    R->>PRM: get_child("seq:step:2")
    PRM->>CM: new CallbackManager(parent_run_id=<uuid1>)
    PRM-->>R: child_manager

    R->>Step2: invoke(formatted_prompt, config={callbacks: child_manager})
    activate Step2
    Step2->>CM: on_chat_model_start(serialized, messages, run_id=<uuid3>)
    activate CM
    CM->>H1: on_chat_model_start(..., run_id=<uuid3>, parent_run_id=<uuid1>)
    CM->>H2: on_chat_model_start(..., run_id=<uuid3>, parent_run_id=<uuid1>)
    CM->>LLMRM: new CallbackManagerForLLMRun(run_id=<uuid3>, handlers=[H1,H2])
    CM-->>Step2: llm_run_manager
    deactivate CM

    %% LLM streaming generation
    Step2->>Step2: _generate(messages, run_manager)
    Step2->>LLMRM: on_llm_new_token("Hello")
    LLMRM->>H1: on_llm_new_token("Hello", run_id=<uuid3>)
    H1-->>LLMRM: (prints to stdout)
    LLMRM->>H2: on_llm_new_token("Hello", run_id=<uuid3>)
    H2-->>LLMRM: (reports to LangSmith)

    Step2->>LLMRM: on_llm_new_token(" World")
    LLMRM->>H1: on_llm_new_token(" World", run_id=<uuid3>)
    LLMRM->>H2: on_llm_new_token(" World", run_id=<uuid3>)

    Step2->>LLMRM: on_llm_end(response, run_id=<uuid3>)
    LLMRM->>H1: on_llm_end(response, run_id=<uuid3>)
    LLMRM->>H2: on_llm_end(response, run_id=<uuid3>)
    Step2-->>R: AIMessage("Hello World")
    deactivate Step2

    %% Chain completion
    R->>PRM: on_chain_end({"output": result}, run_id=<uuid1>)
    activate PRM
    PRM->>H1: on_chain_end({"output": result}, run_id=<uuid1>)
    PRM->>H2: on_chain_end({"output": result}, run_id=<uuid1>)
    deactivate PRM

    R-->>U: result
    deactivate R

Sequence Diagram 1 Walkthrough

Phase 1: Configuration and Initialization (steps 1-6)

Steps 1-2: the user calls Runnable.invoke() with a callback configuration

  • Config passing: the callbacks list travels in the config parameter
  • Config normalization: ensure_config() fills in a complete config dict

Steps 3-6: CallbackManager configuration

  • The configure classmethod: merges global/component/call-level configuration
    CallbackManager.configure(
        callbacks=config["callbacks"],           # call level
        inheritable_callbacks=self.callbacks,    # component level
        verbose=self.verbose,                    # debug flag
        inheritable_tags=config.get("tags"),
        inheritable_metadata=config.get("metadata")
    )

  • Automatic tracer injection: checks the LANGCHAIN_TRACING_V2 environment variable and auto-adds LangChainTracer
  • Handler deduplication: add_handler() avoids duplicate registration

Key points:

  • Config priority: call config > component config > global config
  • Environment variable: LANGCHAIN_TRACING_V2=true enables automatic tracing
  • Handler inheritance: inheritable_handlers are passed down to child runs

Phase 2: Chain Run Startup (steps 7-12)

Steps 7-12: dispatching the on_chain_start event

  • Event dispatch: handle_event() iterates over all handlers
  • Sync calls: synchronous handlers are invoked directly
  • Async calls: asynchronous handlers are submitted to the event loop
  • Exception isolation: each handler is wrapped in try/except; errors do not affect the main flow
    for handler in handlers:
        try:
            handler.on_chain_start(serialized, inputs, run_id=run_id, ...)
        except Exception as e:
            logger.warning(f"Error in {handler.__class__.__name__}: {e}")
            if handler.raise_error:
                raise

  • RunManager creation: returns a ParentRunManager (holding run_id, handlers, tags)

Key points:

  • run_id assignment: every run gets a UUID for tracking
  • Parent-child linkage: parent_run_id builds the run tree
  • Tag propagation: tags and metadata are passed to all child runs

Phase 3: Sub-step Execution, Prompt Formatting (steps 13-23)

Steps 13-15: creating the child run manager

  • get_child(): the ParentRunManager creates a child CallbackManager
  • Inheritance: the child manager inherits the parent's handlers/tags/metadata
  • Parent-child linkage: the child manager's parent_run_id points at the parent's run_id
    def get_child(self, tag: str | None = None) -> CallbackManager:
        manager = CallbackManager(handlers=[], parent_run_id=self.run_id)
        manager.set_handlers(self.inheritable_handlers)  # inherit handlers
        manager.add_tags(self.inheritable_tags)          # inherit tags
        if tag is not None:
            manager.add_tags([tag], inherit=False)       # add the step tag
        return manager

Steps 16-23: executing the Prompt step

  • on_chain_start: fires the Prompt start event (run_id=uuid2, parent_run_id=uuid1)
  • Business logic: PromptTemplate.format_prompt()
  • on_chain_end: fires the Prompt completion event
  • Result passing: the formatted prompt is handed to the next step

Key points:

  • Step tags: seq:step:1 identifies the first step of the chain
  • Run tree: rendered as a nested run tree in LangSmith
  • Intermediate output: the outputs of on_chain_end contain the step's output

Phase 4: Sub-step Execution, ChatModel Generation (steps 24-40)

Steps 24-26: creating the second child manager

  • Step tag: seq:step:2 identifies the second step
  • Independent run_id: every step gets its own run_id (uuid3)

Steps 27-32: the on_chat_model_start event

  • Chat-specific event: ChatModels fire on_chat_model_start (not on_llm_start)
  • Message formatting: messages are serialized into a traceable format
  • LLMRunManager: returns the dedicated run manager (supporting on_llm_new_token)

Steps 33-40: LLM streaming generation

  • on_llm_new_token: fired once per generated token
  • Real-time callbacks: StreamingStdOutCallbackHandler prints in real time
  • Token accumulation: LangChainTracer accumulates tokens into the trace
  • on_llm_end: fired on completion, carrying the full response and token usage
    # inside the LLM
    for chunk in stream_generator():
        run_manager.on_llm_new_token(chunk.content, chunk=chunk)

    run_manager.on_llm_end(LLMResult(
        generations=[[generation]],
        llm_output={"token_usage": {...}}
    ))

Key points (a usage sketch follows):

  • Streaming support: on_llm_new_token fires only in streaming mode
  • Chunk structure: contains content, tool_calls, usage_metadata
  • Token counting: llm_output in on_llm_end carries token usage
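
A quick usage sketch of the built-in streaming handler (assuming a standard ChatOpenAI setup with credentials in the environment):

```python
from langchain_core.callbacks import StreamingStdOutCallbackHandler
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    streaming=True,  # on_llm_new_token only fires in streaming mode
    callbacks=[StreamingStdOutCallbackHandler()],
)
model.invoke("Tell me a joke")  # tokens are printed to stdout as they arrive
```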

Phase 5: Chain Run Completion (steps 41-45)

Steps 41-44: the on_chain_end event

  • Output recording: the chain's final output is passed to all handlers
  • Run completion: marks the run as finished; LangSmith closes the trace
  • Resource cleanup: releases RunManager-related resources

Step 45: returning the result to the user

  • Pass-through output: callbacks observe the output but never modify it

Key points:

  • Nested completion: child runs complete before the parent run
  • Exception handling: if a step fails, on_chain_error fires instead of on_chain_end

Sequence Diagram 2: LLM Call with Error Handling

sequenceDiagram
    autonumber
    participant U as User
    participant CM as ChatModel
    participant CBM as CallbackManager
    participant LLMRM as LLMRunManager
    participant H as Handler<br/>(Custom)
    participant API as OpenAI API

    %% Normal call
    U->>CM: generate(messages, callbacks=[H])
    CM->>CBM: CallbackManager.configure(...)
    CBM-->>CM: callback_manager

    CM->>CBM: on_chat_model_start(serialized, messages)
    CBM->>H: on_chat_model_start(..., run_id=<uuid>)
    H-->>CBM: None
    CBM->>LLMRM: new CallbackManagerForLLMRun(run_id=<uuid>)
    CBM-->>CM: llm_run_manager

    %% API call fails
    CM->>API: POST /v1/chat/completions
    API-->>CM: 429 Rate Limit Error

    CM->>CM: raise RateLimitError(...)
    Note over CM: exception caught

    CM->>LLMRM: on_llm_error(RateLimitError, response=None)
    activate LLMRM
    LLMRM->>H: on_llm_error(error, run_id=<uuid>)
    activate H
    H->>H: log_error(error)
    H-->>LLMRM: None
    deactivate H
    deactivate LLMRM

    CM-->>U: raise RateLimitError
    Note over U: user code catches the exception

Sequence Diagram 2 Walkthrough

Key points of the error-handling flow

Steps 1-8: identical to the normal flow; callbacks are configured and on_chat_model_start fires

Steps 9-10: the API call fails

  • Exception types: RateLimitError/Timeout/APIError, etc.
  • Exception capture: caught by a try/except inside the ChatModel

Steps 11-15: the on_llm_error event

  • Error reporting: run_manager.on_llm_error(error) is called
  • Response metadata: an optional response argument can carry partially generated content
    except BaseException as e:
        if run_manager:
            run_manager.on_llm_error(
                e,
                response=LLMResult(generations=[[generation]])  # optional
            )
        raise

  • Handler processing: custom handlers can log the error, send alerts, etc.
  • Exception propagation: the error keeps propagating up to user code

Key points (a sketch of an alerting handler follows):

  • Exceptions are never swallowed: callbacks record errors without blocking propagation
  • Full context: error, run_id, and parent_run_id are all passed
  • LangSmith: LangChainTracer automatically marks the run as failed
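
A minimal sketch of such an alerting handler (the notify helper is hypothetical; swap in your own alerting channel):

```python
from langchain_core.callbacks import BaseCallbackHandler

def notify(message: str) -> None:
    # Hypothetical alerting hook; replace with Slack/PagerDuty/etc.
    print(f"ALERT: {message}")

class AlertingHandler(BaseCallbackHandler):
    def on_llm_error(self, error: BaseException, *, run_id, **kwargs) -> None:
        # Record and alert; the exception still propagates to user code.
        notify(f"LLM run {run_id} failed: {error!r}")
```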

Sequence Diagram 3: Agent Multi-step Reasoning Callback Flow

sequenceDiagram
    autonumber
    participant U as User
    participant AE as AgentExecutor
    participant CM as CallbackManager
    participant PRM as ParentRunManager
    participant Agent as Agent<br/>(ReAct)
    participant LLMRM as LLMRunManager
    participant Tool as Tool<br/>(Search)
    participant TRM as ToolRunManager
    participant H as Handler<br/>(Tracer)

    U->>AE: invoke({"input": "Weather in Beijing?"}, callbacks=[H])
    AE->>CM: CallbackManager.configure(callbacks=[H])
    AE->>CM: on_chain_start({"name": "AgentExecutor"}, input)
    CM->>H: on_chain_start(..., run_id=<uuid1>)
    CM->>PRM: new ParentRunManager(run_id=<uuid1>)
    CM-->>AE: parent_run_manager

    %% Round 1: Agent decision
    AE->>PRM: get_child("agent:step:1")
    PRM-->>AE: child_manager

    AE->>Agent: plan(input, child_manager)
    Agent->>CM: on_chain_start({"name": "ReActAgent"}, input)
    CM->>H: on_chain_start(..., run_id=<uuid2>, parent=<uuid1>)

    Agent->>Agent: _call_llm(prompt, run_manager)
    Agent->>CM: on_chat_model_start(...)
    CM->>LLMRM: new CallbackManagerForLLMRun(run_id=<uuid3>)
    CM-->>Agent: llm_run_manager

    Agent->>Agent: LLM output: "need to search for the weather"
    Agent->>LLMRM: on_llm_end(response)
    LLMRM->>H: on_llm_end(..., run_id=<uuid3>)

    Agent->>Agent: parse_output(response)
    Note over Agent: parses an AgentAction<br/>tool="search"<br/>tool_input="Beijing weather"

    Agent->>CM: on_agent_action(action, run_id=<uuid2>)
    CM->>H: on_agent_action(action, run_id=<uuid2>)
    Note over H: records the Agent decision

    Agent->>CM: on_chain_end({"action": action}, run_id=<uuid2>)
    CM->>H: on_chain_end(..., run_id=<uuid2>)
    Agent-->>AE: AgentAction(tool="search", tool_input="Beijing weather")

    %% Round 2: Tool execution
    AE->>PRM: get_child("tool:search")
    PRM-->>AE: child_manager

    AE->>Tool: run("Beijing weather", child_manager)
    Tool->>CM: on_tool_start({"name": "search"}, "Beijing weather")
    CM->>H: on_tool_start(..., run_id=<uuid4>, parent=<uuid1>)
    CM->>TRM: new CallbackManagerForToolRun(run_id=<uuid4>)
    CM-->>Tool: tool_run_manager

    Tool->>Tool: _run("Beijing weather")
    Note over Tool: calls the search API

    Tool->>TRM: on_tool_end("Sunny, 25°C", run_id=<uuid4>)
    TRM->>H: on_tool_end("Sunny, 25°C", run_id=<uuid4>)
    Tool-->>AE: "Sunny, 25°C"

    %% Round 3: Agent final answer
    AE->>PRM: get_child("agent:step:2")
    PRM-->>AE: child_manager

    AE->>Agent: plan(observation, child_manager)
    Agent->>CM: on_chain_start({"name": "ReActAgent"}, input)
    Agent->>Agent: _call_llm(prompt_with_observation, run_manager)
    Agent->>CM: on_chat_model_start(...)
    CM->>LLMRM: new CallbackManagerForLLMRun(run_id=<uuid5>)

    Agent->>Agent: LLM output: "It is sunny and 25°C in Beijing"
    Agent->>LLMRM: on_llm_end(response)

    Agent->>Agent: parse_output(response)
    Note over Agent: parses an AgentFinish

    Agent->>CM: on_agent_finish(finish, run_id=<uuid5>)
    CM->>H: on_agent_finish(finish, run_id=<uuid5>)
    Note over H: records Agent completion

    Agent->>CM: on_chain_end({"output": "It is sunny and 25°C in Beijing"})
    Agent-->>AE: AgentFinish(output="It is sunny and 25°C in Beijing")

    %% AgentExecutor completion
    AE->>PRM: on_chain_end({"output": "It is sunny and 25°C in Beijing"}, run_id=<uuid1>)
    PRM->>H: on_chain_end(..., run_id=<uuid1>)
    AE-->>U: {"output": "It is sunny and 25°C in Beijing"}

Sequence Diagram 3 Walkthrough

What is special about Agent callbacks

Multi-step reasoning tracking:

  • on_agent_action: records each Agent decision (which tool, with which input)
  • on_agent_finish: records Agent completion (the final answer)
  • Nested runs: Agent run > LLM calls + Tool calls

Run tree structure:

<uuid1> AgentExecutor
  ├─ <uuid2> ReActAgent (step 1)
  │   └─ <uuid3> ChatOpenAI (generates the action)
  ├─ <uuid4> SearchTool (execution)
  └─ <uuid5> ReActAgent (step 2)
      └─ <uuid6> ChatOpenAI (generates the answer)

Key callback moments:

  1. on_chain_start (AgentExecutor): the Agent starts
  2. on_chain_start (Agent): a single reasoning round starts
  3. on_chat_model_start: the LLM generates a decision
  4. on_agent_action: a tool call is parsed out
  5. on_tool_start: tool execution starts
  6. on_tool_end: the tool returns a result
  7. on_agent_finish: the Agent decides it is done
  8. on_chain_end (AgentExecutor): the Agent finishes

What LangSmith shows:

  • Agent trajectory: every round of thought and action
  • Token statistics: tokens accumulated across all LLM calls
  • Tool calls: each tool's inputs and outputs
  • Latency breakdown: LLM time vs tool time

Core Call Paths and Key Code

Path 1: RunnableConfig → CallbackManager creation

Call path

User Code
  ↓ invoke(input, config={"callbacks": [handler], "tags": [...]})
Runnable._call_with_config()
  ↓ ensure_config(config)
runnables/config.py::get_callback_manager_for_config(config)
CallbackManager.configure(
    callbacks,
    inheritable_callbacks,
    verbose,
    tags,
    metadata
)
BaseCallbackManager.__init__(handlers, inheritable_handlers, tags, metadata)
  ↓ returns
CallbackManager instance

Key code: get_callback_manager_for_config

File: langchain_core/runnables/config.py

def get_callback_manager_for_config(
    config: RunnableConfig, _stacklevel: int = 2
) -> CallbackManager:
    """Create a CallbackManager from a config.

    Args:
        config: the Runnable config dict
        _stacklevel: call-stack depth (used for tracing)

    Returns:
        a configured CallbackManager instance
    """
    # 1. Read the callbacks from the config (may be a list or a CallbackManager)
    callbacks_on_config = config.get("callbacks")

    # 2. Delegate to CallbackManager.configure to merge the configuration
    return CallbackManager.configure(
        callbacks=callbacks_on_config,
        inheritable_callbacks=None,
        local_callbacks=None,
        verbose=config.get("verbose", False),
        tags=config.get("tags"),
        inheritable_tags=None,
        metadata=config.get("metadata"),
        inheritable_metadata=None,
    )

Key code: CallbackManager.configure

File: langchain_core/callbacks/manager.py

class CallbackManager(BaseCallbackManager):
    @classmethod
    def configure(
        cls,
        callbacks: Callbacks = None,
        inheritable_callbacks: Callbacks = None,
        local_callbacks: Callbacks = None,
        verbose: bool = False,
        tags: list[str] | None = None,
        inheritable_tags: list[str] | None = None,
        metadata: dict[str, Any] | None = None,
        inheritable_metadata: dict[str, Any] | None = None,
    ) -> CallbackManager:
        """Configure a CallbackManager (merging multi-level config).

        Priority: local > callbacks > inheritable
        """
        # 1. Check the global tracing configuration
        trace_callbacks = _get_trace_callbacks(
            project_name=None,
            example_id=None,
            callback_manager=None
        )

        # 2. Merge the callbacks lists
        callback_list: list[BaseCallbackHandler] = []

        # add inheritable_callbacks
        if inheritable_callbacks:
            if isinstance(inheritable_callbacks, list):
                callback_list.extend(inheritable_callbacks)
            else:
                callback_list.extend(inheritable_callbacks.handlers)

        # add callbacks
        if callbacks:
            if isinstance(callbacks, list):
                callback_list.extend(callbacks)
            elif isinstance(callbacks, BaseCallbackManager):
                callback_list.extend(callbacks.handlers)

        # add local_callbacks (highest priority)
        if local_callbacks:
            callback_list.extend(local_callbacks)

        # add trace_callbacks (LangSmith tracing)
        if trace_callbacks:
            callback_list.extend(trace_callbacks)

        # 3. Create the CallbackManager instance
        manager = cls(
            handlers=callback_list,
            inheritable_handlers=callback_list.copy(),
            tags=list(set(tags or [])),
            inheritable_tags=list(set(inheritable_tags or [])),
            metadata=metadata or {},
            inheritable_metadata=inheritable_metadata or {}
        )

        return manager

Key mechanisms (a sketch using the tracing context manager follows):

  1. Three-level config merge: inheritable → callbacks → local
  2. Automatic tracer injection: _get_trace_callbacks() checks the environment variables
  3. Deduplication: add_handler() internally checks whether the handler already exists
  4. Inheritance strategy: inheritable_handlers are passed on to child runs
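
To see the automatic tracer injection without touching any handler lists, the tracing context manager from langchain_core can be used. A sketch, assuming LangSmith credentials are configured in the environment and chain stands for any Runnable:

```python
from langchain_core.tracers.context import tracing_v2_enabled

# Inside this block, CallbackManager.configure() picks up a LangChainTracer
# automatically, the same way LANGCHAIN_TRACING_V2=true would.
with tracing_v2_enabled(project_name="callbacks-demo"):
    chain.invoke({"input": "hello"})
```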

Path 2: on_chain_start → RunManager creation

Call path

Runnable._call_with_config()
  ↓ callback_manager.on_chain_start(serialized, inputs, run_id)
CallbackManager.on_chain_start()
  ↓ handle_event(handlers, "on_chain_start", ...)
manager.py::handle_event()
  ↓ iterate over handlers
  ↓ handler.on_chain_start(serialized, inputs, run_id, parent_run_id, ...)
BaseCallbackHandler.on_chain_start()
  ↓ (user-implemented logic)
  ↓ returns
ParentRunManager(run_id, handlers, ...)

Key code: CallbackManager.on_chain_start

File: langchain_core/callbacks/manager.py

class CallbackManager(BaseCallbackManager):
    def on_chain_start(
        self,
        serialized: dict[str, Any],
        inputs: dict[str, Any],
        run_id: UUID | None = None,
        *,
        tags: list[str] | None = None,
        metadata: dict[str, Any] | None = None,
        run_type: str | None = None,
        name: str | None = None,
        **kwargs: Any,
    ) -> CallbackManagerForChainRun:
        """Chain-start event (creates the RunManager).

        Args:
            serialized: component serialization info (name/id, etc.)
            inputs: input data
            run_id: optional run ID (auto-generated when omitted)
            tags: run tags
            metadata: run metadata
            run_type: run type ("chain"/"llm"/"tool", etc.)
            name: run name

        Returns:
            a CallbackManagerForChainRun instance
        """
        # 1. Generate the run_id
        if run_id is None:
            run_id = uuid.uuid4()

        # 2. Merge tags and metadata
        _tags = (self.tags or []) + (tags or [])
        _metadata = {**(self.metadata or {}), **(metadata or {})}
        _inheritable_tags = (self.inheritable_tags or []) + (tags or [])
        _inheritable_metadata = {
            **(self.inheritable_metadata or {}),
            **(metadata or {})
        }

        # 3. Dispatch the on_chain_start event to all handlers
        handle_event(
            self.handlers,
            "on_chain_start",
            "ignore_chain",  # name of the ignore-condition attribute
            serialized,
            inputs,
            run_id=run_id,
            parent_run_id=self.parent_run_id,
            tags=_tags,
            metadata=_metadata,
            run_type=run_type,
            name=name,
            **kwargs,
        )

        # 4. Return the ParentRunManager
        return CallbackManagerForChainRun(
            run_id=run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=_tags,
            inheritable_tags=_inheritable_tags,
            metadata=_metadata,
            inheritable_metadata=_inheritable_metadata,
        )

Key code: handle_event (the heart of event dispatch)

File: langchain_core/callbacks/manager.py

def handle_event(
    handlers: list[BaseCallbackHandler],
    event_name: str,
    ignore_condition_name: str | None,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Generic event dispatch function.

    Args:
        handlers: the list of callback handlers
        event_name: the event method name (e.g. "on_llm_start")
        ignore_condition_name: the ignore-condition attribute name (e.g. "ignore_llm")
        *args: positional arguments for the event method
        **kwargs: keyword arguments for the event method
    """
    coros: list[Coroutine[Any, Any, Any]] = []

    try:
        for handler in handlers:
            try:
                # 1. Check whether this handler ignores the event
                if ignore_condition_name is not None:
                    if getattr(handler, ignore_condition_name, False):
                        continue  # skip this handler

                # 2. Call the handler's event method
                event = getattr(handler, event_name)(*args, **kwargs)

                # 3. Collect async handlers
                if asyncio.iscoroutine(event):
                    coros.append(event)

            except NotImplementedError as e:
                # 4. Fall back from on_chat_model_start to on_llm_start
                if event_name == "on_chat_model_start":
                    # convert the messages into prompts
                    message_strings = [get_buffer_string(m) for m in args[1]]
                    handle_event(
                        [handler],
                        "on_llm_start",
                        "ignore_llm",
                        args[0],  # serialized
                        message_strings,  # prompts
                        *args[2:],
                        **kwargs,
                    )
                else:
                    logger.warning(f"NotImplementedError in {handler.__class__.__name__}.{event_name}")

            except Exception as e:
                # 5. Exception isolation
                logger.warning(f"Error in {handler.__class__.__name__}.{event_name}: {repr(e)}")
                if handler.raise_error:
                    raise  # propagate only when handler.raise_error=True

    finally:
        # 6. Run the collected async callbacks
        if coros:
            try:
                loop = asyncio.get_running_loop()
                loop_running = True
            except RuntimeError:
                loop_running = False

            if loop_running:
                # an event loop is already running in this sync context: hand off to the thread pool
                _executor().submit(
                    cast("Callable", copy_context().run),
                    _run_coros,
                    coros
                ).result()
            else:
                # no running loop: execute directly
                _run_coros(coros)

Key mechanisms (a sketch of the ignore mechanism follows):

  1. Ignore control: ignore_condition_name checks the handler's ignore_llm/ignore_chain attributes
  2. Exception isolation: each handler is wrapped in try/except; one handler's error affects neither the other handlers nor the main flow
  3. Mixed sync/async: sync handlers are called directly; async handlers are collected and executed together
  4. Event-loop handling:
    • a loop is already running → submit to the ThreadPoolExecutor (avoids deadlock)
    • no loop → run in a fresh event loop (asyncio.run)
  5. Fallback: when on_chat_model_start is not implemented, falls back to on_llm_start
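
For instance, the ignore-condition check means a handler can opt out of whole event families by overriding the corresponding property; a minimal sketch:

```python
from langchain_core.callbacks import BaseCallbackHandler

class ChainOnlyHandler(BaseCallbackHandler):
    @property
    def ignore_llm(self) -> bool:
        # handle_event() reads this attribute and skips every on_llm_* /
        # on_chat_model_start dispatch for this handler.
        return True

    def on_chain_start(self, serialized, inputs, **kwargs):
        print(f"chain started: {(serialized or {}).get('name')}")
```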

Path 3: ParentRunManager → child RunManager

Call path

RunnableSequence.invoke()
  ↓ run_manager = callback_manager.on_chain_start(...)
  ↓ for i, step in enumerate(steps):
  ↓   child_manager = run_manager.get_child(f"seq:step:{i+1}")
ParentRunManager.get_child(tag)
  ↓ new CallbackManager(parent_run_id=self.run_id)
  ↓ manager.set_handlers(self.inheritable_handlers)
  ↓ manager.add_tags(self.inheritable_tags + [tag])
  ↓ returns
CallbackManager
  ↓ step.invoke(input, config={callbacks: child_manager})
  ↓ the child step fires on_chain_start(parent_run_id=run_id)

Key code: ParentRunManager.get_child

File: langchain_core/callbacks/manager.py

class ParentRunManager(RunManager):
    """Parent run manager (can spawn child managers)."""

    def get_child(self, tag: str | None = None) -> CallbackManager:
        """Create a child CallbackManager.

        Args:
            tag: child-run tag (e.g. "seq:step:1"/"map:key:query")

        Returns:
            a child CallbackManager inheriting the parent's handlers/tags
        """
        # 1. Create the child manager (parent_run_id points at this run)
        manager = CallbackManager(
            handlers=[],
            parent_run_id=self.run_id  # key: establishes the parent-child link
        )

        # 2. Inherit the parent's handlers
        manager.set_handlers(self.inheritable_handlers)

        # 3. Inherit the parent's tags and metadata
        manager.add_tags(self.inheritable_tags)
        manager.add_metadata(self.inheritable_metadata)

        # 4. Add the step-specific tag (not inherited further)
        if tag is not None:
            manager.add_tags([tag], inherit=False)

        return manager

Key mechanisms (a sketch printing the run tree follows):

  1. Parent-child linkage: the child manager's parent_run_id points at the parent's run_id
  2. Selective inheritance: inheritable_handlers are inherited; plain handlers are not
  3. Tag accumulation: a child run carries the parent's tags plus its own
  4. Step tags: the tag parameter marks the position within the chain (seq:step:1, map:key:name, etc.)
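
The parent-child linkage can be made visible with a handler that just prints the run hierarchy; a minimal sketch (the two-step chain is illustrative):

```python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.runnables import RunnableLambda

class TreeHandler(BaseCallbackHandler):
    def on_chain_start(self, serialized, inputs, *, run_id, parent_run_id=None, **kwargs):
        indent = "  " if parent_run_id else ""
        print(f"{indent}run={run_id} parent={parent_run_id}")

chain = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
# The sequence gets one run_id; each step gets its own run_id whose
# parent_run_id points back at the sequence's run_id.
chain.invoke(1, config={"callbacks": [TreeHandler()]})
```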

Path 4: LLM streaming callbacks

Call path

ChatModel.generate(messages, run_manager)
  ↓ run_manager = callback_manager.on_chat_model_start(...)
  ↓ for chunk in self._stream(messages):
  ↓   run_manager.on_llm_new_token(chunk.content, chunk=chunk)
CallbackManagerForLLMRun.on_llm_new_token()
  ↓ handle_event(handlers, "on_llm_new_token", "ignore_llm", ...)
  ↓ iterate over handlers
  ↓ handler.on_llm_new_token(token, chunk, run_id, ...)
StreamingStdOutCallbackHandler.on_llm_new_token()
  ↓ print(token, end="", flush=True)
LangChainTracer.on_llm_new_token()
  ↓ accumulates the token into the current trace
  ↓ on completion
  ↓ run_manager.on_llm_end(LLMResult)

Key code: ChatModel._generate_with_cache

File: langchain_core/language_models/chat_models.py

class BaseChatModel(BaseLanguageModel):
    def _generate_with_cache(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> ChatResult:
        """LLM generation (with caching and callbacks).

        Args:
            messages: the input messages
            stop: stop words
            run_manager: the LLM run manager
            **kwargs: model parameters

        Returns:
            ChatResult (generations + llm_output)
        """
        # 1. Check the cache
        if self.cache is not None:
            llm_cache = get_llm_cache()
            if llm_cache:
                cached_result = llm_cache.lookup(messages, self._llm_type, **kwargs)
                if cached_result:
                    # fire on_llm_end (even for a cache hit)
                    if run_manager:
                        run_manager.on_llm_end(
                            LLMResult(generations=[[cached_result]])
                        )
                    return ChatResult(generations=[cached_result])

        # 2. Call _generate (implemented by subclasses)
        try:
            result = self._generate(
                messages,
                stop=stop,
                run_manager=run_manager,  # pass the run_manager through
                **kwargs,
            )
        except BaseException as e:
            # 3. Fire on_llm_error on failure
            if run_manager:
                run_manager.on_llm_error(e)
            raise
        else:
            # 4. Fire on_llm_end on success
            if run_manager:
                run_manager.on_llm_end(
                    LLMResult(
                        generations=[[result.generations[0]]],
                        llm_output=result.llm_output
                    )
                )
            return result

Key code: ChatModel._stream (example subclass implementation)

File: langchain_openai/chat_models/base.py (example)

class ChatOpenAI(BaseChatModel):
    def _stream(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        """Streaming generation.

        Yields:
            ChatGenerationChunk (one increment per token)
        """
        # 1. Call the OpenAI streaming API
        for chunk in self.client.chat.completions.create(
            messages=messages,
            stream=True,
            **kwargs
        ):
            delta = chunk.choices[0].delta
            if delta.content:
                # 2. Build a ChatGenerationChunk
                chunk_obj = ChatGenerationChunk(
                    message=AIMessageChunk(content=delta.content),
                    generation_info={
                        "finish_reason": chunk.choices[0].finish_reason
                    }
                )

                # 3. Fire the on_llm_new_token callback
                if run_manager:
                    run_manager.on_llm_new_token(
                        token=delta.content,
                        chunk=chunk_obj  # pass the full chunk along
                    )

                # 4. Yield the chunk to the caller
                yield chunk_obj

Key code: CallbackManagerForLLMRun.on_llm_new_token

File: langchain_core/callbacks/manager.py

class CallbackManagerForLLMRun(RunManager, LLMManagerMixin):
    """The LLM run manager."""

    def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: GenerationChunk | ChatGenerationChunk | None = None,
        **kwargs: Any,
    ) -> None:
        """Streaming token callback.

        Args:
            token: the newly generated token text
            chunk: the full generation chunk (including metadata)
            **kwargs: additional arguments
        """
        if not self.handlers:
            return

        # dispatch to all handlers
        handle_event(
            self.handlers,
            "on_llm_new_token",
            "ignore_llm",  # checks handler.ignore_llm
            token=token,
            chunk=chunk,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """LLM completion callback.

        Args:
            response: the LLM response (generations plus llm_output)
            **kwargs: additional arguments
        """
        if not self.handlers:
            return

        handle_event(
            self.handlers,
            "on_llm_end",
            "ignore_llm",
            response,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

Key mechanisms (a token-collecting sketch follows):

  1. Real-time callbacks: each generated token triggers the callback immediately (typically well under 10 ms of overhead)
  2. Chunk passing: besides the token text, the full chunk (including tool_calls, etc.) is passed along
  3. StreamingStdOutCallbackHandler: prints to stdout in real time
  4. LangChainTracer: accumulates tokens into the trace and submits at on_llm_end
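
A small sketch of consuming these token callbacks directly, accumulating the streamed text in memory:

```python
from langchain_core.callbacks import BaseCallbackHandler

class TokenCollector(BaseCallbackHandler):
    def __init__(self) -> None:
        self.tokens: list[str] = []

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        # Called once per streamed token, before the chunk reaches the caller
        self.tokens.append(token)

    @property
    def text(self) -> str:
        return "".join(self.tokens)

# Usage (assuming a streaming-capable chat model named model):
# collector = TokenCollector()
# model.invoke("hi", config={"callbacks": [collector]})
# print(collector.text)
```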

Path 5: the exception-handling path

Call path

Runnable.invoke()
  ↓ try:
  ↓   run_manager = callback_manager.on_chain_start(...)
  ↓   output = func(input, run_manager)
  ↓ except BaseException as e:
  ↓   run_manager.on_chain_error(e)
  ↓   raise
CallbackManagerForChainRun.on_chain_error()
  ↓ handle_event(handlers, "on_chain_error", "ignore_chain", ...)
  ↓ iterate over handlers
  ↓ handler.on_chain_error(error, run_id, ...)
CustomHandler.on_chain_error()
  ↓ log_error(error)
  ↓ send_alert(error)
LangChainTracer.on_chain_error()
  ↓ marks the trace as failed
  ↓ records the error details in LangSmith

Key code: Runnable._call_with_config (exception handling)

File: langchain_core/runnables/base.py

class Runnable(ABC, Generic[Input, Output]):
    def _call_with_config(
        self,
        func: Callable[[Input], Output],
        input_: Input,
        config: RunnableConfig | None,
        run_type: str | None = None,
        serialized: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> Output:
        """Execute with callbacks (the exception-handling template).

        Args:
            func: the function to actually execute
            input_: input data
            config: run configuration
            run_type: run type
            serialized: serialization info

        Returns:
            the execution result

        Raises:
            any exception (re-raised after on_*_error fires)
        """
        # 1. Prepare the callbacks
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)

        # 2. Fire the start event
        run_manager = callback_manager.on_chain_start(
            serialized or {},
            input_,
            run_type=run_type,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )

        # 3. Run the business logic (with exception handling)
        try:
            # create the child config (inheriting callbacks)
            child_config = patch_config(
                config,
                callbacks=run_manager.get_child()
            )

            # execute the function
            with set_config_context(child_config) as context:
                output = cast(
                    Output,
                    context.run(
                        call_func_with_variable_args,
                        func,
                        input_,
                        config,
                        run_manager,
                        **kwargs,
                    ),
                )

        except BaseException as e:
            # 4. On failure: fire on_chain_error
            run_manager.on_chain_error(e)
            raise  # keep propagating the exception

        else:
            # 5. On success: fire on_chain_end
            run_manager.on_chain_end(output)
            return output

Key mechanisms (a sketch contrasting raise_error settings follows):

  1. BaseException capture: catches all exceptions (including KeyboardInterrupt)
  2. Callback first, re-raise second: the exception is recorded before it propagates
  3. Exceptions are untouched: callbacks change neither the exception type nor its message
  4. Cleanup guarantee: the try/except structure ensures on_chain_error fires on every failure path
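
The isolation-versus-propagation trade-off is controlled per handler; a sketch contrasting the default with raise_error=True:

```python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.runnables import RunnableLambda

class FlakyHandler(BaseCallbackHandler):
    raise_error = False  # default: handler errors are logged as warnings and swallowed

    def on_chain_start(self, serialized, inputs, **kwargs):
        raise RuntimeError("handler bug")

# With raise_error=False the run still succeeds despite the broken handler.
print(RunnableLambda(lambda x: x + 1).invoke(1, config={"callbacks": [FlakyHandler()]}))

# Setting raise_error = True instead would surface the RuntimeError to the caller.
```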

Performance Optimization Highlights

1. Async callback batching

# manager.py::ahandle_event
async def ahandle_event(...):
    # run all async handlers concurrently
    await asyncio.gather(
        *(handler.on_llm_start(...) for handler in handlers)
    )

2. Thread pool reuse

_THREAD_POOL_EXECUTOR: ThreadPoolExecutor | None = None

def _executor() -> ThreadPoolExecutor:
    global _THREAD_POOL_EXECUTOR
    if _THREAD_POOL_EXECUTOR is None:
        _THREAD_POOL_EXECUTOR = ThreadPoolExecutor(max_workers=10)
        atexit.register(_THREAD_POOL_EXECUTOR.shutdown, wait=True)
    return _THREAD_POOL_EXECUTOR

3. Event loop detection

try:
    asyncio.get_running_loop()
    loop_running = True
except RuntimeError:
    loop_running = False

if loop_running:
    # avoid deadlock: submit to the thread pool
    _executor().submit(copy_context().run, _run_coros, coros).result()
else:
    # run directly
    _run_coros(coros)

4. Handler deduplication

def add_handler(self, handler: BaseCallbackHandler, inherit: bool = True) -> None:
    if handler not in self.handlers:  # dedupe
        self.handlers.append(handler)
    if inherit and handler not in self.inheritable_handlers:
        self.inheritable_handlers.append(handler)

Core Data Structures

classDiagram
    class BaseCallbackHandler {
        +ignore_llm: bool
        +ignore_chain: bool
        +ignore_agent: bool
        +on_llm_start(...) None
        +on_llm_end(...) None
        +on_llm_error(...) None
        +on_tool_start(...) None
        +on_tool_end(...) None
        +on_chain_start(...) None
        +on_chain_end(...) None
    }

    class CallbackManager {
        +handlers: list[BaseCallbackHandler]
        +add_handler(handler) None
        +on_llm_start(...) list[RunManager]
        +on_chain_start(...) RunManager
        +on_tool_start(...) RunManager
    }

    class CallbackManagerForLLMRun {
        +on_llm_new_token(token) None
        +on_llm_end(response) None
        +on_llm_error(error) None
    }

    class StdOutCallbackHandler {
        +on_llm_start(...) None
        +on_llm_new_token(token) None
        +on_chain_start(...) None
    }

    BaseCallbackHandler <|-- StdOutCallbackHandler
    CallbackManager o-- BaseCallbackHandler
    CallbackManager --> CallbackManagerForLLMRun

### Data Structure Notes

#### BaseCallbackHandler Methods

| Method | Description |
|------|------|
| on_llm_start | fired when an LLM call starts |
| on_llm_new_token | fired for each streamed token |
| on_llm_end | fired when an LLM call completes |
| on_llm_error | fired when an LLM call fails |
| on_chain_start | fired when a chain starts executing |
| on_chain_end | fired when a chain finishes executing |
| on_tool_start | fired when a tool starts executing |
| on_tool_end | fired when a tool finishes executing |
| on_text | fired on intermediate text output |

#### CallbackManager Fields

| Field | Type | Description |
|------|------|------|
| handlers | list[BaseCallbackHandler] | the registered callback handlers |
| inheritable_handlers | list[BaseCallbackHandler] | handlers passed down to child runs |
| parent_run_id | UUID \| None | the parent run ID |
| tags | list[str] | tag list |
| metadata | dict | metadata |
## Core API Details

### API-1: Customizing BaseCallbackHandler

#### Basic Information

- **Name**: `BaseCallbackHandler`
- **Type**: abstract base class
- **Idempotency**: depends on the implementation

#### Functionality

The base class for custom callback handlers; subclasses override the event methods they care about.

#### Entry Point and Key Code

```python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult

class MyCallbackHandler(BaseCallbackHandler):
    """A custom callback handler."""

    def on_llm_start(
        self,
        serialized: dict,
        prompts: list[str],
        **kwargs,
    ) -> None:
        """LLM call starting.

        Args:
            serialized: LLM serialization info
            prompts: the list of prompts
        """
        print(f"LLM call started, number of prompts: {len(prompts)}")

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """New streamed token.

        Args:
            token: the newly generated token
        """
        print(token, end="", flush=True)

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        """LLM call finished.

        Args:
            response: the LLM result
        """
        print(f"\nLLM call finished, token usage: {response.llm_output.get('token_usage')}")

    def on_llm_error(self, error: Exception, **kwargs) -> None:
        """LLM call failed.

        Args:
            error: the raised exception
        """
        print(f"LLM call error: {error}")

    def on_chain_start(
        self,
        serialized: dict,
        inputs: dict,
        **kwargs,
    ) -> None:
        """Chain execution starting."""
        print(f"Chain started: {serialized.get('name', 'unknown')}")

    def on_chain_end(self, outputs: dict, **kwargs) -> None:
        """Chain execution finished."""
        print(f"Chain finished, outputs: {outputs}")

    def on_tool_start(
        self,
        serialized: dict,
        input_str: str,
        **kwargs,
    ) -> None:
        """Tool execution starting."""
        print(f"Tool call: {serialized.get('name')}, input: {input_str}")

    def on_tool_end(self, output: str, **kwargs) -> None:
        """Tool execution finished."""
        print(f"Tool output: {output}")
```

Usage example:

from langchain_openai import ChatOpenAI

# create the handler
handler = MyCallbackHandler()

# attach the callback
model = ChatOpenAI(callbacks=[handler])
response = model.invoke("Tell me a joke")

# Output:
# LLM call started, number of prompts: 1
# Why do programmers...
# LLM call finished, token usage: {...}

API-2: Configuring Callbacks via RunnableConfig

Basic Information

  • Name: the callbacks parameter of RunnableConfig
  • Idempotency: depends on the handler implementation

Functionality

Configures callbacks for a single invocation through RunnableConfig.

Request structure

callbacks: list[BaseCallbackHandler] | None

Entry point and key code

from langchain_core.runnables import RunnableConfig

# Option 1: pass at invoke time
response = model.invoke(
    "Hello",
    config=RunnableConfig(callbacks=[handler])
)

# Option 2: bind with with_config
model_with_callbacks = model.with_config(callbacks=[handler])
response = model_with_callbacks.invoke("Hello")

# Option 3: configure a chain
chain = prompt | model | output_parser
response = chain.invoke(
    {"input": "Hello"},
    config={"callbacks": [handler]}
)

Usage example:

# scenario: add logging for specific calls only
class LoggingHandler(BaseCallbackHandler):
    def __init__(self, log_file: str):
        self.log_file = log_file

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        with open(self.log_file, "a") as f:
            f.write(f"{response.generations[0][0].text}\n")

# enable logging only for important queries
important_query = "Analyze the following data..."
response = model.invoke(
    important_query,
    config={"callbacks": [LoggingHandler("important_queries.log")]}
)

# ordinary queries are not logged
normal_query = "Hello"
response = model.invoke(normal_query)  # no callbacks

API-3: StdOutCallbackHandler

Basic Information

  • Name: StdOutCallbackHandler
  • Idempotency: idempotent

Functionality

The built-in stdout callback handler; prints key events to the console.

Entry point and key code (a simplified rendition of the built-in handler)

from langchain_core.callbacks import StdOutCallbackHandler

class StdOutCallbackHandler(BaseCallbackHandler):
    """Stdout callback handler."""

    def on_llm_start(self, serialized: dict, prompts: list[str], **kwargs) -> None:
        """Print LLM start info."""
        print("LLM Start")

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Print the new token."""
        print(token, end="", flush=True)

    def on_chain_start(self, serialized: dict, inputs: dict, **kwargs) -> None:
        """Print chain start info."""
        class_name = serialized.get("name", serialized.get("id", ["<unknown>"])[-1])
        print(f"\n\n\033[1m> Entering new {class_name} chain...\033[0m")

    def on_chain_end(self, outputs: dict, **kwargs) -> None:
        """Print chain completion info."""
        print("\n\033[1m> Finished chain.\033[0m")

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs) -> None:
        """Print the tool call."""
        print(f"Tool: {serialized['name']}")

Usage example:

from langchain_core.callbacks import StdOutCallbackHandler

# debug a chain's execution flow
handler = StdOutCallbackHandler()

chain = prompt | model | output_parser
response = chain.invoke(
    {"input": "Explain quantum computing"},
    config={"callbacks": [handler]}
)

# Output:
# > Entering new RunnableSequence chain...
# LLM Start
# Quantum computing is...
# > Finished chain.

API-4: Async Callbacks

Basic Information

  • Name: AsyncCallbackHandler
  • Idempotency: depends on the implementation

Functionality

A callback handler that supports asynchronous operations.

Entry point and key code

from datetime import datetime

from langchain_core.callbacks import AsyncCallbackHandler

class AsyncLoggingHandler(AsyncCallbackHandler):
    """Asynchronous logging callback handler."""

    def __init__(self, db_client):
        self.db_client = db_client

    async def on_llm_start(
        self,
        serialized: dict,
        prompts: list[str],
        **kwargs,
    ) -> None:
        """Log the LLM call asynchronously."""
        await self.db_client.log_event({
            "type": "llm_start",
            "prompts": prompts,
            "timestamp": datetime.now()
        })

    async def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        """Log the response asynchronously."""
        await self.db_client.log_event({
            "type": "llm_end",
            "response": response.generations[0][0].text,
            "token_usage": response.llm_output.get("token_usage"),
            "timestamp": datetime.now()
        })

    async def on_chain_error(self, error: Exception, **kwargs) -> None:
        """Log the error asynchronously."""
        await self.db_client.log_error({
            "error": str(error),
            "timestamp": datetime.now()
        })

Usage example:

# asynchronous usage
async def process_with_logging():
    handler = AsyncLoggingHandler(db_client)

    response = await model.ainvoke(
        "Hello",
        config={"callbacks": [handler]}
    )

    return response

# run it
result = await process_with_logging()

Typical Usage Scenarios

Scenario 1: Performance monitoring

import time
from langchain_core.callbacks import BaseCallbackHandler

class PerformanceMonitor(BaseCallbackHandler):
    """Performance-monitoring callback."""

    def __init__(self):
        self.start_times = {}
        self.metrics = []

    def on_llm_start(self, serialized: dict, prompts: list[str], run_id, **kwargs) -> None:
        self.start_times[run_id] = time.time()

    def on_llm_end(self, response: LLMResult, run_id, **kwargs) -> None:
        elapsed = time.time() - self.start_times[run_id]
        token_usage = response.llm_output.get("token_usage", {})

        self.metrics.append({
            "elapsed_seconds": elapsed,
            "total_tokens": token_usage.get("total_tokens", 0),
            "tokens_per_second": token_usage.get("total_tokens", 0) / elapsed if elapsed > 0 else 0
        })

    def get_summary(self) -> dict:
        """Summarize the collected metrics."""
        if not self.metrics:
            return {}

        return {
            "total_calls": len(self.metrics),
            "avg_latency": sum(m["elapsed_seconds"] for m in self.metrics) / len(self.metrics),
            "total_tokens": sum(m["total_tokens"] for m in self.metrics),
            "avg_tokens_per_second": sum(m["tokens_per_second"] for m in self.metrics) / len(self.metrics)
        }

# usage
monitor = PerformanceMonitor()

for query in queries:
    model.invoke(query, config={"callbacks": [monitor]})

print(monitor.get_summary())
# {'total_calls': 10, 'avg_latency': 1.23, 'total_tokens': 5000, 'avg_tokens_per_second': 120}

Scenario 2: Cost tracking

class CostTracker(BaseCallbackHandler):
    """Cost-tracking callback."""

    PRICING = {
        "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
        "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    }

    def __init__(self, model_name: str):
        self.model_name = model_name
        self.total_cost = 0.0

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        token_usage = response.llm_output.get("token_usage", {})
        input_tokens = token_usage.get("prompt_tokens", 0)
        output_tokens = token_usage.get("completion_tokens", 0)

        pricing = self.PRICING.get(self.model_name, {"input": 0, "output": 0})
        cost = (input_tokens * pricing["input"] + output_tokens * pricing["output"])

        self.total_cost += cost
        print(f"Cost of this call: ${cost:.6f}")

    def get_total_cost(self) -> float:
        return self.total_cost

# usage
cost_tracker = CostTracker("gpt-4o-mini")
model = ChatOpenAI(model="gpt-4o-mini", callbacks=[cost_tracker])

# handle multiple requests
for query in queries:
    model.invoke(query)

print(f"Total cost: ${cost_tracker.get_total_cost():.4f}")

Scenario 3: Agent execution tracing

import time

class AgentTracer(BaseCallbackHandler):
    """Agent execution tracer."""

    def __init__(self):
        self.steps = []
        self.current_step = {}

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs) -> None:
        self.current_step = {
            "tool": serialized.get("name"),
            "input": input_str,
            "start_time": time.time()
        }

    def on_tool_end(self, output: str, **kwargs) -> None:
        self.current_step["output"] = output
        self.current_step["duration"] = time.time() - self.current_step["start_time"]
        self.steps.append(self.current_step)

        print(f"Step {len(self.steps)}: {self.current_step['tool']}")
        print(f"  input: {self.current_step['input'][:50]}...")
        print(f"  output: {self.current_step['output'][:50]}...")
        print(f"  duration: {self.current_step['duration']:.2f}s")

    def on_tool_error(self, error: Exception, **kwargs) -> None:
        print(f"Tool error: {error}")

    def get_execution_summary(self) -> dict:
        return {
            "total_steps": len(self.steps),
            "total_time": sum(s["duration"] for s in self.steps),
            "steps": self.steps
        }

# usage
tracer = AgentTracer()
agent_executor = AgentExecutor(agent=agent, tools=tools, callbacks=[tracer])

result = agent_executor.invoke({"input": "Analyze this problem and give an answer"})
summary = tracer.get_execution_summary()

Best Practices

1. Enable callbacks selectively

# recommended: enable callbacks only when needed
if DEBUG_MODE:
    callbacks = [StdOutCallbackHandler()]
else:
    callbacks = []

model = ChatOpenAI(callbacks=callbacks)

# or gate them on an environment variable
import os
if os.getenv("LANGCHAIN_VERBOSE") == "true":
    callbacks = [StdOutCallbackHandler()]

2. Isolate errors inside callback handlers

class SafeCallbackHandler(BaseCallbackHandler):
    """A defensive callback handler."""

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        try:
            # an operation that may fail
            self._process_response(response)
        except Exception as e:
            # never let a callback failure affect the main flow
            print(f"Callback processing failed: {e}")
            # optionally: write to an error log

3. Async callback best practices

# recommended: use async callbacks for slow operations
class EfficientLoggingHandler(AsyncCallbackHandler):
    def __init__(self):
        self.buffer = []
        self.buffer_size = 100

    async def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        # batched writes
        self.buffer.append(response)

        if len(self.buffer) >= self.buffer_size:
            await self._flush()

    async def _flush(self):
        # bulk-write to the database
        await db.bulk_insert(self.buffer)
        self.buffer.clear()

Document Notes

This document has dissected the architecture, sequence flows, and core code of the LangChain Callbacks module.

Document structure

  1. Module overview: responsibilities, event taxonomy, dependencies
  2. Overall architecture diagram: the four layers from application through core and handlers to configuration
  3. Module interaction sequence diagrams:
    • Diagram 1: full Runnable chain execution (45 steps)
    • Diagram 2: LLM error-handling flow
    • Diagram 3: Agent multi-step reasoning callbacks
  4. Core call paths and key code:
    • Path 1: RunnableConfig → CallbackManager creation
    • Path 2: on_chain_start → RunManager creation
    • Path 3: ParentRunManager → child RunManager
    • Path 4: LLM streaming callbacks
    • Path 5: the exception-handling path
  5. Core data structures: class diagram and field notes
  6. Core API details: BaseCallbackHandler/RunnableConfig/StdOutCallbackHandler/async callbacks
  7. Typical usage scenarios: performance monitoring/cost tracking/Agent tracing
  8. Best practices: selective enablement/error isolation/async optimization

Key insights

  1. Event dispatch: handle_event() is the heart of the whole callback system, implementing mixed sync/async execution, exception isolation, and event-loop detection
  2. Parent-child run tree: run_id and parent_run_id build a run tree that supports nested tracing (visualized in LangSmith)
  3. Config inheritance: the inheritable_handlers mechanism carries callbacks down the call chain
  4. Performance: thread-pool reuse, async batching, handler deduplication, and event-loop detection to avoid deadlocks
  5. Error isolation: callback exceptions never affect the main flow unless handler.raise_error=True

Source file map

| Module | File path | Core classes/functions |
|------|------|------|
| Base classes | langchain_core/callbacks/base.py | BaseCallbackHandler, BaseCallbackManager |
| Managers | langchain_core/callbacks/manager.py | CallbackManager, handle_event, RunManager |
| Config | langchain_core/runnables/config.py | get_callback_manager_for_config |
| Built-in handlers | langchain_core/callbacks/stdout.py | StdOutCallbackHandler |
| Streaming handler | langchain_core/callbacks/streaming_stdout.py | StreamingStdOutCallbackHandler |
| LangSmith | langchain_core/tracers/langchain.py | LangChainTracer |