概述
LangChain Core是整个LangChain生态系统的基石,定义了核心抽象和统一的编程接口。本文将深入分析Core模块的设计理念、关键组件实现和源码细节,揭示其如何通过Runnable接口实现统一的编程模型。
1. LangChain Core模块架构
1.1 Core模块组织结构
graph TB
subgraph "LangChain Core 模块结构"
subgraph "核心抽象层"
R[runnables/ - 可执行抽象]
LM[language_models/ - 语言模型抽象]
P[prompts/ - 提示模板]
OP[output_parsers/ - 输出解析器]
end
subgraph "数据结构层"
M[messages/ - 消息类型]
D[documents/ - 文档结构]
O[outputs/ - 输出类型]
PV[prompt_values/ - 提示值]
end
subgraph "工具和集成层"
T[tools/ - 工具抽象]
E[embeddings/ - 嵌入抽象]
VS[vectorstores/ - 向量存储抽象]
RT[retrievers/ - 检索器抽象]
end
subgraph "基础设施层"
CB[callbacks/ - 回调系统]
C[caches/ - 缓存抽象]
TR[tracers/ - 追踪器]
U[utils/ - 工具函数]
end
R --> LM
R --> P
R --> OP
R --> T
LM --> M
P --> PV
OP --> O
T --> CB
VS --> E
RT --> VS
CB --> TR
end
1.2 核心包依赖关系
graph LR
subgraph "依赖层次"
A[langchain-core<br/>核心抽象] --> B[langchain<br/>主要实现]
A --> C[langchain-community<br/>社区集成]
A --> D[langchain-openai<br/>OpenAI集成]
A --> E[langchain-anthropic<br/>Anthropic集成]
B --> F[langchain-experimental<br/>实验功能]
C --> F
style A fill:#ffeb3b
style B fill:#4caf50
style C fill:#2196f3
end
2. Runnable:统一抽象的核心
2.1 Runnable接口设计哲学
Runnable是LangChain最重要的抽象,体现了"一切皆可执行"的设计理念:
# langchain_core/runnables/base.py
import asyncio
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import (
    Any,
    AsyncIterator,
    Awaitable,
    Callable,
    Dict,
    Generic,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    Type,
    TypeVar,
    Union,
)
Input = TypeVar('Input')
Output = TypeVar('Output')
class Runnable(ABC, Generic[Input, Output]):
    """A unit of work that can be invoked, batched, streamed, transformed and composed.

    Core methods
    ============
    - invoke/ainvoke: transform a single input into an output
    - batch/abatch: efficiently transform many inputs into outputs
    - stream/astream: stream output from a single input
    - astream_log: stream output plus selected intermediate results

    Built-in optimizations
    ======================
    - Batch: by default runs invoke() in parallel on a thread pool
    - Async: methods prefixed with 'a' are async; by default they execute
      the sync counterpart on asyncio's thread pool

    LCEL and composition
    ====================
    The LangChain Expression Language (LCEL) is a declarative way to compose
    Runnables. The main composition primitives are RunnableSequence and
    RunnableParallel.
    """

    @abstractmethod
    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Output:
        """Transform a single input into an output.

        Args:
            input: The input to the Runnable.
            config: Config used for the call; supports standard keys such as
                'tags' and 'metadata' for tracing, and 'max_concurrency' for
                controlling parallel work.

        Returns:
            The Runnable's output.
        """

    async def ainvoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Output:
        """Default ainvoke: run invoke() in an executor thread.

        This lets async code use Runnables that only implement the sync
        path. Subclasses that can run natively async should override.
        """
        return await run_in_executor(config, self.invoke, input, config, **kwargs)

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> List[Output]:
        """Run invoke() over a list of inputs using a thread pool.

        Args:
            inputs: List of inputs.
            config: A single config applied to every input, or one per input.
            return_exceptions: If True, exceptions are returned in the output
                list instead of being raised.

        Returns:
            Outputs in the same order as ``inputs``.
        """
        if not inputs:
            return []

        configs = get_config_list(config, len(inputs))

        def run_one(input_item: Input, config_item: RunnableConfig) -> Output:
            # Per-item wrapper so return_exceptions can capture failures.
            try:
                return self.invoke(input_item, config_item, **kwargs)
            except Exception as e:
                if return_exceptions:
                    return e  # type: ignore[return-value]
                raise

        # BUG FIX: ``config`` may be a list of configs, so read
        # max_concurrency from the normalized per-input configs instead of
        # calling .get() on the raw argument (which crashed for lists).
        max_concurrency = configs[0].get("max_concurrency") if configs else None
        with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
            futures = [
                executor.submit(run_one, input_item, config_item)
                for input_item, config_item in zip(inputs, configs)
            ]
            # BUG FIX: collect results in submission order. The previous
            # implementation iterated as_completed(), which yields futures in
            # completion order and misaligned outputs with their inputs.
            return [future.result() for future in futures]

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Iterator[Output]:
        """Stream output chunks for a single input.

        The default implementation simply yields the result of invoke().
        Subclasses should override to provide true streaming.

        Args:
            input: A single input.
            config: Optional config.

        Yields:
            Output chunks.
        """
        yield self.invoke(input, config, **kwargs)

    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Output]:
        """Async streaming of output chunks.

        NOTE(review): the original comment claimed this runs stream() in a
        thread pool; it does not — the sync stream() is iterated directly on
        the event loop, so a slow sync stream blocks it. Subclasses with
        native async streaming should override this method.
        """
        # The original wrapped this loop in a redundant inner async
        # generator; iterating directly is equivalent.
        for chunk in self.stream(input, config, **kwargs):
            yield chunk

    # -- composition operators ------------------------------------------

    def __or__(
        self,
        # BUG FIX: the whole annotation is quoted — 'Other' is a forward
        # reference, and the previously unquoted Callable[[Any], Other]
        # would raise NameError at class-definition time.
        other: "Union[Runnable[Any, Other], Callable[[Any], Other]]",
    ) -> "RunnableSequence[Input, Other]":
        """Implement the | operator, producing a sequence.

        Example:
            chain = prompt | model | parser
        """
        return RunnableSequence(first=self, last=coerce_to_runnable(other))

    def __ror__(
        self,
        other: "Union[Runnable[Other, Input], Callable[[Other], Input]]",
    ) -> "RunnableSequence[Other, Output]":
        """Implement the reflected | operator."""
        return RunnableSequence(first=coerce_to_runnable(other), last=self)

    def pipe(
        self,
        *others: Union["Runnable[Any, Any]", Callable[[Any], Any]],
    ) -> "RunnableSequence[Input, Any]":
        """Pipe into one or more Runnables; equivalent to chained | operators."""
        runnable = self
        for other in others:
            runnable = runnable | other
        return runnable

    def with_config(
        self,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> "RunnableBinding[Input, Output]":
        """Bind a config to this Runnable.

        Returns:
            A new Runnable with the config attached.
        """
        return RunnableBinding(bound=self, config=config, kwargs=kwargs)

    def with_retry(
        self,
        *,
        retry_if_exception_type: Tuple[Type[BaseException], ...] = (Exception,),
        wait_exponential_jitter: bool = True,
        stop_after_attempt: int = 3,
    ) -> "RunnableRetry[Input, Output]":
        """Wrap this Runnable with retry behavior."""
        return RunnableRetry(
            bound=self,
            retry_if_exception_type=retry_if_exception_type,
            wait_exponential_jitter=wait_exponential_jitter,
            stop_after_attempt=stop_after_attempt,
        )

    def with_fallbacks(
        self,
        fallbacks: Sequence["Runnable[Input, Output]"],
        *,
        exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
    ) -> "RunnableWithFallbacks[Input, Output]":
        """Wrap this Runnable with fallbacks tried when it fails."""
        return RunnableWithFallbacks(
            runnable=self,
            fallbacks=fallbacks,
            exceptions_to_handle=exceptions_to_handle,
        )

    def assign(
        self,
        **kwargs: Union["Runnable[Dict, Any]", Callable[[Dict], Any], Any],
    ) -> "RunnableAssign":
        """Add computed key/value pairs to a dict input."""
        return RunnableAssign(RunnableParallel(kwargs))

    def pick(self, keys: Union[str, List[str]]) -> "RunnablePick":
        """Select the given keys from a dict input, then run this Runnable.

        NOTE(review): the actual return value is a sequence
        (RunnablePick | self); the annotation predates that — confirm.
        """
        return RunnablePick(keys) | self

    def map(self) -> "RunnableEach[List[Input], List[Output]]":
        """Map this Runnable over each element of a list input."""
        return RunnableEach(bound=self)
2.2 核心组合原语实现
RunnableSequence:序列组合
# langchain_core/runnables/base.py
class RunnableSequence(RunnableSerializable[Input, Output]):
    """A sequence of Runnables where each output feeds the next input.

    RunnableSequence is the most important composition operator in LangChain,
    appearing in almost every chain. It can be instantiated directly or — more
    commonly — built with the | operator, where at least one operand is a
    Runnable.

    Every RunnableSequence automatically supports sync, async, batch and
    streaming execution.
    """

    first: Runnable[Input, Any]  # first step
    middle: List[Runnable[Any, Any]]  # intermediate steps
    last: Runnable[Any, Output]  # final step

    def __init__(
        self,
        first: Runnable[Input, Any],
        *middle: Runnable[Any, Any],
        last: Runnable[Any, Output],
    ) -> None:
        """Initialize the sequence from first/middle/last steps."""
        super().__init__(first=first, middle=list(middle), last=last)

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Return the LangChain serialization namespace."""
        return ["langchain", "schema", "runnable"]

    @property
    def steps(self) -> List[Runnable[Any, Any]]:
        """All steps in execution order."""
        return [self.first] + self.middle + [self.last]

    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        """Invoke each step in order, threading the value through.

        Raises:
            Exception: re-raised from a failing step after being reported to
                the callback manager.
        """
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_chain_start(
            dumpd(self),
            input,
            name=config.get("run_name", self.__class__.__name__)
        )
        try:
            # BUG FIX: the previous loop had identical if/else branches for
            # i == 0; a single invoke call per step is equivalent.
            for i, step in enumerate(self.steps):
                # Child config so each step's callbacks nest under this run.
                step_config = patch_config(
                    config, callbacks=run_manager.get_child(f"seq:step:{i+1}")
                )
                input = step.invoke(input, step_config)
            run_manager.on_chain_end(input)
            return input
        except Exception as e:
            run_manager.on_chain_error(e)
            raise

    async def ainvoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        """Async counterpart of invoke: awaits each step in order."""
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        run_manager = await callback_manager.on_chain_start(
            dumpd(self),
            input,
            name=config.get("run_name", self.__class__.__name__)
        )
        try:
            for i, step in enumerate(self.steps):
                step_config = patch_config(
                    config, callbacks=run_manager.get_child(f"seq:step:{i+1}")
                )
                input = await step.ainvoke(input, step_config)
            await run_manager.on_chain_end(input)
            return input
        except Exception as e:
            await run_manager.on_chain_error(e)
            raise

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        **kwargs: Any,
    ) -> List[Output]:
        """Batch by calling each step's batch() in order over all inputs."""
        if not inputs:
            return []
        configs = get_config_list(config, len(inputs))

        def run_step(step: Runnable, step_inputs: List[Any], step_configs: List[RunnableConfig]) -> List[Any]:
            return step.batch(step_inputs, step_configs, **kwargs)

        # Each step consumes the previous step's outputs for every input.
        current_inputs = inputs
        for i, step in enumerate(self.steps):
            # NOTE(review): child callbacks are derived from each config's
            # manager rather than from a run manager here, unlike invoke()
            # — confirm this is intended.
            step_configs = [
                patch_config(
                    config,
                    callbacks=get_callback_manager_for_config(config).get_child(f"seq:step:{i+1}"),
                )
                for config in configs
            ]
            current_inputs = run_step(step, current_inputs, step_configs)
        return current_inputs

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Iterator[Output]:
        """Run all but the last step eagerly, then stream the last step.

        NOTE(review): every Runnable defines stream(), so the hasattr check
        below is effectively always true and the fallback branch is dead —
        confirm whether a capability check was intended instead.
        """
        config = ensure_config(config)
        if hasattr(self.last, 'stream') and callable(getattr(self.last, 'stream')):
            # Run the non-final steps to completion.
            intermediate = input
            for step in self.steps[:-1]:
                intermediate = step.invoke(intermediate, config, **kwargs)
            # Stream only the final step's output.
            yield from self.last.stream(intermediate, config, **kwargs)
        else:
            # Fallback: emit the full result as a single chunk.
            yield self.invoke(input, config, **kwargs)

    def __repr__(self) -> str:
        """Abbreviated representation showing only the endpoints."""
        return f"{self.first!r} | ... | {self.last!r}"

    def __eq__(self, other: object) -> bool:
        """Sequences are equal when all three step groups are equal."""
        return (
            isinstance(other, RunnableSequence)
            and self.first == other.first
            and self.middle == other.middle
            and self.last == other.last
        )

    def __hash__(self) -> int:
        """Hash consistent with __eq__ (middle list frozen as a tuple)."""
        return hash((self.first, tuple(self.middle), self.last))
RunnableParallel:并行组合
# langchain_core/runnables/base.py
class RunnableParallel(RunnableSerializable[Input, Dict[str, Any]]):
    """Run several Runnables concurrently, each receiving the same input.

    Construct from a dict literal inside a sequence, or by passing a dict /
    keyword arguments to RunnableParallel.

    Example:
        # dict literal
        chain = {"joke": joke_chain, "poem": poem_chain}
        # explicit construction
        chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
        # mixed
        chain = RunnableParallel({"joke": joke_chain, "poem": poem_chain})
    """

    steps: Dict[str, Runnable[Input, Any]]  # named branches, all fed the same input

    def __init__(
        self,
        steps: Optional[Dict[str, Union[Runnable[Input, Any], Callable[[Input], Any], Any]]] = None,
        **kwargs: Union[Runnable[Input, Any], Callable[[Input], Any], Any],
    ) -> None:
        """Merge the dict argument with keyword arguments and coerce each value."""
        steps = steps or {}
        merged = {**steps, **kwargs}
        super().__init__(steps={key: coerce_to_runnable(r) for key, r in merged.items()})

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Return the LangChain serialization namespace."""
        return ["langchain", "schema", "runnable"]

    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Dict[str, Any]:
        """Invoke all steps in parallel on a thread pool.

        Returns:
            Dict mapping each step key to its output.
        """
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_chain_start(
            dumpd(self),
            input,
            name=config.get("run_name", self.__class__.__name__)
        )
        try:
            with ThreadPoolExecutor(max_workers=config.get("max_concurrency")) as executor:
                futures = {}
                for key, runnable in self.steps.items():
                    # Each branch gets a child callback scope.
                    step_config = patch_config(
                        config, callbacks=run_manager.get_child(f"parallel:step:{key}")
                    )
                    futures[key] = executor.submit(runnable.invoke, input, step_config)
                # BUG FIX: the previous inner loop called on_chain_error and
                # then re-raised into the outer handler, reporting the same
                # error twice; letting it propagate reports it exactly once.
                results = {key: future.result() for key, future in futures.items()}
            run_manager.on_chain_end(results)
            return results
        except Exception as e:
            run_manager.on_chain_error(e)
            raise

    async def ainvoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Dict[str, Any]:
        """Invoke all steps concurrently with asyncio.gather."""
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        run_manager = await callback_manager.on_chain_start(
            dumpd(self),
            input,
            name=config.get("run_name", self.__class__.__name__)
        )
        try:
            async def run_step(key: str, runnable: Runnable) -> Tuple[str, Any]:
                # Each branch gets a child callback scope.
                step_config = patch_config(
                    config, callbacks=run_manager.get_child(f"parallel:step:{key}")
                )
                result = await runnable.ainvoke(input, step_config)
                return key, result

            tasks = [run_step(key, runnable) for key, runnable in self.steps.items()]
            step_results = await asyncio.gather(*tasks)
            results = dict(step_results)
            await run_manager.on_chain_end(results)
            return results
        except Exception as e:
            await run_manager.on_chain_error(e)
            raise

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        **kwargs: Any,
    ) -> List[Dict[str, Any]]:
        """Batch over inputs: run each step's batch() and zip results per input."""
        if not inputs:
            return []
        configs = get_config_list(config, len(inputs))

        def run_step_batch(key: str, runnable: Runnable) -> List[Any]:
            # NOTE(review): child callbacks are derived from each config's
            # manager rather than from a run manager here — confirm intended.
            step_configs = [
                patch_config(
                    config,
                    callbacks=get_callback_manager_for_config(config).get_child(f"parallel:step:{key}"),
                )
                for config in configs
            ]
            return runnable.batch(inputs, step_configs, **kwargs)

        # Run every step's batch concurrently.
        with ThreadPoolExecutor() as executor:
            futures = {
                key: executor.submit(run_step_batch, key, runnable)
                for key, runnable in self.steps.items()
            }
            step_results = {key: future.result() for key, future in futures.items()}

        # Transpose: one dict of step outputs per input.
        return [
            {key: step_results[key][i] for key in self.steps.keys()}
            for i in range(len(inputs))
        ]

    def __repr__(self) -> str:
        """Dict-like representation of the named branches."""
        items = [f"{k}: {v!r}" for k, v in self.steps.items()]
        return "{\n " + ",\n ".join(items) + "\n}"

    def __eq__(self, other: object) -> bool:
        """Equal when the other is a RunnableParallel with equal steps."""
        return isinstance(other, RunnableParallel) and self.steps == other.steps

    def __hash__(self) -> int:
        """Hash of the sorted (key, runnable) pairs, consistent with __eq__."""
        return hash(tuple(sorted(self.steps.items())))
2.3 其他重要的Runnable实现
RunnableLambda:函数包装器
# langchain_core/runnables/base.py
class RunnableLambda(Runnable[Input, Output]):
    """Wrap an ordinary Python callable as a Runnable.

    This makes any function usable inside an LCEL chain.
    """

    def __init__(
        self,
        func: Union[
            Callable[[Input], Output],
            Callable[[Input, RunnableConfig], Output],
            Callable[[Input, CallbackManagerForChainRun], Output],
            Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output],
        ],
        afunc: Optional[
            Union[
                Callable[[Input], Awaitable[Output]],
                Callable[[Input, RunnableConfig], Awaitable[Output]],
                Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]],
                Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]],
            ]
        ] = None,
    ) -> None:
        """Store the wrapped sync function and, when provided, its async variant.

        Args:
            func: The function to wrap.
            afunc: Optional async counterpart of ``func``.
        """
        if afunc is not None:
            self.afunc = afunc
        self.func = func

    @staticmethod
    def _build_args(fn, input, run_manager, config):
        """Choose positional arguments based on what ``fn`` accepts.

        Mirrors the original dispatch order: run_manager support is checked
        first, then config support.
        """
        if accepts_run_manager(fn):
            if accepts_config(fn):
                return (input, run_manager, config)
            return (input, run_manager)
        if accepts_config(fn):
            return (input, config)
        return (input,)

    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        """Run the wrapped function with callback bookkeeping."""
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_chain_start(
            dumpd(self),
            input,
            name=config.get("run_name", "RunnableLambda")
        )
        try:
            call_args = self._build_args(self.func, input, run_manager, config)
            output = self.func(*call_args)  # type: ignore
            run_manager.on_chain_end(output)
            return output
        except Exception as e:
            run_manager.on_chain_error(e)
            raise

    async def ainvoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        """Run the async variant if one exists; otherwise fall back to threads."""
        if not hasattr(self, 'afunc'):
            # No native async function: delegate to the base-class default,
            # which executes the sync invoke off the event loop.
            return await super().ainvoke(input, config)
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        run_manager = await callback_manager.on_chain_start(
            dumpd(self),
            input,
            name=config.get("run_name", "RunnableLambda")
        )
        try:
            call_args = self._build_args(self.afunc, input, run_manager, config)
            output = await self.afunc(*call_args)  # type: ignore
            await run_manager.on_chain_end(output)
            return output
        except Exception as e:
            await run_manager.on_chain_error(e)
            raise

    def __repr__(self) -> str:
        """Show the wrapped function's name when it has one, else its repr."""
        name = getattr(self.func, '__name__', None)
        if name is not None:
            return f"RunnableLambda({name})"
        return f"RunnableLambda({self.func!r})"
