LangChain-01-Core模块

模块概览

职责与定位

langchain-core是LangChain框架的核心基础库,定义了构建LLM应用所需的基本抽象、接口和协议。该模块不包含第三方集成,仅提供稳定的、经过实战验证的核心抽象,作为整个LangChain生态系统的基石。

核心职责:

  • 定义Runnable通用调用协议(LCEL)
  • 提供语言模型抽象(LLM、ChatModel)
  • 定义消息格式和提示词模板
  • 提供工具、输出解析器、向量存储等核心抽象
  • 定义回调和追踪机制
  • 提供Agent数据结构
  • 支持同步/异步/批量/流式操作

子模块结构

langchain-core包含以下核心子模块:

  1. runnables: LCEL基础,通用调用协议
  2. language_models: 语言模型抽象
  3. messages: 消息格式定义
  4. prompts: 提示词模板
  5. output_parsers: 输出解析器
  6. tools: 工具抽象
  7. vectorstores: 向量存储抽象
  8. callbacks: 回调和追踪
  9. agents: Agent数据结构
  10. documents: 文档结构
  11. embeddings: 嵌入模型抽象
  12. retrievers: 检索器抽象

整体架构图

flowchart TB
    subgraph Foundation["基础层"]
        RUNNABLE[Runnable<br/>通用调用协议<br/>LCEL核心]
        SERIALIZABLE[Serializable<br/>序列化基类]
    end

    subgraph Models["模型层"]
        LLM[BaseLLM<br/>传统LLM]
        CHAT[BaseChatModel<br/>对话模型]
        EMBED[Embeddings<br/>嵌入模型]
    end

    subgraph DataStructures["数据结构层"]
        MESSAGE[Messages<br/>消息格式]
        DOCUMENT[Document<br/>文档结构]
        PROMPT[PromptTemplate<br/>提示词模板]
    end

    subgraph Components["组件层"]
        TOOL[BaseTool<br/>工具]
        PARSER[OutputParser<br/>输出解析器]
        VECTOR[VectorStore<br/>向量存储]
        RETRIEVER[Retriever<br/>检索器]
    end

    subgraph HighLevel["高层抽象"]
        AGENT[Agent<br/>智能代理]
        CHAIN[Chain<br/>组合链]
    end

    subgraph Infrastructure["基础设施"]
        CALLBACK[Callbacks<br/>回调系统]
        CACHE[Cache<br/>缓存]
        TRACER[Tracer<br/>追踪]
    end

    RUNNABLE --> LLM
    RUNNABLE --> CHAT
    RUNNABLE --> EMBED
    RUNNABLE --> TOOL
    RUNNABLE --> PARSER
    RUNNABLE --> RETRIEVER

    MESSAGE --> CHAT
    PROMPT --> LLM
    PROMPT --> CHAT

    DOCUMENT --> VECTOR
    EMBED --> VECTOR
    VECTOR --> RETRIEVER

    TOOL --> AGENT
    LLM --> AGENT
    CHAT --> AGENT

    RUNNABLE --> CHAIN

    CALLBACK --> LLM
    CALLBACK --> TOOL
    CALLBACK --> CHAIN

    style RUNNABLE fill:#e1f5ff
    style CHAT fill:#fff4e1
    style AGENT fill:#e8f5e9

架构说明

分层设计

基础层:

  • Runnable: 所有组件的通用接口,支持invoke、stream、batch
  • Serializable: 支持序列化和持久化

模型层:

  • BaseLLM: 传统文本生成模型
  • BaseChatModel: 对话式模型
  • Embeddings: 文本嵌入模型

数据结构层:

  • BaseMessage: 消息基类(AIMessage、HumanMessage等)
  • Document: 文档结构
  • PromptTemplate: 提示词模板

组件层:

  • BaseTool: 工具抽象,Agent调用的函数
  • OutputParser: 解析模型输出
  • VectorStore: 向量数据库抽象
  • Retriever: 文档检索器

高层抽象:

  • Agent: 智能代理,动态决策
  • Chain: 组件组合链

基础设施:

  • Callbacks: 可观测性和监控
  • Cache: 缓存机制
  • Tracer: LangSmith追踪

模块间依赖关系

graph LR
    RUNNABLES[Runnables<br/>核心协议]
    MESSAGES[Messages]
    PROMPTS[Prompts]
    MODELS[LanguageModels]
    TOOLS[Tools]
    PARSERS[OutputParsers]
    AGENTS[Agents]
    VECTORS[VectorStores]
    CALLBACKS[Callbacks]

    MESSAGES --> MODELS
    PROMPTS --> MODELS
    MODELS --> PARSERS
    TOOLS --> AGENTS
    MODELS --> AGENTS
    VECTORS --> MODELS

    CALLBACKS -.监控.-> MODELS
    CALLBACKS -.监控.-> TOOLS

    RUNNABLES --> MODELS
    RUNNABLES --> TOOLS
    RUNNABLES --> PARSERS

    style RUNNABLES fill:#e1f5ff

依赖说明

核心依赖路径:

  1. Runnables是最底层,所有组件继承Runnable
  2. Messages定义数据格式,被LanguageModels使用
  3. Prompts生成输入,传递给LanguageModels
  4. OutputParsers处理LanguageModels的输出
  5. Tools被Agents调用,由LanguageModels驱动
  6. Callbacks横向依赖,监控所有组件

无循环依赖: 设计保证了清晰的单向依赖,便于理解和维护。

核心设计模式

1. Runnable协议(最重要)

# 所有组件实现统一接口
class Runnable(ABC):
    def invoke(self, input: Input) -> Output:
        """同步调用"""
        pass

    def stream(self, input: Input) -> Iterator[Output]:
        """流式输出"""
        pass

    def batch(self, inputs: list[Input]) -> list[Output]:
        """批量处理"""
        pass

    async def ainvoke(self, input: Input) -> Output:
        """异步调用"""
        pass

