Overview
This document provides a complete user manual for the LangChain framework together with an in-depth source-code analysis, helping developers progress from getting started to mastering the project's code logic.
1. LangChain Framework User Manual
1.1 Quick Start
1.1.1 Installation and Configuration
# Install the core package
pip install langchain-core
# Install the main integration packages
pip install langchain-openai langchain-anthropic langchain-community
# Install the full package (includes all components)
pip install langchain
1.1.2 Basic Usage Example
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 1. Create an LLM instance
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1)
# 2. Create a simple chain
chain = llm | StrOutputParser()
# 3. Invoke the chain
result = chain.invoke([HumanMessage(content="Hello, world!")])
print(result)
1.2 Core Concepts
1.2.1 The Runnable Interface
All LangChain components implement the Runnable interface, which provides a unified execution pattern:
from langchain_core.runnables import Runnable
from typing import Any, Dict, List, Optional
class MyCustomRunnable(Runnable):
"""自定义Runnable组件示例"""
def invoke(self, input: Any, config: Optional[Dict] = None) -> Any:
"""同步执行"""
return f"Processed: {input}"
async def ainvoke(self, input: Any, config: Optional[Dict] = None) -> Any:
"""异步执行"""
return f"Async processed: {input}"
def stream(self, input: Any, config: Optional[Dict] = None):
"""流式执行"""
for char in str(input):
yield char
def batch(self, inputs: List[Any], config: Optional[Dict] = None) -> List[Any]:
"""批量执行"""
return [self.invoke(inp, config) for inp in inputs]
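The component above can be exercised through all four execution modes. A minimal usage sketch (it reuses the MyCustomRunnable class defined above and assumes nothing beyond the standard library):
import asyncio
runnable = MyCustomRunnable()
print(runnable.invoke("hello"))             # "Processed: hello"
print(runnable.batch(["a", "b"]))           # ["Processed: a", "Processed: b"]
print("".join(runnable.stream("abc")))      # "abc", yielded character by character
print(asyncio.run(runnable.ainvoke("hi")))  # "Async processed: hi"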
1.2.2 Chain Composition
Use the | operator to compose components into a chain:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# Create a prompt template
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
# Create the LLM
llm = ChatOpenAI()
# Create an output parser
output_parser = StrOutputParser()
# Compose the chain
chain = prompt | llm | output_parser
# Execute
result = chain.invoke({"topic": "cats"})
1.3 Overview of the Main Component APIs
1.3.1 Language Model APIs
# Chat Models
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
# Base LLMs
from langchain_openai import OpenAI
# Embedding models
from langchain_openai import OpenAIEmbeddings
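For reference, a minimal sketch of how the embedding model is typically called (assumes an OpenAI API key is configured; the model name is one commonly available option):
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
query_vector = embeddings.embed_query("What is LangChain?")
doc_vectors = embeddings.embed_documents(["LangChain is a framework.", "It composes LLM calls."])
print(len(query_vector), len(doc_vectors))  # embedding dimension, number of documents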
1.3.2 Prompt Template APIs
from langchain_core.prompts import (
PromptTemplate,
ChatPromptTemplate,
FewShotPromptTemplate,
MessagesPlaceholder
)
# Simple template
template = PromptTemplate.from_template("Tell me about {topic}")
# Chat template
chat_template = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant"),
("human", "{input}")
])
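A template only produces messages once its variables are filled in. A minimal sketch using the chat_template defined above:
# Fill in the variables to obtain a list of messages
messages = chat_template.format_messages(input="What is LangChain?")
print(messages)
# Templates are Runnables too, so invoke() also works
prompt_value = chat_template.invoke({"input": "What is LangChain?"})
print(prompt_value.to_messages())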
1.3.3 Output Parser APIs
from langchain_core.output_parsers import (
StrOutputParser,
JsonOutputParser,
PydanticOutputParser
)
from pydantic import BaseModel
class Person(BaseModel):
name: str
age: int
# Structured output parsing
parser = PydanticOutputParser(pydantic_object=Person)
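The parser is used in two places: its format instructions are injected into the prompt, and its parse() method converts the raw LLM text into a Person instance. A minimal sketch (the raw_output string below is a stand-in for a real LLM response):
from langchain_core.prompts import PromptTemplate
prompt = PromptTemplate.from_template(
    "Extract the person mentioned in the text.\n{format_instructions}\nText: {text}"
).partial(format_instructions=parser.get_format_instructions())
raw_output = '{"name": "Ada", "age": 36}'  # stand-in for an LLM response
person = parser.parse(raw_output)
print(person.name, person.age)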
2. In-Depth Analysis of the Core APIs
2.1 Source Analysis of the Runnable Interface
2.1.1 Core Interface Definition
# langchain_core/runnables/base.py (simplified excerpt)
import asyncio
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, AsyncIterator, Dict, Generic, Iterator, List, Optional, Union
from langchain_core.runnables.config import RunnableConfig
from langchain_core.runnables.utils import Input, Output
class Runnable(Generic[Input, Output], ABC):
"""LangChain's core abstract interface; the base class for all components"""
@abstractmethod
def invoke(
self,
input: Input,
config: Optional[RunnableConfig] = None,
**kwargs: Any
) -> Output:
"""
同步执行单个输入
Args:
input: 输入数据
config: 运行时配置,包含callbacks、tags、metadata等
**kwargs: 额外参数
Returns:
处理后的输出
"""
...
async def ainvoke(
self,
input: Input,
config: Optional[RunnableConfig] = None,
**kwargs: Any
) -> Output:
"""
异步执行单个输入
默认实现:在线程池中执行同步invoke方法
"""
return await asyncio.get_event_loop().run_in_executor(
None, partial(self.invoke, input, config, **kwargs)
)
def stream(
self,
input: Input,
config: Optional[RunnableConfig] = None,
**kwargs: Any
) -> Iterator[Output]:
"""
流式执行,返回迭代器
默认实现:一次性返回invoke结果
"""
yield self.invoke(input, config, **kwargs)
def batch(
self,
inputs: List[Input],
config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
*,
return_exceptions: bool = False,
**kwargs: Any
) -> List[Output]:
"""
批量执行多个输入
Args:
inputs: 输入列表
config: 配置,可以是单个配置或配置列表
return_exceptions: 是否返回异常而不是抛出
Returns:
输出列表
"""
if not inputs:
return []
# Normalize the config(s)
configs = self._get_config_list(config, len(inputs))
# Execute concurrently
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.invoke, input, config, **kwargs)
for input, config in zip(inputs, configs)
]
results = []
for future in futures:
try:
results.append(future.result())
except Exception as e:
if return_exceptions:
results.append(e)
else:
raise e
return results
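The default behaviour of these methods can be observed with RunnableLambda, which wraps a plain function as a Runnable. A minimal sketch:
from langchain_core.runnables import RunnableLambda
double = RunnableLambda(lambda x: x * 2)
print(double.invoke(3))         # 6
print(double.batch([1, 2, 3]))  # [2, 4, 6], inputs are processed concurrently
print(list(double.stream(5)))   # [10], the default stream yields one chunk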
2.1.2 Implementation of Chain Composition
# langchain_core/runnables/base.py
class Runnable(Generic[Input, Output], ABC):
def __or__(self, other: Runnable[Any, Other]) -> RunnableSequence[Input, Other]:
"""
实现 | 操作符,创建序列链
Args:
other: 下一个Runnable组件
Returns:
RunnableSequence实例
"""
return RunnableSequence(first=self, last=other)
def __ror__(self, other: Runnable[Other, Input]) -> RunnableSequence[Other, Output]:
"""
实现反向 | 操作符
"""
return RunnableSequence(first=other, last=self)
class RunnableSequence(Runnable[Input, Output]):
"""序列执行多个Runnable组件"""
def __init__(
self,
*steps: Runnable[Any, Any],
first: Optional[Runnable[Input, Any]] = None,
middle: Optional[List[Runnable[Any, Any]]] = None,
last: Optional[Runnable[Any, Output]] = None,
name: Optional[str] = None
):
"""
初始化序列
Args:
steps: 步骤列表
first: 第一个步骤
middle: 中间步骤列表
last: 最后一个步骤
name: 序列名称
"""
if steps and (first is not None or last is not None):
raise ValueError("Cannot specify both steps and first/last")
if steps:
self.steps = list(steps)
else:
self.steps = []
if first is not None:
self.steps.append(first)
if middle:
self.steps.extend(middle)
if last is not None:
self.steps.append(last)
self.name = name
def invoke(
self,
input: Input,
config: Optional[RunnableConfig] = None,
**kwargs: Any
) -> Output:
"""
顺序执行所有步骤
每个步骤的输出作为下个步骤的输入
"""
# 配置回调管理
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
dumpd(self), input, name=self.name
)
try:
# Execute step by step
for i, step in enumerate(self.steps):
input = step.invoke(
input,
patch_config(
config,
callbacks=run_manager.get_child(f"seq:step:{i+1}")
),
**kwargs
)
# Record completion
run_manager.on_chain_end(input)
return input
except Exception as e:
run_manager.on_chain_error(e)
raise e
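The effect of __or__ can be seen directly: composing two Runnables produces a RunnableSequence whose steps run in order. A minimal sketch:
from langchain_core.runnables import RunnableLambda, RunnableSequence
add_one = RunnableLambda(lambda x: x + 1)
square = RunnableLambda(lambda x: x * x)
chain = add_one | square                     # __or__ builds the sequence
print(isinstance(chain, RunnableSequence))   # True
print(chain.invoke(3))                       # (3 + 1) ** 2 = 16
print(chain.steps)                           # the two wrapped steps, in order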
2.2 Source Analysis of the Chain Components
2.2.1 The LLMChain Implementation
# langchain/chains/llm.py (simplified excerpt)
from typing import Any, Dict, List, Optional
from langchain.chains.base import Chain
from langchain_core.callbacks import AsyncCallbackManagerForChainRun, CallbackManagerForChainRun
from langchain_core.language_models import BaseLanguageModel
from langchain_core.prompts import BasePromptTemplate
from langchain_core.output_parsers import BaseOutputParser
class LLMChain(Chain):
"""
最基础的LLM调用链
将Prompt模板、LLM和输出解析器组合成一个可执行的链
"""
llm: BaseLanguageModel
"""语言模型实例"""
prompt: BasePromptTemplate
"""提示模板"""
output_parser: Optional[BaseOutputParser] = None
"""输出解析器,可选"""
output_key: str = "text"
"""输出键名"""
@property
def input_keys(self) -> List[str]:
"""返回输入键列表"""
return self.prompt.input_variables
@property
def output_keys(self) -> List[str]:
"""返回输出键列表"""
return [self.output_key]
def _call(
self,
inputs: Dict[str, Any],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""
执行LLM链的核心逻辑
Args:
inputs: 输入字典,包含prompt模板所需的变量
run_manager: 回调管理器
Returns:
包含输出的字典
"""
# 1. 格式化Prompt
prompt_value = self.prompt.format_prompt(**inputs)
# 2. 调用LLM生成
response = self.llm.generate_prompt(
[prompt_value],
callbacks=run_manager.get_child() if run_manager else None
)
# 3. Extract the generated text
if self.llm._llm_type == "chat":
# Chat models return an AIMessage
generated_text = response.generations[0][0].message.content
else:
# Text-completion models return a plain string
generated_text = response.generations[0][0].text
# 4. Parse the output
if self.output_parser:
try:
parsed_output = self.output_parser.parse(generated_text)
return {self.output_key: parsed_output}
except Exception as e:
# Fallback behavior when parsing fails
if run_manager:
run_manager.on_text(f"Output parsing failed: {e}")
return {self.output_key: generated_text}
else:
return {self.output_key: generated_text}
async def _acall(
self,
inputs: Dict[str, Any],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""异步版本的_call方法"""
# 1. 格式化Prompt
prompt_value = self.prompt.format_prompt(**inputs)
# 2. 异步调用LLM
response = await self.llm.agenerate_prompt(
[prompt_value],
callbacks=run_manager.get_child() if run_manager else None
)
# 3. Process the output (same logic as the synchronous version)
generated_text = response.generations[0][0].text
if self.output_parser:
parsed_output = self.output_parser.parse(generated_text)
return {self.output_key: parsed_output}
else:
return {self.output_key: generated_text}
def predict(self, **kwargs: Any) -> str:
"""
便捷方法:直接返回生成的文本
Args:
**kwargs: 输入变量
Returns:
生成的文本
"""
return self(kwargs)[self.output_key]
async def apredict(self, **kwargs: Any) -> str:
"""predict的异步版本"""
result = await self.acall(kwargs)
return result[self.output_key]
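A minimal usage sketch of LLMChain (a legacy interface; LCEL composition with | is the recommended style in current releases; assumes an OpenAI API key):
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
prompt = PromptTemplate.from_template("Give me a one-line summary of {topic}.")
chain = LLMChain(llm=ChatOpenAI(temperature=0), prompt=prompt)
print(chain.predict(topic="LangChain"))          # returns the generated text directly
print(chain.invoke({"topic": "vector stores"}))  # returns a dict containing the "text" output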
2.2.2 The SequentialChain Implementation
# langchain/chains/sequential.py
class SequentialChain(Chain):
"""
顺序执行多个Chain的组合链
每个Chain的输出会作为下个Chain的输入
"""
chains: List[Chain]
"""Chain列表,按顺序执行"""
input_variables: List[str]
"""整个序列的输入变量"""
output_variables: List[str]
"""整个序列的输出变量"""
return_all: bool = False
"""是否返回所有中间结果"""
@property
def input_keys(self) -> List[str]:
return self.input_variables
@property
def output_keys(self) -> List[str]:
return self.output_variables
def _call(
self,
inputs: Dict[str, Any],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""
顺序执行所有Chain
Args:
inputs: 初始输入
run_manager: 回调管理器
Returns:
最终输出字典
"""
known_values = inputs.copy()
# 逐个执行Chain
for i, chain in enumerate(self.chains):
# 1. 准备当前Chain的输入
chain_inputs = {
k: known_values[k]
for k in chain.input_keys
if k in known_values
}
# 2. Check that all required inputs are present
missing_keys = set(chain.input_keys) - set(chain_inputs.keys())
if missing_keys:
raise ValueError(
f"Chain {i} missing required inputs: {missing_keys}"
)
# 3. Run the chain
outputs = chain(
chain_inputs,
callbacks=run_manager.get_child(f"step_{i}") if run_manager else None
)
# 4. Update the known values
known_values.update(outputs)
# 5. Return the requested output variables
if self.return_all:
return known_values
else:
return {k: known_values[k] for k in self.output_variables}
def _validate_chains(self) -> None:
"""
验证Chain序列的有效性
检查:
1. 输入输出变量的连接性
2. 没有循环依赖
3. 所有必需的变量都能被提供
"""
available_vars = set(self.input_variables)
for i, chain in enumerate(self.chains):
# 检查当前Chain的输入是否都可用
missing_inputs = set(chain.input_keys) - available_vars
if missing_inputs:
raise ValueError(
f"Chain {i} ({chain.__class__.__name__}) "
f"missing inputs: {missing_inputs}. "
f"Available: {available_vars}"
)
# Add the current chain's outputs to the available variables
available_vars.update(chain.output_keys)
# Check that all final outputs are available
missing_outputs = set(self.output_variables) - available_vars
if missing_outputs:
raise ValueError(
f"Output variables {missing_outputs} not produced by any chain"
)
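A minimal SequentialChain usage sketch (a legacy interface; assumes an OpenAI API key; the two prompts below are made up for illustration):
from langchain.chains import LLMChain, SequentialChain
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(temperature=0.7)
outline_chain = LLMChain(
    llm=llm,
    prompt=PromptTemplate.from_template("Write a one-sentence outline for a story about {topic}."),
    output_key="outline",
)
story_chain = LLMChain(
    llm=llm,
    prompt=PromptTemplate.from_template("Expand this outline into a short paragraph:\n{outline}"),
    output_key="story",
)
overall = SequentialChain(
    chains=[outline_chain, story_chain],
    input_variables=["topic"],
    output_variables=["story"],
)
print(overall.invoke({"topic": "a lost robot"})["story"])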
2.3 Source Analysis of the Agent System
2.3.1 The AgentExecutor Core Implementation
# langchain/agents/agent.py (simplified excerpt)
from langchain_core.agents import AgentAction, AgentFinish
from langchain.agents.agent import BaseSingleActionAgent
from langchain_core.tools import BaseTool
class AgentExecutor(Chain):
"""
Agent执行器 - 实现推理-行动循环
核心功能:
1. 管理Agent的执行循环
2. 工具调用和结果处理
3. 错误处理和重试机制
4. 执行限制和安全控制
"""
agent: BaseSingleActionAgent
"""Agent实例,负责决策"""
tools: Sequence[BaseTool]
"""可用工具列表"""
max_iterations: int = 15
"""最大迭代次数"""
max_execution_time: Optional[float] = None
"""最大执行时间(秒)"""
early_stopping_method: str = "force"
"""早停方法:force或generate"""
handle_parsing_errors: Union[bool, str, Callable[[OutputParserException], str]] = False
"""解析错误处理方式"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Build the tool-name-to-tool mapping
self.name_to_tool_map = {tool.name: tool for tool in self.tools}
@property
def input_keys(self) -> List[str]:
return self.agent.input_keys
@property
def output_keys(self) -> List[str]:
return self.agent.return_values
def _call(
self,
inputs: Dict[str, Any],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""
执行Agent的推理-行动循环
Args:
inputs: 输入字典,通常包含'input'键
run_manager: 回调管理器
Returns:
Agent的最终输出
"""
# 初始化状态
intermediate_steps: List[Tuple[AgentAction, str]] = []
iterations = 0
time_elapsed = 0.0
start_time = time.time()
# Main loop
while self._should_continue(iterations, time_elapsed):
try:
# 1. Let the agent plan the next action
next_step_output = self._take_next_step(
name_to_tool_map=self.name_to_tool_map,
color_mapping={},
inputs=inputs,
intermediate_steps=intermediate_steps,
run_manager=run_manager,
)
# 2. Check whether the agent has finished
if isinstance(next_step_output, AgentFinish):
return self._return(
next_step_output,
intermediate_steps,
run_manager
)
# 3. Record the new intermediate steps
intermediate_steps.extend(next_step_output)
except Exception as e:
# Error handling
if self.handle_parsing_errors:
error_msg = self._handle_parsing_error(e)
intermediate_steps.append(
(AgentAction("_Exception", error_msg, ""), error_msg)
)
else:
raise e
# 4. Update the counters
iterations += 1
time_elapsed = time.time() - start_time
# Limits exceeded: force a finish
output = self._return(
AgentFinish(
{"output": "Agent stopped due to iteration limit or time limit."},
""
),
intermediate_steps,
run_manager,
)
return output
def _take_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, Any],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
"""
执行Agent的一个推理-行动步骤
Args:
name_to_tool_map: 工具名称到工具的映射
color_mapping: 颜色映射(用于日志)
inputs: 原始输入
intermediate_steps: 已执行的中间步骤
run_manager: 回调管理器
Returns:
AgentFinish(结束)或新的中间步骤列表
"""
try:
# 1. Agent决策
output = self.agent.plan(
intermediate_steps=intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
except OutputParserException as e:
# Handle the parsing error
if self.handle_parsing_errors:
text = self._handle_parsing_error(e)
output = AgentAction("_Exception", text, text)
else:
raise e
# 2. Check the output type
if isinstance(output, AgentFinish):
return output
actions: List[AgentAction]
if isinstance(output, AgentAction):
actions = [output]
else:
actions = output
# 3. Execute the tool calls
result = []
for agent_action in actions:
if run_manager:
run_manager.on_agent_action(agent_action, color="green")
# Look up the tool
tool = name_to_tool_map.get(agent_action.tool)
if tool is None:
observation = f"Error: Tool '{agent_action.tool}' not found."
else:
try:
# Run the tool
observation = tool.run(
agent_action.tool_input,
verbose=self.verbose,
color="blue",
callbacks=run_manager.get_child() if run_manager else None,
)
except Exception as e:
observation = f"Error running tool: {str(e)}"
result.append((agent_action, observation))
return result
def _should_continue(self, iterations: int, time_elapsed: float) -> bool:
"""
判断是否应该继续执行
Args:
iterations: 当前迭代次数
time_elapsed: 已用时间
Returns:
是否继续执行
"""
if iterations >= self.max_iterations:
return False
if self.max_execution_time and time_elapsed >= self.max_execution_time:
return False
return True
def _return(
self,
output: AgentFinish,
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""
处理Agent的最终返回
Args:
output: Agent的最终输出
intermediate_steps: 所有中间步骤
run_manager: 回调管理器
Returns:
格式化的输出字典
"""
if run_manager:
run_manager.on_agent_finish(output, color="green")
final_output = output.return_values
if self.return_intermediate_steps:
final_output["intermediate_steps"] = intermediate_steps
return final_output
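A minimal, self-contained sketch of wiring an AgentExecutor with the limits and error handling described above. It assumes a recent langchain release that provides create_tool_calling_agent, a tool-calling-capable model, and an OpenAI API key; the power tool and the gpt-4o-mini model name are illustrative choices:
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
@tool
def power(base: int, exponent: int) -> int:
    """Raise base to the given exponent."""  # hypothetical tool for illustration
    return base ** exponent
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])
tools = [power]
agent = create_tool_calling_agent(ChatOpenAI(model="gpt-4o-mini", temperature=0), tools, prompt)
executor = AgentExecutor(
    agent=agent,
    tools=tools,
    max_iterations=5,              # cap the reasoning-acting loop
    max_execution_time=30.0,       # seconds
    handle_parsing_errors=True,    # feed parsing errors back to the agent instead of raising
    return_intermediate_steps=True,
    verbose=True,
)
result = executor.invoke({"input": "What is 2 to the power of 10?"})
print(result["output"])
print(result["intermediate_steps"])  # available because return_intermediate_steps=True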
2.3.2 The ReAct Agent Implementation
# langchain/agents/react/agent.py
def create_react_agent(
llm: BaseLanguageModel,
tools: Sequence[BaseTool],
prompt: ChatPromptTemplate,
tools_renderer: ToolsRenderer = render_text_description,
) -> Runnable:
"""
创建ReAct (Reasoning + Acting) Agent
ReAct模式:
1. Thought: 思考当前情况
2. Action: 选择要执行的行动
3. Observation: 观察行动结果
4. 重复直到得出最终答案
Args:
llm: 语言模型
tools: 可用工具列表
prompt: 提示模板,必须包含特定变量
tools_renderer: 工具描述渲染器
Returns:
可执行的Agent Runnable
"""
# 验证提示模板
missing_vars = {"tools", "tool_names", "agent_scratchpad"}.difference(
prompt.input_variables
)
if missing_vars:
raise ValueError(f"Prompt missing variables: {missing_vars}")
# Render the tool information
tools_str = tools_renderer(list(tools))
tool_names = ", ".join([tool.name for tool in tools])
# Partially fill in the prompt template
prompt = prompt.partial(
tools=tools_str,
tool_names=tool_names,
)
# Build the agent chain: format the scratchpad, fill the prompt, call the LLM, parse the result
agent = (
RunnablePassthrough.assign(
agent_scratchpad=lambda x: format_log_to_str(
x["intermediate_steps"]
)
)
| prompt
| llm
| ReActOutputParser()
)
return agent
class ReActSingleActionAgent(BaseSingleActionAgent):
"""
ReAct单动作Agent
解析LLM输出,提取思考过程和下一步行动
"""
@property
def input_keys(self) -> List[str]:
return ["input", "intermediate_steps"]
def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""
根据当前状态规划下一步行动
Args:
intermediate_steps: 已执行的步骤
callbacks: 回调函数
**kwargs: 其他参数,包含'input'
Returns:
下一个行动或结束信号
"""
# 构建agent_scratchpad
thoughts = ""
for action, observation in intermediate_steps:
thoughts += f"Thought: {action.log}\n"
thoughts += f"Action: {action.tool}\n"
thoughts += f"Action Input: {action.tool_input}\n"
thoughts += f"Observation: {observation}\n"
# Append the prompt for the next thought
if thoughts:
thoughts += "Thought:"
# Build the full input
full_input = kwargs.copy()
full_input["agent_scratchpad"] = thoughts
# Call the LLM
llm_output = self.llm_chain.run(callbacks=callbacks, **full_input)
# Parse the output
return self.output_parser.parse(llm_output)
class ReActOutputParser(AgentOutputParser):
"""
ReAct输出解析器
解析LLM的输出,提取Action和Action Input
"""
def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
"""
解析LLM输出
Args:
llm_output: LLM的原始输出
Returns:
解析后的AgentAction或AgentFinish
"""
# 检查是否包含Final Answer
if "Final Answer:" in llm_output:
return AgentFinish(
return_values={"output": llm_output.split("Final Answer:")[-1].strip()},
log=llm_output,
)
# Parse the Action
regex = r"Action\s*\d*\s*:(.*?)\nAction\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
match = re.search(regex, llm_output, re.DOTALL)
if not match:
raise OutputParserException(f"Could not parse LLM output: `{llm_output}`")
action = match.group(1).strip()
action_input = match.group(2)
# Try to parse action_input as JSON
try:
action_input = json.loads(action_input)
except json.JSONDecodeError:
# If it is not JSON, keep it as a plain string
action_input = action_input.strip(' "')
return AgentAction(
tool=action,
tool_input=action_input,
log=llm_output
)
@property
def _type(self) -> str:
return "react"
3. Overall Architecture and Sequence Diagrams
3.1 LangChain Overall Architecture Diagram
graph TB
subgraph "应用层 (Application Layer)"
A[用户应用] --> B[LangChain SDK]
B --> C[Chain 编排器]
B --> D[Agent 智能体]
B --> E[Tool 工具集]
end
subgraph "核心抽象层 (Core Abstractions)"
F[Runnable 接口] --> G[BaseChain]
F --> H[BaseLLM]
F --> I[BaseRetriever]
F --> J[BaseTool]
F --> K[BaseMemory]
end
subgraph "模型层 (Model Layer)"
L[LLM Models]
M[Chat Models]
N[Embedding Models]
O[Multimodal Models]
L --> P[OpenAI]
L --> Q[Anthropic]
L --> R[Local Models]
M --> S[ChatOpenAI]
M --> T[ChatAnthropic]
N --> U[OpenAIEmbeddings]
N --> V[HuggingFaceEmbeddings]
end
subgraph "工具生态 (Tool Ecosystem)"
W[Search Tools] --> W1[DuckDuckGo]
W --> W2[Tavily]
W --> W3[Google Search]
X[Database Tools] --> X1[SQL Database]
X --> X2[Vector Database]
X --> X3[Graph Database]
Y[API Tools] --> Y1[REST API]
Y --> Y2[GraphQL]
Y --> Y3[Custom Tools]
end
subgraph "记忆系统 (Memory System)"
Z[Memory Types] --> Z1[ConversationBufferMemory]
Z --> Z2[ConversationSummaryMemory]
Z --> Z3[VectorStoreRetrieverMemory]
Z --> Z4[EntityMemory]
end
subgraph "检索系统 (Retrieval System)"
AA[Document Loaders] --> AA1[PDF Loader]
AA --> AA2[Web Loader]
AA --> AA3[Database Loader]
BB[Text Splitters] --> BB1[RecursiveCharacterTextSplitter]
BB --> BB2[TokenTextSplitter]
BB --> BB3[SemanticSplitter]
CC[Vector Stores] --> CC1[FAISS]
CC --> CC2[Pinecone]
CC --> CC3[Weaviate]
CC --> CC4[Chroma]
DD[Retrievers] --> DD1[VectorStoreRetriever]
DD --> DD2[BM25Retriever]
DD --> DD3[HybridRetriever]
end
subgraph "可观测性 (Observability)"
EE[Callbacks] --> EE1[StdOutCallbackHandler]
EE --> EE2[FileCallbackHandler]
EE --> EE3[LangSmithCallbackHandler]
EE --> EE4[WandbCallbackHandler]
FF[Tracing] --> FF1[LangSmith]
FF --> FF2[LangFuse]
FF --> FF3[Custom Tracers]
GG[Metrics] --> GG1[Token Usage]
GG --> GG2[Latency]
GG --> GG3[Error Rate]
end
%% Connections
C --> F
D --> F
E --> F
G --> L
G --> M
H --> L
H --> M
I --> CC
J --> W
J --> X
J --> Y
K --> Z
D --> E
D --> K
C --> I
C --> K
AA --> BB
BB --> CC
CC --> DD
F --> EE
F --> FF
F --> GG
3.2 Data-Flow Sequence Diagram
sequenceDiagram
participant User as User
participant App as Application Layer
participant Chain as Chain/Agent
participant LLM as Language Model
participant Tool as Tool
participant Memory as Memory
participant Retriever as Retriever
participant Callback as Callback System
User->>App: Submit query
App->>Chain: Build the execution chain
Chain->>Memory: Load conversation history
Memory-->>Chain: Return context
Chain->>Retriever: Retrieve relevant documents
Retriever->>Retriever: Vector similarity search
Retriever-->>Chain: Return relevant documents
Chain->>LLM: Construct prompt + context
LLM->>Callback: Trigger on_llm_start
LLM->>LLM: Model inference
LLM->>Callback: Trigger on_llm_end
LLM-->>Chain: Return model output
alt Tool call required
Chain->>Tool: Parse and call the tool
Tool->>Callback: Trigger on_tool_start
Tool->>Tool: Run the tool logic
Tool->>Callback: Trigger on_tool_end
Tool-->>Chain: Return tool result
Chain->>LLM: Reason again with the tool result
LLM-->>Chain: Return final answer
end
Chain->>Memory: Save conversation history
Chain->>Callback: Trigger on_chain_end
Chain-->>App: Return final result
App-->>User: Display result
3.3 Module Interaction Diagram
graph LR
subgraph "Core Module"
A[Runnable Interface]
B[RunnableConfig]
C[CallbackManager]
end
subgraph "Chain Module"
D[LLMChain]
E[SequentialChain]
F[RouterChain]
end
subgraph "Agent Module"
G[AgentExecutor]
H[ReActAgent]
I[OpenAIFunctionsAgent]
end
subgraph "Tool Module"
J[BaseTool]
K[StructuredTool]
L[Tool Registry]
end
subgraph "Memory Module"
M[BaseMemory]
N[ConversationMemory]
O[VectorMemory]
end
subgraph "Retrieval Module"
P[BaseRetriever]
Q[VectorStoreRetriever]
R[MultiQueryRetriever]
end
A --> D
A --> E
A --> F
A --> G
A --> H
A --> I
D --> J
E --> J
G --> J
H --> J
I --> J
G --> M
H --> M
I --> M
D --> P
E --> P
C --> D
C --> E
C --> G
C --> H
C --> I
C --> J