1. Core模块 (langchain-core)
Core模块是LangChain生态系统的基础,定义了所有核心抽象和接口。
1.1 模块架构图
graph TB
subgraph "langchain-core"
A[runnables] --> A1[base.py - Runnable基类]
A --> A2[config.py - 配置管理]
A --> A3[utils.py - 工具函数]
B[language_models] --> B1[base.py - 基础语言模型]
B --> B2[chat_models.py - 聊天模型]
B --> B3[llms.py - 传统LLM]
C[messages] --> C1[base.py - 消息基类]
C --> C2[human.py - 人类消息]
C --> C3[ai.py - AI消息]
C --> C4[system.py - 系统消息]
D[tools] --> D1[base.py - 工具基类]
D --> D2[convert.py - 工具转换]
D --> D3[structured.py - 结构化工具]
E[vectorstores] --> E1[base.py - 向量存储基类]
E --> E2[in_memory.py - 内存向量存储]
F[callbacks] --> F1[manager.py - 回调管理器]
F --> F2[base.py - 回调基类]
G[prompts] --> G1[base.py - 提示基类]
G --> G2[chat.py - 聊天提示]
G --> G3[few_shot.py - 少样本提示]
end
1.2 Runnable接口详解
1.2.1 核心接口定义
# libs/core/langchain_core/runnables/base.py
from abc import ABC, abstractmethod
from typing import Any, Optional, List, Iterator, AsyncIterator, Union, Dict
from langchain_core.callbacks import Callbacks
from langchain_core.runnables.config import RunnableConfig
class Runnable(ABC, Generic[Input, Output]):
    """Core LangChain abstraction defining a uniform execution interface.

    Every LangChain component is expected to implement it, including
    language models (LLMs, chat models), prompt templates, output parsers,
    retrievers, tools and chains.

    Methods defined here:
        - invoke / ainvoke: run a single input (sync / async)
        - batch: run several inputs concurrently
        - stream / astream: produce output incrementally (sync / async)
        - ``|`` composition, with_config, with_retry, with_fallbacks
    """

    @abstractmethod
    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any
    ) -> Output:
        """Run the component synchronously on a single input.

        Args:
            input: Input value; its type is defined by the implementation.
            config: Runtime configuration (callbacks, tags, metadata, ...).
            **kwargs: Extra keyword arguments.

        Returns:
            The result; its type is defined by the implementation.

        Raises:
            NotImplementedError: Subclasses must implement this method.
        """
        raise NotImplementedError()

    async def ainvoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any
    ) -> Output:
        """Run the component asynchronously on a single input.

        The default implementation hands the synchronous ``invoke`` to an
        executor; subclasses may override it with a native async version.

        Args:
            input: Input value.
            config: Runtime configuration.
            **kwargs: Extra keyword arguments.

        Returns:
            The result of ``invoke``.
        """
        return await run_in_executor(
            get_executor_for_config(config),
            self.invoke,
            input,
            config,
            **kwargs
        )

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        max_concurrency: Optional[int] = None,
        **kwargs: Any
    ) -> List[Output]:
        """Run ``invoke`` over several inputs using a thread pool.

        Subclasses may override this with an optimized batch path.

        Args:
            inputs: Inputs to process.
            config: One config shared by all inputs, or one config per input.
            max_concurrency: Maximum number of worker threads; defaults to
                one thread per input.
            **kwargs: Extra keyword arguments forwarded to ``invoke``.

        Returns:
            Results in the same order as ``inputs``.
        """
        if not inputs:
            # ThreadPoolExecutor rejects max_workers=0, so an empty batch
            # must short-circuit instead of building a pool.
            return []

        if max_concurrency is None:
            max_concurrency = len(inputs)
        configs = get_config_list(config, len(inputs))

        with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
            futures = [
                executor.submit(self.invoke, input_item, config_item, **kwargs)
                for input_item, config_item in zip(inputs, configs)
            ]
            # Collect in submission order so results line up with inputs.
            return [future.result() for future in futures]

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any
    ) -> Iterator[Output]:
        """Stream the output for a single input.

        The default implementation yields the complete ``invoke`` result as
        a single chunk; components with real streaming should override it.

        Args:
            input: Input value.
            config: Runtime configuration.
            **kwargs: Extra keyword arguments.

        Yields:
            Output chunks.
        """
        yield self.invoke(input, config, **kwargs)

    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any
    ) -> AsyncIterator[Output]:
        """Asynchronously stream the output for a single input.

        The default implementation adapts the synchronous ``stream`` through
        ``_astream_from_sync_stream``.

        Args:
            input: Input value.
            config: Runtime configuration.
            **kwargs: Extra keyword arguments.

        Yields:
            Output chunks.
        """
        async for chunk in self._astream_from_sync_stream(
            self.stream(input, config, **kwargs), config
        ):
            yield chunk

    # Composition operators

    def __or__(self, other: Union["Runnable", Callable]) -> "RunnableSequence":
        """Compose ``self | other`` into a sequence.

        Example:
            chain = prompt | model | parser
        """
        return RunnableSequence(first=self, last=coerce_to_runnable(other))

    def __ror__(self, other: Union["Runnable", Callable]) -> "RunnableSequence":
        """Compose ``other | self`` when ``other`` is not itself a Runnable."""
        return RunnableSequence(first=coerce_to_runnable(other), last=self)

    # Configuration helpers

    def with_config(self, **kwargs: Any) -> "RunnableBinding":
        """Return a new ``RunnableBinding`` wrapping this runnable.

        Args:
            **kwargs: Configuration values, passed as the binding's
                ``kwargs`` argument.

        Returns:
            The bound runnable.
        """
        # NOTE(review): the values are handed to RunnableBinding as `kwargs`;
        # confirm RunnableBinding treats them as config and not call arguments.
        return RunnableBinding(bound=self, kwargs=kwargs)

    def with_retry(
        self,
        *,
        retry_if_exception_type: tuple = (Exception,),
        wait_exponential_jitter: bool = True,
        stop_after_attempt: int = 3,
    ) -> "RunnableRetry":
        """Return a new instance that retries on failure.

        Args:
            retry_if_exception_type: Exception types that trigger a retry.
            wait_exponential_jitter: Whether to use exponential backoff.
            stop_after_attempt: Maximum number of attempts.

        Returns:
            A ``RunnableRetry`` wrapping this runnable.
        """
        return RunnableRetry(
            bound=self,
            retry_if_exception_type=retry_if_exception_type,
            wait_exponential_jitter=wait_exponential_jitter,
            stop_after_attempt=stop_after_attempt,
        )

    def with_fallbacks(
        self,
        fallbacks: List["Runnable"],
        *,
        exception_to_check: tuple = (Exception,),
    ) -> "RunnableWithFallbacks":
        """Return a new instance that falls back to alternatives on failure.

        Args:
            fallbacks: Alternative runnables.
            exception_to_check: Exception types that trigger a fallback.

        Returns:
            A ``RunnableWithFallbacks`` wrapping this runnable.
        """
        return RunnableWithFallbacks(
            runnable=self,
            fallbacks=fallbacks,
            exception_to_check=exception_to_check,
        )