# 组合性
chain = prompt | model | output_parser  # LCEL语法

优势:

  • 统一接口,降低学习成本
  • 支持链式组合
  • 自动支持同步/异步/批量/流式
  • 易于测试和调试

2. 消息抽象

# 统一的消息格式
class BaseMessage:
    content: str
    additional_kwargs: dict

# 不同角色
AIMessage(content="...")      # AI回复
HumanMessage(content="...")   # 用户输入
SystemMessage(content="...")  # 系统指令
ToolMessage(content="...")    # 工具结果

优势:

  • 统一的对话历史表示
  • 支持多模态(文本、图片、音频)
  • 支持工具调用
  • 易于序列化和存储

3. 提示词模板

# 参数化提示词
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个{role}"),
    ("human", "{input}")
])

# 运行时填充
messages = prompt.invoke({"role": "助手", "input": "你好"})

优势:

  • 分离提示词和逻辑
  • 支持模板复用
  • 支持部分变量
  • 支持聊天历史注入

4. 输出解析

# 结构化输出
class User(BaseModel):
    name: str
    age: int

parser = PydanticOutputParser(pydantic_object=User)
chain = prompt | model | parser

user = chain.invoke({"text": "..."})
# 自动解析为User对象

优势:

  • 类型安全
  • 自动验证
  • 易于集成
  • 支持错误修复

典型使用模式

模式1: LCEL链式组合

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 定义组件
prompt = ChatPromptTemplate.from_template("讲个关于{topic}的笑话")
model = ChatOpenAI()
parser = StrOutputParser()

# 组合为链
chain = prompt | model | parser

# 调用
result = chain.invoke({"topic": "程序员"})

优势: 清晰、可组合、易于调试

模式2: RAG模式

from langchain_core.runnables import RunnablePassthrough

# RAG链
retriever = vectorstore.as_retriever()

chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

answer = chain.invoke("什么是LangChain?")

优势: 结合检索和生成,提高准确性

模式3: Agent模式

from langchain.agents import create_openai_functions_agent, AgentExecutor

# 创建Agent
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools)

# 执行
result = agent_executor.invoke({"input": "帮我搜索并总结最新消息"})

优势: 动态决策,处理复杂任务

最佳实践总结

1. 使用LCEL构建应用

# 推荐: LCEL链式组合
chain = prompt | model | parser

# 不推荐: 手动调用
messages = prompt.invoke(input)
response = model.invoke(messages)
output = parser.invoke(response)

2. 利用流式输出

# 推荐: 流式显示
for chunk in chain.stream({"input": "长文本"}):
    print(chunk, end="", flush=True)

# 提升用户体验

3. 批量处理提高效率

# 推荐: 批量调用
inputs = [{"topic": f"主题{i}"} for i in range(10)]
results = chain.batch(inputs)

# 比循环调用快得多

4. 使用回调监控

# 推荐: 添加回调
from langchain_core.callbacks import StdOutCallbackHandler

chain = prompt | model | parser
result = chain.invoke(
    {"input": "..."},
    config={"callbacks": [StdOutCallbackHandler()]}
)

5. 类型安全

# 推荐: 使用Pydantic验证
from pydantic import BaseModel

class Output(BaseModel):
    answer: str
    confidence: float

parser = PydanticOutputParser(pydantic_object=Output)
chain = prompt | model | parser

# 自动类型检查和验证

版本兼容性

稳定性承诺

langchain-core遵循严格的版本管理:

  • 向后兼容: 公共API不会破坏性变更
  • 实验性功能: 使用@beta装饰器标注
  • 废弃流程: 提前多个版本警告

升级建议

# 检查过时API
import warnings
warnings.filterwarnings("default", category=DeprecationWarning)

# 使用新API
# 旧: LLMChain (已废弃)
# 新: LCEL
chain = prompt | model | parser

性能优化指南

1. 批量处理

# 将多个请求合并为批量
inputs = [{"query": q} for q in queries]
results = chain.batch(inputs, config={"max_concurrency": 5})

2. 缓存

from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache

# 启用缓存
set_llm_cache(InMemoryCache())

# 相同输入会命中缓存

3. 流式输出

# 降低首字节时间
async for chunk in chain.astream({"input": "..."}):
    yield chunk

核心调用链路详解

调用链路一: LCEL链式组合 (Prompt → Model → Parser)

这是LangChain最常见的使用模式,通过管道操作符|将多个Runnable组件串联。

链路架构图

flowchart LR
    subgraph UserCode["用户代码"]
        INPUT[用户输入<br/>dict]
    end

    subgraph LCELChain["LCEL链: prompt | model | parser"]
        SEQUENCE[RunnableSequence<br/>编排器]
        PROMPT[ChatPromptTemplate<br/>提示词模板]
        MODEL[BaseChatModel<br/>语言模型]
        PARSER[StrOutputParser<br/>输出解析器]
    end

    subgraph Infrastructure["基础设施"]
        CALLBACK[CallbackManager<br/>回调管理器]
        TRACER[LangSmithTracer<br/>追踪器]
    end

    INPUT --> SEQUENCE
    SEQUENCE --> PROMPT
    PROMPT --> MODEL
    MODEL --> PARSER
    PARSER --> OUTPUT[输出结果<br/>str]

    CALLBACK -.监控.-> PROMPT
    CALLBACK -.监控.-> MODEL
    CALLBACK -.监控.-> PARSER

    TRACER -.记录.-> SEQUENCE

    style SEQUENCE fill:#e1f5ff,stroke:#0288d1,stroke-width:3px
    style CALLBACK fill:#fff9c4
    style TRACER fill:#fff9c4

架构说明

层次结构:

  1. 用户代码层: 提供输入数据(通常是字典)
  2. LCEL链层: RunnableSequence负责编排,依次调用各组件
  3. 基础设施层: CallbackManager和Tracer提供可观测性

