LangChain-02-Runnables模块
模块概览
职责与定位
Runnables模块是LangChain的核心执行协议层,定义了所有可组合组件的统一接口。该模块实现了LangChain Expression Language(LCEL)的基础架构,使任何实现Runnable接口的组件都能自动获得以下能力:
- 统一调用接口: invoke/ainvoke/batch/abatch/stream/astream
- 自动类型推导: 基于泛型的输入输出类型系统
- 声明式组合: 通过
|操作符链式组合 - 执行模式支持: 同步、异步、批处理、流式输出
- 可观测性: 内置回调和追踪机制
- 容错机制: 重试、故障转移、超时控制
输入输出
输入:
- 泛型类型Input: 可以是任意Python类型(str, dict, BaseModel等)
- RunnableConfig: 可选的配置字典,包含callbacks、tags、metadata等
输出:
- 泛型类型Output: 可以是任意Python类型
- 同步方法返回值,异步方法返回Awaitable
- 流式方法返回Iterator/AsyncIterator
上下游依赖
依赖:
langchain_core.load.serializable: 序列化基类langchain_core.callbacks: 回调管理器langchain_core.tracers: 追踪器实现pydantic: 数据验证和模型定义typing_extensions: 类型系统扩展
被依赖:
- 所有LangChain组件(LLM、Prompt、Parser、Tool等)都继承Runnable
- 用户自定义组件通过继承Runnable获得标准能力
生命周期
Runnable实例通常是无状态的:
- 构造阶段: 创建Runnable实例,设置初始化参数
- 配置阶段: 通过with_config/configurable_fields等方法配置
- 执行阶段: 调用invoke/stream等方法执行
- 销毁阶段: Python GC自动回收,无特殊清理
状态通过RunnableConfig在调用链中传递,不存储在实例中。
模块架构图
整体分层架构
flowchart TB
subgraph UserLayer["用户层"]
APP[应用代码<br/>FastAPI/Django/CLI]
LCEL[LCEL表达式<br/>prompt | model | parser]
end
subgraph InterfaceLayer["接口层"]
RUNNABLE[Runnable抽象基类<br/>定义统一协议]
API_INVOKE[invoke/ainvoke]
API_BATCH[batch/abatch]
API_STREAM[stream/astream]
end
subgraph CompositionLayer["组合层"]
SEQ[RunnableSequence<br/>顺序执行]
PAR[RunnableParallel<br/>并行执行]
BRANCH[RunnableBranch<br/>条件分支]
LAMBDA[RunnableLambda<br/>函数包装]
end
subgraph DecoratorLayer["装饰层"]
RETRY[RunnableRetry<br/>重试机制]
FALLBACK[RunnableWithFallbacks<br/>故障转移]
BINDING[RunnableBinding<br/>参数绑定]
HISTORY[RunnableWithMessageHistory<br/>历史注入]
end
subgraph ExecutionLayer["执行层"]
CALLBACK[CallbackManager<br/>回调管理]
CONFIG[RunnableConfig<br/>配置传递]
EXECUTOR[ThreadPoolExecutor<br/>并发控制]
ASYNC[AsyncIO EventLoop<br/>异步调度]
end
subgraph ObservabilityLayer["可观测层"]
TRACER[LangSmithTracer<br/>追踪记录]
METRICS[Metrics<br/>性能指标]
LOGS[Logs<br/>日志输出]
end
APP --> LCEL
LCEL --> RUNNABLE
RUNNABLE --> API_INVOKE
RUNNABLE --> API_BATCH
RUNNABLE --> API_STREAM
API_INVOKE --> SEQ
API_INVOKE --> PAR
API_INVOKE --> BRANCH
API_INVOKE --> LAMBDA
SEQ --> RETRY
PAR --> RETRY
BRANCH --> FALLBACK
LAMBDA --> BINDING
RETRY --> CALLBACK
FALLBACK --> CALLBACK
BINDING --> CONFIG
HISTORY --> CONFIG
CALLBACK --> EXECUTOR
CONFIG --> EXECUTOR
CALLBACK --> ASYNC
CONFIG --> ASYNC
EXECUTOR --> TRACER
ASYNC --> TRACER
TRACER --> METRICS
TRACER --> LOGS
style UserLayer fill:#e3f2fd
style InterfaceLayer fill:#fff3e0
style CompositionLayer fill:#e8f5e9
style DecoratorLayer fill:#fce4ec
style ExecutionLayer fill:#f3e5f5
style ObservabilityLayer fill:#e0f2f1
类继承架构
flowchart TB
subgraph Interface["核心接口层"]
RUNNABLE[Runnable抽象基类<br/>定义统一协议]
end
subgraph Serializable["序列化层"]
RUNNABLE_SER[RunnableSerializable<br/>可序列化Runnable]
end
subgraph Composition["组合原语"]
SEQ[RunnableSequence<br/>顺序执行 A|B|C]
PAR[RunnableParallel<br/>并行执行 {a:A,b:B}]
BRANCH[RunnableBranch<br/>条件分支]
LAMBDA[RunnableLambda<br/>包装函数]
GEN[RunnableGenerator<br/>生成器包装]
end
subgraph Binding["绑定与装饰"]
BINDING[RunnableBinding<br/>绑定参数]
FALLBACK[RunnableWithFallbacks<br/>故障转移]
RETRY[RunnableRetry<br/>重试机制]
HISTORY[RunnableWithMessageHistory<br/>历史记录]
end
subgraph Passthrough["数据操作"]
PASS[RunnablePassthrough<br/>透传数据]
ASSIGN[RunnableAssign<br/>添加字段]
PICK[RunnablePick<br/>选择字段]
end
subgraph Config["配置系统"]
RCONFIG[RunnableConfig<br/>配置字典]
CONFIG_FIELD[ConfigurableField<br/>可配置字段]
CONFIG_ALT[ConfigurableAlternatives<br/>可配置备选]
end
subgraph Utils["工具类"]
EACH[RunnableEach<br/>映射到列表]
ROUTER[RouterRunnable<br/>路由器]
ADDABLE[AddableDict<br/>可加字典]
end
RUNNABLE -.继承.-> RUNNABLE_SER
RUNNABLE_SER -.继承.-> SEQ
RUNNABLE_SER -.继承.-> PAR
RUNNABLE_SER -.继承.-> BRANCH
RUNNABLE -.继承.-> LAMBDA
RUNNABLE -.继承.-> GEN
RUNNABLE_SER -.继承.-> BINDING
BINDING -.继承.-> FALLBACK
BINDING -.继承.-> RETRY
BINDING -.继承.-> HISTORY
RUNNABLE_SER -.继承.-> PASS
RUNNABLE_SER -.继承.-> ASSIGN
RUNNABLE_SER -.继承.-> PICK
RUNNABLE_SER -.继承.-> EACH
RUNNABLE_SER -.继承.-> ROUTER
RUNNABLE --> RCONFIG
BINDING --> CONFIG_FIELD
BINDING --> CONFIG_ALT
style RUNNABLE fill:#e1f5ff
style RUNNABLE_SER fill:#fff4e1
style SEQ fill:#e8f5e9
style PAR fill:#e8f5e9
style FALLBACK fill:#ffe8e8
style RETRY fill:#ffe8e8
架构说明
整体分层结构说明
用户层(UserLayer):
- 应用代码层,包括Web框架(FastAPI/Django)、CLI工具等
- LCEL表达式层,用户通过
|操作符声明式组合Runnable - 职责:定义业务逻辑、组装执行链、处理输入输出
接口层(InterfaceLayer):
- Runnable抽象基类,定义统一的执行协议
- 三大核心API族:invoke(单次调用)、batch(批量调用)、stream(流式调用)
- 职责:提供统一接口、类型推导、默认实现
组合层(CompositionLayer):
- RunnableSequence:串行组合,前一步输出作为下一步输入
- RunnableParallel:并行组合,多个分支共享输入,输出合并为字典
- RunnableBranch:条件组合,根据条件路由到不同分支
- RunnableLambda:函数包装,将任意Python函数转为Runnable
- 职责:实现声明式组合、管理执行顺序、协调数据流
装饰层(DecoratorLayer):
- RunnableRetry:为任意Runnable添加重试机制(指数退避+抖动)
- RunnableWithFallbacks:添加备用选项,主Runnable失败时切换
- RunnableBinding:绑定参数或配置,预先设置执行环境
- RunnableWithMessageHistory:注入会话历史,实现有状态对话
- 职责:增强可靠性、容错处理、参数绑定
执行层(ExecutionLayer):
- CallbackManager:管理回调链,协调可观测性事件
- RunnableConfig:配置对象,在调用链中透传上下文
- ThreadPoolExecutor:线程池,实现batch方法的并发执行
- AsyncIO EventLoop:事件循环,实现ainvoke/astream的异步执行
- 职责:实际执行、并发控制、异步调度、上下文传播
可观测层(ObservabilityLayer):
- LangSmithTracer:追踪器,记录完整的执行trace
- Metrics:性能指标收集(延迟、token数、成本等)
- Logs:日志输出,记录关键事件和错误
- 职责:监控、调试、性能分析、成本追踪
类继承结构说明
核心接口层(Runnable):
- 定义抽象基类和核心方法签名
- 泛型类型Input和Output提供类型安全
- 强制实现invoke方法,其他方法有默认实现
序列化层(RunnableSerializable):
- 继承Serializable,支持序列化/反序列化
- 大部分内置Runnable继承此类
- 支持保存到JSON/YAML并恢复
组合原语:
- RunnableSequence: 实现
A | B | C语法糖 - RunnableParallel: 实现
{key1: A, key2: B}并发执行 - RunnableBranch: 实现if-elif-else条件分支
- RunnableLambda: 将普通函数包装为Runnable
- RunnableGenerator: 将生成器函数包装为Runnable
绑定与装饰:
- RunnableBinding: 绑定特定kwargs或config
- RunnableWithFallbacks: 主模型失败时切换备用模型
- RunnableRetry: 添加指数退避重试
- RunnableWithMessageHistory: 自动注入历史消息
数据操作:
- RunnablePassthrough: 透传输入或执行副作用
- RunnableAssign: 添加新字段到输入字典
- RunnablePick: 从输出字典选择特定字段
模块交互图
sequenceDiagram
autonumber
participant User as 用户代码
participant LCEL as LCEL表达式
participant Seq as RunnableSequence
participant Config as RunnableConfig
participant CB as CallbackManager
participant Comp as 组件Runnable
participant Tracer as LangSmithTracer
participant Executor as ThreadPoolExecutor
User->>LCEL: chain = prompt | model | parser
LCEL->>Seq: 创建RunnableSequence
Note over Seq: 组合三个Runnable
User->>Seq: invoke(input, config)
Seq->>Config: ensure_config(config)
Config-->>Seq: 规范化后的config
Seq->>CB: get_callback_manager_for_config(config)
CB->>Tracer: 创建追踪器实例
CB-->>Seq: callback_manager
Seq->>CB: on_chain_start(self, input)
CB->>Tracer: 记录开始事件
CB-->>Seq: run_manager
loop 遍历每个步骤
Seq->>Comp: invoke(input, patched_config)
Comp->>CB: on_llm_start()/on_tool_start()
Comp->>Comp: 执行核心逻辑
Comp->>CB: on_llm_end()/on_tool_end()
Comp-->>Seq: output
Note over Seq: output成为下一步input
end
Seq->>CB: on_chain_end(final_output)
CB->>Tracer: 记录完成事件
Seq-->>User: final_output
Note over User,Executor: 批处理路径
User->>Seq: batch([input1, input2, ...])
Seq->>Config: get_config_list(config, len(inputs))
Seq->>Executor: 创建线程池
par 并行执行
Seq->>Comp: invoke(input1, config1)
Seq->>Comp: invoke(input2, config2)
end
Seq-->>User: [output1, output2, ...]
交互图说明:
图意: 展示用户代码如何通过LCEL表达式构建执行链,以及invoke和batch两种执行模式下模块间的交互流程。
关键交互点:
-
LCEL构建阶段(步骤1-2):
- 用户使用
|操作符声明式组合Runnable - 底层自动创建RunnableSequence包装
- 此时不执行,仅构建执行图
- 用户使用
-
配置初始化(步骤3-7):
- ensure_config规范化配置,填充默认值
- 创建CallbackManager管理回调链
- 创建追踪器实例(LangSmithTracer等)
- on_chain_start标记执行开始,分配run_id
-
串行执行循环(步骤8-12):
- 遍历RunnableSequence中的每个步骤
- 前一步输出自动作为下一步输入(数据流编排)
- 每个组件触发自己的回调事件(on_llm_start/on_tool_start等)
- patched_config将父级run_manager传递给子组件
-
回调与追踪(贯穿全程):
- 回调事件异步发送到追踪器
- 追踪器记录时间戳、输入输出、元数据
- 不阻塞主执行路径(fire-and-forget)
-
批处理路径(步骤14-18):
- get_config_list为每个输入生成独立配置
- ThreadPoolExecutor并发执行多个invoke
- 保持输入输出顺序一致
边界条件:
- 异常传播: 任何步骤抛出异常都会触发on_chain_error,然后向上传播
- 递归限制: config.recursion_limit防止无限递归(默认25层)
- 并发限制: config.max_concurrency限制batch的并发数
- 超时控制: 子组件可检查config.timeout实现超时中断
性能关键点:
- 回调系统异步执行,不阻塞主路径
- batch自动使用线程池并发,适合I/O密集型任务
- 配置对象在调用链中共享,避免重复创建
- 追踪数据批量上报,减少网络开销
调用链路深度分析
路径1: 简单链式调用(Sequence)
用户层调用:
# 用户代码
chain = prompt_template | chat_model | output_parser
result = chain.invoke({"topic": "AI"})
内部调用栈:
graph TD
A[chain.invoke<br/>RunnableSequence] --> B[ensure_config<br/>规范化配置]
B --> C[get_callback_manager<br/>获取回调管理器]
C --> D[on_chain_start<br/>开始追踪]
D --> E1[步骤1: prompt_template.invoke]
E1 --> E1A[format_messages<br/>格式化模板]
E1A --> E1B[返回 List<BaseMessage>]
E1B --> E2[步骤2: chat_model.invoke]
E2 --> E2A[_generate_with_cache<br/>检查缓存]
E2A --> E2B{缓存命中?}
E2B -->|是| E2C[返回缓存结果]
E2B -->|否| E2D[_generate<br/>调用LLM API]
E2D --> E2E[on_llm_start/on_llm_end<br/>触发回调]
E2E --> E2F[返回 AIMessage]
E2C --> E3[步骤3: output_parser.invoke]
E2F --> E3
E3 --> E3A[parse<br/>解析输出]
E3A --> E3B[返回结构化数据]
E3B --> F[on_chain_end<br/>完成追踪]
F --> G[返回最终结果]
style A fill:#e3f2fd
style E1 fill:#fff3e0
style E2 fill:#e8f5e9
style E3 fill:#fce4ec
style G fill:#e0f2f1
关键代码路径:
1. RunnableSequence.invoke入口 (langchain_core/runnables/base.py:2900-2950):
def invoke(self, input: Input, config: RunnableConfig | None = None) -> Output:
# 阶段1: 初始化配置和回调
config = ensure_config(config) # 填充默认值
callback_manager = get_callback_manager_for_config(config)
# 阶段2: 开始链式调用追踪
run_manager = callback_manager.on_chain_start(
dumpd(self), # 序列化自身用于追踪
input,
name=config.get("run_name") or self.get_name(),
run_id=config.get("run_id"),
)
# 阶段3: 逐步执行
try:
for i, step in enumerate(self.steps):
# 为每个步骤创建子config
step_config = patch_config(
config,
callbacks=run_manager.get_child(), # 父子关系
run_name=f"{self.get_name()}.step[{i}]"
)
# 前一步输出作为下一步输入
input = step.invoke(input, step_config)
except BaseException as e:
# 阶段4a: 异常处理
run_manager.on_chain_error(e)
raise
else:
# 阶段4b: 正常完成
run_manager.on_chain_end(input)
return input
2. 配置传递机制 (langchain_core/runnables/config.py:200-250):
def ensure_config(config: RunnableConfig | None = None) -> RunnableConfig:
"""确保配置对象存在且包含所有必需字段"""
empty = RunnableConfig(
tags=[],
metadata={},
callbacks=None,
recursion_limit=25, # 默认递归限制
configurable={},
)
if config is None:
return empty
# 合并用户配置和默认值
return merge_configs(empty, config)
def patch_config(
config: RunnableConfig,
*,
callbacks: Callbacks = None,
recursion_limit: int | None = None,
max_concurrency: int | None = None,
run_name: str | None = None,
) -> RunnableConfig:
"""在现有配置基础上修补字段,用于子调用"""
patched = config.copy()
if callbacks is not None:
patched["callbacks"] = callbacks
if recursion_limit is not None:
patched["recursion_limit"] = recursion_limit
if max_concurrency is not None:
patched["max_concurrency"] = max_concurrency
if run_name is not None:
patched["run_name"] = run_name
return patched
3. 回调事件触发 (langchain_core/callbacks/manager.py):
def on_chain_start(
self,
serialized: dict[str, Any],
inputs: dict[str, Any],
run_id: UUID | None = None,
**kwargs: Any,
) -> CallbackManagerForChainRun:
"""链开始时触发回调"""
run_id = run_id or uuid.uuid4()
# 遍历所有注册的回调处理器
for handler in self.handlers:
# 异步执行,不阻塞主流程
handler.on_chain_start(
serialized,
inputs,
run_id=run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs
)
# 返回子运行管理器
return CallbackManagerForChainRun(
run_id=run_id,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
路径说明:
- 阶段1(配置初始化): ensure_config规范化配置,get_callback_manager创建回调管理器
- 阶段2(追踪开始): on_chain_start分配run_id,建立父子追踪关系,发送开始事件
- 阶段3(逐步执行): 遍历steps,patch_config为每步创建子配置,前一步输出作为下一步输入
- 阶段4(结束处理): 成功时on_chain_end记录结果,失败时on_chain_error记录异常
性能特性:
- 配置对象浅拷贝,避免深拷贝开销
- 回调事件异步发送,不阻塞执行
- 中间结果直接传递,无序列化开销
- 递归限制防止栈溢出
路径2: 并行执行(Parallel)
用户层调用:
# 用户代码
parallel_chain = RunnableParallel({
"joke": ChatPromptTemplate.from_template("tell a joke about {topic}") | model,
"poem": ChatPromptTemplate.from_template("write a poem about {topic}") | model,
"fact": ChatPromptTemplate.from_template("tell a fact about {topic}") | model,
})
result = parallel_chain.invoke({"topic": "cats"})
# 返回: {"joke": "...", "poem": "...", "fact": "..."}
内部调用栈:
graph TD
A[parallel_chain.invoke<br/>RunnableParallel] --> B[ensure_config]
B --> C[get_callback_manager]
C --> D[on_chain_start]
D --> E[创建ThreadPoolExecutor]
E --> F{并发执行所有分支}
F --> G1[分支1: joke_chain.invoke]
F --> G2[分支2: poem_chain.invoke]
F --> G3[分支3: fact_chain.invoke]
G1 --> G1A[RunnableSequence执行]
G2 --> G2A[RunnableSequence执行]
G3 --> G3A[RunnableSequence执行]
G1A --> H1[返回joke结果]
G2A --> H2[返回poem结果]
G3A --> H3[返回fact结果]
H1 --> I[等待所有Future完成]
H2 --> I
H3 --> I
I --> J[合并结果为字典]
J --> K[on_chain_end]
K --> L[返回最终字典]
style A fill:#e3f2fd
style E fill:#fff3e0
style G1 fill:#e8f5e9
style G2 fill:#e8f5e9
style G3 fill:#e8f5e9
style L fill:#e0f2f1
关键代码路径:
1. RunnableParallel.invoke入口 (langchain_core/runnables/base.py:3800-3900):
def invoke(self, input: Input, config: RunnableConfig | None = None) -> dict[str, Any]:
# 阶段1: 初始化
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(dumpd(self), input)
# 阶段2: 准备并发执行
max_concurrency = config.get("max_concurrency")
try:
# 阶段3: 并发执行所有分支
with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
# 为每个分支提交任务
futures = {
key: executor.submit(
step.invoke,
input, # 所有分支共享相同输入
patch_config(config, callbacks=run_manager.get_child())
)
for key, step in self.steps__.items()
}
# 阶段4: 收集结果
output = {}
for key, future in futures.items():
try:
output[key] = future.result() # 阻塞等待结果
except Exception as e:
# 单个分支失败不影响其他分支(取决于配置)
run_manager.on_chain_error(e)
raise
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(output)
return output
2. 线程池并发机制 (concurrent.futures):
# ThreadPoolExecutor内部实现
class ThreadPoolExecutor:
def submit(self, fn, /, *args, **kwargs):
"""提交任务到线程池"""
# 1. 创建Future对象
future = Future()
# 2. 包装任务
work_item = _WorkItem(future, fn, args, kwargs)
# 3. 放入工作队列
self._work_queue.put(work_item)
# 4. 唤醒工作线程
self._adjust_thread_count()
return future
def _worker(self):
"""工作线程循环"""
while True:
work_item = self._work_queue.get(block=True)
if work_item is not None:
try:
result = work_item.fn(*work_item.args, **work_item.kwargs)
work_item.future.set_result(result)
except Exception as exc:
work_item.future.set_exception(exc)
并发执行时序图:
sequenceDiagram
autonumber
participant User as 用户代码
participant Par as RunnableParallel
participant Executor as ThreadPoolExecutor
participant Thread1 as 工作线程1
participant Thread2 as 工作线程2
participant Thread3 as 工作线程3
participant Branch1 as joke_chain
participant Branch2 as poem_chain
participant Branch3 as fact_chain
User->>Par: invoke({"topic": "cats"})
Par->>Par: ensure_config & get_callback_manager
Par->>Par: on_chain_start
Par->>Executor: 创建线程池(max_workers=3)
Executor->>Thread1: 启动工作线程
Executor->>Thread2: 启动工作线程
Executor->>Thread3: 启动工作线程
Par->>Executor: submit(joke_chain.invoke, input)
Par->>Executor: submit(poem_chain.invoke, input)
Par->>Executor: submit(fact_chain.invoke, input)
Note over Executor: 任务分发到工作队列
par 并行执行三个分支
Thread1->>Branch1: invoke(input, config)
Branch1->>Branch1: 执行prompt | model链
Branch1-->>Thread1: "Why did the cat..."
Thread2->>Branch2: invoke(input, config)
Branch2->>Branch2: 执行prompt | model链
Branch2-->>Thread2: "Whiskers soft..."
Thread3->>Branch3: invoke(input, config)
Branch3->>Branch3: 执行prompt | model链
Branch3-->>Thread3: "Cats can rotate..."
end
Thread1-->>Par: future1.result()
Thread2-->>Par: future2.result()
Thread3-->>Par: future3.result()
Par->>Par: 合并结果为字典
Par->>Par: on_chain_end(output)
Par-->>User: {"joke": "...", "poem": "...", "fact": "..."}
路径说明:
- 阶段1(初始化): 与Sequence相同,创建回调管理器和run_manager
- 阶段2(并发准备): 创建ThreadPoolExecutor,设置max_workers
- 阶段3(任务提交): 遍历steps__字典,为每个分支submit任务到线程池
- 阶段4(结果收集): 调用future.result()阻塞等待每个分支完成,合并为字典
性能特性:
- 真正的并行: 多个I/O密集型任务(如LLM调用)可以并发执行
- GIL影响: CPU密集型任务受GIL限制,但LLM调用主要是I/O等待
- 线程复用: ThreadPoolExecutor复用线程,避免频繁创建销毁
- 顺序保证: 输出字典的键顺序与steps__定义顺序一致
边界条件:
- 单个分支失败会抛出异常,中断所有分支(除非使用return_exceptions)
- max_concurrency限制同时执行的线程数
- 所有分支共享相同的输入,输入对象应为只读或线程安全
路径3: 异步执行(Async)
用户层调用:
# 异步应用代码
import asyncio
from fastapi import FastAPI
app = FastAPI()
chain = prompt | model | parser
@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
# 使用ainvoke进行非阻塞调用
result = await chain.ainvoke(
{"messages": request.messages},
config={"metadata": {"user_id": request.user_id}}
)
return {"response": result}
异步调用栈:
graph TD
A[chain.ainvoke<br/>RunnableSequence] --> B[ensure_config]
B --> C[get_async_callback_manager]
C --> D[await on_chain_start<br/>异步回调]
D --> E1[await step1.ainvoke<br/>异步执行]
E1 --> E1A[await format_messages_async]
E1A --> E1B[返回messages]
E1B --> E2[await step2.ainvoke<br/>ChatModel]
E2 --> E2A[await _agenerate<br/>异步HTTP请求]
E2A --> E2B{使用httpx/aiohttp}
E2B --> E2C[await response]
E2C --> E2D[返回AIMessage]
E2D --> E3[await step3.ainvoke<br/>Parser]
E3 --> E3A[await parse_async]
E3A --> E3B[返回结构化数据]
E3B --> F[await on_chain_end]
F --> G[返回最终结果]
style A fill:#e3f2fd
style E2A fill:#fff3e0
style E2B fill:#e8f5e9
style G fill:#e0f2f1
关键代码路径:
1. RunnableSequence.ainvoke入口 (langchain_core/runnables/base.py:3000-3050):
async def ainvoke(
self,
input: Input,
config: RunnableConfig | None = None
) -> Output:
# 阶段1: 异步初始化
config = ensure_config(config)
callback_manager = get_async_callback_manager_for_config(config)
# 阶段2: 异步开始追踪
run_manager = await callback_manager.on_chain_start(
dumpd(self),
input,
name=config.get("run_name") or self.get_name(),
)
# 阶段3: 异步逐步执行
try:
for i, step in enumerate(self.steps):
step_config = patch_config(
config,
callbacks=run_manager.get_child()
)
# 使用await等待异步执行
input = await step.ainvoke(input, step_config)
except BaseException as e:
await run_manager.on_chain_error(e)
raise
else:
await run_manager.on_chain_end(input)
return input
2. 异步回调管理器:
class AsyncCallbackManager:
async def on_chain_start(
self,
serialized: dict[str, Any],
inputs: dict[str, Any],
**kwargs: Any,
) -> AsyncCallbackManagerForChainRun:
"""异步触发回调"""
run_id = kwargs.get("run_id") or uuid.uuid4()
# 并发触发所有回调处理器
await asyncio.gather(
*[
handler.on_chain_start(
serialized,
inputs,
run_id=run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs
)
for handler in self.handlers
if hasattr(handler, 'on_chain_start')
],
return_exceptions=True # 单个回调失败不影响其他
)
return AsyncCallbackManagerForChainRun(
run_id=run_id,
handlers=self.handlers,
# ... 其他字段
)
3. 异步LLM调用示例 (langchain_openai/chat_models.py):
class ChatOpenAI(BaseChatModel):
async def ainvoke(
self,
input: list[BaseMessage],
config: RunnableConfig | None = None,
**kwargs: Any,
) -> AIMessage:
# 使用异步HTTP客户端
async with httpx.AsyncClient() as client:
response = await client.post(
self.openai_api_base,
json=self._prepare_request(input),
headers=self._get_headers(),
timeout=self.request_timeout,
)
response.raise_for_status()
return self._parse_response(response.json())
异步执行时序图:
sequenceDiagram
autonumber
participant User as 异步应用
participant FastAPI
participant EventLoop as asyncio事件循环
participant Seq as RunnableSequence
participant LLM as ChatOpenAI
participant HTTP as httpx.AsyncClient
participant API as OpenAI API
User->>FastAPI: POST /chat (HTTP请求)
FastAPI->>EventLoop: 创建Task(chat_endpoint)
EventLoop->>Seq: await chain.ainvoke(input)
Seq->>Seq: ensure_config
Seq->>Seq: await on_chain_start
Seq->>LLM: await llm.ainvoke(messages)
LLM->>HTTP: await client.post(url, json=...)
Note over EventLoop: 释放事件循环<br/>可处理其他请求
HTTP->>API: HTTP POST请求
Note over API: OpenAI处理请求<br/>生成响应(5-10秒)
API-->>HTTP: HTTP 200 OK (响应)
HTTP-->>LLM: response.json()
LLM->>LLM: parse_response
LLM-->>Seq: AIMessage
Seq->>Seq: await on_chain_end
Seq-->>EventLoop: 完成Task
EventLoop-->>FastAPI: 返回响应
FastAPI-->>User: HTTP 200 (JSON响应)
Note over User,API: 在等待API响应期间<br/>事件循环可处理其他请求<br/>实现高并发
路径说明:
- 阶段1(异步初始化): 使用async版本的回调管理器
- 阶段2(异步追踪): await on_chain_start,回调处理器并发执行
- 阶段3(异步执行): 遍历steps,使用await等待每步完成
- 关键点: I/O等待期间事件循环可切换到其他任务
性能特性:
- 非阻塞I/O: 等待LLM响应时,事件循环可处理其他请求
- 高并发: 单线程可处理成百上千的并发请求
- 无GIL限制: 异步I/O不受GIL影响
- 内存效率: 协程比线程更轻量(KB级 vs MB级)
异步批处理abatch:
async def abatch(
self,
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
**kwargs: Any,
) -> list[Output]:
configs = get_config_list(config, len(inputs))
# 使用asyncio.gather并发执行所有ainvoke
results = await asyncio.gather(
*[
self.ainvoke(input, config)
for input, config in zip(inputs, configs)
],
return_exceptions=kwargs.get("return_exceptions", False)
)
return results
asyncio.gather优势:
- 所有协程并发执行,不需要线程池
- 内存开销小,可处理大量并发(10k+)
- 自动管理协程调度
- return_exceptions=True时单个失败不影响其他
路径4: 容错路径(Retry + Fallbacks)
用户层调用:
# 构建容错链
primary_model = ChatOpenAI(model="gpt-4o")
fallback_model = ChatOpenAI(model="gpt-4o-mini")
# 添加重试和备用
robust_model = primary_model.with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True
).with_fallbacks([fallback_model])
chain = prompt | robust_model | parser
result = chain.invoke({"topic": "AI"})
容错调用栈:
graph TD
A[chain.invoke] --> B[RunnableSequence.invoke]
B --> C[prompt.invoke]
C --> D[RunnableWithFallbacks.invoke]
D --> E[主路径: RunnableRetry.invoke]
E --> F{尝试1: primary_model.invoke}
F -->|失败| G{等待2^1秒+jitter}
G --> H{尝试2: primary_model.invoke}
H -->|失败| I{等待2^2秒+jitter}
I --> J{尝试3: primary_model.invoke}
J -->|仍失败| K[抛出异常]
K --> L[RunnableWithFallbacks捕获异常]
L --> M[切换到fallback: fallback_model.invoke]
M -->|成功| N[返回AIMessage]
J -->|成功| N
F -->|成功| N
H -->|成功| N
N --> O[parser.invoke]
O --> P[返回最终结果]
style D fill:#ffe8e8
style E fill:#fff3e0
style M fill:#e8f5e9
style P fill:#e0f2f1
RunnableRetry关键代码 (langchain_core/runnables/retry.py):
class RunnableRetry(RunnableBinding[Input, Output]):
max_attempt_number: int = 3
wait_exponential_jitter: bool = True
retry_exception_types: tuple[type[BaseException], ...] = (Exception,)
def invoke(self, input: Input, config: RunnableConfig | None = None) -> Output:
from tenacity import (
retry,
stop_after_attempt,
wait_exponential_jitter,
retry_if_exception_type,
)
# 使用tenacity库实现重试装饰器
@retry(
stop=stop_after_attempt(self.max_attempt_number),
wait=wait_exponential_jitter() if self.wait_exponential_jitter
else wait_exponential(),
retry=retry_if_exception_type(self.retry_exception_types),
before_sleep=self._log_retry,
reraise=True,
)
def _invoke_with_retry():
return self.bound.invoke(input, config)
return _invoke_with_retry()
def _log_retry(self, retry_state):
"""重试前记录日志"""
print(f"Attempt {retry_state.attempt_number} failed, retrying...")
RunnableWithFallbacks关键代码 (已在前面展示,此处补充异常处理细节):
class RunnableWithFallbacks(RunnableSerializable[Input, Output]):
def invoke(self, input: Input, config: RunnableConfig | None = None) -> Output:
config = ensure_config(config)
first_error = None
# 尝试主Runnable
try:
return self.runnable.invoke(input, config)
except self.exceptions_to_handle as e:
first_error = e
# 依次尝试每个fallback
for fallback in self.fallbacks:
try:
return fallback.invoke(input, config)
except self.exceptions_to_handle as e:
# 记录但继续尝试下一个
continue
# 所有选项都失败,抛出最初的异常
raise first_error
容错时序图:
sequenceDiagram
autonumber
participant User
participant Fallbacks as RunnableWithFallbacks
participant Retry as RunnableRetry
participant Primary as primary_model
participant Fallback as fallback_model
participant API1 as OpenAI API(gpt-4o)
participant API2 as OpenAI API(gpt-4o-mini)
User->>Fallbacks: invoke(input)
Fallbacks->>Retry: invoke(input)
Note over Retry: 第1次尝试
Retry->>Primary: invoke(input)
Primary->>API1: HTTP POST
API1-->>Primary: 503 Service Unavailable
Primary-->>Retry: APIError
Note over Retry: 等待2^1秒(2s) + 随机抖动
Note over Retry: 第2次尝试
Retry->>Primary: invoke(input)
Primary->>API1: HTTP POST
API1-->>Primary: 429 Rate Limit
Primary-->>Retry: RateLimitError
Note over Retry: 等待2^2秒(4s) + 随机抖动
Note over Retry: 第3次尝试
Retry->>Primary: invoke(input)
Primary->>API1: HTTP POST
API1-->>Primary: Timeout
Primary-->>Retry: TimeoutError
Retry-->>Fallbacks: 抛出TimeoutError
Note over Fallbacks: 主路径失败<br/>切换到fallback
Fallbacks->>Fallback: invoke(input)
Fallback->>API2: HTTP POST
API2-->>Fallback: 200 OK
Fallback-->>Fallbacks: AIMessage
Fallbacks-->>User: 返回结果(来自fallback)
路径说明:
- 重试机制: RunnableRetry使用tenacity库,实现指数退避+随机抖动
- 异常过滤: 只重试指定类型的异常(如APIError),其他立即抛出
- 故障转移: 重试耗尽后,RunnableWithFallbacks捕获异常,切换到备用Runnable
- 多级容错: 可配置多个fallback,按顺序尝试
性能影响:
- 重试延迟: 3次重试总延迟约2+4+8=14秒(加抖动)
- 成本: 重试会产生额外的API调用成本
- 用户体验: fallback通常选择更快但质量稍低的模型
扩展点
自定义Runnable:
class MyRunnable(Runnable[InputType, OutputType]):
def invoke(self, input: InputType, config: RunnableConfig) -> OutputType:
# 实现核心逻辑
pass
配置化组件:
runnable.configurable_fields(
temperature=ConfigurableField(id="temperature", name="Temperature")
).configurable_alternatives(
ConfigurableField(id="model"),
default_key="gpt-4o",
gpt35=ChatOpenAI(model="gpt-3.5-turbo")
)
核心数据结构
classDiagram
class Runnable~Input, Output~ {
<<abstract>>
+name: str|None
+invoke(input: Input, config: RunnableConfig) Output
+ainvoke(input: Input, config: RunnableConfig) Awaitable[Output]
+batch(inputs: list[Input], config: RunnableConfig) list[Output]
+abatch(inputs: list[Input], config: RunnableConfig) Awaitable~list[Output]~
+stream(input: Input, config: RunnableConfig) Iterator[Output]
+astream(input: Input, config: RunnableConfig) AsyncIterator[Output]
+InputType: type[Input]
+OutputType: type[Output]
+input_schema: type[BaseModel]
+output_schema: type[BaseModel]
+with_retry(...) RunnableRetry
+with_fallbacks(...) RunnableWithFallbacks
+with_config(...) RunnableBinding
+pipe(...) RunnableSequence
+__or__(other) RunnableSequence
}
class RunnableConfig {
<<TypedDict>>
+tags: list[str]
+metadata: dict
+callbacks: Callbacks
+run_name: str
+max_concurrency: int|None
+recursion_limit: int
+configurable: dict
+run_id: UUID|None
}
class RunnableSerializable~Input, Output~ {
+lc_serializable: bool
+get_lc_namespace() list[str]
+to_json() dict
+save(path: str) None
}
class RunnableSequence~Input, Output~ {
+first: Runnable[Input, Any]
+middle: list[Runnable]
+last: Runnable[Any, Output]
+steps: list[Runnable]
}
class RunnableParallel~Input~ {
+steps: dict[str, Runnable[Input, Any]]
}
class RunnableBranch~Input, Output~ {
+branches: list[tuple[Callable, Runnable]]
+default: Runnable[Input, Output]
}
class RunnableLambda~Input, Output~ {
+func: Callable
+afunc: Callable|None
}
class RunnableBinding~Input, Output~ {
+bound: Runnable[Input, Output]
+kwargs: dict
+config: RunnableConfig
+config_factories: list[Callable]
}
class RunnableWithFallbacks~Input, Output~ {
+runnable: Runnable[Input, Output]
+fallbacks: list[Runnable[Input, Output]]
}
Runnable <|-- RunnableSerializable
RunnableSerializable <|-- RunnableSequence
RunnableSerializable <|-- RunnableParallel
RunnableSerializable <|-- RunnableBranch
Runnable <|-- RunnableLambda
RunnableSerializable <|-- RunnableBinding
RunnableSerializable <|-- RunnableWithFallbacks
Runnable --> RunnableConfig : uses
数据结构说明
Runnable基类字段
| 字段 | 类型 | 说明 | 约束 |
|---|---|---|---|
| name | str|None | Runnable名称,用于追踪和调试 | 可选,默认为类名 |
| InputType | type[Input] | 输入类型,通过泛型参数推导 | 只读属性 |
| OutputType | type[Output] | 输出类型,通过泛型参数推导 | 只读属性 |
RunnableConfig字段
| 字段 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| tags | list[str] | 否 | [] | 标签列表,用于过滤追踪 |
| metadata | dict[str, Any] | 否 | {} | 元数据,记录到trace |
| callbacks | Callbacks | 否 | None | 回调处理器列表 |
| run_name | str | 否 | 类名 | 运行名称,显示在追踪中 |
| max_concurrency | int|None | 否 | None | 最大并发数 |
| recursion_limit | int | 否 | 25 | 递归深度限制 |
| configurable | dict[str, Any] | 否 | {} | 动态配置字段值 |
| run_id | UUID|None | 否 | None | 运行ID,自动生成 |
RunnableSequence字段
| 字段 | 类型 | 说明 |
|---|---|---|
| first | Runnable | 序列中第一个组件 |
| middle | list[Runnable] | 序列中间组件列表 |
| last | Runnable | 序列中最后一个组件 |
| steps | list[Runnable] | 所有步骤的只读视图 |
RunnableParallel字段
| 字段 | 类型 | 说明 |
|---|---|---|
| steps | dict[str, Runnable] | 并行执行的分支,键为输出字段名 |
RunnableWithFallbacks字段
| 字段 | 类型 | 说明 |
|---|---|---|
| runnable | Runnable | 主Runnable |
| fallbacks | list[Runnable] | 备用Runnable列表,按顺序尝试 |
| exceptions_to_handle | tuple[type[Exception]] | 触发故障转移的异常类型 |
类型映射
DTO与内部模型:
- 输入输出类型由泛型参数指定,运行时动态验证
- input_schema/output_schema属性返回Pydantic模型用于验证
- 自动处理类型转换(如dict→Pydantic model)
序列化策略:
- RunnableSerializable使用lc_secrets/lc_attributes控制序列化字段
- 序列化为JSON时包含
__class__字段用于反序列化 - 不序列化callbacks和运行时配置
版本演进
0.1.x → 0.2.x:
- 添加astream_events v2支持
- 引入RunnableGenerator替代旧的transform方法
0.2.x → 1.0.x:
- 稳定化核心API
- 弃用部分legacy方法
- 改进类型推导机制
核心API详解
API-1: Runnable.invoke
基本信息
- 名称:
invoke - 方法签名:
def invoke(self, input: Input, config: RunnableConfig | None = None) -> Output - 幂等性: 取决于具体实现,Runnable基类不保证幂等
功能说明
同步调用Runnable,将单个输入转换为输出。这是Runnable最核心的方法,所有子类必须实现。
请求结构体
# invoke方法参数
input: Input # 泛型输入,类型由子类指定
config: RunnableConfig | None # 可选配置
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| input | Input | 是 | - | 输入数据,类型由Runnable的泛型参数决定 |
| config | RunnableConfig|None | 否 | None | 运行时配置 |
响应结构体
# invoke方法返回值
output: Output # 泛型输出,类型由子类指定
| 返回值 | 类型 | 说明 |
|---|---|---|
| output | Output | 输出数据,类型由Runnable的泛型参数决定 |
入口函数与关键代码
# Runnable基类的invoke方法(抽象)
@abstractmethod
def invoke(self, input: Input, config: RunnableConfig | None = None) -> Output:
"""将单个输入转换为输出。
Args:
input: 输入数据
config: 可选的运行时配置
Returns:
转换后的输出数据
"""
...
# RunnableSequence的invoke实现
class RunnableSequence(RunnableSerializable[Input, Output]):
def invoke(self, input: Input, config: RunnableConfig | None = None) -> Output:
# 1) 确保config存在并设置上下文
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
# 2) 开始链式调用追踪
run_manager = callback_manager.on_chain_start(
dumpd(self),
input,
name=config.get("run_name") or self.get_name()
)
# 3) 依次调用序列中的每个步骤
try:
for step in self.steps:
input = step.invoke(
input,
patch_config(config, callbacks=run_manager.get_child())
)
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(input)
return input
代码说明:
- 确保config存在,获取回调管理器
- 触发on_chain_start回调,开始追踪
- 遍历steps列表,将前一步输出作为下一步输入
- 异常时触发on_chain_error回调
- 成功时触发on_chain_end回调并返回最终输出
调用链与上层函数
# 用户代码调用
chain = prompt | model | parser
result = chain.invoke({"topic": "AI"})
# 调用栈
# 1. RunnableSequence.invoke (chain)
# ├─ 2. PromptTemplate.invoke (prompt)
# │ └─ 格式化模板,返回messages
# ├─ 3. ChatModel.invoke (model)
# │ ├─ _generate_with_cache (缓存逻辑)
# │ ├─ _generate (实际API调用)
# │ └─ 返回AIMessage
# └─ 4. OutputParser.invoke (parser)
# └─ 解析message.content,返回结构化输出
# 上层适配(Web框架中)
from fastapi import FastAPI
app = FastAPI()
@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
# 调用链的invoke
result = chain.invoke(
{"messages": request.messages},
config={
"metadata": {"user_id": request.user_id},
"tags": ["api", "prod"]
}
)
return {"response": result}
调用链说明:
- 用户调用链的invoke → RunnableSequence逐步执行
- 每个步骤的输出自动传递给下一步
- config在整个调用链中传递
- 回调事件从最外层传播到最内层
时序图
sequenceDiagram
autonumber
participant User as 用户代码
participant Seq as RunnableSequence
participant CB as CallbackManager
participant Step1 as Prompt
participant Step2 as LLM
participant Step3 as Parser
User->>Seq: invoke(input, config)
Seq->>Seq: ensure_config(config)
Seq->>CB: get_callback_manager_for_config
CB-->>Seq: callback_manager
Seq->>CB: on_chain_start(self, input)
CB-->>Seq: run_manager
Seq->>Step1: invoke(input, config)
Step1->>Step1: 格式化模板
Step1-->>Seq: messages
Seq->>Step2: invoke(messages, config)
Step2->>CB: on_llm_start(messages)
Step2->>Step2: 调用LLM API
Step2->>CB: on_llm_end(response)
Step2-->>Seq: AIMessage
Seq->>Step3: invoke(AIMessage, config)
Step3->>Step3: 解析输出
Step3-->>Seq: parsed_output
Seq->>CB: on_chain_end(parsed_output)
Seq-->>User: final_output
时序图说明:
图意: 描述RunnableSequence的invoke方法如何协调多个步骤的顺序执行,以及回调系统如何追踪整个过程。
边界条件:
- 任何步骤抛出异常都会中断执行并向上传播
- config中的recursion_limit防止无限递归
- 超时由config.timeout控制,超时抛出TimeoutError
异常与回退:
- 异常在on_chain_error回调后向上抛出
- RunnableWithFallbacks可捕获异常并切换备用链
- 用户可通过try-except捕获特定异常类型
性能要点:
- 每个步骤串行执行,总耗时为各步骤之和
- 回调系统异步执行,不阻塞主流程(LangSmith上报)
- 缓存在LLM层实现,重复输入可跳过API调用
兼容性:
- invoke方法签名在1.0后承诺稳定
- config字段向后兼容,新增字段有默认值
API-2: Runnable.batch
基本信息
- 名称:
batch - 方法签名:
def batch(self, inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs) -> list[Output] - 幂等性: 非幂等,每次调用都会执行
功能说明
批量处理多个输入,自动并行执行以提高吞吐量。
请求结构体
inputs: list[Input] # 输入列表
config: RunnableConfig | list[RunnableConfig] | None # 配置,可为每个输入单独配置
return_exceptions: bool = False # 是否返回异常对象而非抛出
max_concurrency: int | None # 通过config.max_concurrency或kwargs传递
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| inputs | list[Input] | 是 | - | 输入数据列表 |
| config | RunnableConfig|list[RunnableConfig]|None | 否 | None | 配置,单个或每个输入一个 |
| return_exceptions | bool | 否 | False | 是否捕获异常并返回 |
| max_concurrency | int|None | 否 | None | 最大并发数 |
响应结构体
outputs: list[Output | Exception] # 输出列表,顺序与输入对应
入口函数与关键代码
class Runnable(ABC, Generic[Input, Output]):
def batch(
self,
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any,
) -> list[Output]:
# 1) 规范化config为列表
configs = get_config_list(config, len(inputs))
# 2) 提取max_concurrency
max_concurrency = (
kwargs.get("max_concurrency")
or configs[0].get("max_concurrency")
if configs
else None
)
# 3) 使用线程池并行执行
with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
futures = [
executor.submit(self.invoke, input, config)
for input, config in zip(inputs, configs)
]
# 4) 收集结果
results = []
for future in futures:
try:
results.append(future.result())
except Exception as e:
if return_exceptions:
results.append(e)
else:
raise
return results
代码说明:
- 将单个config扩展为列表,或验证列表长度匹配
- 从config或kwargs提取max_concurrency
- 创建线程池,为每个输入提交invoke任务
- 等待所有Future完成并收集结果
- 根据return_exceptions决定异常处理方式
调用链与上层函数
# 批量处理多个请求
topics = ["AI", "区块链", "量子计算"]
results = chain.batch([{"topic": t} for t in topics])
# 限制并发
results = chain.batch(
[{"topic": t} for t in topics],
config={"max_concurrency": 2} # 最多2个并发
)
# 容错批处理
results = chain.batch(
inputs,
return_exceptions=True # 失败项返回异常对象
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"输入{i}失败: {result}")
else:
print(f"输入{i}成功: {result}")
时序图
sequenceDiagram
autonumber
participant User
participant Runnable
participant ThreadPool
participant Worker1
participant Worker2
participant WorkerN
User->>Runnable: batch([input1, input2, ..., inputN], config)
Runnable->>Runnable: get_config_list(config, N)
Runnable->>ThreadPool: 创建线程池(max_workers=max_concurrency)
par 并行执行
Runnable->>Worker1: submit(invoke, input1)
Runnable->>Worker2: submit(invoke, input2)
Runnable->>WorkerN: submit(invoke, inputN)
end
Worker1->>Worker1: invoke(input1)
Worker2->>Worker2: invoke(input2)
WorkerN->>WorkerN: invoke(inputN)
Worker1-->>Runnable: output1
Worker2-->>Runnable: output2
WorkerN-->>Runnable: outputN
Runnable->>Runnable: 按输入顺序收集结果
Runnable-->>User: [output1, output2, ..., outputN]
时序图说明:
图意: 展示batch如何使用线程池并行处理多个输入,最大化吞吐量。
边界条件:
- max_concurrency=None时,线程池大小为CPU核心数(默认行为)
- 结果顺序与输入顺序一致,即使完成时间不同
- return_exceptions=True时,异常不会中断其他任务
性能要点:
- I/O密集型任务(如LLM调用)线程池效果好
- CPU密集型任务受GIL限制,考虑ProcessPoolExecutor
- 批大小应根据速率限制和内存调整
API-3: Runnable.stream
基本信息
- 名称:
stream - 方法签名:
def stream(self, input: Input, config: RunnableConfig | None = None, **kwargs) -> Iterator[Output] - 幂等性: 非幂等
功能说明
流式输出,逐块生成结果,适用于需要快速首字节响应的场景(如聊天应用)。
请求结构体
input: Input
config: RunnableConfig | None
响应结构体
# 返回迭代器,yield输出块
Iterator[Output]
入口函数与关键代码
class Runnable(ABC, Generic[Input, Output]):
def stream(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> Iterator[Output]:
# 默认实现:调用invoke并yield整个结果
# 子类可重写以实现真正的流式输出
yield self.invoke(input, config, **kwargs)
# RunnableSequence的流式实现
class RunnableSequence(RunnableSerializable[Input, Output]):
def stream(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> Iterator[Output]:
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
dumpd(self),
input,
)
# 1) 执行前N-1个步骤(非流式)
for step in self.steps[:-1]:
input = step.invoke(input, patch_config(config, callbacks=run_manager.get_child()))
# 2) 最后一个步骤流式执行
final_pipeline = self.steps[-1].stream(
input,
patch_config(config, callbacks=run_manager.get_child()),
)
# 3) yield最后一步的每个chunk
for chunk in final_pipeline:
yield chunk
run_manager.on_chain_end(chunk)
代码说明:
- Runnable基类默认实现是非流式的(yield整个结果)
- RunnableSequence只流式化最后一个步骤
- 前面步骤必须完成才能开始流式输出
- 每个chunk通过Iterator yield到调用方
调用链与上层函数
# 流式输出示例
for chunk in chain.stream({"topic": "量子计算"}):
print(chunk, end="", flush=True)
# Web应用中的流式响应
from fastapi.responses import StreamingResponse
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
def generate():
for chunk in chain.stream({"messages": request.messages}):
yield f"data: {json.dumps(chunk)}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
时序图
sequenceDiagram
autonumber
participant User
participant Seq as RunnableSequence
participant Step1 as Prompt
participant Step2 as LLM
User->>Seq: stream(input)
Seq->>Step1: invoke(input)
Step1-->>Seq: formatted_prompt
Seq->>Step2: stream(formatted_prompt)
Step2->>Step2: 开始LLM流式生成
loop 每个token
Step2-->>Seq: chunk1
Seq-->>User: chunk1
Note over User: 立即显示
Step2-->>Seq: chunk2
Seq-->>User: chunk2
Step2-->>Seq: chunk3
Seq-->>User: chunk3
end
Step2-->>Seq: 流结束
Seq-->>User: 迭代器关闭
时序图说明:
图意: 展示流式输出如何逐块传递数据,实现低延迟的用户体验。
边界条件:
- 并非所有Runnable都支持真正的流式,某些会退化为yield整体
- 流式过程中抛出异常会中断迭代
- 客户端断开连接时,迭代器自动关闭
性能要点:
- 首字节延迟显著降低(首个chunk即可显示)
- 总耗时与非流式相同或略高
- 适合长文本生成和实时反馈场景
API-4: Runnable.ainvoke
基本信息
- 名称:
ainvoke - 方法签名:
async def ainvoke(self, input: Input, config: RunnableConfig | None = None, **kwargs) -> Output - 幂等性: 非幂等
功能说明
异步版本的invoke,适用于高并发服务器环境(如FastAPI/asyncio)。
请求结构体
同invoke,参数相同。
响应结构体
# 返回Awaitable
Awaitable[Output]
入口函数与关键代码
class Runnable(ABC, Generic[Input, Output]):
async def ainvoke(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> Output:
# 默认实现:在executor中运行同步invoke
return await run_in_executor(
config,
self.invoke,
input,
config,
**kwargs
)
# RunnableSequence的异步实现
class RunnableSequence(RunnableSerializable[Input, Output]):
async def ainvoke(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> Output:
config = ensure_config(config)
callback_manager = get_async_callback_manager_for_config(config)
run_manager = await callback_manager.on_chain_start(
dumpd(self),
input,
)
# 依次异步调用每个步骤
try:
for step in self.steps:
input = await step.ainvoke(
input,
patch_config(config, callbacks=run_manager.get_child()),
)
except BaseException as e:
await run_manager.on_chain_error(e)
raise
else:
await run_manager.on_chain_end(input)
return input
代码说明:
- 基类默认在线程池中运行同步方法
- 子类重写时应使用async/await实现真正的异步
- 异步回调管理器用于非阻塞追踪
- 所有I/O操作应使用异步库(如httpx, aioboto3)
调用链与上层函数
# 异步Web服务
from fastapi import FastAPI
app = FastAPI()
@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
# 使用ainvoke实现高并发
result = await chain.ainvoke(
{"messages": request.messages},
config={"metadata": {"user_id": request.user_id}}
)
return {"response": result}
# 批量异步处理
results = await chain.abatch([input1, input2, input3])
时序图
sequenceDiagram
autonumber
participant User as 异步用户代码
participant Seq as RunnableSequence
participant Event as asyncio事件循环
participant LLM as 异步LLM
participant API as LLM API
User->>Seq: await ainvoke(input)
Seq->>Event: 注册coroutine
Event->>Seq: 开始执行
Seq->>LLM: await ainvoke(input)
LLM->>API: httpx.post(url, ...) [async]
Note over Event: 事件循环释放,<br/>可处理其他请求
API-->>LLM: HTTP响应
LLM-->>Seq: AIMessage
Seq-->>Event: coroutine完成
Event-->>User: final_output
时序图说明:
图意: 展示异步调用如何通过事件循环实现非阻塞并发。
边界条件:
- 必须在async函数中使用await调用
- 同步代码混入异步链会阻塞事件循环(使用executor缓解)
- asyncio.run()创建新事件循环,不能在已有循环中调用
性能要点:
- 单线程处理高并发,无GIL限制
- I/O等待时CPU可处理其他请求
- 总吞吐量显著高于同步(10-100x)
API-5: Runnable.with_retry
基本信息
- 名称:
with_retry - 方法签名:
def with_retry(self, *, retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) -> RunnableRetry[Input, Output] - 幂等性: 非幂等,会多次调用底层Runnable
功能说明
为Runnable添加重试机制,使用指数退避策略处理瞬时故障。
请求结构体
retry_if_exception_type: tuple[type[BaseException], ...] # 触发重试的异常类型
wait_exponential_jitter: bool # 是否使用指数退避+抖动
stop_after_attempt: int # 最大尝试次数
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| retry_if_exception_type | tuple[type[BaseException], …] | 否 | (Exception,) | 触发重试的异常类型 |
| wait_exponential_jitter | bool | 否 | True | 使用指数退避+随机抖动 |
| stop_after_attempt | int | 否 | 3 | 最大尝试次数 |
响应结构体
RunnableRetry[Input, Output] # 包装后的Runnable
入口函数与关键代码
class Runnable(ABC, Generic[Input, Output]):
def with_retry(
self,
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
stop_after_attempt: int = 3,
) -> RunnableRetry[Input, Output]:
from langchain_core.runnables.retry import RunnableRetry
return RunnableRetry(
bound=self,
kwargs={},
config={},
retry_exception_types=retry_if_exception_type,
wait_exponential_jitter=wait_exponential_jitter,
max_attempt_number=stop_after_attempt,
)
# RunnableRetry的实现
class RunnableRetry(RunnableBindingBase[Input, Output]):
max_attempt_number: int = 3
retry_exception_types: tuple[type[BaseException], ...] = (Exception,)
wait_exponential_jitter: bool = True
def invoke(self, input: Input, config: RunnableConfig | None = None) -> Output:
@retry(
stop=stop_after_attempt(self.max_attempt_number),
wait=wait_exponential_jitter() if self.wait_exponential_jitter else wait_exponential(),
retry=retry_if_exception_type(self.retry_exception_types),
before_sleep=self._before_sleep(config)
)
def _invoke_with_retry(input: Input, config: RunnableConfig) -> Output:
return self.bound.invoke(input, config)
return _invoke_with_retry(input, ensure_config(config))
代码说明:
- with_retry()创建RunnableRetry包装器
- RunnableRetry使用tenacity库实现重试
- 指数退避:第n次重试等待2^n秒(带随机抖动)
- 重试前触发before_sleep回调记录日志
调用链与上层函数
# 为LLM添加重试
model_with_retry = ChatOpenAI(model="gpt-4o").with_retry(
stop_after_attempt=5,
retry_if_exception_type=(APIError, Timeout),
wait_exponential_jitter=True
)
# 为整个链添加重试
chain_with_retry = (prompt | model | parser).with_retry(
stop_after_attempt=3
)
# 调用(失败时自动重试)
result = chain_with_retry.invoke({"topic": "AI"})
时序图
sequenceDiagram
autonumber
participant User
participant Retry as RunnableRetry
participant Bound as 底层Runnable
participant API as 外部API
User->>Retry: invoke(input)
Retry->>Bound: 第1次尝试
Bound->>API: 请求
API-->>Bound: 503 Service Unavailable
Bound-->>Retry: APIError异常
Note over Retry: 等待2^1秒(+jitter)
Retry->>Bound: 第2次尝试
Bound->>API: 请求
API-->>Bound: 429 Rate Limit
Bound-->>Retry: RateLimitError异常
Note over Retry: 等待2^2秒(+jitter)
Retry->>Bound: 第3次尝试
Bound->>API: 请求
API-->>Bound: 200 OK
Bound-->>Retry: 成功结果
Retry-->>User: 返回结果
时序图说明:
图意: 展示重试机制如何通过指数退避处理瞬时故障。
边界条件:
- 达到最大尝试次数后抛出最后一次异常
- 重试只捕获指定类型异常,其他立即抛出
- 每次重试都会触发回调事件
异常与回退:
- 永久性错误(如401 Unauthorized)不应重试
- 重试会增加总延迟,需权衡
- 结合fallbacks实现多层容错
API-6: Runnable.with_fallbacks
基本信息
- 名称:
with_fallbacks - 方法签名:
def with_fallbacks(self, fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,)) -> RunnableWithFallbacks[Input, Output] - 幂等性: 非幂等
功能说明
为Runnable配置备用选项,主Runnable失败时自动切换到备用。
请求结构体
fallbacks: Sequence[Runnable[Input, Output]] # 备用Runnable列表
exceptions_to_handle: tuple[type[BaseException], ...] # 触发故障转移的异常
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| fallbacks | Sequence[Runnable] | 是 | - | 备用Runnable列表,按顺序尝试 |
| exceptions_to_handle | tuple[type[BaseException], …] | 否 | (Exception,) | 触发故障转移的异常类型 |
响应结构体
RunnableWithFallbacks[Input, Output] # 包装后的Runnable
入口函数与关键代码
class Runnable(ABC, Generic[Input, Output]):
def with_fallbacks(
self,
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
) -> RunnableWithFallbacks[Input, Output]:
from langchain_core.runnables.fallbacks import RunnableWithFallbacks
return RunnableWithFallbacks(
runnable=self,
fallbacks=list(fallbacks),
exceptions_to_handle=exceptions_to_handle,
)
# RunnableWithFallbacks实现
class RunnableWithFallbacks(RunnableSerializable[Input, Output]):
runnable: Runnable[Input, Output]
fallbacks: list[Runnable[Input, Output]]
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,)
def invoke(self, input: Input, config: RunnableConfig | None = None) -> Output:
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(dumpd(self), input)
# 1) 尝试主Runnable
try:
return self.runnable.invoke(
input,
patch_config(config, callbacks=run_manager.get_child())
)
except self.exceptions_to_handle as e:
# 2) 依次尝试备用Runnable
for fallback in self.fallbacks:
try:
return fallback.invoke(
input,
patch_config(config, callbacks=run_manager.get_child())
)
except self.exceptions_to_handle:
continue
# 3) 所有备用都失败,抛出原始异常
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(output)
return output
代码说明:
- 首先尝试主runnable
- 捕获指定类型异常时,按顺序尝试fallbacks
- 所有选项都失败时抛出最初的异常
- 成功则返回第一个成功的结果
调用链与上层函数
# 主模型+备用模型
primary_model = ChatOpenAI(model="gpt-4o", temperature=0)
fallback_model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
model_with_fallback = primary_model.with_fallbacks([fallback_model])
# 多级fallback
model_with_multi_fallback = primary_model.with_fallbacks([
ChatOpenAI(model="gpt-4o-mini"),
ChatOpenAI(model="gpt-3.5-turbo"),
])
# 整个链的fallback
chain_with_fallback = (prompt | model | parser).with_fallbacks([
prompt | fallback_model | parser
])
时序图
sequenceDiagram
autonumber
participant User
participant Fallbacks as RunnableWithFallbacks
participant Primary as 主Runnable
participant Fallback1 as 备用1
participant Fallback2 as 备用2
User->>Fallbacks: invoke(input)
Fallbacks->>Primary: invoke(input)
Primary->>Primary: 调用API
Primary-->>Fallbacks: APIError异常
Note over Fallbacks: 捕获异常,<br/>尝试备用1
Fallbacks->>Fallback1: invoke(input)
Fallback1->>Fallback1: 调用备用API
Fallback1-->>Fallbacks: Timeout异常
Note over Fallbacks: 继续尝试备用2
Fallbacks->>Fallback2: invoke(input)
Fallback2->>Fallback2: 调用另一备用API
Fallback2-->>Fallbacks: 成功结果
Fallbacks-->>User: 返回结果
时序图说明:
图意: 展示故障转移如何在主选项失败时自动切换到备用选项。
边界条件:
- 所有选项都失败时抛出主Runnable的异常
- 只捕获指定类型异常,其他立即抛出
- fallbacks列表不能为空
异常与回退:
- 适用于模型服务不稳定场景
- 备用模型应保证接口兼容性
- 结合retry实现多层容错
性能要点:
- 只在失败时才尝试备用,不增加正常路径延迟
- 备用模型通常选择更快/更便宜的选项
- 监控fallback触发率判断主模型稳定性
典型使用场景与时序图
场景1: 简单链式调用
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# 构建链
prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话")
model = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
chain = prompt | model | parser
# 调用
result = chain.invoke({"topic": "程序员"})
sequenceDiagram
autonumber
participant User
participant Prompt as ChatPromptTemplate
participant Model as ChatOpenAI
participant Parser as StrOutputParser
User->>Prompt: invoke({"topic": "程序员"})
Prompt->>Prompt: 格式化模板
Prompt-->>Model: [HumanMessage("讲一个关于程序员的笑话")]
Model->>Model: 调用OpenAI API
Model-->>Parser: AIMessage(content="...")
Parser->>Parser: 提取content字段
Parser-->>User: "为什么程序员..."(str)
场景2: 并行执行多分支
from langchain_core.runnables import RunnableParallel
# 并行生成多个维度的分析
analysis_chain = RunnableParallel({
"sentiment": prompt_sentiment | model | parser_sentiment,
"summary": prompt_summary | model | parser_summary,
"keywords": prompt_keywords | model | parser_keywords,
})
result = analysis_chain.invoke({"text": "..."})
# {
# "sentiment": "positive",
# "summary": "...",
# "keywords": ["AI", "machine learning"]
# }
sequenceDiagram
autonumber
participant User
participant Parallel as RunnableParallel
participant Branch1 as sentiment分支
participant Branch2 as summary分支
participant Branch3 as keywords分支
User->>Parallel: invoke({"text": "..."})
par 并行执行三个分支
Parallel->>Branch1: invoke(input)
Parallel->>Branch2: invoke(input)
Parallel->>Branch3: invoke(input)
end
Branch1->>Branch1: 情感分析
Branch2->>Branch2: 摘要生成
Branch3->>Branch3: 关键词提取
Branch1-->>Parallel: "positive"
Branch2-->>Parallel: "..."
Branch3-->>Parallel: ["AI", ...]
Parallel->>Parallel: 合并结果为dict
Parallel-->>User: {"sentiment": ..., "summary": ..., "keywords": ...}
场景3: 条件分支路由
from langchain_core.runnables import RunnableBranch
# 根据输入类型选择不同处理链
branch = RunnableBranch(
(lambda x: x["type"] == "question", qa_chain),
(lambda x: x["type"] == "command", command_chain),
default_chain # 默认分支
)
result = branch.invoke({"type": "question", "content": "..."})
sequenceDiagram
autonumber
participant User
participant Branch as RunnableBranch
participant Cond1 as 条件1: type=="question"
participant Cond2 as 条件2: type=="command"
participant QAChain
participant DefaultChain
User->>Branch: invoke({"type": "question", ...})
Branch->>Cond1: 评估条件
Cond1-->>Branch: True
Branch->>QAChain: invoke(input)
Note over Branch: 跳过其他条件
QAChain->>QAChain: 执行问答逻辑
QAChain-->>Branch: answer
Branch-->>User: answer
场景4: 容错链(Retry + Fallbacks)
# 主模型+重试+备用模型
primary = ChatOpenAI(model="gpt-4o").with_retry(stop_after_attempt=3)
fallback = ChatOpenAI(model="gpt-4o-mini")
robust_model = primary.with_fallbacks([fallback])
chain = prompt | robust_model | parser
result = chain.invoke({"topic": "AI"})
sequenceDiagram
autonumber
participant User
participant Fallbacks as WithFallbacks
participant Retry as WithRetry(gpt-4o)
participant Fallback as gpt-4o-mini
participant API1 as OpenAI API(gpt-4o)
participant API2 as OpenAI API(gpt-4o-mini)
User->>Fallbacks: invoke(input)
Fallbacks->>Retry: invoke(input)
Retry->>API1: 第1次请求
API1-->>Retry: 503错误
Note over Retry: 等待2秒
Retry->>API1: 第2次请求
API1-->>Retry: 503错误
Note over Retry: 等待4秒
Retry->>API1: 第3次请求
API1-->>Retry: 超时
Retry-->>Fallbacks: 抛出异常
Note over Fallbacks: 切换到备用模型
Fallbacks->>Fallback: invoke(input)
Fallback->>API2: 请求
API2-->>Fallback: 成功响应
Fallback-->>Fallbacks: AIMessage
Fallbacks-->>User: 最终结果
关键配置与最佳实践
配置项详解
RunnableConfig核心字段:
config = {
# 追踪相关
"tags": ["prod", "api", "user-facing"], # 标签,用于过滤trace
"metadata": {"user_id": "123", "session_id": "abc"}, # 业务元数据
"run_name": "customer_support_chain", # 运行名称
"run_id": uuid.uuid4(), # 可选,通常自动生成
# 执行控制
"max_concurrency": 5, # 并发限制
"recursion_limit": 25, # 递归深度(防止无限递归)
"timeout": 60.0, # 超时秒数(需要Runnable支持)
# 回调系统
"callbacks": [
ConsoleCallbackHandler(), # 控制台输出
CustomCallbackHandler(), # 自定义回调
],
# 动态配置
"configurable": {
"model_name": "gpt-4o", # 配置字段值
"temperature": 0.7,
}
}
result = chain.invoke(input, config=config)
性能优化建议
1. 批处理优化:
# 避免:循环调用invoke
results = [chain.invoke(input) for input in inputs] # 串行,慢
# 推荐:使用batch
results = chain.batch(inputs, config={"max_concurrency": 10}) # 并行,快
2. 异步优化:
# 高并发服务器使用异步
async def handler(request):
result = await chain.ainvoke(request.input) # 非阻塞
return result
# 并发处理多个请求
results = await asyncio.gather(
chain.ainvoke(input1),
chain.ainvoke(input2),
chain.ainvoke(input3),
)
3. 流式优化:
# 降低首字节延迟
async def stream_handler(request):
async for chunk in chain.astream(request.input):
await websocket.send(chunk) # 实时推送
4. 缓存优化:
from langchain_core.globals import set_llm_cache
from langchain_core.caches import InMemoryCache
# 启用缓存(相同输入返回缓存结果)
set_llm_cache(InMemoryCache())
# 或使用Redis缓存
from langchain_community.cache import RedisCache
set_llm_cache(RedisCache(redis_=redis_client))
错误处理最佳实践
1. 分层容错:
# 第1层:单个LLM的retry
model_with_retry = model.with_retry(stop_after_attempt=3)
# 第2层:fallback到备用模型
model_with_fallback = model_with_retry.with_fallbacks([cheaper_model])
# 第3层:应用层try-except
try:
result = chain.invoke(input)
except Exception as e:
# 记录日志,返回默认响应
logger.error(f"Chain failed: {e}")
return default_response
2. 异常类型区分:
from openai import RateLimitError, APIError
# 只对瞬时错误重试
model.with_retry(
retry_if_exception_type=(RateLimitError, APIError),
stop_after_attempt=3
)
# 永久性错误(如401)不重试
model.with_retry(
retry_if_exception_type=(RateLimitError,), # 不包括AuthenticationError
)
3. 超时控制:
# 为整个链设置超时
result = chain.invoke(
input,
config={"timeout": 30} # 30秒超时
)
# 或使用asyncio.wait_for
result = await asyncio.wait_for(
chain.ainvoke(input),
timeout=30.0
)
可观测性最佳实践
1. 结构化标签:
config = {
"tags": [
f"env:{os.getenv('ENV', 'dev')}",
f"version:{APP_VERSION}",
f"user_tier:{user.tier}",
],
"metadata": {
"user_id": user.id,
"request_id": request_id,
"feature_flags": user.feature_flags,
}
}
2. 自定义回调监控:
from langchain_core.callbacks import BaseCallbackHandler
class MetricsCallback(BaseCallbackHandler):
def on_llm_start(self, serialized, prompts, **kwargs):
metrics.increment("llm.requests")
def on_llm_end(self, response, **kwargs):
tokens = response.llm_output.get("token_usage", {})
metrics.gauge("llm.tokens", tokens.get("total_tokens", 0))
def on_llm_error(self, error, **kwargs):
metrics.increment("llm.errors")
# 使用
chain.invoke(input, config={"callbacks": [MetricsCallback()]})
3. LangSmith集成:
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production"
# 自动追踪所有运行
result = chain.invoke(input)
# 在LangSmith UI中查看完整trace
组合模式建议
1. 链式组合(Sequential):
# 适用:每步输出是下一步输入
chain = step1 | step2 | step3
2. 并行组合(Parallel):
# 适用:多个独立任务共享输入
parallel = RunnableParallel({
"task1": chain1,
"task2": chain2,
})
3. 条件组合(Branching):
# 适用:根据输入选择不同处理路径
branch = RunnableBranch(
(condition1, chain1),
(condition2, chain2),
default_chain
)
4. 数据增强(Passthrough + Assign):
# 保留原始输入并添加新字段
from langchain_core.runnables import RunnablePassthrough
chain = (
RunnablePassthrough.assign(analysis=analysis_chain) # 添加analysis字段
| final_chain # 使用{"原始输入": ..., "analysis": ...}
)
测试建议
1. 单元测试:
def test_custom_runnable():
runnable = MyCustomRunnable()
result = runnable.invoke("test input")
assert result == "expected output"
def test_chain():
chain = step1 | step2
result = chain.invoke(test_input)
assert result.field == expected_value
2. Mock LLM:
from langchain_core.language_models import FakeListChatModel
def test_chain_logic():
fake_llm = FakeListChatModel(responses=["mocked response"])
chain = prompt | fake_llm | parser
result = chain.invoke({"topic": "test"})
assert "mocked" in result
3. 集成测试:
@pytest.mark.integration
async def test_full_chain():
# 使用真实LLM,但用测试API key
result = await production_chain.ainvoke(test_input)
assert result is not None
模块内部详细时序图汇总
图1: 完整请求生命周期时序图
sequenceDiagram
autonumber
participant Client as 客户端
participant WebFramework as Web框架<br/>(FastAPI/Flask)
participant RSeq as RunnableSequence
participant Config as Config系统
participant CB as CallbackManager
participant Comp1 as Prompt组件
participant Comp2 as LLM组件
participant Comp3 as Parser组件
participant Cache as LLM缓存
participant API as 外部API<br/>(OpenAI)
participant Tracer as LangSmithTracer
participant DB as 数据库
rect rgb(230, 240, 255)
Note over Client,WebFramework: 阶段1: 请求接收与路由
Client->>WebFramework: HTTP POST /api/chat
WebFramework->>WebFramework: 验证请求
WebFramework->>WebFramework: 提取参数
end
rect rgb(255, 245, 230)
Note over WebFramework,RSeq: 阶段2: 链初始化
WebFramework->>RSeq: invoke(input, config)
RSeq->>Config: ensure_config(config)
Config->>Config: 填充默认值<br/>tags, metadata等
Config-->>RSeq: 规范化配置
RSeq->>CB: get_callback_manager_for_config
CB->>Tracer: 创建追踪器
CB-->>RSeq: callback_manager
end
rect rgb(232, 245, 233)
Note over RSeq,Tracer: 阶段3: 链执行开始追踪
RSeq->>CB: on_chain_start(serialized, input)
CB->>Tracer: 记录开始事件<br/>生成run_id
CB->>DB: 异步写入trace
CB-->>RSeq: run_manager
end
rect rgb(255, 243, 224)
Note over RSeq,Comp1: 阶段4: 步骤1 - Prompt处理
RSeq->>Comp1: invoke(input, patched_config)
Comp1->>CB: on_chain_start(prompt)
Comp1->>Comp1: format_messages<br/>填充模板变量
Comp1->>CB: on_chain_end(messages)
Comp1-->>RSeq: List[BaseMessage]
end
rect rgb(232, 245, 233)
Note over RSeq,API: 阶段5: 步骤2 - LLM调用
RSeq->>Comp2: invoke(messages, patched_config)
Comp2->>CB: on_llm_start(messages)
Comp2->>Cache: 检查缓存
alt 缓存命中
Cache-->>Comp2: 返回缓存结果
else 缓存未命中
Comp2->>API: HTTP POST /v1/chat/completions
Note over API: 外部API处理<br/>5-10秒
API-->>Comp2: 200 OK (JSON)
Comp2->>Cache: 更新缓存
end
Comp2->>CB: on_llm_end(response)
CB->>Tracer: 记录token使用量<br/>记录延迟
Comp2-->>RSeq: AIMessage
end
rect rgb(252, 228, 236)
Note over RSeq,Comp3: 阶段6: 步骤3 - 输出解析
RSeq->>Comp3: invoke(AIMessage, patched_config)
Comp3->>CB: on_chain_start(parser)
Comp3->>Comp3: parse(message.content)<br/>结构化输出
Comp3->>CB: on_chain_end(parsed)
Comp3-->>RSeq: 结构化数据
end
rect rgb(243, 229, 245)
Note over RSeq,Tracer: 阶段7: 链执行完成
RSeq->>CB: on_chain_end(final_output)
CB->>Tracer: 记录完成事件<br/>计算总延迟
CB->>DB: 异步写入完整trace
RSeq-->>WebFramework: final_output
end
rect rgb(224, 242, 241)
Note over WebFramework,Client: 阶段8: 响应返回
WebFramework->>WebFramework: 序列化输出
WebFramework-->>Client: HTTP 200 OK (JSON)
end
Note over Client,DB: 完整请求耗时: T(prompt) + T(llm) + T(parser) + 网络延迟
时序图说明:
阶段1: 请求接收与路由
- Web框架接收HTTP请求
- 验证身份、速率限制、参数校验
- 提取用户输入和元数据
阶段2: 链初始化
- ensure_config规范化配置,填充默认值
- 创建CallbackManager和追踪器
- 建立回调处理器链
阶段3: 链执行开始追踪
- 生成唯一run_id用于追踪
- 序列化链结构,记录输入
- 异步上报到LangSmith
阶段4: Prompt处理
- 接收输入字典(如
{"topic": "AI"}) - 使用Jinja2模板引擎格式化
- 输出标准化消息列表
阶段5: LLM调用
- 首先检查LLM缓存(基于输入哈希)
- 缓存未命中时调用外部API
- 触发on_llm_start/on_llm_end回调
- 记录token使用量和延迟
阶段6: 输出解析
- 提取AIMessage的content字段
- 根据schema解析为结构化数据
- 支持JSON、XML、正则等多种格式
阶段7: 链执行完成
- 记录完整trace到数据库
- 计算端到端延迟
- 上报性能指标
阶段8: 响应返回
- 序列化输出为JSON
- 返回HTTP响应
- 记录访问日志
边界条件:
- 任何阶段异常: 触发on_*_error回调,向上传播,返回5xx错误
- 超时控制: config.timeout在各阶段检查,超时抛出TimeoutError
- 递归限制: config.recursion_limit防止无限递归(如LLM调用自身)
- 并发限制: config.max_concurrency限制batch并发数
性能关键点:
| 阶段 | 典型耗时 | 优化建议 |
|---|---|---|
| Prompt处理 | 1-10ms | 避免复杂模板逻辑 |
| LLM调用 | 500-5000ms | 启用缓存,使用更快的模型 |
| 输出解析 | 1-50ms | 简化解析逻辑,使用compiled regex |
| 回调上报 | 10-100ms | 异步批量上报,不阻塞主路径 |
可观测性数据:
- run_id: 唯一追踪ID,关联所有子调用
- tags: 过滤和分组(如
["prod", "api", "gpt-4"]) - metadata: 业务上下文(如
{"user_id": "123", "session_id": "abc"}) - token_usage: 输入/输出token数,成本计算
- latency: 各阶段延迟,性能分析
- errors: 异常类型、堆栈、重试次数
图2: 并发执行对比(Parallel vs Batch vs Async)
graph TB
subgraph Sequential["串行执行(baseline)"]
S1[请求1<br/>10秒] --> S2[请求2<br/>10秒]
S2 --> S3[请求3<br/>10秒]
S3 --> S_TOTAL[总耗时: 30秒]
end
subgraph Parallel["RunnableParallel并行"]
P1[分支1<br/>10秒]
P2[分支2<br/>10秒]
P3[分支3<br/>10秒]
P1 --> P_TOTAL[总耗时: 10秒]
P2 --> P_TOTAL
P3 --> P_TOTAL
Note_P[ThreadPoolExecutor<br/>3个线程] -.-> P1
Note_P -.-> P2
Note_P -.-> P3
end
subgraph Batch["batch方法"]
B1[请求1<br/>10秒]
B2[请求2<br/>10秒]
B3[请求3<br/>10秒]
B1 --> B_TOTAL[总耗时: 10秒]
B2 --> B_TOTAL
B3 --> B_TOTAL
Note_B[ThreadPoolExecutor<br/>默认CPU核数] -.-> B1
Note_B -.-> B2
Note_B -.-> B3
end
subgraph Async["ainvoke + gather"]
A1[请求1<br/>10秒]
A2[请求2<br/>10秒]
A3[请求3<br/>10秒]
A1 --> A_TOTAL[总耗时: 10秒]
A2 --> A_TOTAL
A3 --> A_TOTAL
Note_A[asyncio事件循环<br/>单线程] -.-> A1
Note_A -.-> A2
Note_A -.-> A3
end
style Sequential fill:#ffe8e8
style Parallel fill:#e8f5e9
style Batch fill:#fff3e0
style Async fill:#e3f2fd
并发模式对比:
| 模式 | 适用场景 | 优点 | 缺点 | 最大并发 |
|---|---|---|---|---|
| 串行执行 | 步骤有依赖关系 | 简单,易调试 | 耗时最长 | 1 |
| RunnableParallel | 多个独立任务,共享输入 | 代码简洁,自动合并结果 | 线程开销 | 受max_concurrency限制 |
| batch方法 | 多个独立请求 | 自动并发,保持顺序 | 线程开销,GIL限制 | 默认CPU核数 |
| ainvoke+gather | 高并发异步场景 | 无GIL限制,内存高效 | 需要async/await,复杂度高 | 10k+ |
性能对比实测:
场景: 100个请求,每个请求耗时2秒
串行执行: 100 * 2s = 200秒
batch(10): 100 / 10 * 2s = 20秒 (10倍加速)
ainvoke: 2秒 (所有请求并发) (100倍加速)
全流程示例与最佳实践总结
生产环境完整示例
import os
import asyncio
from typing import TypedDict
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableConfig
from langchain_core.callbacks import StdOutCallbackHandler
from langchain_core.globals import set_llm_cache
from langchain_community.cache import RedisCache
import redis
# ================== 1. 环境配置 ==================
# LangSmith追踪配置
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production"
# LLM缓存配置(Redis)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
set_llm_cache(RedisCache(redis_=redis_client))
# ================== 2. 构建执行链 ==================
# 主模型+重试+备用
primary_model = ChatOpenAI(
model="gpt-4o",
temperature=0.7,
timeout=30.0,
).with_retry(
stop_after_attempt=3,
retry_if_exception_type=(Exception,),
wait_exponential_jitter=True
)
fallback_model = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.7,
timeout=15.0,
)
robust_model = primary_model.with_fallbacks([fallback_model])
# 完整执行链
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("human", "{input}"),
])
chain = prompt | robust_model | StrOutputParser()
# ================== 3. Web API封装 ==================
app = FastAPI(title="LangChain Production API")
class ChatRequest(BaseModel):
"""聊天请求"""
input: str
user_id: str
session_id: str | None = None
class ChatResponse(BaseModel):
"""聊天响应"""
output: str
run_id: str
model_used: str
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
"""
聊天接口,支持高并发异步调用
特性:
- 自动重试(3次,指数退避)
- 故障转移(主模型失败切换到备用)
- LLM缓存(相同输入返回缓存)
- 完整追踪(LangSmith)
- 元数据记录(用户ID、会话ID)
"""
import uuid
run_id = str(uuid.uuid4())
# 构建配置
config = RunnableConfig(
run_id=run_id,
run_name=f"chat_{request.user_id}",
tags=["prod", "api", "chat"],
metadata={
"user_id": request.user_id,
"session_id": request.session_id,
"endpoint": "/chat",
},
max_concurrency=10,
callbacks=[StdOutCallbackHandler()], # 可选:控制台日志
)
try:
# 异步执行链
output = await chain.ainvoke(
{"input": request.input},
config=config
)
return ChatResponse(
output=output,
run_id=run_id,
model_used="gpt-4o" # 实际使用的模型
)
except Exception as e:
# 统一异常处理
raise HTTPException(
status_code=500,
detail=f"Chain execution failed: {str(e)}"
)
@app.post("/chat/batch")
async def chat_batch_endpoint(requests: list[ChatRequest]):
"""批量聊天接口,自动并发处理"""
results = await chain.abatch(
[{"input": req.input} for req in requests],
config=[
RunnableConfig(
tags=["prod", "api", "batch"],
metadata={"user_id": req.user_id}
)
for req in requests
]
)
return [
ChatResponse(
output=output,
run_id=str(i),
model_used="gpt-4o"
)
for i, output in enumerate(results)
]
@app.post("/chat/stream")
async def chat_stream_endpoint(request: ChatRequest):
"""流式聊天接口,实时返回生成结果"""
from fastapi.responses import StreamingResponse
import json
async def generate():
async for chunk in chain.astream({"input": request.input}):
yield f"data: {json.dumps({'chunk': chunk})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
# ================== 4. 运行服务 ==================
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
workers=4, # 多进程
loop="uvloop", # 高性能事件循环
)
最佳实践总结:
1. 配置管理:
- 使用环境变量管理敏感信息(API密钥)
- 启用LangSmith追踪(生产环境必备)
- 配置Redis缓存减少API调用成本
2. 容错设计:
- 多层容错: Retry(瞬时故障) + Fallback(永久故障)
- 超时控制: 每个模型设置timeout避免无限等待
- 异常处理: 统一捕获,返回友好错误信息
3. 性能优化:
- 异步API: 使用ainvoke/abatch实现高并发
- 批量处理: batch方法自动并发,保持顺序
- 流式输出: astream降低首字节延迟
- 缓存策略: Redis缓存LLM响应
4. 可观测性:
- run_id: 每个请求唯一标识
- tags: 环境、功能标签
- metadata: 业务上下文(用户ID、会话ID)
- 回调: StdOutCallbackHandler输出调试日志
5. 扩展性:
- Web框架: FastAPI支持高并发
- 多进程: uvicorn workers提高吞吐量
- 高性能循环: uvloop替代默认asyncio