1.2.2 组合模式实现
# libs/core/langchain_core/runnables/base.py
class RunnableSequence(RunnableSerializable[Input, Output]):
    """Sequential composition of runnables.

    Implements the chaining pattern ``A | B | C``: the output of A feeds B,
    and the output of B feeds C.
    """

    def __init__(
        self,
        first: Runnable[Input, Any],
        last: Runnable[Any, Output],
        *,
        name: Optional[str] = None
    ):
        """Build the sequence.

        Args:
            first: First component (may itself be a sequence).
            last: Last component (may itself be a sequence).
            name: Optional name for the sequence.
        """
        super().__init__(name=name)
        self.first = first
        self.last = last

        # Flatten nested sequences on BOTH sides. ``a | b | c`` evaluates as
        # ``(a | b) | c``, so the nested sequence normally arrives as
        # ``first``; flattening only ``last`` would miss that case.
        steps = []
        for part in (first, last):
            if isinstance(part, RunnableSequence):
                steps.extend(part.steps)
            else:
                steps.append(part)
        self.steps = steps

    @property
    def InputType(self) -> type:
        """Input type, taken from the first component."""
        return self.first.InputType

    @property
    def OutputType(self) -> type:
        """Output type, taken from the last component."""
        return self.last.OutputType

    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any
    ) -> Output:
        """Run every step in order, feeding each output to the next step.

        Args:
            input: Initial input.
            config: Runtime configuration.
            **kwargs: Extra keyword arguments forwarded to every step.

        Returns:
            The output of the final step.

        Raises:
            Exception: Any exception raised by a step, re-raised after
                ``on_chain_error`` has been called.
        """
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_chain_start(
            serialized=self.to_json(),
            inputs=input,
            name=config.get("run_name") if config else None,
        )
        try:
            current_input = input
            for i, step in enumerate(self.steps):
                # Each step gets a child callback manager tagged with its
                # 1-based position in the sequence.
                step_config = patch_config(
                    config,
                    callbacks=run_manager.get_child(f"seq:step:{i+1}"),
                )
                current_input = step.invoke(current_input, step_config, **kwargs)
            run_manager.on_chain_end(current_input)
            return current_input
        except Exception as e:
            run_manager.on_chain_error(e)
            raise

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any
    ) -> Iterator[Output]:
        """Run the sequence and stream the output of the final step.

        All steps except the last are run with ``invoke``; only the last
        step is streamed.

        Args:
            input: Initial input.
            config: Runtime configuration, passed unchanged to every step.
            **kwargs: Extra keyword arguments forwarded to every step.

        Yields:
            Chunks produced by the final step's ``stream``.
        """
        # NOTE(review): unlike invoke(), this path emits no chain callbacks.
        current_input = input
        for step in self.steps[:-1]:
            current_input = step.invoke(current_input, config, **kwargs)
        yield from self.steps[-1].stream(current_input, config, **kwargs)