组件职责:

  • RunnableSequence: 管道编排器,负责按顺序调用步骤,传递中间结果
  • ChatPromptTemplate: 模板填充,将输入字典转换为消息列表
  • BaseChatModel: 调用LLM API,生成AI响应
  • StrOutputParser: 提取消息内容,转换为字符串

数据流转:

  1. {"topic": "AI"} → Prompt → [HumanMessage("讲个关于AI的笑话")]
  2. [HumanMessage(...)] → Model → AIMessage("为什么AI...?")
  3. AIMessage(...) → Parser → "为什么AI...?"

完整时序图

sequenceDiagram
    autonumber
    participant U as 用户
    participant SEQ as RunnableSequence
    participant CBM as CallbackManager
    participant P as ChatPromptTemplate
    participant M as BaseChatModel
    participant PAR as StrOutputParser
    participant API as LLM API
    participant TR as LangSmithTracer

    Note over U,TR: 阶段1: 初始化与链式组合(构建时)
    U->>SEQ: prompt | model | parser
    Note right of SEQ: 通过__or__操作符<br/>构建RunnableSequence
    SEQ->>SEQ: steps = [prompt, model, parser]

    Note over U,TR: 阶段2: 链式调用开始
    U->>SEQ: invoke({"topic": "AI"}, config)
    SEQ->>CBM: ensure_config(config)
    CBM-->>SEQ: RunnableConfig
    SEQ->>CBM: on_chain_start(input, name)
    CBM->>TR: 记录chain_start事件
    TR-->>CBM: run_id
    CBM-->>SEQ: RunManager(run_id)

    Note over U,TR: 阶段3: Step 1 - Prompt执行
    SEQ->>P: invoke({"topic": "AI"}, config)
    P->>CBM: on_chain_start("ChatPromptTemplate")
    CBM->>TR: 记录prompt_start
    P->>P: format_messages({"topic": "AI"})
    Note right of P: 填充模板:<br/>[HumanMessage("讲个关于AI的笑话")]
    P->>CBM: on_chain_end(messages)
    CBM->>TR: 记录prompt_end
    P-->>SEQ: [HumanMessage(...)]

    Note over U,TR: 阶段4: Step 2 - Model执行
    SEQ->>M: invoke([HumanMessage(...)], config)
    M->>CBM: on_llm_start(messages)
    CBM->>TR: 记录llm_start
    M->>M: _generate_from_messages(messages)
    M->>API: POST /chat/completions
    Note right of API: 实际调用OpenAI/Claude等
    API-->>M: {"choices": [...], "usage": {...}}
    M->>M: 构造AIMessage
    M->>CBM: on_llm_end(AIMessage, usage_metadata)
    CBM->>TR: 记录llm_end + token使用量
    M-->>SEQ: AIMessage("为什么AI...?")

    Note over U,TR: 阶段5: Step 3 - Parser执行
    SEQ->>PAR: invoke(AIMessage(...), config)
    PAR->>CBM: on_chain_start("StrOutputParser")
    CBM->>TR: 记录parser_start
    PAR->>PAR: parse_result([ChatGeneration(message)])
    Note right of PAR: 提取message.content
    PAR->>CBM: on_chain_end(str_content)
    CBM->>TR: 记录parser_end
    PAR-->>SEQ: "为什么AI...?"

    Note over U,TR: 阶段6: 链式调用结束
    SEQ->>CBM: on_chain_end(output)
    CBM->>TR: 记录chain_end
    TR->>TR: 构建完整Run树
    SEQ-->>U: "为什么AI...?"

时序图详细说明

阶段1: 初始化与链式组合(构建时)

  • 步骤1: 用户使用|操作符连接组件
  • 内部机制: Python调用__or__魔术方法,创建RunnableSequence实例
  • 关键点: 此时只构建结构,不执行逻辑;steps列表记录了[prompt, model, parser]

阶段2: 链式调用开始

  • 步骤2: 用户调用chain.invoke(input, config)
  • 步骤3-5: 配置初始化,包括回调管理器、追踪器
  • 步骤6-8: 触发on_chain_start,LangSmith tracer记录运行开始,分配唯一run_id
  • 关键代码:
# langchain_core/runnables/base.py:3075-3087
def invoke(self, input, config=None, **kwargs):
    config = ensure_config(config)  # 步骤3
    callback_manager = get_callback_manager_for_config(config)  # 步骤4
    run_manager = callback_manager.on_chain_start(  # 步骤6
        None, input, name=self.get_name(), run_id=config.pop("run_id", None)
    )

阶段3: Step 1 - Prompt执行

  • 步骤9: Sequence调用第一个步骤:prompt.invoke(input, config)
  • 步骤10-11: Prompt触发回调,记录子运行
  • 步骤12-13: 执行模板填充format_messages,将{"topic": "AI"}转换为消息列表
  • 步骤14-16: 记录完成,返回[HumanMessage("讲个关于AI的笑话")]
  • 数据转换: dictlist[BaseMessage]
  • 关键代码:
# langchain_core/prompts/chat.py
def format_messages(self, **kwargs) -> list[BaseMessage]:
    # 遍历消息模板,填充变量
    for message_template in self.messages:
        messages.extend(message_template.format_messages(**kwargs))
    return messages

阶段4: Step 2 - Model执行

  • 步骤17: Sequence将Step 1的输出传给Model
  • 步骤18-19: Model触发on_llm_start,记录输入消息
  • 步骤20-22: 调用_generate_from_messages,实际向LLM API发送HTTP请求
  • 步骤23-26: 解析响应,构造AIMessage,记录token使用量
  • 数据转换: list[BaseMessage]AIMessage
  • 关键代码:
