LLM 模块 (Large Language Model Module)

1. 职责与边界

负责

  • 大语言模型的统一接口封装和调用管理
  • 多种 LLM 服务提供商的适配(DashScope、OpenAI、本地部署等)
  • 函数调用(Function Calling)能力的实现和管理
  • 消息格式的标准化和转换
  • 流式输出、重试机制、缓存策略等高级功能

不负责

  • 具体的业务逻辑处理(委托给 Agent 模块)
  • 工具的具体实现(委托给 Tools 模块)
  • 用户界面和交互(委托给 GUI 模块)
  • 文件存储和管理(委托给 Memory 模块)

依赖

  • 外部 API: DashScope API、OpenAI API、vLLM、Ollama 等
  • 配置管理: API 密钥、模型参数、生成配置
  • 网络服务: HTTP 客户端、WebSocket 连接
  • 缓存系统: diskcache(可选)

数据契约

  • 输入: 消息列表(Message 对象)、函数定义、生成配置
  • 输出: 响应消息流(Iterator[List[Message]])
  • 事件: 模型调用事件、错误事件、重试事件

2. 模块架构

2.1 整体架构图

graph TB
    subgraph "LLM 抽象层"
        BaseChatModel[BaseChatModel<br/>统一接口定义]
        BaseFnCallModel[BaseFnCallModel<br/>函数调用增强]
        ModelFactory[get_chat_model<br/>工厂函数]
        Schema[Message Schema<br/>消息格式]
    end
    
    subgraph "具体实现层"
        QwenDS[QwenChatAtDS<br/>DashScope实现]
        QwenVL[QwenVLChatAtDS<br/>多模态实现]
        QwenAudio[QwenAudioChatAtDS<br/>音频实现]
        OpenAI[OpenAIModel<br/>OpenAI兼容]
        Transformers[TransformersLLM<br/>本地部署]
        Azure[AzureModel<br/>Azure实现]
    end
    
    subgraph "功能增强层"
        FunctionCalling[函数调用处理<br/>工具集成]
        StreamProcessor[流式处理<br/>实时响应]
        RetryMechanism[重试机制<br/>容错处理]
        ConfigManager[配置管理<br/>参数合并]
    end
    
    subgraph "外部服务层"
        DashScopeAPI[DashScope API<br/>阿里云服务]
        OpenAIAPI[OpenAI API<br/>官方服务]
        vLLMServer[vLLM Server<br/>本地推理]
        OllamaServer[Ollama Server<br/>本地部署]
        AzureAPI[Azure OpenAI<br/>企业服务]
    end
    
    %% 继承关系
    BaseChatModel --> BaseFnCallModel
    BaseFnCallModel --> QwenDS
    BaseFnCallModel --> QwenVL
    BaseFnCallModel --> QwenAudio
    BaseFnCallModel --> OpenAI
    BaseChatModel --> Transformers
    BaseFnCallModel --> Azure
    
    %% 工厂创建
    ModelFactory --> QwenDS
    ModelFactory --> OpenAI
    ModelFactory --> Transformers
    
    %% 功能集成
    BaseFnCallModel --> FunctionCalling
    BaseChatModel --> StreamProcessor
    BaseChatModel --> RetryMechanism
    BaseChatModel --> ConfigManager
    
    %% 外部调用
    QwenDS --> DashScopeAPI
    OpenAI --> OpenAIAPI
    Transformers --> vLLMServer
    Transformers --> OllamaServer
    Azure --> AzureAPI

2.2 支持的模型类型

模型类型 实现类 支持特性 适用场景
DashScope QwenChatAtDS 函数调用、流式输出 生产环境
多模态 QwenVLChatAtDS 图像理解、视觉问答 多模态应用
音频 QwenAudioChatAtDS 语音理解、音频处理 语音应用
OpenAI OpenAIModel 标准API、工具调用 兼容性需求
本地部署 TransformersLLM 私有化部署、定制化 数据安全
Azure AzureModel 企业级服务、合规性 企业应用

3. 主要时序

3.1 标准聊天调用时序

sequenceDiagram
    participant C as Client
    participant B as BaseChatModel
    participant I as Implementation
    participant A as API Service
    participant Cache as Cache
    
    C->>B: chat(messages, functions)
    B->>B: 消息格式标准化
    B->>Cache: 查找缓存
    Cache-->>B: 缓存未命中
    
    B->>B: 预处理消息
    B->>B: 截断长消息
    B->>I: _chat_with_functions
    
    I->>A: 发送 API 请求
    A-->>I: 流式响应
    
    loop 流式输出
        I->>B: yield 响应块
        B->>B: 后处理消息
        B-->>C: 返回格式化响应
    end
    
    B->>Cache: 缓存最终结果

