LangChain-01-Core模块
模块概览
职责与定位
langchain-core是LangChain框架的核心基础库,定义了构建LLM应用所需的基本抽象、接口和协议。该模块不包含第三方集成,仅提供稳定的、经过实战验证的核心抽象,作为整个LangChain生态系统的基石。
核心职责:
- 定义Runnable通用调用协议(LCEL)
- 提供语言模型抽象(LLM、ChatModel)
- 定义消息格式和提示词模板
- 提供工具、输出解析器、向量存储等核心抽象
- 定义回调和追踪机制
- 提供Agent数据结构
- 支持同步/异步/批量/流式操作
子模块结构
langchain-core包含以下核心子模块:
- runnables: LCEL基础,通用调用协议
- language_models: 语言模型抽象
- messages: 消息格式定义
- prompts: 提示词模板
- output_parsers: 输出解析器
- tools: 工具抽象
- vectorstores: 向量存储抽象
- callbacks: 回调和追踪
- agents: Agent数据结构
- documents: 文档结构
- embeddings: 嵌入模型抽象
- retrievers: 检索器抽象
整体架构图
flowchart TB
subgraph Foundation["基础层"]
RUNNABLE[Runnable<br/>通用调用协议<br/>LCEL核心]
SERIALIZABLE[Serializable<br/>序列化基类]
end
subgraph Models["模型层"]
LLM[BaseLLM<br/>传统LLM]
CHAT[BaseChatModel<br/>对话模型]
EMBED[Embeddings<br/>嵌入模型]
end
subgraph DataStructures["数据结构层"]
MESSAGE[Messages<br/>消息格式]
DOCUMENT[Document<br/>文档结构]
PROMPT[PromptTemplate<br/>提示词模板]
end
subgraph Components["组件层"]
TOOL[BaseTool<br/>工具]
PARSER[OutputParser<br/>输出解析器]
VECTOR[VectorStore<br/>向量存储]
RETRIEVER[Retriever<br/>检索器]
end
subgraph HighLevel["高层抽象"]
AGENT[Agent<br/>智能代理]
CHAIN[Chain<br/>组合链]
end
subgraph Infrastructure["基础设施"]
CALLBACK[Callbacks<br/>回调系统]
CACHE[Cache<br/>缓存]
TRACER[Tracer<br/>追踪]
end
RUNNABLE --> LLM
RUNNABLE --> CHAT
RUNNABLE --> EMBED
RUNNABLE --> TOOL
RUNNABLE --> PARSER
RUNNABLE --> RETRIEVER
MESSAGE --> CHAT
PROMPT --> LLM
PROMPT --> CHAT
DOCUMENT --> VECTOR
EMBED --> VECTOR
VECTOR --> RETRIEVER
TOOL --> AGENT
LLM --> AGENT
CHAT --> AGENT
RUNNABLE --> CHAIN
CALLBACK --> LLM
CALLBACK --> TOOL
CALLBACK --> CHAIN
style RUNNABLE fill:#e1f5ff
style CHAT fill:#fff4e1
style AGENT fill:#e8f5e9
架构说明
分层设计
基础层:
Runnable: 所有组件的通用接口,支持invoke、stream、batchSerializable: 支持序列化和持久化
模型层:
BaseLLM: 传统文本生成模型BaseChatModel: 对话式模型Embeddings: 文本嵌入模型
数据结构层:
BaseMessage: 消息基类(AIMessage、HumanMessage等)Document: 文档结构PromptTemplate: 提示词模板
组件层:
BaseTool: 工具抽象,Agent调用的函数OutputParser: 解析模型输出VectorStore: 向量数据库抽象Retriever: 文档检索器
高层抽象:
Agent: 智能代理,动态决策Chain: 组件组合链
基础设施:
Callbacks: 可观测性和监控Cache: 缓存机制Tracer: LangSmith追踪
模块间依赖关系
graph LR
RUNNABLES[Runnables<br/>核心协议]
MESSAGES[Messages]
PROMPTS[Prompts]
MODELS[LanguageModels]
TOOLS[Tools]
PARSERS[OutputParsers]
AGENTS[Agents]
VECTORS[VectorStores]
CALLBACKS[Callbacks]
MESSAGES --> MODELS
PROMPTS --> MODELS
MODELS --> PARSERS
TOOLS --> AGENTS
MODELS --> AGENTS
VECTORS --> MODELS
CALLBACKS -.监控.-> MODELS
CALLBACKS -.监控.-> TOOLS
RUNNABLES --> MODELS
RUNNABLES --> TOOLS
RUNNABLES --> PARSERS
style RUNNABLES fill:#e1f5ff
依赖说明
核心依赖路径:
- Runnables是最底层,所有组件继承Runnable
- Messages定义数据格式,被LanguageModels使用
- Prompts生成输入,传递给LanguageModels
- OutputParsers处理LanguageModels的输出
- Tools被Agents调用,由LanguageModels驱动
- Callbacks横向依赖,监控所有组件
无循环依赖: 设计保证了清晰的单向依赖,便于理解和维护。
核心设计模式
1. Runnable协议(最重要)
# 所有组件实现统一接口
class Runnable(ABC):
def invoke(self, input: Input) -> Output:
"""同步调用"""
pass
def stream(self, input: Input) -> Iterator[Output]:
"""流式输出"""
pass
def batch(self, inputs: list[Input]) -> list[Output]:
"""批量处理"""
pass
async def ainvoke(self, input: Input) -> Output:
"""异步调用"""
pass
# 组合性
chain = prompt | model | output_parser # LCEL语法
优势:
- 统一接口,降低学习成本
- 支持链式组合
- 自动支持同步/异步/批量/流式
- 易于测试和调试
2. 消息抽象
# 统一的消息格式
class BaseMessage:
content: str
additional_kwargs: dict
# 不同角色
AIMessage(content="...") # AI回复
HumanMessage(content="...") # 用户输入
SystemMessage(content="...") # 系统指令
ToolMessage(content="...") # 工具结果
优势:
- 统一的对话历史表示
- 支持多模态(文本、图片、音频)
- 支持工具调用
- 易于序列化和存储
3. 提示词模板
# 参数化提示词
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个{role}"),
("human", "{input}")
])
# 运行时填充
messages = prompt.invoke({"role": "助手", "input": "你好"})
优势:
- 分离提示词和逻辑
- 支持模板复用
- 支持部分变量
- 支持聊天历史注入
4. 输出解析
# 结构化输出
class User(BaseModel):
name: str
age: int
parser = PydanticOutputParser(pydantic_object=User)
chain = prompt | model | parser
user = chain.invoke({"text": "..."})
# 自动解析为User对象
优势:
- 类型安全
- 自动验证
- 易于集成
- 支持错误修复
典型使用模式
模式1: LCEL链式组合
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 定义组件
prompt = ChatPromptTemplate.from_template("讲个关于{topic}的笑话")
model = ChatOpenAI()
parser = StrOutputParser()
# 组合为链
chain = prompt | model | parser
# 调用
result = chain.invoke({"topic": "程序员"})
优势: 清晰、可组合、易于调试
模式2: RAG模式
from langchain_core.runnables import RunnablePassthrough
# RAG链
retriever = vectorstore.as_retriever()
chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| model
| StrOutputParser()
)
answer = chain.invoke("什么是LangChain?")
优势: 结合检索和生成,提高准确性
模式3: Agent模式
from langchain.agents import create_openai_functions_agent, AgentExecutor
# 创建Agent
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools)
# 执行
result = agent_executor.invoke({"input": "帮我搜索并总结最新消息"})
优势: 动态决策,处理复杂任务
最佳实践总结
1. 使用LCEL构建应用
# 推荐: LCEL链式组合
chain = prompt | model | parser
# 不推荐: 手动调用
messages = prompt.invoke(input)
response = model.invoke(messages)
output = parser.invoke(response)
2. 利用流式输出
# 推荐: 流式显示
for chunk in chain.stream({"input": "长文本"}):
print(chunk, end="", flush=True)
# 提升用户体验
3. 批量处理提高效率
# 推荐: 批量调用
inputs = [{"topic": f"主题{i}"} for i in range(10)]
results = chain.batch(inputs)
# 比循环调用快得多
4. 使用回调监控
# 推荐: 添加回调
from langchain_core.callbacks import StdOutCallbackHandler
chain = prompt | model | parser
result = chain.invoke(
{"input": "..."},
config={"callbacks": [StdOutCallbackHandler()]}
)
5. 类型安全
# 推荐: 使用Pydantic验证
from pydantic import BaseModel
class Output(BaseModel):
answer: str
confidence: float
parser = PydanticOutputParser(pydantic_object=Output)
chain = prompt | model | parser
# 自动类型检查和验证
版本兼容性
稳定性承诺
langchain-core遵循严格的版本管理:
- 向后兼容: 公共API不会破坏性变更
- 实验性功能: 使用
@beta装饰器标注 - 废弃流程: 提前多个版本警告
升级建议
# 检查过时API
import warnings
warnings.filterwarnings("default", category=DeprecationWarning)
# 使用新API
# 旧: LLMChain (已废弃)
# 新: LCEL
chain = prompt | model | parser
性能优化指南
1. 批量处理
# 将多个请求合并为批量
inputs = [{"query": q} for q in queries]
results = chain.batch(inputs, config={"max_concurrency": 5})
2. 缓存
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache
# 启用缓存
set_llm_cache(InMemoryCache())
# 相同输入会命中缓存
3. 流式输出
# 降低首字节时间
async for chunk in chain.astream({"input": "..."}):
yield chunk
核心调用链路详解
调用链路一: LCEL链式组合 (Prompt → Model → Parser)
这是LangChain最常见的使用模式,通过管道操作符|将多个Runnable组件串联。
链路架构图
flowchart LR
subgraph UserCode["用户代码"]
INPUT[用户输入<br/>dict]
end
subgraph LCELChain["LCEL链: prompt | model | parser"]
SEQUENCE[RunnableSequence<br/>编排器]
PROMPT[ChatPromptTemplate<br/>提示词模板]
MODEL[BaseChatModel<br/>语言模型]
PARSER[StrOutputParser<br/>输出解析器]
end
subgraph Infrastructure["基础设施"]
CALLBACK[CallbackManager<br/>回调管理器]
TRACER[LangSmithTracer<br/>追踪器]
end
INPUT --> SEQUENCE
SEQUENCE --> PROMPT
PROMPT --> MODEL
MODEL --> PARSER
PARSER --> OUTPUT[输出结果<br/>str]
CALLBACK -.监控.-> PROMPT
CALLBACK -.监控.-> MODEL
CALLBACK -.监控.-> PARSER
TRACER -.记录.-> SEQUENCE
style SEQUENCE fill:#e1f5ff,stroke:#0288d1,stroke-width:3px
style CALLBACK fill:#fff9c4
style TRACER fill:#fff9c4
架构说明
层次结构:
- 用户代码层: 提供输入数据(通常是字典)
- LCEL链层: RunnableSequence负责编排,依次调用各组件
- 基础设施层: CallbackManager和Tracer提供可观测性
组件职责:
RunnableSequence: 管道编排器,负责按顺序调用步骤,传递中间结果ChatPromptTemplate: 模板填充,将输入字典转换为消息列表BaseChatModel: 调用LLM API,生成AI响应StrOutputParser: 提取消息内容,转换为字符串
数据流转:
{"topic": "AI"}→ Prompt →[HumanMessage("讲个关于AI的笑话")][HumanMessage(...)]→ Model →AIMessage("为什么AI...?")AIMessage(...)→ Parser →"为什么AI...?"
完整时序图
sequenceDiagram
autonumber
participant U as 用户
participant SEQ as RunnableSequence
participant CBM as CallbackManager
participant P as ChatPromptTemplate
participant M as BaseChatModel
participant PAR as StrOutputParser
participant API as LLM API
participant TR as LangSmithTracer
Note over U,TR: 阶段1: 初始化与链式组合(构建时)
U->>SEQ: prompt | model | parser
Note right of SEQ: 通过__or__操作符<br/>构建RunnableSequence
SEQ->>SEQ: steps = [prompt, model, parser]
Note over U,TR: 阶段2: 链式调用开始
U->>SEQ: invoke({"topic": "AI"}, config)
SEQ->>CBM: ensure_config(config)
CBM-->>SEQ: RunnableConfig
SEQ->>CBM: on_chain_start(input, name)
CBM->>TR: 记录chain_start事件
TR-->>CBM: run_id
CBM-->>SEQ: RunManager(run_id)
Note over U,TR: 阶段3: Step 1 - Prompt执行
SEQ->>P: invoke({"topic": "AI"}, config)
P->>CBM: on_chain_start("ChatPromptTemplate")
CBM->>TR: 记录prompt_start
P->>P: format_messages({"topic": "AI"})
Note right of P: 填充模板:<br/>[HumanMessage("讲个关于AI的笑话")]
P->>CBM: on_chain_end(messages)
CBM->>TR: 记录prompt_end
P-->>SEQ: [HumanMessage(...)]
Note over U,TR: 阶段4: Step 2 - Model执行
SEQ->>M: invoke([HumanMessage(...)], config)
M->>CBM: on_llm_start(messages)
CBM->>TR: 记录llm_start
M->>M: _generate_from_messages(messages)
M->>API: POST /chat/completions
Note right of API: 实际调用OpenAI/Claude等
API-->>M: {"choices": [...], "usage": {...}}
M->>M: 构造AIMessage
M->>CBM: on_llm_end(AIMessage, usage_metadata)
CBM->>TR: 记录llm_end + token使用量
M-->>SEQ: AIMessage("为什么AI...?")
Note over U,TR: 阶段5: Step 3 - Parser执行
SEQ->>PAR: invoke(AIMessage(...), config)
PAR->>CBM: on_chain_start("StrOutputParser")
CBM->>TR: 记录parser_start
PAR->>PAR: parse_result([ChatGeneration(message)])
Note right of PAR: 提取message.content
PAR->>CBM: on_chain_end(str_content)
CBM->>TR: 记录parser_end
PAR-->>SEQ: "为什么AI...?"
Note over U,TR: 阶段6: 链式调用结束
SEQ->>CBM: on_chain_end(output)
CBM->>TR: 记录chain_end
TR->>TR: 构建完整Run树
SEQ-->>U: "为什么AI...?"
时序图详细说明
阶段1: 初始化与链式组合(构建时)
- 步骤1: 用户使用
|操作符连接组件 - 内部机制: Python调用
__or__魔术方法,创建RunnableSequence实例 - 关键点: 此时只构建结构,不执行逻辑;steps列表记录了[prompt, model, parser]
阶段2: 链式调用开始
- 步骤2: 用户调用
chain.invoke(input, config) - 步骤3-5: 配置初始化,包括回调管理器、追踪器
- 步骤6-8: 触发
on_chain_start,LangSmith tracer记录运行开始,分配唯一run_id - 关键代码:
# langchain_core/runnables/base.py:3075-3087
def invoke(self, input, config=None, **kwargs):
config = ensure_config(config) # 步骤3
callback_manager = get_callback_manager_for_config(config) # 步骤4
run_manager = callback_manager.on_chain_start( # 步骤6
None, input, name=self.get_name(), run_id=config.pop("run_id", None)
)
阶段3: Step 1 - Prompt执行
- 步骤9: Sequence调用第一个步骤:
prompt.invoke(input, config) - 步骤10-11: Prompt触发回调,记录子运行
- 步骤12-13: 执行模板填充
format_messages,将{"topic": "AI"}转换为消息列表 - 步骤14-16: 记录完成,返回
[HumanMessage("讲个关于AI的笑话")] - 数据转换:
dict→list[BaseMessage] - 关键代码:
# langchain_core/prompts/chat.py
def format_messages(self, **kwargs) -> list[BaseMessage]:
# 遍历消息模板,填充变量
for message_template in self.messages:
messages.extend(message_template.format_messages(**kwargs))
return messages
阶段4: Step 2 - Model执行
- 步骤17: Sequence将Step 1的输出传给Model
- 步骤18-19: Model触发
on_llm_start,记录输入消息 - 步骤20-22: 调用
_generate_from_messages,实际向LLM API发送HTTP请求 - 步骤23-26: 解析响应,构造
AIMessage,记录token使用量 - 数据转换:
list[BaseMessage]→AIMessage - 关键代码:
# langchain_core/language_models/chat_models.py
def invoke(self, input, config=None, **kwargs):
messages = convert_to_messages(input) # 标准化输入
run_manager = callback_manager.on_llm_start(serialized, messages)
result = self._generate_from_messages(messages, run_manager=run_manager)
run_manager.on_llm_end(result)
return result.generations[0].message
阶段5: Step 3 - Parser执行
- 步骤27: Sequence将Model输出传给Parser
- 步骤28-29: Parser触发回调
- 步骤30-31: 执行
parse_result,提取message.content - 步骤32-34: 记录完成,返回字符串
- 数据转换:
AIMessage→str - 关键代码:
# langchain_core/output_parsers/string.py
class StrOutputParser(BaseOutputParser[str]):
def parse_result(self, result, *, partial=False):
# 从ChatGeneration中提取message.content
return result[0].message.content
阶段6: 链式调用结束
- 步骤35-37: Sequence调用
on_chain_end,tracer记录整个链的完成 - 步骤38: 构建完整的Run树,包含所有子运行的时间、输入输出、token使用量
- 步骤39: 返回最终结果给用户
可观测性机制:
- Run树结构: 每个组件都是一个子Run,形成层次结构
- 记录内容: 输入、输出、执行时间、错误信息、token使用量
- 追踪器类型:
LangSmithTracer: 发送到LangSmith平台ConsoleCallbackHandler: 打印到控制台StdOutCallbackHandler: 简化版控制台输出
错误处理机制:
# langchain_core/runnables/base.py:3091-3108
try:
for i, step in enumerate(self.steps):
config = patch_config(config, callbacks=run_manager.get_child(f"seq:step:{i+1}"))
input_ = step.invoke(input_, config)
except BaseException as e:
run_manager.on_chain_error(e) # 记录错误
raise # 向上抛出
else:
run_manager.on_chain_end(input_) # 记录成功
return input_
性能优化点:
- 批量处理:
batch()方法使用线程池并行处理多个输入 - 流式输出:
stream()方法逐chunk返回,降低首字节时间 - 异步执行:
ainvoke()使用asyncio,提高IO密集型任务效率
调用链路二: RAG检索增强生成
RAG模式结合了向量检索和语言模型生成,是LangChain的典型应用场景。
链路架构图
flowchart TB
subgraph UserInput["用户输入"]
QUERY[用户问题<br/>"什么是LCEL?"]
end
subgraph RAGChain["RAG链"]
PARALLEL[RunnableParallel<br/>并行执行]
RETRIEVER[VectorStoreRetriever<br/>检索器]
PASSTHROUGH[RunnablePassthrough<br/>原样传递]
PROMPT[ChatPromptTemplate<br/>包含context变量]
MODEL[ChatModel<br/>LLM]
PARSER[StrOutputParser]
end
subgraph VectorDB["向量数据库"]
EMBED[Embeddings<br/>嵌入模型]
STORE[VectorStore<br/>存储层]
end
QUERY --> PARALLEL
PARALLEL --> RETRIEVER
PARALLEL --> PASSTHROUGH
RETRIEVER --> EMBED
EMBED --> STORE
STORE --> DOCS[检索到的文档<br/>list[Document]]
DOCS --> PROMPT
PASSTHROUGH --> QUESTION[原始问题]
QUESTION --> PROMPT
PROMPT --> MODEL
MODEL --> PARSER
PARSER --> ANSWER[生成的答案]
style PARALLEL fill:#e8f5e9,stroke:#4caf50,stroke-width:3px
style RETRIEVER fill:#fff3e0
style DOCS fill:#f3e5f5
RAG时序图
sequenceDiagram
autonumber
participant U as 用户
participant CHAIN as RAG Chain
participant PAR as RunnableParallel
participant RET as Retriever
participant EMB as Embeddings
participant VS as VectorStore
participant PASS as RunnablePassthrough
participant P as ChatPromptTemplate
participant M as ChatModel
participant API as LLM API
Note over U,API: 阶段1: RAG链构建(构建时)
U->>CHAIN: 构建RAG链
CHAIN->>PAR: {"context": retriever, "question": passthrough}
Note right of PAR: 创建RunnableParallel<br/>两个分支并行执行
PAR->>P: | prompt
P->>M: | model
M->>CHAIN: | parser
Note over U,API: 阶段2: 问题输入
U->>CHAIN: invoke("什么是LCEL?")
CHAIN->>PAR: invoke("什么是LCEL?")
Note over U,API: 阶段3: 并行执行检索和传递
par 分支1: 检索相关文档
PAR->>RET: invoke("什么是LCEL?")
RET->>EMB: embed_query("什么是LCEL?")
EMB-->>RET: [0.123, 0.456, ...]
Note right of EMB: 将问题转换为向量
RET->>VS: similarity_search(query_vector, k=4)
VS->>VS: 计算余弦相似度
VS-->>RET: [Document(page_content="LCEL是..."), ...]
Note right of VS: 返回Top-K相关文档
RET-->>PAR: context = [Document(...), ...]
and 分支2: 原样传递问题
PAR->>PASS: invoke("什么是LCEL?")
PASS-->>PAR: question = "什么是LCEL?"
Note right of PASS: 直接返回输入
end
Note over U,API: 阶段4: 合并结果
PAR->>PAR: 等待两个分支完成
PAR-->>P: {"context": [Document(...)], "question": "什么是LCEL?"}
Note over U,API: 阶段5: 格式化提示词
P->>P: format_messages(context, question)
Note right of P: 模板:<br/>"根据上下文: {context}<br/>回答问题: {question}"
P-->>M: [HumanMessage("根据上下文: LCEL是...\n回答问题: ...")]
Note over U,API: 阶段6: LLM生成答案
M->>API: POST /chat/completions
API-->>M: AIMessage("LCEL是LangChain表达式语言...")
M-->>CHAIN: "LCEL是LangChain表达式语言..."
CHAIN-->>U: "LCEL是LangChain表达式语言..."
RAG时序详解
阶段1: RAG链构建(构建时)
- 步骤1-5: 使用LCEL语法构建链
- 关键代码:
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
# 构建RAG链
chain = (
RunnableParallel({
"context": retriever, # 分支1: 检索文档
"question": RunnablePassthrough() # 分支2: 传递原问题
})
| prompt # 格式化提示词
| model # LLM生成
| parser # 解析输出
)
- 架构特点:
RunnableParallel允许并行执行多个分支,提高效率
阶段2: 问题输入
- 步骤6-7: 用户输入问题字符串
阶段3: 并行执行检索和传递
-
分支1(步骤8-14): 检索相关文档
- 步骤8: 调用Retriever的
invoke方法 - 步骤9-10: 使用Embeddings模型将问题转换为向量(通常768或1536维)
- 步骤11-13: VectorStore执行相似度搜索,计算余弦相似度,返回Top-K文档
- 步骤14: 返回
list[Document],每个Document包含page_content和metadata - 关键代码:
# langchain_core/vectorstores/base.py def similarity_search(self, query: str, k: int = 4, **kwargs) -> list[Document]: # 1. 将query转换为向量 embedding = self.embedding_function.embed_query(query) # 2. 在向量数据库中搜索 docs_and_scores = self._similarity_search_with_score(embedding, k) # 3. 返回文档 return [doc for doc, _ in docs_and_scores] - 步骤8: 调用Retriever的
-
分支2(步骤15-17): 原样传递问题
RunnablePassthrough直接返回输入,无任何转换- 作用: 保留原始问题,供后续Prompt使用
阶段4: 合并结果
- 步骤18-19:
RunnableParallel等待所有分支完成,合并结果为字典 - 输出格式:
{"context": [Document(...)], "question": "什么是LCEL?"}
阶段5: 格式化提示词
- 步骤20-22:
ChatPromptTemplate填充变量 - 模板示例:
template = """根据以下上下文回答问题:
上下文:
{context}
问题: {question}
回答:"""
- context处理: 将
list[Document]转换为字符串,通常用\n\n分隔
阶段6: LLM生成答案
- 步骤23-25: 与普通LCEL链相同,调用LLM API生成答案
- 步骤26: 返回最终答案
RAG性能优化:
- 并行检索:
RunnableParallel使用线程池并行执行retriever和passthrough - 批量嵌入: 对多个query使用
embed_documents批量生成向量 - 缓存: 对频繁查询的问题缓存嵌入向量和检索结果
- 异步: 使用
ainvoke异步调用,提高吞吐量
检索策略:
- 相似度阈值: 过滤低相关度文档
- 文档重排: 使用Reranker模型对初步检索结果重新排序
- 混合检索: 结合关键词搜索(BM25)和向量搜索
调用链路三: Agent智能代理
Agent是最复杂的调用模式,支持LLM动态决策,循环调用工具直到完成任务。
Agent架构图
flowchart TB
subgraph UserInput["用户输入"]
TASK[用户任务<br/>"搜索并总结最新AI新闻"]
end
subgraph AgentSystem["Agent系统"]
EXECUTOR[AgentExecutor<br/>代理执行器]
AGENT[OpenAI Functions Agent<br/>决策器]
LLM[ChatModel<br/>with function calling]
end
subgraph ToolSystem["工具系统"]
TOOL1[SearchTool<br/>搜索工具]
TOOL2[SummarizeTool<br/>总结工具]
TOOL3[CalculatorTool<br/>计算器工具]
end
subgraph Memory["记忆系统"]
HISTORY[对话历史<br/>list[BaseMessage]]
SCRATCHPAD[中间步骤<br/>list[AgentAction]]
end
TASK --> EXECUTOR
EXECUTOR --> AGENT
AGENT --> LLM
LLM -->|tool_calls| DECISION{决策结果?}
DECISION -->|AgentAction| EXECUTOR
EXECUTOR -->|调用工具| TOOL1
EXECUTOR -->|调用工具| TOOL2
EXECUTOR -->|调用工具| TOOL3
TOOL1 --> RESULT1[工具结果]
RESULT1 --> HISTORY
HISTORY --> AGENT
DECISION -->|AgentFinish| OUTPUT[最终答案]
SCRATCHPAD -.记录.-> AGENT
style EXECUTOR fill:#e1f5ff,stroke:#0288d1,stroke-width:3px
style DECISION fill:#fff9c4
style LLM fill:#f3e5f5
Agent完整时序图
sequenceDiagram
autonumber
participant U as 用户
participant EXE as AgentExecutor
participant AGT as Agent
participant LLM as ChatModel(GPT-4)
participant API as OpenAI API
participant SEARCH as SearchTool
participant SUMM as SummarizeTool
participant MEM as Memory
Note over U,MEM: 阶段1: Agent初始化
U->>EXE: create_openai_functions_agent(llm, tools, prompt)
EXE->>EXE: 注册工具: [SearchTool, SummarizeTool]
EXE->>LLM: bind_tools(tools)
Note right of LLM: 将工具转换为<br/>OpenAI function calling格式
Note over U,MEM: 阶段2: 任务开始
U->>EXE: invoke({"input": "搜索并总结最新AI新闻"})
EXE->>MEM: 初始化对话历史 []
EXE->>AGT: plan(input, intermediate_steps=[])
Note over U,MEM: 阶段3: 第1轮 - LLM决策
AGT->>AGT: 构造prompt
Note right of AGT: System: 你是一个有用的助手...<br/>Human: 搜索并总结最新AI新闻<br/>可用工具: [search, summarize]
AGT->>LLM: invoke(messages)
LLM->>API: POST /chat/completions + tools=[...]
API-->>LLM: tool_calls=[{"name": "search", "arguments": {"query": "最新AI新闻"}}]
Note right of API: GPT-4决定调用search工具
LLM-->>AGT: AIMessage(tool_calls=[...])
AGT->>AGT: 解析为AgentAction
AGT-->>EXE: AgentAction(tool="search", tool_input={"query": "..."})
Note over U,MEM: 阶段4: 执行工具
EXE->>SEARCH: invoke({"query": "最新AI新闻"})
SEARCH->>SEARCH: 调用搜索API
SEARCH-->>EXE: "1. OpenAI发布GPT-5\n2. Google推出Gemini 2.0..."
EXE->>MEM: 记录中间步骤
Note right of MEM: intermediate_steps = [<br/> (AgentAction, "1. OpenAI...") <br/>]
Note over U,MEM: 阶段5: 第2轮 - LLM再次决策
EXE->>AGT: plan(input, intermediate_steps)
AGT->>AGT: 构造prompt(包含上一轮结果)
Note right of AGT: Human: 搜索并总结最新AI新闻<br/>Tool: search<br/>Result: 1. OpenAI发布GPT-5...
AGT->>LLM: invoke(messages)
LLM->>API: POST /chat/completions + tools=[...]
API-->>LLM: tool_calls=[{"name": "summarize", "arguments": {"text": "..."}}]
Note right of API: GPT-4决定调用summarize工具
LLM-->>AGT: AIMessage(tool_calls=[...])
AGT-->>EXE: AgentAction(tool="summarize", tool_input={"text": "..."})
Note over U,MEM: 阶段6: 执行总结工具
EXE->>SUMM: invoke({"text": "1. OpenAI..."})
SUMM->>SUMM: 调用LLM总结
SUMM-->>EXE: "本周AI领域主要新闻..."
EXE->>MEM: 更新中间步骤
Note over U,MEM: 阶段7: 第3轮 - LLM给出最终答案
EXE->>AGT: plan(input, intermediate_steps)
AGT->>LLM: invoke(messages)
LLM->>API: POST /chat/completions
API-->>LLM: content="根据搜索和总结,本周AI领域..."
Note right of API: 无tool_calls,直接返回文本
LLM-->>AGT: AIMessage(content="...")
AGT->>AGT: 解析为AgentFinish
AGT-->>EXE: AgentFinish(return_values={"output": "..."})
Note over U,MEM: 阶段8: 任务完成
EXE->>MEM: 保存完整对话历史
EXE-->>U: {"output": "根据搜索和总结,本周AI领域..."}
Agent时序详解
阶段1: Agent初始化
- 步骤1-3: 创建Agent,注册可用工具
- 步骤4: 绑定工具到LLM,转换为function calling格式
- 关键代码:
from langchain.agents import create_openai_functions_agent, AgentExecutor
# 定义工具
tools = [search_tool, summarize_tool]
# 创建Agent
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
阶段2: 任务开始
- 步骤5-7: 用户输入任务,初始化对话历史和中间步骤
阶段3: 第1轮 - LLM决策
- 步骤8-9: 构造包含工具描述的prompt
- 步骤10-12: 调用OpenAI API,使用function calling
- 步骤13-15: 解析响应,提取tool_calls,转换为
AgentAction - 关键数据结构:
# AgentAction定义
class AgentAction:
tool: str # 工具名称
tool_input: dict | str # 工具输入
log: str # LLM原始输出
- OpenAI Function Calling格式:
{
"tool_calls": [{
"id": "call_abc123",
"type": "function",
"function": {
"name": "search",
"arguments": "{\"query\": \"最新AI新闻\"}"
}
}]
}
阶段4: 执行工具
- 步骤16-18: AgentExecutor调用对应工具
- 步骤19-20: 记录
(AgentAction, observation)到intermediate_steps - 工具执行:
# langchain/agents/agent.py
def _call(self, inputs, run_manager):
intermediate_steps = []
while True:
# 1. LLM决策
action = agent.plan(inputs, intermediate_steps)
# 2. 如果是AgentFinish,结束循环
if isinstance(action, AgentFinish):
return action.return_values
# 3. 执行工具
tool = tools[action.tool]
observation = tool.invoke(action.tool_input)
# 4. 记录中间步骤
intermediate_steps.append((action, observation))
阶段5: 第2轮 - LLM再次决策
- 步骤21-26: 将上一轮的工具结果添加到对话历史,再次调用LLM
- Prompt格式:
System: 你是一个有用的助手...
Human: 搜索并总结最新AI新闻
AI: (调用search工具)
Tool: search
Result: 1. OpenAI发布GPT-5...
- 循环决策: Agent进入第二轮,根据搜索结果决定下一步动作
阶段6: 执行总结工具
- 步骤27-29: 执行summarize工具
- 步骤30: 更新
intermediate_steps,现在包含2轮
阶段7: 第3轮 - LLM给出最终答案
- 步骤31-35: LLM判断已收集足够信息,不再调用工具,直接返回文本答案
- 判断机制: 响应中无
tool_calls字段,解析为AgentFinish
阶段8: 任务完成
- 步骤36-37: 保存对话历史,返回最终结果
Agent执行循环:
# langchain/agents/agent_executor.py
def _call(self, inputs):
intermediate_steps = []
iterations = 0
max_iterations = 15
while iterations < max_iterations:
# 1. LLM决策
output = agent.plan(inputs, intermediate_steps)
# 2. 检查是否结束
if isinstance(output, AgentFinish):
return output.return_values
# 3. 执行工具
tool_name = output.tool
tool_input = output.tool_input
observation = tools[tool_name].invoke(tool_input)
# 4. 记录步骤
intermediate_steps.append((output, observation))
iterations += 1
# 达到最大迭代次数
raise ValueError("Agent达到最大迭代次数")
Agent关键机制:
- Function Calling: 将工具转换为JSON Schema,LLM决定调用哪个工具
- Scratchpad:
intermediate_steps记录所有历史动作和观察,提供给LLM作为上下文 - 循环执行: AgentExecutor持续调用LLM,直到返回
AgentFinish - 错误处理: 工具执行失败时,将错误信息返回给LLM,由LLM决定如何处理
- 最大迭代: 防止无限循环,通常设置为15次
Agent性能优化:
- 提前终止: 检测到特定关键词(如"Final Answer")时立即结束
- 工具并行: 支持同时调用多个工具(OpenAI parallel function calling)
- 流式输出: 每个步骤的结果实时流式返回
- 缓存工具结果: 对幂等工具(如搜索)缓存结果
Agent类型对比:
| Agent类型 | 决策机制 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|---|
| OpenAI Functions | Function Calling | 需要精确工具调用 | 结构化、可靠 | 仅支持OpenAI/Anthropic |
| ReAct | 思维链提示词 | 需要推理过程 | 通用、可解释 | Token消耗大 |
| Structured Chat | JSON Schema | 复杂工具输入 | 灵活、支持多模态 | 解析复杂 |
| Self-Ask | 子问题分解 | 复杂问答 | 递归、系统化 | 迭代次数多 |
流式输出机制详解
流式输出是LangChain的重要特性,可以显著提升用户体验。
流式架构图
flowchart TB
subgraph Client["客户端"]
UI[用户界面<br/>实时显示]
end
subgraph StreamingChain["流式链"]
SEQ[RunnableSequence]
PROMPT[ChatPromptTemplate]
MODEL[BaseChatModel]
PARSER[StrOutputParser]
end
subgraph LLMProvider["LLM提供商"]
API[API Server<br/>SSE流]
end
UI -->|invoke.stream()| SEQ
SEQ --> PROMPT
PROMPT --> MODEL
MODEL -->|逐chunk请求| API
API -.chunk 1.-> MODEL
API -.chunk 2.-> MODEL
API -.chunk 3.-> MODEL
MODEL -->|逐chunk传递| PARSER
PARSER -.chunk 1.-> SEQ
PARSER -.chunk 2.-> SEQ
PARSER -.chunk 3.-> SEQ
SEQ -.实时返回.-> UI
style MODEL fill:#e1f5ff
style API fill:#fff3e0
流式时序图
sequenceDiagram
autonumber
participant U as 用户
participant SEQ as RunnableSequence
participant M as ChatModel
participant API as LLM API(SSE)
participant PAR as StrOutputParser
Note over U,PAR: 流式调用开始
U->>SEQ: stream({"input": "..."})
SEQ->>M: stream(messages)
M->>API: POST /chat/completions (stream=True)
Note over U,PAR: 流式响应
loop 每个token
API-->>M: data: {"choices": [{"delta": {"content": "你"}}]}
M->>M: 构造AIMessageChunk("你")
M-->>PAR: yield AIMessageChunk("你")
PAR->>PAR: 提取content
PAR-->>SEQ: yield "你"
SEQ-->>U: yield "你"
Note right of U: 立即显示
API-->>M: data: {"choices": [{"delta": {"content": "好"}}]}
M-->>PAR: yield AIMessageChunk("好")
PAR-->>SEQ: yield "好"
SEQ-->>U: yield "好"
API-->>M: data: {"choices": [{"delta": {"content": "!"}}]}
M-->>PAR: yield AIMessageChunk("!")
PAR-->>SEQ: yield "!"
SEQ-->>U: yield "!"
end
API-->>M: data: [DONE]
M-->>PAR: StopIteration
PAR-->>SEQ: StopIteration
SEQ-->>U: StopIteration
流式实现原理
关键代码:
# langchain_core/runnables/base.py - RunnableSequence.stream()
def stream(self, input, config=None, **kwargs):
# 设置回调
config = ensure_config(config)
# 依次流式调用每个步骤
input_ = input
for i, step in enumerate(self.steps):
# 判断当前步骤是否支持流式
if hasattr(step, 'transform'):
# 支持流式: 逐chunk传递
input_ = step.transform(input_, config)
else:
# 不支持流式: 完整执行后再流式输出
input_ = step.invoke(input_, config)
input_ = iter([input_]) # 包装为迭代器
# 返回最终流
for chunk in input_:
yield chunk
流式传播规则:
- 全支持: 所有步骤都实现
transform,端到端流式 - 部分支持: 遇到不支持的步骤,在该步骤阻塞,后续继续流式
- 不支持: 只有最后一个步骤支持,实际变为invoke后流式输出
示例:
# 场景1: 全流式(推荐)
chain = prompt | model | parser
# prompt: 不支持流式,立即返回完整messages
# model: 支持流式,逐token返回
# parser: 支持流式,逐chunk提取content
# 结果: 阻塞在prompt,之后端到端流式
# 场景2: 部分流式
chain = retriever | prompt | model | parser
# retriever: 不支持流式,需要完整检索
# 结果: 阻塞在retriever和prompt,model和parser流式
流式优化:
- 首字节时间: 通过流式可降低至几十毫秒
- 长文本生成: 用户可以提前阅读,提升体验
- 早期终止: 用户可以随时停止生成
- 异步流式:
astream()提供异步迭代,提高并发
批量处理机制详解
批量处理可以显著提高吞吐量,适用于离线任务、数据处理等场景。
批量架构图
flowchart LR
subgraph Inputs["批量输入"]
IN1[Input 1]
IN2[Input 2]
IN3[Input 3]
IN4[Input 4]
end
subgraph BatchProcessor["批量处理器"]
SEQ[RunnableSequence.batch]
POOL[ThreadPoolExecutor<br/>max_workers=5]
end
subgraph Workers["并行Worker"]
W1[Worker 1<br/>Input 1]
W2[Worker 2<br/>Input 2]
W3[Worker 3<br/>Input 3]
W4[Worker 4<br/>Input 4]
end
subgraph Outputs["批量输出"]
OUT1[Output 1]
OUT2[Output 2]
OUT3[Output 3]
OUT4[Output 4]
end
IN1 --> SEQ
IN2 --> SEQ
IN3 --> SEQ
IN4 --> SEQ
SEQ --> POOL
POOL --> W1
POOL --> W2
POOL --> W3
POOL --> W4
W1 --> OUT1
W2 --> OUT2
W3 --> OUT3
W4 --> OUT4
style POOL fill:#e1f5ff
style SEQ fill:#fff3e0
批量时序图
sequenceDiagram
autonumber
participant U as 用户
participant SEQ as RunnableSequence
participant POOL as ThreadPool
participant W1 as Worker 1
participant W2 as Worker 2
participant API as LLM API
Note over U,API: 批量调用开始
U->>SEQ: batch([input1, input2, ...], max_concurrency=5)
SEQ->>SEQ: 为每个input创建config
SEQ->>POOL: 提交批量任务
Note over U,API: 并行执行
par Worker 1处理input1
POOL->>W1: invoke(input1, config1)
loop 每个步骤
W1->>W1: step.invoke(input1)
end
W1->>API: POST /chat/completions (input1)
API-->>W1: response1
W1-->>POOL: output1
and Worker 2处理input2
POOL->>W2: invoke(input2, config2)
loop 每个步骤
W2->>W2: step.invoke(input2)
end
W2->>API: POST /chat/completions (input2)
API-->>W2: response2
W2-->>POOL: output2
end
Note over U,API: 收集结果
POOL->>POOL: 等待所有worker完成
POOL-->>SEQ: [output1, output2, ...]
SEQ-->>U: [output1, output2, ...]
批量处理关键代码
# langchain_core/runnables/base.py - RunnableSequence.batch()
def batch(self, inputs, config=None, *, return_exceptions=False, **kwargs):
if not inputs:
return []
# 为每个input创建config
configs = get_config_list(config, len(inputs))
# 获取线程池
max_concurrency = configs[0].get("max_concurrency", None)
with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
# 提交所有任务
futures = [
executor.submit(self.invoke, input_, config_)
for input_, config_ in zip(inputs, configs)
]
# 收集结果
results = []
for future in futures:
try:
results.append(future.result())
except Exception as e:
if return_exceptions:
results.append(e)
else:
raise
return results
批量优化策略:
-
max_concurrency: 限制并发数,避免过载
chain.batch(inputs, config={"max_concurrency": 5}) -
return_exceptions: 容错处理,部分失败不影响整体
results = chain.batch(inputs, return_exceptions=True) # results可能包含Exception对象 -
批量API调用: LLM提供商的批量接口(如OpenAI batch API)
# 使用原生批量API(更便宜、更快) results = model.batch(messages_list) -
异步批量:
abatch()使用asyncio,提高IO密集型任务效率results = await chain.abatch(inputs, config={"max_concurrency": 10})
批量性能对比:
| 场景 | 顺序调用(10个) | 批量调用(10个) | 加速比 |
|---|---|---|---|
| LLM推理 | 50秒 | 12秒 | 4.2x |
| 向量检索 | 8秒 | 2秒 | 4x |
| 工具调用 | 30秒 | 8秒 | 3.75x |
总结
langchain-core是LangChain生态的核心基础,提供了:
核心抽象:
- ✅ Runnable通用协议(LCEL)
- ✅ 语言模型抽象(LLM/ChatModel)
- ✅ 消息和提示词格式
- ✅ 工具和输出解析器
- ✅ 向量存储和检索器
- ✅ 回调和追踪系统
- ✅ Agent数据结构
核心调用链路:
- LCEL链式组合: 通过
|操作符串联组件,RunnableSequence顺序执行 - RAG检索增强: RunnableParallel并行检索和传递,提升效率
- Agent智能代理: 循环决策和工具调用,处理复杂任务
关键机制:
- 流式输出: 通过
stream()逐chunk返回,降低首字节时间 - 批量处理: 通过
batch()并行处理多个输入,提高吞吐量 - 回调系统: 通过CallbackManager和Tracer记录完整执行过程
- 错误处理: 每个环节都有异常捕获和上报机制
设计理念:
- 📦 最小化依赖
- 🔒 稳定的公共API
- 🔄 统一的调用协议
- 🧩 高度可组合
- ⚡ 高性能(批量/流式/异步)
- 🔍 完整的可观测性
使用建议:
- 优先使用LCEL语法,清晰可组合
- 利用流式输出提升用户体验
- 批量处理提高离线任务效率
- 添加回调监控性能和调试
- 使用Pydantic保证类型安全
- 选择合适的Agent类型处理复杂任务
性能优化:
- 使用
batch()并行处理多个输入 - 使用
astream()异步流式提高并发 - 配置
max_concurrency控制并发数 - 启用LLM缓存减少重复调用
- 对向量检索结果进行缓存