# langchain_core/language_models/chat_models.py
def invoke(self, input, config=None, **kwargs):
    messages = convert_to_messages(input)  # 标准化输入
    run_manager = callback_manager.on_llm_start(serialized, messages)
    result = self._generate_from_messages(messages, run_manager=run_manager)
    run_manager.on_llm_end(result)
    return result.generations[0].message

阶段5: Step 3 - Parser执行

  • 步骤27: Sequence将Model输出传给Parser
  • 步骤28-29: Parser触发回调
  • 步骤30-31: 执行parse_result,提取message.content
  • 步骤32-34: 记录完成,返回字符串
  • 数据转换: AIMessagestr
  • 关键代码:
# langchain_core/output_parsers/string.py
class StrOutputParser(BaseOutputParser[str]):
    def parse_result(self, result, *, partial=False):
        # 从ChatGeneration中提取message.content
        return result[0].message.content

阶段6: 链式调用结束

  • 步骤35-37: Sequence调用on_chain_end,tracer记录整个链的完成
  • 步骤38: 构建完整的Run树,包含所有子运行的时间、输入输出、token使用量
  • 步骤39: 返回最终结果给用户

可观测性机制:

  • Run树结构: 每个组件都是一个子Run,形成层次结构
  • 记录内容: 输入、输出、执行时间、错误信息、token使用量
  • 追踪器类型:
    • LangSmithTracer: 发送到LangSmith平台
    • ConsoleCallbackHandler: 打印到控制台
    • StdOutCallbackHandler: 简化版控制台输出

错误处理机制:

# langchain_core/runnables/base.py:3091-3108
try:
    for i, step in enumerate(self.steps):
        config = patch_config(config, callbacks=run_manager.get_child(f"seq:step:{i+1}"))
        input_ = step.invoke(input_, config)
except BaseException as e:
    run_manager.on_chain_error(e)  # 记录错误
    raise  # 向上抛出
else:
    run_manager.on_chain_end(input_)  # 记录成功
    return input_

性能优化点:

  • 批量处理: batch()方法使用线程池并行处理多个输入
  • 流式输出: stream()方法逐chunk返回,降低首字节时间
  • 异步执行: ainvoke()使用asyncio,提高IO密集型任务效率

调用链路二: RAG检索增强生成

RAG模式结合了向量检索和语言模型生成,是LangChain的典型应用场景。

链路架构图

flowchart TB
    subgraph UserInput["用户输入"]
        QUERY[用户问题<br/>"什么是LCEL?"]
    end

    subgraph RAGChain["RAG链"]
        PARALLEL[RunnableParallel<br/>并行执行]
        RETRIEVER[VectorStoreRetriever<br/>检索器]
        PASSTHROUGH[RunnablePassthrough<br/>原样传递]
        PROMPT[ChatPromptTemplate<br/>包含context变量]
        MODEL[ChatModel<br/>LLM]
        PARSER[StrOutputParser]
    end

    subgraph VectorDB["向量数据库"]
        EMBED[Embeddings<br/>嵌入模型]
        STORE[VectorStore<br/>存储层]
    end

    QUERY --> PARALLEL
    PARALLEL --> RETRIEVER
    PARALLEL --> PASSTHROUGH
    RETRIEVER --> EMBED
    EMBED --> STORE
    STORE --> DOCS[检索到的文档<br/>list[Document]]
    DOCS --> PROMPT
    PASSTHROUGH --> QUESTION[原始问题]
    QUESTION --> PROMPT
    PROMPT --> MODEL
    MODEL --> PARSER
    PARSER --> ANSWER[生成的答案]

    style PARALLEL fill:#e8f5e9,stroke:#4caf50,stroke-width:3px
    style RETRIEVER fill:#fff3e0
    style DOCS fill:#f3e5f5

RAG时序图

sequenceDiagram
    autonumber
    participant U as 用户
    participant CHAIN as RAG Chain
    participant PAR as RunnableParallel
    participant RET as Retriever
    participant EMB as Embeddings
    participant VS as VectorStore
    participant PASS as RunnablePassthrough
    participant P as ChatPromptTemplate
    participant M as ChatModel
    participant API as LLM API

    Note over U,API: 阶段1: RAG链构建(构建时)
    U->>CHAIN: 构建RAG链
    CHAIN->>PAR: {"context": retriever, "question": passthrough}
    Note right of PAR: 创建RunnableParallel<br/>两个分支并行执行
    PAR->>P: | prompt
    P->>M: | model
    M->>CHAIN: | parser

    Note over U,API: 阶段2: 问题输入
    U->>CHAIN: invoke("什么是LCEL?")
    CHAIN->>PAR: invoke("什么是LCEL?")

    Note over U,API: 阶段3: 并行执行检索和传递
    par 分支1: 检索相关文档
        PAR->>RET: invoke("什么是LCEL?")
        RET->>EMB: embed_query("什么是LCEL?")
        EMB-->>RET: [0.123, 0.456, ...]
        Note right of EMB: 将问题转换为向量
        RET->>VS: similarity_search(query_vector, k=4)
        VS->>VS: 计算余弦相似度
        VS-->>RET: [Document(page_content="LCEL是..."), ...]
        Note right of VS: 返回Top-K相关文档
        RET-->>PAR: context = [Document(...), ...]
    and 分支2: 原样传递问题
        PAR->>PASS: invoke("什么是LCEL?")
        PASS-->>PAR: question = "什么是LCEL?"
        Note right of PASS: 直接返回输入
    end

    Note over U,API: 阶段4: 合并结果
    PAR->>PAR: 等待两个分支完成
    PAR-->>P: {"context": [Document(...)], "question": "什么是LCEL?"}

    Note over U,API: 阶段5: 格式化提示词
    P->>P: format_messages(context, question)
    Note right of P: 模板:<br/>"根据上下文: {context}<br/>回答问题: {question}"
    P-->>M: [HumanMessage("根据上下文: LCEL是...\n回答问题: ...")]

    Note over U,API: 阶段6: LLM生成答案
    M->>API: POST /chat/completions
    API-->>M: AIMessage("LCEL是LangChain表达式语言...")
    M-->>CHAIN: "LCEL是LangChain表达式语言..."

    CHAIN-->>U: "LCEL是LangChain表达式语言..."