3.2 函数调用时序

sequenceDiagram
    participant A as Agent
    participant L as LLM
    participant F as FnCallProcessor
    participant API as LLM API
    participant T as Tools
    
    A->>L: chat(messages, functions)
    L->>F: 处理函数调用
    F->>F: 构造函数调用提示
    F->>API: 发送带函数的请求
    
    API-->>F: 返回函数调用响应
    F->>F: 解析函数调用
    F-->>L: 返回函数调用消息
    L-->>A: 返回需要执行的函数
    
    A->>T: 执行函数调用
    T-->>A: 返回函数结果
    A->>L: 发送函数结果
    L->>API: 继续对话
    API-->>L: 返回最终响应
    L-->>A: 返回完整对话

3.3 重试机制时序

sequenceDiagram
    participant C as Client
    participant R as RetryManager
    participant L as LLM
    participant API as API Service
    
    C->>R: 调用模型服务
    R->>L: 第一次尝试
    L->>API: 发送请求
    API-->>L: 网络错误
    L-->>R: ModelServiceError
    
    R->>R: 检查错误类型
    R->>R: 计算退避延迟
    R->>R: 等待延迟时间
    
    R->>L: 第二次尝试
    L->>API: 重新发送请求
    API-->>L: 成功响应
    L-->>R: 返回结果
    R-->>C: 返回最终结果

3.4 LLM调用完整时序

sequenceDiagram
    participant Agent as 智能体
    participant Factory as LLM工厂
    participant LLM as LLM实例
    participant API as 外部API
    participant Processor as 响应处理器
    
    Agent->>Factory: get_chat_model(config)
    Factory->>Factory: 验证配置
    Factory->>LLM: 创建 LLM实例
    LLM-->>Factory: 返回实例
    Factory-->>Agent: 返回 LLM实例
    
    Agent->>LLM: chat(messages, functions)
    LLM->>LLM: 消息预处理
    LLM->>API: 发送API请求
    
    loop 流式响应
        API-->>LLM: 返回响应块
        LLM->>Processor: 处理响应块
        Processor-->>LLM: 返回处理结果
        LLM-->>Agent: yield响应消息
    end
    
    LLM->>LLM: 响应后处理
    LLM-->>Agent: 返回最终响应

3.5 函数调用处理时序

sequenceDiagram
    participant LLM as LLM实例
    participant Preprocessor as 预处理器
    participant API as 外部API
    participant Postprocessor as 后处理器
    participant FnExtractor as 函数提取器
    
    LLM->>Preprocessor: 预处理消息
    Preprocessor->>Preprocessor: 注入函数定义
    Preprocessor->>Preprocessor: 格式化提示
    Preprocessor-->>LLM: 返回处理后消息
    
    LLM->>API: 发送带函数定义的请求
    API-->>LLM: 返回可能包含函数调用的响应
    
    LLM->>Postprocessor: 后处理响应
    Postprocessor->>FnExtractor: 提取函数调用信息
    FnExtractor-->>Postprocessor: 返回函数调用对象
    Postprocessor->>Postprocessor: 清理函数调用标记
    Postprocessor-->>LLM: 返回标准化响应

4. 提供的接口

4.1 对外接口

接口 协议 方法 参数 返回 说明
chat() Python API 同步/流式 messages, functions, stream, extra_cfg Iterator[List[Message]] 主要聊天接口
quick_chat() Python API 同步 prompt str 简单文本对话
quick_chat_oai() Python API 流式 messages, tools dict OpenAI 兼容接口
raw_chat() Python API 流式 messages, functions, generate_cfg Iterator[List[Message]] 原生 API 调用

4.2 对内接口

接口 作用域 参数 返回 说明
_chat_with_functions() 子类实现 messages, functions, stream, generate_cfg Iterator[List[Message]] 函数调用实现
_chat_stream() 子类实现 messages, delta_stream, generate_cfg Iterator[List[Message]] 流式聊天实现
_chat_no_stream() 子类实现 messages, generate_cfg List[Message] 非流式聊天实现
_preprocess_messages() 内部处理 messages, lang, generate_cfg, functions List[Message] 消息预处理
_postprocess_messages() 内部处理 messages, fncall_mode, generate_cfg List[Message] 消息后处理

5. 入口函数清单