class RunnableParallel(RunnableSerializable[Input, Dict[str, Any]]):
    """Parallel composition of runnables.

    Runs several runnables on the same input and merges their results into
    one dictionary.

    Example: {"summary": summarizer, "analysis": analyzer}
    """

    def __init__(self, steps_dict: Dict[str, Runnable]):
        """Store the mapping of output key to runnable.

        Args:
            steps_dict: Keys are output field names, values the runnables
                that produce them.
        """
        super().__init__()
        self.steps_dict = steps_dict

    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any
    ) -> Dict[str, Any]:
        """Run all steps concurrently on the same input.

        Args:
            input: Input handed to every step.
            config: Runtime configuration.
            **kwargs: Extra keyword arguments forwarded to every step.

        Returns:
            Dictionary of results keyed by step name. A step that raises
            contributes its exception object as the value instead of
            aborting the other steps.
        """
        run_manager = get_callback_manager_for_config(config).on_chain_start(
            serialized=self.to_json(),
            inputs=input,
        )
        try:
            with ThreadPoolExecutor() as pool:
                # Submit one task per step, remembering which key it serves.
                pending = {}
                for name, runnable in self.steps_dict.items():
                    child_config = patch_config(
                        config, callbacks=run_manager.get_child(f"par:{name}")
                    )
                    task = pool.submit(runnable.invoke, input, child_config, **kwargs)
                    pending[task] = name

                # Gather results as tasks finish.
                outputs = {}
                for finished in as_completed(pending):
                    name = pending[finished]
                    try:
                        outputs[name] = finished.result()
                    except Exception as step_error:
                        # Deliberate best-effort: keep the error as the value
                        # and let the remaining steps complete.
                        outputs[name] = step_error
                run_manager.on_chain_end(outputs)
                return outputs
        except Exception as chain_error:
            run_manager.on_chain_error(chain_error)
            raise
1.3 配置管理系统
1.3.1 RunnableConfig定义
# libs/core/langchain_core/runnables/config.py
from typing import Any, Dict, List, Optional, Union, Callable
from uuid import UUID
from langchain_core.callbacks import BaseCallbackHandler
class RunnableConfig(TypedDict, total=False):
    """Configuration parameters for a Runnable execution.

    The config is passed along the whole execution chain and controls
    callbacks, tags and metadata, run identity, the recursion limit,
    concurrency and configurable parameters. Every key is optional
    (``total=False``).
    """

    # --- Callbacks ---
    # Callback handlers used to observe the execution.
    callbacks: Optional[List[BaseCallbackHandler]]

    # --- Tracing ---
    # Tags used to categorize and filter run records.
    tags: Optional[List[str]]
    # Extra information stored alongside the run.
    metadata: Optional[Dict[str, Any]]
    # Name identifying this particular run.
    run_name: Optional[str]
    # Unique identifier of the run.
    run_id: Optional[UUID]

    # --- Execution control ---
    # Maximum number of tasks executed in parallel.
    max_concurrency: Optional[int]
    # Recursion depth limit, guarding against infinite recursion.
    recursion_limit: Optional[int]

    # --- Configurable parameters ---
    # Parameters that can be configured at run time.
    configurable: Optional[Dict[str, Any]]
def ensure_config(config: Optional[RunnableConfig] = None) -> RunnableConfig:
    """Guarantee that a config object exists.

    Args:
        config: Optional config object.

    Returns:
        ``config`` itself when one was supplied (even if empty), otherwise a
        fresh empty ``RunnableConfig``.
    """
    return RunnableConfig() if config is None else config
def patch_config(
    config: Optional[RunnableConfig],
    *,
    deep_copy_locals: Optional[List[str]] = None,
    **kwargs: Any
) -> RunnableConfig:
    """Return a copy of ``config`` with the given entries overridden.

    The input config is not modified; the copy is shallow except for the
    keys listed in ``deep_copy_locals``.

    Args:
        config: Original config, or ``None`` for an empty one.
        deep_copy_locals: Keys whose new values should be deep-copied before
            being stored. ``None`` (the default) means no key is deep-copied.
        **kwargs: Config entries to set on the copy.

    Returns:
        The new, updated config object.
    """
    import copy

    # The default is None, which does not support ``in``; treat it as empty.
    deep_copy_keys = deep_copy_locals if deep_copy_locals is not None else ()

    new_config = ensure_config(config).copy()
    for key, value in kwargs.items():
        if key in deep_copy_keys and value is not None:
            new_config[key] = copy.deepcopy(value)
        else:
            new_config[key] = value
    return new_config
def merge_configs(*configs: Optional[RunnableConfig]) -> RunnableConfig:
    """Merge several config objects into a new one.

    ``callbacks`` and ``tags`` are concatenated, ``metadata`` and
    ``configurable`` are dict-merged (later configs win on key clashes), and
    the scalar fields are overwritten by the latest non-``None`` value.
    ``None`` configs and empty/falsy collection values are skipped.

    Args:
        *configs: Config objects to merge, in order of increasing priority.

    Returns:
        The merged config object.
    """
    scalar_keys = ("run_name", "run_id", "max_concurrency", "recursion_limit")

    merged = RunnableConfig()
    for current in configs:
        if current is None:
            continue

        # List-valued fields accumulate.
        for list_key in ("callbacks", "tags"):
            if current.get(list_key):
                merged.setdefault(list_key, []).extend(current[list_key])

        # Metadata is merged key by key.
        if current.get("metadata"):
            merged.setdefault("metadata", {}).update(current["metadata"])

        # Scalar fields: the latest non-None value replaces earlier ones.
        for scalar_key in scalar_keys:
            if current.get(scalar_key) is not None:
                merged[scalar_key] = current[scalar_key]

        # Configurable parameters are merged key by key.
        if current.get("configurable"):
            merged.setdefault("configurable", {}).update(current["configurable"])

    return merged