RAG时序详解

阶段1: RAG链构建(构建时)

  • 步骤1-5: 使用LCEL语法构建链
  • 关键代码:
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# 构建RAG链
chain = (
    RunnableParallel({
        "context": retriever,      # 分支1: 检索文档
        "question": RunnablePassthrough()  # 分支2: 传递原问题
    })
    | prompt    # 格式化提示词
    | model     # LLM生成
    | parser    # 解析输出
)
  • 架构特点: RunnableParallel允许并行执行多个分支,提高效率

阶段2: 问题输入

  • 步骤6-7: 用户输入问题字符串

阶段3: 并行执行检索和传递

  • 分支1(步骤8-14): 检索相关文档

    • 步骤8: 调用Retriever的invoke方法
    • 步骤9-10: 使用Embeddings模型将问题转换为向量(通常768或1536维)
    • 步骤11-13: VectorStore执行相似度搜索,计算余弦相似度,返回Top-K文档
    • 步骤14: 返回list[Document],每个Document包含page_contentmetadata
    • 关键代码:
    # langchain_core/vectorstores/base.py
    def similarity_search(self, query: str, k: int = 4, **kwargs) -> list[Document]:
        # 1. 将query转换为向量
        embedding = self.embedding_function.embed_query(query)
        # 2. 在向量数据库中搜索
        docs_and_scores = self._similarity_search_with_score(embedding, k)
        # 3. 返回文档
        return [doc for doc, _ in docs_and_scores]
    
  • 分支2(步骤15-17): 原样传递问题

    • RunnablePassthrough直接返回输入,无任何转换
    • 作用: 保留原始问题,供后续Prompt使用

阶段4: 合并结果

  • 步骤18-19: RunnableParallel等待所有分支完成,合并结果为字典
  • 输出格式: {"context": [Document(...)], "question": "什么是LCEL?"}

阶段5: 格式化提示词

  • 步骤20-22: ChatPromptTemplate填充变量
  • 模板示例:
template = """根据以下上下文回答问题:

上下文:
{context}

问题: {question}

回答:"""
  • context处理: 将list[Document]转换为字符串,通常用\n\n分隔

阶段6: LLM生成答案

  • 步骤23-25: 与普通LCEL链相同,调用LLM API生成答案
  • 步骤26: 返回最终答案

RAG性能优化:

  • 并行检索: RunnableParallel使用线程池并行执行retriever和passthrough
  • 批量嵌入: 对多个query使用embed_documents批量生成向量
  • 缓存: 对频繁查询的问题缓存嵌入向量和检索结果
  • 异步: 使用ainvoke异步调用,提高吞吐量

检索策略:

  • 相似度阈值: 过滤低相关度文档
  • 文档重排: 使用Reranker模型对初步检索结果重新排序
  • 混合检索: 结合关键词搜索(BM25)和向量搜索

调用链路三: Agent智能代理

Agent是最复杂的调用模式,支持LLM动态决策,循环调用工具直到完成任务。

Agent架构图

flowchart TB
    subgraph UserInput["用户输入"]
        TASK[用户任务<br/>"搜索并总结最新AI新闻"]
    end

    subgraph AgentSystem["Agent系统"]
        EXECUTOR[AgentExecutor<br/>代理执行器]
        AGENT[OpenAI Functions Agent<br/>决策器]
        LLM[ChatModel<br/>with function calling]
    end

    subgraph ToolSystem["工具系统"]
        TOOL1[SearchTool<br/>搜索工具]
        TOOL2[SummarizeTool<br/>总结工具]
        TOOL3[CalculatorTool<br/>计算器工具]
    end

    subgraph Memory["记忆系统"]
        HISTORY[对话历史<br/>list[BaseMessage]]
        SCRATCHPAD[中间步骤<br/>list[AgentAction]]
    end

    TASK --> EXECUTOR
    EXECUTOR --> AGENT
    AGENT --> LLM

    LLM -->|tool_calls| DECISION{决策结果?}
    DECISION -->|AgentAction| EXECUTOR
    EXECUTOR -->|调用工具| TOOL1
    EXECUTOR -->|调用工具| TOOL2
    EXECUTOR -->|调用工具| TOOL3
    TOOL1 --> RESULT1[工具结果]
    RESULT1 --> HISTORY
    HISTORY --> AGENT

    DECISION -->|AgentFinish| OUTPUT[最终答案]

    SCRATCHPAD -.记录.-> AGENT

    style EXECUTOR fill:#e1f5ff,stroke:#0288d1,stroke-width:3px
    style DECISION fill:#fff9c4
    style LLM fill:#f3e5f5

Agent完整时序图