入口 文件/行号 签名 说明
BaseChatModel.chat qwen_agent/llm/base.py:118-290 chat(messages, functions, stream, **kwargs) -> Iterator LLM 主入口
QwenChatAtDS._chat_stream qwen_agent/llm/qwen_dashscope.py:37-58 _chat_stream(messages, delta_stream, generate_cfg) -> Iterator DashScope 流式实现
QwenChatAtDS._chat_no_stream qwen_agent/llm/qwen_dashscope.py:60-86 _chat_no_stream(messages, generate_cfg) -> List[Message] DashScope 非流式实现

6. 核心类详细分析

6.1 BaseChatModel 抽象基类

位置: qwen_agent/llm/base.py

核心功能: 定义统一的LLM接口规范

class BaseChatModel(ABC):
    """基础聊天模型类
    
    设计原则:
    - 统一的消息格式 (OpenAI兼容)
    - 流式和非流式输出支持
    - 配置参数标准化
    - 多模态内容支持
    """
    
    def __init__(self, model: str, api_key: str = '', model_server: str = '', 
                 generate_cfg: dict = None, **kwargs):
        self.model = model
        self.model_type = self.__class__.__name__.lower()
        self.api_key = api_key
        self.model_server = model_server
        
        # 默认生成配置
        self.generate_cfg = {
            'top_p': 0.8,
            'temperature': 0.7,
            'max_tokens': 2000,
            'stream': True,
        }
        
        if generate_cfg:
            self.generate_cfg.update(generate_cfg)
    
    @abstractmethod
    def chat(self, messages: List[Message], functions: List[Dict] = None,
             stream: bool = True, extra_generate_cfg: dict = None) -> Iterator[List[Message]]:
        """聊天接口抽象方法"""
        raise NotImplementedError

6.2 BaseFnCallModel 函数调用增强

位置: qwen_agent/llm/function_calling.py

核心功能: 为LLM添加函数调用能力

class BaseFnCallModel(BaseChatModel):
    """函数调用增强的LLM基类
    
    核心特性:
    - 函数调用消息预处理
    - 并行函数调用支持
    - 函数选择策略
    - 多语言提示模板
    """
    
    def _preprocess_messages(self, messages: List[Message], lang: str = 'en',
                           generate_cfg: dict = None, functions: List[Dict] = None,
                           use_raw_api: bool = False) -> List[Message]:
        """函数调用消息预处理"""
        messages = super()._preprocess_messages(messages, lang=lang, 
                                              generate_cfg=generate_cfg, functions=functions)
        
        if use_raw_api or self.use_raw_api:
            return messages
        
        if (not functions) or (generate_cfg and generate_cfg.get('function_choice', 'auto') == 'none'):
            messages = self._remove_fncall_messages(messages, lang=lang)
        else:
            messages = self.fncall_prompt.preprocess_fncall_messages(
                messages=messages,
                functions=functions,
                lang=lang,
                parallel_function_calls=generate_cfg.get('parallel_function_calls', False) if generate_cfg else False,
                function_choice=generate_cfg.get('function_choice', 'auto') if generate_cfg else 'auto',
            )
        
        return messages

6.3 模型工厂和注册机制

位置: qwen_agent/llm/__init__.py

def get_chat_model(cfg: Union[dict, BaseChatModel]) -> BaseChatModel:
    """
    LLM工厂函数
    
    功能:
    - 根据配置创建对应的LLM实例
    - 支持多种模型类型
    - 配置验证和默认值处理
    """
    if isinstance(cfg, BaseChatModel):
        return cfg
    
    model_type = cfg.get('model_type', 'qwen_dashscope')
    model = cfg.get('model', 'qwen-max-latest')
    
    if model_type == 'qwen_dashscope':
        from .qwen_dashscope import QwenChatAtDS
        return QwenChatAtDS(model=model, **cfg)
    elif model_type == 'openai':
        from .oai import OpenAIModel
        return OpenAIModel(model=model, **cfg)
    # ... 其他模型类型
    else:
        raise ValueError(f"Unsupported model type: {model_type}")

# 全局LLM注册表
LLM_REGISTRY = {
    'qwen_dashscope': 'qwen_agent.llm.qwen_dashscope.QwenChatAtDS',
    'qwenvl_dashscope': 'qwen_agent.llm.qwenvl_dashscope.QwenVLChatAtDS',
    'openai': 'qwen_agent.llm.oai.OpenAIModel',
    'azure': 'qwen_agent.llm.azure.AzureModel',
    'transformers': 'qwen_agent.llm.transformers_llm.TransformersLLM',
}

7. 关键路径与关键函数

7.1 关键路径图

