Qwen-Agent 工具系统完整分析
1. 工具系统概述
1.1 设计理念
Qwen-Agent 的工具系统采用插件化架构,通过统一的接口规范实现工具的注册、发现和调用。系统支持内置工具和自定义工具,提供了丰富的功能扩展能力。
1.2 核心特性
- 插件化架构: 基于装饰器的工具注册机制
- 统一接口: 标准化的工具调用接口
- 类型安全: 严格的参数类型定义和验证
- 异步支持: 支持同步和异步工具执行
- 错误处理: 完善的异常处理和恢复机制
- 扩展性: 易于添加新工具和功能
1.3 职责与边界
负责:
- 工具的注册、管理和执行
- 统一的工具调用接口和参数验证
- 内置工具的实现(代码解释器、网络搜索、文档解析等)
- 工具执行环境的管理和隔离
- 工具结果的格式化和返回
不负责:
- 工具调用的决策逻辑(委托给 Agent 模块)
- LLM 与工具的交互协议(委托给 LLM 模块)
- 工具执行结果的后续处理(委托给 Agent 模块)
- 用户界面的展示(委托给 GUI 模块)
依赖关系:
- 执行环境: Jupyter 内核、Python 解释器、系统命令
- 外部服务: 搜索引擎 API、图像生成服务、天气服务
- 文件系统: 工作目录、临时文件、静态资源
- 网络服务: HTTP 客户端、WebSocket 连接
1.4 工具分类体系
graph TB
subgraph "工具分类体系"
Tools[工具系统]
subgraph "按功能分类"
ComputeTools[计算工具]
SearchTools[搜索工具]
ParseTools[解析工具]
GenerateTools[生成工具]
IntegrationTools[集成工具]
end
subgraph "按执行环境分类"
LocalTools[本地工具]
RemoteTools[远程工具]
CloudTools[云端工具]
end
subgraph "按数据类型分类"
TextTools[文本工具]
ImageTools[图像工具]
AudioTools[音频工具]
VideoTools[视频工具]
MultimodalTools[多模态工具]
end
end
Tools --> ComputeTools
Tools --> SearchTools
Tools --> ParseTools
Tools --> GenerateTools
Tools --> IntegrationTools
ComputeTools --> LocalTools
SearchTools --> RemoteTools
GenerateTools --> CloudTools
ParseTools --> TextTools
GenerateTools --> ImageTools
IntegrationTools --> MultimodalTools
工具类别 | 工具名称 | 主要功能 | 适用场景 |
---|---|---|---|
代码执行 | CodeInterpreter | Python代码执行 | 数据分析、计算 |
代码执行 | PythonExecutor | 安全Python执行 | 代码验证、测试 |
文档处理 | DocParser | 多格式文档解析 | 文档问答、知识提取 |
文档处理 | SimpleDocParser | 简化文档解析 | 快速文本提取 |
网络功能 | WebSearch | 网络搜索 | 信息检索、实时查询 |
网络功能 | WebExtractor | 网页内容提取 | 网页分析、内容抓取 |
多媒体 | ImageGen | AI图像生成 | 创意设计、插图生成 |
多媒体 | ImageSearch | 图像搜索 | 图片查找、视觉搜索 |
检索系统 | Retrieval | RAG检索 | 知识问答、文档检索 |
检索系统 | KeywordSearch | 关键词搜索 | 精确匹配搜索 |
检索系统 | VectorSearch | 向量搜索 | 语义相似搜索 |
检索系统 | HybridSearch | 混合搜索 | 综合检索策略 |
生活服务 | AmapWeather | 天气查询 | 天气信息获取 |
系统功能 | Storage | 存储管理 | 文件存储、管理 |
扩展集成 | MCPManager | MCP工具管理 | 第三方工具集成 |
2. 工具系统架构
2.1 整体架构图
graph TB
subgraph "工具注册层 (Tool Registration Layer)"
TOOL_REGISTRY[全局工具注册表<br/>TOOL_REGISTRY]
RegisterDecorator[@register_tool<br/>装饰器注册]
ToolDiscovery[工具发现机制<br/>动态加载]
end
subgraph "工具抽象层 (Tool Abstraction Layer)"
BaseTool[BaseTool<br/>工具基类]
ToolInterface[工具接口规范<br/>call() 方法]
ToolMetadata[工具元数据<br/>name, description, parameters]
end
subgraph "工具实现层 (Tool Implementation Layer)"
subgraph "计算工具"
CodeInterpreter[代码解释器<br/>Python 执行]
MathSolver[数学求解器<br/>符号计算]
end
subgraph "搜索工具"
WebSearch[网络搜索<br/>Bing/Google API]
DocumentSearch[文档搜索<br/>本地检索]
end
subgraph "解析工具"
PDFParser[PDF 解析器<br/>文档提取]
ImageParser[图像解析器<br/>OCR 识别]
AudioParser[音频解析器<br/>语音转文本]
end
subgraph "生成工具"
ImageGenerator[图像生成器<br/>AI 绘画]
TextGenerator[文本生成器<br/>模板填充]
end
subgraph "集成工具"
MCPTools[MCP 工具<br/>第三方集成]
APIConnector[API 连接器<br/>外部服务]
end
end
subgraph "执行环境层 (Execution Environment Layer)"
JupyterKernel[Jupyter 内核<br/>代码执行环境]
DockerContainer[Docker 容器<br/>隔离执行环境]
ProcessManager[进程管理器<br/>资源控制]
SecuritySandbox[安全沙箱<br/>权限控制]
end
subgraph "外部服务层 (External Service Layer)"
SearchEngines[搜索引擎<br/>Bing/Google/DuckDuckGo]
AIServices[AI 服务<br/>图像生成/语音识别]
CloudAPIs[云端 API<br/>天气/地图/翻译]
DatabaseServices[数据库服务<br/>向量数据库/关系数据库]
end
%% 注册关系
RegisterDecorator --> TOOL_REGISTRY
ToolDiscovery --> TOOL_REGISTRY
%% 抽象关系
BaseTool --> ToolInterface
BaseTool --> ToolMetadata
%% 实现关系
BaseTool --> CodeInterpreter
BaseTool --> WebSearch
BaseTool --> PDFParser
BaseTool --> ImageGenerator
BaseTool --> MCPTools
%% 执行环境
CodeInterpreter --> JupyterKernel
ImageGenerator --> DockerContainer
WebSearch --> ProcessManager
PDFParser --> SecuritySandbox
%% 外部服务
WebSearch --> SearchEngines
ImageGenerator --> AIServices
DocumentSearch --> DatabaseServices
MCPTools --> CloudAPIs
2.2 工具系统架构图
graph TB
subgraph "工具抽象层"
BaseTool[BaseTool<br/>工具基类]
ToolRegistry[TOOL_REGISTRY<br/>全局注册表]
RegisterDecorator[@register_tool<br/>注册装饰器]
ToolFactory[get_tool<br/>工具工厂]
BaseToolWithFileAccess[BaseToolWithFileAccess<br/>文件访问工具基类]
end
subgraph "核心工具集"
CodeInterpreter[CodeInterpreter<br/>代码解释器]
WebSearch[WebSearch<br/>网络搜索]
DocParser[DocParser<br/>文档解析]
ImageGen[ImageGen<br/>图像生成]
Retrieval[Retrieval<br/>RAG检索]
AmapWeather[AmapWeather<br/>天气查询]
end
subgraph "扩展工具集"
MCPManager[MCPManager<br/>MCP工具管理]
CustomTool[CustomTool<br/>自定义工具]
PluginLoader[PluginLoader<br/>插件加载器]
end
subgraph "执行环境"
JupyterKernel[Jupyter内核<br/>代码执行]
ProcessPool[进程池<br/>并发执行]
ResourceManager[资源管理器<br/>内存/CPU限制]
SecurityManager[安全管理器<br/>权限控制]
end
subgraph "外部依赖"
SearchAPI[搜索API<br/>Bing/Google]
ImageAPI[图像API<br/>DALL-E/Midjourney]
WeatherAPI[天气API<br/>高德地图]
VectorDB[向量数据库<br/>文档检索]
end
%% 注册关系
RegisterDecorator --> ToolRegistry
BaseTool --> ToolRegistry
ToolFactory --> ToolRegistry
%% 继承关系
BaseTool --> BaseToolWithFileAccess
BaseToolWithFileAccess --> DocParser
BaseToolWithFileAccess --> Retrieval
%% 实现关系
BaseTool --> CodeInterpreter
BaseTool --> WebSearch
BaseTool --> ImageGen
BaseTool --> AmapWeather
%% 扩展关系
ToolRegistry --> MCPManager
ToolRegistry --> CustomTool
PluginLoader --> CustomTool
%% 执行依赖
CodeInterpreter --> JupyterKernel
CodeInterpreter --> ProcessPool
BaseTool --> ResourceManager
BaseTool --> SecurityManager
%% 外部调用
WebSearch --> SearchAPI
ImageGen --> ImageAPI
AmapWeather --> WeatherAPI
Retrieval --> VectorDB
3. 核心工具详细分析
3.1 BaseTool 抽象基类
位置: qwen_agent/tools/base.py:45-55
3.1.1 工具接口规范
class BaseTool(ABC):
"""
工具抽象基类
设计模式:
- 命令模式: 每个工具是一个可执行的命令
- 注册表模式: 通过装饰器自动注册工具
- 模板方法模式: call() 定义执行流程
核心职责:
- 定义工具统一接口
- 参数验证和类型转换
- 结果格式化
- 错误处理
"""
# 工具元数据
name: str = '' # 工具名称
description: str = '' # 工具描述
parameters: List[Dict] = [] # 参数定义
# 配置属性
timeout: int = 30 # 超时时间
max_retries: int = 3 # 最大重试次数
cache_enabled: bool = True # 是否启用缓存
@abstractmethod
def call(self, params: str, **kwargs) -> Union[str, List[ContentItem]]:
"""
工具执行接口 - 子类必须实现
参数规范:
- params: JSON 格式的参数字符串
- **kwargs: 额外的上下文参数
- messages: 当前对话历史
- lang: 语言设置
- max_output_length: 最大输出长度
返回值规范:
- str: 文本结果
- List[ContentItem]: 多模态结果
异常处理:
- ToolServiceError: 工具服务错误
- ValueError: 参数格式错误
- TimeoutError: 执行超时
"""
raise NotImplementedError
@property
def function(self) -> Dict:
"""
生成 OpenAI 函数调用格式的工具定义
输出格式:
{
"name": "工具名称",
"description": "工具描述",
"parameters": {
"type": "object",
"properties": {
"param1": {"type": "string", "description": "参数1描述"},
"param2": {"type": "number", "description": "参数2描述"}
},
"required": ["param1"]
}
}
"""
return {
'name': self.name,
'description': self.description,
'parameters': {
'type': 'object',
'properties': {
param['name']: {
'type': param['type'],
'description': param['description']
} for param in self.parameters
},
'required': [
param['name'] for param in self.parameters
if param.get('required', False)
]
}
}
def validate_parameters(self, params: Dict) -> Dict:
"""参数验证"""
validated = {}
for param_def in self.parameters:
name = param_def['name']
param_type = param_def['type']
required = param_def.get('required', False)
if name not in params:
if required:
raise ValueError(f"Missing required parameter: {name}")
continue
value = params[name]
# 类型验证
if param_type == 'string' and not isinstance(value, str):
raise ValueError(f"Parameter {name} must be a string")
elif param_type == 'number' and not isinstance(value, (int, float)):
raise ValueError(f"Parameter {name} must be a number")
elif param_type == 'boolean' and not isinstance(value, bool):
raise ValueError(f"Parameter {name} must be a boolean")
validated[name] = value
return validated
# 工具注册机制
TOOL_REGISTRY: Dict[str, Type[BaseTool]] = {}
def register_tool(name: str):
"""
工具注册装饰器
功能:
- 自动注册工具到全局注册表
- 设置工具名称
- 支持运行时发现
使用方式:
@register_tool('my_tool')
class MyTool(BaseTool):
pass
"""
def decorator(cls):
cls.name = name
TOOL_REGISTRY[name] = cls
return cls
return decorator
3.2 CodeInterpreter - 代码解释器
位置: qwen_agent/tools/code_interpreter.py:180-220
3.2.1 核心执行逻辑
@register_tool('code_interpreter')
class CodeInterpreter(BaseTool):
"""
Python 代码解释器工具
核心功能:
- 安全的 Python 代码执行
- Jupyter 内核管理
- 多媒体输出支持
- 执行结果收集
安全机制:
- 隔离执行环境
- 资源限制
- 超时控制
- 危险函数过滤
"""
description = 'Python代码解释器,可以运行Python代码并返回结果'
parameters = [{
'name': 'code',
'type': 'string',
'description': '要执行的Python代码',
'required': True
}]
def __init__(self):
super().__init__()
self.kernel_manager = None
self.kernel_client = None
self.timeout = 30 # 默认超时30秒
def call(self, params: str, **kwargs) -> str:
"""
代码执行的核心实现
执行流程:
1. 参数解析与验证
2. Jupyter 内核初始化
3. 代码安全检查
4. 异步执行代码
5. 结果收集与格式化
6. 多媒体文件处理
性能优化:
- 内核复用
- 异步执行
- 流式输出
- 结果缓存
"""
try:
# === 第一步:参数解析 ===
import json5
args = json5.loads(params)
code = args.get('code', '').strip()
if not code:
return json5.dumps({
'error': 'Empty code provided'
}, ensure_ascii=False)
# === 第二步:安全检查 ===
if self._is_dangerous_code(code):
return json5.dumps({
'error': 'Dangerous code detected and blocked'
}, ensure_ascii=False)
# === 第三步:内核管理 ===
if not self.kernel_manager:
self._initialize_kernel()
# === 第四步:代码执行 ===
execution_result = self._execute_code_safely(code)
# === 第五步:结果处理 ===
formatted_result = self._format_execution_result(execution_result)
return json5.dumps(formatted_result, ensure_ascii=False)
except Exception as e:
error_result = {
'error': str(e),
'traceback': traceback.format_exc()
}
return json5.dumps(error_result, ensure_ascii=False)
def _initialize_kernel(self):
"""
初始化 Jupyter 内核
配置选项:
- 内核类型:Python 3
- 工作目录:代码解释器专用目录
- 资源限制:内存、CPU 限制
- 安全配置:禁用危险模块
"""
from jupyter_client import KernelManager
# 创建内核管理器
self.kernel_manager = KernelManager(kernel_name='python3')
# 启动内核
self.kernel_manager.start_kernel()
# 获取客户端
self.kernel_client = self.kernel_manager.client()
# 等待内核就绪
self.kernel_client.wait_for_ready(timeout=10)
# 初始化工作环境
init_code = """
import sys
import os
import matplotlib
matplotlib.use('Agg') # 非交互式后端
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# 设置工作目录
os.chdir(os.environ.get('M6_CODE_INTERPRETER_WORK_DIR', '.'))
# 配置输出
plt.rcParams['figure.figsize'] = (10, 6)
plt.rcParams['font.size'] = 12
"""
self.kernel_client.execute(init_code, silent=True)
def _execute_code_safely(self, code: str) -> Dict:
"""
安全执行代码
执行策略:
- 异步执行,避免阻塞
- 超时控制,防止无限循环
- 输出收集,包括 stdout、stderr、display_data
- 异常捕获,详细错误信息
返回结果:
{
'stdout': '标准输出',
'stderr': '错误输出',
'display_data': [显示数据列表],
'execution_count': 执行计数,
'error': 错误信息(如有)
}
"""
# 执行代码
msg_id = self.kernel_client.execute(code)
# 收集执行结果
result = {
'stdout': '',
'stderr': '',
'display_data': [],
'execution_count': 0,
'error': None
}
# 监听消息
start_time = time.time()
while True:
try:
# 设置超时
remaining_time = self.timeout - (time.time() - start_time)
if remaining_time <= 0:
raise TimeoutError(f"Code execution timeout after {self.timeout}s")
# 获取消息
msg = self.kernel_client.get_iopub_msg(timeout=min(remaining_time, 1.0))
msg_type = msg['header']['msg_type']
if msg_type == 'stream':
# 标准输出/错误
stream_name = msg['content']['name']
text = msg['content']['text']
result[stream_name] += text
elif msg_type == 'display_data':
# 显示数据(图片、表格等)
data = msg['content']['data']
result['display_data'].append(data)
elif msg_type == 'execute_result':
# 执行结果
data = msg['content']['data']
result['display_data'].append(data)
result['execution_count'] = msg['content']['execution_count']
elif msg_type == 'error':
# 执行错误
result['error'] = {
'ename': msg['content']['ename'],
'evalue': msg['content']['evalue'],
'traceback': msg['content']['traceback']
}
break
elif msg_type == 'execute_reply':
# 执行完成
if msg['content']['status'] == 'ok':
result['execution_count'] = msg['content']['execution_count']
break
except Exception as e:
if "timeout" in str(e).lower():
result['error'] = {'ename': 'TimeoutError', 'evalue': str(e)}
break
return result
def _format_execution_result(self, result: Dict) -> Dict:
"""
格式化执行结果
处理内容:
1. 文本输出格式化
2. 图像文件保存
3. 数据表格处理
4. 错误信息美化
输出优化:
- 截断过长输出
- 图像 URL 生成
- 多媒体内容整合
"""
formatted = {
'stdout': result.get('stdout', ''),
'stderr': result.get('stderr', ''),
'execution_count': result.get('execution_count', 0),
'files': []
}
# 处理错误信息
if result.get('error'):
error = result['error']
formatted['error'] = {
'type': error.get('ename', 'Error'),
'message': error.get('evalue', ''),
'traceback': '\n'.join(error.get('traceback', []))
}
# 处理显示数据
if result.get('display_data'):
for data in result['display_data']:
# 处理图像
if 'image/png' in data:
image_data = data['image/png']
image_path = self._save_image(image_data)
if image_path:
formatted['files'].append({
'type': 'image',
'path': image_path,
'url': f'/static/{os.path.basename(image_path)}'
})
# 处理 HTML 表格
if 'text/html' in data:
html_content = data['text/html']
formatted['html'] = html_content
# 处理纯文本
if 'text/plain' in data:
text_content = data['text/plain']
formatted['stdout'] += f"\n{text_content}"
return formatted
def _is_dangerous_code(self, code: str) -> bool:
"""
检测危险代码
检测规则:
- 系统命令执行
- 文件系统操作
- 网络访问
- 进程管理
- 模块导入限制
安全策略:
- 黑名单关键词
- AST 语法分析
- 动态行为检测
"""
dangerous_patterns = [
# 系统命令
r'os\.system',
r'subprocess\.',
r'exec\(',
r'eval\(',
# 文件操作
r'open\([\'"][^\'"]*/.*[\'"]', # 绝对路径
r'__import__',
# 网络操作
r'urllib',
r'requests\.',
r'socket\.',
# 危险模块
r'import\s+(os|sys|subprocess|socket)',
]
import re
for pattern in dangerous_patterns:
if re.search(pattern, code):
return True
return False
3.3 WebSearch - 网络搜索工具
位置: qwen_agent/tools/web_search.py
3.3.1 核心实现
@register_tool('web_search')
class WebSearch(BaseTool):
"""
网络搜索工具
支持的搜索引擎:
- Bing Search API
- Google Custom Search
- DuckDuckGo (免费)
功能特性:
- 多搜索引擎支持
- 结果去重和排序
- 内容摘要提取
- 搜索结果缓存
"""
description = '在互联网上搜索信息'
parameters = [
{
'name': 'query',
'type': 'string',
'description': '搜索关键词',
'required': True
},
{
'name': 'max_results',
'type': 'number',
'description': '最大结果数量,默认5',
'required': False
}
]
def __init__(self, search_engine='bing', **kwargs):
super().__init__()
self.search_engine = search_engine
self.bing_api_key = kwargs.get('bing_api_key') or os.getenv('BING_API_KEY')
self.google_api_key = kwargs.get('google_api_key') or os.getenv('GOOGLE_API_KEY')
self.google_cse_id = kwargs.get('google_cse_id') or os.getenv('GOOGLE_CSE_ID')
def call(self, params: str, **kwargs) -> str:
"""
执行网络搜索
搜索流程:
1. 参数解析和验证
2. 选择搜索引擎
3. 执行搜索请求
4. 结果处理和格式化
5. 返回搜索结果
"""
try:
# 解析参数
import json5
args = json5.loads(params)
query = args.get('query', '').strip()
max_results = args.get('max_results', 5)
if not query:
return json5.dumps({'error': 'Search query cannot be empty'})
# 执行搜索
if self.search_engine == 'bing' and self.bing_api_key:
results = self._search_bing(query, max_results)
elif self.search_engine == 'google' and self.google_api_key:
results = self._search_google(query, max_results)
else:
results = self._search_duckduckgo(query, max_results)
# 格式化结果
formatted_results = self._format_search_results(results)
return json5.dumps({
'query': query,
'results': formatted_results,
'total': len(formatted_results)
}, ensure_ascii=False)
except Exception as e:
return json5.dumps({
'error': f'Search failed: {str(e)}'
}, ensure_ascii=False)
def _search_bing(self, query: str, max_results: int) -> List[Dict]:
"""使用 Bing Search API 搜索"""
import requests
url = 'https://api.bing.microsoft.com/v7.0/search'
headers = {'Ocp-Apim-Subscription-Key': self.bing_api_key}
params = {
'q': query,
'count': max_results,
'mkt': 'zh-CN',
'safesearch': 'Moderate'
}
response = requests.get(url, headers=headers, params=params, timeout=10)
response.raise_for_status()
data = response.json()
results = []
for item in data.get('webPages', {}).get('value', []):
results.append({
'title': item.get('name', ''),
'url': item.get('url', ''),
'snippet': item.get('snippet', ''),
'source': 'Bing'
})
return results
def _search_google(self, query: str, max_results: int) -> List[Dict]:
"""使用 Google Custom Search API 搜索"""
import requests
url = 'https://www.googleapis.com/customsearch/v1'
params = {
'key': self.google_api_key,
'cx': self.google_cse_id,
'q': query,
'num': max_results
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
results = []
for item in data.get('items', []):
results.append({
'title': item.get('title', ''),
'url': item.get('link', ''),
'snippet': item.get('snippet', ''),
'source': 'Google'
})
return results
def _search_duckduckgo(self, query: str, max_results: int) -> List[Dict]:
"""使用 DuckDuckGo 免费搜索"""
try:
from duckduckgo_search import DDGS
results = []
with DDGS() as ddgs:
for result in ddgs.text(query, max_results=max_results):
results.append({
'title': result.get('title', ''),
'url': result.get('href', ''),
'snippet': result.get('body', ''),
'source': 'DuckDuckGo'
})
return results
except ImportError:
raise Exception("DuckDuckGo search requires 'duckduckgo-search' package")
def _format_search_results(self, results: List[Dict]) -> List[Dict]:
"""格式化搜索结果"""
formatted = []
for i, result in enumerate(results, 1):
formatted.append({
'rank': i,
'title': result.get('title', ''),
'url': result.get('url', ''),
'snippet': result.get('snippet', ''),
'source': result.get('source', 'Unknown')
})
return formatted
3.4 DocParser - 文档解析工具
位置: qwen_agent/tools/doc_parser.py
3.4.1 多格式文档解析
@register_tool('doc_parser')
class DocParser(BaseToolWithFileAccess):
"""
多格式文档解析工具
支持格式:
- PDF 文档
- Word 文档 (.docx)
- PowerPoint 演示文稿 (.pptx)
- 纯文本文件 (.txt, .md)
- HTML 网页
- CSV/Excel 表格
解析能力:
- 文本内容提取
- 表格结构识别
- 图像 OCR 识别
- 元数据提取
"""
description = '解析各种格式的文档,提取文本内容'
parameters = [
{
'name': 'file_path',
'type': 'string',
'description': '要解析的文件路径',
'required': True
},
{
'name': 'extract_images',
'type': 'boolean',
'description': '是否提取图像内容',
'required': False
}
]
def call(self, params: str, **kwargs) -> str:
"""
解析文档内容
解析流程:
1. 参数解析和文件验证
2. 文件类型检测
3. 选择合适的解析器
4. 执行内容提取
5. 结果格式化和返回
"""
try:
import json5
args = json5.loads(params)
file_path = args.get('file_path', '').strip()
extract_images = args.get('extract_images', False)
if not file_path:
return json5.dumps({'error': 'File path cannot be empty'})
# 检查文件是否存在
if not os.path.exists(file_path):
return json5.dumps({'error': f'File not found: {file_path}'})
# 检测文件类型
file_type = self._detect_file_type(file_path)
# 选择解析器
if file_type == 'pdf':
content = self._parse_pdf(file_path, extract_images)
elif file_type == 'docx':
content = self._parse_docx(file_path)
elif file_type == 'pptx':
content = self._parse_pptx(file_path)
elif file_type in ['txt', 'md']:
content = self._parse_text(file_path)
elif file_type == 'html':
content = self._parse_html(file_path)
elif file_type in ['csv', 'xlsx']:
content = self._parse_spreadsheet(file_path)
else:
return json5.dumps({'error': f'Unsupported file type: {file_type}'})
return json5.dumps({
'file_path': file_path,
'file_type': file_type,
'content': content,
'metadata': self._extract_metadata(file_path)
}, ensure_ascii=False)
except Exception as e:
return json5.dumps({
'error': f'Document parsing failed: {str(e)}'
}, ensure_ascii=False)
def _parse_pdf(self, file_path: str, extract_images: bool = False) -> Dict:
"""解析 PDF 文档"""
try:
import PyPDF2
import pdfplumber
content = {
'text': '',
'pages': [],
'tables': [],
'images': []
}
# 使用 pdfplumber 提取文本和表格
with pdfplumber.open(file_path) as pdf:
for page_num, page in enumerate(pdf.pages, 1):
# 提取文本
page_text = page.extract_text() or ''
content['text'] += page_text + '\n'
content['pages'].append({
'page_number': page_num,
'text': page_text
})
# 提取表格
tables = page.extract_tables()
for table_idx, table in enumerate(tables):
content['tables'].append({
'page': page_num,
'table_index': table_idx,
'data': table
})
# 提取图像(如果需要)
if extract_images:
content['images'] = self._extract_pdf_images(file_path)
return content
except ImportError:
raise Exception("PDF parsing requires 'PyPDF2' and 'pdfplumber' packages")
def _parse_docx(self, file_path: str) -> Dict:
"""解析 Word 文档"""
try:
from docx import Document
doc = Document(file_path)
content = {
'text': '',
'paragraphs': [],
'tables': []
}
# 提取段落
for para in doc.paragraphs:
if para.text.strip():
content['text'] += para.text + '\n'
content['paragraphs'].append({
'text': para.text,
'style': para.style.name if para.style else None
})
# 提取表格
for table_idx, table in enumerate(doc.tables):
table_data = []
for row in table.rows:
row_data = [cell.text.strip() for cell in row.cells]
table_data.append(row_data)
content['tables'].append({
'table_index': table_idx,
'data': table_data
})
return content
except ImportError:
raise Exception("Word document parsing requires 'python-docx' package")
def _parse_pptx(self, file_path: str) -> Dict:
"""解析 PowerPoint 演示文稿"""
try:
from pptx import Presentation
prs = Presentation(file_path)
content = {
'text': '',
'slides': []
}
for slide_idx, slide in enumerate(prs.slides, 1):
slide_text = ''
slide_content = {
'slide_number': slide_idx,
'title': '',
'content': '',
'notes': ''
}
# 提取文本框内容
for shape in slide.shapes:
if hasattr(shape, 'text'):
text = shape.text.strip()
if text:
slide_text += text + '\n'
# 尝试识别标题
if not slide_content['title'] and len(text) < 100:
slide_content['title'] = text
else:
slide_content['content'] += text + '\n'
# 提取备注
if slide.notes_slide.notes_text_frame:
notes_text = slide.notes_slide.notes_text_frame.text.strip()
slide_content['notes'] = notes_text
slide_text += notes_text + '\n'
content['text'] += slide_text
content['slides'].append(slide_content)
return content
except ImportError:
raise Exception("PowerPoint parsing requires 'python-pptx' package")
def _detect_file_type(self, file_path: str) -> str:
"""检测文件类型"""
import mimetypes
# 基于文件扩展名
_, ext = os.path.splitext(file_path.lower())
type_mapping = {
'.pdf': 'pdf',
'.docx': 'docx',
'.pptx': 'pptx',
'.txt': 'txt',
'.md': 'md',
'.html': 'html',
'.htm': 'html',
'.csv': 'csv',
'.xlsx': 'xlsx'
}
return type_mapping.get(ext, 'unknown')
3.5 Retrieval - RAG 检索工具
位置: qwen_agent/tools/retrieval.py
3.5.1 混合检索实现
@register_tool('retrieval')
class Retrieval(BaseToolWithFileAccess):
"""
RAG 检索工具
检索策略:
- 向量相似度检索
- BM25 关键词检索
- 混合检索融合
- 结果重排序
支持功能:
- 多文档检索
- 增量索引更新
- 检索结果缓存
- 相关度评分
"""
description = '从文档中检索相关信息'
parameters = [
{
'name': 'query',
'type': 'string',
'description': '检索查询',
'required': True
},
{
'name': 'files',
'type': 'array',
'description': '要检索的文件列表',
'required': False
},
{
'name': 'max_results',
'type': 'number',
'description': '最大结果数量',
'required': False
}
]
def __init__(self, **kwargs):
super().__init__()
self.embedding_model = kwargs.get('embedding_model', 'text-embedding-ada-002')
self.chunk_size = kwargs.get('chunk_size', 1000)
self.chunk_overlap = kwargs.get('chunk_overlap', 200)
self.vector_store = None
self.bm25_index = None
def call(self, params: str, **kwargs) -> str:
"""
执行 RAG 检索
检索流程:
1. 参数解析和验证
2. 文档预处理和索引
3. 查询向量化
4. 混合检索执行
5. 结果重排序和格式化
"""
try:
import json5
args = json5.loads(params)
query = args.get('query', '').strip()
files = args.get('files', [])
max_results = args.get('max_results', 5)
if not query:
return json5.dumps({'error': 'Query cannot be empty'})
# 获取文件列表
if not files:
# 从 kwargs 中获取消息中的文件
messages = kwargs.get('messages', [])
files = self._extract_files_from_messages(messages)
if not files:
return json5.dumps({'error': 'No files provided for retrieval'})
# 处理文档并建立索引
documents = self._process_documents(files)
# 执行混合检索
results = self._hybrid_search(query, documents, max_results)
# 格式化结果
formatted_results = self._format_retrieval_results(results)
return json5.dumps({
'query': query,
'results': formatted_results,
'total': len(formatted_results)
}, ensure_ascii=False)
except Exception as e:
return json5.dumps({
'error': f'Retrieval failed: {str(e)}'
}, ensure_ascii=False)
def _process_documents(self, files: List[str]) -> List[Dict]:
"""处理文档并分块"""
documents = []
for file_path in files:
try:
# 解析文档内容
content = self._parse_document(file_path)
# 文档分块
chunks = self._chunk_document(content, file_path)
documents.extend(chunks)
except Exception as e:
logger.warning(f"Failed to process document {file_path}: {e}")
continue
return documents
def _chunk_document(self, content: str, file_path: str) -> List[Dict]:
"""文档分块处理"""
chunks = []
# 简单的滑动窗口分块
start = 0
chunk_id = 0
while start < len(content):
end = start + self.chunk_size
chunk_text = content[start:end]
# 确保不在单词中间切断
if end < len(content):
last_space = chunk_text.rfind(' ')
if last_space > self.chunk_size * 0.8: # 至少保留80%的内容
end = start + last_space
chunk_text = content[start:end]
chunks.append({
'id': f"{file_path}_{chunk_id}",
'text': chunk_text.strip(),
'source': file_path,
'start_pos': start,
'end_pos': end
})
# 下一个块的起始位置(考虑重叠)
start = end - self.chunk_overlap
chunk_id += 1
return chunks
def _hybrid_search(self, query: str, documents: List[Dict], max_results: int) -> List[Dict]:
"""
混合检索算法
算法组合:
1. 向量相似度检索(语义匹配)
2. BM25 关键词检索(精确匹配)
3. 加权融合排序
4. 多样性重排序
评分公式:
final_score = α * vector_score + β * bm25_score
其中 α = 0.7, β = 0.3
"""
# 向量检索
vector_results = self._vector_search(query, documents, max_results * 2)
# BM25 检索
bm25_results = self._bm25_search(query, documents, max_results * 2)
# 融合结果
combined_results = self._combine_search_results(
vector_results, bm25_results, max_results
)
return combined_results
def _vector_search(self, query: str, documents: List[Dict], max_results: int) -> List[Dict]:
"""向量相似度检索"""
try:
# 查询向量化
query_vector = self._get_embedding(query)
# 计算相似度
similarities = []
for doc in documents:
doc_vector = self._get_embedding(doc['text'])
similarity = self._cosine_similarity(query_vector, doc_vector)
similarities.append({
'document': doc,
'score': similarity,
'method': 'vector'
})
# 排序并返回
similarities.sort(key=lambda x: x['score'], reverse=True)
return similarities[:max_results]
except Exception as e:
logger.warning(f"Vector search failed: {e}")
return []
def _bm25_search(self, query: str, documents: List[Dict], max_results: int) -> List[Dict]:
"""BM25 关键词检索"""
try:
from rank_bm25 import BM25Okapi
import jieba
# 构建 BM25 索引
corpus = []
for doc in documents:
# 中文分词
tokens = list(jieba.cut(doc['text']))
corpus.append(tokens)
bm25 = BM25Okapi(corpus)
# 查询分词
query_tokens = list(jieba.cut(query))
# 计算 BM25 分数
scores = bm25.get_scores(query_tokens)
# 构建结果
results = []
for i, score in enumerate(scores):
if score > 0: # 只保留有分数的结果
results.append({
'document': documents[i],
'score': score,
'method': 'bm25'
})
# 排序并返回
results.sort(key=lambda x: x['score'], reverse=True)
return results[:max_results]
except ImportError:
logger.warning("BM25 search requires 'rank-bm25' and 'jieba' packages")
return []
except Exception as e:
logger.warning(f"BM25 search failed: {e}")
return []
def _combine_search_results(self, vector_results: List[Dict],
bm25_results: List[Dict], max_results: int) -> List[Dict]:
"""融合检索结果"""
# 归一化分数
if vector_results:
max_vector_score = max(r['score'] for r in vector_results)
for result in vector_results:
result['normalized_score'] = result['score'] / max_vector_score if max_vector_score > 0 else 0
if bm25_results:
max_bm25_score = max(r['score'] for r in bm25_results)
for result in bm25_results:
result['normalized_score'] = result['score'] / max_bm25_score if max_bm25_score > 0 else 0
# 合并结果
combined = {}
# 添加向量检索结果
for result in vector_results:
doc_id = result['document']['id']
combined[doc_id] = {
'document': result['document'],
'vector_score': result['normalized_score'],
'bm25_score': 0.0
}
# 添加 BM25 检索结果
for result in bm25_results:
doc_id = result['document']['id']
if doc_id in combined:
combined[doc_id]['bm25_score'] = result['normalized_score']
else:
combined[doc_id] = {
'document': result['document'],
'vector_score': 0.0,
'bm25_score': result['normalized_score']
}
# 计算最终分数
final_results = []
for doc_id, data in combined.items():
final_score = 0.7 * data['vector_score'] + 0.3 * data['bm25_score']
final_results.append({
'document': data['document'],
'final_score': final_score,
'vector_score': data['vector_score'],
'bm25_score': data['bm25_score']
})
# 排序并返回
final_results.sort(key=lambda x: x['final_score'], reverse=True)
return final_results[:max_results]
4. 工具执行时序
4.1 工具调用完整时序
sequenceDiagram
participant Agent as 智能体
participant LLM as 大语言模型
participant ToolRegistry as 工具注册表
participant Tool as 具体工具
participant ExecutionEnv as 执行环境
participant ExternalService as 外部服务
Agent->>LLM: 发送消息 + 工具定义
LLM-->>Agent: 返回工具调用请求
Note over Agent: 工具调用检测
Agent->>Agent: _detect_tool()
Agent->>Agent: 解析工具名和参数
Note over Agent: 工具执行
Agent->>ToolRegistry: 查找工具实例
ToolRegistry-->>Agent: 返回工具对象
Agent->>Tool: call(params)
Note over Tool: 工具执行流程
Tool->>Tool: 参数验证
Tool->>Tool: 安全检查
Tool->>ExecutionEnv: 初始化执行环境
alt 需要外部服务
Tool->>ExternalService: 调用外部API
ExternalService-->>Tool: 返回服务结果
end
Tool->>Tool: 结果处理
Tool-->>Agent: 返回执行结果
Note over Agent: 继续对话
Agent->>Agent: 添加工具结果到历史
Agent->>LLM: 发送更新的消息
LLM-->>Agent: 返回最终响应
4.2 代码解释器执行时序
sequenceDiagram
participant Agent as 智能体
participant CodeInterpreter as 代码解释器
participant KernelManager as 内核管理器
participant JupyterKernel as Jupyter内核
participant FileSystem as 文件系统
Agent->>CodeInterpreter: call({"code": "..."})
Note over CodeInterpreter: 参数处理
CodeInterpreter->>CodeInterpreter: 解析代码参数
CodeInterpreter->>CodeInterpreter: 安全检查
Note over CodeInterpreter: 内核管理
CodeInterpreter->>KernelManager: 获取/创建内核
KernelManager->>JupyterKernel: 启动内核
JupyterKernel-->>KernelManager: 内核就绪
KernelManager-->>CodeInterpreter: 返回内核客户端
Note over CodeInterpreter: 代码执行
CodeInterpreter->>JupyterKernel: 执行代码
loop 收集执行结果
JupyterKernel-->>CodeInterpreter: 流输出/错误/显示数据
end
JupyterKernel-->>CodeInterpreter: 执行完成
Note over CodeInterpreter: 结果处理
alt 有图像输出
CodeInterpreter->>FileSystem: 保存图像文件
FileSystem-->>CodeInterpreter: 返回文件路径
end
CodeInterpreter->>CodeInterpreter: 格式化输出
CodeInterpreter-->>Agent: 返回执行结果
5. 工具扩展开发
5.1 自定义工具开发模板
from qwen_agent.tools.base import BaseTool, register_tool
import json5
@register_tool('custom_tool_name')
class CustomTool(BaseTool):
"""
自定义工具模板
开发步骤:
1. 继承 BaseTool 基类
2. 定义工具元数据
3. 实现 call() 方法
4. 添加参数验证
5. 处理异常情况
"""
# 工具元数据
description = '工具功能描述'
parameters = [
{
'name': 'param1',
'type': 'string',
'description': '参数1描述',
'required': True
},
{
'name': 'param2',
'type': 'number',
'description': '参数2描述',
'required': False
}
]
def __init__(self, **kwargs):
super().__init__()
# 初始化工具特定的配置
self.config = kwargs
self.timeout = kwargs.get('timeout', 30)
def call(self, params: str, **kwargs) -> str:
"""
工具执行的核心逻辑
实现要点:
1. 参数解析和验证
2. 核心业务逻辑
3. 错误处理
4. 结果格式化
"""
try:
# 第一步:参数解析
args = json5.loads(params)
param1 = args.get('param1')
param2 = args.get('param2', 0)
# 第二步:参数验证
if not param1:
raise ValueError("param1 is required")
# 第三步:核心业务逻辑
result = self._execute_business_logic(param1, param2)
# 第四步:结果格式化
return json5.dumps({
'success': True,
'result': result,
'metadata': {
'tool_name': self.name,
'execution_time': time.time()
}
}, ensure_ascii=False)
except ValueError as e:
# 参数错误
return json5.dumps({
'success': False,
'error': f'Parameter error: {str(e)}'
}, ensure_ascii=False)
except Exception as e:
# 其他异常
return json5.dumps({
'success': False,
'error': f'Execution failed: {str(e)}'
}, ensure_ascii=False)
def _execute_business_logic(self, param1: str, param2: int):
"""
核心业务逻辑实现
这里实现工具的具体功能
"""
# 示例:简单的字符串处理
result = param1.upper() * param2 if param2 > 0 else param1.upper()
return result
5.2 异步工具开发
import asyncio
from qwen_agent.tools.base import BaseTool, register_tool
@register_tool('async_tool')
class AsyncTool(BaseTool):
"""
异步工具示例
适用场景:
- 网络请求
- 文件I/O操作
- 数据库查询
- 长时间运行的任务
"""
description = '异步执行的工具示例'
parameters = [
{
'name': 'url',
'type': 'string',
'description': '要请求的URL',
'required': True
}
]
def call(self, params: str, **kwargs) -> str:
"""
异步工具的同步包装器
注意: BaseTool.call() 是同步方法,
需要在内部使用 asyncio.run() 来执行异步逻辑
"""
try:
args = json5.loads(params)
url = args.get('url')
# 运行异步逻辑
result = asyncio.run(self._async_execute(url))
return json5.dumps({
'success': True,
'result': result
}, ensure_ascii=False)
except Exception as e:
return json5.dumps({
'success': False,
'error': str(e)
}, ensure_ascii=False)
async def _async_execute(self, url: str):
"""异步执行逻辑"""
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=self.timeout) as response:
content = await response.text()
return {
'status_code': response.status,
'content_length': len(content),
'headers': dict(response.headers)
}
5.3 工具配置管理
class ConfigurableTool(BaseTool):
"""
可配置的工具基类
支持功能:
- 配置文件加载
- 环境变量读取
- 运行时配置更新
- 配置验证
"""
def __init__(self, config_file=None, **kwargs):
super().__init__()
# 加载配置
self.config = self._load_config(config_file, **kwargs)
# 验证配置
self._validate_config()
# 应用配置
self._apply_config()
def _load_config(self, config_file=None, **kwargs):
"""加载配置"""
config = {}
# 1. 默认配置
config.update(self._get_default_config())
# 2. 配置文件
if config_file and os.path.exists(config_file):
with open(config_file, 'r') as f:
file_config = json.load(f)
config.update(file_config)
# 3. 环境变量
env_config = self._load_env_config()
config.update(env_config)
# 4. 运行时参数
config.update(kwargs)
return config
def _get_default_config(self):
"""获取默认配置"""
return {
'timeout': 30,
'max_retries': 3,
'cache_enabled': True
}
def _load_env_config(self):
"""从环境变量加载配置"""
env_config = {}
# 定义环境变量映射
env_mapping = {
'TOOL_TIMEOUT': ('timeout', int),
'TOOL_MAX_RETRIES': ('max_retries', int),
'TOOL_CACHE_ENABLED': ('cache_enabled', lambda x: x.lower() == 'true')
}
for env_key, (config_key, converter) in env_mapping.items():
env_value = os.getenv(env_key)
if env_value is not None:
try:
env_config[config_key] = converter(env_value)
except (ValueError, TypeError):
logger.warning(f"Invalid environment variable {env_key}: {env_value}")
return env_config
def _validate_config(self):
"""验证配置"""
if self.config.get('timeout', 0) <= 0:
raise ValueError("Timeout must be positive")
if self.config.get('max_retries', 0) < 0:
raise ValueError("Max retries cannot be negative")
def _apply_config(self):
"""应用配置"""
self.timeout = self.config.get('timeout', 30)
self.max_retries = self.config.get('max_retries', 3)
self.cache_enabled = self.config.get('cache_enabled', True)
6. 工具性能优化
6.1 缓存机制
import functools
import hashlib
import pickle
import time
from typing import Any, Optional
class ToolCache:
"""工具结果缓存管理器"""
def __init__(self, cache_dir: str = './tool_cache', ttl: int = 3600):
self.cache_dir = cache_dir
self.ttl = ttl
os.makedirs(cache_dir, exist_ok=True)
def get_cache_key(self, tool_name: str, params: str) -> str:
"""生成缓存键"""
content = f"{tool_name}:{params}"
return hashlib.md5(content.encode()).hexdigest()
def get(self, tool_name: str, params: str) -> Optional[Any]:
"""获取缓存结果"""
cache_key = self.get_cache_key(tool_name, params)
cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
if not os.path.exists(cache_file):
return None
try:
with open(cache_file, 'rb') as f:
cache_data = pickle.load(f)
# 检查是否过期
if time.time() - cache_data['timestamp'] > self.ttl:
os.remove(cache_file)
return None
return cache_data['result']
except Exception:
# 缓存文件损坏,删除
try:
os.remove(cache_file)
except:
pass
return None
def set(self, tool_name: str, params: str, result: Any):
"""设置缓存结果"""
cache_key = self.get_cache_key(tool_name, params)
cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
cache_data = {
'result': result,
'timestamp': time.time()
}
try:
with open(cache_file, 'wb') as f:
pickle.dump(cache_data, f)
except Exception as e:
logger.warning(f"Failed to cache result: {e}")
def cached_tool_call(cache_ttl: int = 3600):
"""工具调用缓存装饰器"""
def decorator(call_method):
@functools.wraps(call_method)
def wrapper(self, params: str, **kwargs):
# 检查是否启用缓存
if not getattr(self, 'cache_enabled', True):
return call_method(self, params, **kwargs)
# 获取缓存
cache = getattr(self, '_cache', None)
if cache is None:
cache = ToolCache(ttl=cache_ttl)
self._cache = cache
# 尝试从缓存获取结果
cached_result = cache.get(self.name, params)
if cached_result is not None:
return cached_result
# 执行工具调用
result = call_method(self, params, **kwargs)
# 缓存结果
cache.set(self.name, params, result)
return result
return wrapper
return decorator
6.2 并发执行
import concurrent.futures
import threading
from typing import List, Dict, Callable
class ToolExecutor:
"""工具并发执行器"""
def __init__(self, max_workers: int = 5):
self.max_workers = max_workers
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
self._lock = threading.Lock()
def execute_parallel(self, tool_calls: List[Dict]) -> List[Dict]:
"""
并行执行多个工具调用
Args:
tool_calls: 工具调用列表,每个元素包含:
- tool_name: 工具名称
- params: 工具参数
- tool_instance: 工具实例
Returns:
执行结果列表
"""
futures = []
# 提交任务
for i, call_info in enumerate(tool_calls):
future = self.executor.submit(
self._execute_single_tool,
call_info['tool_instance'],
call_info['params'],
call_info.get('kwargs', {}),
i # 任务索引
)
futures.append(future)
# 收集结果
results = []
for i, future in enumerate(futures):
try:
result = future.result(timeout=60) # 60秒超时
results.append({
'index': i,
'success': True,
'result': result
})
except concurrent.futures.TimeoutError:
results.append({
'index': i,
'success': False,
'error': 'Tool execution timeout'
})
except Exception as e:
results.append({
'index': i,
'success': False,
'error': str(e)
})
# 按原始顺序排序
results.sort(key=lambda x: x['index'])
return results
def _execute_single_tool(self, tool_instance, params: str, kwargs: Dict, index: int):
"""执行单个工具"""
try:
return tool_instance.call(params, **kwargs)
except Exception as e:
logger.error(f"Tool execution failed (index {index}): {e}")
raise
def shutdown(self):
"""关闭执行器"""
self.executor.shutdown(wait=True)
7. 工具安全机制
7.1 安全沙箱
import subprocess
import tempfile
import shutil
import resource
import signal
from contextlib import contextmanager
class SecuritySandbox:
"""工具执行安全沙箱"""
def __init__(self,
max_memory_mb: int = 512,
max_cpu_time: int = 30,
max_file_size_mb: int = 100):
self.max_memory = max_memory_mb * 1024 * 1024 # 转换为字节
self.max_cpu_time = max_cpu_time
self.max_file_size = max_file_size_mb * 1024 * 1024
@contextmanager
def secure_execution(self):
"""安全执行上下文管理器"""
# 创建临时工作目录
temp_dir = tempfile.mkdtemp(prefix='tool_sandbox_')
old_cwd = os.getcwd()
try:
# 切换到临时目录
os.chdir(temp_dir)
# 设置资源限制
self._set_resource_limits()
# 设置信号处理
old_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
signal.alarm(self.max_cpu_time)
yield temp_dir
finally:
# 恢复信号处理
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
# 恢复工作目录
os.chdir(old_cwd)
# 清理临时目录
try:
shutil.rmtree(temp_dir)
except Exception as e:
logger.warning(f"Failed to cleanup temp directory: {e}")
def _set_resource_limits(self):
"""设置资源限制"""
try:
# 内存限制
resource.setrlimit(resource.RLIMIT_AS, (self.max_memory, self.max_memory))
# CPU 时间限制
resource.setrlimit(resource.RLIMIT_CPU, (self.max_cpu_time, self.max_cpu_time))
# 文件大小限制
resource.setrlimit(resource.RLIMIT_FSIZE, (self.max_file_size, self.max_file_size))
# 进程数限制
resource.setrlimit(resource.RLIMIT_NPROC, (10, 10))
except Exception as e:
logger.warning(f"Failed to set resource limits: {e}")
def _timeout_handler(self, signum, frame):
"""超时信号处理器"""
raise TimeoutError("Tool execution timeout")
class SecureTool(BaseTool):
"""安全工具基类"""
def __init__(self, **kwargs):
super().__init__()
self.sandbox = SecuritySandbox(
max_memory_mb=kwargs.get('max_memory_mb', 512),
max_cpu_time=kwargs.get('max_cpu_time', 30),
max_file_size_mb=kwargs.get('max_file_size_mb', 100)
)
def call(self, params: str, **kwargs) -> str:
"""在安全沙箱中执行工具"""
try:
with self.sandbox.secure_execution() as temp_dir:
return self._secure_call(params, temp_dir, **kwargs)
except TimeoutError:
return json5.dumps({
'error': 'Tool execution timeout'
}, ensure_ascii=False)
except MemoryError:
return json5.dumps({
'error': 'Tool execution exceeded memory limit'
}, ensure_ascii=False)
except Exception as e:
return json5.dumps({
'error': f'Tool execution failed: {str(e)}'
}, ensure_ascii=False)
def _secure_call(self, params: str, temp_dir: str, **kwargs) -> str:
"""在沙箱中的安全执行逻辑"""
raise NotImplementedError
8. 验收清单
- 工具系统概述和设计理念完整
- 工具分类体系清晰
- 架构图和时序图详细
- 核心工具实现分析深入
- BaseTool 抽象基类规范
- CodeInterpreter 详细实现
- WebSearch 多引擎支持
- DocParser 多格式解析
- Retrieval 混合检索算法
- 工具扩展开发指南完整
- 性能优化策略实用
- 安全机制考虑周全
- 缓存和并发机制完善
这个完整的工具系统分析文档整合了原有两个工具文档的所有内容,从设计理念到具体实现,从核心工具到扩展开发,为开发者提供了全方位的 Qwen-Agent 工具系统技术指南。