sequenceDiagram
    autonumber
    participant U as 用户
    participant EXE as AgentExecutor
    participant AGT as Agent
    participant LLM as ChatModel(GPT-4)
    participant API as OpenAI API
    participant SEARCH as SearchTool
    participant SUMM as SummarizeTool
    participant MEM as Memory

    Note over U,MEM: 阶段1: Agent初始化
    U->>EXE: create_openai_functions_agent(llm, tools, prompt)
    EXE->>EXE: 注册工具: [SearchTool, SummarizeTool]
    EXE->>LLM: bind_tools(tools)
    Note right of LLM: 将工具转换为<br/>OpenAI function calling格式

    Note over U,MEM: 阶段2: 任务开始
    U->>EXE: invoke({"input": "搜索并总结最新AI新闻"})
    EXE->>MEM: 初始化对话历史 []
    EXE->>AGT: plan(input, intermediate_steps=[])

    Note over U,MEM: 阶段3: 1 - LLM决策
    AGT->>AGT: 构造prompt
    Note right of AGT: System: 你是一个有用的助手...<br/>Human: 搜索并总结最新AI新闻<br/>可用工具: [search, summarize]
    AGT->>LLM: invoke(messages)
    LLM->>API: POST /chat/completions + tools=[...]
    API-->>LLM: tool_calls=[{"name": "search", "arguments": {"query": "最新AI新闻"}}]
    Note right of API: GPT-4决定调用search工具
    LLM-->>AGT: AIMessage(tool_calls=[...])
    AGT->>AGT: 解析为AgentAction
    AGT-->>EXE: AgentAction(tool="search", tool_input={"query": "..."})

    Note over U,MEM: 阶段4: 执行工具
    EXE->>SEARCH: invoke({"query": "最新AI新闻"})
    SEARCH->>SEARCH: 调用搜索API
    SEARCH-->>EXE: "1. OpenAI发布GPT-5\n2. Google推出Gemini 2.0..."
    EXE->>MEM: 记录中间步骤
    Note right of MEM: intermediate_steps = [<br/>  (AgentAction, "1. OpenAI...")  <br/>]

    Note over U,MEM: 阶段5: 2 - LLM再次决策
    EXE->>AGT: plan(input, intermediate_steps)
    AGT->>AGT: 构造prompt(包含上一轮结果)
    Note right of AGT: Human: 搜索并总结最新AI新闻<br/>Tool: search<br/>Result: 1. OpenAI发布GPT-5...
    AGT->>LLM: invoke(messages)
    LLM->>API: POST /chat/completions + tools=[...]
    API-->>LLM: tool_calls=[{"name": "summarize", "arguments": {"text": "..."}}]
    Note right of API: GPT-4决定调用summarize工具
    LLM-->>AGT: AIMessage(tool_calls=[...])
    AGT-->>EXE: AgentAction(tool="summarize", tool_input={"text": "..."})

    Note over U,MEM: 阶段6: 执行总结工具
    EXE->>SUMM: invoke({"text": "1. OpenAI..."})
    SUMM->>SUMM: 调用LLM总结
    SUMM-->>EXE: "本周AI领域主要新闻..."
    EXE->>MEM: 更新中间步骤

    Note over U,MEM: 阶段7: 3 - LLM给出最终答案
    EXE->>AGT: plan(input, intermediate_steps)
    AGT->>LLM: invoke(messages)
    LLM->>API: POST /chat/completions
    API-->>LLM: content="根据搜索和总结,本周AI领域..."
    Note right of API: tool_calls,直接返回文本
    LLM-->>AGT: AIMessage(content="...")
    AGT->>AGT: 解析为AgentFinish
    AGT-->>EXE: AgentFinish(return_values={"output": "..."})

    Note over U,MEM: 阶段8: 任务完成
    EXE->>MEM: 保存完整对话历史
    EXE-->>U: {"output": "根据搜索和总结,本周AI领域..."}

Agent时序详解

阶段1: Agent初始化

  • 步骤1-3: 创建Agent,注册可用工具
  • 步骤4: 绑定工具到LLM,转换为function calling格式
  • 关键代码:
from langchain.agents import create_openai_functions_agent, AgentExecutor

# 定义工具
tools = [search_tool, summarize_tool]

# 创建Agent
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

阶段2: 任务开始

  • 步骤5-7: 用户输入任务,初始化对话历史和中间步骤

阶段3: 第1轮 - LLM决策

  • 步骤8-9: 构造包含工具描述的prompt
  • 步骤10-12: 调用OpenAI API,使用function calling
  • 步骤13-15: 解析响应,提取tool_calls,转换为AgentAction
  • 关键数据结构:
# AgentAction定义
class AgentAction:
    tool: str  # 工具名称
    tool_input: dict | str  # 工具输入
    log: str  # LLM原始输出
  • OpenAI Function Calling格式:
{
  "tool_calls": [{
    "id": "call_abc123",
    "type": "function",
    "function": {
      "name": "search",
      "arguments": "{\"query\": \"最新AI新闻\"}"
    }
  }]
}

阶段4: 执行工具

  • 步骤16-18: AgentExecutor调用对应工具
  • 步骤19-20: 记录(AgentAction, observation)intermediate_steps
  • 工具执行:
# langchain/agents/agent.py
def _call(self, inputs, run_manager):
    intermediate_steps = []
    while True:
        # 1. LLM决策
        action = agent.plan(inputs, intermediate_steps)

        # 2. 如果是AgentFinish,结束循环
        if isinstance(action, AgentFinish):
            return action.return_values

        # 3. 执行工具
        tool = tools[action.tool]
        observation = tool.invoke(action.tool_input)

        # 4. 记录中间步骤
        intermediate_steps.append((action, observation))

阶段5: 第2轮 - LLM再次决策

  • 步骤21-26: 将上一轮的工具结果添加到对话历史,再次调用LLM
  • Prompt格式:
System: 你是一个有用的助手...
Human: 搜索并总结最新AI新闻
AI: (调用search工具)
Tool: search
Result: 1. OpenAI发布GPT-5...
  • 循环决策: Agent进入第二轮,根据搜索结果决定下一步动作

阶段6: 执行总结工具

  • 步骤27-29: 执行summarize工具
  • 步骤30: 更新intermediate_steps,现在包含2轮

阶段7: 第3轮 - LLM给出最终答案

  • 步骤31-35: LLM判断已收集足够信息,不再调用工具,直接返回文本答案
  • 判断机制: 响应中无tool_calls字段,解析为AgentFinish

阶段8: 任务完成

  • 步骤36-37: 保存对话历史,返回最终结果

Agent执行循环:

