# LangChain-10-Callbacks Module

## Module Overview

### Responsibilities and Positioning

The Callbacks module provides LangChain's observability and monitoring infrastructure. Through the callback mechanism, developers can inject custom logic at key points such as LLM calls, tool execution, and chain execution, enabling logging, performance monitoring, and debug tracing.

Core responsibilities:

- Event dispatch: defines a standardized callback event interface covering the LLM/Chain/Tool/Retriever lifecycle
- Callback management: CallbackManager manages multiple callback handlers uniformly
- Run tracking: RunManager assigns a unique run_id to every invocation and supports parent-child run association
- Async support: a complete asynchronous callback mechanism for async/await scenarios
- LangSmith integration: the built-in LangChainTracer reports runs to the LangSmith platform automatically
- Streaming: on_llm_new_token enables real-time streaming output
- Error isolation: callback errors do not affect the main flow; propagation is controlled by the optional raise_error flag
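For orientation, here is a minimal sketch of a custom handler hooking two of these lifecycle events. Only the `BaseCallbackHandler` API is from the source; the timing/logging choices are illustrative.

```python
# A minimal sketch, assuming langchain-core is installed.
from typing import Any
from uuid import UUID

from langchain_core.callbacks import BaseCallbackHandler


class PrintingHandler(BaseCallbackHandler):
    """Prints LLM start/end events; exceptions raised here are isolated by
    the dispatch layer (handle_event) because raise_error defaults to False."""

    raise_error = False  # default: handler errors are logged, not propagated

    def on_llm_start(self, serialized: dict[str, Any], prompts: list[str],
                     *, run_id: UUID, **kwargs: Any) -> None:
        print(f"[{run_id}] LLM start, {len(prompts)} prompt(s)")

    def on_llm_end(self, response: Any, *, run_id: UUID, **kwargs: Any) -> None:
        print(f"[{run_id}] LLM end")
```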
### Inputs and Outputs

Callback event categories:

| Event | Trigger | Main parameters | Return |
|---|---|---|---|
| on_llm_start | before an LLM call | serialized, prompts, run_id | None |
| on_llm_new_token | per streamed token | token, chunk, run_id | None |
| on_llm_end | after an LLM call | response, run_id | None |
| on_llm_error | LLM exception | error, run_id | None |
| on_chat_model_start | before a ChatModel call | serialized, messages, run_id | None |
| on_chain_start | before chain execution | serialized, inputs, run_id | None |
| on_chain_end | after chain execution | outputs, run_id | None |
| on_chain_error | chain exception | error, run_id | None |
| on_tool_start | before a tool call | serialized, input_str, run_id | None |
| on_tool_end | after a tool call | output, run_id | None |
| on_tool_error | tool exception | error, run_id | None |
| on_retriever_start | retrieval starts | serialized, query, run_id | None |
| on_retriever_end | retrieval completes | documents, run_id | None |
| on_agent_action | agent action decision | action, run_id | None |
| on_agent_finish | agent finishes | finish, run_id | None |
| on_text | intermediate text output | text, run_id | None |
| on_retry | retry event | retry_state, run_id | None |
| on_custom_event | custom event | name, data, run_id | None |
### Upstream and Downstream Dependencies

Depends on:

- typing: type definitions
- abc: abstract base classes
- uuid: unique run identifiers
- asyncio: async event handling
- concurrent.futures.ThreadPoolExecutor: running async callbacks from synchronous code
- langsmith.run_helpers: LangSmith tracing context
- langchain_core.tracers: built-in tracers (LangChainTracer/ConsoleCallbackHandler)

Depended on by:

- langchain_core.runnables: every Runnable integrates callbacks via _call_with_config
- langchain_core.language_models: BaseChatModel.generate triggers on_chat_model_start
- langchain_core.tools: BaseTool._run triggers tool callbacks through a RunManager
- langchain_classic.agents: AgentExecutor traces multi-step reasoning through callbacks
- langchain_core.retrievers: BaseRetriever triggers on_retriever_start/end
## Overall Module Architecture

```mermaid
flowchart TB
    subgraph Application["Application Layer"]
        USER[User Code]
        RUNNABLE[Runnable<br/>prompt\|model\|parser]
        LLM[BaseChatModel]
        TOOL[BaseTool]
        RETRIEVER[BaseRetriever]
    end
    subgraph CallbackCore["Callback Core Layer"]
        subgraph Base["Base Classes"]
            BASE_CALLBACK[BaseCallbackHandler<br/>handler base class<br/>----------<br/>ignore_llm/chain/agent<br/>raise_error/run_inline]
            BASE_MANAGER[BaseCallbackManager<br/>manager base class<br/>----------<br/>handlers/tags/metadata<br/>add_handler/merge]
        end
        subgraph Managers["Managers"]
            CALLBACK_MANAGER[CallbackManager<br/>sync manager<br/>----------<br/>configure/on_*_start]
            ASYNC_MANAGER[AsyncCallbackManager<br/>async manager<br/>----------<br/>await on_*_start]
        end
        subgraph RunManagers["Run Managers"]
            PARENT_RUN[ParentRunManager<br/>parent run manager<br/>----------<br/>get_child]
            LLM_RUN[CallbackManagerForLLMRun<br/>----------<br/>on_llm_new_token/end/error]
            CHAIN_RUN[CallbackManagerForChainRun<br/>----------<br/>on_chain_end/error<br/>on_text]
            TOOL_RUN[CallbackManagerForToolRun<br/>----------<br/>on_tool_end/error]
            RETRIEVER_RUN[CallbackManagerForRetrieverRun<br/>----------<br/>on_retriever_end/error]
        end
    end
    subgraph Handlers["Handler Implementations"]
        STDOUT[StdOutCallbackHandler<br/>console output]
        FILE[FileCallbackHandler<br/>file logging]
        STREAMING[StreamingStdOutCallbackHandler<br/>streaming stdout]
        LANGSMITH[LangChainTracer<br/>LangSmith tracing]
        CONSOLE[ConsoleCallbackHandler<br/>debug output]
        CUSTOM[CustomHandler<br/>user-defined]
    end
    subgraph Config["Configuration"]
        RCONFIG["RunnableConfig<br/>----------<br/>callbacks: list[Handler]<br/>tags/metadata/run_id"]
    end

    %% Application-layer calls
    USER -->|invoke| RUNNABLE
    USER -->|generate| LLM
    USER -->|run| TOOL
    USER -->|get_relevant_documents| RETRIEVER
    USER -.->|config| RCONFIG
    %% Runnable integration
    RUNNABLE -->|_call_with_config| CALLBACK_MANAGER
    LLM -->|generate| CALLBACK_MANAGER
    TOOL -->|_run| CALLBACK_MANAGER
    RETRIEVER -->|_get_relevant_documents| CALLBACK_MANAGER
    %% Config flow
    RCONFIG -.->|get_callback_manager_for_config| CALLBACK_MANAGER
    %% Manager inheritance
    CALLBACK_MANAGER -.->|extends| BASE_MANAGER
    ASYNC_MANAGER -.->|extends| BASE_MANAGER
    %% Run manager creation
    CALLBACK_MANAGER -->|on_llm_start| LLM_RUN
    CALLBACK_MANAGER -->|on_chain_start| PARENT_RUN
    CALLBACK_MANAGER -->|on_tool_start| TOOL_RUN
    CALLBACK_MANAGER -->|on_retriever_start| RETRIEVER_RUN
    PARENT_RUN -->|get_child| CHAIN_RUN
    %% Event dispatch
    LLM_RUN -.->|handle_event| BASE_CALLBACK
    CHAIN_RUN -.->|handle_event| BASE_CALLBACK
    TOOL_RUN -.->|handle_event| BASE_CALLBACK
    RETRIEVER_RUN -.->|handle_event| BASE_CALLBACK
    %% Handler implementations
    STDOUT -.->|extends| BASE_CALLBACK
    FILE -.->|extends| BASE_CALLBACK
    STREAMING -.->|extends| BASE_CALLBACK
    LANGSMITH -.->|extends| BASE_CALLBACK
    CONSOLE -.->|extends| BASE_CALLBACK
    CUSTOM -.->|extends| BASE_CALLBACK
    %% Handler registration
    CALLBACK_MANAGER o--o STDOUT
    CALLBACK_MANAGER o--o FILE
    CALLBACK_MANAGER o--o STREAMING
    CALLBACK_MANAGER o--o LANGSMITH
    CALLBACK_MANAGER o--o CONSOLE
    CALLBACK_MANAGER o--o CUSTOM

    style BASE_CALLBACK fill:#e1f5ff
    style CALLBACK_MANAGER fill:#fff4e1
    style LANGSMITH fill:#e8f5e9
    style RCONFIG fill:#ffe1f5
    style USER fill:#f0f0f0
```
### Architecture Layers Explained

#### 1. Application Layer

The application layer is where user code interacts with LangChain components:

- User Code: user business code that invokes LangChain components via invoke/generate/run
- Runnable: the core LangChain abstraction, base class of every executable component (chains/models/tools, etc.)
- BaseChatModel: chat model base class, wrapping LLM calls
- BaseTool: tool base class, wrapping external API/function calls
- BaseRetriever: retriever base class, wrapping vector database queries

Key points:

- Every component accepts a RunnableConfig parameter that carries the callback configuration
- Users can inject custom callbacks via config={"callbacks": [handler]}
- Components automatically fire the corresponding lifecycle events internally

#### 2. Callback Core Layer

The core layer provides the complete implementation of the callback mechanism.

Base classes:

- BaseCallbackHandler: defines 20+ callback methods (on_llm_start/on_chain_end, etc.)
  - ignore_*: suppresses specific event types
  - raise_error: controls whether handler exceptions propagate
  - run_inline: controls the execution strategy for async callbacks
- BaseCallbackManager: manages multiple handlers, supporting inheritance/merging
  - handlers: handlers for the current run
  - inheritable_handlers: handlers passed on to child runs
  - tags/metadata: tag and metadata propagation

Managers:

- CallbackManager: synchronous callback manager
  - configure(): builds a manager from configuration (merging global/local callbacks)
  - on_*_start(): fires start events and returns a RunManager
- AsyncCallbackManager: asynchronous callback manager
  - every method returns an Awaitable, supporting async/await
  - runs multiple handlers concurrently with asyncio.gather

Run managers:

- ParentRunManager: parent run manager that can spawn child managers
  - get_child(): creates a child CallbackManager (inheriting handlers/tags)
- CallbackManagerForLLMRun: LLM-specific run manager
  - on_llm_new_token(): streaming token callback
  - on_llm_end(): LLM completion callback
- CallbackManagerForChainRun: chain-specific run manager
  - on_chain_end(): chain completion callback
  - on_text(): intermediate text output
- CallbackManagerForToolRun: tool-specific run manager
- CallbackManagerForRetrieverRun: retriever-specific run manager

Key mechanisms:

- Run-ID tracking: every run is assigned a UUID, enabling parent-child run association
- Event dispatch: the handle_event() function iterates over handlers and calls the matching method
- Exception isolation: each handler call is wrapped in try/except, so errors never break the main flow
- Sync/async bridging: in synchronous contexts, async handlers are executed via a ThreadPoolExecutor

#### 3. Handler Implementation Layer

The handler layer provides concrete callback implementations.

Built-in handlers:

- StdOutCallbackHandler: prints key events to stdout (debugging)
- StreamingStdOutCallbackHandler: prints streamed LLM tokens in real time
- FileCallbackHandler: writes events to a log file
- LangChainTracer: reports to the LangSmith platform (production tracing)
- ConsoleCallbackHandler: formatted console output (development debugging)

Custom handlers:

- subclass BaseCallbackHandler
- override the event methods you care about
- pass in via callbacks=[handler]

#### 4. Configuration Layer

RunnableConfig is the carrier of callback configuration:

```python
{
    "callbacks": [handler1, handler2],  # handler list
    "tags": ["production", "v2"],       # tags (for filtering/grouping)
    "metadata": {"user_id": "123"},     # metadata (extra context)
    "run_id": uuid4(),                  # optional: explicit run ID
    "run_name": "custom_chain",         # optional: run name
}
```

Configuration merge strategy (see the sketch after this list):

- Global configuration: set_llm_cache() / the LANGCHAIN_TRACING_V2 environment variable
- Component configuration: ChatOpenAI(callbacks=[...])
- Call configuration: model.invoke(..., config={"callbacks": [...]})
- Priority: call config > component config > global config
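A small sketch of attaching callbacks at two of these levels. `FakeListChatModel` (a real langchain_core testing model) is used only to keep the example self-contained; both handlers receive events, since configure() merges the lists rather than discarding one.

```python
# Sketch: component-level vs call-level callbacks; both are merged.
from langchain_core.callbacks import StdOutCallbackHandler
from langchain_core.language_models import FakeListChatModel

component_handler = StdOutCallbackHandler()
call_handler = StdOutCallbackHandler()

model = FakeListChatModel(
    responses=["hello"],
    callbacks=[component_handler],  # component level (inheritable)
)
model.invoke(
    "hi",
    config={"callbacks": [call_handler],  # call level, merged on top
            "tags": ["demo"]},
)
```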
### Architecture Flow Summary

```text
user call → RunnableConfig → CallbackManager.configure()
        ↓
CallbackManager.on_*_start() → create RunManager (assign run_id)
        ↓
business logic runs → RunManager.on_*_event() → handle_event()
        ↓
iterate handlers → call handler.on_*() → catch exceptions
        ↓
RunManager.on_*_end/error() → finish tracing
```
## Callback Flow in Detail

### 1. Initialization Phase

```python
# User code
handler = MyCallbackHandler()
config = {"callbacks": [handler], "tags": ["production"]}

# Internal flow
callback_manager = CallbackManager.configure(
    callbacks=config["callbacks"],
    inheritable_tags=config["tags"]
)
```

### 2. Execution Phase

```python
# Runnable._call_with_config()
run_manager = callback_manager.on_chain_start(
    serialized={"name": "RunnableSequence"},
    inputs=input_data,
    run_id=uuid.uuid4()
)
```

Inside on_chain_start:

- handler.on_chain_start() is called for every handler
- synchronous handlers are invoked directly; asynchronous handlers are submitted to the event loop
- exceptions are caught and logged as warnings without interrupting the main flow
- a CallbackManagerForChainRun (carrying the run_id) is returned

### 3. Event-Firing Phase

```python
# During business execution
run_manager.on_text("Processing step 1...")
run_manager.on_llm_new_token("Hello")

# Child runs
child_manager = run_manager.get_child()
child_run = child_manager.on_llm_start(...)
```

### 4. Completion Phase

```python
# Normal completion
run_manager.on_chain_end(outputs)

# Error completion
try:
    ...
except Exception as e:
    run_manager.on_chain_error(e)
    raise
```
## Run Manager Responsibility Boundaries

| Manager | Created at | Lifetime | Dedicated methods | Use case |
|---|---|---|---|---|
| CallbackManager | configuration phase | whole invocation | on_*_start | creating RunManagers |
| ParentRunManager | on_chain_start | a single chain run | get_child | creating child managers |
| CallbackManagerForLLMRun | on_llm_start | a single LLM call | on_llm_new_token | streaming output |
| CallbackManagerForChainRun | on_chain_start | a single chain run | on_text | intermediate output |
| CallbackManagerForToolRun | on_tool_start | a single tool call | - | tool execution |
| CallbackManagerForRetrieverRun | on_retriever_start | a single retrieval | - | vector retrieval |
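To make the table concrete, here is a sketch that drives the managers by hand; normally Runnable._call_with_config does this internally, and the serialized dicts/inputs below are placeholders.

```python
# Sketch: driving CallbackManager / RunManager manually, for illustration only.
from uuid import uuid4

from langchain_core.callbacks import CallbackManager, StdOutCallbackHandler

cm = CallbackManager.configure(inheritable_callbacks=[StdOutCallbackHandler()])

parent = cm.on_chain_start({"name": "demo_chain"}, {"question": "hi"},
                           run_id=uuid4())
child_cm = parent.get_child("seq:step:1")   # child inherits handlers/tags
child = child_cm.on_chain_start({"name": "child_step"}, {"x": 1})
child.on_chain_end({"y": 2})                # close the child run first
parent.on_chain_end({"answer": "done"})     # then close the parent run
```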
## Module Interaction Sequence Diagrams

### Sequence Diagram 1: Full Runnable Chain Execution

```mermaid
sequenceDiagram
    autonumber
    participant U as User
    participant R as Runnable<br/>(Chain)
    participant CM as CallbackManager
    participant PRM as ParentRunManager
    participant H1 as Handler1<br/>(StdOut)
    participant H2 as Handler2<br/>(LangSmith)
    participant Step1 as Step1<br/>(Prompt)
    participant Step2 as Step2<br/>(ChatModel)
    participant LLMRM as LLMRunManager

    %% Configuration phase
    U->>R: invoke(input, config={callbacks:[H1, H2]})
    activate R
    R->>R: ensure_config(config)
    R->>CM: CallbackManager.configure(callbacks, tags, metadata)
    activate CM
    CM->>CM: _get_trace_callbacks()
    Note over CM: Check LANGCHAIN_TRACING_V2<br/>auto-add LangChainTracer
    CM->>CM: add_handler(H1)<br/>add_handler(H2)
    CM-->>R: callback_manager
    deactivate CM
    %% Chain start
    R->>CM: on_chain_start(serialized, input, run_id)
    activate CM
    CM->>CM: handle_event(handlers, "on_chain_start", ...)
    CM->>H1: on_chain_start(serialized, input, run_id=<uuid1>)
    H1-->>CM: None
    CM->>H2: on_chain_start(serialized, input, run_id=<uuid1>)
    H2-->>CM: None (reported to LangSmith)
    CM->>PRM: new ParentRunManager(run_id=<uuid1>, handlers=[H1,H2])
    CM-->>R: parent_run_manager
    deactivate CM
    %% Child step 1: Prompt
    R->>PRM: get_child("seq:step:1")
    activate PRM
    PRM->>CM: new CallbackManager(parent_run_id=<uuid1>)
    PRM-->>R: child_manager
    deactivate PRM
    R->>Step1: invoke(input, config={callbacks: child_manager})
    activate Step1
    Step1->>CM: on_chain_start({"name": "PromptTemplate"}, input, run_id=<uuid2>)
    CM->>H1: on_chain_start(..., run_id=<uuid2>, parent_run_id=<uuid1>)
    CM->>H2: on_chain_start(..., run_id=<uuid2>, parent_run_id=<uuid1>)
    Step1->>Step1: format_prompt(input)
    Step1->>CM: on_chain_end({"output": formatted_prompt}, run_id=<uuid2>)
    CM->>H1: on_chain_end(..., run_id=<uuid2>)
    CM->>H2: on_chain_end(..., run_id=<uuid2>)
    Step1-->>R: formatted_prompt
    deactivate Step1
    %% Child step 2: ChatModel
    R->>PRM: get_child("seq:step:2")
    PRM->>CM: new CallbackManager(parent_run_id=<uuid1>)
    PRM-->>R: child_manager
    R->>Step2: invoke(formatted_prompt, config={callbacks: child_manager})
    activate Step2
    Step2->>CM: on_chat_model_start(serialized, messages, run_id=<uuid3>)
    activate CM
    CM->>H1: on_chat_model_start(..., run_id=<uuid3>, parent_run_id=<uuid1>)
    CM->>H2: on_chat_model_start(..., run_id=<uuid3>, parent_run_id=<uuid1>)
    CM->>LLMRM: new CallbackManagerForLLMRun(run_id=<uuid3>, handlers=[H1,H2])
    CM-->>Step2: llm_run_manager
    deactivate CM
    %% Streaming LLM generation
    Step2->>Step2: _generate(messages, run_manager)
    Step2->>LLMRM: on_llm_new_token("Hello")
    LLMRM->>H1: on_llm_new_token("Hello", run_id=<uuid3>)
    H1-->>LLMRM: (printed to stdout)
    LLMRM->>H2: on_llm_new_token("Hello", run_id=<uuid3>)
    H2-->>LLMRM: (reported to LangSmith)
    Step2->>LLMRM: on_llm_new_token(" World")
    LLMRM->>H1: on_llm_new_token(" World", run_id=<uuid3>)
    LLMRM->>H2: on_llm_new_token(" World", run_id=<uuid3>)
    Step2->>LLMRM: on_llm_end(response, run_id=<uuid3>)
    LLMRM->>H1: on_llm_end(response, run_id=<uuid3>)
    LLMRM->>H2: on_llm_end(response, run_id=<uuid3>)
    Step2-->>R: AIMessage("Hello World")
    deactivate Step2
    %% Chain completion
    R->>PRM: on_chain_end({"output": result}, run_id=<uuid1>)
    activate PRM
    PRM->>H1: on_chain_end({"output": result}, run_id=<uuid1>)
    PRM->>H2: on_chain_end({"output": result}, run_id=<uuid1>)
    deactivate PRM
    R-->>U: result
    deactivate R
```
### Sequence Diagram 1 Explained

#### Phase 1: Configuration and Initialization (steps 1-6)

Steps 1-2: the user calls Runnable.invoke(), passing the callback configuration.

- Config passing: the callbacks list travels in the config parameter
- Config normalization: ensure_config() fills in a complete config dict

Steps 3-6: configuring the CallbackManager.

- The configure class method merges global/component/call configuration:

```python
CallbackManager.configure(
    callbacks=config["callbacks"],           # call level
    inheritable_callbacks=self.callbacks,    # component level
    verbose=self.verbose,                    # debug flag
    inheritable_tags=config.get("tags"),
    inheritable_metadata=config.get("metadata"),
)
```

- Automatic tracer injection: checks the LANGCHAIN_TRACING_V2 environment variable and auto-adds a LangChainTracer
- Handler dedup: add_handler() avoids duplicate registration

Key points:

- Config priority: call config > component config > global config
- Environment variable: LANGCHAIN_TRACING_V2=true enables automatic tracing
- Handler inheritance: inheritable_handlers are passed on to child runs

#### Phase 2: Chain Startup (steps 7-12)

Steps 7-12: dispatching the on_chain_start event.

- Event dispatch: handle_event() iterates over all handlers
- Sync calls: synchronous handlers are invoked directly
- Async calls: asynchronous handlers are submitted to the event loop
- Exception isolation: each handler is wrapped in try/except, so errors never break the main flow:

```python
for handler in handlers:
    try:
        handler.on_chain_start(serialized, inputs, run_id=run_id, **kwargs)
    except Exception as e:
        logger.warning(f"Error in {handler.__class__.__name__}: {e}")
        if handler.raise_error:
            raise
```

- RunManager creation: a ParentRunManager (carrying run_id, handlers, tags) is returned

Key points (see the run-tree sketch after this list):

- run_id assignment: every run is assigned a UUID for tracking
- Parent-child linkage: parent_run_id builds the run tree
- Tag propagation: tags and metadata flow to all child runs
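A sketch of a handler that reconstructs the run tree from run_id/parent_run_id; the indentation format is illustrative.

```python
# Sketch: printing the nested run tree from parent_run_id links.
from typing import Any
from uuid import UUID

from langchain_core.callbacks import BaseCallbackHandler


class RunTreeHandler(BaseCallbackHandler):
    def __init__(self) -> None:
        self.depth: dict[UUID, int] = {}

    def on_chain_start(self, serialized: dict[str, Any], inputs: dict[str, Any],
                       *, run_id: UUID, parent_run_id: UUID | None = None,
                       **kwargs: Any) -> None:
        # root runs have no parent; children indent one level deeper
        depth = 0 if parent_run_id is None else self.depth[parent_run_id] + 1
        self.depth[run_id] = depth
        name = (serialized or {}).get("name", "chain")
        print("  " * depth + f"└─ {name} ({run_id})")
```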
#### Phase 3: Child Step Execution, Prompt Formatting (steps 13-23)

Steps 13-15: creating the child run manager.

- get_child(): the ParentRunManager creates a child CallbackManager
- Inheritance: the child manager inherits the parent's handlers/tags/metadata
- Parent-child linkage: the child manager's parent_run_id points at the parent's run_id:

```python
def get_child(self, tag: str | None = None) -> CallbackManager:
    manager = CallbackManager(handlers=[], parent_run_id=self.run_id)
    manager.set_handlers(self.inheritable_handlers)  # inherit handlers
    manager.add_tags(self.inheritable_tags)          # inherit tags
    if tag is not None:
        manager.add_tags([tag], inherit=False)       # add the step tag
    return manager
```

Steps 16-23: executing the prompt step.

- on_chain_start: fires the prompt start event (run_id=uuid2, parent_run_id=uuid1)
- Business logic: PromptTemplate.format_prompt()
- on_chain_end: fires the prompt completion event
- Result passing: the formatted prompt is handed to the next step

Key points:

- Step tag: seq:step:1 identifies the first step of the chain
- Run tree: LangSmith renders this as a nested run tree
- Intermediate output: the outputs of on_chain_end contain the step output

#### Phase 4: Child Step Execution, ChatModel Generation (steps 24-40)

Steps 24-26: creating the second child manager.

- Step tag: seq:step:2 identifies the second step
- Independent run_id: each step gets its own run_id (uuid3)

Steps 27-32: the on_chat_model_start event.

- Chat-specific event: a ChatModel fires on_chat_model_start (not on_llm_start)
- Message formatting: messages are serialized into a traceable format
- LLMRunManager: the dedicated run manager (supporting on_llm_new_token) is returned

Steps 33-40: streaming LLM generation.

- on_llm_new_token: fired once per generated token
- Real-time callbacks: StreamingStdOutCallbackHandler prints in real time
- Token accumulation: LangChainTracer accumulates tokens into the trace
- on_llm_end: fired on completion with the full response and token usage:

```python
# Inside the LLM
for chunk in stream_generator():
    run_manager.on_llm_new_token(chunk.content, chunk=chunk)
run_manager.on_llm_end(LLMResult(
    generations=[[generation]],
    llm_output={"token_usage": {...}}
))
```

Key points:

- Streaming support: on_llm_new_token fires only in streaming mode
- Chunk structure: carries content, tool_calls, usage_metadata
- Token counting: the llm_output of on_llm_end contains the token usage

#### Phase 5: Chain Completion (steps 41-45)

Steps 41-44: the on_chain_end event.

- Output recording: the chain's final output is passed to all handlers
- Run completion: the run is marked finished and LangSmith closes the trace
- Resource cleanup: RunManager-related resources are released

Step 45: the result is returned to the user.

- Pass-through output: callbacks only observe; they never modify the output

Key points:

- Nested completion: child runs finish before the parent run
- Error handling: if a step fails, on_chain_error fires instead of on_chain_end
### Sequence Diagram 2: LLM Call with Error Handling

```mermaid
sequenceDiagram
    autonumber
    participant U as User
    participant CM as ChatModel
    participant CBM as CallbackManager
    participant LLMRM as LLMRunManager
    participant H as Handler<br/>(Custom)
    participant API as OpenAI API

    %% Normal call
    U->>CM: generate(messages, callbacks=[H])
    CM->>CBM: CallbackManager.configure(...)
    CBM-->>CM: callback_manager
    CM->>CBM: on_chat_model_start(serialized, messages)
    CBM->>H: on_chat_model_start(..., run_id=<uuid>)
    H-->>CBM: None
    CBM->>LLMRM: new CallbackManagerForLLMRun(run_id=<uuid>)
    CBM-->>CM: llm_run_manager
    %% API call fails
    CM->>API: POST /v1/chat/completions
    API-->>CM: 429 Rate Limit Error
    CM->>CM: raise RateLimitError(...)
    Note over CM: Catch the exception
    CM->>LLMRM: on_llm_error(RateLimitError, response=None)
    activate LLMRM
    LLMRM->>H: on_llm_error(error, run_id=<uuid>)
    activate H
    H->>H: log_error(error)
    H-->>LLMRM: None
    deactivate H
    deactivate LLMRM
    CM-->>U: raise RateLimitError
    Note over U: User code catches the exception
```
### Sequence Diagram 2 Explained

#### Key Points of the Error-Handling Flow

Steps 1-8: identical to the normal flow; callbacks are configured and on_chat_model_start fires.

Steps 9-10: the API call fails.

- Exception types: RateLimitError/Timeout/APIError, etc.
- Exception capture: caught by a try/except inside the ChatModel

Steps 11-15: the on_llm_error event.

- Error reporting: run_manager.on_llm_error(error) is called
- Response metadata: an optional response argument may carry partially generated content:

```python
except BaseException as e:
    if run_manager:
        run_manager.on_llm_error(
            e,
            response=LLMResult(generations=[[generation]])  # optional
        )
    raise
```

- Handler processing: custom handlers can log the error, send alerts, etc. (see the sketch below)
- Exception propagation: the error continues to propagate to user code

Key points:

- Never swallow: callbacks record the error but do not block propagation
- Full context: error, run_id, and parent_run_id are all passed along
- LangSmith: LangChainTracer automatically marks the run as failed
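A sketch of an alerting handler built on on_llm_error; `send_alert` is a placeholder for your own alerting mechanism.

```python
# Sketch: recording failures without blocking exception propagation.
import logging
from typing import Any
from uuid import UUID

from langchain_core.callbacks import BaseCallbackHandler

logger = logging.getLogger("llm.alerts")


def send_alert(message: str) -> None:
    # placeholder: wire this to PagerDuty/Slack/etc. in real code
    logger.error(message)


class AlertingHandler(BaseCallbackHandler):
    def on_llm_error(self, error: BaseException, *, run_id: UUID,
                     **kwargs: Any) -> None:
        # observe only; the model re-raises the original exception afterwards
        send_alert(f"LLM run {run_id} failed: {error!r}")
```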
### Sequence Diagram 3: Agent Multi-Step Reasoning Callbacks

```mermaid
sequenceDiagram
    autonumber
    participant U as User
    participant AE as AgentExecutor
    participant CM as CallbackManager
    participant PRM as ParentRunManager
    participant Agent as Agent<br/>(ReAct)
    participant LLMRM as LLMRunManager
    participant Tool as Tool<br/>(Search)
    participant TRM as ToolRunManager
    participant H as Handler<br/>(Tracer)

    U->>AE: invoke({"input": "Weather in Beijing?"}, callbacks=[H])
    AE->>CM: CallbackManager.configure(callbacks=[H])
    AE->>CM: on_chain_start({"name": "AgentExecutor"}, input)
    CM->>H: on_chain_start(..., run_id=<uuid1>)
    CM->>PRM: new ParentRunManager(run_id=<uuid1>)
    CM-->>AE: parent_run_manager
    %% Round 1: agent decision
    AE->>PRM: get_child("agent:step:1")
    PRM-->>AE: child_manager
    AE->>Agent: plan(input, child_manager)
    Agent->>CM: on_chain_start({"name": "ReActAgent"}, input)
    CM->>H: on_chain_start(..., run_id=<uuid2>, parent=<uuid1>)
    Agent->>Agent: _call_llm(prompt, run_manager)
    Agent->>CM: on_chat_model_start(...)
    CM->>LLMRM: new CallbackManagerForLLMRun(run_id=<uuid3>)
    CM-->>Agent: llm_run_manager
    Agent->>Agent: LLM output: "need to search the weather"
    Agent->>LLMRM: on_llm_end(response)
    LLMRM->>H: on_llm_end(..., run_id=<uuid3>)
    Agent->>Agent: parse_output(response)
    Note over Agent: Parses an AgentAction<br/>tool="search"<br/>tool_input="Beijing weather"
    Agent->>CM: on_agent_action(action, run_id=<uuid2>)
    CM->>H: on_agent_action(action, run_id=<uuid2>)
    Note over H: Record the agent decision
    Agent->>CM: on_chain_end({"action": action}, run_id=<uuid2>)
    CM->>H: on_chain_end(..., run_id=<uuid2>)
    Agent-->>AE: AgentAction(tool="search", tool_input="Beijing weather")
    %% Round 2: tool execution
    AE->>PRM: get_child("tool:search")
    PRM-->>AE: child_manager
    AE->>Tool: run("Beijing weather", child_manager)
    Tool->>CM: on_tool_start({"name": "search"}, "Beijing weather")
    CM->>H: on_tool_start(..., run_id=<uuid4>, parent=<uuid1>)
    CM->>TRM: new CallbackManagerForToolRun(run_id=<uuid4>)
    CM-->>Tool: tool_run_manager
    Tool->>Tool: _run("Beijing weather")
    Note over Tool: Call the search API
    Tool->>TRM: on_tool_end("Sunny, 25°C", run_id=<uuid4>)
    TRM->>H: on_tool_end("Sunny, 25°C", run_id=<uuid4>)
    Tool-->>AE: "Sunny, 25°C"
    %% Round 3: final agent answer
    AE->>PRM: get_child("agent:step:2")
    PRM-->>AE: child_manager
    AE->>Agent: plan(observation, child_manager)
    Agent->>CM: on_chain_start({"name": "ReActAgent"}, input)
    Agent->>Agent: _call_llm(prompt_with_observation, run_manager)
    Agent->>CM: on_chat_model_start(...)
    CM->>LLMRM: new CallbackManagerForLLMRun(run_id=<uuid5>)
    Agent->>Agent: LLM output: "Beijing is sunny, 25°C"
    Agent->>LLMRM: on_llm_end(response)
    Agent->>Agent: parse_output(response)
    Note over Agent: Parses an AgentFinish
    Agent->>CM: on_agent_finish(finish, run_id=<uuid5>)
    CM->>H: on_agent_finish(finish, run_id=<uuid5>)
    Note over H: Record agent completion
    Agent->>CM: on_chain_end({"output": "Beijing is sunny, 25°C"})
    Agent-->>AE: AgentFinish(output="Beijing is sunny, 25°C")
    %% AgentExecutor completes
    AE->>PRM: on_chain_end({"output": "Beijing is sunny, 25°C"}, run_id=<uuid1>)
    PRM->>H: on_chain_end(..., run_id=<uuid1>)
    AE-->>U: {"output": "Beijing is sunny, 25°C"}
```
### Sequence Diagram 3 Explained

#### What Makes Agent Callbacks Special

Multi-step reasoning trace:

- on_agent_action: records each agent decision (which tool, which arguments)
- on_agent_finish: records agent completion (the final answer)
- Nested runs: an agent run contains LLM calls plus tool calls

Run tree structure:

```text
<uuid1> AgentExecutor
├─ <uuid2> ReActAgent (step 1)
│   └─ <uuid3> ChatOpenAI (generates the action)
├─ <uuid4> SearchTool (execution)
└─ <uuid5> ReActAgent (step 2)
    └─ <uuid6> ChatOpenAI (generates the answer)
```

Key callback moments:

- on_chain_start (AgentExecutor): the agent starts
- on_chain_start (Agent): a single reasoning round starts
- on_chat_model_start: the LLM generates a decision
- on_agent_action: a tool call is parsed out
- on_tool_start: tool execution starts
- on_tool_end: the tool returns its result
- on_agent_finish: the agent decides it is done
- on_chain_end (AgentExecutor): the agent finishes

What LangSmith shows (a trajectory-recording sketch follows this list):

- Agent trajectory: every round of thought and action
- Token statistics: tokens accumulated across all LLM calls
- Tool calls: each tool's inputs and outputs
- Latency breakdown: LLM time vs. tool time
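A sketch of capturing the agent trajectory via the two agent events; the string format of the recorded entries is illustrative.

```python
# Sketch: recording agent decisions with on_agent_action / on_agent_finish.
from typing import Any
from uuid import UUID

from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.callbacks import BaseCallbackHandler


class AgentTrajectoryHandler(BaseCallbackHandler):
    def __init__(self) -> None:
        self.trajectory: list[str] = []

    def on_agent_action(self, action: AgentAction, *, run_id: UUID,
                        **kwargs: Any) -> None:
        self.trajectory.append(f"tool={action.tool} input={action.tool_input!r}")

    def on_agent_finish(self, finish: AgentFinish, *, run_id: UUID,
                        **kwargs: Any) -> None:
        self.trajectory.append(f"finish={finish.return_values!r}")
```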
## Core Call Chains and Key Code

### Chain 1: RunnableConfig → CallbackManager Creation

#### Call Path

```text
User Code
  ↓ invoke(input, config={"callbacks": [handler], "tags": [...]})
Runnable._call_with_config()
  ↓ ensure_config(config)
runnables/config.py::get_callback_manager_for_config(config)
  ↓
CallbackManager.configure(callbacks, inheritable_callbacks, verbose, tags, metadata)
  ↓
BaseCallbackManager.__init__(handlers, inheritable_handlers, tags, metadata)
  ↓ returns
CallbackManager instance
```

#### Key Code: get_callback_manager_for_config

File: langchain_core/runnables/config.py

```python
def get_callback_manager_for_config(
    config: RunnableConfig, _stacklevel: int = 2
) -> CallbackManager:
    """Build a CallbackManager from the config.

    Args:
        config: the Runnable configuration dict
        _stacklevel: call-stack depth (for tracing)

    Returns:
        A configured CallbackManager instance.
    """
    # 1. Read callbacks from the config (may be a list or a CallbackManager)
    callbacks_on_config = config.get("callbacks")
    # 2. Merge configuration via CallbackManager.configure
    return CallbackManager.configure(
        callbacks=callbacks_on_config,
        inheritable_callbacks=None,
        local_callbacks=None,
        verbose=config.get("verbose", False),
        tags=config.get("tags"),
        inheritable_tags=None,
        metadata=config.get("metadata"),
        inheritable_metadata=None,
    )
```
#### Key Code: CallbackManager.configure

File: langchain_core/callbacks/manager.py

```python
class CallbackManager(BaseCallbackManager):
    @classmethod
    def configure(
        cls,
        callbacks: Callbacks = None,
        inheritable_callbacks: Callbacks = None,
        local_callbacks: Callbacks = None,
        verbose: bool = False,
        tags: list[str] | None = None,
        inheritable_tags: list[str] | None = None,
        metadata: dict[str, Any] | None = None,
        inheritable_metadata: dict[str, Any] | None = None,
    ) -> CallbackManager:
        """Configure a CallbackManager (merging multi-level configuration).

        Priority: local > callbacks > inheritable
        """
        # 1. Check the global tracing configuration
        trace_callbacks = _get_trace_callbacks(
            project_name=None,
            example_id=None,
            callback_manager=None
        )
        # 2. Merge the callback lists
        callback_list: list[BaseCallbackHandler] = []
        # add inheritable_callbacks
        if inheritable_callbacks:
            if isinstance(inheritable_callbacks, list):
                callback_list.extend(inheritable_callbacks)
            else:
                callback_list.extend(inheritable_callbacks.handlers)
        # add callbacks
        if callbacks:
            if isinstance(callbacks, list):
                callback_list.extend(callbacks)
            elif isinstance(callbacks, BaseCallbackManager):
                callback_list.extend(callbacks.handlers)
        # add local_callbacks (highest priority)
        if local_callbacks:
            callback_list.extend(local_callbacks)
        # add trace_callbacks (LangSmith tracing)
        if trace_callbacks:
            callback_list.extend(trace_callbacks)
        # 3. Create the CallbackManager instance
        manager = cls(
            handlers=callback_list,
            inheritable_handlers=callback_list.copy(),
            tags=list(set(tags or [])),
            inheritable_tags=list(set(inheritable_tags or [])),
            metadata=metadata or {},
            inheritable_metadata=inheritable_metadata or {}
        )
        return manager
```

Key mechanisms (a runnable configure() sketch follows):

- Three-level merge: inheritable → callbacks → local
- Automatic tracer injection: _get_trace_callbacks() checks the environment variables
- Dedup: add_handler() checks whether a handler is already registered
- Inheritance strategy: inheritable_handlers are passed on to child runs
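A small sketch of the merge in practice, using configure()'s public `inheritable_callbacks`/`local_callbacks` parameters; the handler instances are placeholders.

```python
# Sketch: configure() merges inheritable (component-level) and local
# (call-level) callbacks into one manager; dedup is handled by
# add_handler(), as described above.
from langchain_core.callbacks import CallbackManager, StdOutCallbackHandler

component = StdOutCallbackHandler()
per_call = StdOutCallbackHandler()

cm = CallbackManager.configure(
    inheritable_callbacks=[component],  # passed down to child runs
    local_callbacks=[per_call],         # applies to this run only
)
print([type(h).__name__ for h in cm.handlers])
# ['StdOutCallbackHandler', 'StdOutCallbackHandler']
```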
### Chain 2: on_chain_start → RunManager Creation

#### Call Path

```text
Runnable._call_with_config()
  ↓ callback_manager.on_chain_start(serialized, inputs, run_id)
CallbackManager.on_chain_start()
  ↓ handle_event(handlers, "on_chain_start", ...)
manager.py::handle_event()
  ↓ iterate handlers
  ↓ handler.on_chain_start(serialized, inputs, run_id, parent_run_id, ...)
BaseCallbackHandler.on_chain_start()
  ↓ (user-implemented logic)
  ↓ returns
ParentRunManager(run_id, handlers, ...)
```

#### Key Code: CallbackManager.on_chain_start

File: langchain_core/callbacks/manager.py

```python
class CallbackManager(BaseCallbackManager):
    def on_chain_start(
        self,
        serialized: dict[str, Any],
        inputs: dict[str, Any],
        run_id: UUID | None = None,
        *,
        tags: list[str] | None = None,
        metadata: dict[str, Any] | None = None,
        run_type: str | None = None,
        name: str | None = None,
        **kwargs: Any,
    ) -> CallbackManagerForChainRun:
        """Chain-start event (creates the RunManager).

        Args:
            serialized: serialized component info (name/id, etc.)
            inputs: input data
            run_id: optional run ID (auto-generated if omitted)
            tags: run tags
            metadata: run metadata
            run_type: run type ("chain"/"llm"/"tool", etc.)
            name: run name

        Returns:
            A CallbackManagerForChainRun instance.
        """
        # 1. Generate the run_id
        if run_id is None:
            run_id = uuid.uuid4()
        # 2. Merge tags and metadata
        _tags = (self.tags or []) + (tags or [])
        _metadata = {**(self.metadata or {}), **(metadata or {})}
        _inheritable_tags = (self.inheritable_tags or []) + (tags or [])
        _inheritable_metadata = {
            **(self.inheritable_metadata or {}),
            **(metadata or {})
        }
        # 3. Dispatch on_chain_start to all handlers
        handle_event(
            self.handlers,
            "on_chain_start",
            "ignore_chain",  # name of the ignore-condition attribute
            serialized,
            inputs,
            run_id=run_id,
            parent_run_id=self.parent_run_id,
            tags=_tags,
            metadata=_metadata,
            run_type=run_type,
            name=name,
            **kwargs,
        )
        # 4. Return the ParentRunManager
        return CallbackManagerForChainRun(
            run_id=run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=_tags,
            inheritable_tags=_inheritable_tags,
            metadata=_metadata,
            inheritable_metadata=_inheritable_metadata,
        )
```
#### Key Code: handle_event (the Dispatch Core)

File: langchain_core/callbacks/manager.py

```python
def handle_event(
    handlers: list[BaseCallbackHandler],
    event_name: str,
    ignore_condition_name: str | None,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Generic event-dispatch function.

    Args:
        handlers: list of callback handlers
        event_name: event method name (e.g. "on_llm_start")
        ignore_condition_name: ignore-condition attribute name (e.g. "ignore_llm")
        *args: positional arguments for the event method
        **kwargs: keyword arguments for the event method
    """
    coros: list[Coroutine[Any, Any, Any]] = []
    try:
        for handler in handlers:
            try:
                # 1. Check whether this handler ignores the event
                if ignore_condition_name is not None:
                    if getattr(handler, ignore_condition_name, False):
                        continue  # skip this handler
                # 2. Call the handler's event method
                event = getattr(handler, event_name)(*args, **kwargs)
                # 3. Collect coroutines from async handlers
                if asyncio.iscoroutine(event):
                    coros.append(event)
            except NotImplementedError as e:
                # 4. Fall back from on_chat_model_start to on_llm_start
                if event_name == "on_chat_model_start":
                    # convert messages to prompts
                    message_strings = [get_buffer_string(m) for m in args[1]]
                    handle_event(
                        [handler],
                        "on_llm_start",
                        "ignore_llm",
                        args[0],          # serialized
                        message_strings,  # prompts
                        *args[2:],
                        **kwargs,
                    )
                else:
                    logger.warning(
                        f"NotImplementedError in {handler.__class__.__name__}.{event_name}"
                    )
            except Exception as e:
                # 5. Exception isolation
                logger.warning(
                    f"Error in {handler.__class__.__name__}.{event_name}: {e!r}"
                )
                if handler.raise_error:
                    raise  # propagate only when handler.raise_error=True
    finally:
        # 6. Execute the collected async callbacks
        if coros:
            try:
                loop = asyncio.get_running_loop()
                loop_running = True
            except RuntimeError:
                loop_running = False
            if loop_running:
                # a loop is running in this sync context: submit to the thread pool
                _executor().submit(
                    cast("Callable", copy_context().run),
                    _run_coros,
                    coros
                ).result()
            else:
                # no running loop: run directly
                _run_coros(coros)
```

Key mechanisms (see the isolation sketch after this list):

- Ignore control: ignore_condition_name checks the handler's ignore_llm/ignore_chain attributes
- Exception isolation: each handler is wrapped in try/except; one handler's error affects neither the other handlers nor the main flow
- Mixed sync/async: synchronous handlers run directly; asynchronous handlers are collected and executed together
- Event-loop handling:
  - a loop is already running → submit to a ThreadPoolExecutor (avoiding deadlock)
  - no loop → run directly (asyncio.run)
- Fallback: when on_chat_model_start is not implemented, the event falls back to on_llm_start
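A sketch demonstrating the isolation: a handler that raises inside on_chain_start only produces a warning unless raise_error is flipped. The serialized dict and inputs are placeholders.

```python
# Sketch: exception isolation in handle_event().
from typing import Any
from uuid import UUID, uuid4

from langchain_core.callbacks import BaseCallbackHandler, CallbackManager


class FlakyHandler(BaseCallbackHandler):
    raise_error = False  # flip to True to propagate the ValueError below

    def on_chain_start(self, serialized: dict[str, Any], inputs: dict[str, Any],
                       *, run_id: UUID, **kwargs: Any) -> None:
        raise ValueError("handler bug")


cm = CallbackManager.configure(inheritable_callbacks=[FlakyHandler()])
run = cm.on_chain_start({"name": "demo"}, {"x": 1}, run_id=uuid4())
# Execution reaches here: the ValueError was caught and logged, not raised.
run.on_chain_end({"ok": True})
```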
### Chain 3: ParentRunManager → Child RunManager

#### Call Path

```text
RunnableSequence.invoke()
  ↓ run_manager = callback_manager.on_chain_start(...)
  ↓ for i, step in enumerate(steps):
  ↓     child_manager = run_manager.get_child(f"seq:step:{i+1}")
ParentRunManager.get_child(tag)
  ↓ new CallbackManager(parent_run_id=self.run_id)
  ↓ manager.set_handlers(self.inheritable_handlers)
  ↓ manager.add_tags(self.inheritable_tags + [tag])
  ↓ returns
child CallbackManager
  ↓ step.invoke(input, config={callbacks: child_manager})
  ↓ the child step fires on_chain_start(parent_run_id=<parent run_id>)
```

#### Key Code: ParentRunManager.get_child

File: langchain_core/callbacks/manager.py

```python
class ParentRunManager(RunManager):
    """Parent run manager (can spawn child managers)."""

    def get_child(self, tag: str | None = None) -> CallbackManager:
        """Create a child CallbackManager.

        Args:
            tag: child-run tag (e.g. "seq:step:1"/"map:key:query")

        Returns:
            A child CallbackManager inheriting the parent's handlers/tags.
        """
        # 1. Create the child manager (parent_run_id points at this run)
        manager = CallbackManager(
            handlers=[],
            parent_run_id=self.run_id  # key: establishes the parent-child link
        )
        # 2. Inherit the parent's handlers
        manager.set_handlers(self.inheritable_handlers)
        # 3. Inherit the parent's tags and metadata
        manager.add_tags(self.inheritable_tags)
        manager.add_metadata(self.inheritable_metadata)
        # 4. Add the step-specific tag (non-inheritable)
        if tag is not None:
            manager.add_tags([tag], inherit=False)
        return manager
```

Key mechanisms (see the tag-inheritance sketch below):

- Parent-child linkage: the child manager's parent_run_id points at the parent's run_id
- Selective inheritance: inheritable_handlers are inherited; plain handlers are not
- Tag accumulation: a child run carries the parent's tags plus its own
- Step tags: the tag argument marks the position within the chain (seq:step:1, map:key:name, etc.)
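A sketch exercising the inheritance; only public APIs are used, and the tag values are placeholders.

```python
# Sketch: tag inheritance through get_child(); the step tag is added
# non-inheritably, so it marks only that child run.
from uuid import uuid4

from langchain_core.callbacks import CallbackManager, StdOutCallbackHandler

cm = CallbackManager.configure(
    inheritable_callbacks=[StdOutCallbackHandler()],
    inheritable_tags=["production"],
)
parent = cm.on_chain_start({"name": "pipeline"}, {"q": "hi"}, run_id=uuid4())
child = parent.get_child("seq:step:1")
print(child.tags)                             # expected: inherited 'production' plus 'seq:step:1'
print(child.parent_run_id == parent.run_id)   # True: parent-child link established
```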
### Chain 4: Streaming LLM Callbacks

#### Call Path

```text
ChatModel.generate(messages, run_manager)
  ↓ run_manager = callback_manager.on_chat_model_start(...)
  ↓ for chunk in self._stream(messages):
  ↓     run_manager.on_llm_new_token(chunk.content, chunk=chunk)
CallbackManagerForLLMRun.on_llm_new_token()
  ↓ handle_event(handlers, "on_llm_new_token", "ignore_llm", ...)
  ↓ iterate handlers
  ↓ handler.on_llm_new_token(token, chunk, run_id, ...)
StreamingStdOutCallbackHandler.on_llm_new_token()
  ↓ print(token, end="", flush=True)
LangChainTracer.on_llm_new_token()
  ↓ accumulate the token into the current trace
  ↓ on completion
  ↓ run_manager.on_llm_end(LLMResult)
```

#### Key Code: ChatModel._generate_with_cache

File: langchain_core/language_models/chat_models.py

```python
class BaseChatModel(BaseLanguageModel):
    def _generate_with_cache(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> ChatResult:
        """LLM generation (with caching and callbacks).

        Args:
            messages: input messages
            stop: stop words
            run_manager: LLM run manager
            **kwargs: model parameters

        Returns:
            ChatResult (generations + llm_output)
        """
        # 1. Check the cache
        if self.cache is not None:
            llm_cache = get_llm_cache()
            if llm_cache:
                cached_result = llm_cache.lookup(messages, self._llm_type, **kwargs)
                if cached_result:
                    # fire on_llm_end (even for cached results)
                    if run_manager:
                        run_manager.on_llm_end(
                            LLMResult(generations=[[cached_result]])
                        )
                    return ChatResult(generations=[cached_result])
        # 2. Call _generate (implemented by the subclass)
        try:
            result = self._generate(
                messages,
                stop=stop,
                run_manager=run_manager,  # pass the run_manager along
                **kwargs,
            )
        except BaseException as e:
            # 3. On failure, fire on_llm_error
            if run_manager:
                run_manager.on_llm_error(e)
            raise
        else:
            # 4. On success, fire on_llm_end
            if run_manager:
                run_manager.on_llm_end(
                    LLMResult(
                        generations=[[result.generations[0]]],
                        llm_output=result.llm_output
                    )
                )
            return result
```

#### Key Code: ChatModel._stream (Subclass Implementation Example)

File: langchain_openai/chat_models/base.py (example)

```python
class ChatOpenAI(BaseChatModel):
    def _stream(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        """Streaming generation.

        Yields:
            ChatGenerationChunk (one increment per token).
        """
        # 1. Call the OpenAI streaming API
        for chunk in self.client.chat.completions.create(
            messages=messages,
            stream=True,
            **kwargs
        ):
            delta = chunk.choices[0].delta
            if delta.content:
                # 2. Build the ChatGenerationChunk
                chunk_obj = ChatGenerationChunk(
                    message=AIMessageChunk(content=delta.content),
                    generation_info={
                        "finish_reason": chunk.choices[0].finish_reason
                    }
                )
                # 3. Fire the on_llm_new_token callback
                if run_manager:
                    run_manager.on_llm_new_token(
                        token=delta.content,
                        chunk=chunk_obj  # pass the full chunk
                    )
                # 4. Yield the chunk to the caller
                yield chunk_obj
```

#### Key Code: CallbackManagerForLLMRun.on_llm_new_token

File: langchain_core/callbacks/manager.py

```python
class CallbackManagerForLLMRun(RunManager, LLMManagerMixin):
    """LLM run manager."""

    def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: GenerationChunk | ChatGenerationChunk | None = None,
        **kwargs: Any,
    ) -> None:
        """Streaming token callback.

        Args:
            token: the newly generated token text
            chunk: the full generation chunk (with metadata)
            **kwargs: extra arguments
        """
        if not self.handlers:
            return
        # dispatch to all handlers
        handle_event(
            self.handlers,
            "on_llm_new_token",
            "ignore_llm",  # checks handler.ignore_llm
            token=token,
            chunk=chunk,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """LLM completion callback.

        Args:
            response: the LLM result (generations and llm_output)
            **kwargs: extra arguments
        """
        if not self.handlers:
            return
        handle_event(
            self.handlers,
            "on_llm_end",
            "ignore_llm",
            response,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
```

Key mechanisms (a first-token latency sketch follows):

- Real-time callbacks: fired immediately for every generated token (sub-10ms overhead)
- Chunk passing: besides the token text, the full chunk (including tool_calls, etc.) is passed along
- StreamingStdOutCallbackHandler: prints to stdout in real time
- LangChainTracer: accumulates tokens into the trace and submits them at on_llm_end
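A sketch of a practical use of on_llm_new_token: measuring time-to-first-token per run. The metric name and storage layout are illustrative.

```python
# Sketch: time-to-first-token measurement via streaming callbacks.
import time
from typing import Any
from uuid import UUID

from langchain_core.callbacks import BaseCallbackHandler


class FirstTokenTimer(BaseCallbackHandler):
    def __init__(self) -> None:
        self.start: dict[UUID, float] = {}
        self.first_token_latency: dict[UUID, float] = {}

    def on_chat_model_start(self, serialized: dict[str, Any], messages: Any,
                            *, run_id: UUID, **kwargs: Any) -> None:
        self.start[run_id] = time.monotonic()

    def on_llm_new_token(self, token: str, *, run_id: UUID,
                         **kwargs: Any) -> None:
        # record only the first token of each run
        if run_id in self.start and run_id not in self.first_token_latency:
            self.first_token_latency[run_id] = time.monotonic() - self.start[run_id]
```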
### Chain 5: Exception Handling

#### Call Path

```text
Runnable.invoke()
  ↓ try:
  ↓     run_manager = callback_manager.on_chain_start(...)
  ↓     output = func(input, run_manager)
  ↓ except BaseException as e:
  ↓     run_manager.on_chain_error(e)
  ↓     raise
CallbackManagerForChainRun.on_chain_error()
  ↓ handle_event(handlers, "on_chain_error", "ignore_chain", ...)
  ↓ iterate handlers
  ↓ handler.on_chain_error(error, run_id, ...)
CustomHandler.on_chain_error()
  ↓ log_error(error)
  ↓ send_alert(error)
LangChainTracer.on_chain_error()
  ↓ mark the trace as failed
  ↓ record the error in LangSmith
```

#### Key Code: Runnable._call_with_config (Exception Handling)

File: langchain_core/runnables/base.py

```python
class Runnable(ABC, Generic[Input, Output]):
    def _call_with_config(
        self,
        func: Callable[[Input], Output],
        input_: Input,
        config: RunnableConfig | None,
        run_type: str | None = None,
        serialized: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> Output:
        """Execution with callbacks (the exception-handling template).

        Args:
            func: the function to execute
            input_: input data
            config: run configuration
            run_type: run type
            serialized: serialized component info

        Returns:
            The execution result.

        Raises:
            Any exception (re-raised after on_*_error fires).
        """
        # 1. Prepare the callbacks
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        # 2. Fire the start event
        run_manager = callback_manager.on_chain_start(
            serialized or {},
            input_,
            run_type=run_type,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        # 3. Run the business logic (with exception handling)
        try:
            # create the child config (inheriting callbacks)
            child_config = patch_config(
                config,
                callbacks=run_manager.get_child()
            )
            # execute the function
            with set_config_context(child_config) as context:
                output = cast(
                    Output,
                    context.run(
                        call_func_with_variable_args,
                        func,
                        input_,
                        config,
                        run_manager,
                        **kwargs,
                    ),
                )
        except BaseException as e:
            # 4. Failure: fire on_chain_error
            run_manager.on_chain_error(e)
            raise  # keep propagating the exception
        else:
            # 5. Success: fire on_chain_end
            run_manager.on_chain_end(output)
            return output
```

Key mechanisms (a minimal sketch of this template follows):

- BaseException capture: catches every exception (including KeyboardInterrupt)
- Callback before re-raise: the exception is recorded before it propagates
- Exceptions unmodified: callbacks change neither the exception type nor its message
- Error-path guarantee: the try/except/else template ensures on_chain_error runs on every failure path
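The same template can be applied to a plain function. This is a sketch under the assumption that you want manual callback wiring outside a Runnable; `call_with_callbacks` is a hypothetical helper, not library code.

```python
# Sketch: the try/except/else template around an arbitrary function.
from typing import Callable
from uuid import uuid4

from langchain_core.callbacks import CallbackManager, StdOutCallbackHandler


def call_with_callbacks(func: Callable[[dict], dict], inputs: dict,
                        cm: CallbackManager) -> dict:
    run = cm.on_chain_start({"name": func.__name__}, inputs, run_id=uuid4())
    try:
        output = func(inputs)
    except BaseException as e:
        run.on_chain_error(e)   # report first...
        raise                   # ...then propagate unchanged
    else:
        run.on_chain_end(output)
        return output


cm = CallbackManager.configure(inheritable_callbacks=[StdOutCallbackHandler()])
call_with_callbacks(lambda x: {"doubled": x["n"] * 2}, {"n": 21}, cm)
```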
## Key Performance Optimizations

### 1. Batched Async Callbacks

```python
# manager.py::ahandle_event (simplified)
async def ahandle_event(handlers, event_name, *args, **kwargs):
    # run all async handlers concurrently
    await asyncio.gather(
        *(getattr(handler, event_name)(*args, **kwargs) for handler in handlers)
    )
```

### 2. Thread-Pool Reuse

```python
_THREAD_POOL_EXECUTOR: ThreadPoolExecutor | None = None

def _executor() -> ThreadPoolExecutor:
    global _THREAD_POOL_EXECUTOR
    if _THREAD_POOL_EXECUTOR is None:
        _THREAD_POOL_EXECUTOR = ThreadPoolExecutor(max_workers=10)
        atexit.register(_THREAD_POOL_EXECUTOR.shutdown, wait=True)
    return _THREAD_POOL_EXECUTOR
```

### 3. Event-Loop Detection

```python
try:
    asyncio.get_running_loop()
    loop_running = True
except RuntimeError:
    loop_running = False

if loop_running:
    # avoid deadlock: submit to the thread pool
    _executor().submit(copy_context().run, _run_coros, coros).result()
else:
    # run directly
    _run_coros(coros)
```

### 4. Handler Dedup

```python
def add_handler(self, handler: BaseCallbackHandler, inherit: bool = True) -> None:
    if handler not in self.handlers:  # dedup
        self.handlers.append(handler)
    if inherit and handler not in self.inheritable_handlers:
        self.inheritable_handlers.append(handler)
```
## Core Data Structures

```mermaid
classDiagram
    class BaseCallbackHandler {
        +ignore_llm: bool
        +ignore_chain: bool
        +ignore_agent: bool
        +on_llm_start(...) None
        +on_llm_end(...) None
        +on_llm_error(...) None
        +on_tool_start(...) None
        +on_tool_end(...) None
        +on_chain_start(...) None
        +on_chain_end(...) None
    }
    class CallbackManager {
        +handlers: list[BaseCallbackHandler]
        +add_handler(handler) None
        +on_llm_start(...) list[RunManager]
        +on_chain_start(...) RunManager
        +on_tool_start(...) RunManager
    }
    class CallbackManagerForLLMRun {
        +on_llm_new_token(token) None
        +on_llm_end(response) None
        +on_llm_error(error) None
    }
    class StdOutCallbackHandler {
        +on_llm_start(...) None
        +on_llm_new_token(token) None
        +on_chain_start(...) None
    }
    BaseCallbackHandler <|-- StdOutCallbackHandler
    CallbackManager o-- BaseCallbackHandler
    CallbackManager --> CallbackManagerForLLMRun
```

### Data Structures Explained
#### BaseCallbackHandler Methods

| Method | Description |
|------|------|
| on_llm_start | fired when an LLM call starts |
| on_llm_new_token | fired for every streamed token |
| on_llm_end | fired when an LLM call completes |
| on_llm_error | fired when an LLM call fails |
| on_chain_start | fired when a chain starts |
| on_chain_end | fired when a chain completes |
| on_tool_start | fired when a tool starts |
| on_tool_end | fired when a tool completes |
| on_text | fired on intermediate text output |

#### CallbackManager Fields

| Field | Type | Description |
|------|------|------|
| handlers | list[BaseCallbackHandler] | registered callback handlers |
| inheritable_handlers | list[BaseCallbackHandler] | handlers passed to child runs |
| parent_run_id | UUID \| None | parent run ID |
| tags | list[str] | tag list |
| metadata | dict | metadata |
## Core APIs in Detail

### API-1: Customizing BaseCallbackHandler

#### Basic Information

- **Name**: `BaseCallbackHandler`
- **Type**: abstract base class
- **Idempotency**: depends on the implementation

#### Description

Base class for custom callback handlers; subclasses override the event methods they care about.

#### Entry Point and Key Code

```python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult


class MyCallbackHandler(BaseCallbackHandler):
    """Custom callback handler."""

    def on_llm_start(
        self,
        serialized: dict,
        prompts: list[str],
        **kwargs,
    ) -> None:
        """LLM call starts.

        Args:
            serialized: serialized LLM info
            prompts: list of prompts
        """
        print(f"LLM call started, number of prompts: {len(prompts)}")

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """New streamed token.

        Args:
            token: the newly generated token
        """
        print(token, end="", flush=True)

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        """LLM call completes.

        Args:
            response: the LLM result
        """
        print(f"\nLLM call finished, token usage: {(response.llm_output or {}).get('token_usage')}")

    def on_llm_error(self, error: Exception, **kwargs) -> None:
        """LLM call fails.

        Args:
            error: the raised exception
        """
        print(f"LLM call error: {error}")

    def on_chain_start(
        self,
        serialized: dict,
        inputs: dict,
        **kwargs,
    ) -> None:
        """Chain execution starts."""
        print(f"Chain started: {serialized.get('name', 'unknown')}")

    def on_chain_end(self, outputs: dict, **kwargs) -> None:
        """Chain execution completes."""
        print(f"Chain finished, outputs: {outputs}")

    def on_tool_start(
        self,
        serialized: dict,
        input_str: str,
        **kwargs,
    ) -> None:
        """Tool execution starts."""
        print(f"Tool call: {serialized.get('name')}, input: {input_str}")

    def on_tool_end(self, output: str, **kwargs) -> None:
        """Tool execution completes."""
        print(f"Tool output: {output}")
```

Usage example:

```python
from langchain_openai import ChatOpenAI

# create the handler
handler = MyCallbackHandler()

# attach it
model = ChatOpenAI(callbacks=[handler])
response = model.invoke("Tell me a joke")

# Output:
# LLM call started, number of prompts: 1
# Why do programmers...
# LLM call finished, token usage: {...}
```
### API-2: Configuring Callbacks via RunnableConfig

#### Basic Information

- **Name**: the `callbacks` field of `RunnableConfig`
- **Idempotency**: depends on the handler implementation

#### Description

Configures callbacks for a single invocation through RunnableConfig.

#### Request Structure

```python
callbacks: list[BaseCallbackHandler] | None
```

#### Entry Point and Key Code

```python
from langchain_core.runnables import RunnableConfig

# Option 1: pass at invoke time
response = model.invoke(
    "Hello",
    config=RunnableConfig(callbacks=[handler])
)

# Option 2: bind with with_config
model_with_callbacks = model.with_config(callbacks=[handler])
response = model_with_callbacks.invoke("Hello")

# Option 3: configure a chain
chain = prompt | model | output_parser
response = chain.invoke(
    {"input": "Hello"},
    config={"callbacks": [handler]}
)
```

Usage example:

```python
# Scenario: add logging for specific calls only
class LoggingHandler(BaseCallbackHandler):
    def __init__(self, log_file: str):
        self.log_file = log_file

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        with open(self.log_file, "a") as f:
            f.write(f"{response.generations[0][0].text}\n")

# enable logging only for important queries
important_query = "Analyze the following data..."
response = model.invoke(
    important_query,
    config={"callbacks": [LoggingHandler("important_queries.log")]}
)

# ordinary queries are not logged
normal_query = "Hello"
response = model.invoke(normal_query)  # no callbacks
```
### API-3: StdOutCallbackHandler

#### Basic Information

- **Name**: `StdOutCallbackHandler`
- **Idempotency**: idempotent

#### Description

Built-in stdout callback handler that prints key events to the console.

#### Entry Point and Key Code

```python
from langchain_core.callbacks import StdOutCallbackHandler

class StdOutCallbackHandler(BaseCallbackHandler):
    """Stdout callback handler."""

    def on_llm_start(self, serialized: dict, prompts: list[str], **kwargs) -> None:
        """Print LLM start info."""
        print("LLM Start")

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Print each new token."""
        print(token, end="", flush=True)

    def on_chain_start(self, serialized: dict, inputs: dict, **kwargs) -> None:
        """Print chain start info."""
        class_name = serialized.get("name", serialized.get("id", ["<unknown>"])[-1])
        print(f"\n\n\033[1m> Entering new {class_name} chain...\033[0m")

    def on_chain_end(self, outputs: dict, **kwargs) -> None:
        """Print chain completion info."""
        print("\n\033[1m> Finished chain.\033[0m")

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs) -> None:
        """Print the tool call."""
        print(f"Tool: {serialized['name']}")
```

Usage example:

```python
from langchain_core.callbacks import StdOutCallbackHandler

# debug the chain execution flow
handler = StdOutCallbackHandler()
chain = prompt | model | output_parser
response = chain.invoke(
    {"input": "Explain quantum computing"},
    config={"callbacks": [handler]}
)

# Output:
# > Entering new RunnableSequence chain...
# LLM Start
# Quantum computing is...
# > Finished chain.
```
### API-4: Async Callbacks

#### Basic Information

- **Name**: `AsyncCallbackHandler`
- **Idempotency**: depends on the implementation

#### Description

Callback handler that supports asynchronous operations.

#### Entry Point and Key Code

```python
from datetime import datetime

from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult


class AsyncLoggingHandler(AsyncCallbackHandler):
    """Async logging callback handler."""

    def __init__(self, db_client):
        self.db_client = db_client

    async def on_llm_start(
        self,
        serialized: dict,
        prompts: list[str],
        **kwargs,
    ) -> None:
        """Asynchronously record the LLM call."""
        await self.db_client.log_event({
            "type": "llm_start",
            "prompts": prompts,
            "timestamp": datetime.now()
        })

    async def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        """Asynchronously record the response."""
        await self.db_client.log_event({
            "type": "llm_end",
            "response": response.generations[0][0].text,
            "token_usage": (response.llm_output or {}).get("token_usage"),
            "timestamp": datetime.now()
        })

    async def on_chain_error(self, error: Exception, **kwargs) -> None:
        """Asynchronously record the error."""
        await self.db_client.log_error({
            "error": str(error),
            "timestamp": datetime.now()
        })
```

Usage example:

```python
# async usage
async def process_with_logging():
    handler = AsyncLoggingHandler(db_client)
    response = await model.ainvoke(
        "Hello",
        config={"callbacks": [handler]}
    )
    return response

# run
result = await process_with_logging()
```
## Typical Use Cases

### Scenario 1: Performance Monitoring

```python
import time

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult


class PerformanceMonitor(BaseCallbackHandler):
    """Performance-monitoring callback."""

    def __init__(self):
        self.start_times = {}
        self.metrics = []

    def on_llm_start(self, serialized: dict, prompts: list[str], run_id, **kwargs) -> None:
        self.start_times[run_id] = time.time()

    def on_llm_end(self, response: LLMResult, run_id, **kwargs) -> None:
        elapsed = time.time() - self.start_times[run_id]
        token_usage = (response.llm_output or {}).get("token_usage", {})
        self.metrics.append({
            "elapsed_seconds": elapsed,
            "total_tokens": token_usage.get("total_tokens", 0),
            "tokens_per_second": token_usage.get("total_tokens", 0) / elapsed if elapsed > 0 else 0
        })

    def get_summary(self) -> dict:
        """Summarize the collected metrics."""
        if not self.metrics:
            return {}
        return {
            "total_calls": len(self.metrics),
            "avg_latency": sum(m["elapsed_seconds"] for m in self.metrics) / len(self.metrics),
            "total_tokens": sum(m["total_tokens"] for m in self.metrics),
            "avg_tokens_per_second": sum(m["tokens_per_second"] for m in self.metrics) / len(self.metrics)
        }


# usage
monitor = PerformanceMonitor()
for query in queries:
    model.invoke(query, config={"callbacks": [monitor]})
print(monitor.get_summary())
# {'total_calls': 10, 'avg_latency': 1.23, 'total_tokens': 5000, 'avg_tokens_per_second': 120}
```
### Scenario 2: Cost Tracking

```python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from langchain_openai import ChatOpenAI


class CostTracker(BaseCallbackHandler):
    """Cost-tracking callback (prices in USD per token)."""

    PRICING = {
        "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
        "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    }

    def __init__(self, model_name: str):
        self.model_name = model_name
        self.total_cost = 0.0

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        token_usage = (response.llm_output or {}).get("token_usage", {})
        input_tokens = token_usage.get("prompt_tokens", 0)
        output_tokens = token_usage.get("completion_tokens", 0)
        pricing = self.PRICING.get(self.model_name, {"input": 0, "output": 0})
        cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]
        self.total_cost += cost
        print(f"Cost of this call: ${cost:.6f}")

    def get_total_cost(self) -> float:
        return self.total_cost


# usage
cost_tracker = CostTracker("gpt-4o-mini")
model = ChatOpenAI(model="gpt-4o-mini", callbacks=[cost_tracker])

# handle multiple requests
for query in queries:
    model.invoke(query)
print(f"Total cost: ${cost_tracker.get_total_cost():.4f}")
```
### Scenario 3: Agent Execution Tracing

```python
import time

from langchain_core.callbacks import BaseCallbackHandler


class AgentTracer(BaseCallbackHandler):
    """Agent execution tracer."""

    def __init__(self):
        self.steps = []
        self.current_step = {}

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs) -> None:
        self.current_step = {
            "tool": serialized.get("name"),
            "input": input_str,
            "start_time": time.time()
        }

    def on_tool_end(self, output: str, **kwargs) -> None:
        self.current_step["output"] = output
        self.current_step["duration"] = time.time() - self.current_step["start_time"]
        self.steps.append(self.current_step)
        print(f"Step {len(self.steps)}: {self.current_step['tool']}")
        print(f"  input: {self.current_step['input'][:50]}...")
        print(f"  output: {self.current_step['output'][:50]}...")
        print(f"  duration: {self.current_step['duration']:.2f}s")

    def on_tool_error(self, error: Exception, **kwargs) -> None:
        print(f"Tool error: {error}")

    def get_execution_summary(self) -> dict:
        return {
            "total_steps": len(self.steps),
            "total_time": sum(s["duration"] for s in self.steps),
            "steps": self.steps
        }


# usage
tracer = AgentTracer()
agent_executor = AgentExecutor(agent=agent, tools=tools, callbacks=[tracer])
result = agent_executor.invoke({"input": "Analyze this problem and give an answer"})
summary = tracer.get_execution_summary()
```
## Best Practices

### 1. Enable Callbacks Selectively

```python
# Recommended: enable callbacks only when needed
if DEBUG_MODE:
    callbacks = [StdOutCallbackHandler()]
else:
    callbacks = []

model = ChatOpenAI(callbacks=callbacks)

# or control via an environment variable
import os
if os.getenv("LANGCHAIN_VERBOSE") == "true":
    callbacks = [StdOutCallbackHandler()]
```

### 2. Isolate Errors Inside Handlers

```python
class SafeCallbackHandler(BaseCallbackHandler):
    """A defensive callback handler."""

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        try:
            # operation that may fail
            self._process_response(response)
        except Exception as e:
            # never let a callback error affect the main flow
            print(f"Callback processing failed: {e}")
            # optional: record to an error log
```

### 3. Async Callback Best Practices

```python
# Recommended: handle slow operations in async callbacks
class EfficientLoggingHandler(AsyncCallbackHandler):
    def __init__(self):
        self.buffer = []
        self.buffer_size = 100

    async def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        # batch the writes
        self.buffer.append(response)
        if len(self.buffer) >= self.buffer_size:
            await self._flush()

    async def _flush(self):
        # bulk-insert into the database
        await db.bulk_insert(self.buffer)
        self.buffer.clear()
```
## Documentation Notes

This document has dissected the architecture, sequence flows, and core code of the LangChain Callbacks module.

### Document Structure

- Module overview: responsibilities, event taxonomy, dependencies
- Overall architecture diagram: application layer → core layer → handler layer → configuration layer
- Interaction sequence diagrams:
  - Diagram 1: full Runnable chain execution (45 steps)
  - Diagram 2: LLM error-handling flow
  - Diagram 3: agent multi-step reasoning callbacks
- Core call chains and key code:
  - Chain 1: RunnableConfig → CallbackManager creation
  - Chain 2: on_chain_start → RunManager creation
  - Chain 3: ParentRunManager → child RunManager
  - Chain 4: streaming LLM callbacks
  - Chain 5: exception handling
- Core data structures: class diagram and field descriptions
- Core APIs: BaseCallbackHandler / RunnableConfig / StdOutCallbackHandler / async callbacks
- Typical use cases: performance monitoring / cost tracking / agent tracing
- Best practices: selective enabling / error isolation / async optimization

### Key Insights

- Event dispatch: handle_event() is the heart of the callback system, combining mixed sync/async execution, exception isolation, and event-loop detection
- Parent-child run tree: run_id and parent_run_id build a run tree that supports nested tracing (visualized in LangSmith)
- Config inheritance: the inheritable_handlers mechanism carries callbacks along the call chain
- Performance: thread-pool reuse, batched async dispatch, handler dedup, and event-loop detection to avoid deadlocks
- Error isolation: callback exceptions never affect the main flow unless handler.raise_error=True
### Source File Map

| Module | File path | Core classes/functions |
|---|---|---|
| Base classes | langchain_core/callbacks/base.py | BaseCallbackHandler, BaseCallbackManager |
| Managers | langchain_core/callbacks/manager.py | CallbackManager, handle_event, RunManager |
| Configuration | langchain_core/runnables/config.py | get_callback_manager_for_config |
| Built-in handlers | langchain_core/callbacks/stdout.py | StdOutCallbackHandler |
| Streaming handler | langchain_core/callbacks/streaming_stdout.py | StreamingStdOutCallbackHandler |
| LangSmith | langchain_core/tracers/langchain.py | LangChainTracer |