RunnablePassthrough:透传组件
# langchain_core/runnables/passthrough.py
class RunnablePassthrough(RunnableSerializable[Input, Input]):
    """A Runnable that returns its input unchanged.

    Commonly used in parallel compositions to carry the original input
    through alongside transformed values.
    """

    input_type: Optional[Type[Input]] = None

    @classmethod
    def assign(
        cls,
        **kwargs: Union[Runnable[Dict, Any], Callable[[Dict], Any], Any],
    ) -> "RunnableAssign":
        """Return a Runnable that adds computed keys to a dict input.

        Example:
            runnable = RunnablePassthrough.assign(
                new_key=lambda x: x["existing_key"] + " processed"
            )
        """
        return RunnableAssign(RunnableParallel(kwargs))

    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Input:
        """Echo the input back unchanged."""
        return input

    async def ainvoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Input:
        """Async echo of the input."""
        return input

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        **kwargs: Any,
    ) -> List[Input]:
        """Echo the whole input list back unchanged."""
        return inputs

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[Input]:
        """Yield the input as a single chunk."""
        yield input

    def __repr__(self) -> str:
        return "RunnablePassthrough()"
3. 回调系统深度分析
3.1 回调系统架构
graph TB
subgraph "回调系统架构"
subgraph "回调处理器"
A[BaseCallbackHandler] --> B[StdOutCallbackHandler]
A --> C[FileCallbackHandler]
A --> D[LangSmithCallbackHandler]
A --> E[WandbCallbackHandler]
end
subgraph "回调管理器"
F[CallbackManager] --> G[CallbackManagerForChainRun]
F --> H[CallbackManagerForLLMRun]
F --> I[CallbackManagerForToolRun]
F --> J[CallbackManagerForRetrieverRun]
end
subgraph "异步回调"
K[AsyncCallbackManager] --> L[AsyncCallbackManagerForChainRun]
K --> M[AsyncCallbackManagerForLLMRun]
K --> N[AsyncCallbackManagerForToolRun]
end
A --> F
F --> K
end
3.2 BaseCallbackHandler实现
# langchain_core/callbacks/base.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
class BaseCallbackHandler(ABC):
    """Base class for callback handlers.

    Callback handlers run custom logic (logging, monitoring, debugging, ...)
    at key points of a Runnable's execution. Every hook is an optional no-op;
    subclasses override only the events they care about.
    """

    # Whether to ignore LLM events.
    ignore_llm: bool = False
    # Whether to ignore Chain events.
    ignore_chain: bool = False
    # Whether to ignore Agent events.
    ignore_agent: bool = False
    # Whether to ignore Retriever events.
    ignore_retriever: bool = False
    # Whether to ignore ChatModel events.
    ignore_chat_model: bool = False
    # Whether to ignore Tool events.
    # FIX: added for consistency — CallbackManager.on_tool_start reads
    # handler.ignore_tool, which previously did not exist on this base class.
    ignore_tool: bool = False
    # Whether errors raised inside this handler should propagate.
    raise_error: bool = False
    # Whether to run the handler inline with the run.
    run_inline: bool = False

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when an LLM starts running.

        Args:
            serialized: Serialized representation of the LLM.
            prompts: Input prompts.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
            tags: Tags attached to the run.
            metadata: Metadata attached to the run.
        """

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a chat model starts running."""

    def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called for every new token an LLM produces while streaming.

        Args:
            token: The newly generated token.
            chunk: The generation chunk, when available.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
        """

    def on_llm_end(
        self,
        response: LLMResult,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when an LLM finishes running.

        Args:
            response: The LLM's result.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
        """

    def on_llm_error(
        self,
        error: Union[Exception, KeyboardInterrupt],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when an LLM run raises an error.

        Args:
            error: The raised exception.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
        """

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a chain starts running.

        Args:
            serialized: Serialized representation of the chain.
            inputs: Input mapping.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
            tags: Tags attached to the run.
            metadata: Metadata attached to the run.
        """

    def on_chain_end(
        self,
        outputs: Dict[str, Any],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a chain finishes running.

        Args:
            outputs: Output mapping.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
        """

    def on_chain_error(
        self,
        error: Union[Exception, KeyboardInterrupt],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a chain run raises an error.

        Args:
            error: The raised exception.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
        """

    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        inputs: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a tool starts running.

        Args:
            serialized: Serialized representation of the tool.
            input_str: The tool's input as a string.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
            tags: Tags attached to the run.
            metadata: Metadata attached to the run.
            inputs: Structured inputs, when available.
        """

    def on_tool_end(
        self,
        output: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a tool finishes running.

        Args:
            output: The tool's output.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
        """

    def on_tool_error(
        self,
        error: Union[Exception, KeyboardInterrupt],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a tool run raises an error.

        Args:
            error: The raised exception.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
        """

    def on_text(
        self,
        text: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called on arbitrary text events.

        Args:
            text: The text content.
            run_id: Id of this run.
            parent_run_id: Id of the parent run, if any.
        """

    def on_retriever_start(
        self,
        serialized: Dict[str, Any],
        query: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a retriever starts running."""

    def on_retriever_end(
        self,
        documents: Sequence[Document],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a retriever finishes running."""

    def on_retriever_error(
        self,
        error: Union[Exception, KeyboardInterrupt],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a retriever run raises an error."""
3.3 CallbackManager实现
# langchain_core/callbacks/manager.py
class CallbackManager(BaseCallbackManager):
    """Dispatches lifecycle events to registered callback handlers.

    Each ``on_*_start`` entry point notifies every interested handler and
    returns a run-scoped manager carrying the new run_id and inherited state.
    """

    def __init__(
        self,
        handlers: List[BaseCallbackHandler],
        inheritable_handlers: Optional[List[BaseCallbackHandler]] = None,
        parent_run_id: Optional[UUID] = None,
        *,
        tags: Optional[List[str]] = None,
        inheritable_tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        inheritable_metadata: Optional[Dict[str, Any]] = None,
    ):
        """Store handlers, inheritable state, and the parent run id.

        Args:
            handlers: Callback handlers for this manager.
            inheritable_handlers: Handlers propagated to child runs.
            parent_run_id: Run id of the parent run, if any.
            tags: Tags for this run.
            inheritable_tags: Tags propagated to child runs.
            metadata: Metadata for this run.
            inheritable_metadata: Metadata propagated to child runs.
        """
        self.handlers: List[BaseCallbackHandler] = handlers or []
        self.inheritable_handlers: List[BaseCallbackHandler] = inheritable_handlers or []
        self.parent_run_id: Optional[UUID] = parent_run_id
        self.tags: List[str] = tags or []
        self.inheritable_tags: List[str] = inheritable_tags or []
        self.metadata: Dict[str, Any] = metadata or {}
        self.inheritable_metadata: Dict[str, Any] = inheritable_metadata or {}

    # -- internal helpers -------------------------------------------------

    def _notify_handlers(
        self,
        event_name: str,
        ignore_attr: str,
        *args: Any,
        **kwargs: Any,
    ) -> UUID:
        """Fire ``event_name`` on every handler not opting out via ``ignore_attr``.

        The four ``on_*_start`` entry points previously duplicated this loop;
        it is factored out here once.

        Returns:
            The freshly generated run id for this event.
        """
        run_id = uuid4()
        for handler in self.handlers:
            # BUG FIX: getattr with a False default keeps handlers that lack
            # an ignore_* attribute working (the old on_tool_start read
            # handler.ignore_tool directly, which BaseCallbackHandler did not
            # define, raising AttributeError).
            if getattr(handler, ignore_attr, False):
                continue
            try:
                getattr(handler, event_name)(
                    *args,
                    run_id=run_id,
                    parent_run_id=self.parent_run_id,
                    tags=self.tags,
                    metadata=self.metadata,
                    **kwargs,
                )
            except Exception as e:
                if handler.raise_error:
                    raise e
                # NOTE(review): ``logger`` is not defined in this snippet; a
                # module-level ``logger = logging.getLogger(__name__)`` is
                # assumed — confirm against the real module.
                logger.warning(f"Error in callback handler {handler}: {e}")
        return run_id

    def _child_manager_kwargs(self, run_id: UUID) -> Dict[str, Any]:
        """Common constructor kwargs for the run-scoped managers."""
        return {
            "run_id": run_id,
            "handlers": self.handlers,
            "inheritable_handlers": self.inheritable_handlers,
            "parent_run_id": self.parent_run_id,
            "tags": self.tags,
            "inheritable_tags": self.inheritable_tags,
            "metadata": self.metadata,
            "inheritable_metadata": self.inheritable_metadata,
        }

    # -- event entry points -----------------------------------------------

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        **kwargs: Any,
    ) -> CallbackManagerForLLMRun:
        """Notify handlers that an LLM run started; return its run manager."""
        run_id = self._notify_handlers(
            "on_llm_start", "ignore_llm", serialized, prompts, **kwargs
        )
        return CallbackManagerForLLMRun(**self._child_manager_kwargs(run_id))

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        **kwargs: Any,
    ) -> CallbackManagerForLLMRun:
        """Notify handlers that a chat-model run started; return its run manager."""
        run_id = self._notify_handlers(
            "on_chat_model_start", "ignore_chat_model", serialized, messages, **kwargs
        )
        return CallbackManagerForLLMRun(**self._child_manager_kwargs(run_id))

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        **kwargs: Any,
    ) -> CallbackManagerForChainRun:
        """Notify handlers that a chain run started; return its run manager."""
        run_id = self._notify_handlers(
            "on_chain_start", "ignore_chain", serialized, inputs, **kwargs
        )
        return CallbackManagerForChainRun(**self._child_manager_kwargs(run_id))

    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        **kwargs: Any,
    ) -> CallbackManagerForToolRun:
        """Notify handlers that a tool run started; return its run manager."""
        run_id = self._notify_handlers(
            "on_tool_start", "ignore_tool", serialized, input_str, **kwargs
        )
        return CallbackManagerForToolRun(**self._child_manager_kwargs(run_id))

    # -- handler management -------------------------------------------------

    def add_handler(self, handler: BaseCallbackHandler, inherit: bool = True) -> None:
        """Register a handler.

        Args:
            handler: The handler to add.
            inherit: Whether child runs inherit the handler.
        """
        self.handlers.append(handler)
        if inherit:
            self.inheritable_handlers.append(handler)

    def remove_handler(self, handler: BaseCallbackHandler) -> None:
        """Unregister a handler from both handler lists."""
        if handler in self.handlers:
            self.handlers.remove(handler)
        if handler in self.inheritable_handlers:
            self.inheritable_handlers.remove(handler)

    def set_handlers(self, handlers: List[BaseCallbackHandler], inherit: bool = True) -> None:
        """Replace the handler list (and the inheritable list when inherit=True)."""
        self.handlers = handlers
        if inherit:
            self.inheritable_handlers = handlers

    def set_handler(self, handler: BaseCallbackHandler, inherit: bool = True) -> None:
        """Replace all handlers with a single handler."""
        self.set_handlers([handler], inherit)
4. 配置系统实现
4.1 RunnableConfig设计
# langchain_core/runnables/config.py
from typing import Any, Dict, List, Optional, Union
from typing_extensions import TypedDict
from langchain_core.callbacks import BaseCallbackHandler, BaseCallbackManager
class RunnableConfig(TypedDict, total=False):
    """Configuration for a Runnable.

    Controls execution behavior: callbacks, tags, metadata, concurrency
    limits, and runtime-configurable field values.
    """

    # -- callbacks --------------------------------------------------------
    # Handler list or callback manager to use for this run.
    callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]

    # -- execution control -------------------------------------------------
    # Cap on parallel work (e.g. thread-pool size during batch()).
    max_concurrency: Optional[int]
    # Guard against unbounded recursion when Runnables call Runnables.
    recursion_limit: Optional[int]

    # -- tracing / monitoring ----------------------------------------------
    # Tags used for tracing and filtering.
    tags: Optional[List[str]]
    # Extra metadata attached to traces for debugging.
    metadata: Optional[Dict[str, Any]]
    # Human-readable name identifying this particular run.
    run_name: Optional[str]
    # Unique identifier of the run.
    # NOTE(review): UUID is not imported in this snippet — it needs
    # `from uuid import UUID` alongside the other imports.
    run_id: Optional[UUID]

    # -- configurable fields -----------------------------------------------
    # Runtime values for configurable fields.
    configurable: Optional[Dict[str, Any]]
def ensure_config(config: Optional["RunnableConfig"] = None) -> "RunnableConfig":
    """Return a valid RunnableConfig without mutating the caller's dict.

    Args:
        config: Optional config mapping.

    Returns:
        A config dict with 'tags', 'metadata' and 'configurable' defaults
        filled in.

    Raises:
        TypeError: If config is neither None nor a dict.
    """
    if config is None:
        config = {}
    elif not isinstance(config, dict):
        raise TypeError("config must be a dict")
    # BUG FIX: copy before applying setdefault so the caller's dict is not
    # mutated, and apply the same defaults on the None path (previously the
    # None path returned an empty config with no defaults, inconsistent with
    # the dict path).
    result: "RunnableConfig" = dict(config)
    result.setdefault("tags", [])
    result.setdefault("metadata", {})
    result.setdefault("configurable", {})
    return result
def get_config_list(
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]],
    length: int
) -> List[RunnableConfig]:
    """Normalize ``config`` into exactly ``length`` configs.

    Args:
        config: A single config, a list of configs, or None.
        length: Number of configs required (one per input).

    Returns:
        A list of ``length`` configs.

    Raises:
        ValueError: If a config list is given whose length differs from
            ``length``.
    """
    # No config: produce fresh empty configs, one per input.
    if config is None:
        return [RunnableConfig() for _ in range(length)]
    # A list must align one-to-one with the inputs.
    if isinstance(config, list):
        if len(config) != length:
            raise ValueError(f"Config list length {len(config)} does not match input length {length}")
        return [ensure_config(c) for c in config]
    # A single config is replicated for every input.
    return [ensure_config(config) for _ in range(length)]
def patch_config(
    config: RunnableConfig,
    *,
    deep_copy_locals: Optional[List[str]] = None,
    callbacks: Optional[BaseCallbackManager] = None,
    recursion_limit: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    run_name: Optional[str] = None,
    **kwargs: Any,
) -> RunnableConfig:
    """Return a patched copy of ``config`` with the given overrides applied.

    Args:
        config: The base config.
        deep_copy_locals: Local fields to deep-copy.
            NOTE(review): currently unused by the implementation — confirm.
        callbacks: Replacement callback manager.
        recursion_limit: Replacement recursion limit.
        max_concurrency: Replacement concurrency cap.
        run_name: Replacement run name.
        **kwargs: Additional config entries to set.

    Returns:
        A new config; the original is left untouched.
    """
    patched = dict(config) if config else {}
    # Only apply the overrides that were actually supplied.
    overrides = {
        "callbacks": callbacks,
        "recursion_limit": recursion_limit,
        "max_concurrency": max_concurrency,
        "run_name": run_name,
    }
    for key, value in overrides.items():
        if value is not None:
            patched[key] = value
    patched.update(kwargs)
    return RunnableConfig(**patched)
4.2 可配置字段系统
# langchain_core/runnables/configurable.py
from typing import Any, Dict, Optional, Type, Union
from pydantic import BaseModel
class ConfigurableField(BaseModel):
    """Definition of a runtime-configurable field."""

    # Unique identifier of the field.
    id: str
    # Display name of the field.
    name: Optional[str] = None
    # Human-readable description of the field.
    description: Optional[str] = None
    # Type annotation of the field.
    annotation: Optional[Type] = None
    # Default value of the field.
    default: Optional[Any] = None
    # Whether the field is shared across Runnables.
    is_shared: bool = False
class ConfigurableFieldSpec(BaseModel):
    """Specification describing a configurable field (id, naming, default, type)."""

    id: str
    name: Optional[str] = None
    description: Optional[str] = None
    default: Optional[Any] = None
    annotation: Optional[Type] = None
    is_shared: bool = False
class RunnableConfigurableFields(RunnableSerializable[Input, Output]):
    """Runnable wrapper that selects between alternatives at call time.

    The config value ``configurable[self.which.id]`` decides whether the
    default Runnable or one of the named alternatives is executed.
    """

    default: Runnable[Input, Output]  # Runnable used when no alternative is selected
    alternatives: Dict[str, Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]]  # named alternatives (or zero-arg factories)
    which: ConfigurableField  # field whose id selects the alternative

    def __init__(
        self,
        default: Runnable[Input, Output],
        *,
        which: ConfigurableField,
        **alternatives: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]],
    ) -> None:
        """Initialize with a default Runnable, the selector field, and alternatives.

        Args:
            default: The default Runnable.
            which: The selector field configuration.
            **alternatives: Mapping of alternative name to Runnable or factory.
        """
        super().__init__(
            default=default,
            which=which,
            alternatives=alternatives,
        )

    def configurable_fields(self, **kwargs: ConfigurableField) -> "RunnableConfigurableFields":
        """Configure fields.

        NOTE(review): this currently ignores ``kwargs`` and returns self —
        confirm whether field merging was intended here.
        """
        return self

    def configurable_alternatives(
        self,
        which: ConfigurableField,
        *,
        default_key: str = "default",
        prefix_keys: bool = False,
        **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]],
    ) -> "RunnableConfigurableAlternatives":
        """Wrap this Runnable with configurable alternatives."""
        return RunnableConfigurableAlternatives(
            which=which,
            default=self,
            alternatives=kwargs,
            default_key=default_key,
            prefix_keys=prefix_keys,
        )

    def _select_runnable(self, config: RunnableConfig) -> Runnable[Input, Output]:
        """Resolve the Runnable chosen by the config.

        Factored out so invoke and ainvoke share one copy of the selection
        logic (previously duplicated in both methods).

        Raises:
            ValueError: If the configured value names no known alternative.
        """
        which_value = config.get("configurable", {}).get(self.which.id, "default")
        if which_value == "default":
            return self.default
        if which_value in self.alternatives:
            alternative = self.alternatives[which_value]
            # A plain callable (not a Runnable) is treated as a lazy factory.
            if callable(alternative) and not isinstance(alternative, Runnable):
                return alternative()
            return alternative
        raise ValueError(f"Unknown alternative: {which_value}")

    def invoke(self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) -> Output:
        """Execute the Runnable selected by the config."""
        config = ensure_config(config)
        return self._select_runnable(config).invoke(input, config, **kwargs)

    async def ainvoke(self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) -> Output:
        """Async counterpart of invoke."""
        config = ensure_config(config)
        return await self._select_runnable(config).ainvoke(input, config, **kwargs)

    def config_specs(self) -> List[ConfigurableFieldSpec]:
        """Return the spec describing the selector field."""
        return [
            ConfigurableFieldSpec(
                id=self.which.id,
                name=self.which.name,
                description=self.which.description,
                default="default",
                annotation=str,
                is_shared=self.which.is_shared,
            )
        ]
5. 总结
LangChain Core模块通过精心设计的抽象层次,实现了统一、灵活、可扩展的编程模型:
5.1 核心设计原则
- 统一抽象:Runnable接口提供了一致的编程模型
- 组合优先:通过组合而非继承实现复杂功能
- 异步优先:原生支持异步和并发执行
- 可观测性:内置完整的回调和追踪系统
- 配置驱动:通过配置系统实现灵活的运行时行为
5.2 架构优势
- 类型安全:完整的泛型类型系统
- 性能优化:自动批处理和并行执行
- 错误处理:统一的异常处理机制
- 可扩展性:插件化的组件架构
- 生产就绪:完善的监控和调试支持
LangChain Core的设计体现了现代软件架构的最佳实践,为构建复杂的AI应用提供了坚实的基础。