LangChain-00-总览

0. 摘要

项目目标与核心能力

LangChain是一个用于构建大语言模型(LLM)应用的框架,其核心目标是通过可组合的组件和第三方集成来简化AI应用开发。项目主要解决LLM应用开发中的以下问题:

  • 组件标准化: 为聊天模型、LLM、向量存储、检索器等核心组件定义统一接口
  • 可组合性: 通过Runnable协议和LCEL(LangChain Expression Language)实现组件链式组合
  • 模型互操作性: 提供统一抽象层,允许在不同模型提供商间无缝切换
  • 实时数据增强: 连接LLM与多种数据源和外部系统
  • 生产级特性: 内置支持同步/异步、批处理、流式输出、回调追踪等

核心能力边界

包含范围:

  • 核心抽象接口定义(langchain-core)
  • 通用组件实现(langchain-classic)
  • 标准集成接口(partners)
  • LCEL表达式语言
  • Runnable执行协议
  • 回调和追踪机制

不包含范围:

  • 特定模型提供商的API实现细节
  • 业务逻辑层面的应用框架
  • 前端UI组件
  • 数据库管理系统
  • 模型训练和微调功能

运行环境

  • 语言: Python >= 3.10.0
  • 核心依赖:
    • pydantic >= 2.7.4 (数据验证)
    • langsmith >= 0.3.45 (追踪和监控)
    • tenacity >= 8.1.0 (重试机制)
    • PyYAML >= 5.3.0 (配置管理)
    • typing-extensions >= 4.7.0 (类型支持)

部署形态

LangChain采用库/框架式部署,主要有以下形态:

  1. 单体应用集成: 作为Python包直接导入使用
  2. 微服务集成: 作为服务内部组件处理LLM交互
  3. Agent服务: 结合LangGraph构建独立的Agent服务
  4. CLI工具: 通过langchain-cli提供命令行界面

1. 整体架构图

flowchart TB
    subgraph Application["应用层"]
        APP[用户应用]
        AGENT[Agent应用]
        TOOL_APP[Tool应用]
    end

    subgraph LCEL["LCEL层 - 表达式语言"]
        SEQ[RunnableSequence<br/>顺序执行]
        PAR[RunnableParallel<br/>并行执行]
        BRANCH[RunnableBranch<br/>条件分支]
        LAMBDA[RunnableLambda<br/>自定义逻辑]
    end

    subgraph Core["langchain-core 核心抽象层"]
        RUNNABLE[Runnable<br/>统一执行协议]
        LM[BaseLanguageModel<br/>语言模型抽象]
        CHAT[BaseChatModel<br/>聊天模型]
        LLM[BaseLLM<br/>文本模型]
        MSG[Messages<br/>消息体系]
        PROMPT[Prompts<br/>提示模板]
        PARSER[OutputParsers<br/>输出解析]
        TOOL[Tools<br/>工具抽象]
        VECTOR[VectorStores<br/>向量存储]
        RETRIEVER[Retrievers<br/>检索器]
        CALLBACK[Callbacks<br/>回调系统]
    end

    subgraph Integrations["集成层"]
        PARTNERS[Partner包<br/>openai/anthropic/etc]
        CLASSIC[langchain-classic<br/>经典实现]
        SPLITTER[text-splitters<br/>文本分割]
    end

    subgraph External["外部系统"]
        LLM_API[LLM API<br/>OpenAI/Anthropic]
        VECTOR_DB[向量数据库<br/>Chroma/Pinecone]
        TOOLS_EXT[外部工具<br/>Search/APIs]
        LANGSMITH[LangSmith<br/>追踪监控]
    end

    APP --> LCEL
    AGENT --> LCEL
    TOOL_APP --> Core

    LCEL --> RUNNABLE
    SEQ -.继承.-> RUNNABLE
    PAR -.继承.-> RUNNABLE
    BRANCH -.继承.-> RUNNABLE
    LAMBDA -.继承.-> RUNNABLE

    RUNNABLE --> LM
    RUNNABLE --> PROMPT
    RUNNABLE --> PARSER
    RUNNABLE --> TOOL
    RUNNABLE --> VECTOR

    LM --> CHAT
    LM --> LLM
    CHAT --> MSG
    PROMPT --> MSG

    Core --> Integrations
    CHAT -.实现.-> PARTNERS
    LLM -.实现.-> PARTNERS
    VECTOR -.实现.-> PARTNERS
    TOOL -.实现.-> CLASSIC

    PARTNERS --> LLM_API
    PARTNERS --> VECTOR_DB
    CLASSIC --> TOOLS_EXT

    CALLBACK --> LANGSMITH

架构说明

分层职责