flowchart TD
    Start[用户调用 chat] --> Format[消息格式标准化]
    Format --> Cache[缓存查找]
    Cache -->|命中| Return[返回缓存结果]
    Cache -->|未命中| Preprocess[消息预处理]
    
    Preprocess --> Truncate[消息截断]
    Truncate --> FnCheck{是否函数调用}
    
    FnCheck -->|是| FnCall[函数调用处理]
    FnCheck -->|否| NormalChat[普通聊天]
    
    FnCall --> APICall[API 调用]
    NormalChat --> APICall
    
    APICall --> Stream{是否流式}
    Stream -->|是| StreamProcess[流式处理]
    Stream -->|否| BatchProcess[批量处理]
    
    StreamProcess --> Postprocess[消息后处理]
    BatchProcess --> Postprocess
    
    Postprocess --> CacheStore[存储缓存]
    CacheStore --> Return

7.2 关键函数分析

7.2.1 BaseChatModel.chat() - LLM 主入口

文件: qwen_agent/llm/base.py:118-290

def chat(self, messages: List[Union[Message, Dict]], functions: Optional[List[Dict]] = None,
         stream: bool = True, delta_stream: bool = False, 
         extra_generate_cfg: Optional[Dict] = None) -> Iterator[List[Message]]:
    """
    LLM 聊天的核心接口函数
    
    设计目的: 提供统一的 LLM 调用接口,支持多种模型和调用模式
    作用域: 所有 LLM 实现的通用入口点
    依赖前置条件: 模型已正确初始化,API 密钥配置完成
    后置条件: 生成符合格式的响应消息流
    复杂度: O(n*m) - n 为消息数,m 为 API 调用次数(含重试)
    重要旁支: 支持缓存机制、重试策略、流式输出
    """
    # 消息格式统一化:确保所有消息都是 Message 对象
    messages = copy.deepcopy(messages)
    _return_message_type = 'dict'  # 记录原始格式用于返回时保持一致
    new_messages = []
    
    for msg in messages:
        if isinstance(msg, dict):
            new_messages.append(Message(**msg))  # 字典转 Message 对象
        else:
            new_messages.append(msg)
            _return_message_type = 'message'  # 原始就是 Message 对象
    
    messages = new_messages
    
    if not messages:
        raise ValueError('Messages can not be empty.')
    
    # 缓存查找:避免重复的相同请求
    if self.cache is not None:
        cache_key = dict(messages=messages, functions=functions, extra_generate_cfg=extra_generate_cfg)
        cache_key: str = json_dumps_compact(cache_key, sort_keys=True)
        cache_value: str = self.cache.get(cache_key)
        
        if cache_value:
            cache_value: List[dict] = json.loads(cache_value)
            if _return_message_type == 'message':
                cache_value: List[Message] = [Message(**m) for m in cache_value]
            if stream:
                cache_value: Iterator[List[Union[Message, dict]]] = iter([cache_value])
            return cache_value
    
    # 生成配置合并:基础配置 + 额外配置
    generate_cfg = merge_generate_cfgs(base_generate_cfg=self.generate_cfg, new_generate_cfg=extra_generate_cfg)
    
    # 随机种子设置:确保可重现性
    if 'seed' not in generate_cfg:
        generate_cfg['seed'] = random.randint(a=0, b=2**30)
    
    # 语言检测:自动检测中英文
    if 'lang' in generate_cfg:
        lang: Literal['en', 'zh'] = generate_cfg.pop('lang')
    else:
        lang: Literal['en', 'zh'] = 'zh' if has_chinese_messages(messages) else 'en'
    
    # 系统消息注入:确保有默认系统消息
    if DEFAULT_SYSTEM_MESSAGE and messages[0].role != SYSTEM:
        messages = [Message(role=SYSTEM, content=DEFAULT_SYSTEM_MESSAGE)] + messages
    
    # 消息截断:防止超出模型上下文长度限制
    max_input_tokens = generate_cfg.pop('max_input_tokens', DEFAULT_MAX_INPUT_TOKENS)
    if max_input_tokens > 0:
        messages = _truncate_input_messages_roughly(messages=messages, max_tokens=max_input_tokens)
    
    # 函数调用模式检测
    if functions:
        fncall_mode = True
    else:
        fncall_mode = False
    
    # 消息预处理:格式化为模型可接受的格式
    messages = self._preprocess_messages(
        messages, lang=lang, generate_cfg=generate_cfg, 
        functions=functions, use_raw_api=self.use_raw_api
    )
    
    # 多模态支持检查:不支持多模态的模型转为纯文本
    if not self.support_multimodal_input:
        messages = [format_as_text_message(msg, add_upload_info=False) for msg in messages]
    
    # 原生 API 模式:直接调用模型原生接口
    if self.use_raw_api:
        logger.debug('`use_raw_api` takes effect.')
        assert stream and (not delta_stream), '`use_raw_api` only support full stream!!!'
        return self.raw_chat(messages=messages, functions=functions, stream=stream, generate_cfg=generate_cfg)
    
    # 定义模型服务调用函数
    def _call_model_service():
        if fncall_mode:
            return self._chat_with_functions(
                messages=messages, functions=functions, stream=stream,
                delta_stream=delta_stream, generate_cfg=generate_cfg, lang=lang,
            )
        else:
            if messages[-1].role == ASSISTANT:
                # 继续助手响应模式
                assert not delta_stream, 'Continuation mode does not currently support `delta_stream`'
                return self._continue_assistant_response(messages, generate_cfg=generate_cfg, stream=stream)
            else:
                return self._chat(messages, stream=stream, delta_stream=delta_stream, generate_cfg=generate_cfg)
    
    # 重试机制:根据流式模式选择不同的重试策略
    if stream and delta_stream:
        output = _call_model_service()  # 增量流式不支持重试
    elif stream and (not delta_stream):
        output = retry_model_service_iterator(_call_model_service, max_retries=self.max_retries)
    else:
        output = retry_model_service(_call_model_service, max_retries=self.max_retries)
    
    # 处理输出结果
    if isinstance(output, list):
        # 非流式输出
        assert not stream
        logger.debug(f'LLM Output: \n{pformat([_.model_dump() for _ in output], indent=2)}')
        output = self._postprocess_messages(output, fncall_mode=fncall_mode, generate_cfg=generate_cfg)
        
        if not self.support_multimodal_output:
            output = _format_as_text_messages(messages=output)
        
        # 缓存结果
        if self.cache:
            self.cache.set(cache_key, json_dumps_compact(output))
        
        return self._convert_messages_to_target_type(output, _return_message_type)
    else:
        # 流式输出
        assert stream
        output = self._postprocess_messages_iterator(output, fncall_mode=fncall_mode, generate_cfg=generate_cfg)
        
        def _format_and_cache() -> Iterator[List[Message]]:
            o = []
            for o in output:
                if o:
                    if not self.support_multimodal_output:
                        o = _format_as_text_messages(messages=o)
                    yield o
            # 缓存最终结果
            if o and (self.cache is not None):
                self.cache.set(cache_key, json_dumps_compact(o))
        
        return self._convert_messages_iterator_to_target_type(_format_and_cache(), _return_message_type)