# langchain/agents/agent_executor.py
def _call(self, inputs):
    intermediate_steps = []
    iterations = 0
    max_iterations = 15

    while iterations < max_iterations:
        # 1. LLM决策
        output = agent.plan(inputs, intermediate_steps)

        # 2. 检查是否结束
        if isinstance(output, AgentFinish):
            return output.return_values

        # 3. 执行工具
        tool_name = output.tool
        tool_input = output.tool_input
        observation = tools[tool_name].invoke(tool_input)

        # 4. 记录步骤
        intermediate_steps.append((output, observation))
        iterations += 1

    # 达到最大迭代次数
    raise ValueError("Agent达到最大迭代次数")

Agent关键机制:

  1. Function Calling: 将工具转换为JSON Schema,LLM决定调用哪个工具
  2. Scratchpad: intermediate_steps记录所有历史动作和观察,提供给LLM作为上下文
  3. 循环执行: AgentExecutor持续调用LLM,直到返回AgentFinish
  4. 错误处理: 工具执行失败时,将错误信息返回给LLM,由LLM决定如何处理
  5. 最大迭代: 防止无限循环,通常设置为15次

Agent性能优化:

  • 提前终止: 检测到特定关键词(如"Final Answer")时立即结束
  • 工具并行: 支持同时调用多个工具(OpenAI parallel function calling)
  • 流式输出: 每个步骤的结果实时流式返回
  • 缓存工具结果: 对幂等工具(如搜索)缓存结果

Agent类型对比:

Agent类型 决策机制 适用场景 优点 缺点
OpenAI Functions Function Calling 需要精确工具调用 结构化、可靠 仅支持OpenAI/Anthropic
ReAct 思维链提示词 需要推理过程 通用、可解释 Token消耗大
Structured Chat JSON Schema 复杂工具输入 灵活、支持多模态 解析复杂
Self-Ask 子问题分解 复杂问答 递归、系统化 迭代次数多

流式输出机制详解

流式输出是LangChain的重要特性,可以显著提升用户体验。

流式架构图

flowchart TB
    subgraph Client["客户端"]
        UI[用户界面<br/>实时显示]
    end

    subgraph StreamingChain["流式链"]
        SEQ[RunnableSequence]
        PROMPT[ChatPromptTemplate]
        MODEL[BaseChatModel]
        PARSER[StrOutputParser]
    end

    subgraph LLMProvider["LLM提供商"]
        API[API Server<br/>SSE流]
    end

    UI -->|invoke.stream()| SEQ
    SEQ --> PROMPT
    PROMPT --> MODEL
    MODEL -->|逐chunk请求| API
    API -.chunk 1.-> MODEL
    API -.chunk 2.-> MODEL
    API -.chunk 3.-> MODEL
    MODEL -->|逐chunk传递| PARSER
    PARSER -.chunk 1.-> SEQ
    PARSER -.chunk 2.-> SEQ
    PARSER -.chunk 3.-> SEQ
    SEQ -.实时返回.-> UI

    style MODEL fill:#e1f5ff
    style API fill:#fff3e0

流式时序图

sequenceDiagram
    autonumber
    participant U as 用户
    participant SEQ as RunnableSequence
    participant M as ChatModel
    participant API as LLM API(SSE)
    participant PAR as StrOutputParser

    Note over U,PAR: 流式调用开始
    U->>SEQ: stream({"input": "..."})
    SEQ->>M: stream(messages)
    M->>API: POST /chat/completions (stream=True)

    Note over U,PAR: 流式响应
    loop 每个token
        API-->>M: data: {"choices": [{"delta": {"content": "你"}}]}
        M->>M: 构造AIMessageChunk("你")
        M-->>PAR: yield AIMessageChunk("你")
        PAR->>PAR: 提取content
        PAR-->>SEQ: yield "你"
        SEQ-->>U: yield "你"
        Note right of U: 立即显示

        API-->>M: data: {"choices": [{"delta": {"content": "好"}}]}
        M-->>PAR: yield AIMessageChunk("好")
        PAR-->>SEQ: yield "好"
        SEQ-->>U: yield "好"

        API-->>M: data: {"choices": [{"delta": {"content": "!"}}]}
        M-->>PAR: yield AIMessageChunk("!")
        PAR-->>SEQ: yield "!"
        SEQ-->>U: yield "!"
    end

    API-->>M: data: [DONE]
    M-->>PAR: StopIteration
    PAR-->>SEQ: StopIteration
    SEQ-->>U: StopIteration

流式实现原理

关键代码:

# langchain_core/runnables/base.py - RunnableSequence.stream()
def stream(self, input, config=None, **kwargs):
    # 设置回调
    config = ensure_config(config)

    # 依次流式调用每个步骤
    input_ = input
    for i, step in enumerate(self.steps):
        # 判断当前步骤是否支持流式
        if hasattr(step, 'transform'):
            # 支持流式: 逐chunk传递
            input_ = step.transform(input_, config)
        else:
            # 不支持流式: 完整执行后再流式输出
            input_ = step.invoke(input_, config)
            input_ = iter([input_])  # 包装为迭代器

    # 返回最终流
    for chunk in input_:
        yield chunk

流式传播规则:

  1. 全支持: 所有步骤都实现transform,端到端流式
  2. 部分支持: 遇到不支持的步骤,在该步骤阻塞,后续继续流式
  3. 不支持: 只有最后一个步骤支持,实际变为invoke后流式输出

示例:

# 场景1: 全流式(推荐)
chain = prompt | model | parser
# prompt: 不支持流式,立即返回完整messages
# model: 支持流式,逐token返回
# parser: 支持流式,逐chunk提取content
# 结果: 阻塞在prompt,之后端到端流式

# 场景2: 部分流式
chain = retriever | prompt | model | parser
# retriever: 不支持流式,需要完整检索
# 结果: 阻塞在retriever和prompt,model和parser流式

流式优化:

  • 首字节时间: 通过流式可降低至几十毫秒
  • 长文本生成: 用户可以提前阅读,提升体验
  • 早期终止: 用户可以随时停止生成
  • 异步流式: astream()提供异步迭代,提高并发

