LangChain-04: Messages Module

Module Overview

Responsibilities and Positioning

The Messages module defines the core data structures for every message type in LangChain and is the foundation of chat models and conversational systems. It provides a unified message abstraction covering text, multimodal content, tool calls, and more, ensuring interoperability across model providers.

Core responsibilities:

  • Define a unified message type hierarchy (HumanMessage, AIMessage, SystemMessage, etc.)
  • Support multimodal content (text, images, audio, video)
  • Encapsulate tool calls and tool results
  • Provide message serialization/deserialization
  • Support message merging and conversion
  • Manage message metadata and usage statistics

Inputs and Outputs

Inputs:

  • Raw data: strings, dicts, tuples, etc.
  • Convertible to standard messages via convert_to_messages()

Outputs:

  • BaseMessage subclass instances
  • Serializable to dicts/JSON
  • Formattable as strings for display

Upstream and Downstream Dependencies

Depends on:

  • langchain_core.load.serializable: serialization base class
  • pydantic: data validation
  • typing_extensions: type-system extensions

Depended on by:

  • langchain_core.language_models: model inputs and outputs
  • langchain_core.prompts: prompt templates produce messages
  • langchain_core.agents: agent conversation history
  • every component that handles conversations

Lifecycle

Message objects are generally immutable:

  1. Create: via constructors or conversion functions
  2. Use: passed and processed along the chain
  3. Merge: chunks combined with the + operator
  4. Serialize: persisted to a database or transmitted
  5. Deserialize: restored from storage
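
A minimal sketch of steps 1, 4, and 5 (the serialization helpers are covered in detail under API-2 below):

from langchain_core.messages import (
    HumanMessage,
    messages_from_dict,
    messages_to_dict,
)

# 1. Create
msg = HumanMessage("Hello")

# 4./5. Serialize, then restore; the round trip preserves type and content
restored = messages_from_dict(messages_to_dict([msg]))
assert isinstance(restored[0], HumanMessage)
assert restored[0].content == "Hello"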

LangChain Architecture and Where Messages Fits

System Architecture Overview

flowchart TB
    subgraph UserLayer["Application layer"]
        USER[User application]
        AGENT[Agents<br/>autonomous agents]
        CHAIN[Chains<br/>chained calls]
    end

    subgraph CoreLayer["Core abstraction layer"]
        RUNNABLE[Runnables<br/>executable interface]
        PROMPT[Prompts<br/>prompt templates]
        MESSAGES[Messages<br/>message abstraction<br/>⭐ this module]
        CALLBACK[Callbacks<br/>callback system]
    end

    subgraph ModelLayer["Model layer"]
        LLM[LLMs<br/>text completion models]
        CHAT[ChatModels<br/>chat models]
        EMBEDDINGS[Embeddings<br/>embedding models]
    end

    subgraph ToolLayer["Tool layer"]
        TOOLS[Tools<br/>tool definitions]
        RETRIEVER[Retrievers<br/>retrieval]
        MEMORY[Memory<br/>conversation memory]
    end

    subgraph DataLayer["Data layer"]
        VECTOR[VectorStores<br/>vector databases]
        DOCUMENT[Document<br/>document loaders]
        OUTPUT[OutputParsers<br/>output parsing]
    end

    USER --> AGENT
    USER --> CHAIN
    AGENT --> RUNNABLE
    CHAIN --> RUNNABLE

    RUNNABLE --> PROMPT
    RUNNABLE --> CHAT
    RUNNABLE --> TOOLS

    PROMPT --> MESSAGES
    CHAT --> MESSAGES
    AGENT --> MESSAGES

    CHAT --> CALLBACK
    MESSAGES --> OUTPUT

    TOOLS --> MESSAGES
    MEMORY --> MESSAGES
    RETRIEVER --> DOCUMENT

    style MESSAGES fill:#ff6b6b,stroke:#c92a2a,stroke-width:3px,color:#fff
    style CHAT fill:#4ecdc4,stroke:#0e9594
    style PROMPT fill:#95e1d3,stroke:#38ada9
    style TOOLS fill:#f9ca24,stroke:#f0932b

Architecture notes:

  1. Messages' place in the core abstraction layer: Messages is a foundational component of LangChain's core abstraction layer; every module that touches conversations depends on it
  2. Upstream callers: ChatModels, Prompts, Agents, and Chains all produce or process Messages
  3. Downstream consumers: OutputParsers parse message content, Memory stores message history, Callbacks trace message flow
  4. Lateral collaboration: works with the Runnables interface for streaming and with Callbacks for event tracing

Where Messages Sits in the Call Chain

sequenceDiagram
    autonumber
    participant App as Application
    participant Chain as Chain/Agent
    participant Prompt as PromptTemplate
    participant Messages as Messages module
    participant ChatModel as ChatModel
    participant Provider as Provider API

    App->>Chain: user input
    Chain->>Prompt: format_messages({input})

    Note over Prompt,Messages: 1️⃣ Message creation
    Prompt->>Messages: convert_to_messages(...)
    Messages->>Messages: create HumanMessage/SystemMessage
    Messages-->>Prompt: [BaseMessage]

    Prompt-->>Chain: list of messages
    Chain->>ChatModel: invoke(messages)

    Note over ChatModel,Messages: 2️⃣ Message normalization
    ChatModel->>Messages: convert_to_messages(input)
    Messages->>Messages: validate and normalize
    Messages-->>ChatModel: normalized messages

    ChatModel->>ChatModel: _convert_to_provider_format()
    ChatModel->>Provider: API call (provider format)

    Note over Provider,Messages: 3️⃣ Response construction
    Provider-->>ChatModel: API response (JSON)
    ChatModel->>Messages: construct AIMessage
    Messages->>Messages: parse tool_calls/usage_metadata
    Messages-->>ChatModel: AIMessage instance

    ChatModel-->>Chain: AIMessage
    Chain-->>App: final response

Call chain notes:

Stage 1 - message creation:

  • PromptTemplate turns template output into standard messages via convert_to_messages()
  • Accepts strings, tuples, dicts, and other input formats

Stage 2 - message normalization:

  • On receiving input, ChatModel calls convert_to_messages() again to guarantee a uniform format
  • Handles compatibility concerns (e.g., converting v0-format multimodal content)

Stage 3 - response encapsulation:

  • After the provider API returns, ChatModel wraps the response in an AIMessage
  • Parses structured information such as tool_calls and usage_metadata
  • Unifies response formats across providers

Module Architecture

flowchart TB
    subgraph Base["Base classes"]
        BASE_MSG[BaseMessage<br/>message base class]
        BASE_CHUNK[BaseMessageChunk<br/>mergeable message chunk]
    end

    subgraph CoreTypes["Core message types"]
        HUMAN[HumanMessage<br/>user message]
        AI[AIMessage<br/>model message]
        SYSTEM[SystemMessage<br/>system message]
        TOOL[ToolMessage<br/>tool result message]
        FUNCTION[FunctionMessage<br/>function result, deprecated]
        CHAT[ChatMessage<br/>custom role]
    end

    subgraph Chunks["Chunk types"]
        HUMAN_CHUNK[HumanMessageChunk]
        AI_CHUNK[AIMessageChunk]
        SYSTEM_CHUNK[SystemMessageChunk]
        TOOL_CHUNK[ToolMessageChunk]
    end

    subgraph Content["Content block types"]
        TEXT[TextContentBlock<br/>text]
        IMAGE[ImageContentBlock<br/>image]
        AUDIO[AudioContentBlock<br/>audio]
        VIDEO[VideoContentBlock<br/>video]
        FILE[FileContentBlock<br/>file]
        REASONING[ReasoningContentBlock<br/>reasoning trace]
    end

    subgraph ToolCalling["Tool calling"]
        TOOL_CALL[ToolCall<br/>tool call request]
        TOOL_CALL_CHUNK[ToolCallChunk<br/>streamed tool call chunk]
        INVALID_TOOL[InvalidToolCall<br/>invalid tool call]
        SERVER_TOOL[ServerToolCall<br/>server-side tool]
    end

    subgraph Utils["Utility functions"]
        CONVERT[convert_to_messages]
        FILTER[filter_messages]
        TRIM[trim_messages]
        MERGE[merge_message_runs]
        TO_DICT[messages_to_dict]
        FROM_DICT[messages_from_dict]
    end

    BASE_MSG --> BASE_CHUNK
    BASE_MSG --> HUMAN
    BASE_MSG --> AI
    BASE_MSG --> SYSTEM
    BASE_MSG --> TOOL
    BASE_MSG --> FUNCTION
    BASE_MSG --> CHAT

    BASE_CHUNK --> HUMAN_CHUNK
    BASE_CHUNK --> AI_CHUNK
    BASE_CHUNK --> SYSTEM_CHUNK
    BASE_CHUNK --> TOOL_CHUNK

    AI --> TOOL_CALL
    AI --> TOOL_CALL_CHUNK
    AI --> INVALID_TOOL

    BASE_MSG --> Content
    BASE_MSG --> Utils

    style BASE_MSG fill:#e1f5ff
    style AI fill:#fff4e1
    style HUMAN fill:#e8f5e9
    style TOOL_CALL fill:#ffe8e8

Architecture Notes

Message type hierarchy

BaseMessage:

  • Abstract base class for all messages
  • Defines common fields: content, additional_kwargs, response_metadata, etc.
  • Implements the serialization interface

BaseMessageChunk:

  • Base class for mergeable message chunks
  • Supports merging via the + operator
  • Used for streaming output

Role message types:

  • HumanMessage: user input
  • AIMessage: model output
  • SystemMessage: system instructions
  • ToolMessage: tool execution result
  • ChatMessage: custom-role message (for compatibility)
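
For illustration, one message of each role type (a minimal sketch):

from langchain_core.messages import (
    AIMessage,
    ChatMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)

msgs = [
    SystemMessage("You are a helpful assistant"),      # system instructions
    HumanMessage("What is 2+2?"),                      # user input
    AIMessage("2+2 equals 4"),                         # model output
    ToolMessage(content="4", tool_call_id="call_1"),   # tool result
    ChatMessage(role="reviewer", content="Correct."),  # custom role
]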

Multimodal support

A message's content field can be:

  1. A string: plain text
  2. A list: a mix of content blocks
    • TextContentBlock: text
    • ImageContentBlock: image (URL or base64)
    • AudioContentBlock: audio
    • VideoContentBlock: video
    • FileContentBlock: file

Tool calling

ToolCall:

  • Represents the model's request to invoke a tool
  • Carries the tool name, arguments, and a call ID
  • Stored in AIMessage.tool_calls

ToolMessage:

  • Represents a tool's execution result
  • Linked back to the originating ToolCall's ID
  • Returned to the model to continue the conversation

InvalidToolCall:

  • Represents a malformed tool call
  • Carries the error message
  • Used for error handling

Core Data Structures

classDiagram
    class BaseMessage {
        <<abstract>>
        +content: str|list
        +additional_kwargs: dict
        +response_metadata: dict
        +type: str
        +name: str|None
        +id: str|None
        +text: TextAccessor
        +pretty_repr() str
    }

    class HumanMessage {
        +type: "human"
        +example: bool
    }

    class AIMessage {
        +type: "ai"
        +tool_calls: list[ToolCall]
        +invalid_tool_calls: list[InvalidToolCall]
        +usage_metadata: UsageMetadata|None
    }

    class SystemMessage {
        +type: "system"
    }

    class ToolMessage {
        +type: "tool"
        +tool_call_id: str
        +artifact: Any
        +status: Literal["success", "error"]
    }

    class ToolCall {
        +name: str
        +args: dict
        +id: str
        +type: Literal["tool_call"]
    }

    class UsageMetadata {
        +input_tokens: int
        +output_tokens: int
        +total_tokens: int
        +input_token_details: InputTokenDetails
        +output_token_details: OutputTokenDetails
    }

    class ContentBlock {
        <<union>>
        TextContentBlock
        ImageContentBlock
        AudioContentBlock
        VideoContentBlock
        FileContentBlock
    }

    BaseMessage <|-- HumanMessage
    BaseMessage <|-- AIMessage
    BaseMessage <|-- SystemMessage
    BaseMessage <|-- ToolMessage

    AIMessage --> ToolCall : contains
    AIMessage --> UsageMetadata : contains
    BaseMessage --> ContentBlock : content

Data Structure Details

BaseMessage fields

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| content | str or list | yes | - | Message content: a string or a list of content blocks |
| additional_kwargs | dict | no | {} | Extra parameters (deprecated; use response_metadata) |
| response_metadata | dict | no | {} | Response metadata |
| type | str | yes | - | Message type identifier |
| name | str or None | no | None | Name of the message sender |
| id | str or None | no | None | Unique message ID |

AIMessage-specific fields

| Field | Type | Description |
| --- | --- | --- |
| tool_calls | list[ToolCall] | Requested tool calls |
| invalid_tool_calls | list[InvalidToolCall] | Malformed tool calls |
| usage_metadata | UsageMetadata or None | Token usage statistics |

ToolCall structure

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | yes | Tool name |
| args | dict | yes | Tool argument dict |
| id | str | yes | Unique call ID |
| type | str | yes | Fixed to "tool_call" |

ToolMessage fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| content | str | yes | Tool execution result (as a string) |
| tool_call_id | str | yes | ID of the associated ToolCall |
| artifact | Any | no | Raw result object |
| status | str | no | "success" or "error" |

UsageMetadata structure

| Field | Type | Description |
| --- | --- | --- |
| input_tokens | int | Input token count |
| output_tokens | int | Output token count |
| total_tokens | int | Total token count |
| input_token_details | dict | Input token breakdown (cache hits, etc.) |
| output_token_details | dict | Output token breakdown (reasoning tokens, etc.) |
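
For example, reading token usage off a model response (a sketch; `model` is assumed to be any chat model instance, and the exact numbers are provider-dependent):

response = model.invoke("Hello")
usage = response.usage_metadata  # UsageMetadata or None
if usage is not None:
    print(usage["input_tokens"], usage["output_tokens"], usage["total_tokens"])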

Multimodal Content Blocks

ImageContentBlock:

{
    "type": "image",
    "source": "url",  # or "base64"
    "url": "https://...",  # or "data": "base64..."
    "detail": "auto"  # or "low"/"high"
}

AudioContentBlock:

{
    "type": "audio",
    "source": "base64",
    "data": "base64..."
}

Core API Reference

API-1: convert_to_messages

Basics

  • Name: convert_to_messages
  • Signature: def convert_to_messages(messages: Sequence[MessageLikeRepresentation]) -> list[BaseMessage]
  • Idempotency: idempotent (repeated calls yield the same result)

Description

Converts inputs in various formats into a standard list of BaseMessage objects. Accepts strings, dicts, tuples, BaseMessage instances, and more.

Request

messages: Sequence[MessageLikeRepresentation]
# MessageLikeRepresentation = BaseMessage | list | tuple | str | dict

| Parameter | Type | Description |
| --- | --- | --- |
| messages | Sequence | Message list, in any supported format |

Supported input formats:

  1. BaseMessage instance: returned as-is
  2. String: "hello" → HumanMessage("hello")
  3. Tuple: ("system", "You are...") → SystemMessage("You are...")
  4. Dict: {"role": "user", "content": "hi"} → HumanMessage("hi")
  5. List: each element converted recursively

Response

list[BaseMessage]  # standardized message list

Entry Point and Key Code

def convert_to_messages(
    messages: Sequence[MessageLikeRepresentation],
) -> list[BaseMessage]:
    """Convert inputs in various formats into a standard message list.

    Args:
        messages: Message sequence, in any supported format.

    Returns:
        A list of standard BaseMessage objects.
    """
    result = []
    for message in messages:
        result.append(_convert_single_message(message))
    return result

def _convert_single_message(message: MessageLikeRepresentation) -> BaseMessage:
    # 1) Already a BaseMessage: return as-is
    if isinstance(message, BaseMessage):
        return message

    # 2) String → HumanMessage
    elif isinstance(message, str):
        return HumanMessage(content=message)

    # 3) Tuple (role, content) → message of the matching type
    elif isinstance(message, tuple) and len(message) == 2:
        role, content = message
        return _role_to_message(role, content)

    # 4) Dict {role: ..., content: ...} → message of the matching type
    elif isinstance(message, dict):
        if "role" in message:
            return _role_to_message(message["role"], message["content"])
        else:
            # Try constructing the message directly
            return _message_from_dict(message)

    else:
        msg = f"Unsupported message type: {type(message)}"
        raise ValueError(msg)

def _role_to_message(role: str, content: str) -> BaseMessage:
    """Create a message of the type matching the given role."""
    role = role.lower()
    if role in ("human", "user"):
        return HumanMessage(content=content)
    elif role in ("ai", "assistant"):
        return AIMessage(content=content)
    elif role == "system":
        return SystemMessage(content=content)
    elif role == "tool":
        # Tool messages require a tool_call_id (empty placeholder here)
        return ToolMessage(content=content, tool_call_id="")
    else:
        # Custom roles fall back to ChatMessage
        return ChatMessage(content=content, role=role)

Code notes:

  1. Iterates over the input sequence, converting item by item
  2. Picks a conversion strategy based on the input type
  3. Strings default to HumanMessage
  4. Tuples and dicts map to message types via the role field
  5. Unsupported types raise ValueError

Call Chain and Usage

# Automatic conversion inside model invocation
from langchain_openai import ChatOpenAI

model = ChatOpenAI()

# Every supported input format is converted automatically
result = model.invoke("hello")  # str → [HumanMessage("hello")]
result = model.invoke([("system", "You are..."), ("user", "hi")])  # tuples → messages
result = model.invoke([{"role": "user", "content": "hi"}])  # dict → messages

# Inside prompt templates
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are {role}"),
    ("user", "{input}"),
])

# format_messages calls convert_to_messages internally
messages = prompt.format_messages(role="assistant", input="hello")

Sequence Diagram

sequenceDiagram
    autonumber
    participant User
    participant Convert as convert_to_messages
    participant Model as ChatModel

    User->>Model: invoke([<br/>  ("system", "You are..."),<br/>  "What is AI?"<br/>])
    Model->>Model: _convert_input(input)
    Model->>Convert: convert_to_messages([...])

    Convert->>Convert: handle tuple ("system", ...)
    Convert->>Convert: SystemMessage("You are...")

    Convert->>Convert: handle string "What is AI?"
    Convert->>Convert: HumanMessage("What is AI?")

    Convert-->>Model: [SystemMessage, HumanMessage]
    Model->>Model: send to API
    Model-->>User: AIMessage(response)

Diagram notes:

Intent: shows how input conversion unifies multiple formats into a standard message list.

Boundary conditions:

  • An empty list returns an empty list
  • Unsupported types raise ValueError
  • Unrecognized roles fall back to ChatMessage
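
A small sketch exercising these conversion rules directly:

from langchain_core.messages import (
    AIMessage,
    HumanMessage,
    SystemMessage,
    convert_to_messages,
)

msgs = convert_to_messages([
    "hello",                                  # str → HumanMessage
    ("system", "You are helpful"),            # tuple → SystemMessage
    {"role": "assistant", "content": "Hi!"},  # dict → AIMessage
])
assert isinstance(msgs[0], HumanMessage)
assert isinstance(msgs[1], SystemMessage)
assert isinstance(msgs[2], AIMessage)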

API-2: messages_to_dict / messages_from_dict

Basics

  • Names: messages_to_dict / messages_from_dict
  • Signatures:
    • def messages_to_dict(messages: Sequence[BaseMessage]) -> list[dict]
    • def messages_from_dict(messages: list[dict]) -> list[BaseMessage]
  • Idempotency: idempotent (a round trip preserves equivalence)

Description

Serialize and deserialize messages for database storage or transport.

Request

# messages_to_dict
messages: Sequence[BaseMessage]

# messages_from_dict
messages: list[dict]  # dicts carrying a __class__ field

Response

# messages_to_dict
list[dict]  # JSON-serializable dicts

# messages_from_dict
list[BaseMessage]  # restored message objects

Entry Point and Key Code

def messages_to_dict(messages: Sequence[BaseMessage]) -> list[dict]:
    """Serialize a message list into a list of dicts.

    Args:
        messages: A sequence of BaseMessage objects.

    Returns:
        A list of JSON-serializable dicts.
    """
    return [message_to_dict(m) for m in messages]

def message_to_dict(message: BaseMessage) -> dict:
    """Serialize a single message into a dict."""
    # Serialize via dumpd (handles Pydantic models)
    data = dumpd(message)

    # Attach type information for deserialization
    data["__class__"] = {
        "module": message.__class__.__module__,
        "name": message.__class__.__name__,
    }

    return data

def messages_from_dict(messages: list[dict]) -> list[BaseMessage]:
    """Deserialize messages from a list of dicts.

    Args:
        messages: A list of dicts carrying a __class__ field.

    Returns:
        The restored BaseMessage list.
    """
    return [_message_from_dict(m) for m in messages]

def _message_from_dict(message: dict) -> BaseMessage:
    """Deserialize a single message from a dict."""
    # 1) Extract type information
    if "__class__" in message:
        class_info = message["__class__"]
        module = class_info["module"]
        name = class_info["name"]

        # 2) Import the class dynamically
        module_obj = importlib.import_module(module)
        cls = getattr(module_obj, name)

        # 3) Construct the instance
        data = {k: v for k, v in message.items() if k != "__class__"}
        return cls(**data)

    else:
        # Legacy format: infer the class from the type field
        msg_type = message.get("type")
        if msg_type == "human":
            return HumanMessage(**message)
        elif msg_type == "ai":
            return AIMessage(**message)
        elif msg_type == "system":
            return SystemMessage(**message)
        elif msg_type == "tool":
            return ToolMessage(**message)
        else:
            return ChatMessage(**message)

Code notes:

  1. messages_to_dict iterates over the messages and serializes each one
  2. Every dict carries a __class__ field for deserialization
  3. messages_from_dict restores types via dynamic imports
  4. Falls back to the legacy type field for old payloads

Call Chain and Usage

import json

# Serialize messages for database storage
messages = [
    HumanMessage("Hello"),
    AIMessage("Hi, how can I help?"),
]

# Convert to dicts
dicts = messages_to_dict(messages)

# Save to a JSON file
with open("chat_history.json", "w") as f:
    json.dump(dicts, f)

# Restore from JSON
with open("chat_history.json") as f:
    loaded_dicts = json.load(f)

# Deserialize
loaded_messages = messages_from_dict(loaded_dicts)

assert loaded_messages[0].content == "Hello"
assert isinstance(loaded_messages[1], AIMessage)

API-3: filter_messages

Basics

  • Name: filter_messages
  • Signature: def filter_messages(messages: Sequence[BaseMessage], *, include_names: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, include_types: Sequence[type | str] | None = None, exclude_types: Sequence[type | str] | None = None, include_ids: Sequence[str] | None = None, exclude_ids: Sequence[str] | None = None) -> list[BaseMessage]
  • Idempotency: idempotent

Description

Filters a message list by name, type, ID, and other criteria.

Request

messages: Sequence[BaseMessage]
include_names: Sequence[str] | None  # names to keep
exclude_names: Sequence[str] | None  # names to drop
include_types: Sequence[type | str] | None  # types to keep
exclude_types: Sequence[type | str] | None  # types to drop
include_ids: Sequence[str] | None  # IDs to keep
exclude_ids: Sequence[str] | None  # IDs to drop

Response

list[BaseMessage]  # the filtered message list

Entry Point and Key Code

def filter_messages(
    messages: Sequence[BaseMessage],
    *,
    include_names: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    include_types: Sequence[type | str] | None = None,
    exclude_types: Sequence[type | str] | None = None,
    include_ids: Sequence[str] | None = None,
    exclude_ids: Sequence[str] | None = None,
) -> list[BaseMessage]:
    """Filter messages by the given criteria.

    Args:
        messages: Input message list.
        include_names: Keep only messages with these names.
        exclude_names: Drop messages with these names.
        include_types: Keep only messages of these types.
        exclude_types: Drop messages of these types.
        include_ids: Keep only messages with these IDs.
        exclude_ids: Drop messages with these IDs.

    Returns:
        The filtered message list.
    """
    filtered = []

    for message in messages:
        # 1) ID filters
        if include_ids and message.id not in include_ids:
            continue
        if exclude_ids and message.id in exclude_ids:
            continue

        # 2) Name filters
        if include_names and message.name not in include_names:
            continue
        if exclude_names and message.name in exclude_names:
            continue

        # 3) Type filters
        if include_types:
            type_match = any(
                isinstance(message, t) if isinstance(t, type) else message.type == t
                for t in include_types
            )
            if not type_match:
                continue

        if exclude_types:
            type_match = any(
                isinstance(message, t) if isinstance(t, type) else message.type == t
                for t in exclude_types
            )
            if type_match:
                continue

        # 4) All filters passed: keep the message
        filtered.append(message)

    return filtered

Code notes:

  1. Iterates over all messages, applying each filter in turn
  2. include conditions are allowlists (keep matches only)
  3. exclude conditions are blocklists (drop matches)
  4. Types may be given as classes or as type strings
  5. A message is kept only if it passes every condition

Call Chain and Usage

# Keep only user and AI messages; drop system messages
filtered = filter_messages(
    messages,
    include_types=[HumanMessage, AIMessage]
)

# Drop messages with specific names
filtered = filter_messages(
    messages,
    exclude_names=["debug_user"]
)

# Combined criteria: keep only human messages named "alice"
filtered = filter_messages(
    messages,
    include_types=["human"],
    include_names=["alice"]
)

API-4: trim_messages

Basics

  • Name: trim_messages
  • Signature: def trim_messages(messages: Sequence[BaseMessage], *, max_tokens: int | None = None, token_counter: Callable | BaseLanguageModel | None = None, strategy: Literal["first", "last"] = "last", allow_partial: bool = False, start_on: Literal["human", "ai", "system"] | None = None, end_on: Literal["human", "ai", "system"] | None = None, include_system: bool = False) -> list[BaseMessage]
  • Idempotency: deterministic (the output depends only on the messages and parameters)

Description

Trims a message list to a token budget so it fits the model's context window.

Request

messages: Sequence[BaseMessage]
max_tokens: int | None  # maximum token count
token_counter: Callable | BaseLanguageModel | None  # token counter
strategy: Literal["first", "last"]  # which end to keep
allow_partial: bool  # whether a message may be truncated
start_on: str | None  # required type of the first message
end_on: str | None  # required type of the last message
include_system: bool  # always keep the system message

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| messages | Sequence[BaseMessage] | yes | - | Input message list |
| max_tokens | int or None | no | None | Maximum token budget |
| token_counter | Callable, BaseLanguageModel, or None | no | None | Token counter or model |
| strategy | str | no | "last" | "first" keeps the earliest messages, "last" the latest |
| allow_partial | bool | no | False | Whether a single message may be truncated |
| start_on | str or None | no | None | Result must start with this message type |
| end_on | str or None | no | None | Result must end with this message type |
| include_system | bool | no | False | Always keep the SystemMessage |

Response

list[BaseMessage]  # the trimmed message list

Entry Point and Key Code

def trim_messages(
    messages: Sequence[BaseMessage],
    *,
    max_tokens: int | None = None,
    token_counter: Callable | BaseLanguageModel | None = None,
    strategy: Literal["first", "last"] = "last",
    allow_partial: bool = False,
    start_on: Literal["human", "ai", "system"] | None = None,
    end_on: Literal["human", "ai", "system"] | None = None,
    include_system: bool = False,
) -> list[BaseMessage]:
    """Trim a message list to a token budget.

    Args:
        messages: Input message list.
        max_tokens: Maximum token count.
        token_counter: Token counting function or model.
        strategy: "first" keeps the earliest messages, "last" the latest.
        allow_partial: Whether a message may be truncated.
        start_on: Required type of the first message.
        end_on: Required type of the last message.
        include_system: Always keep the system message.

    Returns:
        The trimmed message list.
    """
    # 1) Resolve the token counter
    if token_counter is None:
        token_counter = _get_token_ids_default_method
    elif isinstance(token_counter, BaseLanguageModel):
        token_counter = token_counter.get_num_tokens

    # 2) Separate the system messages
    system_messages = []
    other_messages = []
    for msg in messages:
        if include_system and isinstance(msg, SystemMessage):
            system_messages.append(msg)
        else:
            other_messages.append(msg)

    # 3) Count tokens used by the system messages
    system_tokens = sum(token_counter(msg.content) for msg in system_messages)
    remaining_tokens = max_tokens - system_tokens if max_tokens else None

    # 4) Select messages according to the strategy
    if strategy == "last":
        # Keep from the end backwards
        selected = _trim_from_end(other_messages, remaining_tokens, token_counter, allow_partial)
    else:
        # Keep from the start forwards
        selected = _trim_from_start(other_messages, remaining_tokens, token_counter, allow_partial)

    # 5) Apply the start_on/end_on constraints
    if start_on:
        selected = _ensure_start_on(selected, start_on)
    if end_on:
        selected = _ensure_end_on(selected, end_on)

    # 6) Prepend the system messages
    return system_messages + selected

def _trim_from_end(
    messages: list[BaseMessage],
    max_tokens: int | None,
    token_counter: Callable,
    allow_partial: bool,
) -> list[BaseMessage]:
    """Keep messages from the end backwards, within the token budget."""
    if max_tokens is None:
        return messages

    selected = []
    total_tokens = 0

    for msg in reversed(messages):
        msg_tokens = token_counter(msg.content)

        if total_tokens + msg_tokens <= max_tokens:
            # The whole message fits
            selected.insert(0, msg)
            total_tokens += msg_tokens
        elif allow_partial and total_tokens < max_tokens:
            # Truncate the message
            remaining = max_tokens - total_tokens
            truncated = _truncate_message(msg, remaining, token_counter)
            selected.insert(0, truncated)
            break
        else:
            # No more messages fit
            break

    return selected

Code notes:

  1. Resolves or constructs the token counter
  2. Separates system messages (when include_system=True)
  3. Chooses the retention strategy via strategy
  4. _trim_from_end keeps messages starting from the newest
  5. allow_partial permits truncating a single message
  6. start_on/end_on enforce the boundary constraints

Call Chain and Usage

from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o")

# Trim messages to fit the context window
trimmed = trim_messages(
    long_conversation,
    max_tokens=4000,
    token_counter=model,  # use the model's token counter
    strategy="last",  # keep the latest messages
    include_system=True,  # always keep the system message
)

# Send the trimmed messages
response = model.invoke(trimmed)

Detailed Cross-Module Sequence Diagrams

Scenario 1: Simple Conversation Flow

sequenceDiagram
    autonumber
    participant User as User code
    participant Model as ChatOpenAI
    participant Converter as convert_to_messages
    participant BaseMsg as BaseMessage
    participant AIMsg as AIMessage

    User->>Model: invoke([<br/>  "You are helpful",<br/>  {"role": "user", "content": "Hi"}<br/>])

    Note over Model,Converter: 📥 Input normalization
    Model->>Model: _convert_input(input)
    Model->>Converter: convert_to_messages([...])

    Converter->>Converter: iterate over the input list

    Note over Converter: Item 1: string
    Converter->>Converter: _convert_to_message("You are helpful")
    Converter->>BaseMsg: HumanMessage("You are helpful")
    BaseMsg-->>Converter: HumanMessage instance

    Note over Converter: Item 2: dict
    Converter->>Converter: _convert_to_message({role: "user", ...})
    Converter->>Converter: _create_message_from_message_type(<br/>  "user", "Hi"<br/>)
    Converter->>BaseMsg: HumanMessage("Hi")
    BaseMsg-->>Converter: HumanMessage instance

    Converter-->>Model: [HumanMessage, HumanMessage]

    Note over Model: 🌐 Call the API
    Model->>Model: _generate_with_retry(messages)
    Model->>Model: convert to OpenAI format
    Model->>Model: HTTP POST /chat/completions

    Note over Model: 📤 Handle the response
    Model->>Model: parse the response JSON
    Model->>AIMsg: AIMessage(<br/>  content="Hello! How can I help you?",<br/>  response_metadata={...},<br/>  usage_metadata={...}<br/>)
    AIMsg-->>Model: AIMessage instance

    Model-->>User: AIMessage

Flow notes:

  1. Input (steps 1-2): ChatModel accepts mixed-format input (strings, dicts, etc.)
  2. Normalization (steps 3-9):
    • convert_to_messages() walks the input, converting each item
    • Strings default to HumanMessage
    • Dicts create the message type named by their role field
  3. API call (steps 10-12): converted to the provider format and sent
  4. Response construction (steps 13-15): the JSON response is wrapped in an AIMessage

Scenario 2: Multimodal Message Handling

sequenceDiagram
    autonumber
    participant User as User code
    participant Model as ChatModel
    participant HumanMsg as HumanMessage
    participant ContentBlocks as content_blocks property
    participant Translator as BlockTranslator

    User->>HumanMsg: HumanMessage(content=[<br/>  {"type": "text", "text": "Describe this"},<br/>  {"type": "image", "source": {...}}<br/>])

    Note over HumanMsg: 💾 Store the raw content
    HumanMsg->>HumanMsg: self.content = [dict1, dict2]

    User->>Model: invoke([message])
    Model->>Model: _convert_input([message])

    Note over Model,ContentBlocks: 🔍 Content block parsing
    Model->>ContentBlocks: message.content_blocks
    ContentBlocks->>ContentBlocks: iterate over the content list

    ContentBlocks->>ContentBlocks: inspect item 1: {"type": "text"}
    Note over ContentBlocks: standard v1 format, used as-is
    ContentBlocks->>ContentBlocks: blocks.append(TextContentBlock)

    ContentBlocks->>ContentBlocks: inspect item 2: {"type": "image", "source": ...}
    Note over ContentBlocks: non-standard format, flagged as non_standard
    ContentBlocks->>ContentBlocks: blocks.append({<br/>  "type": "non_standard",<br/>  "value": {...}<br/>})

    Note over ContentBlocks,Translator: 🔄 Format conversion
    ContentBlocks->>Translator: _convert_to_v1_from_anthropic_input(blocks)
    Translator->>Translator: recognize the Anthropic format
    Translator->>Translator: convert to a standard v1 ImageContentBlock
    Translator-->>ContentBlocks: normalized blocks

    ContentBlocks-->>Model: [<br/>  TextContentBlock,<br/>  ImageContentBlock<br/>]

    Note over Model: 🌐 Convert to the provider format
    Model->>Model: convert_to_openai_messages([message])
    Model->>Model: ImageContentBlock → image_url format
    Model->>Model: send the API request

    Model-->>User: AIMessage(response)

Multimodal handling notes (see the sketch after this list):

  1. Content storage (steps 1-2): HumanMessage accepts list-valued content mixing block types
  2. Lazy parsing (steps 3-5): content_blocks is a @property, parsed only when accessed
  3. Format recognition (steps 6-10):
    • Each block is checked against the v1 standard format
    • Non-standard blocks are flagged non_standard for later handling
  4. Multi-pass conversion (steps 11-14):
    • Anthropic, OpenAI, Google, and other converters are tried in turn
    • Successful matches become standard v1 ContentBlocks
  5. Provider adaptation (steps 16-18): converted to the target model's specific format
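
Assuming a langchain-core version that exposes the content_blocks property described here, the access pattern is a one-liner (a sketch):

from langchain_core.messages import HumanMessage

msg = HumanMessage(content=[
    {"type": "text", "text": "Describe this image"},
    {"type": "image_url", "image_url": {"url": "https://example.com/cat.png"}},  # OpenAI-style block
])

blocks = msg.content_blocks  # normalized v1 blocks; cache this if accessed repeatedly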

Scenario 3: End-to-End Tool Calling

sequenceDiagram
    autonumber
    participant User as User code
    participant Agent as Agent/Chain
    participant Model as ChatModel(bind_tools)
    participant ToolExec as Tool executor
    participant AIMsg as AIMessage
    participant ToolMsg as ToolMessage

    Note over User,Agent: 🚀 Bind the tools
    User->>Model: model.bind_tools([get_weather_tool])
    Model->>Model: store tool schemas
    Model-->>User: configured model

    Note over User,Agent: 💬 Turn 1: user request
    User->>Agent: "What's the weather in Beijing?"
    Agent->>Agent: messages = [HumanMessage(...)]
    Agent->>Model: invoke(messages)

    Note over Model: 📤 Send the request with tool definitions
    Model->>Model: build API request:<br/>{<br/>  messages: [...],<br/>  tools: [weather_tool_schema]<br/>}
    Model->>Model: HTTP POST to API

    Note over Model,AIMsg: 🔧 Parse the tool calls
    Model->>Model: receive API response:<br/>{<br/>  "tool_calls": [{<br/>    "id": "call_123",<br/>    "function": {<br/>      "name": "get_weather",<br/>      "arguments": '{"location":"Beijing"}'<br/>    }<br/>  }]<br/>}

    Model->>Model: default_tool_parser(raw_tool_calls)
    Model->>AIMsg: AIMessage(<br/>  content="",<br/>  tool_calls=[<br/>    ToolCall(<br/>      name="get_weather",<br/>      args={"location": "Beijing"},<br/>      id="call_123"<br/>    )<br/>  ]<br/>)
    AIMsg-->>Model: AIMessage instance
    Model-->>Agent: AIMessage(tool_calls=[...])

    Note over Agent,ToolExec: ⚙️ Execute the tool
    Agent->>Agent: detect tool_calls
    Agent->>ToolExec: call get_weather(location="Beijing")
    ToolExec->>ToolExec: run the tool logic
    ToolExec-->>Agent: "Sunny, 25°C"

    Note over Agent,ToolMsg: 📝 Build the tool result
    Agent->>ToolMsg: ToolMessage(<br/>  content="Sunny, 25°C",<br/>  tool_call_id="call_123"<br/>)
    ToolMsg-->>Agent: ToolMessage instance

    Agent->>Agent: messages.extend([<br/>  AIMessage(tool_calls),<br/>  ToolMessage(result)<br/>])

    Note over Agent,Model: 💬 Turn 2: produce the final answer
    Agent->>Model: invoke(messages)
    Model->>Model: send the full history:<br/>[HumanMessage, AIMessage(tool_calls), ToolMessage]
    Model->>Model: HTTP POST to API

    Note over Model: 📤 Produce the final response
    Model->>AIMsg: AIMessage(<br/>  content="The weather in Beijing is sunny, 25°C"<br/>)
    Model-->>Agent: AIMessage
    Agent-->>User: final answer

Tool-calling flow notes (condensed into code in the sketch after this list):

  1. Tool binding (steps 1-3):

    • bind_tools() injects the tool schemas into the model configuration
    • They are attached to every subsequent API request automatically
  2. First turn (steps 4-15):

    • The model decides it needs tool output and returns tool_calls
    • default_tool_parser() converts the OpenAI format into LangChain ToolCall objects
    • The AIMessage carries tool_calls instead of text content
  3. Tool execution (steps 16-21):

    • The agent detects that AIMessage.tool_calls is non-empty
    • The matching tools run, in parallel or sequentially
    • Results are wrapped in ToolMessages linked by tool_call_id
  4. Second turn (steps 22-27):

    • The full history (tool calls and results included) goes back to the model
    • The model produces a natural-language answer grounded in the tool results
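
The loop looks roughly like this (a sketch; model_with_tools and tools_map are assumed to exist: a tool-bound chat model and a dict mapping tool names to tools):

from langchain_core.messages import HumanMessage, ToolMessage

messages = [HumanMessage("What's the weather in Beijing?")]

response = model_with_tools.invoke(messages)  # turn 1: the model emits tool_calls
messages.append(response)

for tc in response.tool_calls:  # execute each requested tool
    result = tools_map[tc["name"]].invoke(tc["args"])
    messages.append(ToolMessage(content=str(result), tool_call_id=tc["id"]))

final = model_with_tools.invoke(messages)  # turn 2: natural-language answer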

Scenario 4: Streaming and Chunk Merging

sequenceDiagram
    autonumber
    participant User as User code
    participant Model as ChatModel
    participant Stream as stream iterator
    participant AIChunk as AIMessageChunk
    participant Merge as BaseMessageChunk.__add__

    User->>Model: stream([HumanMessage("Tell me a story")])

    Note over Model,Stream: 🌊 Start streaming
    Model->>Stream: _stream(messages)
    Model->>Model: send API request with streaming=True

    Note over Stream,AIChunk: 📦 Receive chunk 1
    Stream->>Stream: receive SSE data: {"delta": {"content": "Once"}}
    Stream->>AIChunk: AIMessageChunk(<br/>  content="Once",<br/>  chunk_position=0<br/>)
    AIChunk-->>Stream: chunk1
    Stream-->>User: yield chunk1

    Note over User: display: "Once"

    Note over Stream,AIChunk: 📦 Receive chunk 2
    Stream->>Stream: receive SSE data: {"delta": {"content": " upon"}}
    Stream->>AIChunk: AIMessageChunk(<br/>  content=" upon",<br/>  chunk_position=1<br/>)
    AIChunk-->>Stream: chunk2
    Stream-->>User: yield chunk2

    Note over User: display: " upon"

    Note over User: ... (more chunks)

    Note over User,Merge: 🔗 Merge chunks in user code
    User->>User: accumulated = None
    User->>User: for chunk in stream():

    User->>Merge: accumulated = accumulated + chunk1
    Merge->>Merge: merge_content("", "Once")
    Merge->>Merge: merge_dicts(additional_kwargs)
    Merge-->>User: AIMessageChunk(content="Once")

    User->>Merge: accumulated = accumulated + chunk2
    Merge->>Merge: merge_content("Once", " upon")
    Merge-->>User: AIMessageChunk(content="Once upon")

    Note over User: ... (keep accumulating)

    User->>User: final: AIMessageChunk(<br/>  content="Once upon a time...",<br/>  response_metadata={full metadata}<br/>)

Streaming notes (see the sketch after this list):

  1. Stream initialization (steps 1-3):

    • stream() opens an SSE (Server-Sent Events) connection
    • The API returns deltas instead of one complete response
  2. Chunk production (steps 4-12):

    • Each SSE event becomes an AIMessageChunk
    • chunk_position tracks ordering
    • Chunks are yielded immediately for real-time display
  3. Incremental merging (steps 13-20):

    • The __add__ operator accumulates chunks
    • merge_content() concatenates strings or merges lists
    • merge_dicts() merges metadata (later values win)
  4. Final message:

    • Merging all chunks yields the complete AIMessage
    • message_chunk_to_message() converts it to the non-chunk type
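
The merge behavior can be verified in isolation (a minimal sketch):

from langchain_core.messages import AIMessageChunk

chunk1 = AIMessageChunk(content="Once")
chunk2 = AIMessageChunk(content=" upon")

merged = chunk1 + chunk2  # __add__ concatenates content and merges metadata
assert merged.content == "Once upon"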

Typical Scenarios and Best Practices

Scenario 1: Simple Conversation

messages = [
    SystemMessage("You are a helpful assistant"),
    HumanMessage("What is 2+2?"),
    AIMessage("2+2 equals 4"),
    HumanMessage("What about 3+3?"),
]

response = model.invoke(messages)

Scenario 2: Multimodal Messages

message = HumanMessage(
    content=[
        {"type": "text", "text": "What's in this image?"},
        {"type": "image_url", "image_url": {"url": "https://example.com/image.jpg"}},
    ]
)

response = model.invoke([message])

Scenario 3: Tool-Calling Conversation

# 1. User request
messages = [HumanMessage("What's the weather in Beijing?")]

# 2. The model requests a tool
response = model_with_tools.invoke(messages)
# AIMessage(tool_calls=[{"name": "get_weather", "args": {"location": "Beijing"}, "id": "call_123"}])
messages.append(response)

# 3. Execute the tool
result = get_weather("Beijing")
messages.append(ToolMessage(content=result, tool_call_id="call_123"))

# 4. The model produces the final answer
final_response = model.invoke(messages)
# AIMessage("The weather in Beijing is sunny, 25°C")

Deep Dive: Core Function Call Paths

1. convert_to_messages() Call Path

Entry Point and Call Stack

User code
  └─> ChatModel.invoke(input)
        └─> ChatModel._convert_input(input)
              └─> convert_to_messages(input)  # entry point
                    ├─> _convert_to_message(item1)
                    │     ├─> isinstance checks
                    │     ├─> _create_message_from_message_type(role, content)
                    │     │     ├─> parse the role field
                    │     │     ├─> parse tool_calls (for AI messages)
                    │     │     └─> construct the matching Message class
                    │     └─> return a BaseMessage instance
                    ├─> _convert_to_message(item2)
                    └─> return list[BaseMessage]

Detailed Execution Flow

# Step 1: entry function
def convert_to_messages(
    messages: Iterable[MessageLikeRepresentation] | PromptValue,
) -> list[BaseMessage]:
    # 1.1 Special-case PromptValue
    if isinstance(messages, PromptValue):
        return messages.to_messages()

    # 1.2 Convert each element
    return [_convert_to_message(m) for m in messages]

# Step 2: convert a single message
def _convert_to_message(message: MessageLikeRepresentation) -> BaseMessage:
    # 2.1 Already a BaseMessage: return as-is
    if isinstance(message, BaseMessage):
        return message

    # 2.2 String → HumanMessage
    elif isinstance(message, str):
        return _create_message_from_message_type("human", message)

    # 2.3 Tuple (role, content)
    elif isinstance(message, Sequence) and len(message) == 2:
        message_type_str, template = message
        return _create_message_from_message_type(message_type_str, template)

    # 2.4 Dict {role: ..., content: ...}
    elif isinstance(message, dict):
        msg_kwargs = message.copy()
        # 2.4.1 Extract the role field (falling back to type)
        try:
            msg_type = msg_kwargs.pop("role")
        except KeyError:
            msg_type = msg_kwargs.pop("type")

        # 2.4.2 Extract the content field
        msg_content = msg_kwargs.pop("content") or ""

        # 2.4.3 Delegate to the factory
        return _create_message_from_message_type(
            msg_type, msg_content, **msg_kwargs
        )

# Step 3: create the concrete message type from the role
def _create_message_from_message_type(
    message_type: str,
    content: str,
    name: str | None = None,
    tool_call_id: str | None = None,
    tool_calls: list[dict[str, Any]] | None = None,
    **additional_kwargs: Any,
) -> BaseMessage:
    # 3.1 Prepare kwargs
    kwargs = {}
    if name is not None:
        kwargs["name"] = name
    if tool_call_id is not None:
        kwargs["tool_call_id"] = tool_call_id

    # 3.2 Handle tool_calls
    if tool_calls is not None:
        kwargs["tool_calls"] = []
        for tool_call in tool_calls:
            # 3.2.1 Convert the OpenAI function-call format
            if "function" in tool_call:
                args = tool_call["function"]["arguments"]
                if isinstance(args, str):
                    args = json.loads(args, strict=False)
                kwargs["tool_calls"].append({
                    "name": tool_call["function"]["name"],
                    "args": args,
                    "id": tool_call["id"],
                    "type": "tool_call",
                })

    # 3.3 Dispatch on the message type
    if message_type in {"human", "user"}:
        return HumanMessage(content=content, **kwargs)
    elif message_type in {"ai", "assistant"}:
        return AIMessage(content=content, **kwargs)
    elif message_type in {"system", "developer"}:
        return SystemMessage(content=content, **kwargs)
    elif message_type == "tool":
        return ToolMessage(content=content, **kwargs)

Key decision points:

  1. Type-check ordering: BaseMessage → str → Sequence → dict, ordered by usage frequency
  2. Role aliases: user/human, assistant/ai, and similar aliases are supported
  3. tool_calls parsing: OpenAI-format function calls are parsed into standard ToolCalls automatically
  4. Error handling: a missing role/content raises a ValueError carrying an error code
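
A sketch exercising the alias mapping and the OpenAI tool_calls parsing described above (exact error behavior may vary by version):

from langchain_core.messages import AIMessage, convert_to_messages

msgs = convert_to_messages([{
    "role": "assistant",  # alias for "ai"
    "content": "",
    "tool_calls": [{
        "id": "call_1",
        "function": {
            "name": "get_weather",
            "arguments": '{"location": "Beijing"}',  # JSON string gets parsed
        },
    }],
}])
assert isinstance(msgs[0], AIMessage)
assert msgs[0].tool_calls[0]["args"] == {"location": "Beijing"}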

2. BaseMessage.content_blocks Lazy Parsing Path

Property Access Flow

User code
  └─> message.content_blocks  # @property access
        ├─> initialize: blocks = []
        ├─> first pass: recognize known types
        │     ├─> iterate over self.content
        │     ├─> string → TextContentBlock
        │     ├─> dict["type"] in KNOWN_BLOCK_TYPES → append directly
        │     └─> unknown type → NonStandardContentBlock
        ├─> second pass: try to convert non-standard blocks
        │     ├─> _convert_v0_multimodal_input_to_v1(blocks)
        │     │     └─> detect the v0 format (source_type field)
        │     ├─> _convert_to_v1_from_chat_completions_input(blocks)
        │     │     └─> detect the OpenAI format (image_url field)
        │     ├─> _convert_to_v1_from_anthropic_input(blocks)
        │     │     └─> detect the Anthropic format (source field)
        │     ├─> _convert_to_v1_from_genai_input(blocks)
        │     │     └─> detect the Google format (mime_type field)
        │     └─> _convert_to_v1_from_converse_input(blocks)
        │           └─> detect the Bedrock format
        └─> return: list[ContentBlock]

Parsing Logic in Detail

@property
def content_blocks(self) -> list[types.ContentBlock]:
    # Phase 1: initialize
    blocks: list[types.ContentBlock] = []

    # 1.1 Wrap a plain string in a list
    content = (
        [self.content] if isinstance(self.content, str) and self.content
        else self.content
    )

    # Phase 2: first pass
    for item in content:
        if isinstance(item, str):
            # 2.1 Plain string → text block
            blocks.append({"type": "text", "text": item})

        elif isinstance(item, dict):
            item_type = item.get("type")

            # 2.2 Is this a known v1 block type?
            if item_type not in types.KNOWN_BLOCK_TYPES:
                blocks.append({"type": "non_standard", "value": item})

            # 2.3 v0 format (has source_type)?
            elif "source_type" in item:
                blocks.append({"type": "non_standard", "value": item})

            # 2.4 Standard v1 block: use as-is
            else:
                blocks.append(cast("types.ContentBlock", item))

    # Phase 3: multi-pass conversion attempts
    for parsing_step in [
        _convert_v0_multimodal_input_to_v1,
        _convert_to_v1_from_chat_completions_input,
        _convert_to_v1_from_anthropic_input,
        _convert_to_v1_from_genai_input,
        _convert_to_v1_from_converse_input,
    ]:
        blocks = parsing_step(blocks)

    return blocks

# Example converter: the Anthropic format
def _convert_to_v1_from_anthropic_input(
    blocks: list[types.ContentBlock],
) -> list[types.ContentBlock]:
    converted = []
    for block in blocks:
        if block.get("type") != "non_standard":
            converted.append(block)
            continue

        value = block["value"]

        # Check for the Anthropic image format
        if value.get("type") == "image" and "source" in value:
            source = value["source"]
            # Convert to a standard v1 ImageContentBlock
            converted.append({
                "type": "image",
                "source": "base64" if source["type"] == "base64" else "url",
                "data": source.get("data"),
                "url": source.get("url"),
            })
        else:
            # Unrecognized: keep it non_standard
            converted.append(block)

    return converted

Design points:

  1. Lazy parsing: content_blocks is a @property computed only on access, avoiding unnecessary work
  2. Two-phase processing: recognize known formats first, then attempt conversion of the rest
  3. Multi-pass compatibility: five mainstream provider formats are tried in turn for maximum compatibility
  4. Idempotence: repeated access returns the same result, but it is recomputed every time

3. trim_messages() Token-Trimming Path

Execution Flow and Algorithm

trim_messages(messages, max_tokens=1000)
  ├─> 1. Validate parameters
  │     ├─> check start_on/include_system compatibility with strategy
  │     └─> resolve the token_counter function
  ├─> 2. Route by strategy
  │     ├─> strategy="first" → _first_max_tokens()
  │     └─> strategy="last"  → _last_max_tokens()
  └─> 3. _last_max_tokens() execution (the common case)
        ├─> 3.1 end_on filtering
        │     └─> drop trailing messages that do not match end_on
        ├─> 3.2 system message separation
        │     ├─> if include_system=True and the first message is a SystemMessage
        │     ├─> stash system_message separately
        │     └─> remove it from the list
        ├─> 3.3 compute the remaining token budget
        │     └─> remaining_tokens = max_tokens - system_tokens
        ├─> 3.4 reverse the message list
        │     └─> reversed_messages = messages[::-1]
        ├─> 3.5 call _first_max_tokens(reversed_messages)
        │     │
        │     ├─> 5.1 binary-search the maximum message count
        │     │     ├─> left, right = 0, len(messages)
        │     │     └─> find idx such that token_counter(messages[:idx]) <= max_tokens
        │     │
        │     ├─> 5.2 allow_partial handling
        │     │     ├─> if content is a list, drop blocks one by one until it fits
        │     │     └─> if content is a str, split it with text_splitter
        │     │
        │     └─> 5.3 end_on filtering
        │           └─> walk back from idx, dropping mismatched messages
        ├─> 3.6 reverse back to the original order
        │     └─> result = reversed_result[::-1]
        ├─> 3.7 start_on filtering
        │     └─> drop leading messages that do not match start_on
        └─> 3.8 prepend the system message
              └─> return [system_message] + result

The Binary Search in Detail

def _first_max_tokens(
    messages: Sequence[BaseMessage],
    *,
    max_tokens: int,
    token_counter: Callable[[list[BaseMessage]], int],
    ...
) -> list[BaseMessage]:
    # Optimization: first check whether all messages already fit
    if token_counter(messages) <= max_tokens:
        # Only the end_on filter is needed
        if end_on:
            for _ in range(len(messages)):
                if not _is_message_type(messages[-1], end_on):
                    messages.pop()
                else:
                    break
        return messages

    # Binary-search the maximum idx
    left, right = 0, len(messages)
    max_iterations = len(messages).bit_length()  # log2(n)

    for _ in range(max_iterations):
        if left >= right:
            break
        mid = (left + right + 1) // 2

        # Count the tokens of the first mid messages
        if token_counter(messages[:mid]) <= max_tokens:
            left = mid  # mid messages fit; try more
            idx = mid
        else:
            right = mid - 1  # mid messages exceed the budget; try fewer

    idx = left

    # allow_partial: try to include part of the next message
    if allow_partial and idx < len(messages):
        excluded = messages[idx].model_copy(deep=True)

        # Case: content is a list of blocks
        if isinstance(excluded.content, list):
            for i in range(1, len(excluded.content)):
                excluded.content = excluded.content[:-1]  # drop the last block
                if token_counter([*messages[:idx], excluded]) <= max_tokens:
                    messages = [*messages[:idx], excluded]
                    idx += 1
                    break

        # Case: content is a string
        else:
            text = excluded.content
            split_texts = text_splitter(text)  # split on newlines

            # Binary-search how many splits fit
            left, right = 0, len(split_texts)
            for _ in range(len(split_texts).bit_length()):
                if left >= right:
                    break
                mid = (left + right + 1) // 2
                excluded.content = "".join(split_texts[:mid])
                if token_counter([*messages[:idx], excluded]) <= max_tokens:
                    left = mid
                else:
                    right = mid - 1

            if left > 0:
                excluded.content = "".join(split_texts[:left])
                messages = [*messages[:idx], excluded]

    return messages[:idx]

Algorithmic complexity:

  • Time: O(log(n) × T), where n is the message count and T is the cost of one token_counter call
  • Space: O(1) beyond the output list
  • Optimizations:
    • Quick check first for whether every message already fits, skipping the binary search
    • bit_length() bounds the iteration count, preventing infinite loops
    • allow_partial runs a second binary search for the best split point

Key Configuration and Best Practices

Message construction best practices

# 1. Use the convenience constructors
from langchain_core.messages import HumanMessage, AIMessage

# Recommended: pass content directly
msg = HumanMessage("Hello")

# Multimodal: use a list
msg = HumanMessage(content=[
    {"type": "text", "text": "Describe this"},
    {"type": "image_url", "image_url": {"url": "https://..."}}
])

# 2. Set metadata
msg = AIMessage(
    content="Response",
    response_metadata={
        "model": "gpt-4o",
        "finish_reason": "stop",
        "system_fingerprint": "fp_123"
    },
    usage_metadata={
        "input_tokens": 100,
        "output_tokens": 50,
        "total_tokens": 150
    }
)

# 3. Tool-calling messages
from langchain_core.messages import ToolCall, ToolMessage

ai_msg = AIMessage(
    content="",
    tool_calls=[
        ToolCall(
            name="calculator",
            args={"expression": "2+2"},
            id="call_abc"
        )
    ]
)

tool_msg = ToolMessage(
    content="4",
    tool_call_id="call_abc"
)

Message management best practices

# 1. Bound the conversation length
from langchain_core.messages import trim_messages

def manage_conversation(history: list[BaseMessage], new_message: BaseMessage, model):
    # Append the new message
    history.append(new_message)

    # Trim to fit the context window
    trimmed = trim_messages(
        history,
        max_tokens=4000,
        token_counter=model,
        strategy="last",
        include_system=True
    )

    return trimmed

# 2. Filter out debug messages
production_messages = filter_messages(
    all_messages,
    exclude_names=["debug", "test"]
)

# 3. Serialize for persistence
import json

# Save chat history
with open("chat_history.json", "w") as f:
    json.dump(messages_to_dict(messages), f)

# Load chat history
with open("chat_history.json") as f:
    messages = messages_from_dict(json.load(f))

Tool-calling best practices

# 1. Validate tool calls
from pydantic import ValidationError

def validate_tool_call(tool_call: ToolCall, available_tools: dict) -> bool:
    if tool_call["name"] not in available_tools:
        return False

    # Validate the argument schema
    tool = available_tools[tool_call["name"]]
    try:
        tool.args_schema.model_validate(tool_call["args"])
        return True
    except ValidationError:
        return False

# 2. Handle invalid tool calls
if response.invalid_tool_calls:
    for invalid_call in response.invalid_tool_calls:
        error_msg = ToolMessage(
            content=f"Invalid tool call: {invalid_call['error']}",
            tool_call_id=invalid_call["id"],
            status="error"
        )
        messages.append(error_msg)

# 3. Execute tools in parallel
import asyncio

async def execute_tools_parallel(tool_calls: list[ToolCall], tools_map):
    tasks = [
        tools_map[tc["name"]].ainvoke(tc["args"])
        for tc in tool_calls
    ]
    results = await asyncio.gather(*tasks)

    return [
        ToolMessage(content=str(result), tool_call_id=tc["id"])
        for tc, result in zip(tool_calls, results)
    ]

Performance and Edge Cases

Performance hot spots

1. content_blocks is computed lazily

# ❌ Avoid: accessing content_blocks repeatedly
for _ in range(1000):
    blocks = message.content_blocks  # re-parsed on every access

# ✅ Prefer: cache the result
blocks = message.content_blocks
for _ in range(1000):
    process(blocks)  # reuse the cached value

Why: content_blocks is a plain @property rather than a cached attribute; every access re-parses the content list and runs the multi-pass format conversion.

2. trim_messages performance

# ❌ Avoid: exact token counting when speed matters
from langchain_openai import ChatOpenAI
model = ChatOpenAI()

trimmed = trim_messages(
    long_history,
    max_tokens=4000,
    token_counter=model  # counts with the model's tokenizer; slow
)

# ✅ Prefer: approximate counting
from langchain_core.messages import count_tokens_approximately

trimmed = trim_messages(
    long_history,
    max_tokens=4000,
    token_counter=count_tokens_approximately
)

Reported difference:

  • Exact counting: ~500 ms (may involve a network round trip)
  • Approximate counting: ~5 ms (roughly string length / 4)
  • Speedup: ~100×

3. Serialization performance

import json
from langchain_core.messages import messages_to_dict

# ❌ Avoid: serializing one message at a time
for message in messages:
    json_str = json.dumps(messages_to_dict([message]))

# ✅ Prefer: batch serialization
json_str = json.dumps(messages_to_dict(messages))

Edge cases and exception handling

1. Empty content

# Edge case 1: empty-string content
msg = AIMessage(content="")
assert msg.text == ""
assert msg.content_blocks == []  # the empty string is filtered out

# Edge case 2: empty-list content
msg = AIMessage(content=[])
assert msg.content_blocks == []

# Edge case 3: a list containing only empty strings
msg = AIMessage(content=["", "", ""])
assert msg.content_blocks == []  # all empty strings are filtered out

2. tool_call_id validation

from langchain_core.messages import ToolMessage

# ✅ Correct: references an existing tool_call
ai_msg = AIMessage(content="", tool_calls=[{"name": "tool", "args": {}, "id": "call_1"}])
tool_msg = ToolMessage(content="result", tool_call_id="call_1")

# ⚠️ Warning: mismatched tool_call_id
tool_msg = ToolMessage(content="result", tool_call_id="call_999")
# Most providers will reject this message or raise an error

3. Multimodal content type checks

# Edge case: an unrecognized content block type
msg = HumanMessage(content=[
    {"type": "unknown_type", "data": "..."}
])

# Result: preserved as a NonStandardContentBlock
blocks = msg.content_blocks
assert blocks[0]["type"] == "non_standard"
assert "value" in blocks[0]

4. trim_messages edge cases

from langchain_core.messages import trim_messages

# A simple character-based counter for the examples below
char_counter = lambda msgs: sum(len(m.content) for m in msgs)

# Edge 1: max_tokens=0
result = trim_messages(messages, max_tokens=0, token_counter=char_counter)
assert result == []  # returns an empty list

# Edge 2: every message exceeds max_tokens
huge_messages = [HumanMessage("a" * 10000)]
result = trim_messages(huge_messages, max_tokens=100, token_counter=char_counter)
assert result == []  # no message fits

# Edge 3: include_system, but the system message eats the budget
messages = [
    SystemMessage("a" * 5000),  # 5000 "tokens"
    HumanMessage("Hello"),      # 5 "tokens"
]
result = trim_messages(
    messages,
    max_tokens=100,
    token_counter=char_counter,
    include_system=True
)
# The SystemMessage is kept, leaving no room for anything else
assert len(result) == 1
assert isinstance(result[0], SystemMessage)

Exception handling patterns

1. Message conversion errors

from langchain_core.messages import convert_to_messages
from langchain_core.exceptions import ErrorCode

try:
    messages = convert_to_messages([
        {"content": "missing role field"}  # no role
    ])
except ValueError as e:
    # Inspect the error code
    if hasattr(e, "error_code"):
        assert e.error_code == ErrorCode.MESSAGE_COERCION_FAILURE
    print(f"Conversion failed: {e}")

2. Handling invalid tool_calls

import logging

from langchain_core.messages import AIMessage, ToolMessage

logger = logging.getLogger(__name__)

# Case 1: the model returned invalid tool-call JSON
ai_msg = AIMessage(
    content="",
    tool_calls=[],  # empty
    invalid_tool_calls=[
        {
            "name": "unknown_tool",
            "args": "not-json",  # invalid JSON
            "id": "call_1",
            "error": "JSONDecodeError: ..."
        }
    ]
)

# Application-level handling
if ai_msg.invalid_tool_calls:
    for invalid_call in ai_msg.invalid_tool_calls:
        # Log the error
        logger.error(f"Invalid tool call: {invalid_call['error']}")

        # Return an error message to the model
        error_msg = ToolMessage(
            content=f"Tool call failed: {invalid_call['error']}",
            tool_call_id=invalid_call["id"],
            status="error"
        )

3. Recovering from interrupted streams

from langchain_core.messages import AIMessageChunk

accumulated = None
try:
    for chunk in model.stream(messages):
        if accumulated is None:
            accumulated = chunk
        else:
            accumulated = accumulated + chunk

        # Display incrementally
        print(chunk.content, end="", flush=True)

except Exception as e:
    # Connection dropped
    logger.error(f"Stream interrupted: {e}")

    # accumulated holds the partial response received so far
    if accumulated:
        print(f"\nReceived so far: {accumulated.content}")
        # Downstream logic can continue from the partial result

Cross-Provider Compatibility Best Practices

1. Standardize multimodal content

# Recommended: use the v1 standard format
from langchain_core.messages import HumanMessage

msg = HumanMessage(content=[
    {"type": "text", "text": "Describe this"},
    {
        "type": "image",
        "source": "url",
        "url": "https://example.com/image.jpg"
    }
])

# Adapted automatically for each provider:
# OpenAI: converted to the image_url format
# Anthropic: converted to the source format
# Google: converted to the media format

2. Unify the tool-call format

# LangChain's standard format
from langchain_core.messages import ToolCall

tool_call = ToolCall(
    name="get_weather",
    args={"location": "Beijing"},
    id="call_123"
)

# ✅ OpenAI adaptation
# {"type": "function", "id": "call_123", "function": {"name": "get_weather", "arguments": '{"location":"Beijing"}'}}

# ✅ Anthropic adaptation
# {"type": "tool_use", "id": "call_123", "name": "get_weather", "input": {"location": "Beijing"}}

3. Metadata field conventions

from langchain_core.messages import AIMessage

# ✅ Recommended: response_metadata instead of additional_kwargs
msg = AIMessage(
    content="Response",
    response_metadata={
        "model": "gpt-4o",
        "finish_reason": "stop",
        "system_fingerprint": "fp_abc",
        "provider_specific_field": "value"
    }
)

# ⚠️ Deprecated: additional_kwargs
msg = AIMessage(
    content="Response",
    additional_kwargs={  # deprecated
        "model": "gpt-4o"
    }
)

Common Questions and Solutions

Q1: Message history outgrows the context window

Problem: a long conversation exceeds the model's token limit.

Solution:

from langchain_core.messages import trim_messages

# Option 1: keep the latest messages plus the system message
trimmed = trim_messages(
    history,
    max_tokens=4000,
    strategy="last",
    token_counter=model,
    include_system=True,
    start_on="human"
)

# Option 2: sliding window plus summarization
if len(history) > 20:
    # Summarize the early conversation
    summary = model.invoke([
        SystemMessage("Summarize the following conversation"),
        *history[:10]
    ])

    # Keep the summary plus the latest messages
    history = [
        SystemMessage(f"Previous context: {summary.content}"),
        *history[10:]
    ]

Q2: Migrating multimodal messages between providers

Problem: image formats are incompatible when switching from Anthropic to OpenAI.

Solution:

from langchain_core.messages import HumanMessage, convert_to_openai_messages

# A message in the Anthropic format
anthropic_msg = HumanMessage(content=[
    {"type": "text", "text": "What's this?"},
    {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": "image/jpeg",
            "data": base64_data  # base64-encoded image bytes
        }
    }
])

# Convert to the OpenAI format
openai_format = convert_to_openai_messages([anthropic_msg])
# Automatically rewritten into OpenAI's image_url format

Q3: Tool results are too large

Problem: a tool returns a large payload (e.g., a full document) that blows the token budget.

Solution:

from langchain_core.messages import ToolMessage

# ✅ Option 1: keep the raw data in artifact
full_result = get_large_document()
tool_msg = ToolMessage(
    content=full_result[:500] + "...(truncated)",  # send only a summary
    tool_call_id="call_123",
    artifact=full_result  # the full data lives in artifact
)

# ✅ Option 2: return a reference and set status
tool_msg = ToolMessage(
    content="Result saved to memory with ID: doc_456",
    tool_call_id="call_123",
    artifact={"doc_id": "doc_456", "length": len(full_result)},
    status="success"
)

Q4: Consistency between streaming and non-streaming output

Problem: a message assembled from stream chunks differs in fields from the message returned by invoke().

Solution:

from langchain_core.messages import message_chunk_to_message

# Streaming call
accumulated = None
for chunk in model.stream(messages):
    accumulated = chunk if accumulated is None else accumulated + chunk

# Convert to a standard message
final_message = message_chunk_to_message(accumulated)

# final_message now matches the invoke() result
# (aside from chunk-specific fields such as chunk_position)