7.2.2 QwenChatAtDS._full_stream_output() - DashScope 流式输出处理

文件: qwen_agent/llm/qwen_dashscope.py:109-159

@staticmethod
def _full_stream_output(response) -> Iterator[List[Message]]:
    """
    DashScope 完整流式输出处理
    
    设计目的: 处理 DashScope API 的流式响应,累积完整内容
    作用域: DashScope 模型的流式输出处理
    依赖前置条件: DashScope API 响应流已建立
    后置条件: 生成累积的完整消息内容
    复杂度: O(n) - n 为响应块数量
    重要旁支: 支持推理内容、工具调用的增量累积
    """
    full_content = ''              # 累积的完整内容
    full_reasoning_content = ''    # 累积的推理内容
    full_tool_calls = []          # 累积的工具调用列表
    
    # 处理每个响应块
    for chunk in response:
        if chunk.status_code == HTTPStatus.OK:
            # 累积推理内容(思考过程)
            if chunk.output.choices[0].message.get('reasoning_content', ''):
                full_reasoning_content += chunk.output.choices[0].message.reasoning_content
            
            # 累积主要内容
            if chunk.output.choices[0].message.content:
                full_content += chunk.output.choices[0].message.content
            
            # 处理工具调用的增量更新
            tool_calls = chunk.output.choices[0].message.get('tool_calls', None)
            if tool_calls:
                for tc in tool_calls:
                    # 检查是否是现有工具调用的续传
                    if full_tool_calls and (not tc['id'] or tc['id'] == full_tool_calls[-1]['extra']['function_id']):
                        # 累积工具名称和参数
                        if tc['function'].get('name', ''):
                            full_tool_calls[-1].function_call['name'] += tc['function']['name']
                        if tc['function'].get('arguments', ''):
                            full_tool_calls[-1].function_call['arguments'] += tc['function']['arguments']
                    else:
                        # 新的工具调用
                        full_tool_calls.append(
                            Message(
                                role=ASSISTANT,
                                content='',
                                function_call=FunctionCall(
                                    name=tc['function'].get('name', ''),
                                    arguments=tc['function'].get('arguments', '')
                                ),
                                extra={
                                    'model_service_info': json.loads(str(chunk)),
                                    'function_id': tc['id']
                                }
                            )
                        )
            
            # 构造当前累积的响应
            res = []
            
            # 添加推理内容消息
            if full_reasoning_content:
                res.append(
                    Message(
                        role=ASSISTANT,
                        content='',
                        reasoning_content=full_reasoning_content,
                        extra={'model_service_info': json.loads(str(chunk))}
                    )
                )
            
            # 添加主要内容消息
            if full_content:
                res.append(
                    Message(
                        role=ASSISTANT,
                        content=full_content,
                        extra={'model_service_info': json.loads(str(chunk))}
                    )
                )
            
            # 添加工具调用消息
            if full_tool_calls:
                res += full_tool_calls
            
            yield res  # 返回当前累积的完整响应
            
        else:
            # 处理错误响应
            raise ModelServiceError(
                code=chunk.code, 
                message=chunk.message, 
                extra={'model_service_info': chunk}
            )