2. 语言模型模块 (Language Models)
2.1 模块架构
classDiagram
class BaseLanguageModel {
<<abstract>>
+cache: BaseCache
+verbose: bool
+callbacks: Callbacks
+tags: List[str]
+metadata: Dict[str, Any]
+invoke(input) Any
+ainvoke(input) Any
+batch(inputs) List[Any]
+stream(input) Iterator[Any]
+get_token_ids(text) List[int]
}
class BaseLLM {
+generate(prompts) LLMResult
+agenerate(prompts) LLMResult
+_call(prompt, stop) str
+_acall(prompt, stop) str
+_stream(prompt, stop) Iterator[str]
+_astream(prompt, stop) AsyncIterator[str]
+get_sub_prompts(params) List[List[str]]
}
class BaseChatModel {
+_generate(messages) ChatResult
+_agenerate(messages) ChatResult
+_stream(messages) Iterator[ChatGenerationChunk]
+_astream(messages) AsyncIterator[ChatGenerationChunk]
+bind_tools(tools) BaseChatModel
+with_structured_output(schema) Runnable
}
BaseLanguageModel <|-- BaseLLM
BaseLanguageModel <|-- BaseChatModel
note for BaseLanguageModel : "实现Runnable接口\n提供缓存、回调、批处理等功能"
note for BaseLLM : "传统文本生成模型\n输入输出都是字符串"
note for BaseChatModel : "对话模型\n输入输出是消息对象"
2.2 BaseChatModel详解
# libs/core/langchain_core/language_models/chat_models.py
from typing import List, Optional, Dict, Any, Union, Iterator, AsyncIterator
from abc import abstractmethod
from langchain_core.language_models.base import BaseLanguageModel
from langchain_core.messages import BaseMessage, BaseMessageChunk
from langchain_core.outputs import ChatResult, ChatGeneration, ChatGenerationChunk
from langchain_core.tools import BaseTool
class BaseChatModel(BaseLanguageModel[BaseMessage], ABC):
    """Base class for chat models.

    Defines the standard interface for interacting with a chat model.
    Several message types are supported: HumanMessage, AIMessage,
    SystemMessage, and so on.
    """

    @abstractmethod
    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Generate a chat reply; the core method subclasses must implement.

        Args:
            messages: Message list containing the conversation history.
            stop: Stop sequences.
            run_manager: Callback manager for this run.
            **kwargs: Model-specific parameters.

        Returns:
            A ``ChatResult`` holding the generations.
        """

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        """Stream a chat reply.

        The default implementation calls ``_generate`` and re-emits each
        generation as one chunk; models with real streaming should override.

        Args:
            messages: Message list.
            stop: Stop sequences.
            run_manager: Callback manager for this run.
            **kwargs: Extra parameters.

        Yields:
            ``ChatGenerationChunk`` objects.
        """
        result = self._generate(messages, stop, run_manager, **kwargs)
        for generation in result.generations:
            yield ChatGenerationChunk(
                message=generation.message,
                generation_info=generation.generation_info,
            )

    def bind_tools(
        self,
        tools: List[Union[Dict[str, Any], type, Callable, BaseTool]],
        **kwargs: Any,
    ) -> "BaseChatModel":
        """Bind tools to the model and return the bound model.

        Args:
            tools: Tools to bind.
            **kwargs: Extra binding parameters.

        Returns:
            The result of ``self.bind`` with the converted tool specs.

        Raises:
            ValueError: If a tool cannot be converted by
                ``_convert_to_tool_spec``.
        """
        formatted_tools = [self._convert_to_tool_spec(tool) for tool in tools]
        return self.bind(tools=formatted_tools, **kwargs)

    def with_structured_output(
        self,
        schema: Union[Dict, type],
        *,
        method: Literal["function_calling", "json_mode"] = "function_calling",
        **kwargs: Any,
    ) -> Runnable[BaseMessage, Union[Dict, BaseModel]]:
        """Create a wrapper around the model that returns structured output.

        Args:
            schema: Output structure (Pydantic model or JSON Schema).
            method: How structured output is obtained.
            **kwargs: Extra configuration.

        Returns:
            A Runnable returning structured data.

        Raises:
            ValueError: If ``method`` is not a supported value.
        """
        if method == "function_calling":
            return self._create_function_calling_wrapper(schema, **kwargs)
        elif method == "json_mode":
            return self._create_json_mode_wrapper(schema, **kwargs)
        else:
            raise ValueError(f"Unsupported method: {method}")

    def invoke(
        self,
        input: Union[
            PromptValue,
            str,
            List[Union[BaseMessage, List[BaseMessage], Tuple[str, str], str, Dict[str, Any]]]
        ],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> BaseMessage:
        """Call the chat model and return the generated message.

        Args:
            input: Input in any format accepted by
                ``_convert_input_to_messages``.
            config: Runtime configuration.
            **kwargs: Extra parameters.

        Returns:
            The message of the first generation.

        Raises:
            Exception: Any error raised during generation, re-raised after
                ``on_llm_error`` has been called.
        """
        # Normalize the input into a message list.
        messages = self._convert_input_to_messages(input)

        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_llm_start(
            serialized=self.to_json(),
            prompts=[self._messages_to_string(messages)],
            **kwargs,
        )
        try:
            result = self._generate(messages, run_manager=run_manager, **kwargs)
            # Always close the run: a started run must end with on_llm_end
            # or on_llm_error regardless of whether llm_output is populated.
            run_manager.on_llm_end(result)
            return result.generations[0].message
        except Exception as e:
            run_manager.on_llm_error(e)
            raise

    def stream(
        self,
        input: Union[PromptValue, str, List[BaseMessage]],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[BaseMessageChunk]:
        """Call the chat model and stream the reply.

        Args:
            input: Input in any format accepted by
                ``_convert_input_to_messages``.
            config: Runtime configuration.
            **kwargs: Extra parameters.

        Yields:
            Message chunks.

        Raises:
            Exception: Any error raised while streaming, re-raised after
                ``on_llm_error`` has been called.
        """
        messages = self._convert_input_to_messages(input)
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_llm_start(
            serialized=self.to_json(),
            prompts=[self._messages_to_string(messages)],
            **kwargs,
        )
        try:
            generation_chunk = None
            for chunk in self._stream(messages, run_manager=run_manager, **kwargs):
                # Accumulate chunks so the end callback sees the full output.
                if generation_chunk is None:
                    generation_chunk = chunk
                else:
                    generation_chunk += chunk
                run_manager.on_llm_new_token(chunk.message.content or "")
                yield chunk.message
            # NOTE(review): when _stream yields nothing, no end callback is
            # emitted; confirm whether an empty result should be reported.
            if generation_chunk is not None:
                run_manager.on_llm_end(
                    LLMResult(generations=[[generation_chunk]])
                )
        except Exception as e:
            run_manager.on_llm_error(e)
            raise

    def _convert_input_to_messages(self, input: Any) -> List[BaseMessage]:
        """Convert the supported input formats into a message list.

        Supported inputs:
            - str: wrapped in a single ``HumanMessage``
            - list: converted with ``convert_to_messages``
            - PromptValue: converted with ``to_messages()``

        Raises:
            ValueError: For any other input type.
        """
        if isinstance(input, str):
            return [HumanMessage(content=input)]
        elif isinstance(input, list):
            return convert_to_messages(input)
        elif isinstance(input, PromptValue):
            return input.to_messages()
        else:
            raise ValueError(f"Unsupported input type: {type(input)}")

    def _convert_to_tool_spec(self, tool: Any) -> Dict[str, Any]:
        """Convert a tool into a function-style spec dictionary.

        Only ``BaseTool`` instances are handled here.

        Raises:
            ValueError: For any other tool type.
        """
        if isinstance(tool, BaseTool):
            return {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.args_schema.model_json_schema() if tool.args_schema else {}
                }
            }
        # Other tool formats are not handled in this excerpt.
        raise ValueError(f"Unsupported tool type: {type(tool)}")
2.3 消息系统
LangChain的消息系统定义了统一的消息格式,支持多种消息类型。详细的消息类定义和结构请参考《关键数据结构与UML图》文档。
3. 工具系统模块 (Tools)
3.1 模块架构
graph TB
subgraph "Tools System"
A[BaseTool] --> B[Tool]
A --> C[StructuredTool]
D["@tool装饰器"] --> C
E[convert_runnable_to_tool] --> C
F[BaseToolkit] --> G[具体工具包]
H[ToolCall] --> I[工具调用执行]
J[ToolMessage] --> I
K[错误处理] --> L[ToolException]
end
subgraph "工具类型"
M[函数工具]
N[API工具]
O[数据库工具]
P[文件系统工具]
Q[网络工具]
end
A --> M
A --> N
A --> O
A --> P
A --> Q
3.2 BaseTool详解
# libs/core/langchain_core/tools/base.py
from typing import Any, Awaitable, Callable, Dict, Optional, Type, Union
from abc import ABC, abstractmethod
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableSerializable
from langchain_core.callbacks import CallbackManagerForToolRun, AsyncCallbackManagerForToolRun
from langchain_core.messages import ToolCall
class BaseTool(RunnableSerializable[Union[str, Dict, ToolCall], Any]):
    """Base class for tools.

    Defines the standard interface of a LangChain tool, i.e. an external
    capability that an agent can call.
    """

    # NOTE(review): ToolException and ValidationError are referenced in the
    # field annotations below, so they must be resolvable when this class
    # body is evaluated — confirm definition/import order in the real module.

    # Tool name; must be unique and descriptive.
    name: str
    # Tool description telling the model when and how to use the tool.
    description: str = ""
    # Argument schema describing the parameters the tool accepts.
    args_schema: Optional[Type[BaseModel]] = Field(default=None, exclude=True)
    # Whether to return the tool result directly, skipping agent post-processing.
    return_direct: bool = False
    # Whether verbose logging is enabled.
    verbose: bool = False
    # Callback handlers.
    callbacks: Optional[List[BaseCallbackHandler]] = Field(default=None, exclude=True)
    # Callback manager.
    callback_manager: Optional[BaseCallbackManager] = Field(default=None, exclude=True)
    # Tags used to categorize and filter the tool.
    tags: Optional[List[str]] = Field(default=None, exclude=True)
    # Tool metadata.
    metadata: Optional[Dict[str, Any]] = Field(default=None, exclude=True)
    # Strategy for ToolException: falsy re-raises, a str is returned as-is,
    # a callable is invoked with the error.
    handle_tool_error: Optional[
        Union[bool, str, Callable[[ToolException], str]]
    ] = Field(default=False, exclude=True)
    # Strategy for ValidationError, same conventions as handle_tool_error.
    handle_validation_error: Optional[
        Union[bool, str, Callable[[ValidationError], str]]
    ] = Field(default=False, exclude=True)

    # Core abstract method

    @abstractmethod
    def _run(
        self,
        *args: Any,
        run_manager: Optional[CallbackManagerForToolRun] = None,
        **kwargs: Any,
    ) -> Any:
        """Execute the tool synchronously; subclasses must implement this.

        Args:
            *args: Positional arguments.
            run_manager: Run manager for callbacks.
            **kwargs: Keyword arguments.

        Returns:
            The tool's result.
        """

    async def _arun(
        self,
        *args: Any,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
        **kwargs: Any,
    ) -> Any:
        """Execute the tool asynchronously.

        The default implementation runs the synchronous ``_run`` through
        ``run_in_executor``; subclasses may provide a native async version.

        Args:
            *args: Positional arguments.
            run_manager: Async run manager; its sync counterpart is passed
                to ``_run``.
            **kwargs: Keyword arguments.

        Returns:
            The tool's result.
        """
        return await run_in_executor(
            None,
            self._run,
            *args,
            run_manager=run_manager.get_sync() if run_manager else None,
            **kwargs,
        )

    def invoke(
        self,
        input: Union[str, Dict, ToolCall],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Any:
        """Run the tool.

        Args:
            input: Tool input (string, argument dict, or tool call).
            config: Runtime configuration.
            **kwargs: Extra parameters forwarded to ``on_tool_start``.

        Returns:
            The tool result, or the handler output when a ``ToolException``
            or ``ValidationError`` is handled.

        Raises:
            ToolException: If raised and ``handle_tool_error`` is falsy.
            ValidationError: If raised and ``handle_validation_error`` is falsy.
            Exception: Any other error, after ``on_tool_error`` is called.
        """
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_tool_start(
            serialized=self.to_json(),
            input_str=str(input),
            **kwargs,
        )
        try:
            tool_args, tool_kwargs = self._to_args_and_kwargs(input)
            if run_manager:
                tool_kwargs["run_manager"] = run_manager
            observation = self._run(*tool_args, **tool_kwargs)
            run_manager.on_tool_end(observation)
            return observation
        except ToolException as e:
            if not self.handle_tool_error:
                run_manager.on_tool_error(e)
                raise
            observation = self._handle_tool_error(e)
            run_manager.on_tool_end(observation)
            return observation
        except ValidationError as e:
            if not self.handle_validation_error:
                run_manager.on_tool_error(e)
                raise
            observation = self._handle_validation_error(e)
            run_manager.on_tool_end(observation)
            return observation
        except Exception as e:
            run_manager.on_tool_error(e)
            raise

    async def ainvoke(
        self,
        input: Union[str, Dict, ToolCall],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Any:
        """Run the tool asynchronously.

        Error handling mirrors ``invoke``: handled errors are converted to an
        observation and reported through ``on_tool_end``; unhandled errors
        are reported through ``on_tool_error`` and re-raised.

        Args:
            input: Tool input (string, argument dict, or tool call).
            config: Runtime configuration.
            **kwargs: Extra parameters forwarded to ``on_tool_start``.

        Returns:
            The tool result, or the handler output for a handled error.

        Raises:
            ToolException: If raised and ``handle_tool_error`` is falsy.
            ValidationError: If raised and ``handle_validation_error`` is falsy.
            Exception: Any other error, after ``on_tool_error`` is called.
        """
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        run_manager = await callback_manager.on_tool_start(
            serialized=self.to_json(),
            input_str=str(input),
            **kwargs,
        )
        try:
            tool_args, tool_kwargs = self._to_args_and_kwargs(input)
            if run_manager:
                tool_kwargs["run_manager"] = run_manager
            observation = await self._arun(*tool_args, **tool_kwargs)
            await run_manager.on_tool_end(observation)
            return observation
        except ToolException as e:
            if not self.handle_tool_error:
                await run_manager.on_tool_error(e)
                raise
            observation = self._handle_tool_error(e)
            await run_manager.on_tool_end(observation)
            return observation
        except ValidationError as e:
            if not self.handle_validation_error:
                await run_manager.on_tool_error(e)
                raise
            observation = self._handle_validation_error(e)
            await run_manager.on_tool_end(observation)
            return observation
        except Exception as e:
            await run_manager.on_tool_error(e)
            raise

    def _to_args_and_kwargs(
        self, tool_input: Union[str, Dict, ToolCall]
    ) -> Tuple[Tuple, Dict]:
        """Convert a tool input into positional and keyword arguments.

        Args:
            tool_input: String, argument dict, tool-call dict, or any other
                value.

        Returns:
            An ``(args, kwargs)`` tuple. The kwargs dict is always a new
            object, so callers may add entries without touching the input.
        """
        if isinstance(tool_input, str):
            # NOTE(review): `__fields__` / `.dict()` / `.schema()` are the
            # pydantic v1 spellings; confirm the pydantic version in use.
            if self.args_schema and len(self.args_schema.__fields__) == 1:
                # Single-parameter tool: bind the string to that parameter.
                field_name = next(iter(self.args_schema.__fields__))
                return (), {field_name: tool_input}
            # No schema or several parameters: pass the string positionally.
            return (tool_input,), {}

        if isinstance(tool_input, dict):
            # ToolCall is a TypedDict, i.e. a plain dict at runtime, and
            # isinstance() against a TypedDict raises TypeError — so tool
            # calls are recognised by their "type" marker instead.
            if tool_input.get("type") == "tool_call" and "args" in tool_input:
                return self._to_args_and_kwargs(tool_input["args"])
            if self.args_schema:
                validated_args = self.args_schema(**tool_input)
                return (), validated_args.dict()
            # Copy so that adding run_manager later does not mutate the
            # caller's dictionary.
            return (), dict(tool_input)

        # Any other type is passed through as a single positional argument.
        return (tool_input,), {}

    def _handle_tool_error(self, error: ToolException) -> str:
        """Turn a ``ToolException`` into an observation string."""
        if callable(self.handle_tool_error):
            return self.handle_tool_error(error)
        elif isinstance(self.handle_tool_error, str):
            return self.handle_tool_error
        else:
            return f"Tool execution error: {error}"

    def _handle_validation_error(self, error: ValidationError) -> str:
        """Turn a ``ValidationError`` into an observation string."""
        if callable(self.handle_validation_error):
            return self.handle_validation_error(error)
        elif isinstance(self.handle_validation_error, str):
            return self.handle_validation_error
        else:
            return f"Validation error: {error}"

    @property
    def args(self) -> Dict[str, Any]:
        """JSON Schema of the tool arguments, or ``{}`` without a schema."""
        if self.args_schema:
            return self.args_schema.schema()
        else:
            return {}

    @property
    def is_single_input(self) -> bool:
        """Whether the tool has no schema or at most one schema field."""
        return (
            self.args_schema is None
            or len(self.args_schema.__fields__) <= 1
        )
class ToolException(Exception):
    """Error raised by a tool during execution.

    Lets a tool report a failure back to the agent as an error message
    instead of terminating the agent's execution flow.
    """
3.3 @tool装饰器实现
# libs/core/langchain_core/tools/convert.py
from typing import Any, Awaitable, Callable, Optional, Type, Union, get_type_hints
from functools import wraps
from inspect import signature
from pydantic import BaseModel, create_model
from langchain_core.tools.simple import Tool
from langchain_core.tools.structured import StructuredTool
def tool(
    name_or_callable: Optional[Union[str, Callable]] = None,
    *,
    description: Optional[str] = None,
    return_direct: bool = False,
    args_schema: Optional[Type[BaseModel]] = None,
    infer_schema: bool = True,
    parse_docstring: bool = False,
    error_on_invalid_docstring: bool = True,
    **kwargs: Any,
) -> Union[Callable[[Callable], BaseTool], BaseTool]:
    """Decorator that turns a function into a LangChain tool.

    Supported forms:
        1. ``@tool`` - infer the name, arguments and description
        2. ``@tool("name")`` - set the tool name
        3. ``@tool(description="...")`` - set the description
        4. ``@tool(args_schema=MySchema)`` - set the argument schema

    Args:
        name_or_callable: Tool name, the decorated function itself (bare
            ``@tool``), or ``None`` when only keyword options are given.
        description: Tool description; defaults to the function docstring.
        return_direct: Whether the result is returned directly.
        args_schema: Argument schema class.
        infer_schema: Whether to infer the schema from the signature when
            ``args_schema`` is not given.
        parse_docstring: Whether to parse the docstring for the description.
        error_on_invalid_docstring: Accepted for interface compatibility;
            not consulted by this implementation.
        **kwargs: Extra arguments forwarded to the tool constructor.

    Returns:
        A ``BaseTool`` when applied directly to a function, otherwise a
        decorator that produces one.
    """
    import inspect

    def _create_tool_from_function(func: Callable) -> BaseTool:
        """Build the tool object for ``func``."""
        func_name = name_or_callable if isinstance(name_or_callable, str) else func.__name__

        # Resolve the description: explicit > parsed docstring > raw
        # docstring > generic fallback.
        tool_description = description
        if tool_description is None:
            if parse_docstring and func.__doc__:
                tool_description = _parse_function_docstring(func.__doc__)
            else:
                tool_description = func.__doc__ or f"Tool for {func_name}"

        # Use the given schema, or infer one from the signature.
        tool_args_schema = args_schema
        if tool_args_schema is None and infer_schema:
            tool_args_schema = _create_schema_from_function(func, func_name)

        if inspect.iscoroutinefunction(func):
            # Async functions are always wrapped as a structured tool.
            return StructuredTool(
                name=func_name,
                description=tool_description,
                coroutine=func,
                args_schema=tool_args_schema,
                return_direct=return_direct,
                **kwargs,
            )
        if tool_args_schema:
            return StructuredTool(
                name=func_name,
                description=tool_description,
                func=func,
                args_schema=tool_args_schema,
                return_direct=return_direct,
                **kwargs,
            )
        return Tool(
            name=func_name,
            description=tool_description,
            func=func,
            return_direct=return_direct,
            **kwargs,
        )

    if callable(name_or_callable):
        # Bare ``@tool``: the first argument is the decorated function.
        return _create_tool_from_function(name_or_callable)

    # ``@tool("name")`` or ``@tool(description="...")``: return a decorator.
    def decorator(func: Callable) -> BaseTool:
        return _create_tool_from_function(func)

    return decorator
def _create_schema_from_function(func: Callable, name: str) -> Type[BaseModel]:
    """Build a Pydantic argument schema from a function signature.

    Args:
        func: Target function.
        name: Name of the generated model class.

    Returns:
        A Pydantic model class with one field per parameter, excluding the
        parameters listed in ``FILTERED_ARGS``.
    """
    parameters = signature(func).parameters
    resolved_hints = get_type_hints(func)

    field_definitions = {}
    for param_name, param in parameters.items():
        if param_name in FILTERED_ARGS:
            # Special parameters are not part of the schema.
            continue

        field_type = resolved_hints.get(param_name, Any)

        # Ellipsis marks the field as required when there is no default.
        default = param.default
        if default is param.empty:
            default = ...

        # Annotated[...] annotations expose `__metadata__`.
        is_annotated = param.annotation and hasattr(param.annotation, '__metadata__')
        if is_annotated:
            field_definitions[param_name] = (field_type, Field(default=default))
        else:
            field_definitions[param_name] = (field_type, default)

    return create_model(name, **field_definitions)
def _parse_function_docstring(docstring: str) -> str:
    """Extract the tool description from a function docstring.

    The description is the first line plus any following text up to the
    first structured section. Section starts are recognised for:
        - Google style: ``Args:``, ``Returns:``, ``Raises:`` ...
        - NumPy style: a header such as ``Parameters`` underlined with dashes
        - Sphinx style: field lines such as ``:param x:`` or ``:returns:``

    Args:
        docstring: The function's docstring.

    Returns:
        The short description, followed by a blank line and the detailed
        description when one exists.
    """
    google_headers = (
        "args:", "arguments:", "parameters:", "params:",
        "returns:", "return:", "yields:", "raises:",
        "example:", "examples:",
    )
    sphinx_fields = (":param", ":type", ":return", ":rtype", ":raise")
    numpy_headers = {
        "parameters", "returns", "yields", "raises", "examples",
        "notes", "see also", "other parameters",
    }

    lines = [line.strip() for line in docstring.strip().split("\n")]
    short_desc = lines[0]

    detailed_lines = []
    for index in range(1, len(lines)):
        line = lines[index]
        lowered = line.lower()
        if lowered.startswith(google_headers) or lowered.startswith(sphinx_fields):
            break
        # A NumPy section header is only a header when underlined with dashes.
        underline = lines[index + 1] if index + 1 < len(lines) else ""
        if lowered in numpy_headers and underline and set(underline) == {"-"}:
            break
        if line:  # blank lines are dropped
            detailed_lines.append(line)

    if detailed_lines:
        return short_desc + "\n\n" + "\n".join(detailed_lines)
    return short_desc
# 使用示例
@tool
def calculator(expression: str) -> str:
    """
    计算数学表达式的值
    这个工具可以计算基本的数学表达式,包括加减乘除和括号。
    Args:
    expression: 要计算的数学表达式,如 "2+3*4"
    Returns:
    计算结果的字符串表示
    """
    # The docstring above is the runtime tool description, so it is kept
    # verbatim rather than translated.
    try:
        # SECURITY: eval() executes arbitrary Python, and `expression` comes
        # from the model/user. This is NOT safe; it is for demonstration only.
        # Production code should use a restricted arithmetic evaluator.
        value = eval(expression)
        return f"计算结果: {value}"
    except Exception as exc:
        # Failures are reported as text so the caller still gets an answer.
        return f"计算错误: {exc}"
@tool("天气查询", description="查询指定城市的当前天气")
def get_weather(city: str) -> str:
    """查询天气信息"""
    # Simulated API call: a fixed lookup table stands in for a real service.
    known_weather = {
        "北京": "晴天,15°C",
        "上海": "多云,18°C",
        "深圳": "小雨,25°C",
    }
    fallback = f"无法获取{city}的天气信息"
    return known_weather.get(city, fallback)
# 使用Pydantic模式的高级工具
from pydantic import BaseModel, Field
class SearchArgs(BaseModel):
    """搜索参数"""

    # Search keywords (required).
    query: str = Field(description="搜索关键词")
    # Number of results to return; constrained to the range 1..100.
    limit: int = Field(
        default=10,
        ge=1,
        le=100,
        description="返回结果数量",
    )
    # Optional search category.
    category: Optional[str] = Field(default=None, description="搜索类别")
@tool("网络搜索", args_schema=SearchArgs)
def web_search(query: str, limit: int = 10, category: Optional[str] = None) -> str:
    """在网络上搜索信息"""
    # Simulated search: at most five placeholder results, one per line.
    result_count = min(limit, 5)
    return "\n".join(
        f"搜索结果 {i+1}: {query} 相关内容" for i in range(result_count)
    )
本文档详细解析了LangChain的核心模块,包括Core模块的Runnable接口、语言模型系统和工具系统。每个模块都提供了完整的源码分析、架构图和使用示例。