LangChain-04: Messages Module
Module Overview
Responsibilities and Positioning
The Messages module defines the core data structures for every message type in LangChain and is the foundation of chat models and conversational systems. It provides a unified message abstraction covering text, multimodal content, tool calls, and more, ensuring interoperability across model providers.
Core responsibilities:
- Define a unified message type hierarchy (HumanMessage, AIMessage, SystemMessage, etc.)
- Support multimodal content (text, images, audio, video)
- Encapsulate tool calls and tool results
- Provide message serialization/deserialization
- Support message merging and conversion
- Manage message metadata and usage statistics
Inputs and Outputs
Inputs:
- Raw data: strings, dicts, tuples, etc.
- Convertible to standard messages via convert_to_messages()
Outputs:
- BaseMessage subclass instances
- Serializable to dict/JSON
- Formattable as display strings (see the round-trip sketch below)
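A round-trip sketch of this input/output contract, using only langchain_core:
from langchain_core.messages import convert_to_messages, messages_to_dict

# Input: mixed raw formats are normalized into BaseMessage subclasses
msgs = convert_to_messages([
    "hello",                                    # str → HumanMessage
    ("system", "You are a helpful assistant"),  # tuple → SystemMessage
    {"role": "assistant", "content": "hi"},     # dict → AIMessage
])

# Output: instances serialize to dicts/JSON and format for display
dicts = messages_to_dict(msgs)  # list[dict], JSON-ready
print(msgs[0].pretty_repr())    # human-readable rendering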
Upstream and Downstream Dependencies
Depends on:
- langchain_core.load.serializable: serialization base class
- pydantic: data validation
- typing_extensions: typing-system extensions
Depended on by:
- langchain_core.language_models: model inputs and outputs
- langchain_core.prompts: prompt templates produce messages
- langchain_core.agents: agent conversation history
- Every component that handles conversations
Lifecycle
Message objects are generally immutable (the full lifecycle is sketched after this list):
- Creation: via constructors or conversion functions
- Use: passed along and processed in the chain
- Merging: chunks combined with the + operator
- Serialization: persisted to a database or transmitted
- Deserialization: restored from storage
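The lifecycle end to end in a compact sketch (the chunk contents are illustrative):
from langchain_core.messages import (
    AIMessageChunk, HumanMessage, message_chunk_to_message,
    messages_from_dict, messages_to_dict,
)

# Create + merge: streaming chunks combine with the + operator
merged = AIMessageChunk(content="Hello") + AIMessageChunk(content=", world")
final = message_chunk_to_message(merged)  # AIMessageChunk → AIMessage

# Serialize / deserialize: persist, then restore from storage
payload = messages_to_dict([HumanMessage("hi"), final])
restored = messages_from_dict(payload)
assert restored[1].content == "Hello, world"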
LangChain Architecture and Where Messages Fits
System Architecture Overview
flowchart TB
subgraph UserLayer["Application Layer"]
USER[User Application]
AGENT[Agents]
CHAIN[Chains]
end
subgraph CoreLayer["Core Abstraction Layer"]
RUNNABLE[Runnables<br/>executable interface]
PROMPT[Prompts<br/>prompt templates]
MESSAGES[Messages<br/>message abstraction<br/>⭐ this module]
CALLBACK[Callbacks]
end
subgraph ModelLayer["Model Layer"]
LLM[LLMs<br/>text generation models]
CHAT[ChatModels<br/>chat models]
EMBEDDINGS[Embeddings]
end
subgraph ToolLayer["Tool Layer"]
TOOLS[Tools]
RETRIEVER[Retrievers]
MEMORY[Memory]
end
subgraph DataLayer["Data Layer"]
VECTOR[VectorStores]
DOCUMENT[Document<br/>document loaders]
OUTPUT[OutputParsers]
end
USER --> AGENT
USER --> CHAIN
AGENT --> RUNNABLE
CHAIN --> RUNNABLE
RUNNABLE --> PROMPT
RUNNABLE --> CHAT
RUNNABLE --> TOOLS
PROMPT --> MESSAGES
CHAT --> MESSAGES
AGENT --> MESSAGES
CHAT --> CALLBACK
MESSAGES --> OUTPUT
TOOLS --> MESSAGES
MEMORY --> MESSAGES
RETRIEVER --> DOCUMENT
style MESSAGES fill:#ff6b6b,stroke:#c92a2a,stroke-width:3px,color:#fff
style CHAT fill:#4ecdc4,stroke:#0e9594
style PROMPT fill:#95e1d3,stroke:#38ada9
style TOOLS fill:#f9ca24,stroke:#f0932b
Architecture notes:
- Messages in the core abstraction layer: the Messages module is a foundational component of LangChain's core abstraction layer; every conversation-related module depends on it
- Upstream callers: ChatModels, Prompts, Agents, and Chains all produce or process Messages
- Downstream consumers: OutputParsers parse Message content, Memory stores Message history, Callbacks trace Message flow
- Lateral collaboration: pairs with the Runnables interface for streaming and with Callbacks for event tracing
Where Messages Sits in the Call Chain
sequenceDiagram
autonumber
participant App as Application
participant Chain as Chain/Agent
participant Prompt as PromptTemplate
participant Messages as Messages module
participant ChatModel as ChatModel
participant Provider as Provider API
App->>Chain: user input
Chain->>Prompt: format_messages({input})
Note over Prompt,Messages: 1️⃣ Message generation
Prompt->>Messages: convert_to_messages(...)
Messages->>Messages: create HumanMessage/SystemMessage
Messages-->>Prompt: [BaseMessage]
Prompt-->>Chain: message list
Chain->>ChatModel: invoke(messages)
Note over ChatModel,Messages: 2️⃣ Message normalization
ChatModel->>Messages: convert_to_messages(input)
Messages->>Messages: validate and normalize
Messages-->>ChatModel: normalized messages
ChatModel->>ChatModel: _convert_to_provider_format()
ChatModel->>Provider: API call (provider format)
Note over Provider,Messages: 3️⃣ Response construction
Provider-->>ChatModel: API response (JSON)
ChatModel->>Messages: construct AIMessage
Messages->>Messages: parse tool_calls/usage_metadata
Messages-->>ChatModel: AIMessage instance
ChatModel-->>Chain: AIMessage
Chain-->>App: final response
Call chain notes:
Stage 1 - message generation:
- PromptTemplate converts the template output into standard Messages via convert_to_messages()
- Accepts strings, tuples, dicts, and other input formats
Stage 2 - message normalization:
- The ChatModel calls convert_to_messages() again on its input to guarantee a uniform format
- Handles compatibility concerns (e.g. converting v0-format multimodal content)
Stage 3 - response wrapping:
- After the provider API returns, the ChatModel wraps the response in an AIMessage
- Parses structured information such as tool_calls and usage_metadata
- Unifies response formats across providers
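The three stages above, compressed into a runnable sketch (ChatOpenAI and the model name are illustrative stand-ins for any chat model):
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI  # any chat model works here

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a concise assistant"),
    ("user", "{question}"),
])
model = ChatOpenAI(model="gpt-4o-mini")  # illustrative model choice

# Stage 1: the template output becomes standard messages
messages = prompt.format_messages(question="What is LangChain?")

# Stages 2 and 3: the model re-normalizes the input, calls the provider,
# and wraps the JSON response in an AIMessage
response = model.invoke(messages)
print(type(response).__name__, response.usage_metadata)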
Module Architecture
flowchart TB
subgraph Base["Base Classes"]
BASE_MSG[BaseMessage<br/>message base class]
BASE_CHUNK[BaseMessageChunk<br/>mergeable message chunk]
end
subgraph CoreTypes["Core Message Types"]
HUMAN[HumanMessage<br/>user message]
AI[AIMessage<br/>AI message]
SYSTEM[SystemMessage<br/>system message]
TOOL[ToolMessage<br/>tool result message]
FUNCTION[FunctionMessage<br/>deprecated function result]
CHAT[ChatMessage<br/>custom role]
end
subgraph Chunks["Message Chunk Types"]
HUMAN_CHUNK[HumanMessageChunk]
AI_CHUNK[AIMessageChunk]
SYSTEM_CHUNK[SystemMessageChunk]
TOOL_CHUNK[ToolMessageChunk]
end
subgraph Content["Content Types"]
TEXT[TextContentBlock<br/>text]
IMAGE[ImageContentBlock<br/>image]
AUDIO[AudioContentBlock<br/>audio]
VIDEO[VideoContentBlock<br/>video]
FILE[FileContentBlock<br/>file]
REASONING[ReasoningContentBlock<br/>reasoning trace]
end
subgraph ToolCalling["Tool Calling"]
TOOL_CALL[ToolCall<br/>tool call request]
TOOL_CALL_CHUNK[ToolCallChunk<br/>tool call chunk]
INVALID_TOOL[InvalidToolCall<br/>invalid tool call]
SERVER_TOOL[ServerToolCall<br/>server-side tool]
end
subgraph Utils["Utility Functions"]
CONVERT[convert_to_messages]
FILTER[filter_messages]
TRIM[trim_messages]
MERGE[merge_message_runs]
TO_DICT[messages_to_dict]
FROM_DICT[messages_from_dict]
end
%% base-to-subclass edges denote inheritance (flowchart syntax has no <|-- arrow)
BASE_MSG --> BASE_CHUNK
BASE_MSG --> HUMAN
BASE_MSG --> AI
BASE_MSG --> SYSTEM
BASE_MSG --> TOOL
BASE_MSG --> FUNCTION
BASE_MSG --> CHAT
BASE_CHUNK --> HUMAN_CHUNK
BASE_CHUNK --> AI_CHUNK
BASE_CHUNK --> SYSTEM_CHUNK
BASE_CHUNK --> TOOL_CHUNK
AI --> TOOL_CALL
AI --> TOOL_CALL_CHUNK
AI --> INVALID_TOOL
BASE_MSG --> Content
BASE_MSG --> Utils
style BASE_MSG fill:#e1f5ff
style AI fill:#fff4e1
style HUMAN fill:#e8f5e9
style TOOL_CALL fill:#ffe8e8
Architecture Notes
Message type hierarchy
BaseMessage:
- Abstract base class for all messages
- Defines the common fields: content, additional_kwargs, response_metadata, etc.
- Implements the serialization interface
BaseMessageChunk:
- Base class for mergeable message chunks
- Supports merging via the + operator
- Used for streaming output
Role message types (one of each is constructed in the sketch after this list):
- HumanMessage: user input
- AIMessage: model output
- SystemMessage: system instructions
- ToolMessage: tool execution result
- ChatMessage: custom role (for compatibility)
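For reference, here is one of each role type (the contents are placeholders):
from langchain_core.messages import (
    AIMessage, ChatMessage, HumanMessage, SystemMessage, ToolMessage,
)

msgs = [
    SystemMessage("You are a helpful assistant"),       # system instructions
    HumanMessage("What is 2+2?"),                       # user input
    AIMessage("4"),                                     # model output
    ToolMessage("4", tool_call_id="call_1"),            # tool result, linked by ID
    ChatMessage(role="critic", content="Looks right"),  # custom role
]
assert [m.type for m in msgs] == ["system", "human", "ai", "tool", "chat"]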
Multimodal Support
A message's content field can be:
- A string: plain text
- A list: a mix of content blocks
- TextContentBlock: text
- ImageContentBlock: image (URL or base64)
- AudioContentBlock: audio
- VideoContentBlock: video
- FileContentBlock: file
Tool Calling
ToolCall:
- Represents the AI requesting a tool invocation
- Carries the tool name, arguments, and a call ID
- Stored in AIMessage.tool_calls
ToolMessage:
- Represents a tool's execution result
- Linked back to the originating ToolCall by ID
- Returned to the model to continue the conversation
InvalidToolCall:
- Represents a malformed tool call
- Carries the error details
- Used for error handling
The sketch below ties the pieces together.
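A minimal sketch of the request/result linkage (IDs and values are illustrative):
from langchain_core.messages import AIMessage, ToolMessage

# The model requests a tool: the request lives in AIMessage.tool_calls
request = AIMessage(
    content="",
    tool_calls=[{"name": "get_weather", "args": {"city": "Beijing"}, "id": "call_1"}],
)

# The tool result flows back as a ToolMessage keyed by the same ID
result = ToolMessage(content="Sunny, 25°C", tool_call_id="call_1")
assert request.tool_calls[0]["id"] == result.tool_call_id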
Core Data Structures
classDiagram
class BaseMessage {
<<abstract>>
+content: str|list
+additional_kwargs: dict
+response_metadata: dict
+type: str
+name: str|None
+id: str|None
+text: TextAccessor
+pretty_repr() str
}
class HumanMessage {
+type: "human"
+example: bool
}
class AIMessage {
+type: "ai"
+tool_calls: list[ToolCall]
+invalid_tool_calls: list[InvalidToolCall]
+usage_metadata: UsageMetadata|None
}
class SystemMessage {
+type: "system"
}
class ToolMessage {
+type: "tool"
+tool_call_id: str
+artifact: Any
+status: Literal["success", "error"]
}
class ToolCall {
+name: str
+args: dict
+id: str
+type: Literal["tool_call"]
}
class UsageMetadata {
+input_tokens: int
+output_tokens: int
+total_tokens: int
+input_token_details: InputTokenDetails
+output_token_details: OutputTokenDetails
}
class ContentBlock {
<<union>>
TextContentBlock
ImageContentBlock
AudioContentBlock
VideoContentBlock
FileContentBlock
}
BaseMessage <|-- HumanMessage
BaseMessage <|-- AIMessage
BaseMessage <|-- SystemMessage
BaseMessage <|-- ToolMessage
AIMessage --> ToolCall : contains
AIMessage --> UsageMetadata : contains
BaseMessage --> ContentBlock : content
Data Structure Notes
BaseMessage fields
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| content | str \| list | Yes | - | Message content: a string or a list of content blocks |
| additional_kwargs | dict | No | {} | Extra parameters (deprecated; use response_metadata) |
| response_metadata | dict | No | {} | Response metadata |
| type | str | Yes | - | Message type identifier |
| name | str \| None | No | None | Name of the message sender |
| id | str \| None | No | None | Unique message ID |
AIMessage-specific fields
| Field | Type | Description |
|---|---|---|
| tool_calls | list[ToolCall] | Tool call requests |
| invalid_tool_calls | list[InvalidToolCall] | Malformed tool calls |
| usage_metadata | UsageMetadata \| None | Token usage statistics |
ToolCall structure
| Field | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Tool name |
| args | dict | Yes | Tool argument dict |
| id | str | Yes | Unique call ID |
| type | str | No | Fixed to "tool_call" |
ToolMessage fields
| Field | Type | Required | Description |
|---|---|---|---|
| content | str | Yes | Tool execution result (as a string) |
| tool_call_id | str | Yes | ID of the associated ToolCall |
| artifact | Any | No | Raw result object |
| status | str | No | "success" or "error" |
UsageMetadata structure (see the reading sketch after the table)
| Field | Type | Description |
|---|---|---|
| input_tokens | int | Input token count |
| output_tokens | int | Output token count |
| total_tokens | int | Total token count |
| input_token_details | dict | Input token breakdown (cache hits, etc.) |
| output_token_details | dict | Output token breakdown (reasoning tokens, etc.) |
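Reading these fields off a message (the numbers are illustrative; real values come from the provider response):
from langchain_core.messages import AIMessage

msg = AIMessage(
    content="Hello!",
    usage_metadata={"input_tokens": 12, "output_tokens": 4, "total_tokens": 16},
)
usage = msg.usage_metadata
if usage is not None:  # None when the provider reports no usage
    print(f"{usage['input_tokens']} in + {usage['output_tokens']} out "
          f"= {usage['total_tokens']} total")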
Multimodal content blocks
ImageContentBlock:
{
    "type": "image",
    "source": "url",        # or "base64"
    "url": "https://...",   # or "data": "base64..."
    "detail": "auto"        # or "low"/"high"
}
AudioContentBlock:
{
    "type": "audio",
    "source": "base64",
    "data": "base64..."
}
Core API Deep Dive
API-1: convert_to_messages
Basics
- Name: convert_to_messages
- Signature: def convert_to_messages(messages: Sequence[MessageLikeRepresentation]) -> list[BaseMessage]
- Idempotency: idempotent (repeated calls yield the same result)
What it does
Converts input in a variety of formats into a standard list of BaseMessage. Accepts strings, dicts, tuples, BaseMessage instances, and more.
Request structure
messages: Sequence[MessageLikeRepresentation]
# MessageLikeRepresentation = BaseMessage | list | tuple | str | dict
| Parameter | Type | Description |
|---|---|---|
| messages | Sequence | Message list; multiple formats accepted |
Supported input formats:
- BaseMessage instance: returned as-is
- String: "hello" → HumanMessage("hello")
- Tuple: ("system", "You are...") → SystemMessage("You are...")
- Dict: {"role": "user", "content": "hi"} → HumanMessage("hi")
- List: each element converted recursively
Response structure
list[BaseMessage]  # the standard message list
Entry Function and Key Code
def convert_to_messages(
    messages: Sequence[MessageLikeRepresentation],
) -> list[BaseMessage]:
    """Convert input in various formats into a standard message list.
    Args:
        messages: message sequence; multiple formats accepted
    Returns:
        a list of standard BaseMessage objects
    """
    result = []
    for message in messages:
        result.append(_convert_single_message(message))
    return result
def _convert_single_message(message: MessageLikeRepresentation) -> BaseMessage:
    # 1) Already a BaseMessage: return as-is
    if isinstance(message, BaseMessage):
        return message
    # 2) String → HumanMessage
    elif isinstance(message, str):
        return HumanMessage(content=message)
    # 3) Tuple (role, content) → the corresponding message type
    elif isinstance(message, tuple) and len(message) == 2:
        role, content = message
        return _role_to_message(role, content)
    # 4) Dict {role: ..., content: ...} → the corresponding message type
    elif isinstance(message, dict):
        if "role" in message:
            return _role_to_message(message["role"], message["content"])
        else:
            # Try to construct the message directly
            return _message_from_dict(message)
    else:
        msg = f"Unsupported message type: {type(message)}"
        raise ValueError(msg)
def _role_to_message(role: str, content: str) -> BaseMessage:
    """Create the message type matching the given role."""
    role = role.lower()
    if role in ("human", "user"):
        return HumanMessage(content=content)
    elif role in ("ai", "assistant"):
        return AIMessage(content=content)
    elif role == "system":
        return SystemMessage(content=content)
    elif role == "tool":
        # Tool messages need a tool_call_id (simplified here; real callers
        # must supply the ID of the originating tool call)
        return ToolMessage(content=content, tool_call_id="")
    else:
        # Custom roles fall back to ChatMessage
        return ChatMessage(content=content, role=role)
Code notes:
- Iterates over the input sequence, converting items one by one
- Picks a conversion strategy based on the input type
- Strings default to HumanMessage
- Tuples and dicts map to message types via the role field
- Unsupported types raise ValueError
Call Chain and Callers
# Conversion happens automatically inside model calls
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
# Every input format is converted automatically
result = model.invoke("hello")  # str → [HumanMessage("hello")]
result = model.invoke([("system", "You are..."), ("user", "hi")])  # tuples → messages
result = model.invoke([{"role": "user", "content": "hi"}])  # dict → messages
# Inside prompt templates
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are {role}"),
    ("user", "{input}"),
])
# format_messages calls convert_to_messages internally
messages = prompt.format_messages(role="assistant", input="hello")
Sequence Diagram
sequenceDiagram
autonumber
participant User
participant Convert as convert_to_messages
participant Model as ChatModel
User->>Model: invoke([<br/> ("system", "You are..."),<br/> "What is AI?"<br/>])
Model->>Model: _convert_input(input)
Model->>Convert: convert_to_messages([...])
Convert->>Convert: handle tuple ("system", ...)
Convert->>Convert: SystemMessage("You are...")
Convert->>Convert: handle string "What is AI?"
Convert->>Convert: HumanMessage("What is AI?")
Convert-->>Model: [SystemMessage, HumanMessage]
Model->>Model: send to API
Model-->>User: AIMessage(response)
Sequence diagram notes:
Intent: shows how input conversion unifies multiple formats into a standard message list.
Boundary conditions (exercised in the sketch below):
- An empty list returns an empty list
- Unsupported types raise ValueError
- Unrecognized roles fall back to ChatMessage
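The boundary conditions, exercised directly. A sketch: the exact exception type for unsupported input varies across langchain-core versions, hence the broad except clause:
from langchain_core.messages import convert_to_messages

assert convert_to_messages([]) == []        # empty in, empty out

try:
    convert_to_messages([42])               # unsupported type is rejected
except (ValueError, NotImplementedError) as e:
    print("rejected:", e)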
API-2: messages_to_dict / messages_from_dict
Basics
- Names: messages_to_dict / messages_from_dict
- Signatures:
def messages_to_dict(messages: Sequence[BaseMessage]) -> list[dict]
def messages_from_dict(messages: list[dict]) -> list[BaseMessage]
- Idempotency: idempotent (a round trip preserves equivalence)
What they do
Serialize and deserialize messages for database storage or transport.
Request structure
# messages_to_dict
messages: Sequence[BaseMessage]
# messages_from_dict
messages: list[dict]  # dicts carrying a __class__ field
Response structure
# messages_to_dict
list[dict]  # JSON-serializable dicts
# messages_from_dict
list[BaseMessage]  # the restored message objects
Entry Function and Key Code
def messages_to_dict(messages: Sequence[BaseMessage]) -> list[dict]:
    """Serialize a message list into a list of dicts.
    Args:
        messages: a sequence of BaseMessage
    Returns:
        JSON-serializable dicts
    """
    return [message_to_dict(m) for m in messages]
def message_to_dict(message: BaseMessage) -> dict:
    """Serialize a single message into a dict."""
    # Serialize via dumpd (handles Pydantic models)
    data = dumpd(message)
    # Attach type info for deserialization
    data["__class__"] = {
        "module": message.__class__.__module__,
        "name": message.__class__.__name__,
    }
    return data
def messages_from_dict(messages: list[dict]) -> list[BaseMessage]:
    """Deserialize messages from a list of dicts.
    Args:
        messages: dicts carrying a __class__ field
    Returns:
        the restored BaseMessage list
    """
    return [_message_from_dict(m) for m in messages]
def _message_from_dict(message: dict) -> BaseMessage:
    """Deserialize a single message from a dict."""
    # 1) Extract the type info
    if "__class__" in message:
        class_info = message["__class__"]
        module = class_info["module"]
        name = class_info["name"]
        # 2) Import the class dynamically
        module_obj = importlib.import_module(module)
        cls = getattr(module_obj, name)
        # 3) Construct the instance
        data = {k: v for k, v in message.items() if k != "__class__"}
        return cls(**data)
    else:
        # Legacy format: infer from the type field
        msg_type = message.get("type")
        if msg_type == "human":
            return HumanMessage(**message)
        elif msg_type == "ai":
            return AIMessage(**message)
        elif msg_type == "system":
            return SystemMessage(**message)
        elif msg_type == "tool":
            return ToolMessage(**message)
        else:
            return ChatMessage(**message)
Code notes:
- messages_to_dict iterates over the messages and serializes each one
- Every dict carries a __class__ field for deserialization
- messages_from_dict restores the types via dynamic import
- Legacy dicts with only a type field remain supported
Call Chain and Callers
import json
# Serialize messages before saving them to a database
messages = [
    HumanMessage("Hello"),
    AIMessage("Hi, how can I help?"),
]
# Convert to dicts
dicts = messages_to_dict(messages)
# Save to a JSON file
with open("chat_history.json", "w") as f:
    json.dump(dicts, f)
# Restore from JSON
with open("chat_history.json") as f:
    loaded_dicts = json.load(f)
# Deserialize
loaded_messages = messages_from_dict(loaded_dicts)
assert loaded_messages[0].content == "Hello"
assert isinstance(loaded_messages[1], AIMessage)
API-3: filter_messages
Basics
- Name: filter_messages
- Signature: def filter_messages(messages: Sequence[BaseMessage], *, include_names: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, include_types: Sequence[type | str] | None = None, exclude_types: Sequence[type | str] | None = None, include_ids: Sequence[str] | None = None, exclude_ids: Sequence[str] | None = None) -> list[BaseMessage]
- Idempotency: idempotent
What it does
Filters a message list by name, type, ID, and other criteria.
Request structure
messages: Sequence[BaseMessage]
include_names: Sequence[str] | None  # names to keep
exclude_names: Sequence[str] | None  # names to drop
include_types: Sequence[type | str] | None  # types to keep
exclude_types: Sequence[type | str] | None  # types to drop
include_ids: Sequence[str] | None  # IDs to keep
exclude_ids: Sequence[str] | None  # IDs to drop
Response structure
list[BaseMessage]  # the filtered message list
Entry Function and Key Code
def filter_messages(
    messages: Sequence[BaseMessage],
    *,
    include_names: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    include_types: Sequence[type | str] | None = None,
    exclude_types: Sequence[type | str] | None = None,
    include_ids: Sequence[str] | None = None,
    exclude_ids: Sequence[str] | None = None,
) -> list[BaseMessage]:
    """Filter messages by the given criteria.
    Args:
        messages: input message list
        include_names: keep only messages with these names
        exclude_names: drop messages with these names
        include_types: keep only messages of these types
        exclude_types: drop messages of these types
        include_ids: keep only messages with these IDs
        exclude_ids: drop messages with these IDs
    Returns:
        the filtered message list
    """
    filtered = []
    for message in messages:
        # 1) ID filters
        if include_ids and message.id not in include_ids:
            continue
        if exclude_ids and message.id in exclude_ids:
            continue
        # 2) Name filters
        if include_names and message.name not in include_names:
            continue
        if exclude_names and message.name in exclude_names:
            continue
        # 3) Type filters
        if include_types:
            type_match = any(
                isinstance(message, t) if isinstance(t, type) else message.type == t
                for t in include_types
            )
            if not type_match:
                continue
        if exclude_types:
            type_match = any(
                isinstance(message, t) if isinstance(t, type) else message.type == t
                for t in exclude_types
            )
            if type_match:
                continue
        # 4) Passed every filter: keep the message
        filtered.append(message)
    return filtered
Code notes:
- Iterates over the messages, applying each filter in turn
- include conditions are allowlists (keep only matches)
- exclude conditions are denylists (drop matches)
- Types may be given as classes or as type strings
- A message is kept only if it passes every condition
Call Chain and Callers
# Keep only user and AI messages, dropping system messages
filtered = filter_messages(
    messages,
    include_types=[HumanMessage, AIMessage]
)
# Drop messages with specific names
filtered = filter_messages(
    messages,
    exclude_names=["debug_user"]
)
# Combined criteria: human-type messages named "alice"
filtered = filter_messages(
    messages,
    include_types=["human"],
    include_names=["alice"]
)
API-4: trim_messages
Basics
- Name: trim_messages
- Signature: def trim_messages(messages: Sequence[BaseMessage], *, max_tokens: int | None = None, token_counter: Callable | BaseLanguageModel | None = None, strategy: Literal["first", "last"] = "last", allow_partial: bool = False, start_on: Literal["human", "ai", "system"] | None = None, end_on: Literal["human", "ai", "system"] | None = None, include_system: bool = False) -> list[BaseMessage]
- Idempotency: not idempotent (the result depends on message contents)
What it does
Trims a message list to a token budget so it fits the model's context window.
Request structure
messages: Sequence[BaseMessage]
max_tokens: int | None  # token budget
token_counter: Callable | BaseLanguageModel | None  # token counter
strategy: Literal["first", "last"]  # retention strategy
allow_partial: bool  # whether a single message may be truncated
start_on: str | None  # required starting message type
end_on: str | None  # required ending message type
include_system: bool  # always keep the system message
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| messages | Sequence[BaseMessage] | Yes | - | Input message list |
| max_tokens | int \| None | No | None | Token budget |
| token_counter | Callable \| BaseLanguageModel \| None | No | None | Token-counting function or model |
| strategy | str | No | "last" | "first" keeps the oldest messages, "last" the newest |
| allow_partial | bool | No | False | Whether a single message may be truncated |
| start_on | str \| None | No | None | The result must start with this message type |
| end_on | str \| None | No | None | The result must end with this message type |
| include_system | bool | No | False | Always keep the SystemMessage |
Response structure
list[BaseMessage]  # the trimmed message list
Entry Function and Key Code
def trim_messages(
    messages: Sequence[BaseMessage],
    *,
    max_tokens: int | None = None,
    token_counter: Callable | BaseLanguageModel | None = None,
    strategy: Literal["first", "last"] = "last",
    allow_partial: bool = False,
    start_on: Literal["human", "ai", "system"] | None = None,
    end_on: Literal["human", "ai", "system"] | None = None,
    include_system: bool = False,
) -> list[BaseMessage]:
    """Trim a message list to a token budget.
    Args:
        messages: input message list
        max_tokens: token budget
        token_counter: token-counting function or model
        strategy: "first" keeps the oldest, "last" keeps the newest
        allow_partial: whether a message may be truncated
        start_on: required starting message type
        end_on: required ending message type
        include_system: always keep the system message
    Returns:
        the trimmed message list
    """
    # 1) Resolve the token counter
    if token_counter is None:
        token_counter = _get_token_ids_default_method
    elif isinstance(token_counter, BaseLanguageModel):
        token_counter = token_counter.get_num_tokens
    # 2) Separate out system messages
    system_messages = []
    other_messages = []
    for msg in messages:
        if include_system and isinstance(msg, SystemMessage):
            system_messages.append(msg)
        else:
            other_messages.append(msg)
    # 3) Account for the system messages' tokens
    system_tokens = sum(token_counter(msg.content) for msg in system_messages)
    remaining_tokens = max_tokens - system_tokens if max_tokens else None
    # 4) Select messages per the strategy
    if strategy == "last":
        # Keep from the end backwards
        selected = _trim_from_end(other_messages, remaining_tokens, token_counter, allow_partial)
    else:
        # Keep from the start forwards
        selected = _trim_from_start(other_messages, remaining_tokens, token_counter, allow_partial)
    # 5) Apply the start_on/end_on constraints
    if start_on:
        selected = _ensure_start_on(selected, start_on)
    if end_on:
        selected = _ensure_end_on(selected, end_on)
    # 6) Re-attach the system messages
    return system_messages + selected
def _trim_from_end(
    messages: list[BaseMessage],
    max_tokens: int | None,
    token_counter: Callable,
    allow_partial: bool,
) -> list[BaseMessage]:
    """Keep messages from the end backwards without exceeding the budget."""
    if max_tokens is None:
        return messages
    selected = []
    total_tokens = 0
    for msg in reversed(messages):
        msg_tokens = token_counter(msg.content)
        if total_tokens + msg_tokens <= max_tokens:
            # The whole message fits
            selected.insert(0, msg)
            total_tokens += msg_tokens
        elif allow_partial and total_tokens < max_tokens:
            # Truncate this message
            remaining = max_tokens - total_tokens
            truncated = _truncate_message(msg, remaining, token_counter)
            selected.insert(0, truncated)
            break
        else:
            # Nothing more fits
            break
    return selected
Code notes:
- Resolves or builds the token counter
- Separates out system messages (when include_system=True)
- Chooses the retention direction per strategy
- _trim_from_end keeps messages starting from the newest
- allow_partial permits truncating a single message
- start_on/end_on enforce the boundary constraints
Call Chain and Callers
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o")
# Trim the history to fit the context window
trimmed = trim_messages(
    long_conversation,
    max_tokens=4000,
    token_counter=model,   # use the model's token counter
    strategy="last",       # keep the newest messages
    include_system=True,   # always keep the system message
)
# Send the trimmed messages
response = model.invoke(trimmed)
Detailed Inter-Module Sequence Diagrams
Scenario 1: a simple conversation
sequenceDiagram
autonumber
participant User as User code
participant Model as ChatOpenAI
participant Converter as convert_to_messages
participant BaseMsg as BaseMessage
participant AIMsg as AIMessage
User->>Model: invoke([<br/> "You are helpful",<br/> {"role": "user", "content": "Hi"}<br/>])
Note over Model,Converter: 📥 Input normalization
Model->>Model: _convert_input(input)
Model->>Converter: convert_to_messages([...])
Converter->>Converter: iterate over the input list
Note over Converter: Item 1: a string
Converter->>Converter: _convert_to_message("You are helpful")
Converter->>BaseMsg: HumanMessage("You are helpful")
BaseMsg-->>Converter: HumanMessage instance
Note over Converter: Item 2: a dict
Converter->>Converter: _convert_to_message({role: "user", ...})
Converter->>Converter: _create_message_from_message_type(<br/> "user", "Hi"<br/>)
Converter->>BaseMsg: HumanMessage("Hi")
BaseMsg-->>Converter: HumanMessage instance
Converter-->>Model: [HumanMessage, HumanMessage]
Note over Model: 🌐 API call
Model->>Model: _generate_with_retry(messages)
Model->>Model: convert to the OpenAI format
Model->>Model: HTTP POST /chat/completions
Note over Model: 📤 Response handling
Model->>Model: parse the response JSON
Model->>AIMsg: AIMessage(<br/> content="Hello! How can I help you?",<br/> response_metadata={...},<br/> usage_metadata={...}<br/>)
AIMsg-->>Model: AIMessage instance
Model-->>User: AIMessage
Flow notes:
- Input intake (steps 1-2): the ChatModel receives mixed-format input (strings, dicts, etc.)
- Normalization (steps 3-9): convert_to_messages() walks the input item by item; bare strings become HumanMessage (which is why "You are helpful" above does not become a SystemMessage), and dicts map to a message type via their role field
- API call (steps 10-12): converted to the provider format and sent
- Response construction (steps 13-15): the JSON response is wrapped in an AIMessage
Scenario 2: multimodal message handling
sequenceDiagram
autonumber
participant User as User code
participant Model as ChatModel
participant HumanMsg as HumanMessage
participant ContentBlocks as content_blocks property
participant Translator as BlockTranslator
User->>HumanMsg: HumanMessage(content=[<br/> {"type": "text", "text": "Describe this"},<br/> {"type": "image", "source": {...}}<br/>])
Note over HumanMsg: 💾 Store the raw content
HumanMsg->>HumanMsg: self.content = [dict1, dict2]
User->>Model: invoke([message])
Model->>Model: _convert_input([message])
Note over Model,ContentBlocks: 🔍 Content block parsing
Model->>ContentBlocks: message.content_blocks
ContentBlocks->>ContentBlocks: iterate over the content list
ContentBlocks->>ContentBlocks: inspect item 1: {"type": "text"}
Note over ContentBlocks: standard v1 format, used as-is
ContentBlocks->>ContentBlocks: blocks.append(TextContentBlock)
ContentBlocks->>ContentBlocks: inspect item 2: {"type": "image", "source": ...}
Note over ContentBlocks: non-standard format, flagged non_standard
ContentBlocks->>ContentBlocks: blocks.append({<br/> "type": "non_standard",<br/> "value": {...}<br/>})
Note over ContentBlocks,Translator: 🔄 Format conversion
ContentBlocks->>Translator: _convert_to_v1_from_anthropic_input(blocks)
Translator->>Translator: recognize the Anthropic format
Translator->>Translator: convert to the standard v1 ImageContentBlock
Translator-->>ContentBlocks: normalized blocks
ContentBlocks-->>Model: [<br/> TextContentBlock,<br/> ImageContentBlock<br/>]
Note over Model: 🌐 Convert to the provider format
Model->>Model: convert_to_openai_messages([message])
Model->>Model: ImageContentBlock → image_url format
Model->>Model: send the API request
Model-->>User: AIMessage(response)
Multimodal handling notes:
- Content storage (steps 1-2): HumanMessage accepts list-form content mixing several block types
- Lazy parsing (steps 3-5): content_blocks is a @property, parsed only when accessed (see the sketch below)
- Format detection (steps 6-10): each block is checked against the v1 standard; non-standard blocks are flagged non_standard for later handling
- Multi-pass conversion (steps 11-14): Anthropic, OpenAI, Google, and other converters are tried in turn until the block becomes a standard v1 ContentBlock
- Provider adaptation (steps 16-18): finally converted to the target model's specific format
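Accessing the lazily parsed blocks from user code. A sketch assuming a langchain-core version that exposes the content_blocks property; the Anthropic-style image dict mirrors the diagram above:
from langchain_core.messages import HumanMessage

msg = HumanMessage(content=[
    {"type": "text", "text": "Describe this"},
    {"type": "image", "source": {"type": "base64",
                                 "media_type": "image/jpeg", "data": "..."}},
])

# content_blocks is a @property: parsing runs on each access,
# so grab it once and reuse the result
blocks = msg.content_blocks
print([b["type"] for b in blocks])  # e.g. ["text", "image"] once converted,
                                    # or "non_standard" if no converter matches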
Scenario 3: the full tool-calling flow
sequenceDiagram
autonumber
participant User as User code
participant Agent as Agent/Chain
participant Model as ChatModel(bind_tools)
participant ToolExec as Tool executor
participant AIMsg as AIMessage
participant ToolMsg as ToolMessage
Note over User,Agent: 🚀 Tool binding
User->>Model: model.bind_tools([get_weather_tool])
Model->>Model: store the tool schemas
Model-->>User: configured model
Note over User,Agent: 💬 Round 1: the user's request
User->>Agent: "What's the weather in Beijing?"
Agent->>Agent: messages = [HumanMessage(...)]
Agent->>Model: invoke(messages)
Note over Model: 📤 Request carrying the tool definitions
Model->>Model: build the API request:<br/>{<br/> messages: [...],<br/> tools: [weather_tool_schema]<br/>}
Model->>Model: HTTP POST to API
Note over Model,AIMsg: 🔧 Parse the tool call
Model->>Model: receive the API response:<br/>{<br/> "tool_calls": [{<br/> "id": "call_123",<br/> "function": {<br/> "name": "get_weather",<br/> "arguments": '{"location":"Beijing"}'<br/> }<br/> }]<br/>}
Model->>Model: default_tool_parser(raw_tool_calls)
Model->>AIMsg: AIMessage(<br/> content="",<br/> tool_calls=[<br/> ToolCall(<br/> name="get_weather",<br/> args={"location": "Beijing"},<br/> id="call_123"<br/> )<br/> ]<br/>)
AIMsg-->>Model: AIMessage instance
Model-->>Agent: AIMessage(tool_calls=[...])
Note over Agent,ToolExec: ⚙️ Execute the tool
Agent->>Agent: detect tool_calls
Agent->>ToolExec: call get_weather(location="Beijing")
ToolExec->>ToolExec: run the tool logic
ToolExec-->>Agent: "Sunny, 25°C"
Note over Agent,ToolMsg: 📝 Build the tool result
Agent->>ToolMsg: ToolMessage(<br/> content="Sunny, 25°C",<br/> tool_call_id="call_123"<br/>)
ToolMsg-->>Agent: ToolMessage instance
Agent->>Agent: messages.extend([<br/> AIMessage(tool_calls),<br/> ToolMessage(result)<br/>])
Note over Agent,Model: 💬 Round 2: produce the final answer
Agent->>Model: invoke(messages)
Model->>Model: send the full history:<br/>[HumanMessage, AIMessage(tool_calls), ToolMessage]
Model->>Model: HTTP POST to API
Note over Model: 📤 Final response
Model->>AIMsg: AIMessage(<br/> content="The weather in Beijing is sunny, 25°C"<br/>)
Model-->>Agent: AIMessage
Agent-->>User: final answer
Tool-calling flow notes:
- Tool binding (steps 1-3): bind_tools() injects the tool schemas into the model configuration; they are attached to every API request automatically
- Round 1 (steps 4-15): the model decides it needs tool output and returns tool_calls; default_tool_parser() converts the OpenAI format into LangChain ToolCall objects; the AIMessage carries tool_calls instead of text content
- Tool execution (steps 16-21): the agent sees a non-empty AIMessage.tool_calls, runs the matching tools serially or in parallel, and wraps each result in a ToolMessage linked by tool_call_id
- Round 2 (steps 22-27): the full history, including the tool call and its result, goes back to the model, which produces a natural-language answer grounded in the tool output
Scenario 4: streaming responses and chunk merging
sequenceDiagram
autonumber
participant User as User code
participant Model as ChatModel
participant Stream as stream iterator
participant AIChunk as AIMessageChunk
participant Merge as BaseMessageChunk.__add__
User->>Model: stream([HumanMessage("Tell me a story")])
Note over Model,Stream: 🌊 Start the streaming response
Model->>Stream: _stream(messages)
Model->>Model: send the API request with streaming=True
Note over Stream,AIChunk: 📦 Receive chunk 1
Stream->>Stream: SSE data: {"delta": {"content": "Once"}}
Stream->>AIChunk: AIMessageChunk(<br/> content="Once",<br/> chunk_position=0<br/>)
AIChunk-->>Stream: chunk1
Stream-->>User: yield chunk1
Note over User: display: "Once"
Note over Stream,AIChunk: 📦 Receive chunk 2
Stream->>Stream: SSE data: {"delta": {"content": " upon"}}
Stream->>AIChunk: AIMessageChunk(<br/> content=" upon",<br/> chunk_position=1<br/>)
AIChunk-->>Stream: chunk2
Stream-->>User: yield chunk2
Note over User: display: " upon"
Note over User: ... (more chunks)
Note over User,Merge: 🔗 The caller merges the chunks
User->>User: accumulated = None
User->>User: for chunk in stream():
User->>Merge: accumulated = accumulated + chunk1
Merge->>Merge: merge_content("", "Once")
Merge->>Merge: merge_dicts(additional_kwargs)
Merge-->>User: AIMessageChunk(content="Once")
User->>Merge: accumulated = accumulated + chunk2
Merge->>Merge: merge_content("Once", " upon")
Merge-->>User: AIMessageChunk(content="Once upon")
Note over User: ... (keeps accumulating)
User->>User: final: AIMessageChunk(<br/> content="Once upon a time...",<br/> response_metadata={full metadata}<br/>)
Streaming notes:
- Stream startup (steps 1-3): stream() opens an SSE (Server-Sent Events) connection; the API returns deltas rather than a complete response
- Chunk production (steps 4-12): each SSE event becomes an AIMessageChunk; chunk_position tracks ordering; chunks are yielded immediately for real-time display
- Incremental merging (steps 13-20): the __add__ operator accumulates chunks; merge_content() concatenates strings or merges lists; merge_dicts() merges metadata, with later values winning (the merge semantics are sketched below)
- Final message: merging all chunks yields the complete message, which message_chunk_to_message() can convert to the non-chunk type
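The merge semantics of steps 13-20 in isolation (a minimal sketch; the field values are illustrative):
from langchain_core.messages import AIMessageChunk

a = AIMessageChunk(content="Once", response_metadata={"model": "demo"})
b = AIMessageChunk(content=" upon a time", response_metadata={"finish_reason": "stop"})

merged = a + b                               # BaseMessageChunk.__add__
assert merged.content == "Once upon a time"  # strings concatenate
assert merged.response_metadata == {         # dicts merge key-wise
    "model": "demo",
    "finish_reason": "stop",
}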
Typical Scenarios and Best Practices
Scenario 1: a simple conversation
messages = [
    SystemMessage("You are a helpful assistant"),
    HumanMessage("What is 2+2?"),
    AIMessage("2+2 equals 4"),
    HumanMessage("What about 3+3?"),
]
response = model.invoke(messages)
Scenario 2: a multimodal message
message = HumanMessage(
    content=[
        {"type": "text", "text": "What's in this image?"},
        {"type": "image_url", "image_url": {"url": "https://example.com/image.jpg"}},
    ]
)
response = model.invoke([message])
Scenario 3: a tool-calling conversation
# 1. The user's request
messages = [HumanMessage("What's the weather in Beijing?")]
# 2. The AI requests a tool
response = model_with_tools.invoke(messages)
# AIMessage(tool_calls=[{"name": "get_weather", "args": {"location": "Beijing"}, "id": "call_123"}])
messages.append(response)
# 3. Execute the tool
result = get_weather("Beijing")
messages.append(ToolMessage(content=result, tool_call_id="call_123"))
# 4. The AI produces the final answer
final_response = model.invoke(messages)
# AIMessage("The weather in Beijing is sunny, 25°C")
Deep Dive into the Core Call Chains
1. The convert_to_messages() call chain
Entry point and call stack
User code
└─> ChatModel.invoke(input)
    └─> ChatModel._convert_input(input)
        └─> convert_to_messages(input)  # entry point
            ├─> _convert_to_message(item1)
            │   ├─> isinstance checks
            │   ├─> _create_message_from_message_type(role, content)
            │   │   ├─> parse the role field
            │   │   ├─> parse tool_calls (for AI messages)
            │   │   └─> construct the matching Message class
            │   └─> return a BaseMessage instance
            ├─> _convert_to_message(item2)
            └─> return list[BaseMessage]
Detailed execution flow
# Step 1: the entry function
def convert_to_messages(
    messages: Iterable[MessageLikeRepresentation] | PromptValue,
) -> list[BaseMessage]:
    # 1.1 Special-case PromptValue
    if isinstance(messages, PromptValue):
        return messages.to_messages()
    # 1.2 Convert each element
    return [_convert_to_message(m) for m in messages]
# Step 2: single-message conversion
def _convert_to_message(message: MessageLikeRepresentation) -> BaseMessage:
    # 2.1 Already a BaseMessage: return as-is
    if isinstance(message, BaseMessage):
        return message
    # 2.2 String → HumanMessage
    elif isinstance(message, str):
        return _create_message_from_message_type("human", message)
    # 2.3 Tuple (role, content)
    elif isinstance(message, Sequence) and len(message) == 2:
        message_type_str, template = message
        return _create_message_from_message_type(message_type_str, template)
    # 2.4 Dict {role: ..., content: ...}
    elif isinstance(message, dict):
        msg_kwargs = message.copy()
        # 2.4.1 Extract the role field (or the type field)
        try:
            msg_type = msg_kwargs.pop("role")
        except KeyError:
            msg_type = msg_kwargs.pop("type")
        # 2.4.2 Extract the content field
        msg_content = msg_kwargs.pop("content") or ""
        # 2.4.3 Delegate to the factory
        return _create_message_from_message_type(
            msg_type, msg_content, **msg_kwargs
        )
# Step 3: create the concrete type from the role
def _create_message_from_message_type(
    message_type: str,
    content: str,
    name: str | None = None,
    tool_call_id: str | None = None,
    tool_calls: list[dict[str, Any]] | None = None,
    **additional_kwargs: Any,
) -> BaseMessage:
    # 3.1 Prepare the kwargs
    kwargs = {}
    if name is not None:
        kwargs["name"] = name
    if tool_call_id is not None:
        kwargs["tool_call_id"] = tool_call_id
    # 3.2 Handle tool_calls
    if tool_calls is not None:
        kwargs["tool_calls"] = []
        for tool_call in tool_calls:
            # 3.2.1 Convert the OpenAI format
            if "function" in tool_call:
                args = tool_call["function"]["arguments"]
                if isinstance(args, str):
                    args = json.loads(args, strict=False)
                kwargs["tool_calls"].append({
                    "name": tool_call["function"]["name"],
                    "args": args,
                    "id": tool_call["id"],
                    "type": "tool_call",
                })
    # 3.3 Create the message by type
    if message_type in {"human", "user"}:
        return HumanMessage(content=content, **kwargs)
    elif message_type in {"ai", "assistant"}:
        return AIMessage(content=content, **kwargs)
    elif message_type in {"system", "developer"}:
        return SystemMessage(content=content, **kwargs)
    elif message_type == "tool":
        return ToolMessage(content=content, **kwargs)
Key decision points (see the sketch after this list):
- Type-check ordering: BaseMessage → str → Sequence → dict, ordered by usage frequency
- Role aliases: user/human, assistant/ai, and similar aliases are supported
- tool_calls parsing: OpenAI-format function calls are parsed into standard ToolCall dicts automatically
- Error handling: a missing role/content raises a ValueError carrying an error code
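Decision points 2 and 3 in action: a sketch showing a role alias plus OpenAI-wire-format tool_calls being normalized (the tool name and arguments are illustrative):
from langchain_core.messages import AIMessage, convert_to_messages

msgs = convert_to_messages([{
    "role": "assistant",                     # alias for "ai"
    "content": "",
    "tool_calls": [{                         # OpenAI function-call wire format
        "id": "call_1",
        "type": "function",
        "function": {"name": "get_weather",
                     "arguments": '{"location": "Beijing"}'},
    }],
}])
ai = msgs[0]
assert isinstance(ai, AIMessage)
assert ai.tool_calls[0]["args"] == {"location": "Beijing"}  # JSON string parsed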
2. The BaseMessage.content_blocks lazy-parsing chain
Property access flow
User code
└─> message.content_blocks  # @property access
    ├─> init: blocks = []
    │
    ├─> Pass 1: recognize known types
    │   ├─> iterate over self.content
    │   ├─> string → TextContentBlock
    │   ├─> dict["type"] in KNOWN_BLOCK_TYPES → append as-is
    │   └─> unknown type → NonStandardContentBlock
    │
    ├─> Pass 2: try to convert non-standard blocks
    │   ├─> _convert_v0_multimodal_input_to_v1(blocks)
    │   │   └─> detects the v0 format (source_type field)
    │   ├─> _convert_to_v1_from_chat_completions_input(blocks)
    │   │   └─> detects the OpenAI format (image_url field)
    │   ├─> _convert_to_v1_from_anthropic_input(blocks)
    │   │   └─> detects the Anthropic format (source field)
    │   ├─> _convert_to_v1_from_genai_input(blocks)
    │   │   └─> detects the Google format (mime_type field)
    │   └─> _convert_to_v1_from_converse_input(blocks)
    │       └─> detects the Bedrock format
    │
    └─> returns: list[ContentBlock]
Detailed parsing logic
@property
def content_blocks(self) -> list[types.ContentBlock]:
    # Phase 1: init
    blocks: list[types.ContentBlock] = []
    # 1.1 Wrap a plain string in a list
    content = (
        [self.content] if isinstance(self.content, str) and self.content
        else self.content
    )
    # Phase 2: the first pass
    for item in content:
        if isinstance(item, str):
            # 2.1 Bare string → text block
            blocks.append({"type": "text", "text": item})
        elif isinstance(item, dict):
            item_type = item.get("type")
            # 2.2 Is it a standard v1 type?
            if item_type not in types.KNOWN_BLOCK_TYPES:
                blocks.append({"type": "non_standard", "value": item})
            # 2.3 Is it the v0 format (has source_type)?
            elif "source_type" in item:
                blocks.append({"type": "non_standard", "value": item})
            # 2.4 Standard v1 block: use as-is
            else:
                blocks.append(cast("types.ContentBlock", item))
    # Phase 3: multi-pass conversion attempts
    for parsing_step in [
        _convert_v0_multimodal_input_to_v1,
        _convert_to_v1_from_chat_completions_input,
        _convert_to_v1_from_anthropic_input,
        _convert_to_v1_from_genai_input,
        _convert_to_v1_from_converse_input,
    ]:
        blocks = parsing_step(blocks)
    return blocks
# Example converter: the Anthropic format
def _convert_to_v1_from_anthropic_input(
    blocks: list[types.ContentBlock],
) -> list[types.ContentBlock]:
    converted = []
    for block in blocks:
        if block.get("type") != "non_standard":
            converted.append(block)
            continue
        value = block["value"]
        # Check for the Anthropic image format
        if value.get("type") == "image" and "source" in value:
            source = value["source"]
            # Convert to the standard v1 ImageContentBlock
            converted.append({
                "type": "image",
                "source": "base64" if source["type"] == "base64" else "url",
                "data": source.get("data"),
                "url": source.get("url"),
            })
        else:
            # Unrecognized: keep it non_standard
            converted.append(block)
    return converted
Design points:
- Lazy parsing: content_blocks is a @property computed only on access, avoiding unnecessary work
- Two-phase handling: recognize known formats first, then try to convert the unknown ones
- Multi-pass compatibility: five mainstream provider formats are tried in turn, maximizing compatibility
- Idempotency: repeated accesses return the same result, but each access recomputes it
3. The trim_messages() token-trimming chain
Execution flow and algorithm
trim_messages(messages, max_tokens=1000)
│
├─> 1. Parameter validation
│   ├─> check start_on/include_system compatibility with strategy
│   └─> resolve the token_counter function
│
├─> 2. Strategy routing
│   ├─> strategy="first" → _first_max_tokens()
│   └─> strategy="last" → _last_max_tokens()
│
└─> 3. _last_max_tokens() execution (the common case)
    │
    ├─> 3.1 end_on filtering
    │   └─> drop trailing messages that do not match the end_on type
    │
    ├─> 3.2 System message separation
    │   ├─> if include_system=True and the first message is a SystemMessage
    │   ├─> save system_message aside
    │   └─> remove it from the list
    │
    ├─> 3.3 Compute the remaining token budget
    │   └─> remaining_tokens = max_tokens - system_tokens
    │
    ├─> 3.4 Reverse the message list
    │   └─> reversed_messages = messages[::-1]
    │
    ├─> 3.5 Call _first_max_tokens(reversed_messages)
    │   │
    │   ├─> 5.1 Binary-search the maximum message count
    │   │   ├─> left, right = 0, len(messages)
    │   │   └─> find idx with token_counter(messages[:idx]) <= max_tokens
    │   │
    │   ├─> 5.2 allow_partial handling
    │   │   ├─> list content: drop blocks one by one until it fits
    │   │   └─> str content: split with text_splitter
    │   │
    │   └─> 5.3 end_on filtering
    │       └─> walk back from idx dropping non-matching messages
    │
    ├─> 3.6 Reverse back to the original order
    │   └─> result = reversed_result[::-1]
    │
    ├─> 3.7 start_on filtering
    │   └─> drop leading messages that do not match the start_on type
    │
    └─> 3.8 Re-attach the system message
        └─> return [system_message] + result
The binary search in detail
def _first_max_tokens(
    messages: Sequence[BaseMessage],
    *,
    max_tokens: int,
    token_counter: Callable[[list[BaseMessage]], int],
    ...
) -> list[BaseMessage]:
    # Fast path: do all messages already fit?
    if token_counter(messages) <= max_tokens:
        # Only the end_on filter remains
        if end_on:
            for _ in range(len(messages)):
                if not _is_message_type(messages[-1], end_on):
                    messages.pop()
                else:
                    break
        return messages
    # Binary-search the largest idx
    left, right = 0, len(messages)
    max_iterations = len(messages).bit_length()  # log2(n)
    for _ in range(max_iterations):
        if left >= right:
            break
        mid = (left + right + 1) // 2
        # Count tokens in the first mid messages
        if token_counter(messages[:mid]) <= max_tokens:
            left = mid  # mid messages fit; try more
            idx = mid
        else:
            right = mid - 1  # mid messages exceed; try fewer
            idx = left
    # allow_partial: try to include part of the next message
    if allow_partial and idx < len(messages):
        excluded = messages[idx].model_copy(deep=True)
        # Case: content is a list
        if isinstance(excluded.content, list):
            for _ in range(1, len(excluded.content)):
                excluded.content = excluded.content[:-1]  # drop the last block
                if token_counter([*messages[:idx], excluded]) <= max_tokens:
                    messages = [*messages[:idx], excluded]
                    idx += 1
                    break
        # Case: content is a string
        else:
            text = excluded.content
            split_texts = text_splitter(text)  # split on \n
            # Binary-search how many splits fit
            left, right = 0, len(split_texts)
            for _ in range(len(split_texts).bit_length()):
                if left >= right:
                    break
                mid = (left + right + 1) // 2
                excluded.content = "".join(split_texts[:mid])
                if token_counter([*messages[:idx], excluded]) <= max_tokens:
                    left = mid
                else:
                    right = mid - 1
            if left > 0:
                excluded.content = "".join(split_texts[:left])
                messages = [*messages[:idx], excluded]
                idx += 1  # include the partial message in the slice below
    return messages[:idx]
Algorithmic complexity:
- Time: O(log(n) × T), where n is the message count and T is the cost of one token_counter call
- Space: O(n) for the sliced copies; the input list itself is not mutated
- Optimizations:
- A fast full-fit check avoids the binary search entirely when possible
- bit_length() caps the iteration count, ruling out infinite loops
- With allow_partial, a second binary search finds the best split point
Key Configuration and Best Practices
Message construction best practices
# 1. Use the convenient constructors
from langchain_core.messages import HumanMessage, AIMessage
# Recommended: pass the content positionally
msg = HumanMessage("Hello")
# Multimodal: use a list
msg = HumanMessage(content=[
    {"type": "text", "text": "Describe this"},
    {"type": "image_url", "image_url": {"url": "https://..."}}
])
# 2. Attach metadata
msg = AIMessage(
    content="Response",
    response_metadata={
        "model": "gpt-4o",
        "finish_reason": "stop",
        "system_fingerprint": "fp_123"
    },
    usage_metadata={
        "input_tokens": 100,
        "output_tokens": 50,
        "total_tokens": 150
    }
)
# 3. Tool-calling messages
from langchain_core.messages import ToolCall
ai_msg = AIMessage(
    content="",
    tool_calls=[
        ToolCall(
            name="calculator",
            args={"expression": "2+2"},
            id="call_abc"
        )
    ]
)
tool_msg = ToolMessage(
    content="4",
    tool_call_id="call_abc"
)
Message management best practices
# 1. Bound the conversation length
from langchain_core.messages import trim_messages
def manage_conversation(history: list[BaseMessage], new_message: BaseMessage, model):
    # Append the new message
    history.append(new_message)
    # Trim to fit the context window
    trimmed = trim_messages(
        history,
        max_tokens=4000,
        token_counter=model,
        strategy="last",
        include_system=True
    )
    return trimmed
# 2. Filter out debug messages
production_messages = filter_messages(
    all_messages,
    exclude_names=["debug", "test"]
)
# 3. Serialize for persistence
import json
# Save the conversation history
with open("chat_history.json", "w") as f:
    json.dump(messages_to_dict(messages), f)
# Load the conversation history
with open("chat_history.json") as f:
    messages = messages_from_dict(json.load(f))
Tool-calling best practices
# 1. Validate tool calls
def validate_tool_call(tool_call: ToolCall, available_tools: dict) -> bool:
    if tool_call["name"] not in available_tools:
        return False
    # Validate the argument schema
    tool = available_tools[tool_call["name"]]
    try:
        tool.args_schema.model_validate(tool_call["args"])
        return True
    except ValidationError:
        return False
# 2. Handle invalid tool calls
if response.invalid_tool_calls:
    for invalid_call in response.invalid_tool_calls:
        error_msg = ToolMessage(
            content=f"Invalid tool call: {invalid_call['error']}",
            tool_call_id=invalid_call["id"],
            status="error"
        )
        messages.append(error_msg)
# 3. Execute tools in parallel
import asyncio
async def execute_tools_parallel(tool_calls: list[ToolCall], tools_map):
    tasks = [
        tools_map[tc["name"]].ainvoke(tc["args"])
        for tc in tool_calls
    ]
    results = await asyncio.gather(*tasks)
    return [
        ToolMessage(content=str(result), tool_call_id=tc["id"])
        for tc, result in zip(tool_calls, results)
    ]
Performance and Boundary Conditions
Performance hot spots
1. content_blocks is computed lazily
# ❌ Avoid: repeated content_blocks access
for _ in range(1000):
    blocks = message.content_blocks  # re-parsed on every access
# ✅ Prefer: cache the result
blocks = message.content_blocks
for _ in range(1000):
    process(blocks)  # reuse the cached value
Why: content_blocks is a @property rather than a cached attribute; every access re-parses the content list and runs the multi-pass format conversion.
2. trim_messages performance
# ❌ Avoid: exact token counting when speed matters
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
trimmed = trim_messages(
    long_history,
    max_tokens=4000,
    token_counter=model  # counts via the tokenizer/API; slow
)
# ✅ Prefer: approximate counting
from langchain_core.messages import count_tokens_approximately
trimmed = trim_messages(
    long_history,
    max_tokens=4000,
    token_counter=count_tokens_approximately
)
Performance comparison:
- Exact counting: ~500ms (network round trip)
- Approximate counting: ~5ms (string length / 4)
- Speedup: ~100×
3. Message serialization performance
import json
from langchain_core.messages import messages_to_dict
# ❌ Avoid: serializing one message at a time
for message in messages:
    json_str = json.dumps(messages_to_dict([message]))
# ✅ Prefer: serialize in one batch
json_str = json.dumps(messages_to_dict(messages))
Boundary conditions and exception handling
1. Empty content
# Edge case 1: empty-string content
msg = AIMessage(content="")
assert msg.text == ""
assert msg.content_blocks == []  # the empty string is filtered out
# Edge case 2: empty-list content
msg = AIMessage(content=[])
assert msg.content_blocks == []
# Edge case 3: a list of empty strings
msg = AIMessage(content=["", "", ""])
# Per the parsing logic above, string items inside a list become text
# blocks, so these may surface as empty text blocks; filter empty strings
# out before constructing the message if that matters
2. tool_call_id validation
from langchain_core.messages import ToolMessage
# ✅ Correct: references an existing tool_call
ai_msg = AIMessage(tool_calls=[{"name": "tool", "args": {}, "id": "call_1"}])
tool_msg = ToolMessage(content="result", tool_call_id="call_1")
# ⚠️ Warning: mismatched tool_call_id
tool_msg = ToolMessage(content="result", tool_call_id="call_999")
# Most models reject such a message or produce an error
3. Multimodal content type checks
# Edge case: an unrecognizable content block type
msg = HumanMessage(content=[
    {"type": "unknown_type", "data": "..."}
])
# Result: kept as a NonStandardContentBlock
blocks = msg.content_blocks
assert blocks[0]["type"] == "non_standard"
assert "value" in blocks[0]
4. trim_messages edge cases
from langchain_core.messages import trim_messages
# Count tokens as character counts for these examples (a bare len would
# count messages, not tokens)
count_chars = lambda msgs: sum(len(m.content) for m in msgs)
# Edge 1: max_tokens=0
result = trim_messages(messages, max_tokens=0, token_counter=count_chars)
assert result == []  # nothing fits
# Edge 2: every message exceeds max_tokens
huge_messages = [HumanMessage("a" * 10000)]
result = trim_messages(huge_messages, max_tokens=100, token_counter=count_chars)
assert result == []  # no message can be included
# Edge 3: include_system with a system message over budget
messages = [
    SystemMessage("a" * 5000),  # 5000 "tokens" by this counter
    HumanMessage("Hello"),      # 5 "tokens" by this counter
]
result = trim_messages(
    messages,
    max_tokens=100,
    token_counter=count_chars,
    include_system=True
)
# The SystemMessage is kept, but nothing else fits
assert len(result) == 1
assert isinstance(result[0], SystemMessage)
Exception handling patterns
1. Message conversion failures
from langchain_core.messages import convert_to_messages
from langchain_core.exceptions import ErrorCode
try:
    messages = convert_to_messages([
        {"content": "missing role field"}  # no role
    ])
except ValueError as e:
    # Inspect the error code
    if hasattr(e, "error_code"):
        assert e.error_code == ErrorCode.MESSAGE_COERCION_FAILURE
    print(f"conversion failed: {e}")
2. Handling invalid tool_calls
from langchain_core.messages import AIMessage
# Case 1: the model returned invalid tool-call JSON
ai_msg = AIMessage(
    content="",
    tool_calls=[],  # empty
    invalid_tool_calls=[
        {
            "name": "unknown_tool",
            "args": "not-json",  # invalid JSON
            "id": "call_1",
            "error": "JSONDecodeError: ..."
        }
    ]
)
# Application-level handling
if ai_msg.invalid_tool_calls:
    for invalid_call in ai_msg.invalid_tool_calls:
        # Log the error
        logger.error(f"Invalid tool call: {invalid_call['error']}")
        # Report the failure back to the model
        error_msg = ToolMessage(
            content=f"Tool call failed: {invalid_call['error']}",
            tool_call_id=invalid_call["id"],
            status="error"
        )
3. Recovering from an interrupted stream
from langchain_core.messages import AIMessageChunk
accumulated = None
try:
    for chunk in model.stream(messages):
        if accumulated is None:
            accumulated = chunk
        else:
            accumulated = accumulated + chunk
        # Real-time display
        print(chunk.content, end="", flush=True)
except Exception as e:
    # Connection dropped
    logger.error(f"Stream interrupted: {e}")
    # accumulated holds whatever was received
    if accumulated:
        print(f"\nreceived so far: {accumulated.content}")
    # Processing can continue from the partial result
Cross-Provider Compatibility Best Practices
1. Standardize multimodal content
# Recommended: use the standard v1 format
from langchain_core.messages import HumanMessage
msg = HumanMessage(content=[
    {"type": "text", "text": "Describe this"},
    {
        "type": "image",
        "source": "url",
        "url": "https://example.com/image.jpg"
    }
])
# Adapted to each provider automatically:
# OpenAI: converted to the image_url format
# Anthropic: converted to the source format
# Google: converted to the media format
2. A unified tool-call format
# The LangChain standard format
from langchain_core.messages import ToolCall
tool_call = ToolCall(
    name="get_weather",
    args={"location": "Beijing"},
    id="call_123"
)
# ✅ Adapts to OpenAI
# {"type": "function", "id": "call_123", "function": {"name": "get_weather", "arguments": '{"location":"Beijing"}'}}
# ✅ Adapts to Anthropic
# {"type": "tool_use", "id": "call_123", "name": "get_weather", "input": {"location": "Beijing"}}
3. Metadata field conventions
from langchain_core.messages import AIMessage
# ✅ Recommended: response_metadata rather than additional_kwargs
msg = AIMessage(
    content="Response",
    response_metadata={
        "model": "gpt-4o",
        "finish_reason": "stop",
        "system_fingerprint": "fp_abc",
        "provider_specific_field": "value"
    }
)
# ⚠️ Deprecated: additional_kwargs
msg = AIMessage(
    content="Response",
    additional_kwargs={  # deprecated
        "model": "gpt-4o"
    }
)
Common Problems and Solutions
Q1: the message history outgrows the context window
Problem: a long conversation exceeds the model's token limit.
Solutions:
from langchain_core.messages import trim_messages
# Option 1: keep the newest messages plus the system message
trimmed = trim_messages(
    history,
    max_tokens=4000,
    strategy="last",
    token_counter=model,
    include_system=True,
    start_on="human"
)
# Option 2: sliding window plus summarization
if len(history) > 20:
    # Summarize the early conversation
    summary = model.invoke([
        SystemMessage("Summarize the following conversation"),
        *history[:10]
    ])
    # Keep the summary plus the recent messages
    history = [
        SystemMessage(f"Previous context: {summary.content}"),
        *history[10:]
    ]
Q2: moving multimodal messages between models
Problem: image formats are incompatible when switching from Anthropic to OpenAI.
Solution:
from langchain_core.messages import convert_to_openai_messages
# An Anthropic-format message
anthropic_msg = HumanMessage(content=[
    {"type": "text", "text": "What's this?"},
    {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": "image/jpeg",
            "data": base64_data
        }
    }
])
# Convert to the OpenAI format
openai_format = convert_to_openai_messages([anthropic_msg])
# Converted to the OpenAI image_url format automatically
Q3: tool results that are too large
Problem: a tool returns a large payload (e.g. a full document), blowing the token budget.
Solutions:
from langchain_core.messages import ToolMessage
# ✅ Option 1: stash the raw data in artifact
full_result = get_large_document()
tool_msg = ToolMessage(
    content=full_result[:500] + "...(truncated)",  # send only a digest
    tool_call_id="call_123",
    artifact=full_result  # the full data lives in artifact
)
# ✅ Option 2: mark the status and return a reference
tool_msg = ToolMessage(
    content="Result saved to memory with ID: doc_456",
    tool_call_id="call_123",
    artifact={"doc_id": "doc_456", "length": len(full_result)},
    status="success"
)
Q4: consistency between streaming and non-streaming output
Problem: the message assembled from stream chunks differs in its fields from the one invoke() returns.
Solution:
from langchain_core.messages import message_chunk_to_message
# Streaming call
accumulated = None
for chunk in model.stream(messages):
    accumulated = chunk if accumulated is None else accumulated + chunk
# Convert to a standard Message
final_message = message_chunk_to_message(accumulated)
# final_message now matches the invoke() result
# (apart from chunk-only fields such as chunk_position)