7.2.3 消息截断处理 - _truncate_input_messages_roughly()

文件: qwen_agent/llm/base.py:602-804

def _truncate_input_messages_roughly(messages: List[Message], max_tokens: int) -> List[Message]:
    """
    智能消息截断处理函数
    
    设计目的: 在保持对话连贯性的前提下,将消息截断到指定 token 限制内
    作用域: 所有 LLM 模型的输入预处理
    依赖前置条件: 消息列表格式正确,token 计算器可用
    后置条件: 返回截断后的消息列表,总 token 数不超过限制
    复杂度: O(n*m) - n 为消息数,m 为平均消息长度
    重要旁支: 优先保留最新对话,智能处理函数调用结果
    """
    # 系统消息验证:确保最多只有一个系统消息且在开头
    if len([m for m in messages if m.role == SYSTEM]) >= 2:
        raise ModelServiceError(
            code='400',
            message='The input messages must contain no more than one system message. '
                   ' And the system message, if exists, must be the first message.',
        )
    
    if not messages:
        return messages
    
    # 按对话轮次组织消息:每个用户消息开始一个新轮次
    turns = []
    for m in messages:
        if m.role == SYSTEM:
            continue  # 系统消息单独处理
        elif m.role == USER:
            turns.append([m])  # 用户消息开始新轮次
        else:
            if turns:
                turns[-1].append(m)  # 助手/函数消息添加到当前轮次
            else:
                raise ModelServiceError(
                    code='400',
                    message='The input messages (excluding the system message) must start with a user message.',
                )
    
    def _count_tokens(msg: Message) -> int:
        """计算单个消息的 token 数量"""
        if msg.role == ASSISTANT and msg.function_call:
            return tokenizer.count_tokens(f'{msg.function_call}')
        return tokenizer.count_tokens(extract_text_from_message(msg, add_upload_info=True))
    
    def _truncate_message(msg: Message, max_tokens: int, keep_both_sides: bool = False):
        """截断单个消息内容"""
        if isinstance(msg.content, str):
            content = tokenizer.truncate(msg.content, max_token=max_tokens, keep_both_sides=keep_both_sides)
        else:
            # 处理多模态内容
            text = []
            for item in msg.content:
                if not item.text:
                    return None
                text.append(item.text)
            text = '\n'.join(text)
            content = tokenizer.truncate(text, max_token=max_tokens, keep_both_sides=keep_both_sides)
        return Message(role=msg.role, content=content)
    
    # 计算可用 token 数:总限制减去系统消息占用
    available_token = max_tokens
    message_tokens = defaultdict(int)
    new_messages = []
    
    for msg_idx, msg in enumerate(messages):
        if msg.role == SYSTEM:
            new_messages.append(msg)
            available_token = max_tokens - _count_tokens(msg=msg)
            continue
        message_tokens[msg_idx] = _count_tokens(msg=msg)
    
    # 检查是否需要截断
    all_tokens = sum([x for x in message_tokens.values()])
    logger.info(f'ALL tokens: {all_tokens}, Available tokens: {available_token}')
    
    if all_tokens <= available_token:
        return messages  # 无需截断
    
    if available_token <= 0:
        raise ModelServiceError(
            code='400',
            message=f'The input system has exceed the maximum input context length ({max_tokens} tokens)',
        )
    
    # 执行智能截断:优先保留最新对话,智能处理函数结果
    exceedance = all_tokens - available_token  # 需要删除的 token 数
    
    # 按轮次从旧到新处理,优先删除旧的对话轮次
    for it, (user_msg_idx, indexed_messages) in enumerate(indexed_messages_per_user.items()):
        if exceedance <= 0:
            new_messages += [x[1] for x in indexed_messages]
            continue
        else:
            is_last_turn = (it == len(indexed_messages_per_user) - 1)
            new_turn, exceedance = _truncate_turn(
                indexed_messages1=indexed_messages,
                message_tokens1=message_tokens,
                exceedance=exceedance,
                is_last_turn=is_last_turn
            )
            if new_turn:
                new_messages += new_turn
    
    return new_messages