应用层:

  • 用户应用通过LCEL构建处理链路
  • Agent应用实现自主决策和工具调用
  • Tool应用封装具体功能供Agent使用

LCEL层:

  • 提供声明式的链式组合语法
  • 支持顺序、并行、条件分支等执行模式
  • 自动处理同步/异步/流式/批处理转换

核心抽象层(langchain-core):

  • 定义所有组件的基础接口
  • 实现Runnable统一执行协议
  • 提供零第三方依赖的纯抽象

集成层:

  • Partner包实现特定提供商集成
  • langchain-classic提供通用工具和链实现
  • 各包独立版本,按需安装

数据流与控制流

控制流:

用户调用 → LCEL构建链 → Runnable.invoke → 组件执行 → 回调追踪 → 返回结果

数据流:

输入数据 → Prompt格式化 → LLM推理 → OutputParser解析 → 结构化输出

跨进程/线程/协程路径

  • 同步调用: 主线程执行,batch操作使用线程池并行
  • 异步调用: asyncio事件循环,ainvoke/astream等方法
  • 流式输出: 生成器/异步生成器逐块yield
  • 跨进程: 不直接支持,依赖外部任务队列(如Celery)

高可用与扩展性

高可用策略:

  • RunnableWithFallbacks: 自动故障转移到备用模型
  • 重试机制: 基于tenacity的指数退避
  • 超时控制: 通过RunnableConfig配置

扩展性:

  • 水平扩展: 应用层多实例部署
  • 并发控制: gather_with_concurrency限制并发数
  • 缓存机制: 支持内存/Redis等缓存层

状态管理:

  • Runnable本身无状态
  • 状态通过RunnableConfig传递
  • 持久化状态需外部存储(如LangGraph的checkpointer)

2. 全局时序图

sequenceDiagram
    autonumber
    participant User as 用户代码
    participant LCEL as LCEL链
    participant Runnable as Runnable基类
    participant Prompt as PromptTemplate
    participant LLM as ChatModel
    participant Parser as OutputParser
    participant CB as CallbackManager
    participant Tracer as LangSmith

    User->>LCEL: chain.invoke(input)
    LCEL->>Runnable: 解析执行计划

    Runnable->>CB: 创建回调管理器
    CB->>Tracer: 开始追踪(run_id)

    Runnable->>Prompt: invoke(input, config)
    Prompt->>Prompt: 格式化模板
    Prompt-->>Runnable: messages

    Runnable->>LLM: invoke(messages, config)
    LLM->>CB: on_llm_start
    CB->>Tracer: 记录输入

    LLM->>LLM: 调用API(OpenAI等)
    Note over LLM: 网络请求<br/>Token计数<br/>速率限制

    LLM-->>Runnable: AIMessage
    LLM->>CB: on_llm_end
    CB->>Tracer: 记录输出和Token

    Runnable->>Parser: invoke(message, config)
    Parser->>Parser: 解析结构化输出
    Parser-->>Runnable: parsed_output

    Runnable->>CB: on_chain_end
    CB->>Tracer: 结束追踪
    Runnable-->>User: final_output

时序图说明

入口与鉴权

  • 入口: 用户调用链的invoke/ainvoke/stream方法
  • 鉴权: 不在框架层实现,由各Provider集成包处理API密钥
  • 配置传递: RunnableConfig在整个调用链中传递

幂等性

  • Runnable执行: 默认非幂等,每次调用都会执行
  • LLM调用: 取决于模型提供商,通常非幂等(相同输入可能不同输出)
  • 缓存支持: 可通过set_llm_cache实现幂等性

重试点

  1. LLM调用层: 使用with_retry()添加重试装饰器
  2. HTTP请求层: Provider包内部实现
  3. 整个链: 链级别可配置RunnableConfig中的max_retries

超时设定

  • 链级超时: RunnableConfig.timeout(秒)
  • LLM超时: 各Provider实现自己的request_timeout参数
  • 默认值: 通常60-120秒,具体看Provider

资源上界

  • 内存: 流式输出避免加载完整响应
  • 并发: max_concurrency参数控制并行度
  • Token: 由LLM提供商限制,框架层不控制
  • 速率限制: 部分Provider集成包实现客户端限流

3. 模块边界与交互矩阵

核心模块列表

