智能体模块 (Agents Module)
1. 模块概述
1.1 职责与边界
负责
- 智能体的核心执行逻辑和工作流程
- 消息处理、工具调用、多轮对话管理
- RAG(检索增强生成)能力集成
- 不同类型智能体的专门化实现
- 多智能体协作和对话管理
不负责
- 具体的 LLM 模型调用(委托给 LLM 模块)
- 工具的具体实现(委托给 Tools 模块)
- 文件存储和管理(委托给 Memory 模块)
- 用户界面展示(委托给 GUI 模块)
依赖
- LLM 模块: 大语言模型调用服务
- Tools 模块: 工具注册和执行
- Memory 模块: 文件管理和对话历史
- 配置: 系统消息、最大调用次数等设置
数据契约
- 输入: 消息列表(Message 对象)
- 输出: 响应消息流(Iterator[List[Message]])
- 事件: 工具调用事件、错误事件
1.2 核心智能体类型
智能体类型 | 继承关系 | 主要功能 | 适用场景 |
---|---|---|---|
Agent | 抽象基类 | 定义统一接口 | 框架基础 |
BasicAgent | Agent | 纯 LLM 对话 | 简单聊天 |
FnCallAgent | Agent | 工具调用 | 功能扩展 |
Assistant | FnCallAgent | RAG + 工具调用 | 通用助手 |
ReActChat | FnCallAgent | 推理行动循环 | 复杂任务 |
ArticleAgent | Assistant | 文章写作 | 内容创作 |
GroupChat | Agent + MultiAgentHub | 多智能体协作 | 团队协作 |
2. 架构图
2.1 智能体继承体系
graph TB
subgraph "智能体继承体系"
Agent[Agent 抽象基类<br/>统一接口定义]
BasicAgent[BasicAgent<br/>基础对话智能体]
FnCallAgent[FnCallAgent<br/>函数调用智能体]
Assistant[Assistant<br/>通用助手智能体]
ReActChat[ReActChat<br/>推理行动智能体]
ArticleAgent[ArticleAgent<br/>文章写作智能体]
GroupChat[GroupChat<br/>多智能体协作]
VirtualMemoryAgent[VirtualMemoryAgent<br/>虚拟记忆智能体]
end
subgraph "支持模块"
Memory[Memory 记忆模块<br/>RAG 检索]
MultiAgentHub[MultiAgentHub<br/>多智能体管理]
Tools[Tools 工具系统<br/>功能扩展]
LLM[LLM 模块<br/>模型调用]
end
Agent --> BasicAgent
Agent --> FnCallAgent
Agent --> GroupChat
FnCallAgent --> Assistant
FnCallAgent --> ReActChat
Assistant --> ArticleAgent
Assistant --> VirtualMemoryAgent
FnCallAgent --> Memory
GroupChat --> MultiAgentHub
Agent --> Tools
Agent --> LLM
2.2 模块组件架构
graph TD
subgraph "核心组件"
Agent --> LLMInterface[LLM 接口]
Agent --> ToolRegistry[工具注册表]
Agent --> MessageProcessor[消息处理器]
Agent --> ResponseGenerator[响应生成器]
end
subgraph "外部依赖"
LLMInterface --> LLMModule[LLM 模块]
ToolRegistry --> ToolsModule[Tools 模块]
MessageProcessor --> MemoryModule[Memory 模块]
end
3. 核心类详细分析
3.1 Agent 抽象基类 (qwen_agent/agent.py
)
设计目的: 定义智能体的统一接口和通用行为
核心功能: 定义智能体的统一接口和基础功能
class Agent(ABC):
"""智能体抽象基类
设计原则:
- 统一的消息处理接口
- 流式输出支持
- 工具调用框架
- 错误处理机制
"""
def __init__(self, function_list=None, llm=None, system_message=None,
name=None, description=None, **kwargs):
"""
初始化智能体
参数:
- function_list: 工具列表,支持字符串、字典、对象三种格式
- llm: LLM 配置或实例
- system_message: 系统消息
- name: 智能体名称
- description: 智能体描述
"""
# LLM 初始化
if isinstance(llm, dict):
self.llm = get_chat_model(llm)
else:
self.llm = llm
# 工具初始化
self.function_map = {}
if function_list:
for tool in function_list:
self._init_tool(tool)
self.system_message = system_message
self.name = name
self.description = description
def run(self, messages: List[Union[Dict, Message]], **kwargs) -> Iterator[List[Message]]:
"""
智能体运行的统一入口
处理流程:
1. 消息格式标准化
2. 语言自动检测
3. 系统消息注入
4. 调用子类 _run 方法
5. 响应格式化和流式输出
"""
# 消息深拷贝,避免副作用
messages = copy.deepcopy(messages)
_return_message_type = 'dict'
new_messages = []
# 消息格式转换
for msg in messages:
if isinstance(msg, dict):
new_messages.append(Message(**msg))
else:
new_messages.append(msg)
_return_message_type = 'message'
# 自动语言检测
if 'lang' not in kwargs:
if has_chinese_messages(new_messages):
kwargs['lang'] = 'zh'
else:
kwargs['lang'] = 'en'
# 系统消息注入
if self.system_message:
if not new_messages or new_messages[0][ROLE] != SYSTEM:
new_messages.insert(0, Message(role=SYSTEM, content=self.system_message))
else:
# 智能合并系统消息
if isinstance(new_messages[0][CONTENT], str):
new_messages[0][CONTENT] = self.system_message + '\n\n' + new_messages[0][CONTENT]
# 调用子类实现
for rsp in self._run(messages=new_messages, **kwargs):
# 自动注入智能体名称
for i in range(len(rsp)):
if not rsp[i].name and self.name:
rsp[i].name = self.name
# 返回格式转换
if _return_message_type == 'message':
yield [Message(**x) if isinstance(x, dict) else x for x in rsp]
else:
yield [x.model_dump() if not isinstance(x, dict) else x for x in rsp]
@abstractmethod
def _run(self, messages: List[Message], **kwargs) -> Iterator[List[Message]]:
"""子类必须实现的核心逻辑"""
raise NotImplementedError
def _call_llm(self, messages: List[Message], functions: Optional[List[Dict]] = None,
stream: bool = True, extra_generate_cfg: Optional[dict] = None) -> Iterator[List[Message]]:
"""
LLM调用的统一接口
功能:
- 合并生成配置
- 调用LLM的chat方法
- 处理流式/非流式输出
"""
return self.llm.chat(
messages=messages,
functions=functions,
stream=stream,
extra_generate_cfg=merge_generate_cfgs(
base_generate_cfg=self.extra_generate_cfg,
new_generate_cfg=extra_generate_cfg,
)
)
def _call_tool(self, tool_name: str, tool_args: Union[str, dict] = '{}', **kwargs) -> Union[str, List[ContentItem]]:
"""
工具调用的统一接口
功能:
- 工具存在性检查
- 异常处理和日志记录
- 结果格式化
"""
if tool_name not in self.function_map:
return f'Tool {tool_name} does not exists.'
tool = self.function_map[tool_name]
try:
tool_result = tool.call(tool_args, **kwargs)
except (ToolServiceError, DocParserError) as ex:
raise ex
except Exception as ex:
exception_type = type(ex).__name__
exception_message = str(ex)
traceback_info = ''.join(traceback.format_tb(ex.__traceback__))
error_message = f'An error occurred when calling tool `{tool_name}`:\n' \
f'{exception_type}: {exception_message}\n' \
f'Traceback:\n{traceback_info}'
logger.warning(error_message)
return error_message
# 结果格式化
if isinstance(tool_result, str):
return tool_result
elif isinstance(tool_result, list) and all(isinstance(item, ContentItem) for item in tool_result):
return tool_result # 多模态工具结果
else:
return json.dumps(tool_result, ensure_ascii=False, indent=4)
def _detect_tool(self, message: Message) -> Tuple[bool, str, str, str]:
"""
工具调用检测
检查消息中是否包含function_call字段
Returns:
Tuple[bool, str, str, str]: (是否需要调用工具, 工具名, 工具参数, 文本回复)
"""
func_name = None
func_args = None
if message.function_call:
func_call = message.function_call
func_name = func_call.name
func_args = func_call.arguments
text = message.content
if not text:
text = ''
return (func_name is not None), func_name, func_args, text
关键设计特点:
- 类型安全: 支持多种输入格式,自动类型推断
- 无副作用: 深拷贝输入,保护原始数据
- 智能化: 自动语言检测、系统消息合并
- 流式处理: 支持实时响应输出
3.2 FnCallAgent 函数调用智能体 (qwen_agent/agents/fncall_agent.py
)
设计目的: 实现 LLM 与工具的交互循环
核心功能: 实现 LLM 与工具的交互循环
class FnCallAgent(Agent):
"""函数调用智能体
核心特性:
- 工具调用循环
- 记忆模块集成
- 流式输出支持
- 错误恢复机制
"""
def __init__(self, function_list=None, llm=None, system_message=None,
name=None, description=None, files=None, **kwargs):
"""
初始化函数调用智能体
新增参数:
- files: 初始文件列表,用于 RAG 检索
"""
super().__init__(function_list=function_list, llm=llm,
system_message=system_message, name=name, description=description)
# 记忆模块初始化
if not hasattr(self, 'mem'):
# 针对不同模型的优化配置
if 'qwq' in self.llm.model.lower() or 'qvq' in self.llm.model.lower():
if 'dashscope' in self.llm.model_type:
mem_llm = {
'model': 'qwen-turbo',
'model_type': 'qwen_dashscope',
'generate_cfg': {'max_input_tokens': 30000}
}
else:
mem_llm = None
else:
mem_llm = self.llm
self.mem = Memory(llm=mem_llm, files=files, **kwargs)
def _run(self, messages: List[Message], lang: str = 'en', **kwargs) -> Iterator[List[Message]]:
"""
函数调用的核心执行循环
执行策略:
1. 限制最大调用次数,防止无限循环
2. LLM 调用生成响应
3. 工具调用检测和执行
4. 结果整合和继续对话
5. 终止条件检查
"""
messages = copy.deepcopy(messages)
num_llm_calls_available = MAX_LLM_CALL_PER_RUN # 默认 10 次
response = []
while True and num_llm_calls_available > 0:
num_llm_calls_available -= 1
# 准备生成配置
extra_generate_cfg = {'lang': lang}
if kwargs.get('seed') is not None:
extra_generate_cfg['seed'] = kwargs['seed']
# 调用 LLM,传入函数定义
output_stream = self._call_llm(
messages=messages,
functions=[func.function for func in self.function_map.values()],
extra_generate_cfg=extra_generate_cfg
)
# 流式收集 LLM 输出
output: List[Message] = []
for output in output_stream:
if output:
yield response + output # 实时流式输出
if output:
response.extend(output)
messages.extend(output)
used_any_tool = False
# 检查并执行工具调用
for out in output:
use_tool, tool_name, tool_args, _ = self._detect_tool(out)
if use_tool:
try:
# 执行工具调用
tool_result = self._call_tool(tool_name, tool_args,
messages=messages, **kwargs)
# 创建标准化工具结果消息
fn_msg = Message(
role=FUNCTION,
name=tool_name,
content=tool_result,
extra={'function_id': out.extra.get('function_id', '1')}
)
messages.append(fn_msg)
response.append(fn_msg)
yield response
used_any_tool = True
except Exception as ex:
# 工具调用失败处理
error_msg = f"Tool {tool_name} failed: {str(ex)}"
fn_msg = Message(role=FUNCTION, name=tool_name, content=error_msg)
messages.append(fn_msg)
response.append(fn_msg)
yield response
used_any_tool = True
# 终止条件:无工具调用或达到最大次数
if not used_any_tool:
break
yield response
执行时序图:
sequenceDiagram
participant User as 用户
participant Agent as FnCallAgent
participant LLM as 大语言模型
participant Tool as 工具系统
User->>Agent: 发送消息
Agent->>Agent: 初始化循环计数器
loop 工具调用循环 (最多10次)
Agent->>LLM: 发送消息 + 函数定义
LLM-->>Agent: 返回响应 (可能含工具调用)
Agent->>Agent: 流式输出响应
alt 包含工具调用
Agent->>Tool: 执行工具
Tool-->>Agent: 返回工具结果
Agent->>Agent: 添加工具结果到历史
Agent->>Agent: 继续循环
else 无工具调用
Agent->>Agent: 结束循环
end
end
Agent-->>User: 返回最终响应
3.3 Assistant 通用助手智能体 (qwen_agent/agents/assistant.py
)
核心功能: 集成 RAG 检索和工具调用的通用助手
class Assistant(FnCallAgent):
"""通用助手智能体
核心特性:
- RAG 知识检索
- 知识注入和格式化
- 多文件支持
- 智能知识筛选
"""
def __init__(self, function_list=None, llm=None, system_message=None,
name=None, description=None, files=None, rag_cfg=None, **kwargs):
"""
初始化通用助手
新增参数:
- rag_cfg: RAG 配置参数
"""
super().__init__(function_list=function_list, llm=llm,
system_message=system_message, name=name,
description=description, files=files, **kwargs)
self.rag_cfg = rag_cfg or {}
def _run(self, messages: List[Message], lang: str = 'en',
max_ref_token: int = 4000, **kwargs) -> Iterator[List[Message]]:
"""
Assistant 的核心执行逻辑
执行流程:
1. 文件内容处理和格式化
2. RAG 知识检索
3. 知识注入到系统消息
4. 调用父类函数调用逻辑
"""
# 文件内容处理
messages = self._format_file(messages, lang=lang)
# RAG 知识检索
if self.rag_cfg and any(self._has_file_content(msg) for msg in messages):
try:
# 调用记忆模块进行检索
*_, last = self.mem.run(messages=messages,
max_ref_token=max_ref_token, **kwargs)
if last:
# 格式化检索到的知识
knowledge = format_knowledge_to_source_and_content(last[-1].content)
if knowledge:
# 注入知识到系统消息
knowledge_str = self._format_knowledge_to_str(knowledge, lang=lang)
messages = self._prepend_knowledge_prompt(messages, knowledge_str, lang=lang)
except Exception as ex:
logger.warning(f'RAG failed: {ex}')
# 调用父类的函数调用逻辑
yield from super()._run(messages, lang=lang, **kwargs)
def _format_knowledge_to_str(self, knowledge: List[dict], lang: str = 'en') -> str:
"""
将检索到的知识格式化为字符串
格式化策略:
- 按来源分组
- 添加引用信息
- 多语言模板支持
"""
knowledge_str = ''
for k in knowledge:
if lang == 'zh':
snippet = f"来源: {k['source']}\n内容: {k['content']}\n\n"
else:
snippet = f"Source: {k['source']}\nContent: {k['content']}\n\n"
knowledge_str += snippet
if lang == 'zh':
template = f"# 知识库\n\n以下是相关的背景知识,请基于这些信息回答用户问题:\n\n{knowledge_str}"
else:
template = f"# Knowledge Base\n\nHere is the relevant background knowledge. Please answer user questions based on this information:\n\n{knowledge_str}"
return template
def _prepend_knowledge_prompt(self, messages: List[Message],
knowledge_str: str, lang: str = 'en') -> List[Message]:
"""
将知识注入到消息中
注入策略:
- 优先注入到系统消息
- 支持多模态内容
- 保持消息结构完整性
"""
messages = copy.deepcopy(messages)
if messages and messages[0].role == SYSTEM:
# 合并到现有系统消息
if isinstance(messages[0].content, str):
messages[0].content = messages[0].content + '\n\n' + knowledge_str
else:
messages[0].content.append(ContentItem(text=knowledge_str))
else:
# 创建新的系统消息
messages.insert(0, Message(role=SYSTEM, content=knowledge_str))
return messages
RAG 处理流程图:
flowchart TD
Start([开始处理]) --> ExtractFiles[提取消息中的文件]
ExtractFiles --> CheckRAG{是否启用RAG?}
CheckRAG -->|否| CallParent[调用父类逻辑]
CheckRAG -->|是| HasFiles{是否有文件?}
HasFiles -->|否| CallParent
HasFiles -->|是| CallMemory[调用Memory模块]
CallMemory --> RetrieveKnowledge[检索相关知识]
RetrieveKnowledge --> FormatKnowledge[格式化知识内容]
FormatKnowledge --> InjectKnowledge[注入到系统消息]
InjectKnowledge --> CallParent
CallParent --> End([结束处理])
3.4 ReActChat 推理行动智能体 (qwen_agent/agents/react_chat.py
)
核心功能: 实现推理-行动-观察循环模式
class ReActChat(FnCallAgent):
"""ReAct 推理行动智能体
核心特性:
- 推理-行动-观察循环
- 思维链推理
- 工具使用策略
- 自我反思机制
"""
def _run(self, messages: List[Message], lang: str = 'en', **kwargs) -> Iterator[List[Message]]:
"""
ReAct 模式的执行逻辑
ReAct 循环:
1. Thought: 分析当前情况,制定行动计划
2. Action: 选择并执行工具
3. Observation: 观察工具执行结果
4. 重复直到得出最终答案
"""
messages = copy.deepcopy(messages)
# 添加 ReAct 提示模板
react_prompt = self._get_react_prompt(lang)
if messages and messages[0].role == SYSTEM:
messages[0].content += '\n\n' + react_prompt
else:
messages.insert(0, Message(role=SYSTEM, content=react_prompt))
# 执行 ReAct 循环
max_iterations = 10
iteration = 0
response = []
while iteration < max_iterations:
iteration += 1
# 调用 LLM 进行推理
output_stream = self._call_llm(
messages=messages + response,
functions=[func.function for func in self.function_map.values()],
extra_generate_cfg={'lang': lang}
)
# 收集 LLM 输出
output: List[Message] = []
for output in output_stream:
if output:
yield response + output
if output:
response.extend(output)
# 检查是否需要工具调用
last_message = output[-1]
use_tool, tool_name, tool_args, thought = self._detect_tool(last_message)
if use_tool:
# 执行工具(Action)
try:
tool_result = self._call_tool(tool_name, tool_args, **kwargs)
# 添加观察结果(Observation)
observation_msg = Message(
role=FUNCTION,
name=tool_name,
content=f"Observation: {tool_result}"
)
response.append(observation_msg)
yield response
except Exception as ex:
# 处理工具执行失败
error_msg = Message(
role=FUNCTION,
name=tool_name,
content=f"Observation: Tool execution failed - {str(ex)}"
)
response.append(error_msg)
yield response
else:
# 无工具调用,检查是否得出最终答案
if self._is_final_answer(last_message.content):
break
yield response
def _get_react_prompt(self, lang: str) -> str:
"""
获取 ReAct 提示模板
模板包含:
- 推理模式说明
- 工具使用指南
- 输出格式要求
"""
if lang == 'zh':
return """你是一个能够进行推理和行动的AI助手。请按照以下格式思考和回答:
思考: 分析当前情况,思考需要采取什么行动
行动: 选择合适的工具来获取信息或执行任务
观察: 观察工具执行的结果
... (重复思考-行动-观察循环)
最终答案: 基于所有信息给出最终回答
可用工具: {tools}
请始终遵循"思考-行动-观察"的循环模式,直到能够给出最终答案。"""
else:
return """You are an AI assistant capable of reasoning and taking actions. Please think and respond in the following format:
Thought: Analyze the current situation and think about what action to take
Action: Choose an appropriate tool to get information or perform tasks
Observation: Observe the results of tool execution
... (repeat Thought-Action-Observation cycle)
Final Answer: Give the final answer based on all information
Available tools: {tools}
Always follow the "Thought-Action-Observation" cycle until you can provide a final answer."""
def _is_final_answer(self, content: str) -> bool:
"""
检查是否为最终答案
检查策略:
- 关键词匹配
- 格式识别
- 内容完整性
"""
final_keywords = ['最终答案', 'Final Answer', '答案是', 'The answer is']
return any(keyword in content for keyword in final_keywords)
3.5 GroupChat 多智能体协作 (qwen_agent/agents/group_chat.py
)
核心功能: 管理多个智能体的协作对话
class GroupChat(Agent, MultiAgentHub):
"""多智能体群聊
核心特性:
- 智能体轮换机制
- 对话流程控制
- 角色分工协作
- 终止条件判断
"""
def __init__(self, agents: List[Agent], max_turns: int = 10, **kwargs):
"""
初始化群聊
参数:
- agents: 参与群聊的智能体列表
- max_turns: 最大轮次限制
"""
super().__init__(**kwargs)
self.agents = agents
self.max_turns = max_turns
self.current_turn = 0
def _run(self, messages: List[Message], **kwargs) -> Iterator[List[Message]]:
"""
群聊执行逻辑
执行流程:
1. 选择下一个发言的智能体
2. 让选中的智能体处理消息
3. 收集响应并更新对话历史
4. 检查终止条件
5. 重复直到达到终止条件
"""
conversation_history = copy.deepcopy(messages)
response = []
while self.current_turn < self.max_turns:
self.current_turn += 1
# 选择下一个发言的智能体
next_agent = self._select_next_agent(conversation_history)
if not next_agent:
break
# 让选中的智能体处理消息
agent_response = []
for agent_output in next_agent.run(conversation_history, **kwargs):
agent_response = agent_output
yield response + agent_response
if agent_response:
# 更新对话历史
conversation_history.extend(agent_response)
response.extend(agent_response)
# 检查是否应该终止对话
if self._should_terminate(conversation_history):
break
yield response
def _select_next_agent(self, messages: List[Message]) -> Optional[Agent]:
"""
选择下一个发言的智能体
选择策略:
- 轮询模式
- 基于内容的智能选择
- 专家系统路由
"""
if not self.agents:
return None
# 简单轮询策略
agent_index = (self.current_turn - 1) % len(self.agents)
return self.agents[agent_index]
def _should_terminate(self, messages: List[Message]) -> bool:
"""
判断是否应该终止对话
终止条件:
- 达到最大轮次
- 任务完成标志
- 所有智能体无响应
"""
if self.current_turn >= self.max_turns:
return True
# 检查最后一条消息是否包含终止标志
if messages:
last_content = messages[-1].content
if isinstance(last_content, str):
terminate_keywords = ['任务完成', 'Task completed', '对话结束', 'Conversation ended']
return any(keyword in last_content for keyword in terminate_keywords)
return False
4. 模块交互时序
4.1 智能体初始化时序
sequenceDiagram
participant Client as 客户端
participant Agent as 智能体
participant LLMFactory as LLM工厂
participant ToolRegistry as 工具注册表
participant Memory as 记忆模块
Client->>Agent: 创建智能体实例
Agent->>LLMFactory: 初始化LLM
LLMFactory-->>Agent: 返回 LLM实例
Agent->>ToolRegistry: 注册工具列表
ToolRegistry-->>Agent: 返回工具实例
Agent->>Memory: 初始化记忆模块
Memory-->>Agent: 返回记忆实例
Agent-->>Client: 智能体就绪
4.2 Assistant RAG + 工具调用时序
sequenceDiagram
participant User as 用户
participant Assistant as Assistant智能体
participant Memory as Memory模块
participant LLM as 大语言模型
participant Tool as 工具系统
User->>Assistant: 发送消息+文件
Note over Assistant: RAG检索阶段
Assistant->>Assistant: 提取文件列表
Assistant->>Memory: run(messages, max_ref_token)
Memory->>Memory: 文档分块和向量检索
Memory-->>Assistant: 返回相关知识片段
Note over Assistant: 知识注入
Assistant->>Assistant: format_knowledge_to_str()
Assistant->>Assistant: _prepend_knowledge_prompt()
Note over Assistant: LLM调用
Assistant->>LLM: chat(messages, functions)
LLM-->>Assistant: 返回响应(含工具调用)
Note over Assistant: 工具调用处理
Assistant->>Assistant: _detect_tool()
Assistant->>Tool: _call_tool(name, args)
Tool-->>Assistant: 返回工具结果
Assistant->>LLM: 继续对话(含工具结果)
LLM-->>Assistant: 返回最终响应
Assistant-->>User: 流式返回完整回复
4.3 多智能体协作时序
sequenceDiagram
participant User as 用户
participant GroupChat as 群聊管理器
participant Agent1 as 智能体1
participant Agent2 as 智能体2
participant Agent3 as 智能体3
User->>GroupChat: 发起群聊
GroupChat->>GroupChat: 选择首个发言者
loop 对话轮次
GroupChat->>Agent1: 处理消息
Agent1-->>GroupChat: 返回响应
GroupChat->>GroupChat: 更新对话历史
GroupChat->>Agent2: 处理消息
Agent2-->>GroupChat: 返回响应
GroupChat->>GroupChat: 更新对话历史
GroupChat->>Agent3: 处理消息
Agent3-->>GroupChat: 返回响应
GroupChat->>GroupChat: 检查终止条件
end
GroupChat-->>User: 返回最终结果
5. 性能优化策略
5.1 内存管理优化
class OptimizedAgent(Agent):
"""内存优化的智能体实现"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._message_cache = {}
self._response_cache = {}
def _run(self, messages: List[Message], **kwargs) -> Iterator[List[Message]]:
# 消息去重和缓存
message_hash = self._hash_messages(messages)
if message_hash in self._response_cache:
yield self._response_cache[message_hash]
return
# 限制对话历史长度
if len(messages) > MAX_HISTORY_LENGTH:
messages = self._truncate_messages(messages)
# 执行核心逻辑
response = []
for chunk in super()._run(messages, **kwargs):
response = chunk
yield chunk
# 缓存响应
self._response_cache[message_hash] = response
def _truncate_messages(self, messages: List[Message]) -> List[Message]:
"""智能截断对话历史"""
# 保留系统消息和最近的对话
system_messages = [msg for msg in messages if msg.role == SYSTEM]
recent_messages = messages[-MAX_RECENT_MESSAGES:]
return system_messages + recent_messages
5.2 并发处理优化
import asyncio
from concurrent.futures import ThreadPoolExecutor
class ConcurrentAgent(Agent):
"""支持并发处理的智能体"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.executor = ThreadPoolExecutor(max_workers=4)
async def run_async(self, messages: List[Message], **kwargs) -> List[Message]:
"""异步运行智能体"""
loop = asyncio.get_event_loop()
# 并发执行多个任务
tasks = []
if self.function_map:
# 预热工具
for tool_name, tool in self.function_map.items():
task = loop.run_in_executor(self.executor, tool.warm_up)
tasks.append(task)
# 等待预热完成
if tasks:
await asyncio.gather(*tasks)
# 执行主逻辑
result = await loop.run_in_executor(
self.executor,
lambda: list(self.run(messages, **kwargs))[-1]
)
return result
6. 扩展开发指南
6.1 自定义智能体开发
class CustomAgent(FnCallAgent):
"""自定义智能体开发模板"""
def __init__(self, custom_config: dict = None, **kwargs):
super().__init__(**kwargs)
self.custom_config = custom_config or {}
def _run(self, messages: List[Message], **kwargs) -> Iterator[List[Message]]:
"""
自定义执行逻辑
开发步骤:
1. 消息预处理
2. 自定义逻辑实现
3. 调用父类或重写核心逻辑
4. 响应后处理
"""
# 1. 消息预处理
messages = self._preprocess_messages(messages)
# 2. 自定义逻辑
if self._should_use_custom_logic(messages):
yield from self._custom_logic(messages, **kwargs)
else:
# 3. 调用父类逻辑
yield from super()._run(messages, **kwargs)
def _preprocess_messages(self, messages: List[Message]) -> List[Message]:
"""自定义消息预处理"""
# 实现自定义预处理逻辑
return messages
def _should_use_custom_logic(self, messages: List[Message]) -> bool:
"""判断是否使用自定义逻辑"""
# 实现判断逻辑
return False
def _custom_logic(self, messages: List[Message], **kwargs) -> Iterator[List[Message]]:
"""自定义核心逻辑"""
# 实现自定义逻辑
yield [Message(role='assistant', content='Custom response')]
6.2 智能体插件系统
class AgentPlugin(ABC):
"""智能体插件基类"""
@abstractmethod
def before_run(self, agent: Agent, messages: List[Message]) -> List[Message]:
"""运行前钩子"""
pass
@abstractmethod
def after_run(self, agent: Agent, response: List[Message]) -> List[Message]:
"""运行后钩子"""
pass
class LoggingPlugin(AgentPlugin):
"""日志插件"""
def before_run(self, agent: Agent, messages: List[Message]) -> List[Message]:
logger.info(f"Agent {agent.name} processing {len(messages)} messages")
return messages
def after_run(self, agent: Agent, response: List[Message]) -> List[Message]:
logger.info(f"Agent {agent.name} generated {len(response)} responses")
return response
class PluginManager:
"""插件管理器"""
def __init__(self):
self.plugins: List[AgentPlugin] = []
def register_plugin(self, plugin: AgentPlugin):
self.plugins.append(plugin)
def apply_before_hooks(self, agent: Agent, messages: List[Message]) -> List[Message]:
for plugin in self.plugins:
messages = plugin.before_run(agent, messages)
return messages
def apply_after_hooks(self, agent: Agent, response: List[Message]) -> List[Message]:
for plugin in self.plugins:
response = plugin.after_run(agent, response)
return response
7. 关键设计模式
7.1 模板方法模式
Agent基类定义了run()
方法的通用流程,子类实现_run()
方法的具体逻辑:
# 基类定义模板
class Agent(ABC):
def run(self, messages, **kwargs):
# 通用预处理
messages = self._preprocess(messages)
# 调用子类实现
for response in self._run(messages, **kwargs):
yield self._postprocess(response)
@abstractmethod
def _run(self, messages, **kwargs):
# 子类实现具体逻辑
pass
# 子类实现具体逻辑
class Assistant(FnCallAgent):
def _run(self, messages, **kwargs):
# RAG检索 + 函数调用逻辑
pass
7.2 策略模式
不同智能体采用不同的执行策略:
class AgentStrategy(ABC):
@abstractmethod
def execute(self, messages, **kwargs):
pass
class RAGStrategy(AgentStrategy):
def execute(self, messages, **kwargs):
# RAG检索策略
pass
class ReActStrategy(AgentStrategy):
def execute(self, messages, **kwargs):
# ReAct推理策略
pass
7.3 组合模式
智能体组合不同的能力模块:
class Agent:
def __init__(self):
self.llm_module = LLMModule()
self.tool_module = ToolModule()
self.memory_module = MemoryModule()
def _run(self, messages, **kwargs):
# 组合各模块能力
knowledge = self.memory_module.retrieve(messages)
response = self.llm_module.generate(messages + knowledge)
if self.tool_module.needs_tool(response):
result = self.tool_module.execute(response)
return self.llm_module.generate(messages + [result])
return response
验收清单
- 智能体继承体系分析完整
- 核心类详细代码分析
- 模块交互时序图清晰
- 性能优化策略实用
- 扩展开发指南完整
- 架构图和流程图准确
- 关键函数代码贴出并注释
- 设计模式和最佳实践说明
这个智能体模块文档详细分析了Qwen-Agent中智能体的设计理念、继承体系、核心实现和执行流程,为开发者提供了深入理解和扩展智能体功能的技术指南。