7.2.4 重试机制 - retry_model_service_iterator()

文件: qwen_agent/llm/base.py:822-836

def retry_model_service_iterator(it_fn, max_retries: int = 10) -> Iterator:
    """
    流式迭代器的重试机制
    
    设计目的: 为流式 LLM 调用提供可靠的重试机制
    作用域: 所有流式 LLM 调用的错误恢复
    依赖前置条件: 迭代器函数可重复调用
    后置条件: 成功返回完整的流式响应或抛出最终异常
    复杂度: O(r*n) - r 为重试次数,n 为响应流长度
    重要旁支: 指数退避策略,智能错误分类
    """
    num_retries, delay = 0, 1.0  # 初始化重试计数和延迟时间
    
    while True:
        try:
            # 尝试执行迭代器函数
            for rsp in it_fn():
                yield rsp  # 逐个返回响应
            break  # 成功完成,退出重试循环
            
        except ModelServiceError as e:
            # 处理模型服务错误,决定是否重试
            num_retries, delay = _raise_or_delay(e, num_retries, delay, max_retries)
            # 如果 _raise_or_delay 没有抛出异常,说明可以重试
            # 继续下一轮循环

def _raise_or_delay(e: ModelServiceError, num_retries: int, delay: float, 
                    max_retries: int = 10, max_delay: float = 300.0, 
                    exponential_base: float = 2.0) -> Tuple[int, float]:
    """
    智能重试决策函数
    
    设计目的: 根据错误类型和重试次数决定是否继续重试
    作用域: 所有 LLM 调用的错误处理
    错误分类: 区分可重试错误和不可重试错误
    退避策略: 指数退避 + 随机抖动,避免雷群效应
    """
    if max_retries <= 0:  # 不允许重试
        raise e
    
    # 不可重试的错误类型
    if e.code == '400':  # 请求参数错误
        raise e
    if e.code == 'DataInspectionFailed':  # 内容安全检查失败
        raise e
    if 'inappropriate content' in str(e):  # 不当内容
        raise e
    if 'maximum context length' in str(e):  # 上下文长度超限
        raise e
    
    logger.warning('ModelServiceError - ' + str(e).strip('\n'))
    
    # 检查重试次数限制
    if num_retries >= max_retries:
        raise ModelServiceError(exception=Exception(f'Maximum number of retries ({max_retries}) exceeded.'))
    
    # 计算下次重试的延迟时间
    num_retries += 1
    jitter = 1.0 + random.random()  # 随机抖动因子 [1.0, 2.0)
    delay = min(delay * exponential_base, max_delay) * jitter  # 指数退避 + 抖动
    time.sleep(delay)  # 等待延迟时间
    
    return num_retries, delay

8. 性能优化策略

8.1 连接池管理

class ConnectionPoolManager:
    """连接池管理器"""
    
    def __init__(self, max_connections: int = 10):
        self.max_connections = max_connections
        self.pools = {}
    
    def get_pool(self, model_type: str, base_url: str):
        """获取连接池"""
        pool_key = f"{model_type}:{base_url}"
        
        if pool_key not in self.pools:
            if model_type == 'openai':
                from openai import OpenAI
                self.pools[pool_key] = OpenAI(
                    base_url=base_url,
                    max_retries=3,
                    timeout=30.0
                )
        
        return self.pools[pool_key]

8.2 批量处理优化

class BatchProcessor:
    """批量处理器"""
    
    def __init__(self, batch_size: int = 5, timeout: float = 1.0):
        self.batch_size = batch_size
        self.timeout = timeout
        self.pending_requests = []
    
    async def add_request(self, messages: List[Message], **kwargs) -> List[Message]:
        """添加请求到批次"""
        future = asyncio.Future()
        request = {
            'messages': messages,
            'kwargs': kwargs,
            'future': future
        }
        
        self.pending_requests.append(request)
        
        if len(self.pending_requests) >= self.batch_size:
            await self._process_batch()
        
        return await future

9. 扩展开发指南

9.1 自定义LLM实现