模块名称 路径 职责 对外API
Runnables langchain_core/runnables 统一执行协议和组合原语 Runnable, RunnableSequence, RunnableParallel等
LanguageModels langchain_core/language_models 语言模型抽象 BaseChatModel, BaseLLM
Messages langchain_core/messages 消息类型体系 BaseMessage, AIMessage, HumanMessage等
Prompts langchain_core/prompts 提示模板系统 ChatPromptTemplate, PromptTemplate
OutputParsers langchain_core/output_parsers 输出解析器 BaseOutputParser, PydanticOutputParser等
Tools langchain_core/tools 工具抽象和装饰器 BaseTool, @tool, StructuredTool
VectorStores langchain_core/vectorstores 向量存储接口 VectorStore, VectorStoreRetriever
Retrievers langchain_core/retrievers 检索器抽象 BaseRetriever
Callbacks langchain_core/callbacks 回调和追踪系统 BaseCallbackHandler, CallbackManager
Agents langchain/agents Agent框架(classic) AgentExecutor, Agent类

模块交互矩阵

调用方 → 被调方 Runnables LanguageModels Messages Prompts OutputParsers Tools Callbacks
Runnables - 调用/同步 传递 调用/同步 调用/同步 调用/同步 钩子/同步
LanguageModels 继承 - 输入输出 被调 被调 - 钩子/同步
Prompts 继承 - 输出 - - - 钩子/同步
OutputParsers 继承 - 输入 - - - -
Tools 继承 可选调用 输出 - - - 钩子/同步
Agents 使用 调用/同步 传递 调用 调用 调用/同步 钩子/同步

交互说明

同步调用

  • Prompt → LLM: Prompt.invoke()直接调用LLM.invoke()
  • LLM → Parser: Parser解析LLM输出
  • Agent → Tool: Agent同步调用工具并等待结果

异步消息

  • Callback系统: 事件触发异步通知,不阻塞主流程
  • astream事件: 通过AsyncIterator异步生成中间结果

共享存储

  • RunnableConfig: 贯穿整个调用链的配置字典
  • 内存: 不在core实现,由langchain-classic的Memory组件提供

错误语义

  • 标准异常: 各组件抛出Python标准异常
  • 特定异常: OutputParserException, ToolException等
  • 传播: 异常沿调用链向上传播,可被RunnableWithFallbacks捕获

一致性要求

  • 无状态: Core组件本身不维护状态,保证幂等性
  • 配置传递: 必须在整个链中传递RunnableConfig
  • 接口稳定: Core包承诺API稳定性,不轻易破坏兼容性

4. 关键设计与权衡

数据一致性与事务

一致性模型:

  • 无事务保证: Runnable执行不提供事务语义
  • 重试语义: 重试可能导致副作用重复执行
  • 幂等性要求: 用户需自行确保工具调用幂等性

锁与并发:

  • 无锁设计: Core组件无状态,不需要锁
  • 并发控制: gather_with_concurrency使用Semaphore限制
  • 线程安全: Runnable实例可安全共享于多线程

并发策略:

# 批处理使用线程池
with ThreadPoolExecutor(max_workers=config.max_workers) as executor:
    futures = [executor.submit(self.invoke, input) for input in inputs]

# 异步使用gather_with_concurrency
async def abatch(self, inputs):
    tasks = [self.ainvoke(input) for input in inputs]
    return await gather_with_concurrency(config.max_concurrency, *tasks)

性能关键路径

P95延迟瓶颈:

  1. LLM API调用: 占总耗时80-95%,取决于模型和Token数
  2. 序列化/反序列化: Pydantic模型验证,通常<10ms
  3. Callback追踪: LangSmith网络请求,异步执行不阻塞

内存峰值:

  • 流式输出: 单个chunk,内存占用<1MB
  • 批处理: 所有输入输出同时在内存,注意批大小
  • 向量存储: 取决于embedding维度和数量

I/O热点:

  • 网络I/O: LLM API和向量数据库查询
  • 磁盘I/O: 通常不涉及,除非使用文件缓存

可观测性

追踪指标:

  • 运行时: 每个Runnable的执行时间
  • Token计数: LLM输入输出Token数
  • 错误率: 失败次数和异常类型
  • 重试次数: with_retry的实际重试情况

集成点:

  • LangSmith: 官方追踪平台,自动捕获所有运行数据
  • Custom Callbacks: 实现BaseCallbackHandler接入自定义监控
  • OpenTelemetry: 可通过Callback桥接

配置项

核心配置(RunnableConfig):

{
    "callbacks": [...],           # 回调处理器列表
    "tags": ["env:prod"],        # 标签,用于过滤和分组
    "metadata": {"user_id": "123"}, # 元数据,记录到trace
    "run_name": "my_chain",      # 运行名称
    "max_concurrency": 5,        # 最大并发数
    "recursion_limit": 25,       # 递归深度限制
    "timeout": 60,               # 超时(秒)
    "configurable": {...}        # 动态配置字段
}

性能相关:

  • max_concurrency: 控制并行度,默认无限制
  • timeout: 防止长时间挂起
  • max_workers: 线程池大小,默认CPU核心数

