Overview
LangChain is a framework for building large language model (LLM) applications. Its core design idea is to let developers assemble complex AI applications quickly through a unified abstract interface and a declarative composition syntax. This article walks through LangChain's architecture and the technical details and design philosophy behind it.
1. Overall LangChain Architecture
1.1 Layered Architecture
LangChain uses a layered architecture that forms a clear hierarchy from low-level abstractions up to applications:
graph TB
    subgraph "Application Layer"
        A1[User Applications]
        A2[Agent Systems]
        A3[Complex Workflows]
    end
    subgraph "Orchestration Layer"
        B1[LangChain Expression Language]
        B2[Chain Composition]
        B3[Agent Executors]
    end
    subgraph "Abstraction Layer"
        C1[Runnable Interface]
        C2[Universal Invocation]
        C3[Composition Primitives]
    end
    subgraph "Component Layer"
        D1[Language Models]
        D2[Vector Stores]
        D3[Tool Integrations]
        D4[Memory Systems]
        D5[Retrievers]
    end
    subgraph "Infrastructure Layer"
        E1[Callback System]
        E2[Configuration]
        E3[Serialization]
        E4[Caching]
        E5[Monitoring & Tracing]
    end
    A1 --> B1
    A2 --> B2
    A3 --> B3
    B1 --> C1
    B2 --> C2
    B3 --> C3
    C1 --> D1
    C2 --> D2
    C3 --> D3
    C1 --> D4
    C2 --> D5
    D1 --> E1
    D2 --> E2
    D3 --> E3
    D4 --> E4
    D5 --> E5
    style C1 fill:#e1f5fe
    style B1 fill:#f3e5f5
    style E1 fill:#e8f5e8
1.2 Core Package Structure
The LangChain ecosystem consists of multiple cooperating packages:
graph LR
    subgraph "Core Packages"
        LC[langchain-core<br/>core abstractions]
        LM[langchain<br/>main implementation]
        LTS[langchain-text-splitters<br/>text splitting]
        LST[langchain-standard-tests<br/>standard tests]
    end
    subgraph "Integration Packages"
        LO[langchain-openai<br/>OpenAI integration]
        LA[langchain-anthropic<br/>Anthropic integration]
        LH[langchain-huggingface<br/>HuggingFace integration]
        LG[langchain-groq<br/>Groq integration]
        LMI[langchain-mistralai<br/>Mistral integration]
        LC_COM[langchain-community<br/>community integrations]
    end
    subgraph "Tooling Packages"
        CLI[langchain-cli<br/>command-line tools]
        EXP[langchain-experimental<br/>experimental features]
    end
    LC --> LM
    LC --> LTS
    LC --> LST
    LC --> LO
    LC --> LA
    LC --> LH
    LC --> LG
    LC --> LMI
    LC --> LC_COM
    LM --> CLI
    LM --> EXP
style LC fill:#ffeb3b
style LM fill:#4caf50
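The layering shows up directly in everyday imports: abstractions come from langchain-core, while each provider ships its own integration package, so swapping vendors changes only one import. A minimal sketch, assuming langchain-openai is installed (the model name here is illustrative):
from langchain_core.output_parsers import StrOutputParser   # core abstraction
from langchain_core.prompts import ChatPromptTemplate       # core abstraction
from langchain_openai import ChatOpenAI                     # provider integration package

chain = (
    ChatPromptTemplate.from_template("Summarize in one line: {text}")
    | ChatOpenAI(model="gpt-4o-mini")   # assumed model name; any chat model fits here
    | StrOutputParser()
)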
2. Runnable: The Core Unifying Abstraction
2.1 The Runnable Interface
Runnable is LangChain's most important abstraction; every component implements this interface:
from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, Generic, Iterator, List, Optional, TypeVar, Union

from langchain_core.runnables.config import RunnableConfig, run_in_executor

Input = TypeVar('Input')
Output = TypeVar('Output')
Other = TypeVar('Other')

class Runnable(ABC, Generic[Input, Output]):
    """A unit of work that can be invoked, batched, streamed, transformed and composed.

    Key methods
    ===========
    - invoke/ainvoke: transform a single input into an output
    - batch/abatch: efficiently transform multiple inputs into outputs
    - stream/astream: stream output from a single input
    - astream_log: stream output plus selected intermediate results

    Built-in optimizations
    ======================
    - Batching: by default, batch() runs invoke() in parallel on a thread pool
    - Async: methods prefixed with 'a' are asynchronous; by default they execute
      the sync version on an asyncio thread pool

    LCEL and composition
    ====================
    The LangChain Expression Language (LCEL) is a declarative way to compose
    Runnables. The main composition primitives are RunnableSequence and
    RunnableParallel.
    """

    @abstractmethod
    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Output:
        """Transform a single input into an output.

        Args:
            input: The input to the Runnable.
            config: Configuration to use when invoking the Runnable. Supports
                standard keys such as 'tags' and 'metadata' for tracing, and
                'max_concurrency' for controlling how much work runs in parallel.

        Returns:
            The output of the Runnable.
        """

    async def ainvoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Output:
        """Default implementation of ainvoke: calls invoke from a thread.

        This default allows async code to be used even if the Runnable does not
        provide a native async implementation of invoke. Subclasses that can run
        asynchronously should override this method.
        """
        return await run_in_executor(config, self.invoke, input, config, **kwargs)

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> List[Output]:
        """Process a list of inputs as a batch.

        Args:
            inputs: A list of inputs.
            config: A config, or a list of configs (one per input).
            return_exceptions: Whether to return exceptions instead of raising them.

        Returns:
            A list of outputs.
        """

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Iterator[Output]:
        """Stream the output.

        Args:
            input: A single input.
            config: Configuration for the run.

        Yields:
            Chunks of the output.
        """

    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Output]:
        """Stream the output asynchronously."""
2.2 Composition Mechanics of Runnable
LangChain overloads operators to provide an intuitive composition syntax:
class Runnable(ABC, Generic[Input, Output]):
    def __or__(self, other: Runnable[Any, Other]) -> RunnableSequence[Input, Other]:
        """Implements the | operator, creating a sequential composition.

        Example:
            chain = prompt | model | parser
        """
        return RunnableSequence(first=self, last=other)

    def __ror__(self, other: Runnable[Other, Input]) -> RunnableSequence[Other, Output]:
        """Implements the reflected | operator (used when the left operand is not a Runnable)."""
        return RunnableSequence(first=other, last=self)

    def with_config(
        self, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Runnable[Input, Output]:
        """Bind a config to the Runnable, returning a RunnableBinding.

        Example:
            configured_chain = chain.with_config({"run_name": "my_chain"})
        """
        # Simplified; the real implementation merges configs and kwargs.
        return RunnableBinding(bound=self, config=ensure_config(config))
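These operator overloads are what make pipe syntax work on arbitrary components. A small sketch composing two plain functions:
from langchain_core.runnables import RunnableLambda

add_one = RunnableLambda(lambda x: x + 1)
square = RunnableLambda(lambda x: x * x)

chain = add_one | square        # __or__ builds a RunnableSequence
print(chain.invoke(3))          # (3 + 1) ** 2 == 16
print(type(chain).__name__)     # RunnableSequence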
2.3 Core Composition Primitives
RunnableSequence: sequential composition
class RunnableSequence(RunnableSerializable[Input, Output]):
    """A sequence of Runnables, where the output of each is the input of the next.

    RunnableSequence is the most important composition operator in LangChain,
    since it is used in virtually every chain. It can be instantiated directly,
    or, more commonly, created via the | operator, where either the left or the
    right operand (or both) must be a Runnable.

    Any RunnableSequence automatically supports sync, async and batch execution.
    """
    first: Runnable[Input, Any]        # the first runnable
    middle: List[Runnable[Any, Any]]   # the runnables in between
    last: Runnable[Any, Output]        # the last runnable

    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        """Invoke each runnable in order."""
        # Configuration management
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        # Execute the sequence
        with callback_manager.on_chain_start(
            dumpd(self), input, name=config.get("run_name")
        ):
            # Run the first runnable
            intermediate = self.first.invoke(input, config)
            # Run the middle runnables
            for step in self.middle:
                intermediate = step.invoke(intermediate, config)
            # Run the last runnable
            output = self.last.invoke(intermediate, config)
        return output

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        **kwargs: Any,
    ) -> List[Output]:
        """Batch implementation: call batch on each component in order."""
        configs = get_config_list(config, len(inputs))
        # Batch the first runnable
        intermediate_outputs = self.first.batch(inputs, configs, **kwargs)
        # Batch the middle runnables
        for step in self.middle:
            intermediate_outputs = step.batch(intermediate_outputs, configs, **kwargs)
        # Batch the last runnable
        return self.last.batch(intermediate_outputs, configs, **kwargs)
RunnableParallel: parallel composition
class RunnableParallel(RunnableSerializable[Input, Dict[str, Any]]):
    """Invokes runnables in parallel, providing the same input to each.

    Constructed from a dict literal inside a sequence, or by passing a dict to
    RunnableParallel directly.
    """
    steps: Dict[str, Runnable[Input, Any]]

    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Dict[str, Any]:
        """Invoke all steps in parallel."""
        config = ensure_config(config)
        # Execute in parallel on a thread pool
        with ThreadPoolExecutor(max_workers=config.get("max_concurrency")) as executor:
            futures = {
                key: executor.submit(runnable.invoke, input, config)
                for key, runnable in self.steps.items()
            }
            return {
                key: future.result()
                for key, future in futures.items()
            }

    async def ainvoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Dict[str, Any]:
        """Invoke all steps in parallel, asynchronously."""
        config = ensure_config(config)
        # Run concurrently with asyncio
        tasks = {
            key: runnable.ainvoke(input, config)
            for key, runnable in self.steps.items()
        }
        results = await asyncio.gather(*tasks.values())
        return dict(zip(tasks.keys(), results))
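A short sketch of the dict-literal form, which LCEL coerces into a RunnableParallel automatically (the lambdas stand in for real components):
from langchain_core.runnables import RunnableLambda

fan_out = {
    "doubled": RunnableLambda(lambda x: x * 2),
    "squared": RunnableLambda(lambda x: x * x),
} | RunnableLambda(lambda d: f"{d['doubled']} / {d['squared']}")

print(fan_out.invoke(4))  # "8 / 16": both branches received the same input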
3. LangChain Expression Language (LCEL)
3.1 LCEL Design Philosophy
LCEL is LangChain's declarative composition language; it lets developers build complex processing chains with concise syntax:
# Traditional imperative style
def traditional_chain(input_text):
    prompt_result = prompt_template.format(input=input_text)
    model_result = model.invoke(prompt_result)
    parsed_result = output_parser.parse(model_result)
    return parsed_result

# Declarative LCEL style
chain = prompt_template | model | output_parser
result = chain.invoke({"input": input_text})
3.2 Core Advantages of LCEL
graph TD
    A[LCEL core advantages] --> B[Unified interface]
    A --> C[Automatic optimization]
    A --> D[Built-in capabilities]
    A --> E[Observability]
    B --> B1[Uniform invoke/batch/stream interface across components]
    B --> B2[Consistent error handling and configuration]
    C --> C1[Automatic batching optimization]
    C --> C2[Parallel execution optimization]
    C --> C3[Streaming optimization]
    D --> D1[Sync and async execution]
    D --> D2[Batching and streaming]
    D --> D3[Retry and fallback mechanisms]
    E --> E1[Built-in callbacks and tracing]
    E --> E2[Structured logging]
    E --> E3[Performance monitoring]
3.3 A More Complex LCEL Example
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

# Build a more involved RAG chain (assumes `vectorstore` was created earlier)
def build_rag_chain():
    # 1. Retrieve relevant documents
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

    # 2. Format the retrieved documents
    def format_docs(docs):
        return "\n\n".join(doc.page_content for doc in docs)

    # 3. Build the prompt template
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant. Answer the question based on the following context:\n\n{context}"),
        ("human", "{question}")
    ])

    # 4. Initialize the model and parser
    model = ChatOpenAI(temperature=0)
    output_parser = StrOutputParser()

    # 5. Compose the whole chain with LCEL
    rag_chain = (
        {
            "context": retriever | format_docs,   # in parallel: retrieve and format documents
            "question": RunnablePassthrough()     # pass the question through unchanged
        }
        | prompt          # format the prompt
        | model           # call the LLM
        | output_parser   # parse the output
    )
    return rag_chain

# Use the chain. Note: because the dict branch feeds the raw input to both the
# retriever and RunnablePassthrough, the chain takes the question string itself,
# not a {"question": ...} dict.
chain = build_rag_chain()

# Single invocation
result = chain.invoke("What is LangChain?")

# Batched invocation
results = chain.batch([
    "What is LangChain?",
    "How do I use LCEL?",
    "What is the Runnable interface for?"
])

# Streaming invocation
for chunk in chain.stream("Explain how RAG works"):
    print(chunk, end="", flush=True)
4. Core Component Architecture
4.1 Language Model Abstraction
classDiagram
class BaseLanguageModel {
<<abstract>>
+generate(prompts: List[str]) List[Generation]
+agenerate(prompts: List[str]) List[Generation]
+get_num_tokens(text: str) int
+get_token_ids(text: str) List[int]
}
class BaseLLM {
<<abstract>>
+_generate(prompts: List[str]) LLMResult
+_agenerate(prompts: List[str]) LLMResult
+predict(text: str) str
+predict_messages(messages: List[BaseMessage]) BaseMessage
}
class BaseChatModel {
<<abstract>>
+_generate(messages: List[List[BaseMessage]]) ChatResult
+_agenerate(messages: List[List[BaseMessage]]) ChatResult
+predict_messages(messages: List[BaseMessage]) BaseMessage
}
class ChatOpenAI {
+model_name: str
+temperature: float
+max_tokens: int
+_generate(messages) ChatResult
+_agenerate(messages) ChatResult
}
class OpenAI {
+model_name: str
+temperature: float
+max_tokens: int
+_generate(prompts) LLMResult
+_agenerate(prompts) LLMResult
}
BaseLanguageModel <|-- BaseLLM
BaseLanguageModel <|-- BaseChatModel
BaseLLM <|-- OpenAI
BaseChatModel <|-- ChatOpenAI
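Both branches of the hierarchy expose the same Runnable surface; what differs is the payload (strings for LLMs, message lists for chat models). A minimal sketch, assuming the langchain-openai package and an OPENAI_API_KEY in the environment:
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

chat = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # illustrative model choice
reply = chat.invoke([
    SystemMessage(content="You are a concise assistant."),
    HumanMessage(content="Name one design goal of LangChain."),
])
print(reply.content)  # reply is an AIMessage; .content holds the text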
4.2 Vector Store Abstraction
from abc import ABC, abstractmethod
from typing import Any, List, Optional, Tuple

from langchain_core.documents import Document

class VectorStore(ABC):
    """Abstract base class for vector stores.

    A vector store holds embedding vectors and performs similarity search.
    """

    @abstractmethod
    def add_texts(
        self,
        texts: List[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add texts to the vector store.

        Args:
            texts: The texts to add.
            metadatas: Optional list of metadata dicts.

        Returns:
            The IDs of the added documents.
        """

    @abstractmethod
    def similarity_search(
        self,
        query: str,
        k: int = 4,
        **kwargs: Any,
    ) -> List[Document]:
        """Return the documents most similar to the query.

        Args:
            query: The query string.
            k: The number of documents to return.

        Returns:
            The most similar documents.
        """

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return the documents most similar to the query, with similarity scores."""

    def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever:
        """Expose the vector store as a retriever."""
        return VectorStoreRetriever(vectorstore=self, **kwargs)
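A short sketch of the abstraction in use, here with the FAISS implementation from langchain-community (assuming the faiss package is installed; any VectorStore subclass looks the same):
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

store = FAISS.from_texts(
    ["LangChain unifies components behind Runnable.",
     "LCEL composes Runnables declaratively."],
    OpenAIEmbeddings(),
)
docs = store.similarity_search("What is LCEL?", k=1)
retriever = store.as_retriever(search_kwargs={"k": 1})  # same store, Runnable interface
print(docs[0].page_content)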
4.3 Tool Integration Architecture
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Type, Union

from pydantic import BaseModel

class BaseTool(ABC, BaseModel):
    """Base class for all tools.

    Tools are functions that an Agent can call to interact with the outside world.
    """

    name: str
    """Unique name of the tool, used by the Agent to identify it."""

    description: str
    """Description of what the tool does, used by the Agent to decide when to call it."""

    args_schema: Optional[Type[BaseModel]] = None
    """Pydantic schema for the tool's arguments."""

    return_direct: bool = False
    """Whether to return the tool's result directly to the user."""

    @abstractmethod
    def _run(
        self,
        *args: Any,
        run_manager: Optional[CallbackManagerForToolRun] = None,
        **kwargs: Any,
    ) -> Any:
        """Run the tool synchronously."""

    async def _arun(
        self,
        *args: Any,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
        **kwargs: Any,
    ) -> Any:
        """Run the tool asynchronously. The default implementation delegates to the sync version."""
        return await run_in_executor(None, self._run, *args, **kwargs)

    def run(
        self,
        tool_input: Union[str, Dict],
        verbose: Optional[bool] = None,
        start_color: Optional[str] = "green",
        color: Optional[str] = "green",
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Any:
        """User-facing entry point for running the tool."""
        # Validate and parse the input
        parsed_input = self._parse_input(tool_input)
        # Configure callbacks
        callback_manager = CallbackManager.configure(
            callbacks, verbose, None, None
        )
        # Execute the tool
        with callback_manager.on_tool_start(
            {"name": self.name, "description": self.description},
            tool_input if isinstance(tool_input, str) else str(tool_input),
            color=start_color,
        ) as run_manager:
            try:
                tool_output = self._run(parsed_input, run_manager=run_manager, **kwargs)
            except Exception as e:
                run_manager.on_tool_error(e)
                raise
            else:
                run_manager.on_tool_end(str(tool_output), color=color)
                return tool_output
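In practice, most tools are defined with the @tool decorator from langchain_core.tools, which derives the name, description and args_schema from the function itself. A minimal sketch (word_count is our own example):
from langchain_core.tools import tool

@tool
def word_count(text: str) -> int:
    """Count the number of words in a piece of text."""  # becomes the tool description
    return len(text.split())

print(word_count.name)                        # "word_count"
print(word_count.invoke({"text": "a b c"}))   # 3; tools are Runnables too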
5. Execution Flow and Sequence Analysis
5.1 Chain Execution Sequence Diagram
sequenceDiagram
    participant U as User
    participant C as Chain
    participant R1 as Runnable1
    participant R2 as Runnable2
    participant R3 as Runnable3
    participant CB as CallbackManager
    participant M as Memory

    Note over U,M: Full chain execution flow
    U->>C: invoke(input, config)
    C->>CB: on_chain_start(chain_info, input)

    alt with Memory
        C->>M: load_memory_variables(input)
        M-->>C: memory_context
        C->>C: merge_input_with_memory(input, memory_context)
    end

    Note over C,R3: Execute the Runnable sequence in order
    C->>R1: invoke(input, config)
    R1->>CB: on_llm_start() / on_tool_start()
    R1->>R1: run core logic
    R1->>CB: on_llm_end() / on_tool_end()
    R1-->>C: intermediate_output1

    C->>R2: invoke(intermediate_output1, config)
    R2->>CB: on_llm_start() / on_tool_start()
    R2->>R2: run core logic
    R2->>CB: on_llm_end() / on_tool_end()
    R2-->>C: intermediate_output2

    C->>R3: invoke(intermediate_output2, config)
    R3->>CB: on_llm_start() / on_tool_start()
    R3->>R3: run core logic
    R3->>CB: on_llm_end() / on_tool_end()
    R3-->>C: final_output

    alt with Memory
        C->>M: save_context(input, final_output)
    end

    C->>CB: on_chain_end(final_output)
    C-->>U: final_output
5.2 The Agent Execution Loop
sequenceDiagram
    participant U as User
    participant AE as AgentExecutor
    participant A as Agent
    participant T as Tool
    participant LLM as LanguageModel

    Note over U,LLM: Agent reason-act loop
    U->>AE: invoke({"input": "user question"})
    AE->>AE: initialize (iterations=0, intermediate_steps=[])

    loop Agent loop (until AgentFinish or max iterations)
        Note over AE,LLM: Planning phase
        AE->>A: plan(intermediate_steps, input)
        A->>LLM: build prompt and call LLM
        LLM-->>A: reasoning result
        A->>A: parse LLM output

        alt AgentFinish
            A-->>AE: AgentFinish(return_values)
            AE-->>U: return final result
        else AgentAction
            A-->>AE: AgentAction(tool, tool_input)
            Note over AE,T: Action phase
            AE->>T: run(tool_input)
            T->>T: execute tool logic
            T-->>AE: tool_output
            AE->>AE: update intermediate_steps
            AE->>AE: iterations += 1
        end
    end

    Note over AE: If max iterations reached
    AE->>AE: handle according to early_stopping_method
    AE-->>U: return result or error
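A compact sketch of wiring this loop up with the classic AgentExecutor API (langchain 0.1+ style; the hub prompt name is a common convention, and the tool is the hypothetical word_count from section 4.3):
from langchain import hub
from langchain.agents import AgentExecutor, create_react_agent
from langchain_openai import ChatOpenAI

prompt = hub.pull("hwchase17/react")  # a standard ReAct prompt from LangChain Hub
agent = create_react_agent(ChatOpenAI(temperature=0), [word_count], prompt)
executor = AgentExecutor(agent=agent, tools=[word_count],
                         max_iterations=5, verbose=True)
result = executor.invoke({"input": "How many words are in 'to be or not to be'?"})
print(result["output"])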
6. Configuration and the Callback System
6.1 RunnableConfig Design
from typing import Any, Dict, List, Optional, Union
from typing_extensions import TypedDict
from uuid import UUID

from langchain_core.callbacks import BaseCallbackHandler, BaseCallbackManager

class RunnableConfig(TypedDict, total=False):
    """Configuration for a Runnable.

    Controls execution behavior: callbacks, tags, metadata, and so on.
    """

    # Callback configuration
    callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]
    """Callback handlers, or a callback manager, to use."""

    # Execution control
    max_concurrency: Optional[int]
    """Maximum number of parallel calls."""

    recursion_limit: Optional[int]
    """Recursion limit, guarding against infinite recursion."""

    # Tracing and monitoring
    tags: Optional[List[str]]
    """Tags for tracing and filtering."""

    metadata: Optional[Dict[str, Any]]
    """Extra metadata for tracing and debugging."""

    run_name: Optional[str]
    """Name of the run, to identify a particular execution."""

    run_id: Optional[UUID]
    """Unique identifier of the run."""

    # Configurable fields
    configurable: Optional[Dict[str, Any]]
    """Values for configurable fields, set at runtime."""

def ensure_config(config: Optional[RunnableConfig] = None) -> RunnableConfig:
    """Ensure the config is a valid RunnableConfig."""
    if config is None:
        return RunnableConfig()
    # Validate and normalize the config
    if not isinstance(config, dict):
        raise TypeError("config must be a dict")
    # Fill in defaults
    config.setdefault("tags", [])
    config.setdefault("metadata", {})
    config.setdefault("configurable", {})
    return config
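Because the config is threaded through every Runnable in a chain, a single dict at the top-level call controls tracing and concurrency for the whole run. A minimal sketch (chain is any Runnable from the earlier sections; the metadata values are illustrative):
result = chain.invoke(
    "What is LCEL?",
    config={
        "run_name": "docs-qa",            # shows up in tracing UIs such as LangSmith
        "tags": ["demo", "rag"],
        "metadata": {"user_id": "u-42"},  # hypothetical metadata
        "max_concurrency": 4,             # caps parallel work in batch/parallel steps
    },
)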
6.2 Callback System Architecture
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult

class BaseCallbackHandler(ABC):
    """Base class for callback handlers.

    Callback handlers run custom logic (logging, monitoring, debugging, etc.)
    at key points during a Runnable's execution.
    """

    ignore_llm: bool = False
    """Whether to ignore LLM events."""

    ignore_chain: bool = False
    """Whether to ignore Chain events."""

    ignore_agent: bool = False
    """Whether to ignore Agent events."""

    ignore_retriever: bool = False
    """Whether to ignore Retriever events."""

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when an LLM starts running."""

    def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when the LLM produces a new token."""

    def on_llm_end(
        self,
        response: LLMResult,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when the LLM finishes running."""

    def on_llm_error(
        self,
        error: Union[Exception, KeyboardInterrupt],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when the LLM errors."""

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a Chain starts running."""

    def on_chain_end(
        self,
        outputs: Dict[str, Any],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a Chain finishes running."""

    def on_chain_error(
        self,
        error: Union[Exception, KeyboardInterrupt],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a Chain errors."""

    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        inputs: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a Tool starts running."""

    def on_tool_end(
        self,
        output: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a Tool finishes running."""

    def on_tool_error(
        self,
        error: Union[Exception, KeyboardInterrupt],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when a Tool errors."""
import logging
from uuid import uuid4

logger = logging.getLogger(__name__)

class CallbackManager:
    """Callback manager: registers handlers and dispatches events to them."""

    def __init__(
        self,
        handlers: List[BaseCallbackHandler],
        inheritable_handlers: Optional[List[BaseCallbackHandler]] = None,
        parent_run_id: Optional[UUID] = None,
        *,
        tags: Optional[List[str]] = None,
        inheritable_tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        inheritable_metadata: Optional[Dict[str, Any]] = None,
    ):
        """Initialize the callback manager."""
        self.handlers = handlers or []
        self.inheritable_handlers = inheritable_handlers or []
        self.parent_run_id = parent_run_id
        self.tags = tags or []
        self.inheritable_tags = inheritable_tags or []
        self.metadata = metadata or {}
        self.inheritable_metadata = inheritable_metadata or {}

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        **kwargs: Any,
    ) -> CallbackManagerForLLMRun:
        """Create a callback manager for an LLM run."""
        run_id = uuid4()
        # Notify every handler
        for handler in self.handlers:
            if not handler.ignore_llm:
                try:
                    handler.on_llm_start(
                        serialized,
                        prompts,
                        run_id=run_id,
                        parent_run_id=self.parent_run_id,
                        tags=self.tags,
                        metadata=self.metadata,
                        **kwargs,
                    )
                except Exception as e:
                    # A handler error must not break the run itself
                    logger.warning(f"Error in callback handler: {e}")
        return CallbackManagerForLLMRun(
            run_id=run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )
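A small sketch of a custom handler that times LLM calls, attached per-invocation through the config (the handler and its attribute names are our own):
import time
from langchain_core.callbacks import BaseCallbackHandler

class TimingHandler(BaseCallbackHandler):
    """Logs how long each LLM call takes."""
    def on_llm_start(self, serialized, prompts, **kwargs):
        self._start = time.time()
    def on_llm_end(self, response, **kwargs):
        print(f"LLM call took {time.time() - self._start:.2f}s")

# chain is any Runnable that contains an LLM step
chain.invoke("What is LCEL?", config={"callbacks": [TimingHandler()]})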
7. Performance Optimization and Best Practices
7.1 Batching Optimization
LangChain uses intelligent batching to improve throughput significantly:
# Example of automatic batching optimization
class OptimizedChain(RunnableSequence):
    """An optimized chain with smarter batching."""

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        **kwargs: Any,
    ) -> List[Output]:
        """Optimized batch implementation."""
        configs = get_config_list(config, len(inputs))
        # Check whether the batching fast path applies
        if self._can_batch_optimize():
            return self._optimized_batch(inputs, configs, **kwargs)
        else:
            return self._default_batch(inputs, configs, **kwargs)

    def _can_batch_optimize(self) -> bool:
        """Check whether every component supports efficient batching."""
        for step in [self.first] + self.middle + [self.last]:
            if not hasattr(step, 'batch') or not step._supports_batch_optimization:
                return False
        return True

    def _optimized_batch(
        self,
        inputs: List[Input],
        configs: List[RunnableConfig],
        **kwargs: Any,
    ) -> List[Output]:
        """Optimized batch implementation that minimizes network round-trips."""
        # Batch the first step
        intermediate = self.first.batch(inputs, configs, **kwargs)
        # Batch the middle steps
        for step in self.middle:
            intermediate = step.batch(intermediate, configs, **kwargs)
        # Batch the last step
        return self.last.batch(intermediate, configs, **kwargs)
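From the caller's side, none of this machinery is visible; batching is a one-liner, with max_concurrency as the main tuning knob:
questions = ["What is LCEL?", "What is a Runnable?", "What is RAG?"]
# One call: the chain batches provider requests where possible and
# falls back to a thread pool elsewhere.
answers = chain.batch(questions, config={"max_concurrency": 3})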
7.2 Streaming Optimization
class StreamingChain(RunnableSequence):
    """A chain with streaming support."""

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[Output]:
        """Streaming implementation."""
        config = ensure_config(config)
        # Find out which components can stream
        streaming_components = self._identify_streaming_components()
        if streaming_components:
            yield from self._streaming_execution(input, config, **kwargs)
        else:
            # Fall back to regular execution
            yield self.invoke(input, config, **kwargs)

    def _identify_streaming_components(self) -> List[int]:
        """Identify which components support streaming."""
        streaming_indices = []
        for i, step in enumerate([self.first] + self.middle + [self.last]):
            if hasattr(step, 'stream') and step._supports_streaming:
                streaming_indices.append(i)
        return streaming_indices

    def _streaming_execution(
        self,
        input: Input,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Iterator[Output]:
        """Streaming execution."""
        current_input = input
        # Run non-streaming components until the first streaming one
        for i, step in enumerate([self.first] + self.middle + [self.last]):
            if step._supports_streaming:
                # Start streaming
                for chunk in step.stream(current_input, config, **kwargs):
                    # Push each chunk through the remaining components
                    processed_chunk = self._process_chunk_through_remaining_steps(
                        chunk, i + 1, config, **kwargs
                    )
                    if processed_chunk is not None:
                        yield processed_chunk
                break
            else:
                current_input = step.invoke(current_input, config, **kwargs)
7.3 Caching
import hashlib
import json
import time

class CachedRunnable(Runnable[Input, Output]):
    """A Runnable wrapper that caches results."""

    def __init__(
        self,
        runnable: Runnable[Input, Output],
        cache_size: int = 128,
        cache_ttl: Optional[int] = None,
    ):
        self.runnable = runnable
        self.cache_size = cache_size
        self.cache_ttl = cache_ttl
        self._cache = {}
        self._cache_times = {}

    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Output:
        """Invoke with caching."""
        # Build the cache key
        cache_key = self._generate_cache_key(input, config, **kwargs)
        # Check the cache
        if cache_key in self._cache:
            if self._is_cache_valid(cache_key):
                return self._cache[cache_key]
            else:
                # Entry expired; evict it
                del self._cache[cache_key]
                del self._cache_times[cache_key]
        # Execute and cache the result
        result = self.runnable.invoke(input, config, **kwargs)
        # Update the cache
        if len(self._cache) >= self.cache_size:
            self._evict_oldest()
        self._cache[cache_key] = result
        self._cache_times[cache_key] = time.time()
        return result

    def _generate_cache_key(
        self,
        input: Input,
        config: Optional[RunnableConfig],
        **kwargs: Any,
    ) -> str:
        """Build a cache key (simplified)."""
        key_data = {
            "input": input,
            "config": config,
            "kwargs": kwargs,
        }
        key_str = json.dumps(key_data, sort_keys=True, default=str)
        return hashlib.md5(key_str.encode()).hexdigest()

    def _is_cache_valid(self, cache_key: str) -> bool:
        """Check whether a cache entry is still valid."""
        if self.cache_ttl is None:
            return True
        cache_time = self._cache_times.get(cache_key)
        if cache_time is None:
            return False
        return time.time() - cache_time < self.cache_ttl

    def _evict_oldest(self):
        """Evict the oldest cache entry."""
        if not self._cache_times:
            return
        oldest_key = min(self._cache_times.keys(), key=lambda k: self._cache_times[k])
        del self._cache[oldest_key]
        del self._cache_times[oldest_key]
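LangChain also ships a built-in LLM cache that achieves the same effect without a wrapper class; a minimal sketch using the in-memory backend from langchain_core:
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache

set_llm_cache(InMemoryCache())  # process-wide; persistent backends also exist
# The first call hits the provider; an identical second call is served from cache.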
8. Practical Applications and Best Practices
8.1 An Enterprise-Grade RAG System
Building on the source analysis above, here is a complete implementation of an enterprise-grade RAG system:
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from functools import lru_cache
import logging

class EnterpriseRAGSystem:
    """Enterprise-grade RAG system implementation
    https://jishuzhan.net/article/1895692926025994242
    """

    def __init__(self, model_name: str = "gpt-3.5-turbo"):
        self.model = ChatOpenAI(model_name=model_name, temperature=0.1)
        self.embeddings = OpenAIEmbeddings()
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            # separators include Chinese punctuation for mixed-language corpora
            separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""]
        )
        self.logger = logging.getLogger(__name__)

    @lru_cache(maxsize=128)
    def load_vector_store(self, vs_path: str):
        """Load a vector store, with an LRU cache to avoid repeated loads
        https://blog.csdn.net/weixin_43829633/article/details/131330235
        """
        try:
            return FAISS.load_local(vs_path, self.embeddings)
        except Exception as e:
            self.logger.error(f"Failed to load vector store: {e}")
            raise
    def create_retrieval_chain(self, vectorstore):
        """Create the retrieval chain (hybrid retrieval strategy)."""
        # Prompt template
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a professional AI assistant. Answer the user's question based on the following context:

Context:
{context}

Notes:
1. If the context contains no relevant information, say so explicitly.
2. Keep answers accurate, concise, and well organized.
3. You may quote specific passages from the context where appropriate."""),
            ("human", "{question}")
        ])

        # Retriever
        retriever = vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={
                "k": 5,
                "score_threshold": 0.7
            }
        )

        # Document formatting helper
        def format_docs(docs):
            """Format the retrieved documents."""
            if not docs:
                return "No relevant information found"
            formatted = []
            for i, doc in enumerate(docs, 1):
                content = doc.page_content.strip()
                source = doc.metadata.get('source', 'unknown source')
                formatted.append(f"Document {i} (source: {source}):\n{content}")
            return "\n\n".join(formatted)

        # Build the RAG chain with LCEL syntax
        rag_chain = (
            RunnableParallel({
                "context": retriever | format_docs,
                "question": RunnablePassthrough()
            })
            | prompt
            | self.model
            | StrOutputParser()
        )
        return rag_chain
    def create_conversational_chain(self, vectorstore, memory):
        """Create a conversational retrieval chain
        https://github.com/taishan1994/langchain-learning
        """
        from langchain.chains import ConversationalRetrievalChain
        from langchain.memory import ConversationBufferMemory

        # Initialize conversation memory
        if memory is None:
            memory = ConversationBufferMemory(
                memory_key="chat_history",
                return_messages=True,
                output_key="answer"
            )

        # Build the conversational retrieval chain
        conversational_chain = ConversationalRetrievalChain.from_llm(
            llm=self.model,
            retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),
            memory=memory,
            return_source_documents=True,
            verbose=True
        )
        return conversational_chain
# Usage example
def create_enterprise_rag_demo():
    """Enterprise RAG system usage example."""
    # Initialize the system
    rag_system = EnterpriseRAGSystem()
    # Load a pre-built vector store
    vectorstore = rag_system.load_vector_store("./knowledge_base")
    # Create the retrieval chain
    rag_chain = rag_system.create_retrieval_chain(vectorstore)
    # Test query
    question = "What is LangChain's core design philosophy?"
    try:
        response = rag_chain.invoke(question)
        print(f"Question: {question}")
        print(f"Answer: {response}")
    except Exception as e:
        print(f"Query failed: {e}")

# Run the demo
if __name__ == "__main__":
    create_enterprise_rag_demo()
8.2 Performance Optimization Best Practices
8.2.1 Vector Store Optimization
class OptimizedVectorStore:
    """Optimized vector store wrapper
    https://blog.csdn.net/qq_28540861/article/details/148835695
    """

    def __init__(self, vectorstore, embeddings, cache_size=1000):
        self.vectorstore = vectorstore
        self.embeddings = embeddings
        self.cache_size = cache_size
        self._embedding_cache = {}
        self._query_cache = {}

    def cached_similarity_search(self, query: str, k: int = 4):
        """Similarity search with result caching."""
        cache_key = f"{query}:{k}"
        if cache_key in self._query_cache:
            return self._query_cache[cache_key]
        # Run the search
        results = self.vectorstore.similarity_search(query, k=k)
        # Cache the results
        if len(self._query_cache) < self.cache_size:
            self._query_cache[cache_key] = results
        return results

    def batch_add_texts(self, texts: List[str], batch_size: int = 100):
        """Add texts in batches to speed up index construction."""
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            embeddings = self.embeddings.embed_documents(batch)
            self.vectorstore.add_embeddings(
                list(zip(batch, embeddings))
            )
8.2.2 LLM Call Optimization
class OptimizedLLMChain:
    """Optimized LLM invocation chain
    https://liz-starfield.github.io/blog/zh/posts/LLM/langchain_source_code.html
    """

    def __init__(self, llm, prompt_template):
        self.llm = llm
        self.prompt_template = prompt_template
        self.request_cache = {}
        self.batch_queue = []
        self.batch_size = 10

    async def optimized_batch_invoke(self, inputs: List[Dict]):
        """Optimized batched invocation."""
        # Check the cache first
        cached_results = {}
        uncached_inputs = []
        for i, input_data in enumerate(inputs):
            cache_key = self._generate_cache_key(input_data)
            if cache_key in self.request_cache:
                cached_results[i] = self.request_cache[cache_key]
            else:
                uncached_inputs.append((i, input_data))

        # Batch-process the uncached requests
        if uncached_inputs:
            batch_results = await self._batch_process(
                [inp for _, inp in uncached_inputs]
            )
            # Update the cache and the result map
            for (i, input_data), result in zip(uncached_inputs, batch_results):
                cache_key = self._generate_cache_key(input_data)
                self.request_cache[cache_key] = result
                cached_results[i] = result

        # Return results in the original order
        return [cached_results[i] for i in range(len(inputs))]

    def _generate_cache_key(self, input_data: Dict) -> str:
        """Build a cache key."""
        import hashlib
        import json
        key_str = json.dumps(input_data, sort_keys=True)
        return hashlib.md5(key_str.encode()).hexdigest()

    async def _batch_process(self, inputs: List[Dict]):
        """Batch-process the inputs."""
        # Format the prompts
        prompts = [
            self.prompt_template.format(**input_data)
            for input_data in inputs
        ]
        # Batch-call the LLM; agenerate returns a single LLMResult whose
        # .generations list is aligned with the prompts
        result = await self.llm.agenerate(prompts)
        return [generations[0].text for generations in result.generations]
8.3 Monitoring and Debugging Best Practices
class LangChainMonitor:
    """Monitoring and debugging helper for LangChain
    https://github.com/wyq-zhangxf/langchain-
    """

    def __init__(self):
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'average_response_time': 0,
            'token_usage': 0
        }
        self.request_logs = []

    def create_monitoring_callback(self):
        """Create a monitoring callback handler."""
        from langchain_core.callbacks import BaseCallbackHandler
        import time

        class MonitoringCallback(BaseCallbackHandler):
            def __init__(self, monitor):
                self.monitor = monitor
                self.start_time = None

            def on_llm_start(self, serialized, prompts, **kwargs):
                self.start_time = time.time()
                self.monitor.metrics['total_requests'] += 1

            def on_llm_end(self, response, **kwargs):
                if self.start_time:
                    duration = time.time() - self.start_time
                    self.monitor._update_response_time(duration)
                self.monitor.metrics['successful_requests'] += 1
                # Record token usage
                if hasattr(response, 'llm_output') and response.llm_output:
                    token_usage = response.llm_output.get('token_usage', {})
                    total_tokens = token_usage.get('total_tokens', 0)
                    self.monitor.metrics['token_usage'] += total_tokens

            def on_llm_error(self, error, **kwargs):
                self.monitor.metrics['failed_requests'] += 1
                self.monitor.request_logs.append({
                    'timestamp': time.time(),
                    'error': str(error),
                    'type': 'llm_error'
                })

        return MonitoringCallback(self)

    def _update_response_time(self, duration):
        """Update the rolling average response time."""
        current_avg = self.metrics['average_response_time']
        total_requests = self.metrics['total_requests']
        if total_requests == 1:
            self.metrics['average_response_time'] = duration
        else:
            self.metrics['average_response_time'] = (
                (current_avg * (total_requests - 1) + duration) / total_requests
            )

    def get_performance_report(self):
        """Produce a performance report."""
        success_rate = (
            self.metrics['successful_requests'] /
            max(self.metrics['total_requests'], 1) * 100
        )
        return {
            'success_rate': f"{success_rate:.2f}%",
            'average_response_time': f"{self.metrics['average_response_time']:.2f}s",
            'total_token_usage': self.metrics['token_usage'],
            'total_requests': self.metrics['total_requests'],
            'recent_errors': self.request_logs[-5:]  # the five most recent errors
        }

# Usage example
monitor = LangChainMonitor()
callback = monitor.create_monitoring_callback()

# Attach the monitoring callback to a chain
chain = prompt | model | parser
result = chain.invoke(
    {"question": "test question"},
    config={"callbacks": [callback]}
)

# Inspect the performance report
print(monitor.get_performance_report())
9. Summary
9.1 Architectural Strengths of LangChain
- Unified abstraction: the Runnable interface provides a consistent programming model
- Composability: declarative composition through LCEL
- Extensibility: a modular design that supports flexible extension
- Observability: built-in callback and tracing systems
- Performance: automatic batching and streaming
9.2 Design Philosophy
mindmap
    root((LangChain design philosophy))
        Unification
            Runnable interface
            Consistent API
            Standardized configuration
        Composability
            LCEL syntax
            Pipe operator
            Declarative programming
        Extensibility
            Plugin architecture
            Abstract interfaces
            Community ecosystem
        Observability
            Callback system
            Structured logging
            Performance monitoring
        Production readiness
            Error handling
            Retry mechanisms
            Cache optimization
10. Supplementary Notes on Key Functions and Structures
10.1 Runnable and LCEL Composition
Key functions (core code with notes):
class Runnable(ABC, Generic[Input, Output]):
    def invoke(self, input: Input, config: Optional[RunnableConfig] = None, **kwargs) -> Output:
        """Synchronously process a single input; the output type is defined by the subclass."""
        raise NotImplementedError

    def __or__(self, other: Runnable[Any, Other]) -> RunnableSequence[Input, Other]:
        """Pipe composition, producing an executable sequence."""
        return RunnableSequence(first=self, last=other)
Unified call chain:
invoke() → subclass implementation; composition: Runnable.__or__() → RunnableSequence
Class diagram (Mermaid):
classDiagram
class Runnable { <<abstract>> +invoke() }
class RunnableSequence { +invoke() +batch() +stream() }
Runnable <|-- RunnableSequence
Sequence diagram reference: see "5.1 Chain Execution Sequence Diagram".
10.2 Language Model Abstraction (BaseLanguageModel / BaseChatModel)
Key functions (core code with notes):
class BaseLanguageModel(RunnableSerializable[LanguageModelInput, BaseMessage], ABC):
    def generate(self, messages: List[List[BaseMessage]], stop: Optional[List[str]] = None, callbacks: Callbacks = None, **kwargs) -> LLMResult:
        """Generate results for batches of messages, firing callbacks and aggregating generations and llm_output."""
        ...

class BaseChatModel(BaseLanguageModel):
    def _generate(self, messages: List[BaseMessage], stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs) -> ChatResult:
        """The core generation method implemented by subclasses; handles one round of messages."""
        ...
Unified call chain:
generate() → on_llm_start callback → _generate() → on_llm_end
Class diagram (Mermaid):
classDiagram
class BaseLanguageModel { <<abstract>> +generate() +agenerate() }
class BaseChatModel { <<abstract>> +_generate() +_agenerate() }
BaseLanguageModel <|-- BaseChatModel
Sequence diagram reference: see the diagram in "4.1 Language Model Abstraction".
10.3 Vector Store Abstraction (VectorStore)
Key functions (core code with notes):
class VectorStore(ABC):
    def add_texts(self, texts: List[str], metadatas: Optional[List[dict]] = None, **kwargs) -> List[str]:
        """Write texts and index them; returns document IDs."""
        ...

    def similarity_search_with_score(self, query: str, k: int = 4, **kwargs) -> List[Tuple[Document, float]]:
        """Retrieve the most similar documents together with their scores."""
        ...
Unified call chains:
- Write: add_texts() → compute embeddings → write to the index
- Search: similarity_search_with_score() → embed the query → search the vector index
Class diagram (Mermaid):
classDiagram
class VectorStore { <<abstract>> +add_texts() +similarity_search() }
Sequence diagram reference: see "4.2 Vector Store Abstraction".
10.4 Tools and Callbacks (BaseTool / CallbackManager)
Key functions (core code with notes):
class BaseTool(BaseModel, ABC):
    def run(self, tool_input: Union[str, Dict], callbacks: Callbacks = None, **kwargs) -> Any:
        """Parse/validate input → start callbacks → run core logic → end/error callbacks."""
        ...

class CallbackManager:
    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs) -> CallbackManagerForChainRun:
        """Fire the chain-start event on every handler and return a run context."""
        ...
Unified call chains:
- Tool: run() → _parse_input() → _run()
- Callbacks: on_chain_start() / on_llm_start() / on_tool_start() → handler events
Class diagram (Mermaid):
classDiagram
class BaseTool { <<abstract>> +run() }
class CallbackManager { +on_chain_start() +on_llm_start() }