概述
LangChain主库构建在Core模块的抽象基础之上,实现了具体的业务逻辑和高级功能。主库的核心组件,包括Chain的执行机制、Agent的推理循环、Memory的状态管理等关键实现。
1. LangChain主库架构
1.1 主库模块组织
graph TB
subgraph "LangChain 主库结构"
subgraph "核心执行层"
C[chains/ - 链式执行]
A[agents/ - 智能代理]
M[memory/ - 状态记忆]
end
subgraph "组件集成层"
LLM[llms/ - 语言模型集成]
CM[chat_models/ - 聊天模型]
E[embeddings/ - 嵌入模型]
VS[vectorstores/ - 向量存储]
end
subgraph "数据处理层"
DL[document_loaders/ - 文档加载]
TS[text_splitter/ - 文本分割]
DT[document_transformers/ - 文档转换]
OP[output_parsers/ - 输出解析]
end
subgraph "工具生态层"
T[tools/ - 工具集成]
U[utilities/ - 工具函数]
R[retrievers/ - 检索器]
I[indexes/ - 索引系统]
end
subgraph "评估监控层"
EV[evaluation/ - 评估框架]
CB[callbacks/ - 回调扩展]
S[smith/ - LangSmith集成]
end
end
C --> LLM
A --> T
M --> VS
LLM --> DL
CM --> TS
E --> DT
T --> U
R --> I
EV --> CB
CB --> S
style C fill:#e1f5fe
style A fill:#f3e5f5
style M fill:#e8f5e8
1.2 依赖关系和继承层次
# langchain/__init__.py
"""包的主要入口点。
Main entrypoint into package."""
import warnings
from importlib import metadata
from typing import Any, Optional
from langchain_core._api.deprecation import surface_langchain_deprecation_warnings
try:
__version__ = metadata.version(__package__)
except metadata.PackageNotFoundError:
# Case where package metadata is not available.
__version__ = ""
del metadata # optional, avoids polluting the results of dir(__package__)
def _warn_on_import(name: str, replacement: Optional[str] = None) -> None:
"""导入已弃用模块时发出警告。
Warn on import of deprecated module."""
from langchain._api.interactive_env import is_interactive_env
if is_interactive_env():
# No warnings for interactive environments.
# This is done to avoid polluting the output of interactive environments
# where users rely on auto-complete and may trigger this warning
# even if they are not using any deprecated modules
return
if replacement:
warnings.warn(
f"Importing {name} from langchain root module is no longer supported. "
f"Please use {replacement} instead.",
stacklevel=3,
)
else:
warnings.warn(
f"Importing {name} from langchain root module is no longer supported.",
stacklevel=3,
)
# Surfaces Deprecation and Pending Deprecation warnings from langchain.
surface_langchain_deprecation_warnings()
2. Chain模块:链式执行的核心
2.1 Chain基类设计
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from langchain_core.runnables import RunnableSerializable
from langchain_core.memory import BaseMemory
from langchain_core.callbacks import Callbacks
class Chain(RunnableSerializable[Dict[str, Any], Dict[str, Any]], ABC):
"""创建结构化组件调用序列的抽象基类。
Chain应该用于编码对模型、文档检索器、其他Chain等组件的调用序列,
并为此序列提供简单的接口。
Chain接口使创建以下应用变得容易:
- 有状态的:向任何Chain添加Memory以赋予其状态
- 可观察的:向Chain传递Callbacks以在主要组件调用序列之外执行额外功能
- 可组合的:Chain API足够灵活,可以轻松地将Chain与其他组件(包括其他Chain)结合
主要暴露的方法:
- __call__:Chain是可调用的。__call__方法是执行Chain的主要方式
- run:一个便利方法,接受args/kwargs作为输入并返回字符串或对象输出
"""
# === 核心属性 ===
memory: Optional[BaseMemory] = None
"""可选的内存对象。默认为None。
Memory是一个在每个链的开始和结束时被调用的类。
在开始时,memory加载变量并在链中传递它们。
在结束时,它保存任何返回的变量。
有许多不同类型的memory - 请参阅memory文档以获取完整目录。"""
callbacks: Callbacks = Field(default=None, exclude=True)
"""可选的回调处理器列表(或回调管理器)。默认为None。
回调处理器在链调用的整个生命周期中被调用,
从on_chain_start开始,以on_chain_end或on_chain_error结束。
每个自定义链可以选择调用额外的回调方法,详见Callback文档。"""
verbose: bool = Field(default_factory=_get_verbosity)
"""是否以详细模式运行。在详细模式下,一些中间日志将被打印到控制台。
默认为`langchain.verbose`值。"""
tags: Optional[List[str]] = Field(default=None, exclude=True)
"""与链关联的可选标签列表。默认为None。
这些标签将与对此链的每次调用相关联,
并作为参数传递给在`callbacks`中定义的处理器。
您可以使用这些来例如识别链的特定实例与其用例。"""
metadata: Optional[Dict[str, Any]] = Field(default=None, exclude=True)
"""与链关联的可选元数据。默认为None。
此元数据将与对此链的每次调用相关联,
并作为参数传递给在`callbacks`中定义的处理器。
您可以使用这些来例如识别链的特定实例与其用例。"""
# === 抽象属性 ===
@property
@abstractmethod
def input_keys(self) -> List[str]:
"""输入键。
:meta private:
"""
@property
@abstractmethod
def output_keys(self) -> List[str]:
"""输出键。
:meta private:
"""
# === 抽象方法 ===
@abstractmethod
def _call(
self,
inputs: Dict[str, Any],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""执行链。
这是一个不面向用户的私有方法。它只在Chain.__call__内部调用,
Chain.__call__是处理回调配置和一些输入/输出处理的面向用户的包装方法。
Args:
inputs: 链的命名输入字典。假定包含Chain.input_keys中指定的所有输入,
包括memory添加的任何输入。
run_manager: 包含此链运行的回调处理器的回调管理器。
Returns:
命名输出字典。应包含Chain.output_keys中指定的所有输出。
"""
async def _acall(
self,
inputs: Dict[str, Any],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""异步执行链。
这是一个不面向用户的私有方法。它只在Chain.acall内部调用,
Chain.acall是处理回调配置和一些输入/输出处理的面向用户的包装方法。
Args:
inputs: 链的命名输入字典。假定包含Chain.input_keys中指定的所有输入,
包括memory添加的任何输入。
run_manager: 包含此链运行的回调处理器的回调管理器。
Returns:
命名输出字典。应包含Chain.output_keys中指定的所有输出。
"""
return await run_in_executor(
None,
self._call,
inputs,
run_manager.get_sync() if run_manager else None,
)
# === 输入输出处理 ===
def prep_inputs(self, inputs: Union[Dict[str, Any], Any]) -> Dict[str, Any]:
"""验证并准备链输入,包括添加来自memory的输入。
Args:
inputs: 字典输入或单个输入(如果链只期望一个输入)。
Returns:
包含所有输入的字典,包括来自memory的输入。
"""
if not isinstance(inputs, dict):
_input_keys = set(self.input_keys)
if self.memory is not None:
# 如果有memory,我们需要检查它添加了哪些键
_input_keys = _input_keys.difference(self.memory.memory_variables)
if len(_input_keys) != 1:
raise ValueError(
f"单个输入传递给期望多个输入的链: {_input_keys}。"
f"请传递一个字典,例如 {{'foo': 'bar'}}"
)
inputs = {list(_input_keys)[0]: inputs}
if self.memory is not None:
external_context = self.memory.load_memory_variables(inputs)
inputs = dict(inputs, **external_context)
self._validate_inputs(inputs)
return inputs
def prep_outputs(
self,
inputs: Dict[str, Any],
outputs: Dict[str, Any],
return_only_outputs: bool = False,
) -> Dict[str, Any]:
"""验证并准备链输出,并保存到memory。
Args:
inputs: 链输入字典。
outputs: 链输出字典。
return_only_outputs: 是否只返回输出。
如果为False,输入也会包含在最终输出中。
Returns:
包含链输出的字典,如果return_only_outputs为False,还包含输入。
"""
self._validate_outputs(outputs)
if self.memory is not None:
self.memory.save_context(inputs, outputs)
if return_only_outputs:
return outputs
else:
return {**inputs, **outputs}
def _validate_inputs(self, inputs: Dict[str, Any]) -> None:
"""检查链输入是否有效。
Args:
inputs: 链输入字典。
Raises:
ValueError: 如果输入无效。
"""
missing_keys = set(self.input_keys).difference(inputs)
if missing_keys:
raise ValueError(f"缺少一些输入键: {missing_keys}")
def _validate_outputs(self, outputs: Dict[str, Any]) -> None:
"""检查链输出是否有效。
Args:
outputs: 链输出字典。
Raises:
ValueError: 如果输出无效。
"""
missing_keys = set(self.output_keys).difference(outputs)
if missing_keys:
raise ValueError(f"缺少一些输出键: {missing_keys}")
# === Runnable接口实现 ===
def invoke(
self,
input: Dict[str, Any],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""Runnable接口的invoke实现。"""
config = ensure_config(config)
return self(
input,
callbacks=config.get("callbacks"),
tags=config.get("tags"),
metadata=config.get("metadata"),
run_name=config.get("run_name"),
**kwargs,
)
async def ainvoke(
self,
input: Dict[str, Any],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""异步invoke实现。"""
config = ensure_config(config)
return await self.acall(
input,
callbacks=config.get("callbacks"),
tags=config.get("tags"),
metadata=config.get("metadata"),
run_name=config.get("run_name"),
**kwargs,
)
# === 主要执行方法 ===
@deprecated("0.1.0", alternative="invoke", removal="1.0")
def __call__(
self,
inputs: Union[Dict[str, Any], Any],
return_only_outputs: bool = False,
callbacks: Callbacks = None,
*,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
run_name: Optional[str] = None,
include_run_info: bool = False,
) -> Dict[str, Any]:
"""执行链。
Args:
inputs: 链输入字典。
return_only_outputs: 是否只返回输出。
callbacks: 要使用的回调。
tags: 要附加到运行的标签。
metadata: 要附加到运行的元数据。
run_name: 运行的名称。
include_run_info: 是否在输出中包含运行信息。
Returns:
链输出字典。
"""
config = {
"callbacks": callbacks,
"tags": tags,
"metadata": metadata,
"run_name": run_name,
}
inputs = self.prep_inputs(inputs)
callback_manager = get_callback_manager_for_config(config)
new_arg_supported = inspect.signature(self._call).parameters.get("run_manager")
with callback_manager.on_chain_start(
dumpd(self),
inputs,
name=run_name,
) as run_manager:
try:
if new_arg_supported:
outputs = self._call(inputs, run_manager=run_manager)
else:
outputs = self._call(inputs)
except Exception as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(outputs)
final_outputs: Dict[str, Any] = self.prep_outputs(
inputs, outputs, return_only_outputs
)
if include_run_info:
final_outputs[RUN_KEY] = RunInfo(run_id=run_manager.run_id)
return final_outputs
async def acall(
self,
inputs: Union[Dict[str, Any], Any],
return_only_outputs: bool = False,
callbacks: Callbacks = None,
*,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
run_name: Optional[str] = None,
include_run_info: bool = False,
) -> Dict[str, Any]:
"""异步执行链。"""
config = {
"callbacks": callbacks,
"tags": tags,
"metadata": metadata,
"run_name": run_name,
}
inputs = self.prep_inputs(inputs)
callback_manager = get_async_callback_manager_for_config(config)
new_arg_supported = inspect.signature(self._acall).parameters.get("run_manager")
async with callback_manager.on_chain_start(
dumpd(self),
inputs,
name=run_name,
) as run_manager:
try:
if new_arg_supported:
outputs = await self._acall(inputs, run_manager=run_manager)
else:
outputs = await self._acall(inputs)
except Exception as e:
await run_manager.on_chain_error(e)
raise
else:
await run_manager.on_chain_end(outputs)
final_outputs: Dict[str, Any] = self.prep_outputs(
inputs, outputs, return_only_outputs
)
if include_run_info:
final_outputs[RUN_KEY] = RunInfo(run_id=run_manager.run_id)
return final_outputs
# === 便利方法 ===
def run(
self,
*args: Any,
callbacks: Callbacks = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
"""便利方法,用于执行链并返回单个字符串输出。
Args:
*args: 如果链只期望一个输入,则可以作为位置参数传递。
callbacks: 要使用的回调。
tags: 要附加到运行的标签。
metadata: 要附加到运行的元数据。
**kwargs: 如果链期望多个输入,则可以作为关键字参数传递。
Returns:
链输出。如果链有单个输出键,则返回该键的值。
否则,返回包含所有输出的字典。
"""
if args and not kwargs:
if len(args) != 1:
raise ValueError("`run` 支持单个位置参数或关键字参数,不支持两者。")
if self.input_keys and len(self.input_keys) == 1:
inputs = {self.input_keys[0]: args[0]}
else:
inputs = args[0]
elif kwargs and not args:
inputs = kwargs
else:
raise ValueError("`run` 支持单个位置参数或关键字参数,不支持两者。")
outputs = self(
inputs,
callbacks=callbacks,
tags=tags,
metadata=metadata,
return_only_outputs=True,
)
# 如果只有一个输出键,返回该值
if len(self.output_keys) == 1:
return outputs[self.output_keys[0]]
else:
return outputs
async def arun(
self,
*args: Any,
callbacks: Callbacks = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
"""异步便利方法。"""
if args and not kwargs:
if len(args) != 1:
raise ValueError("`arun` 支持单个位置参数或关键字参数,不支持两者。")
if self.input_keys and len(self.input_keys) == 1:
inputs = {self.input_keys[0]: args[0]}
else:
inputs = args[0]
elif kwargs and not args:
inputs = kwargs
else:
raise ValueError("`arun` 支持单个位置参数或关键字参数,不支持两者。")
outputs = await self.acall(
inputs,
callbacks=callbacks,
tags=tags,
metadata=metadata,
return_only_outputs=True,
)
if len(self.output_keys) == 1:
return outputs[self.output_keys[0]]
else:
return outputs
2.2 LLMChain:最基础的链实现
from typing import Any, Dict, List, Optional
from langchain_core.language_models import BaseLanguageModel
from langchain_core.prompts import BasePromptTemplate
from langchain_core.output_parsers import BaseOutputParser
class LLMChain(Chain):
"""使用语言模型的链。
这是最基础的链实现,将提示模板、语言模型和输出解析器组合在一起。
示例:
.. code-block:: python
from langchain import LLMChain, OpenAI, PromptTemplate
prompt_template = "Tell me a {adjective} joke"
prompt = PromptTemplate(
input_variables=["adjective"], template=prompt_template
)
llm = LLMChain(llm=OpenAI(), prompt=prompt)
"""
prompt: BasePromptTemplate
"""要使用的提示模板。"""
llm: BaseLanguageModel
"""要使用的语言模型。"""
output_parser: BaseOutputParser = Field(default_factory=NoOpOutputParser)
"""要使用的输出解析器。默认为简单的字符串解析器。"""
output_key: str = "text" #: :meta private:
"""输出键。"""
return_final_only: bool = True
"""是否只返回最终解析的结果。如果为False,将返回原始文本和解析结果。"""
llm_kwargs: dict = Field(default_factory=dict)
"""传递给LLM的额外关键字参数。"""
@property
def input_keys(self) -> List[str]:
"""输入键。
:meta private:
"""
return self.prompt.input_variables
@property
def output_keys(self) -> List[str]:
"""输出键。
:meta private:
"""
if self.return_final_only:
return [self.output_key]
else:
return [self.output_key, f"{self.output_key}_raw"]
def _call(
self,
inputs: Dict[str, Any],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, str]:
"""执行LLM链。
Args:
inputs: 输入字典,包含提示模板所需的所有变量。
run_manager: 回调管理器。
Returns:
包含LLM输出的字典。
"""
# 1. 格式化提示
prompt_value = self.prompt.format_prompt(**inputs)
# 2. 调用LLM
response = self.llm.generate_prompt(
[prompt_value],
callbacks=run_manager.get_child() if run_manager else None,
**self.llm_kwargs,
)
# 3. 提取文本
if self.llm.llm_type == "openai-chat":
# 聊天模型返回消息
text_output = response.generations[0][0].message.content
else:
# 文本模型返回文本
text_output = response.generations[0][0].text
# 4. 解析输出
if self.output_parser:
parsed_output = self.output_parser.parse(text_output)
else:
parsed_output = text_output
# 5. 构建返回结果
if self.return_final_only:
return {self.output_key: parsed_output}
else:
return {
self.output_key: parsed_output,
f"{self.output_key}_raw": text_output,
}
async def _acall(
self,
inputs: Dict[str, Any],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, str]:
"""异步执行LLM链。"""
# 1. 格式化提示
prompt_value = self.prompt.format_prompt(**inputs)
# 2. 异步调用LLM
response = await self.llm.agenerate_prompt(
[prompt_value],
callbacks=run_manager.get_child() if run_manager else None,
**self.llm_kwargs,
)
# 3. 提取文本
if self.llm.llm_type == "openai-chat":
text_output = response.generations[0][0].message.content
else:
text_output = response.generations[0][0].text
# 4. 解析输出
if self.output_parser:
parsed_output = self.output_parser.parse(text_output)
else:
parsed_output = text_output
# 5. 构建返回结果
if self.return_final_only:
return {self.output_key: parsed_output}
else:
return {
self.output_key: parsed_output,
f"{self.output_key}_raw": text_output,
}
def predict(self, **kwargs: Any) -> str:
"""便利方法,用于预测单个输出。
Args:
**kwargs: 提示模板的输入变量。
Returns:
LLM的预测结果。
"""
return self.run(**kwargs)
async def apredict(self, **kwargs: Any) -> str:
"""异步预测方法。"""
return await self.arun(**kwargs)
def predict_and_parse(self, **kwargs: Any) -> Union[str, List[str], Dict]:
"""预测并解析输出。
Args:
**kwargs: 提示模板的输入变量。
Returns:
解析后的输出。
"""
result = self.predict(**kwargs)
if self.output_parser:
return self.output_parser.parse(result)
else:
return result
async def apredict_and_parse(self, **kwargs: Any) -> Union[str, List[str], Dict]:
"""异步预测并解析输出。"""
result = await self.apredict(**kwargs)
if self.output_parser:
return self.output_parser.parse(result)
else:
return result
def apply(self, input_list: List[Dict[str, Any]]) -> List[Dict[str, str]]:
"""对输入列表应用链。
Args:
input_list: 输入字典列表。
Returns:
输出字典列表。
"""
response = self.generate(input_list)
return self.create_outputs(response)
async def aapply(self, input_list: List[Dict[str, Any]]) -> List[Dict[str, str]]:
"""异步对输入列表应用链。"""
response = await self.agenerate(input_list)
return self.create_outputs(response)
def generate(
self,
input_list: List[Dict[str, Any]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> LLMResult:
"""为输入列表生成LLM结果。
Args:
input_list: 输入字典列表。
run_manager: 回调管理器。
Returns:
LLM结果。
"""
prompts = []
for inputs in input_list:
prompt_value = self.prompt.format_prompt(**inputs)
prompts.append(prompt_value)
return self.llm.generate_prompt(
prompts,
callbacks=run_manager.get_child() if run_manager else None,
**self.llm_kwargs,
)
async def agenerate(
self,
input_list: List[Dict[str, Any]],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> LLMResult:
"""异步为输入列表生成LLM结果。"""
prompts = []
for inputs in input_list:
prompt_value = self.prompt.format_prompt(**inputs)
prompts.append(prompt_value)
return await self.llm.agenerate_prompt(
prompts,
callbacks=run_manager.get_child() if run_manager else None,
**self.llm_kwargs,
)
def create_outputs(self, llm_result: LLMResult) -> List[Dict[str, str]]:
"""从LLM结果创建输出。
Args:
llm_result: LLM结果。
Returns:
输出字典列表。
"""
outputs = []
for generation in llm_result.generations:
# 提取文本
if self.llm.llm_type == "openai-chat":
text_output = generation[0].message.content
else:
text_output = generation[0].text
# 解析输出
if self.output_parser:
parsed_output = self.output_parser.parse(text_output)
else:
parsed_output = text_output
# 构建输出字典
if self.return_final_only:
outputs.append({self.output_key: parsed_output})
else:
outputs.append({
self.output_key: parsed_output,
f"{self.output_key}_raw": text_output,
})
return outputs
@property
def _chain_type(self) -> str:
return "llm"
2.3 SequentialChain:序列链实现
class SequentialChain(Chain):
"""其中一个链的输出直接馈送到下一个链的链。
这个链允许你将多个链串联起来,其中每个链的输出
成为下一个链的输入。
示例:
.. code-block:: python
from langchain import SequentialChain, LLMChain, OpenAI, PromptTemplate
# 第一个链:生成故事大纲
outline_template = "Write an outline for a story about {topic}"
outline_prompt = PromptTemplate(
input_variables=["topic"], template=outline_template
)
outline_chain = LLMChain(
llm=OpenAI(), prompt=outline_prompt, output_key="outline"
)
# 第二个链:基于大纲写故事
story_template = "Write a story based on this outline: {outline}"
story_prompt = PromptTemplate(
input_variables=["outline"], template=story_template
)
story_chain = LLMChain(
llm=OpenAI(), prompt=story_prompt, output_key="story"
)
# 组合成序列链
sequential_chain = SequentialChain(
chains=[outline_chain, story_chain],
input_variables=["topic"],
output_variables=["story"]
)
"""
chains: List[Chain]
"""要按顺序运行的链列表。"""
input_variables: List[str]
"""序列链期望的输入变量。"""
output_variables: List[str] #: :meta private:
"""序列链将输出的变量。"""
return_all: bool = False
"""是否返回所有中间输出,还是只返回最终输出。"""
model_config = ConfigDict(
arbitrary_types_allowed=True,
extra="forbid",
)
@property
def input_keys(self) -> List[str]:
"""返回链的期望输入键。
:meta private:
"""
return self.input_variables
@property
def output_keys(self) -> List[str]:
"""返回输出键。
:meta private:
"""
return self.output_variables
@model_validator(mode="before")
@classmethod
def validate_chains(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""验证链配置。
Args:
values: 配置值字典。
Returns:
验证后的配置值。
Raises:
ValueError: 如果链配置无效。
"""
chains = values.get("chains", [])
input_variables = values.get("input_variables", [])
output_variables = values.get("output_variables", [])
if not chains:
raise ValueError("必须提供至少一个链")
# 验证输入变量
if not input_variables:
raise ValueError("必须指定input_variables")
# 验证输出变量
if not output_variables:
raise ValueError("必须指定output_variables")
# 检查链的连接性
cls._validate_chain_connectivity(chains, input_variables, output_variables)
return values
@classmethod
def _validate_chain_connectivity(
cls,
chains: List[Chain],
input_variables: List[str],
output_variables: List[str],
) -> None:
"""验证链之间的连接性。
Args:
chains: 链列表。
input_variables: 输入变量。
output_variables: 输出变量。
Raises:
ValueError: 如果链连接无效。
"""
# 跟踪可用变量
available_vars = set(input_variables)
for i, chain in enumerate(chains):
# 检查当前链的输入是否都可用
missing_inputs = set(chain.input_keys) - available_vars
if missing_inputs:
raise ValueError(
f"链 {i} 缺少输入变量: {missing_inputs}。"
f"可用变量: {available_vars}"
)
# 添加当前链的输出到可用变量
available_vars.update(chain.output_keys)
# 检查最终输出变量是否都可用
missing_outputs = set(output_variables) - available_vars
if missing_outputs:
raise ValueError(
f"输出变量 {missing_outputs} 在任何链中都不可用。"
f"可用变量: {available_vars}"
)
def _call(
self,
inputs: Dict[str, Any],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""执行序列链。
Args:
inputs: 输入字典。
run_manager: 回调管理器。
Returns:
输出字典。
"""
# 初始化已知变量
known_values = inputs.copy()
# 按顺序执行每个链
for i, chain in enumerate(self.chains):
# 准备当前链的输入
chain_inputs = {
key: known_values[key]
for key in chain.input_keys
if key in known_values
}
# 执行链
if run_manager:
child_manager = run_manager.get_child(f"chain_{i}")
chain_outputs = chain(chain_inputs, callbacks=[child_manager])
else:
chain_outputs = chain(chain_inputs)
# 更新已知变量
known_values.update(chain_outputs)
# 准备最终输出
if self.return_all:
# 返回所有变量(除了原始输入)
outputs = {
key: known_values[key]
for key in known_values
if key not in inputs
}
else:
# 只返回指定的输出变量
outputs = {
key: known_values[key]
for key in self.output_variables
}
return outputs
async def _acall(
self,
inputs: Dict[str, Any],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""异步执行序列链。"""
known_values = inputs.copy()
for i, chain in enumerate(self.chains):
chain_inputs = {
key: known_values[key]
for key in chain.input_keys
if key in known_values
}
if run_manager:
child_manager = await run_manager.get_child(f"chain_{i}")
chain_outputs = await chain.acall(chain_inputs, callbacks=[child_manager])
else:
chain_outputs = await chain.acall(chain_inputs)
known_values.update(chain_outputs)
if self.return_all:
outputs = {
key: known_values[key]
for key in known_values
if key not in inputs
}
else:
outputs = {
key: known_values[key]
for key in self.output_variables
}
return outputs
@property
def _chain_type(self) -> str:
return "sequential"
3. Agent模块:智能代理系统
3.1 Agent执行器架构
sequenceDiagram
participant U as 用户
participant AE as AgentExecutor
participant A as Agent
participant LLM as LanguageModel
participant T as Tool
participant M as Memory
Note over U,M: Agent执行完整流程
U->>AE: invoke({"input": "用户问题"})
AE->>AE: 初始化状态
AE->>M: load_memory_variables()
M-->>AE: 历史上下文
loop Agent推理循环
Note over AE,LLM: 规划阶段 - Agent Planning
AE->>A: plan(intermediate_steps, inputs)
A->>A: 构造提示模板
A->>LLM: 调用语言模型推理
LLM-->>A: 返回推理结果
A->>A: 解析LLM输出
alt 如果是AgentFinish
A-->>AE: AgentFinish(return_values)
AE->>M: save_context(inputs, outputs)
AE-->>U: 返回最终答案
else 如果是AgentAction
A-->>AE: AgentAction(tool, tool_input, log)
Note over AE,T: 行动阶段 - Tool Execution
AE->>T: run(tool_input)
T->>T: 执行工具逻辑
T-->>AE: tool_output
AE->>AE: 更新intermediate_steps
AE->>AE: 检查迭代限制
alt 达到最大迭代次数
AE->>AE: 强制停止或生成最终答案
AE-->>U: 返回结果
else 继续循环
Note over AE: 准备下一轮推理
end
end
end
3.2 AgentExecutor:Agent执行器实现
from typing import Any, Dict, List, Optional, Sequence, Union
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.tools import BaseTool
from langchain.agents.agent import BaseSingleActionAgent, BaseMultiActionAgent
class AgentExecutor(Chain):
"""使用工具的Agent。
AgentExecutor负责运行Agent并管理其与工具的交互。
它实现了Agent的推理-行动循环,包括:
1. 调用Agent进行推理
2. 执行Agent选择的工具
3. 将工具结果反馈给Agent
4. 重复直到Agent决定停止
设计特点:
1. 灵活的Agent接口:支持单动作和多动作Agent
2. 工具管理:统一的工具调用和错误处理
3. 执行控制:迭代限制、时间限制、早停策略
4. 可观测性:完整的回调和追踪支持
"""
agent: Union[BaseSingleActionAgent, BaseMultiActionAgent, Runnable]
"""要运行的Agent,用于在执行循环的每个步骤创建计划并确定要采取的行动。"""
tools: Sequence[BaseTool]
"""Agent可以调用的有效工具。"""
return_intermediate_steps: bool = False
"""是否在最终输出之外还返回Agent的中间步骤轨迹。"""
max_iterations: Optional[int] = 15
"""结束执行循环之前要采取的最大步骤数。
设置为'None'可能导致无限循环。"""
max_execution_time: Optional[float] = None
"""在执行循环中花费的最大墙钟时间量。"""
early_stopping_method: str = "force"
"""如果Agent从未返回`AgentFinish`,用于早停的方法。
可以是'force'或'generate'。
`"force"`返回一个字符串,说明它因为遇到时间或迭代限制而停止。
`"generate"`最后一次调用Agent的LLM Chain,
基于之前的步骤生成最终答案。"""
handle_parsing_errors: Union[bool, str, Callable[[OutputParserException], str]] = (
False
)
"""如何处理Agent输出解析器引发的错误。
默认为`False`,会抛出错误。
如果为`true`,错误将作为观察发送回LLM。
如果为字符串,字符串本身将作为观察发送给LLM。
如果为可调用函数,函数将以异常作为参数调用,
函数的结果将作为观察传递给Agent。"""
trim_intermediate_steps: Union[
int,
Callable[[List[Tuple[AgentAction, str]]], List[Tuple[AgentAction, str]]],
] = -1
"""如何修剪传递给Agent的中间步骤。
如果为int,将保留最后N个步骤。
如果为callable,将调用该函数来修剪步骤。"""
def __init__(
self,
agent: Union[BaseSingleActionAgent, BaseMultiActionAgent, Runnable],
tools: Sequence[BaseTool],
callback_manager: Optional[BaseCallbackManager] = None,
**kwargs: Any,
):
"""初始化AgentExecutor。
Args:
agent: 要使用的Agent。
tools: 工具列表。
callback_manager: 回调管理器。
**kwargs: 额外参数。
"""
super().__init__(**kwargs)
self.agent = agent
self.tools = tools
# 验证工具名称唯一性
tool_names = [tool.name for tool in tools]
if len(tool_names) != len(set(tool_names)):
raise ValueError("工具名称必须唯一")
@property
def input_keys(self) -> List[str]:
"""返回期望的输入键。"""
return self.agent.input_keys
@property
def output_keys(self) -> List[str]:
"""返回输出键。"""
if self.return_intermediate_steps:
return self.agent.return_values + ["intermediate_steps"]
else:
return self.agent.return_values
def _should_continue(self, iterations: int, time_elapsed: float) -> bool:
"""检查是否应该继续执行循环。
Args:
iterations: 当前迭代次数。
time_elapsed: 已经过的时间。
Returns:
是否应该继续。
"""
if self.max_iterations is not None and iterations >= self.max_iterations:
return False
if (
self.max_execution_time is not None
and time_elapsed >= self.max_execution_time
):
return False
return True
def _call(
self,
inputs: Dict[str, str],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""运行文本并获取Agent响应。
Args:
inputs: 输入字典。
run_manager: 回调管理器。
Returns:
包含Agent输出的字典。
"""
# 构建工具名称到工具的映射,便于查找
name_to_tool_map = {tool.name: tool for tool in self.tools}
# 为日志记录构建从每个工具到颜色的映射
color_mapping = get_color_mapping(
[tool.name for tool in self.tools],
excluded_colors=["green", "red"],
)
intermediate_steps: List[Tuple[AgentAction, str]] = []
# 开始跟踪迭代次数和经过的时间
iterations = 0
time_elapsed = 0.0
start_time = time.time()
# 现在进入Agent循环(直到它返回某些东西)
while self._should_continue(iterations, time_elapsed):
next_step_output = self._take_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager=run_manager,
)
if isinstance(next_step_output, AgentFinish):
return self._return(
next_step_output,
intermediate_steps,
run_manager=run_manager,
)
intermediate_steps.extend(next_step_output)
if len(next_step_output) == 1:
next_step_action = next_step_output[0]
# 查看工具是否应该直接返回
tool_return = self._get_tool_return(next_step_action)
if tool_return is not None:
return self._return(
tool_return,
intermediate_steps,
run_manager=run_manager,
)
iterations += 1
time_elapsed = time.time() - start_time
output = self._return(
self._get_action_agent().return_stopped_response(
self.early_stopping_method, intermediate_steps
),
intermediate_steps,
run_manager=run_manager,
)
return output
def _take_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
"""采取单个步骤在Agent循环中。
这包括:
1. 调用Agent规划下一个动作
2. 执行该动作
3. 返回结果
Args:
name_to_tool_map: 工具名称到工具的映射。
color_mapping: 工具到颜色的映射。
inputs: 用户输入。
intermediate_steps: 到目前为止的中间步骤。
run_manager: 回调管理器。
Returns:
AgentFinish或步骤列表。
"""
try:
# 修剪中间步骤
intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
# 调用Agent规划
output = self.agent.plan(
intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
except OutputParserException as e:
if isinstance(self.handle_parsing_errors, bool):
raise_error = not self.handle_parsing_errors
else:
raise_error = False
if raise_error:
raise ValueError(
"Agent输出解析出错。"
"请传递`handle_parsing_errors=True`以自动处理这些错误。"
) from e
text = str(e)
if isinstance(self.handle_parsing_errors, bool):
if e.send_to_llm:
observation = str(e.observation)
text = str(e.llm_output)
else:
observation = "解析LLM输出时出现无效格式"
elif isinstance(self.handle_parsing_errors, str):
observation = self.handle_parsing_errors
elif callable(self.handle_parsing_errors):
observation = self.handle_parsing_errors(e)
else:
raise ValueError("Got unexpected type of `handle_parsing_errors`")
output = AgentAction("_Exception", observation, text)
# 如果Agent返回AgentFinish,则完成
if isinstance(output, AgentFinish):
return output
actions: List[AgentAction]
if isinstance(output, AgentAction):
actions = [output]
else:
actions = output
result = []
for agent_action in actions:
if run_manager:
run_manager.on_agent_action(agent_action, color="green")
# 否则我们查找工具并运行它
if agent_action.tool in name_to_tool_map:
tool = name_to_tool_map[agent_action.tool]
return_direct = tool.return_direct
color = color_mapping[agent_action.tool]
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
if return_direct:
tool_run_kwargs["llm_prefix"] = ""
# 执行工具
observation = tool.run(
agent_action.tool_input,
verbose=self.verbose,
color=color,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
else:
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = InvalidTool().run(
{
"requested_tool_name": agent_action.tool,
"available_tool_names": list(name_to_tool_map.keys()),
},
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
result.append((agent_action, observation))
return result
def _prepare_intermediate_steps(
self, steps: List[Tuple[AgentAction, str]]
) -> List[Tuple[AgentAction, str]]:
"""准备中间步骤以传递给Agent。
这可能涉及修剪步骤以保持在上下文长度限制内。
Args:
steps: 原始中间步骤。
Returns:
处理后的中间步骤。
"""
if isinstance(self.trim_intermediate_steps, int):
if self.trim_intermediate_steps == -1:
# 不修剪
return steps
else:
# 保留最后N个步骤
return steps[-self.trim_intermediate_steps :]
elif callable(self.trim_intermediate_steps):
# 使用自定义函数修剪
return self.trim_intermediate_steps(steps)
else:
raise ValueError(
f"Got unexpected type of `trim_intermediate_steps`: "
f"{type(self.trim_intermediate_steps)}"
)
def _return(
self,
output: AgentFinish,
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""返回Agent的输出。
Args:
output: Agent的最终输出。
intermediate_steps: 中间步骤列表。
run_manager: 回调管理器。
Returns:
格式化的输出字典。
"""
if run_manager:
run_manager.on_agent_finish(output, color="green", verbose=self.verbose)
final_output = output.return_values
if self.return_intermediate_steps:
final_output["intermediate_steps"] = intermediate_steps
return final_output
def _get_tool_return(
self, next_step_output: Tuple[AgentAction, str]
) -> Optional[AgentFinish]:
"""检查工具是否应该直接返回。
Args:
next_step_output: 下一步输出。
Returns:
如果工具应该直接返回,则返回AgentFinish,否则返回None。
"""
agent_action, observation = next_step_output
name_to_tool_map = {tool.name: tool for tool in self.tools}
if agent_action.tool in name_to_tool_map:
if name_to_tool_map[agent_action.tool].return_direct:
return AgentFinish(
{self.agent.return_values[0]: observation},
"",
)
return None
async def _acall(
self,
inputs: Dict[str, str],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""异步运行Agent。"""
name_to_tool_map = {tool.name: tool for tool in self.tools}
color_mapping = get_color_mapping(
[tool.name for tool in self.tools],
excluded_colors=["green", "red"],
)
intermediate_steps: List[Tuple[AgentAction, str]] = []
iterations = 0
time_elapsed = 0.0
start_time = time.time()
while self._should_continue(iterations, time_elapsed):
next_step_output = await self._atake_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager=run_manager,
)
if isinstance(next_step_output, AgentFinish):
return self._return(
next_step_output,
intermediate_steps,
run_manager=run_manager,
)
intermediate_steps.extend(next_step_output)
if len(next_step_output) == 1:
next_step_action = next_step_output[0]
tool_return = self._get_tool_return(next_step_action)
if tool_return is not None:
return self._return(
tool_return,
intermediate_steps,
run_manager=run_manager,
)
iterations += 1
time_elapsed = time.time() - start_time
output = self._return(
self._get_action_agent().return_stopped_response(
self.early_stopping_method, intermediate_steps
),
intermediate_steps,
run_manager=run_manager,
)
return output
async def _atake_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
"""异步采取下一步。"""
try:
intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
# 调用Agent规划
output = await self.agent.aplan(
intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
except OutputParserException as e:
# 错误处理逻辑与同步版本相同
if isinstance(self.handle_parsing_errors, bool):
raise_error = not self.handle_parsing_errors
else:
raise_error = False
if raise_error:
raise ValueError(
"Agent输出解析出错。"
"请传递`handle_parsing_errors=True`以自动处理这些错误。"
) from e
text = str(e)
if isinstance(self.handle_parsing_errors, bool):
if e.send_to_llm:
observation = str(e.observation)
text = str(e.llm_output)
else:
observation = "解析LLM输出时出现无效格式"
elif isinstance(self.handle_parsing_errors, str):
observation = self.handle_parsing_errors
elif callable(self.handle_parsing_errors):
observation = self.handle_parsing_errors(e)
else:
raise ValueError("Got unexpected type of `handle_parsing_errors`")
output = AgentAction("_Exception", observation, text)
if isinstance(output, AgentFinish):
return output
actions: List[AgentAction]
if isinstance(output, AgentAction):
actions = [output]
else:
actions = output
result = []
for agent_action in actions:
if run_manager:
await run_manager.on_agent_action(agent_action, color="green")
if agent_action.tool in name_to_tool_map:
tool = name_to_tool_map[agent_action.tool]
return_direct = tool.return_direct
color = color_mapping[agent_action.tool]
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
if return_direct:
tool_run_kwargs["llm_prefix"] = ""
# 异步执行工具
observation = await tool.arun(
agent_action.tool_input,
verbose=self.verbose,
color=color,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
else:
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = await InvalidTool().arun(
{
"requested_tool_name": agent_action.tool,
"available_tool_names": list(name_to_tool_map.keys()),
},
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
result.append((agent_action, observation))
return result
@property
def _chain_type(self) -> str:
return "agent_executor"
3.3 RunnableAgent:通过…实现
class RunnableAgent(BaseSingleActionAgent):
"""由Runnable驱动的Agent。
这是一个现代化的Agent实现,基于Runnable接口构建。
它将Agent的推理逻辑封装在一个Runnable中,提供了更好的
组合性和可扩展性。
设计特点:
1. 基于Runnable:利用统一的执行接口
2. 流式支持:可以流式输出推理过程
3. 类型安全:明确的输入输出类型
4. 易于测试:可以独立测试推理逻辑
"""
runnable: Runnable[Dict[str, Any], Union[AgentAction, AgentFinish]]
"""执行Agent推理的Runnable。"""
input_keys_arg: List[str] = []
"""Agent期望的输入键。"""
stream_runnable: bool = True
"""是否使用流式执行Runnable。"""
def __init__(
self,
runnable: Runnable[Dict[str, Any], Union[AgentAction, AgentFinish]],
input_keys_arg: Optional[List[str]] = None,
stream_runnable: bool = True,
):
"""初始化RunnableAgent。
Args:
runnable: 执行推理的Runnable。
input_keys_arg: 输入键列表。
stream_runnable: 是否使用流式执行。
"""
super().__init__()
self.runnable = runnable
self.input_keys_arg = input_keys_arg or []
self.stream_runnable = stream_runnable
@property
def input_keys(self) -> List[str]:
"""返回输入键。"""
return self.input_keys_arg
def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""基于过去历史和当前输入,决定要做什么。
Args:
intermediate_steps: LLM到目前为止采取的步骤,以及观察结果。
callbacks: 要运行的回调。
**kwargs: 用户输入。
Returns:
指定要使用什么工具的动作。
"""
inputs = {**kwargs, "intermediate_steps": intermediate_steps}
final_output: Any = None
if self.stream_runnable:
# 使用流式处理确保底层LLM以流式方式调用
# 这使得在使用Agent Executor的stream_log时
# 可以访问单个LLM token
# 因为来自plan的响应不是生成器,我们需要
# 将输出累积到final_output中并返回
for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
if final_output is None:
final_output = chunk
else:
final_output += chunk
else:
final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks})
return final_output
async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""异步规划方法。"""
inputs = {**kwargs, "intermediate_steps": intermediate_steps}
final_output: Any = None
if self.stream_runnable:
async for chunk in self.runnable.astream(
inputs, config={"callbacks": callbacks}
):
if final_output is None:
final_output = chunk
else:
final_output += chunk
else:
final_output = await self.runnable.ainvoke(
inputs, config={"callbacks": callbacks}
)
return final_output
def get_allowed_tools(self) -> Optional[List[str]]:
"""获取允许的工具列表。"""
return None
@property
def return_values(self) -> List[str]:
"""返回值列表。"""
return ["output"]
def return_stopped_response(
self,
early_stopping_method: str,
intermediate_steps: List[Tuple[AgentAction, str]],
**kwargs: Any,
) -> AgentFinish:
"""当Agent因为达到最大迭代次数而停止时返回响应。
Args:
early_stopping_method: 早停方法。
intermediate_steps: 中间步骤。
**kwargs: 额外参数。
Returns:
AgentFinish对象。
"""
if early_stopping_method == "force":
return AgentFinish(
{"output": "Agent因达到最大迭代次数而停止。"}, ""
)
elif early_stopping_method == "generate":
# 尝试让Agent生成最终答案
try:
final_output = self.plan(
intermediate_steps,
**kwargs,
)
if isinstance(final_output, AgentFinish):
return final_output
else:
return AgentFinish(
{"output": "Agent无法生成最终答案。"}, ""
)
except Exception:
return AgentFinish(
{"output": "Agent在生成最终答案时出错。"}, ""
)
else:
raise ValueError(
f"不支持的早停方法: {early_stopping_method}。"
f"支持的方法: ['force', 'generate']"
)
def tool_run_logging_kwargs(self) -> Dict[str, Any]:
"""返回工具运行日志的关键字参数。"""
return {
"llm_prefix": "Thought: ",
"observation_prefix": "Observation: ",
}
4. Memory模块:状态管理系统
4.1 Memory继承层次
classDiagram
class BaseMemory {
<<abstract>>
+memory_variables: List[str]
+load_memory_variables(inputs) Dict[str, Any]
+save_context(inputs, outputs) None
+clear() None
}
class BaseChatMemory {
<<abstract>>
+chat_memory: BaseChatMessageHistory
+output_key: str
+input_key: str
+return_messages: bool
}
class ConversationBufferMemory {
+buffer: str
+human_prefix: str
+ai_prefix: str
}
class ConversationBufferWindowMemory {
+k: int
+buffer: List[BaseMessage]
}
class ConversationSummaryMemory {
+llm: BaseLanguageModel
+prompt: BasePromptTemplate
+buffer: str
}
class ConversationSummaryBufferMemory {
+llm: BaseLanguageModel
+max_token_limit: int
+buffer: List[BaseMessage]
}
class VectorStoreRetrieverMemory {
+retriever: VectorStoreRetriever
+memory_key: str
}
BaseMemory <|-- BaseChatMemory
BaseChatMemory <|-- ConversationBufferMemory
BaseChatMemory <|-- ConversationBufferWindowMemory
BaseChatMemory <|-- ConversationSummaryMemory
BaseChatMemory <|-- ConversationSummaryBufferMemory
BaseMemory <|-- VectorStoreRetrieverMemory
4.2 BaseChatMemory:聊天记忆基类
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from langchain_core.memory import BaseMemory
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.chat_history import BaseChatMessageHistory
class BaseChatMemory(BaseMemory, ABC):
"""聊天记忆的抽象基类。
聊天记忆专门用于存储和管理对话历史,支持:
1. 消息格式的对话存储
2. 灵活的输入输出键配置
3. 消息和字符串格式的转换
4. 对话历史的持久化
"""
chat_memory: BaseChatMessageHistory
"""存储聊天消息的对象。"""
output_key: Optional[str] = None
"""用于从链输出中提取AI消息的键。如果为None,假定只有一个输出键。"""
input_key: Optional[str] = None
"""用于从链输入中提取人类消息的键。如果为None,假定只有一个输入键。"""
return_messages: bool = False
"""是否返回消息列表而不是字符串。"""
def __init__(
self,
chat_memory: Optional[BaseChatMessageHistory] = None,
output_key: Optional[str] = None,
input_key: Optional[str] = None,
return_messages: bool = False,
):
"""初始化聊天记忆。
Args:
chat_memory: 聊天消息历史存储。
output_key: 输出键。
input_key: 输入键。
return_messages: 是否返回消息格式。
"""
super().__init__()
if chat_memory is None:
from langchain_core.chat_history import InMemoryChatMessageHistory
chat_memory = InMemoryChatMessageHistory()
self.chat_memory = chat_memory
self.output_key = output_key
self.input_key = input_key
self.return_messages = return_messages
def _get_input_output(
self, inputs: Dict[str, Any], outputs: Dict[str, str]
) -> Tuple[str, str]:
"""从输入输出字典中提取输入和输出字符串。
Args:
inputs: 输入字典。
outputs: 输出字典。
Returns:
(输入字符串, 输出字符串) 元组。
Raises:
ValueError: 如果无法确定输入或输出键。
"""
# 确定输入键
if self.input_key is None:
prompt_input_keys = list(inputs.keys())
if len(prompt_input_keys) != 1:
raise ValueError(
f"一个输入键期望得到,但得到 {prompt_input_keys}。"
f"指定input_key以避免此错误。"
)
input_key = prompt_input_keys[0]
else:
input_key = self.input_key
# 确定输出键
if self.output_key is None:
if len(outputs) != 1:
raise ValueError(
f"一个输出键期望得到,但得到 {list(outputs.keys())}。"
f"指定output_key以避免此错误。"
)
output_key = list(outputs.keys())[0]
else:
output_key = self.output_key
return inputs[input_key], outputs[output_key]
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""保存对话上下文到记忆中。
Args:
inputs: 链输入字典。
outputs: 链输出字典。
"""
input_str, output_str = self._get_input_output(inputs, outputs)
# 添加消息到聊天历史
self.chat_memory.add_user_message(input_str)
self.chat_memory.add_ai_message(output_str)
def clear(self) -> None:
"""清除记忆内容。"""
self.chat_memory.clear()
@property
@abstractmethod
def memory_variables(self) -> List[str]:
"""此记忆类将添加到链输入中的字符串键。"""
@abstractmethod
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""从记忆中加载变量。"""
4.3 ConversationBufferMemory:缓冲区记忆
class ConversationBufferMemory(BaseChatMemory):
"""简单的对话缓冲区记忆。
这种记忆类型简单地存储所有对话消息,并在需要时返回它们。
适用于短对话或不关心上下文长度的场景。
特点:
1. 完整保存:保存所有对话历史
2. 格式灵活:支持字符串和消息格式
3. 简单高效:没有额外的处理开销
4. 内存占用:随对话长度线性增长
"""
human_prefix: str = "Human"
"""人类消息的前缀。"""
ai_prefix: str = "AI"
"""AI消息的前缀。"""
memory_key: str = "history"
"""记忆在链输入中的键名。"""
@property
def buffer(self) -> Any:
"""返回缓冲区内容。"""
return self.chat_memory.messages
@property
def memory_variables(self) -> List[str]:
"""返回记忆变量列表。"""
return [self.memory_key]
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""从记忆中加载变量。
Args:
inputs: 链输入字典(未使用,但保持接口一致性)。
Returns:
包含记忆变量的字典。
"""
messages = self.chat_memory.messages
if self.return_messages:
# 返回消息列表
return {self.memory_key: messages}
else:
# 返回格式化的字符串
buffer = self._messages_to_string(messages)
return {self.memory_key: buffer}
def _messages_to_string(self, messages: List[BaseMessage]) -> str:
"""将消息列表转换为字符串。
Args:
messages: 消息列表。
Returns:
格式化的字符串。
"""
string_messages = []
for message in messages:
if isinstance(message, HumanMessage):
role = self.human_prefix
elif isinstance(message, AIMessage):
role = self.ai_prefix
else:
role = message.__class__.__name__
string_messages.append(f"{role}: {message.content}")
return "\n".join(string_messages)
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""保存上下文到记忆。"""
super().save_context(inputs, outputs)
def clear(self) -> None:
"""清除记忆。"""
super().clear()
class ConversationBufferWindowMemory(BaseChatMemory):
"""滑动窗口对话记忆。
这种记忆类型只保留最近的k轮对话,适用于需要控制上下文长度
但又要保持一定对话连续性的场景。
特点:
1. 固定窗口:只保留最近k轮对话
2. 自动清理:自动删除过旧的消息
3. 内存可控:内存占用有上限
4. 连续性:保持最近对话的连续性
"""
k: int = 5
"""要保留的对话轮数。"""
memory_key: str = "history"
"""记忆在链输入中的键名。"""
@property
def buffer(self) -> List[BaseMessage]:
"""返回缓冲区内容(最近k轮对话)。"""
return self.chat_memory.messages[-2 * self.k :] if self.k > 0 else []
@property
def memory_variables(self) -> List[str]:
"""返回记忆变量列表。"""
return [self.memory_key]
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""从记忆中加载变量。"""
messages = self.buffer
if self.return_messages:
return {self.memory_key: messages}
else:
buffer = self._messages_to_string(messages)
return {self.memory_key: buffer}
def _messages_to_string(self, messages: List[BaseMessage]) -> str:
"""将消息列表转换为字符串。"""
string_messages = []
for message in messages:
if isinstance(message, HumanMessage):
role = "Human"
elif isinstance(message, AIMessage):
role = "AI"
else:
role = message.__class__.__name__
string_messages.append(f"{role}: {message.content}")
return "\n".join(string_messages)
class ConversationSummaryMemory(BaseChatMemory):
"""对话摘要记忆。
这种记忆类型使用LLM对对话历史进行摘要,适用于长对话场景。
通过摘要可以在有限的上下文中保留更多的对话信息。
特点:
1. 智能压缩:使用LLM生成对话摘要
2. 信息保留:在压缩的同时保留关键信息
3. 上下文控制:有效控制上下文长度
4. 计算开销:需要额外的LLM调用
"""
llm: BaseLanguageModel
"""用于生成摘要的语言模型。"""
prompt: BasePromptTemplate = SUMMARY_PROMPT
"""用于生成摘要的提示模板。"""
memory_key: str = "history"
"""记忆在链输入中的键名。"""
buffer: str = ""
"""当前的摘要缓冲区。"""
def __init__(
self,
llm: BaseLanguageModel,
chat_memory: Optional[BaseChatMessageHistory] = None,
prompt: BasePromptTemplate = SUMMARY_PROMPT,
memory_key: str = "history",
return_messages: bool = False,
**kwargs: Any,
):
"""初始化摘要记忆。
Args:
llm: 用于生成摘要的语言模型。
chat_memory: 聊天消息历史。
prompt: 摘要提示模板。
memory_key: 记忆键名。
return_messages: 是否返回消息格式。
**kwargs: 额外参数。
"""
super().__init__(
chat_memory=chat_memory,
return_messages=return_messages,
**kwargs,
)
self.llm = llm
self.prompt = prompt
self.memory_key = memory_key
@property
def memory_variables(self) -> List[str]:
"""返回记忆变量列表。"""
return [self.memory_key]
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""从记忆中加载变量。"""
if self.return_messages:
# 如果返回消息格式,需要将摘要转换为消息
if self.buffer:
messages = [AIMessage(content=f"Previous conversation summary: {self.buffer}")]
else:
messages = []
return {self.memory_key: messages}
else:
return {self.memory_key: self.buffer}
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""保存上下文并更新摘要。"""
# 首先保存到聊天历史
super().save_context(inputs, outputs)
# 然后更新摘要
self._update_summary()
def _update_summary(self) -> None:
"""更新对话摘要。"""
messages = self.chat_memory.messages
if not messages:
return
# 将消息转换为字符串
new_lines = self._messages_to_string(messages)
# 生成新的摘要
if self.buffer:
# 如果已有摘要,则基于现有摘要和新对话生成更新的摘要
summary_input = {
"summary": self.buffer,
"new_lines": new_lines,
}
else:
# 如果没有摘要,则基于所有对话生成初始摘要
summary_input = {
"summary": "",
"new_lines": new_lines,
}
# 调用LLM生成摘要
self.buffer = self.llm.predict(**summary_input)
# 清除聊天历史(因为已经摘要了)
self.chat_memory.clear()
def _messages_to_string(self, messages: List[BaseMessage]) -> str:
"""将消息列表转换为字符串。"""
string_messages = []
for message in messages:
if isinstance(message, HumanMessage):
role = "Human"
elif isinstance(message, AIMessage):
role = "AI"
else:
role = message.__class__.__name__
string_messages.append(f"{role}: {message.content}")
return "\n".join(string_messages)
def clear(self) -> None:
"""清除记忆。"""
super().clear()
self.buffer = ""
# 摘要提示模板
SUMMARY_PROMPT = PromptTemplate(
input_variables=["summary", "new_lines"],
template="""逐步总结所提供的对话内容,将新内容添加到之前的摘要中,返回一个新的摘要。
示例:
当前摘要:
人类询问AI对人工智能的看法。AI认为人工智能是一种有益的技术。
新的对话内容:
Human: 为什么你认为人工智能是有益的?
AI: 因为人工智能可以帮助人类解决复杂问题并提高效率。
新摘要:
人类询问AI对人工智能的看法。AI认为人工智能是一种有益的技术,因为它可以帮助人类解决复杂问题并提高效率。
结束示例
当前摘要:
{summary}
新的对话内容:
{new_lines}
新摘要:""",
)
5. 实际应用案例与最佳实践
5.1 企业级Agent系统实现
文章的实践经验,以下是一个完整的企业级Agent系统:
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import BaseTool, tool
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain.memory import ConversationBufferWindowMemory
from typing import Dict, Any, List, Optional
import json
import logging
class EnterpriseAgentSystem:
"""企业级Agent系统实现
https://zeeklog.com/langchain-yuan-ma-zhu-xing-jie-mi-zhi-xi-tong-yi--2/
"""
def __init__(self, model_name: str = "gpt-3.5-turbo"):
self.llm = ChatOpenAI(model_name=model_name, temperature=0.1)
self.memory = ConversationBufferWindowMemory(
memory_key="chat_history",
return_messages=True,
k=10 # 保留最近10轮对话
)
self.tools = self._create_enterprise_tools()
self.agent_executor = self._create_agent_executor()
self.logger = logging.getLogger(__name__)
def _create_enterprise_tools(self) -> List[BaseTool]:
"""创建企业级工具集"""
@tool
def database_query(query: str) -> str:
"""执行数据库查询
Args:
query: SQL查询语句
Returns:
查询结果的JSON字符串
"""
# 安全的SQL查询实现
try:
if "SELECT" in query.upper():
return json.dumps({
"status": "success",
"data": [{"id": 1, "name": "示例数据"}],
"count": 1
})
else:
return json.dumps({
"status": "error",
"message": "只支持SELECT查询"
})
except Exception as e:
return json.dumps({
"status": "error",
"message": str(e)
})
@tool
def api_call(endpoint: str, method: str = "GET", data: str = "{}") -> str:
"""调用外部API
Args:
endpoint: API端点URL
method: HTTP方法
data: 请求数据(JSON字符串)
Returns:
API响应的JSON字符串
"""
import requests
from urllib.parse import urlparse
try:
# 安全检查:只允许特定域名
allowed_domains = ["api.example.com", "internal-api.company.com"]
domain = urlparse(endpoint).netloc
if domain not in allowed_domains:
return json.dumps({
"status": "error",
"message": f"不允许访问域名: {domain}"
})
# 执行API调用
if method.upper() == "GET":
response = requests.get(endpoint, timeout=10)
elif method.upper() == "POST":
response = requests.post(
endpoint,
json=json.loads(data),
timeout=10
)
else:
return json.dumps({
"status": "error",
"message": f"不支持的HTTP方法: {method}"
})
return json.dumps({
"status": "success",
"data": response.json(),
"status_code": response.status_code
})
except Exception as e:
return json.dumps({
"status": "error",
"message": str(e)
})
return [database_query, api_call]
def _create_agent_executor(self) -> AgentExecutor:
"""创建Agent执行器"""
# 自定义ReAct提示模板
react_prompt = PromptTemplate.from_template("""
你是一个专业的AI助手,能够使用各种工具来帮助用户完成任务。
可用工具:
{tools}
工具名称:{tool_names}
请使用以下格式进行推理:
Question: 用户的问题
Thought: 我需要思考如何解决这个问题
Action: 选择要使用的工具
Action Input: 工具的输入参数
Observation: 工具的执行结果
... (这个Thought/Action/Action Input/Observation过程可以重复多次)
Thought: 我现在知道最终答案了
Final Answer: 对用户问题的最终回答
开始!
Question: {input}
Thought: {agent_scratchpad}
""")
# 创建ReAct Agent
agent = create_react_agent(
llm=self.llm,
tools=self.tools,
prompt=react_prompt
)
# 创建Agent执行器
agent_executor = AgentExecutor(
agent=agent,
tools=self.tools,
memory=self.memory,
verbose=True,
max_iterations=10,
max_execution_time=60,
early_stopping_method="generate",
handle_parsing_errors=True
)
return agent_executor
def execute_task(self, task: str) -> Dict[str, Any]:
"""执行任务"""
try:
self.logger.info(f"开始执行任务: {task}")
result = self.agent_executor.invoke({
"input": task
})
self.logger.info(f"任务执行完成: {result.get('output', '')}")
return {
"status": "success",
"result": result.get("output", ""),
"intermediate_steps": result.get("intermediate_steps", [])
}
except Exception as e:
self.logger.error(f"任务执行失败: {str(e)}")
return {
"status": "error",
"message": str(e)
}
# 使用示例
def create_enterprise_agent_demo():
"""企业级Agent系统使用示例"""
# 初始化系统
agent_system = EnterpriseAgentSystem()
# 测试任务
test_tasks = [
"查询数据库中的用户信息",
"调用API获取天气信息",
"分析最近的销售数据趋势"
]
for task in test_tasks:
print(f"执行任务: {task}")
result = agent_system.execute_task(task)
if result["status"] == "success":
print(f"结果: {result['result']}")
else:
print(f"错误: {result['message']}")
if __name__ == "__main__":
create_enterprise_agent_demo()
5.2 高级Memory管理策略
class AdvancedMemoryManager:
"""高级记忆管理器
https://github.com/taishan1994/langchain-learning
"""
def __init__(self):
self.memory_strategies = {}
self.active_strategy = None
def create_hybrid_memory(self, vectorstore, llm):
"""创建混合记忆系统"""
from langchain.memory import (
ConversationBufferMemory,
ConversationSummaryMemory,
VectorStoreRetrieverMemory,
CombinedMemory
)
# 1. 缓冲记忆 - 保存最近对话
buffer_memory = ConversationBufferMemory(
memory_key="recent_chat_history",
input_key="input",
output_key="output",
return_messages=True
)
# 2. 摘要记忆 - 压缩历史对话
summary_memory = ConversationSummaryMemory(
llm=llm,
memory_key="chat_summary",
input_key="input",
output_key="output",
return_messages=False
)
# 3. 向量记忆 - 语义检索历史
vector_memory = VectorStoreRetrieverMemory(
retriever=vectorstore.as_retriever(search_kwargs={"k": 3}),
memory_key="relevant_history",
input_key="input"
)
# 4. 组合记忆
combined_memory = CombinedMemory(
memories=[buffer_memory, summary_memory, vector_memory]
)
return combined_memory
def create_adaptive_memory(self, llm, max_token_limit: int = 4000):
"""创建自适应记忆系统"""
from langchain.memory import ConversationTokenBufferMemory
class AdaptiveTokenMemory(ConversationTokenBufferMemory):
"""自适应Token记忆"""
def __init__(self, llm, max_token_limit, **kwargs):
super().__init__(llm=llm, max_token_limit=max_token_limit, **kwargs)
self.compression_ratio = 0.7 # 压缩比例
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""保存上下文,自动压缩"""
super().save_context(inputs, outputs)
# 检查是否需要压缩
current_tokens = self.llm.get_num_tokens(self.buffer)
if current_tokens > self.max_token_limit * 0.9: # 90%阈值
self._compress_memory()
def _compress_memory(self):
"""压缩记忆内容"""
messages = self.chat_memory.messages
if len(messages) > 4: # 保留最近2轮对话
# 获取需要压缩的消息
to_compress = messages[:-4]
to_keep = messages[-4:]
# 生成摘要
if to_compress:
compress_text = self._messages_to_string(to_compress)
summary_prompt = f"""
请将以下对话内容压缩为简洁的摘要,保留关键信息:
{compress_text}
摘要:
"""
summary = self.llm.predict(summary_prompt)
# 清空并重新构建记忆
self.chat_memory.clear()
# 添加摘要作为系统消息
from langchain_core.messages import SystemMessage
self.chat_memory.add_message(
SystemMessage(content=f"历史对话摘要:{summary}")
)
# 添加保留的消息
for message in to_keep:
self.chat_memory.add_message(message)
def _messages_to_string(self, messages):
"""将消息转换为字符串"""
result = []
for message in messages:
role = message.__class__.__name__.replace("Message", "")
result.append(f"{role}: {message.content}")
return "\n".join(result)
return AdaptiveTokenMemory(llm=llm, max_token_limit=max_token_limit)
# 使用示例
def demo_advanced_memory():
"""高级记忆管理演示"""
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
# 初始化组件
llm = ChatOpenAI(temperature=0.1)
embeddings = OpenAIEmbeddings()
# 创建向量存储(用于演示)
texts = ["这是一些示例文本", "用于演示向量记忆功能"]
vectorstore = FAISS.from_texts(texts, embeddings)
# 创建记忆管理器
memory_manager = AdvancedMemoryManager()
# 测试混合记忆
hybrid_memory = memory_manager.create_hybrid_memory(vectorstore, llm)
# 测试自适应记忆
adaptive_memory = memory_manager.create_adaptive_memory(llm, max_token_limit=2000)
print("高级记忆系统创建完成")
# 模拟对话
test_conversations = [
{"input": "你好,我想了解人工智能", "output": "你好!我很乐意为你介绍人工智能的相关知识。"},
{"input": "AI有哪些应用领域?", "output": "AI在医疗、金融、教育、交通等多个领域都有广泛应用。"},
{"input": "机器学习和深度学习有什么区别?", "output": "机器学习是更广泛的概念,深度学习是机器学习的一个子集。"}
]
# 测试自适应记忆
for conv in test_conversations:
adaptive_memory.save_context({"input": conv["input"]}, {"output": conv["output"]})
# 查看记忆内容
memory_vars = adaptive_memory.load_memory_variables({})
print(f"记忆内容: {memory_vars}")
if __name__ == "__main__":
demo_advanced_memory()
6. 总结
6.1 LangChain主库设计优势
- Chain执行机制:完整的生命周期管理和错误处理
- Agent推理循环:灵活的工具集成和决策机制
- Memory状态管理:多样化的记忆策略和持久化支持
- 组件生态:丰富的预构建组件和集成
6.2 核心设计模式总结
mindmap
root((LangChain主库设计模式))
模板方法模式
Chain执行流程
Agent推理循环
Memory管理流程
策略模式
不同Memory策略
Agent停止策略
错误处理策略
观察者模式
回调机制
事件通知
状态监控
组合模式
Chain组合
Tool组合
Memory组合
工厂模式
Agent创建
Memory创建
Tool创建
通过深入理解LangChain主库的设计和实现,开发者能够更好地构建复杂的AI应用,充分利用Chain、Agent和Memory等核心组件的强大功能。