@register_llm('custom_model')
class CustomLLM(BaseFnCallModel):
    """自定义LLM实现模板"""
    
    def __init__(self, model: str, **kwargs):
        super().__init__(model, **kwargs)
        self.custom_config = kwargs.get('custom_config', {})
        self.client = self._init_client()
    
    def chat(self, messages: List[Message], functions: List[Dict] = None,
             stream: bool = True, extra_generate_cfg: dict = None) -> Iterator[List[Message]]:
        """实现聊天接口"""
        # 1. 消息预处理
        messages = self._preprocess_messages(messages, functions=functions)
        
        # 2. 转换为自定义格式
        custom_messages = self._convert_to_custom_format(messages)
        
        # 3. 调用自定义API
        try:
            if stream:
                yield from self._chat_stream(custom_messages, functions)
            else:
                yield from self._chat_no_stream(custom_messages, functions)
        except Exception as e:
            raise ModelServiceError(f"Custom model error: {str(e)}")

9.2 多模态LLM扩展

class MultimodalLLM(BaseFnCallModel):
    """多模态LLM基类"""
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.support_multimodal_input = True
        self.support_multimodal_output = True
    
    def _process_multimodal_content(self, content: List[ContentItem]) -> dict:
        """处理多模态内容"""
        processed_content = {
            'text': [],
            'images': [],
            'audio': [],
            'video': []
        }
        
        for item in content:
            if item.text:
                processed_content['text'].append(item.text)
            elif item.image:
                image_data = self._process_image(item.image)
                processed_content['images'].append(image_data)
        
        return processed_content

10. 并发/IO/错误/配置/安全要点

10.1 并发处理

  • 同步调用: 基础的 LLM 调用采用同步模式,通过流式输出提供实时反馈
  • 异步支持: 支持异步调用模式,适用于高并发场景
  • 连接复用: HTTP 连接复用,减少连接建立开销
  • 线程安全: 模型实例线程安全,支持多线程并发调用

10.2 I/O 与重试策略

  • 指数退避: 采用指数退避 + 随机抖动的重试策略,避免雷群效应
  • 智能重试: 根据错误类型智能决定是否重试,避免无效重试
  • 超时控制: 设置合理的请求超时时间,防止长时间阻塞
  • 流式处理: 支持流式输入输出,降低内存占用和响应延迟

10.3 错误分类与传播

  • 用户错误 (400): 参数格式错误、上下文过长等,不可重试
  • 服务错误 (5xx): 服务暂时不可用、网络异常等,可重试
  • 内容安全: 不当内容检测失败,不可重试
  • 配额限制: API 调用频率限制,可重试但需延长等待时间

10.4 配置项与动态开关

  • 模型参数: temperature、top_p、max_tokens 等生成参数
  • 重试配置: max_retries、max_delay、exponential_base
  • 缓存配置: cache_dir、缓存过期时间
  • API 配置: api_key、base_url、超时时间

10.5 安全考虑

  • API 密钥管理: 支持环境变量和配置文件两种方式
  • 内容过滤: 自动检测和过滤不当内容
  • 输入验证: 严格验证输入参数格式和范围
  • 日志脱敏: 避免在日志中记录敏感信息

11. 可观测性与性能

11.1 日志关键字段

  • 请求 ID: 唯一标识每次 LLM 调用
  • 模型信息: 模型名称、版本、提供商
  • 性能指标: 请求时间、响应时间、token 消耗
  • 错误信息: 错误码、错误消息、重试次数
  • 缓存状态: 缓存命中/未命中、缓存键

11.2 关键指标

  • 响应延迟: 首 token 延迟(P95 < 1s)、完整响应延迟(P95 < 10s)
  • 成功率: API 调用成功率(> 99%)、重试成功率
  • 吞吐量: QPS、并发连接数、token/秒
  • 资源使用: 内存占用、网络带宽、缓存命中率
  • 成本监控: API 调用费用、token 消耗统计

11.3 性能优化建议

  • 缓存策略: 对相同请求启用缓存,设置合理的过期时间
  • 批量处理: 对多个独立请求进行批量处理
  • 连接池: 使用 HTTP 连接池,减少连接建立开销
  • 预热机制: 对常用模型进行预热,减少冷启动时间

12. 验收清单

  • LLM 抽象层和实现层架构完整
  • 主要调用时序图清晰
  • 接口与入口函数完全列举
  • 关键路径覆盖完整
  • 关键函数贴代码并详细注释
  • 并发/IO/错误/配置/安全说明完整
  • 可观测性与性能建议明确
  • 验收标准达成