批量处理机制详解

批量处理可以显著提高吞吐量,适用于离线任务、数据处理等场景。

批量架构图

flowchart LR
    subgraph Inputs["批量输入"]
        IN1[Input 1]
        IN2[Input 2]
        IN3[Input 3]
        IN4[Input 4]
    end

    subgraph BatchProcessor["批量处理器"]
        SEQ[RunnableSequence.batch]
        POOL[ThreadPoolExecutor<br/>max_workers=5]
    end

    subgraph Workers["并行Worker"]
        W1[Worker 1<br/>Input 1]
        W2[Worker 2<br/>Input 2]
        W3[Worker 3<br/>Input 3]
        W4[Worker 4<br/>Input 4]
    end

    subgraph Outputs["批量输出"]
        OUT1[Output 1]
        OUT2[Output 2]
        OUT3[Output 3]
        OUT4[Output 4]
    end

    IN1 --> SEQ
    IN2 --> SEQ
    IN3 --> SEQ
    IN4 --> SEQ

    SEQ --> POOL
    POOL --> W1
    POOL --> W2
    POOL --> W3
    POOL --> W4

    W1 --> OUT1
    W2 --> OUT2
    W3 --> OUT3
    W4 --> OUT4

    style POOL fill:#e1f5ff
    style SEQ fill:#fff3e0

批量时序图

sequenceDiagram
    autonumber
    participant U as 用户
    participant SEQ as RunnableSequence
    participant POOL as ThreadPool
    participant W1 as Worker 1
    participant W2 as Worker 2
    participant API as LLM API

    Note over U,API: 批量调用开始
    U->>SEQ: batch([input1, input2, ...], max_concurrency=5)
    SEQ->>SEQ: 为每个input创建config
    SEQ->>POOL: 提交批量任务

    Note over U,API: 并行执行
    par Worker 1处理input1
        POOL->>W1: invoke(input1, config1)
        loop 每个步骤
            W1->>W1: step.invoke(input1)
        end
        W1->>API: POST /chat/completions (input1)
        API-->>W1: response1
        W1-->>POOL: output1
    and Worker 2处理input2
        POOL->>W2: invoke(input2, config2)
        loop 每个步骤
            W2->>W2: step.invoke(input2)
        end
        W2->>API: POST /chat/completions (input2)
        API-->>W2: response2
        W2-->>POOL: output2
    end

    Note over U,API: 收集结果
    POOL->>POOL: 等待所有worker完成
    POOL-->>SEQ: [output1, output2, ...]
    SEQ-->>U: [output1, output2, ...]

批量处理关键代码

# langchain_core/runnables/base.py - RunnableSequence.batch()
def batch(self, inputs, config=None, *, return_exceptions=False, **kwargs):
    if not inputs:
        return []

    # 为每个input创建config
    configs = get_config_list(config, len(inputs))

    # 获取线程池
    max_concurrency = configs[0].get("max_concurrency", None)
    with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
        # 提交所有任务
        futures = [
            executor.submit(self.invoke, input_, config_)
            for input_, config_ in zip(inputs, configs)
        ]

        # 收集结果
        results = []
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                if return_exceptions:
                    results.append(e)
                else:
                    raise

        return results

批量优化策略:

  1. max_concurrency: 限制并发数,避免过载

    chain.batch(inputs, config={"max_concurrency": 5})
    
  2. return_exceptions: 容错处理,部分失败不影响整体

    results = chain.batch(inputs, return_exceptions=True)
    # results可能包含Exception对象
    
  3. 批量API调用: LLM提供商的批量接口(如OpenAI batch API)

    # 使用原生批量API(更便宜、更快)
    results = model.batch(messages_list)
    
  4. 异步批量: abatch()使用asyncio,提高IO密集型任务效率

    results = await chain.abatch(inputs, config={"max_concurrency": 10})
    

批量性能对比:

场景 顺序调用(10个) 批量调用(10个) 加速比
LLM推理 50秒 12秒 4.2x
向量检索 8秒 2秒 4x
工具调用 30秒 8秒 3.75x

总结

langchain-core是LangChain生态的核心基础,提供了:

核心抽象:

  • ✅ Runnable通用协议(LCEL)
  • ✅ 语言模型抽象(LLM/ChatModel)
  • ✅ 消息和提示词格式
  • ✅ 工具和输出解析器
  • ✅ 向量存储和检索器
  • ✅ 回调和追踪系统
  • ✅ Agent数据结构

核心调用链路:

  1. LCEL链式组合: 通过|操作符串联组件,RunnableSequence顺序执行
  2. RAG检索增强: RunnableParallel并行检索和传递,提升效率
  3. Agent智能代理: 循环决策和工具调用,处理复杂任务

关键机制:

  • 流式输出: 通过stream()逐chunk返回,降低首字节时间
  • 批量处理: 通过batch()并行处理多个输入,提高吞吐量
  • 回调系统: 通过CallbackManager和Tracer记录完整执行过程
  • 错误处理: 每个环节都有异常捕获和上报机制

设计理念:

  • 📦 最小化依赖
  • 🔒 稳定的公共API
  • 🔄 统一的调用协议
  • 🧩 高度可组合
  • ⚡ 高性能(批量/流式/异步)
  • 🔍 完整的可观测性

使用建议:

  • 优先使用LCEL语法,清晰可组合
  • 利用流式输出提升用户体验
  • 批量处理提高离线任务效率
  • 添加回调监控性能和调试
  • 使用Pydantic保证类型安全
  • 选择合适的Agent类型处理复杂任务

性能优化:

  • 使用batch()并行处理多个输入
  • 使用astream()异步流式提高并发
  • 配置max_concurrency控制并发数
  • 启用LLM缓存减少重复调用
  • 对向量检索结果进行缓存