追踪相关:

  • tags: 用于LangSmith中过滤查询
  • metadata: 附加业务信息
  • run_name: 便于识别运行实例

5. 典型使用示例与最佳实践

示例1: 最小可运行链

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# 构建链: Prompt → LLM → Parser
prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话")
model = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()

chain = prompt | model | parser

# 调用
result = chain.invoke({"topic": "程序员"})
print(result)  # "为什么程序员总是混淆圣诞节和万圣节?因为 Oct 31 == Dec 25!"

说明:

  • | 操作符自动构建RunnableSequence
  • 每个组件都实现Runnable接口
  • 自动处理类型转换(dict→messages→str)

示例2: 并行执行与结构化输出

from langchain_core.runnables import RunnableParallel
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field

class Joke(BaseModel):
    setup: str = Field(description="笑话的铺垫")
    punchline: str = Field(description="笑点")

# 并行生成多个字段
chain = RunnableParallel({
    "joke": prompt | model | JsonOutputParser(pydantic_object=Joke),
    "topic_analysis": ChatPromptTemplate.from_template(
        "分析主题'{topic}'的幽默潜力"
    ) | model | StrOutputParser()
})

result = chain.invoke({"topic": "AI"})
# {
#   "joke": {"setup": "...", "punchline": "..."},
#   "topic_analysis": "AI作为主题..."
# }

说明:

  • RunnableParallel并发执行多个子链
  • JsonOutputParser自动解析JSON并验证Pydantic模型
  • 输入自动分发到所有并行分支

示例3: 容错与重试

from langchain_core.runnables import RunnableWithFallbacks

# 主模型
primary = ChatOpenAI(model="gpt-4o", temperature=0)

# 备用模型
fallback = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 配置故障转移
model_with_fallbacks = primary.with_fallbacks([fallback])

# 配置重试(指数退避)
model_with_retry = model_with_fallbacks.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)

chain = prompt | model_with_retry | parser

# 当gpt-4o失败时自动切换到gpt-4o-mini
result = chain.invoke({"topic": "量子计算"})

说明:

  • with_fallbacks(): 失败时按顺序尝试备用选项
  • with_retry(): 底层使用tenacity库
  • 两者可组合使用

示例4: 流式输出

# 流式处理
for chunk in chain.stream({"topic": "太空"}):
    print(chunk, end="", flush=True)
# 输出逐字显示...

# 异步流式
async def async_stream():
    async for chunk in chain.astream({"topic": "深海"}):
        print(chunk, end="", flush=True)

import asyncio
asyncio.run(async_stream())

说明:

  • stream()返回同步生成器
  • astream()返回异步生成器
  • 每个组件的输出逐块传递到下游

示例5: 批处理

topics = ["AI", "区块链", "量子计算", "生物技术"]

# 批量处理(并行)
results = chain.batch([{"topic": t} for t in topics])

# 配置并发限制
results = chain.batch(
    [{"topic": t} for t in topics],
    config={"max_concurrency": 2}  # 最多2个并发
)

说明:

  • batch()自动并行处理
  • max_concurrency避免速率限制
  • 返回结果顺序与输入一致

最佳实践总结

1. 链构建:

  • 优先使用LCEL(|语法)而非手动组合
  • 保持链的单一职责,复杂逻辑拆分为多个链
  • 使用RunnablePassthrough.assign()添加中间字段

2. 错误处理:

  • 生产环境必须配置fallbacks
  • 设置合理的timeout避免无限等待
  • 使用with_retry()处理瞬时故障

3. 性能优化:

  • 使用streaming减少首字节延迟
  • 批处理时设置max_concurrency避免资源耗尽
  • 考虑使用cache减少重复LLM调用

4. 可观测性:

  • 生产环境启用LangSmith追踪
  • 添加有意义的tags和metadata
  • 实现自定义Callback监控关键指标

5. 配置管理:

  • 使用环境变量管理API密钥
  • 通过ConfigurableField实现运行时配置切换
  • 在RunnableConfig中传递请求级上下文

6. 扩展与集成

自定义组件

所有自定义组件应继承Runnable并实现核心方法:

from langchain_core.runnables import Runnable

class MyCustomRunnable(Runnable[str, str]):
    def invoke(self, input: str, config: RunnableConfig = None) -> str:
        # 实现同步逻辑
        return input.upper()

    async def ainvoke(self, input: str, config: RunnableConfig = None) -> str:
        # 实现异步逻辑(可选,默认调用invoke)
        return self.invoke(input, config)

Partner集成包

Partner包实现特定提供商的集成:

  • langchain-openai: OpenAI模型
  • langchain-anthropic: Anthropic/Claude模型
  • langchain-chroma: Chroma向量数据库
  • langchain-google-genai: Google Gemini