LangChain-06-OutputParsers模块
模块概览
职责与定位
OutputParsers模块负责将语言模型的文本输出解析为结构化数据。该模块提供了多种解析器,支持JSON、XML、CSV等格式,以及Pydantic模型验证,是构建可靠LLM应用的关键组件。
核心职责:
- 将模型文本输出解析为结构化数据
- 提供格式指令注入到提示词
- 支持多种输出格式(JSON、XML、CSV等)
- 集成Pydantic进行数据验证
- 处理解析错误和格式修复
- 支持流式解析(streaming)和累积解析(cumulative)
输入输出
输入:
- 字符串: 模型生成的文本输出
- BaseMessage: 完整的AI消息
- Iterator[str | BaseMessage]: 流式输入
- List[Generation]: 生成结果列表
输出:
- 结构化数据: dict、list、Pydantic模型等
- 取决于具体解析器类型
- 流式模式下逐步输出解析结果
模块在LangChain中的位置
OutputParsers位于LangChain架构的输出处理层,作为Runnable的一环参与LCEL链式调用。其上游通常是LanguageModel,下游可能是应用逻辑或其他Runnable组件。
flowchart LR
PROMPT[PromptTemplate] --> MODEL[LanguageModel]
MODEL --> PARSER[OutputParser]
PARSER --> APP[Application Logic]
style PARSER fill:#fff4e1
整体服务架构图
系统级架构视图
flowchart TB
subgraph External["外部调用层"]
USER[用户应用]
CHAIN[LCEL Chain]
end
subgraph Interface["Runnable接口层"]
INVOKE[invoke/ainvoke]
TRANSFORM[transform/atransform]
STREAM[stream/astream]
end
subgraph Core["核心解析层"]
PARSE_RESULT[parse_result]
PARSE[parse]
FORMAT_INST[get_format_instructions]
end
subgraph Implementation["解析器实现层"]
direction LR
subgraph Simple["简单解析器"]
STR[StrOutputParser]
end
subgraph Structured["结构化解析器"]
JSON[JsonOutputParser]
PYDANTIC[PydanticOutputParser]
end
subgraph List["列表解析器"]
CSV[CommaSeparatedList]
NUMBERED[NumberedList]
MARKDOWN[MarkdownList]
end
end
subgraph Base["基础抽象层"]
BASE_OUTPUT[BaseOutputParser]
BASE_TRANSFORM[BaseTransformOutputParser]
BASE_CUMULATIVE[BaseCumulativeTransformOutputParser]
BASE_GEN[BaseGenerationOutputParser]
BASE_LLM[BaseLLMOutputParser]
end
subgraph Dependencies["依赖模块"]
RUNNABLE[Runnables模块]
MESSAGES[Messages模块]
OUTPUTS[Outputs模块]
UTILS[Utils工具]
end
USER --> CHAIN
CHAIN --> INVOKE
CHAIN --> TRANSFORM
CHAIN --> STREAM
INVOKE --> PARSE_RESULT
TRANSFORM --> PARSE_RESULT
STREAM --> PARSE_RESULT
PARSE_RESULT --> PARSE
PARSE --> Implementation
STR --> BASE_TRANSFORM
JSON --> BASE_CUMULATIVE
PYDANTIC --> JSON
CSV --> BASE_TRANSFORM
NUMBERED --> BASE_TRANSFORM
MARKDOWN --> BASE_TRANSFORM
BASE_TRANSFORM --> BASE_OUTPUT
BASE_CUMULATIVE --> BASE_TRANSFORM
BASE_OUTPUT --> BASE_GEN
BASE_GEN --> BASE_LLM
BASE_OUTPUT -.继承.-> RUNNABLE
Core -.调用.-> MESSAGES
Core -.调用.-> OUTPUTS
Core -.调用.-> UTILS
style Core fill:#fff4e1
style Base fill:#e1f5ff
style Implementation fill:#e8f5e9
架构说明
1. 图意概述
该架构图展示了OutputParsers模块的五层架构体系:
- 外部调用层: 用户应用通过LCEL Chain调用解析器
- Runnable接口层: 提供统一的invoke/transform/stream接口
- 核心解析层: 实现parse_result和parse核心逻辑
- 解析器实现层: 各类具体解析器的实现
- 基础抽象层: 提供解析器的抽象基类
2. 关键接口与组件
Runnable接口层:
invoke(input: str|BaseMessage) -> T: 同步调用解析ainvoke: 异步版本transform(input: Iterator) -> Iterator[T]: 流式转换atransform: 异步流式转换stream: 流式输出(底层调用transform)
核心解析层:
parse_result(result: List[Generation]) -> T: 从Generation列表解析parse(text: str) -> T: 从文本字符串解析get_format_instructions() -> str: 生成格式指令
解析器实现层:
- 简单解析器: StrOutputParser直接返回字符串
- 结构化解析器: JSON/Pydantic解析并验证结构化数据
- 列表解析器: 解析各种列表格式(CSV/编号/Markdown)
3. 边界与约束
并发性:
- 所有解析器都是线程安全的(无共享状态)
- 支持同步和异步两种调用模式
- 流式解析器支持增量处理
超时与重试:
- 解析操作通常是CPU密集型,不涉及网络IO
- 超时控制由上游RunnableConfig提供
- 不内置重试逻辑,由调用方决定
幂等性:
- 所有解析操作都是幂等的(相同输入产生相同输出)
- 无副作用,可安全重试
顺序保证:
- 流式解析保证输出顺序与输入顺序一致
- 累积解析器(Cumulative)保证逐步累加
4. 异常与回退
异常类型:
OutputParserException: 解析失败时抛出error: 错误描述observation: 可选的观察信息llm_output: 导致失败的原始输出
错误处理策略:
- JSON解析失败: 尝试提取markdown代码块后重试
- Pydantic验证失败: 携带详细的验证错误信息
- 流式解析失败: partial模式下返回None而非抛异常
回退机制:
- 可通过
OutputFixingParser包装实现自动修复 - 利用LLM重新生成符合格式的输出
5. 性能与容量
时间复杂度:
- StrOutputParser: O(1) - 常数时间
- JsonOutputParser: O(n) - 文本长度线性
- PydanticOutputParser: O(n) + 验证开销
- 列表解析器: O(n) - 正则匹配线性
内存开销:
- 非流式模式: 需要保存完整输出字符串
- 流式模式: 仅保留累积缓冲区(通常<4KB)
- Pydantic模式: 额外保存模型实例
容量上界:
- 字符串长度: 理论无上限,实际受内存限制
- JSON嵌套深度: 受Python递归深度限制(默认1000)
- 列表长度: 无硬性限制
性能优化:
- 流式解析避免等待完整输出
- 使用
orjson等快速JSON库(如果可用) - Pydantic v2利用Rust加速验证
6. 版本兼容与演进
向后兼容:
SimpleJsonOutputParser是JsonOutputParser的别名(向后兼容)- 支持Pydantic v1和v2(通过适配层)
版本演进:
- 0.1.x: 基础解析器(Str/Json/List)
- 0.2.x: 增加流式支持(BaseTransformOutputParser)
- 1.0.x: 统一Runnable接口,增加异步支持
废弃机制:
- 使用
@deprecated装饰器标记废弃API - 提供迁移指南和替代方案
模块架构图
flowchart TB
subgraph Base["基类层"]
direction TB
BASE_LLM[BaseLLMOutputParser<br/>最基础抽象]
BASE_GEN[BaseGenerationOutputParser<br/>Generation解析]
BASE_PARSER[BaseOutputParser<br/>核心基类]
TRANSFORM_PARSER[BaseTransformOutputParser<br/>流式解析]
CUMULATIVE_PARSER[BaseCumulativeTransformOutputParser<br/>累积解析]
BASE_LLM --> BASE_GEN
BASE_LLM --> BASE_PARSER
BASE_PARSER --> TRANSFORM_PARSER
TRANSFORM_PARSER --> CUMULATIVE_PARSER
end
subgraph Common["常用解析器"]
STR_PARSER[StrOutputParser<br/>字符串解析]
JSON_PARSER[JsonOutputParser<br/>JSON解析]
PYDANTIC_PARSER[PydanticOutputParser<br/>Pydantic模型]
end
subgraph Specialized["列表解析器"]
LIST_BASE[ListOutputParser<br/>列表基类]
CSV_PARSER[CommaSeparatedListOutputParser<br/>CSV列表]
NUM_PARSER[NumberedListOutputParser<br/>编号列表]
MD_PARSER[MarkdownListOutputParser<br/>Markdown列表]
LIST_BASE --> CSV_PARSER
LIST_BASE --> NUM_PARSER
LIST_BASE --> MD_PARSER
end
subgraph Tools["工具相关"]
JSON_TOOLS[JsonOutputKeyToolsParser<br/>JSON工具]
PYDANTIC_TOOLS[PydanticToolsParser<br/>Pydantic工具]
end
TRANSFORM_PARSER -.-> STR_PARSER
CUMULATIVE_PARSER -.-> JSON_PARSER
JSON_PARSER -.-> PYDANTIC_PARSER
TRANSFORM_PARSER -.-> LIST_BASE
BASE_PARSER -.-> JSON_TOOLS
BASE_PARSER -.-> PYDANTIC_TOOLS
style BASE_PARSER fill:#e1f5ff
style JSON_PARSER fill:#fff4e1
style PYDANTIC_PARSER fill:#e8f5e9
style CUMULATIVE_PARSER fill:#ffe1e1
架构说明
解析器类型
BaseLLMOutputParser: 最基础的抽象
- 定义
parse_result(result: List[Generation]) -> T抽象方法 - 提供
aparse_result异步版本 - 不依赖Runnable接口
BaseGenerationOutputParser: Generation解析器
- 继承BaseLLMOutputParser和RunnableSerializable
- 实现Runnable接口(invoke/ainvoke)
- 输入类型:
str | BaseMessage
BaseOutputParser: 核心基类
- 继承BaseLLMOutputParser和RunnableSerializable
- 定义
parse(text: str) -> T抽象方法 - 提供
get_format_instructions() -> str - 实现
parse_result(调用parse) - 支持
parse_with_prompt用于错误修复
BaseTransformOutputParser: 流式解析器
- 继承BaseOutputParser
- 实现
transform/atransform方法 - 支持流式输入
Iterator[str | BaseMessage] - 逐块处理输入,逐块输出结果
BaseCumulativeTransformOutputParser: 累积解析器
- 继承BaseTransformOutputParser
- 累积输入块,每次输出完整的解析结果
- 支持
diff模式(输出增量变化) - 适用于JSON等需要完整文本的格式
常用解析器:
- StrOutputParser: 继承BaseTransformOutputParser,直接返回字符串
- JsonOutputParser: 继承BaseCumulativeTransformOutputParser,解析JSON
- PydanticOutputParser: 继承JsonOutputParser,额外验证Pydantic模型
列表解析器:
- ListOutputParser: 列表解析基类,继承BaseTransformOutputParser
- CommaSeparatedListOutputParser: 解析逗号分隔列表
- NumberedListOutputParser: 解析编号列表(1. 2. 3.)
- MarkdownListOutputParser: 解析Markdown列表(- * 开头)
核心数据结构
classDiagram
class BaseLLMOutputParser~T~ {
<<abstract>>
+parse_result(result: List[Generation], partial: bool)* T
+aparse_result(result: List[Generation], partial: bool) T
}
class BaseOutputParser~T~ {
<<abstract>>
+parse(text: str)* T
+aparse(text: str) T
+parse_result(result: List[Generation], partial: bool) T
+parse_with_prompt(text: str, prompt: PromptValue) T
+get_format_instructions() str
+invoke(input: str|BaseMessage, config: RunnableConfig) T
+ainvoke(input: str|BaseMessage, config: RunnableConfig) T
+_type: str
+InputType: Type
+OutputType: Type[T]
}
class BaseTransformOutputParser~T~ {
+transform(input: Iterator, config: RunnableConfig) Iterator[T]
+atransform(input: AsyncIterator, config: RunnableConfig) AsyncIterator[T]
#_transform(input: Iterator) Iterator[T]
#_atransform(input: AsyncIterator) AsyncIterator[T]
}
class BaseCumulativeTransformOutputParser~T~ {
+diff: bool
#_diff(prev: T|None, next: T) T
#_transform(input: Iterator) Iterator[T]
#_atransform(input: AsyncIterator) AsyncIterator[T]
}
class StrOutputParser {
+parse(text: str) str
+_type: str = "default"
}
class JsonOutputParser {
+pydantic_object: Type[BaseModel]|None
+parse(text: str) dict|list
+parse_result(result: List[Generation], partial: bool) dict|list
+get_format_instructions() str
+_diff(prev: Any, next: Any) Any
-_get_schema(pydantic_object: Type[BaseModel]) dict
}
class PydanticOutputParser~T~ {
+pydantic_object: Type[T]
+parse(text: str) T
+parse_result(result: List[Generation], partial: bool) T|None
+get_format_instructions() str
+OutputType: Type[T]
-_parse_obj(obj: dict) T
-_parser_exception(e: Exception, json_object: dict) OutputParserException
}
class ListOutputParser {
<<abstract>>
+parse(text: str)* List[str]
+parse_iter(text: str) Iterator[re.Match]
+_transform(input: Iterator) Iterator[List[str]]
+_atransform(input: AsyncIterator) AsyncIterator[List[str]]
}
class CommaSeparatedListOutputParser {
+parse(text: str) List[str]
+get_format_instructions() str
+_type: str = "comma-separated-list"
}
class OutputParserException {
<<Exception>>
+error: str
+observation: str|None
+llm_output: str|None
}
BaseLLMOutputParser <|-- BaseOutputParser
BaseOutputParser <|-- BaseTransformOutputParser
BaseTransformOutputParser <|-- BaseCumulativeTransformOutputParser
BaseTransformOutputParser <|-- StrOutputParser
BaseCumulativeTransformOutputParser <|-- JsonOutputParser
JsonOutputParser <|-- PydanticOutputParser
BaseTransformOutputParser <|-- ListOutputParser
ListOutputParser <|-- CommaSeparatedListOutputParser
BaseOutputParser ..> OutputParserException : raises
note for BaseOutputParser "继承RunnableSerializable<br/>实现Runnable协议"
note for BaseCumulativeTransformOutputParser "累积输入块<br/>支持diff模式"
数据结构说明
BaseLLMOutputParser字段
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
| parse_result | result: List[Generation] partial: bool |
T | 从Generation列表解析(抽象方法) |
| aparse_result | result: List[Generation] partial: bool |
T | 异步版本,默认使用run_in_executor |
BaseOutputParser字段
| 方法/属性 | 类型 | 说明 |
|---|---|---|
| parse | (text: str) -> T | 核心解析方法,子类必须实现 |
| aparse | (text: str) -> T | 异步解析,默认使用run_in_executor |
| parse_result | (result: List[Generation], partial: bool) -> T | 从Generation解析,默认调用parse |
| get_format_instructions | () -> str | 返回格式指令字符串 |
| parse_with_prompt | (completion: str, prompt: PromptValue) -> T | 携带prompt解析,用于错误修复 |
| invoke | (input: str|BaseMessage, config) -> T | Runnable接口,调用parse_result |
| ainvoke | (input: str|BaseMessage, config) -> T | 异步invoke |
| _type | str | 序列化类型标识 |
| InputType | Type | 输入类型(str | BaseMessage) |
| OutputType | Type[T] | 输出类型(从泛型推断) |
BaseTransformOutputParser字段
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
| transform | input: Iterator[str|BaseMessage] config: RunnableConfig |
Iterator[T] | 流式转换(公开接口) |
| atransform | input: AsyncIterator | AsyncIterator[T] | 异步流式转换 |
| _transform | input: Iterator | Iterator[T] | 内部转换逻辑(子类可覆盖) |
| _atransform | input: AsyncIterator | AsyncIterator[T] | 异步内部转换 |
BaseCumulativeTransformOutputParser字段
| 字段/方法 | 类型 | 说明 |
|---|---|---|
| diff | bool | 是否输出增量差异(默认False) |
| _diff | (prev: T|None, next: T) -> T | 计算增量差异 |
| _transform | (input: Iterator) -> Iterator[T] | 累积输入并解析 |
JsonOutputParser字段
| 字段/方法 | 类型 | 说明 |
|---|---|---|
| pydantic_object | Type[BaseModel]|None | 可选的Pydantic模型(用于schema生成) |
| parse | (text: str) -> dict|list | 解析JSON字符串 |
| parse_result | (result, partial) -> dict|list | 支持partial模式 |
| _diff | (prev, next) -> JSONPatch | 生成JSONPatch增量 |
PydanticOutputParser字段
| 字段/方法 | 类型 | 说明 |
|---|---|---|
| pydantic_object | Type[T] | Pydantic模型类(必填) |
| parse | (text: str) -> T | 解析并验证为Pydantic实例 |
| _parse_obj | (obj: dict) -> T | 从dict构造Pydantic实例 |
| OutputType | Type[T] | 返回pydantic_object |
ListOutputParser字段
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
| parse | text: str | List[str] | 解析列表(抽象方法) |
| parse_iter | text: str | Iterator[re.Match] | 迭代匹配(可选实现) |
| _transform | input: Iterator | Iterator[List[str]] | 流式解析列表 |
CommaSeparatedListOutputParser字段
| 方法 | 返回值 | 说明 |
|---|---|---|
| parse | List[str] | 使用csv.reader解析逗号分隔 |
| get_format_instructions | str | 返回CSV格式说明 |
全局时序图
典型调用流程总览
sequenceDiagram
autonumber
participant User as 用户代码
participant Chain as LCEL Chain
participant Model as LanguageModel
participant Parser as OutputParser
participant Core as 核心解析逻辑
Note over User,Core: 场景1: 标准同步调用
User->>Chain: chain.invoke(input)
Chain->>Model: model.invoke(messages)
Model-->>Chain: AIMessage
Chain->>Parser: parser.invoke(message)
activate Parser
Parser->>Parser: 类型判断(str vs BaseMessage)
Parser->>Core: _call_with_config(parse_result)
activate Core
Core->>Core: 包装为Generation
Core->>Core: parse_result([generation])
Core->>Core: parse(text)
Core->>Core: 具体解析逻辑
Core-->>Parser: 结构化结果
deactivate Core
Parser-->>Chain: 结构化结果
deactivate Parser
Chain-->>User: 最终结果
Note over User,Core: 场景2: 流式调用
User->>Chain: chain.stream(input)
Chain->>Model: model.stream(messages)
loop 每个chunk
Model-->>Chain: BaseMessageChunk
Chain->>Parser: parser.transform(iter)
activate Parser
Parser->>Core: _transform_stream_with_config
activate Core
Core->>Core: 累积chunk(如果是Cumulative)
Core->>Core: parse_result([acc_gen], partial=True)
Core-->>Parser: 部分结果(如果有变化)
deactivate Core
Parser-->>Chain: yield 部分结果
deactivate Parser
Chain-->>User: yield 部分结果
end
Note over User,Core: 场景3: 异步调用
User->>Chain: await chain.ainvoke(input)
Chain->>Model: await model.ainvoke(messages)
Model-->>Chain: AIMessage
Chain->>Parser: await parser.ainvoke(message)
activate Parser
Parser->>Core: _acall_with_config(aparse_result)
activate Core
Core->>Core: await aparse_result([generation])
Core->>Core: await run_in_executor(parse, text)
Core->>Core: 在线程池中执行parse
Core-->>Parser: 结构化结果
deactivate Core
Parser-->>Chain: 结构化结果
deactivate Parser
Chain-->>User: 最终结果
时序图说明
1. 图意概述
该时序图展示了OutputParsers的三种典型调用场景:
- 场景1: 标准同步调用: 完整的invoke链路,从用户代码到最终结果
- 场景2: 流式调用: 使用transform处理流式输入,支持增量输出
- 场景3: 异步调用: 异步版本的调用链路,利用线程池执行CPU密集操作
2. 关键步骤解析
场景1: 标准同步调用 (步骤1-11)
- 步骤1-2: 用户通过LCEL Chain调用,触发LanguageModel生成
- 步骤3: Model返回AIMessage
- 步骤4-5: Parser接收AIMessage,判断输入类型
- 步骤6-7: 通过
_call_with_config包装调用,提供配置管理和回调支持 - 步骤8: 将BaseMessage包装为ChatGeneration
- 步骤9: 调用
parse_result方法 - 步骤10:
parse_result默认调用parse(text),提取text字段 - 步骤11: 具体解析器实现解析逻辑(如JSON.loads)
场景2: 流式调用 (步骤12-20)
- 步骤12-13: 用户调用stream,Model返回流式chunks
- 步骤14: 进入循环,逐个处理chunk
- 步骤15-16: Parser使用transform方法
- 步骤17: 累积chunk(BaseCumulativeTransformOutputParser)
- 步骤18: 调用
parse_result(..., partial=True)尝试解析部分结果 - 步骤19: 如果解析成功且有变化,返回部分结果
- 步骤20: 逐步yield给用户
场景3: 异步调用 (步骤21-27)
- 步骤21-23: 异步调用链路
- 步骤24: 使用
_acall_with_config异步包装 - 步骤25: 调用
aparse_result - 步骤26: 默认使用
run_in_executor在线程池中执行 - 步骤27: 阻塞的parse在单独线程中执行,不阻塞事件循环
3. 边界条件
输入类型处理:
- 输入为BaseMessage: 直接包装为ChatGeneration
- 输入为str: 包装为Generation(text=str)
- 输入为其他类型: 抛出TypeError
partial参数:
partial=False(默认): 解析失败抛异常partial=True(流式): 解析失败返回None,等待更多数据
chunk累积:
- BaseCumulativeTransformOutputParser累积所有chunk
- 使用
+运算符合并GenerationChunk - 每次尝试解析累积结果,成功则输出
diff模式:
diff=False(默认): 输出完整解析结果diff=True: 输出JSONPatch增量(仅JsonOutputParser支持)
4. 异常处理
解析失败:
- 抛出
OutputParserException,携带原始llm_output - 调用方可捕获并决定重试或降级
流式解析失败:
- partial模式下,解析失败返回None而非抛异常
- 继续等待更多chunk,直到能成功解析
验证失败(Pydantic):
- 捕获ValidationError,重新包装为OutputParserException
- 包含详细的字段错误信息
5. 性能特征
同步调用性能:
- 单次调用延迟: 主要取决于解析复杂度
- StrOutputParser: <1ms(O(1))
- JsonOutputParser: 1-10ms(取决于JSON大小)
- PydanticOutputParser: 额外+1-5ms(验证开销)
流式调用性能:
- 首token延迟: 取决于何时能解析出完整结构
- JSON: 需等待完整JSON(通常最后一个chunk)
- List: 可逐项输出(更低延迟)
异步调用性能:
- 使用线程池执行CPU密集的解析操作
- 不阻塞事件循环,提高并发能力
- 线程池大小默认为CPU核心数
内存特征:
- 非流式: O(n),n为输出文本长度
- 流式: O(n)累积缓冲区,直到解析成功
- Pydantic: 额外O(m),m为模型实例大小
6. 并发与线程安全
无状态设计:
- 所有解析器实例无共享状态(除了配置字段)
- 可安全并发调用
异步执行:
- parse方法通常是同步的(CPU密集)
- aparse/ainvoke使用run_in_executor避免阻塞
- 线程池隔离,不影响异步性能
流式并发:
- 单个流内部串行(保证顺序)
- 不同流之间可并发处理
核心API详解
本节从上游接口开始,自上而下详细分析每个解析器的调用链路、关键代码和内部时序。
调用链路总览
所有OutputParser的调用都遵循以下层次结构:
用户代码
↓
LCEL Chain (pipe操作符组装)
↓
Runnable接口 (invoke/transform/stream)
↓
BaseOutputParser.invoke/transform
↓
_call_with_config (配置管理+回调)
↓
parse_result (Generation → 文本)
↓
parse (具体解析逻辑)
↓
结构化结果
每个解析器在parse方法中实现自己的解析逻辑,上游的Runnable接口层、配置管理层、Generation转换层都是共享的基础设施。
API-1: StrOutputParser
基本信息
- 名称:
StrOutputParser - 继承:
BaseTransformOutputParser[str] - 幂等性: 幂等
- 文件路径:
langchain_core/output_parsers/string.py
功能说明
最简单的解析器,提取模型输出的字符串内容。支持流式输出,逐块返回文本。
调用链路分析
用户调用
↓
parser.invoke(input: str | BaseMessage) [BaseOutputParser继承]
↓
_call_with_config(lambda: parse_result([generation]))
↓
parse_result([generation]) [BaseOutputParser实现]
↓
parse(generation[0].text) [StrOutputParser实现]
↓
return text (直接返回)
内部时序图
sequenceDiagram
autonumber
participant User as 用户代码
participant Parser as StrOutputParser
participant Base as BaseOutputParser
participant Config as RunnableConfig
Note over User,Config: 场景1: 同步调用
User->>Parser: invoke("Hello")
activate Parser
Parser->>Base: invoke(input, config)
activate Base
Base->>Base: 判断类型: isinstance(input, BaseMessage)?
alt input是str
Base->>Config: _call_with_config(parse_result)
activate Config
Config->>Base: parse_result([Generation(text="Hello")])
Base->>Parser: parse("Hello")
activate Parser
Parser->>Parser: return text
Parser-->>Base: "Hello"
deactivate Parser
Base-->>Config: "Hello"
deactivate Config
Config-->>Parser: "Hello"
else input是BaseMessage
Base->>Config: _call_with_config(parse_result)
Config->>Base: parse_result([ChatGeneration(message=input)])
Base->>Parser: parse(message.content)
Parser-->>Base: content string
Base-->>Config: content string
Config-->>Parser: content string
end
deactivate Base
Parser-->>User: "Hello"
deactivate Parser
Note over User,Config: 场景2: 流式调用
User->>Parser: transform(iter(["Hello", " ", "World"]))
activate Parser
Parser->>Base: transform(input, config)
activate Base
Base->>Config: _transform_stream_with_config(_transform, ...)
activate Config
loop 每个chunk
Config->>Parser: _transform(["Hello"])
activate Parser
Parser->>Base: parse_result([Generation(text="Hello")])
Base->>Parser: parse("Hello")
Parser->>Parser: return text
Parser-->>Config: yield "Hello"
deactivate Parser
end
Config-->>Base: Iterator[str]
deactivate Config
Base-->>Parser: Iterator[str]
deactivate Base
Parser-->>User: Iterator["Hello", " ", "World"]
deactivate Parser
时序图说明
场景1: 同步调用 (步骤1-10)
- 步骤1-2: 用户调用
invoke,传入字符串或BaseMessage - 步骤3: BaseOutputParser的invoke方法接管
- 步骤4: 判断输入类型(str或BaseMessage)
- 步骤5-6: 使用
_call_with_config包装,提供配置和回调支持 - 步骤7: 调用
parse_result,将输入包装为Generation - 步骤8:
parse_result提取第一个Generation的text字段,调用parse - 步骤9: StrOutputParser.parse直接返回text(无额外处理)
- 步骤10: 结果沿调用链返回
场景2: 流式调用 (步骤11-18)
- 步骤11-12: 用户调用
transform,传入Iterator - 步骤13: BaseTransformOutputParser的transform方法接管
- 步骤14: 使用
_transform_stream_with_config包装流式处理 - 步骤15-17: 循环处理每个chunk,调用
_transform - 步骤18: 每个chunk被parse后立即yield(无累积)
边界条件:
- 输入为空字符串: 返回空字符串
- 输入为None: 抛出AttributeError(在text提取时)
- BaseMessage.content不是str: 在BaseOutputParser层处理,提取为字符串
性能特征:
- 时间复杂度: O(1) - 不做任何处理
- 内存开销: O(1) - 不复制字符串(Python字符串不可变,直接返回引用)
- 流式延迟: <1ms per chunk
关键代码
核心实现 (langchain_core/output_parsers/string.py):
from langchain_core.output_parsers.transform import BaseTransformOutputParser
class StrOutputParser(BaseTransformOutputParser[str]):
"""字符串输出解析器
最简单的解析器,直接返回文本内容。
继承BaseTransformOutputParser以支持流式处理。
"""
def parse(self, text: str) -> str:
"""直接返回输入文本,不做任何处理
Args:
text: 输入文本
Returns:
原始文本
"""
return text
@property
def _type(self) -> str:
"""序列化类型标识"""
return "default"
流式转换逻辑 (继承自BaseTransformOutputParser):
class BaseTransformOutputParser(BaseOutputParser[T]):
def _transform(self, input: Iterator[str | BaseMessage]) -> Iterator[T]:
"""逐块处理输入
对每个chunk调用parse_result,立即yield结果。
适用于不需要累积的解析器(如StrOutputParser)。
"""
for chunk in input:
if isinstance(chunk, BaseMessage):
# 从BaseMessage提取content
yield self.parse_result([ChatGeneration(message=chunk)])
else:
# 字符串直接包装为Generation
yield self.parse_result([Generation(text=chunk)])
调用入口 (继承自BaseOutputParser):
class BaseOutputParser(BaseLLMOutputParser, RunnableSerializable[LanguageModelOutput, T]):
def invoke(self, input: str | BaseMessage, config: RunnableConfig | None = None) -> T:
"""Runnable接口的invoke实现
Args:
input: 字符串或BaseMessage
config: 可选的运行时配置(回调、tags等)
Returns:
解析后的结构化结果
"""
if isinstance(input, BaseMessage):
# BaseMessage路径
return self._call_with_config(
lambda inner_input: self.parse_result([ChatGeneration(message=inner_input)]),
input,
config,
run_type="parser",
)
# 字符串路径
return self._call_with_config(
lambda inner_input: self.parse_result([Generation(text=inner_input)]),
input,
config,
run_type="parser",
)
def parse_result(self, result: list[Generation], *, partial: bool = False) -> T:
"""从Generation列表解析
默认实现: 提取第一个Generation的text字段,调用parse。
Args:
result: Generation列表(通常只有一个)
partial: 是否为部分结果(流式模式)
Returns:
解析结果
"""
return self.parse(result[0].text)
使用示例:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话")
model = ChatOpenAI()
parser = StrOutputParser()
chain = prompt | model | parser
# 同步调用
result = chain.invoke({"topic": "程序员"})
print(result) # "为什么程序员..."(字符串)
# 流式调用
for chunk in chain.stream({"topic": "程序员"}):
print(chunk, end="", flush=True) # 逐字输出
API-2: JsonOutputParser
基本信息
- 名称:
JsonOutputParser - 继承:
BaseCumulativeTransformOutputParser[dict | list] - 幂等性: 幂等
- 文件路径:
langchain_core/output_parsers/json.py
功能说明
解析JSON格式输出,可选地验证Pydantic模型schema。支持流式模式下的部分JSON解析和JSONPatch增量输出。
调用链路分析
用户调用
↓
parser.invoke(input: str | BaseMessage) [BaseOutputParser继承]
↓
_call_with_config(lambda: parse_result([generation]))
↓
parse_result([generation], partial=False) [JsonOutputParser覆盖]
↓
text = generation[0].text.strip()
↓
parse_json_markdown(text) [工具函数]
↓ (尝试提取markdown代码块中的JSON)
json.loads(extracted_json)
↓
(可选) pydantic_object.model_validate(parsed)
↓
return dict | list
流式调用链路:
用户调用 chain.stream()
↓
parser.transform(Iterator[str | BaseMessage])
↓
BaseCumulativeTransformOutputParser._transform
↓
loop: 累积chunk → acc_gen
↓
parse_result([acc_gen], partial=True)
↓
parse_json_markdown(text) - 尝试解析
↓ (成功)
if parsed != prev_parsed:
if diff: yield JSONPatch(prev, parsed)
else: yield parsed
内部时序图
sequenceDiagram
autonumber
participant User as 用户代码
participant Parser as JsonOutputParser
participant Cumulative as BaseCumulativeTransform
participant Utils as JSON工具
participant Pydantic as Pydantic验证
Note over User,Pydantic: 场景1: 非流式解析
User->>Parser: invoke('{"name": "Alice", "age": 30}')
activate Parser
Parser->>Parser: parse_result([generation], partial=False)
activate Parser
Parser->>Parser: text = result[0].text.strip()
Parser->>Utils: parse_json_markdown(text)
activate Utils
Utils->>Utils: 检测markdown代码块(```json...)
Utils->>Utils: 提取JSON部分
Utils->>Utils: json.loads(json_str)
Utils-->>Parser: {"name": "Alice", "age": 30}
deactivate Utils
alt pydantic_object存在
Parser->>Pydantic: model_validate(parsed)
activate Pydantic
Pydantic->>Pydantic: 验证字段类型和约束
Pydantic-->>Parser: 验证通过/失败
deactivate Pydantic
end
Parser-->>Parser: parsed dict
deactivate Parser
Parser-->>User: {"name": "Alice", "age": 30}
deactivate Parser
Note over User,Pydantic: 场景2: 流式解析(累积模式)
User->>Parser: transform(iter(['{"na', 'me": "A', 'lice"}']))
activate Parser
Parser->>Cumulative: _transform(input)
activate Cumulative
loop 每个chunk
Cumulative->>Cumulative: chunk_gen = GenerationChunk(text=chunk)
Cumulative->>Cumulative: acc_gen = acc_gen + chunk_gen
Note right of Cumulative: 累积: '{"na' → '{"name": "A' → '{"name": "Alice"}'
Cumulative->>Parser: parse_result([acc_gen], partial=True)
activate Parser
Parser->>Utils: parse_json_markdown(accumulated_text)
activate Utils
alt JSON完整
Utils->>Utils: json.loads成功
Utils-->>Parser: parsed dict
Parser->>Parser: if parsed != prev_parsed
Parser-->>Cumulative: yield parsed
else JSON不完整
Utils->>Utils: JSONDecodeError
Utils-->>Parser: return None
Parser-->>Cumulative: 不yield(等待更多数据)
end
deactivate Utils
deactivate Parser
end
Cumulative-->>Parser: Iterator[dict]
deactivate Cumulative
Parser-->>User: yield完整JSON
deactivate Parser
Note over User,Pydantic: 场景3: diff模式(增量输出)
User->>Parser: parser.diff = True<br/>transform(iter(['{"items"', ': ["a"],', ' "count": 1}']))
activate Parser
Parser->>Cumulative: _transform(input)
activate Cumulative
loop 每个chunk
Cumulative->>Cumulative: 累积chunk
Cumulative->>Parser: parse_result([acc_gen], partial=True)
Parser-->>Cumulative: parsed (new version)
Cumulative->>Cumulative: _diff(prev_parsed, parsed)
Note right of Cumulative: JSONPatch: [<br/> {"op": "add", "path": "/count", "value": 1}<br/>]
Cumulative-->>Parser: yield JSONPatch operations
end
deactivate Cumulative
Parser-->>User: yield增量变化
deactivate Parser
Note over User,Pydantic: 场景4: 解析失败
User->>Parser: invoke('Invalid JSON{')
activate Parser
Parser->>Parser: parse_result([generation], partial=False)
Parser->>Utils: parse_json_markdown('Invalid JSON{')
activate Utils
Utils->>Utils: json.loads(...)
Utils->>Utils: JSONDecodeError
Utils-->>Parser: raise Exception
deactivate Utils
Parser->>Parser: catch JSONDecodeError
Parser->>Parser: raise OutputParserException
Parser-->>User: OutputParserException:<br/>llm_output='Invalid JSON{'
deactivate Parser
时序图说明
场景1: 非流式解析 (步骤1-9)
- 步骤1-3: 用户调用invoke,Parser调用parse_result
- 步骤4: 提取并清理文本(strip空白)
- 步骤5-7: 调用parse_json_markdown工具函数
- 检测是否在markdown代码块中(
'''json...''') - 提取纯JSON部分
- 调用json.loads解析
- 检测是否在markdown代码块中(
- 步骤8-9: 如果配置了pydantic_object,额外进行模型验证
- 步骤10: 返回解析后的dict或list
场景2: 流式解析(累积模式) (步骤10-22)
- 步骤10-11: 用户调用transform,进入累积模式
- 步骤12-14: 循环处理每个chunk
- 将新chunk转为GenerationChunk
- 使用
+运算符累积(acc_gen = acc_gen + chunk_gen) - 累积文本:
'{"na'→'{"name": "A'→'{"name": "Alice"}'
- 步骤15-16: 每次尝试解析累积的文本(partial=True)
- 步骤17-20: parse_json_markdown尝试解析
- JSON完整: 解析成功,检查是否与上次不同,如是则yield
- JSON不完整: JSONDecodeError,返回None,不yield,等待更多数据
- 步骤21-22: 直到JSON完整才输出结果
场景3: diff模式(增量输出) (步骤23-31)
- 步骤23-24: 设置
parser.diff = True,启用增量模式 - 步骤25-27: 累积chunk,每次解析新版本
- 步骤28: 调用
_diff(prev_parsed, parsed)生成JSONPatch- JSONPatch格式:
[{"op": "add/replace/remove", "path": "/field", "value": ...}] - 描述从prev_parsed到parsed的最小变化
- JSONPatch格式:
- 步骤29-31: yield JSONPatch操作列表而非完整对象
场景4: 解析失败 (步骤32-38)
- 步骤32-35: 无效JSON导致json.loads失败
- 步骤36: 捕获JSONDecodeError
- 步骤37-38: 包装为OutputParserException,携带原始llm_output
边界条件与异常处理
输入格式处理:
- 纯JSON:
{"key": "value"}✓ - Markdown代码块:
'''json\n{"key": "value"}\n'''✓ - 普通代码块:
'''\n{"key": "value"}\n'''✓ - 混合文本:
这是结果: {"key": "value"}✗ (无法提取)
partial模式:
partial=False: 解析失败抛OutputParserExceptionpartial=True: 解析失败返回None(流式等待更多数据)
JSON类型:
- 对象:
{"key": "value"}→ dict - 数组:
["a", "b", "c"]→ list - 原始值:
"string",123,true→ 对应Python类型
空值处理:
- 空对象:
{}→ {} - 空数组:
[]→ [] - null:
null→ None
性能特征
解析性能:
- 时间复杂度: O(n),n为JSON文本长度
- Python标准json.loads性能: ~1-10MB/s
- 可选orjson加速: ~100-500MB/s
流式性能:
- 首token延迟: 需等待完整JSON(通常是最后一个chunk)
- 对象级延迟: 对于
{"a": 1, "b": 2},无法在"a"完整后就输出 - 改进方案: 使用更细粒度的解析器(如JsonStreamParser,未内置)
diff模式开销:
- jsonpatch.make_patch: O(n),n为对象大小
- 适用场景: 大对象频繁小变化(如实时日志追加)
关键代码
核心实现 (langchain_core/output_parsers/json.py):
from langchain_core.output_parsers.transform import BaseCumulativeTransformOutputParser
from langchain_core.utils.json import parse_json_markdown
import json
import jsonpatch
class JsonOutputParser(BaseCumulativeTransformOutputParser[Any]):
"""JSON输出解析器
支持流式模式和JSONPatch增量输出。
"""
pydantic_object: type[BaseModel] | None = None
"""可选的Pydantic模型(用于生成schema和验证)"""
def parse_result(self, result: list[Generation], *, partial: bool = False) -> Any:
"""从Generation列表解析JSON
Args:
result: Generation列表
partial: 是否为部分结果(流式模式)
Returns:
解析后的dict/list,或None(partial模式下解析失败)
Raises:
OutputParserException: partial=False且解析失败时
"""
text = result[0].text.strip()
if partial:
# 流式模式: 解析失败返回None
try:
return parse_json_markdown(text)
except JSONDecodeError:
return None
else:
# 非流式模式: 解析失败抛异常
try:
return parse_json_markdown(text)
except JSONDecodeError as e:
msg = f"Invalid json output: {text}"
raise OutputParserException(msg, llm_output=text) from e
def parse(self, text: str) -> Any:
"""便捷方法: 从文本解析JSON"""
return self.parse_result([Generation(text=text)])
def get_format_instructions(self) -> str:
"""生成格式指令"""
if self.pydantic_object is None:
return "Return a JSON object."
# 从Pydantic模型生成schema
schema = self.pydantic_object.model_json_schema()
# (省略schema简化逻辑)
schema_str = json.dumps(schema, ensure_ascii=False)
return JSON_FORMAT_INSTRUCTIONS.format(schema=schema_str)
def _diff(self, prev: Any | None, next: Any) -> Any:
"""计算JSONPatch增量
Args:
prev: 上一次的解析结果
next: 当前解析结果
Returns:
JSONPatch操作列表
"""
return jsonpatch.make_patch(prev, next).patch
工具函数 (langchain_core/utils/json.py):
def parse_json_markdown(text: str) -> Any:
"""从文本中提取并解析JSON
支持:
- 纯JSON: {"key": "value"}
- Markdown代码块: ```json\n{"key": "value"}\n```
- 普通代码块: ```\n{"key": "value"}\n```
Args:
text: 输入文本
Returns:
解析后的JSON对象
Raises:
JSONDecodeError: 解析失败
"""
# 1) 尝试提取markdown代码块
if "```json" in text:
text = text.split("```json")[1].split("```")[0]
elif "```" in text:
text = text.split("```")[1].split("```")[0]
# 2) 清理空白
text = text.strip()
# 3) 解析JSON
return json.loads(text)
累积转换逻辑 (继承自BaseCumulativeTransformOutputParser):
class BaseCumulativeTransformOutputParser(BaseTransformOutputParser[T]):
diff: bool = False
"""是否输出增量差异"""
def _transform(self, input: Iterator[str | BaseMessage]) -> Iterator[Any]:
"""累积输入并增量解析
关键逻辑:
1. 累积每个chunk到acc_gen
2. 尝试解析累积结果(partial=True)
3. 如果解析成功且结果有变化,yield结果
4. 如果diff=True,yield增量而非完整结果
"""
prev_parsed = None
acc_gen: GenerationChunk | ChatGenerationChunk | None = None
for chunk in input:
# 1) 包装chunk
if isinstance(chunk, BaseMessageChunk):
chunk_gen = ChatGenerationChunk(message=chunk)
elif isinstance(chunk, BaseMessage):
chunk_gen = ChatGenerationChunk(message=BaseMessageChunk(**chunk.model_dump()))
else:
chunk_gen = GenerationChunk(text=chunk)
# 2) 累积
acc_gen = chunk_gen if acc_gen is None else acc_gen + chunk_gen
# 3) 尝试解析
parsed = self.parse_result([acc_gen], partial=True)
# 4) 检查变化
if parsed is not None and parsed != prev_parsed:
if self.diff:
# 输出增量
yield self._diff(prev_parsed, parsed)
else:
# 输出完整结果
yield parsed
prev_parsed = parsed
使用示例:
from pydantic import BaseModel, Field
class Joke(BaseModel):
setup: str = Field(description="笑话的铺垫")
punchline: str = Field(description="笑点")
rating: int = Field(description="有趣程度1-10")
# 创建解析器(带schema)
parser = JsonOutputParser(pydantic_object=Joke)
# 构建链
prompt = ChatPromptTemplate.from_template(
"讲一个笑话,{format_instructions}\n\n主题: {topic}"
)
chain = prompt | model | parser
# 使用
result = chain.invoke({
"topic": "程序员",
"format_instructions": parser.get_format_instructions()
})
print(result)
# {'setup': '...', 'punchline': '...', 'rating': 8}
# 流式调用
for partial_result in chain.stream({"topic": "程序员", ...}):
print(f"部分结果: {partial_result}")
# diff模式
parser.diff = True
for patch in chain.stream({"topic": "程序员", ...}):
print(f"增量变化: {patch}")
# [{"op": "add", "path": "/setup", "value": "..."}]
场景1: 结构化数据提取
from pydantic import BaseModel, Field
class Article(BaseModel):
title: str = Field(description="文章标题")
summary: str = Field(description="文章摘要")
keywords: list[str] = Field(description="关键词列表")
category: str = Field(description="文章分类")
parser = PydanticOutputParser(pydantic_object=Article)
prompt = ChatPromptTemplate.from_template(
"""分析以下文章并提取信息。
{format_instructions}
文章内容:
{content}"""
)
chain = prompt | model | parser
article_info = chain.invoke({
"content": "长篇文章内容...",
"format_instructions": parser.get_format_instructions()
})
场景2: 多步骤解析
# 第一步: 生成JSON
json_parser = JsonOutputParser()
step1 = prompt1 | model | json_parser
# 第二步: 验证Pydantic
pydantic_parser = PydanticOutputParser(pydantic_object=MyModel)
step2 = RunnableLambda(lambda x: pydantic_parser.parse(json.dumps(x)))
# 组合
chain = step1 | step2
场景3: 错误修复
from langchain_core.output_parsers import OutputFixingParser
# 基础解析器
base_parser = PydanticOutputParser(pydantic_object=MyModel)
# 包装为修复解析器
fixing_parser = OutputFixingParser.from_llm(
parser=base_parser,
llm=model
)
# 即使输出格式有误,也会尝试修复
result = fixing_parser.parse(malformed_output)
最佳实践
1. 选择合适的解析器
# 简单字符串: StrOutputParser
chain = prompt | model | StrOutputParser()
# 简单JSON: JsonOutputParser
chain = prompt | model | JsonOutputParser()
# 需要验证: PydanticOutputParser
chain = prompt | model | PydanticOutputParser(pydantic_object=MyModel)
2. 格式指令集成
# 推荐: 在prompt中包含格式指令
parser = PydanticOutputParser(pydantic_object=MyModel)
prompt = ChatPromptTemplate.from_template(
"""任务: {task}
{format_instructions}
输入: {input}"""
)
chain = (
{"task": ..., "input": ..., "format_instructions": lambda _: parser.get_format_instructions()}
| prompt
| model
| parser
)
3. 错误处理
# 推荐: 捕获OutputParserException
from langchain_core.exceptions import OutputParserException
try:
result = parser.parse(output)
except OutputParserException as e:
print(f"解析失败: {e}")
print(f"原始输出: {e.llm_output}")
# 降级处理或重试
result = fallback_value
模块交互总结
与其他模块的关系
上游依赖:
- Runnables模块: 继承
RunnableSerializable,实现LCEL协议,支持pipe操作符组装 - Messages模块: 处理
BaseMessage和BaseMessageChunk输入类型 - Outputs模块: 处理
Generation和ChatGeneration包装类型 - Utils模块: 使用JSON工具函数(parse_json_markdown)、Pydantic工具等
下游消费:
- Agents模块: 使用PydanticOutputParser解析工具调用结果和Agent决策
- Chains模块: 作为链的最后一环,将LLM输出结构化
- 应用层: 直接消费解析后的结构化数据(dict/list/Pydantic实例)
关键设计模式
策略模式:
- 不同解析器实现不同的parse策略
- 统一的BaseOutputParser接口
- 运行时可替换解析器
装饰器模式(通过继承实现):
BaseOutputParser: 基础解析能力→ BaseTransformOutputParser: 增加流式支持→ BaseCumulativeTransformOutputParser: 增加累积和diff能力→ JsonOutputParser: 增加JSON解析→ PydanticOutputParser: 增加Pydantic验证
模板方法:
- BaseOutputParser定义调用骨架(invoke/parse_result/parse)
- 子类只需实现parse方法
- 上游的配置管理、回调、错误处理统一由基类提供
性能考量总结
| 解析器 | 时间复杂度 | 内存开销 | 流式首token延迟 |
|---|---|---|---|
| StrOutputParser | O(1) | O(1) | <1ms |
| JsonOutputParser | O(n) | O(n) | 需等待完整JSON |
| PydanticOutputParser | O(n)+验证 | O(n)+模型实例 | 需等待完整JSON |
| CommaSeparatedList | O(n) | O(n) | 可逐项输出 |
优化建议:
- 简单场景优先StrOutputParser
- 需要验证时使用PydanticOutputParser而非手动验证
- 流式场景考虑累积延迟,对JSON等格式需等待完整输出
- 大对象频繁小变化使用diff模式减少传输
常见陷阱
1. 流式JSON无法增量输出:
- 问题: JSON需要完整才能解析,流式模式无法提前输出
- 解决: 使用ListOutputParser或自定义增量解析器
2. 格式指令未注入:
- 问题: 忘记将
get_format_instructions()加入prompt - 解决: 使用
RunnablePassthrough或lambda自动注入
3. 异常处理缺失:
- 问题: OutputParserException未捕获导致链中断
- 解决: 使用try-except或OutputFixingParser自动修复
4. Pydantic版本兼容:
- 问题: Pydantic v1和v2 API不同
- 解决: 使用PydanticOutputParser自动适配,或显式指定版本