6.1 工具系统架构概览
工具系统是OpenAI Agents SDK的核心功能之一,它允许代理调用外部函数、服务和API来扩展其能力。工具系统采用分层设计,包含:
- 工具接口层: 定义统一的工具接口
- 函数工具层: 支持Python函数作为工具
- 内置工具层: 提供常用的预制工具
- MCP工具层: 支持Model Context Protocol
- 工具执行层: 负责工具的调用和结果处理
6.2 工具系统架构图
graph TD
A[Agent代理] --> B[Tool工具接口]
B --> C{工具类型}
C --> D[FunctionTool 函数工具]
C --> E[内置工具]
C --> F[MCPTool MCP工具]
D --> G[@function_tool 装饰器]
G --> H[Python函数]
E --> I[FileSearchTool 文件搜索]
E --> J[CodeInterpreterTool 代码解释器]
E --> K[WebSearchTool 网络搜索]
E --> L[ImageGenerationTool 图像生成]
E --> M[ComputerTool 计算机操作]
E --> N[LocalShellTool 本地Shell]
F --> O[MCP服务器]
O --> P[远程工具服务]
B --> Q[ToolContext 工具上下文]
Q --> R[RunContextWrapper 运行上下文]
Q --> S[工具调用信息]
style B fill:#e3f2fd
style G fill:#f3e5f5
style Q fill:#e8f5e8
6.3 Tool基础接口分析
6.3.1 Tool抽象基类
# 位于 src/agents/tool.py
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable
class Tool(ABC):
"""
所有工具的抽象基类
定义了工具的基本接口,所有具体工具类型都必须实现这个接口
"""
@property
@abstractmethod
def name(self) -> str:
"""工具的名称,用于LLM识别和调用"""
pass
@property
@abstractmethod
def description(self) -> str:
"""工具的描述,帮助LLM理解工具的功能和使用场景"""
pass
@abstractmethod
async def execute(
self,
context: ToolContext[Any],
arguments: str
) -> Any:
"""
执行工具的核心方法
Args:
context: 工具执行上下文,包含运行时信息
arguments: 工具参数的JSON字符串
Returns:
Any: 工具执行结果
"""
pass
@property
def is_enabled(self) -> bool | Callable[[RunContextWrapper[Any], AgentBase], MaybeAwaitable[bool]]:
"""
工具是否启用
可以是布尔值或返回布尔值的函数,支持动态启用/禁用
"""
return True
6.3.2 ToolContext工具上下文
# 位于 src/agents/tool_context.py
@dataclass
class ToolContext(Generic[TContext]):
"""
工具执行的上下文信息
提供工具执行时需要的所有运行时信息和环境
"""
run_context: RunContextWrapper[TContext] # 运行上下文包装器
agent: AgentBase # 当前执行的代理
tool_name: str # 工具名称
tool_call_id: str # 工具调用ID
tool_arguments: str # 工具参数JSON字符串
model_provider: ModelProvider # 模型提供商
@property
def context(self) -> TContext:
"""获取用户自定义的上下文对象"""
return self.run_context.context
@property
def usage(self) -> Usage:
"""获取当前的使用统计信息"""
return self.run_context.usage
def create_sub_agent(
self,
name: str,
instructions: str,
**agent_kwargs
) -> Agent[TContext]:
"""
在工具中创建子代理
这允许工具创建专门的代理来处理特定任务
"""
return Agent(
name=name,
instructions=instructions,
model_provider=self.model_provider,
**agent_kwargs
)
6.4 FunctionTool详细实现
6.4.1 FunctionTool类定义
# 位于 src/agents/tool.py
@dataclass
class FunctionTool(Tool):
"""
函数工具实现类
将Python函数包装为代理可调用的工具
支持多种函数签名模式和参数处理
"""
name: str # 工具名称
description: str # 工具描述
params_json_schema: dict[str, Any] # 参数JSON Schema
on_invoke_tool: Callable[[ToolContext[Any], str], Awaitable[Any]] # 工具调用处理函数
strict_json_schema: bool = True # 是否使用严格JSON Schema
is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase], MaybeAwaitable[bool]] = True # 启用状态
def __post_init__(self):
"""初始化后处理,确保JSON Schema的严格性"""
if self.strict_json_schema:
self.params_json_schema = ensure_strict_json_schema(self.params_json_schema)
async def execute(self, context: ToolContext[Any], arguments: str) -> Any:
"""
执行函数工具
调用on_invoke_tool处理函数来执行实际的Python函数
"""
return await self.on_invoke_tool(context, arguments)
def to_openai_format(self) -> dict[str, Any]:
"""
转换为OpenAI API工具格式
用于向OpenAI API传递工具定义
"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.params_json_schema,
"strict": self.strict_json_schema,
}
}
6.4.2 function_tool装饰器深度实现
# 位于 src/agents/tool.py
def function_tool(
func: ToolFunction[ToolParams] | None = None,
*,
name_override: str | None = None,
description_override: str | None = None,
strict_json_schema: bool = True,
is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase], MaybeAwaitable[bool]] = True,
docstring_style: DocstringStyle = "google",
) -> FunctionTool | Callable[[ToolFunction[ToolParams]], FunctionTool]:
"""
function_tool装饰器的完整实现
支持三种函数签名模式:
1. 普通函数:func(param1, param2, ...)
2. 带运行上下文:func(context: RunContextWrapper, param1, param2, ...)
3. 带工具上下文:func(tool_context: ToolContext, param1, param2, ...)
"""
def decorator(f: ToolFunction[ToolParams]) -> FunctionTool:
# 1. 分析函数签名
signature = inspect.signature(f)
function_name = name_override or f.__name__
# 2. 确定上下文参数类型
context_info = _analyze_context_parameter(signature)
# 3. 生成工具描述
description = description_override or _extract_description(f, docstring_style)
if not description:
description = f"Execute function {function_name}"
# 4. 生成参数JSON Schema
try:
params_schema = function_schema(
f,
strict=strict_json_schema,
docstring_style=docstring_style
)
except Exception as e:
raise ValueError(f"Failed to generate schema for function {function_name}: {e}")
# 5. 创建工具调用处理函数
async def on_invoke_tool(tool_context: ToolContext[Any], args_json: str) -> Any:
"""处理工具调用的内部实现"""
try:
# 解析参数
if args_json.strip():
try:
args = json.loads(args_json)
except json.JSONDecodeError as e:
error_msg = f"Invalid JSON arguments for tool {function_name}: {e}"
logger.error(error_msg)
return error_msg
else:
args = {}
# 验证参数
validation_result = _validate_arguments(args, params_schema)
if not validation_result.valid:
error_msg = f"Invalid arguments for tool {function_name}: {validation_result.error}"
logger.error(error_msg)
return error_msg
# 根据函数签名调用函数
if context_info.has_context:
if context_info.context_type == "run_context":
# 传递RunContextWrapper
result = f(tool_context.run_context, **args)
elif context_info.context_type == "tool_context":
# 传递ToolContext
result = f(tool_context, **args)
else:
raise ValueError(f"Unknown context type: {context_info.context_type}")
else:
# 普通函数,不传递上下文
result = f(**args)
# 处理异步结果
if inspect.isawaitable(result):
result = await result
return result
except TypeError as e:
# 参数类型错误
error_msg = f"Type error in tool {function_name}: {str(e)}"
logger.error(error_msg)
return error_msg
except Exception as e:
# 其他执行错误
error_msg = f"Error executing tool {function_name}: {str(e)}"
logger.error(error_msg, exc_info=True)
# 检查是否有自定义错误处理
error_handler = getattr(f, '_tool_error_handler', None)
if error_handler and callable(error_handler):
try:
handled_result = error_handler(e, tool_context, args)
if inspect.isawaitable(handled_result):
handled_result = await handled_result
return handled_result
except Exception:
pass # 错误处理器本身出错,继续返回原始错误
return error_msg
# 6. 创建FunctionTool实例
return FunctionTool(
name=function_name,
description=description,
params_json_schema=params_schema,
on_invoke_tool=on_invoke_tool,
strict_json_schema=strict_json_schema,
is_enabled=is_enabled,
)
# 支持直接装饰和参数化装饰
if func is not None:
return decorator(func)
else:
return decorator
def _analyze_context_parameter(signature: inspect.Signature) -> ContextInfo:
"""
分析函数签名中的上下文参数
返回上下文信息,包括是否有上下文参数以及类型
"""
params = list(signature.parameters.values())
if not params:
return ContextInfo(has_context=False)
first_param = params[0]
# 检查类型注解
if hasattr(first_param, 'annotation') and first_param.annotation != inspect.Parameter.empty:
annotation = first_param.annotation
# 检查是否是RunContextWrapper类型
if _is_run_context_type(annotation):
return ContextInfo(has_context=True, context_type="run_context")
# 检查是否是ToolContext类型
elif _is_tool_context_type(annotation):
return ContextInfo(has_context=True, context_type="tool_context")
# 根据参数名称推断
if first_param.name in ["context", "run_context"]:
return ContextInfo(has_context=True, context_type="run_context")
elif first_param.name in ["tool_context", "tool_ctx"]:
return ContextInfo(has_context=True, context_type="tool_context")
return ContextInfo(has_context=False)
@dataclass
class ContextInfo:
"""上下文参数信息"""
has_context: bool
context_type: str | None = None
6.4.3 函数Schema生成
# 位于 src/agents/function_schema.py
def function_schema(
func: Callable,
*,
strict: bool = True,
docstring_style: DocstringStyle = "google",
exclude_first_param: bool = False,
) -> dict[str, Any]:
"""
从Python函数生成JSON Schema
支持从类型注解、文档字符串等提取参数信息
"""
signature = inspect.signature(func)
parameters = list(signature.parameters.values())
# 排除上下文参数
if exclude_first_param and parameters:
# 检查第一个参数是否是上下文参数
first_param = parameters[0]
if _is_context_parameter(first_param):
parameters = parameters[1:]
# 解析文档字符串
docstring = inspect.getdoc(func) or ""
param_docs = _parse_docstring_params(docstring, docstring_style)
# 构建JSON Schema
schema = {
"type": "object",
"properties": {},
"required": [],
"additionalProperties": False,
}
for param in parameters:
param_name = param.name
param_type = param.annotation
param_default = param.default
# 生成参数schema
param_schema = _generate_param_schema(
param_name=param_name,
param_type=param_type,
param_default=param_default,
param_doc=param_docs.get(param_name, ""),
)
schema["properties"][param_name] = param_schema
# 判断是否为必需参数
if param_default is inspect.Parameter.empty:
schema["required"].append(param_name)
# 应用严格模式
if strict:
schema = ensure_strict_json_schema(schema)
return schema
def _generate_param_schema(
param_name: str,
param_type: Any,
param_default: Any,
param_doc: str,
) -> dict[str, Any]:
"""为单个参数生成JSON Schema"""
schema = {"description": param_doc or f"Parameter {param_name}"}
# 处理类型注解
if param_type == inspect.Parameter.empty:
# 无类型注解,尝试从默认值推断
if param_default != inspect.Parameter.empty:
schema.update(_infer_type_from_value(param_default))
else:
schema["type"] = "string" # 默认为字符串
else:
# 根据类型注解生成schema
schema.update(_type_to_json_schema(param_type))
# 处理默认值
if param_default != inspect.Parameter.empty:
schema["default"] = param_default
return schema
def _type_to_json_schema(python_type: Any) -> dict[str, Any]:
"""将Python类型转换为JSON Schema"""
# 基本类型映射
basic_types = {
str: {"type": "string"},
int: {"type": "integer"},
float: {"type": "number"},
bool: {"type": "boolean"},
list: {"type": "array"},
dict: {"type": "object"},
}
if python_type in basic_types:
return basic_types[python_type]
# 处理泛型类型
origin = get_origin(python_type)
args = get_args(python_type)
if origin is list:
# List[T] -> array with items schema
if args:
item_schema = _type_to_json_schema(args[0])
return {"type": "array", "items": item_schema}
else:
return {"type": "array"}
elif origin is dict:
# Dict[str, T] -> object with additionalProperties schema
if len(args) >= 2:
value_schema = _type_to_json_schema(args[1])
return {"type": "object", "additionalProperties": value_schema}
else:
return {"type": "object"}
elif origin is Union:
# Union类型 -> oneOf或anyOf
union_schemas = [_type_to_json_schema(arg) for arg in args if arg is not type(None)]
if len(union_schemas) == 1:
# Optional[T] case
return union_schemas[0]
else:
return {"anyOf": union_schemas}
# 处理Literal类型
elif origin is Literal:
return {"enum": list(args)}
# 处理Annotated类型
elif origin is Annotated:
base_type = args[0]
annotations = args[1:]
base_schema = _type_to_json_schema(base_type)
# 从注解中提取额外信息
for annotation in annotations:
if isinstance(annotation, str):
# 字符串注解作为描述
base_schema["description"] = annotation
elif hasattr(annotation, '__dict__'):
# 其他注解对象的属性
for key, value in annotation.__dict__.items():
if key.startswith('_'):
continue
base_schema[key] = value
return base_schema
# 处理Pydantic模型
elif hasattr(python_type, 'model_json_schema'):
return python_type.model_json_schema()
# 处理dataclass
elif hasattr(python_type, '__dataclass_fields__'):
return _dataclass_to_json_schema(python_type)
# 处理Enum
elif hasattr(python_type, '__members__'):
return {"enum": list(python_type.__members__.keys())}
# 默认情况
return {"type": "string"}
6.5 内置工具详细分析
6.5.1 FileSearchTool - 文件搜索工具
# 位于 src/agents/tool.py
@dataclass
class FileSearchTool(Tool):
"""
文件搜索工具
允许代理在指定文件中搜索信息
使用OpenAI的文件搜索功能
"""
file_ids: list[str] # OpenAI文件ID列表
filters: Filters | None = None # 搜索过滤器
ranking_options: RankingOptions | None = None # 排序选项
description: str = "Search through uploaded files" # 工具描述
@property
def name(self) -> str:
return "file_search"
async def execute(self, context: ToolContext[Any], arguments: str) -> str:
"""执行文件搜索"""
# 文件搜索通常由模型服务商处理,这里返回配置信息
return f"File search configured for files: {self.file_ids}"
def to_openai_format(self) -> dict[str, Any]:
"""转换为OpenAI API格式"""
tool_def = {"type": "file_search"}
if self.file_ids:
tool_def["file_search"] = {"file_ids": self.file_ids}
if self.filters:
tool_def["file_search"]["filters"] = self.filters
if self.ranking_options:
tool_def["file_search"]["ranking_options"] = self.ranking_options
return tool_def
6.5.2 CodeInterpreterTool - 代码解释器工具
# 位于 src/agents/tool.py
@dataclass
class CodeInterpreterTool(Tool):
"""
代码解释器工具
允许代理执行Python代码并获取结果
"""
description: str = "Execute Python code and analyze data"
@property
def name(self) -> str:
return "code_interpreter"
async def execute(self, context: ToolContext[Any], arguments: str) -> str:
"""执行代码解释器"""
# 代码执行通常由模型服务商的沙箱环境处理
return "Code interpreter available for Python execution"
def to_openai_format(self) -> dict[str, Any]:
"""转换为OpenAI API格式"""
return {"type": "code_interpreter"}
6.5.3 ComputerTool - 计算机操作工具
# 位于 src/agents/tool.py
@dataclass
class ComputerTool(Tool):
"""
计算机操作工具
允许代理执行屏幕截图、点击、输入等计算机操作
需要配合Computer实例使用
"""
computer: Computer | AsyncComputer # 计算机操作实例
description: str = "Control computer screen, keyboard and mouse"
@property
def name(self) -> str:
return "computer"
async def execute(self, context: ToolContext[Any], arguments: str) -> str:
"""执行计算机操作"""
try:
args = json.loads(arguments) if arguments else {}
action = args.get("action")
if action == "screenshot":
# 获取屏幕截图
if hasattr(self.computer, 'screenshot'):
result = await self.computer.screenshot()
return f"Screenshot taken: {result}"
else:
return "Screenshot not supported by current computer instance"
elif action == "click":
# 执行鼠标点击
x = args.get("coordinate", [0, 0])[0]
y = args.get("coordinate", [0, 0])[1]
if hasattr(self.computer, 'click'):
await self.computer.click(x, y)
return f"Clicked at ({x}, {y})"
else:
return "Click not supported"
elif action == "type":
# 输入文本
text = args.get("text", "")
if hasattr(self.computer, 'type'):
await self.computer.type(text)
return f"Typed: {text}"
else:
return "Typing not supported"
else:
return f"Unknown computer action: {action}"
except Exception as e:
return f"Computer tool error: {str(e)}"
def to_openai_format(self) -> dict[str, Any]:
"""转换为OpenAI API格式"""
return {
"type": "computer",
"computer": {
"display_width_px": getattr(self.computer, 'display_width', 1920),
"display_height_px": getattr(self.computer, 'display_height', 1080),
"display_number": getattr(self.computer, 'display_number', None),
}
}
6.5.4 LocalShellTool - 本地Shell工具
# 位于 src/agents/tool.py
@dataclass
class LocalShellTool(Tool):
"""
本地Shell命令执行工具
允许代理执行本地系统命令
需要谨慎使用,建议配合安全防护
"""
executor: LocalShellExecutor # Shell执行器
description: str = "Execute local shell commands"
@property
def name(self) -> str:
return "local_shell"
async def execute(self, context: ToolContext[Any], arguments: str) -> str:
"""执行Shell命令"""
try:
args = json.loads(arguments) if arguments else {}
command = args.get("command", "")
if not command:
return "Error: No command provided"
# 执行命令
result = await self.executor.execute(
LocalShellCommandRequest(command=command)
)
if result.exit_code == 0:
return f"Command executed successfully:\n{result.stdout}"
else:
return f"Command failed (exit code {result.exit_code}):\n{result.stderr}"
except Exception as e:
return f"Shell execution error: {str(e)}"
def to_openai_format(self) -> dict[str, Any]:
"""转换为OpenAI API格式"""
return {"type": "local_shell"}
@dataclass
class LocalShellCommandRequest:
"""Shell命令请求"""
command: str # 要执行的命令
timeout: int = 30 # 超时时间(秒)
working_directory: str | None = None # 工作目录
@dataclass
class LocalShellCommandResult:
"""Shell命令执行结果"""
exit_code: int # 退出码
stdout: str # 标准输出
stderr: str # 标准错误输出
execution_time: float # 执行时间
class LocalShellExecutor:
"""本地Shell命令执行器"""
async def execute(self, request: LocalShellCommandRequest) -> LocalShellCommandResult:
"""
执行Shell命令
使用asyncio.subprocess执行命令,支持超时控制
"""
import asyncio.subprocess as subprocess
start_time = time.time()
try:
# 创建子进程
process = await subprocess.create_subprocess_shell(
request.command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=request.working_directory,
)
# 等待命令完成(带超时)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=request.timeout
)
execution_time = time.time() - start_time
return LocalShellCommandResult(
exit_code=process.returncode or 0,
stdout=stdout.decode('utf-8', errors='replace'),
stderr=stderr.decode('utf-8', errors='replace'),
execution_time=execution_time,
)
except asyncio.TimeoutError:
# 命令超时,终止进程
if process:
process.terminate()
await process.wait()
execution_time = time.time() - start_time
return LocalShellCommandResult(
exit_code=-1,
stdout="",
stderr=f"Command timed out after {request.timeout} seconds",
execution_time=execution_time,
)
except Exception as e:
execution_time = time.time() - start_time
return LocalShellCommandResult(
exit_code=-1,
stdout="",
stderr=f"Execution error: {str(e)}",
execution_time=execution_time,
)
6.6 MCP工具系统分析
6.6.1 MCPTool实现
# 位于 src/agents/mcp/__init__.py
@dataclass
class MCPTool(Tool):
"""
Model Context Protocol (MCP) 工具
支持通过MCP协议调用远程工具服务
"""
server: MCPServer # MCP服务器实例
tool_name: str # 工具名称
tool_schema: dict[str, Any] # 工具Schema
description: str # 工具描述
@property
def name(self) -> str:
return self.tool_name
async def execute(self, context: ToolContext[Any], arguments: str) -> Any:
"""执行MCP工具"""
try:
# 调用MCP服务器
result = await self.server.call_tool(
name=self.tool_name,
arguments=json.loads(arguments) if arguments else {}
)
return result.content
except Exception as e:
error_msg = f"MCP tool '{self.tool_name}' failed: {str(e)}"
logger.error(error_msg)
return error_msg
def to_openai_format(self) -> dict[str, Any]:
"""转换为OpenAI API格式"""
return {
"type": "function",
"function": {
"name": self.tool_name,
"description": self.description,
"parameters": self.tool_schema,
}
}
class MCPServer:
"""MCP服务器连接管理器"""
def __init__(self, server_config: MCPServerConfig):
self.config = server_config
self.client: mcp.Client | None = None
self.session: mcp.ClientSession | None = None
async def connect(self) -> None:
"""连接到MCP服务器"""
if self.config.transport_type == "stdio":
# 标准输入输出传输
transport = mcp.StdioServerParameters(
command=self.config.command,
args=self.config.args or [],
env=self.config.env or {},
)
elif self.config.transport_type == "sse":
# Server-Sent Events传输
transport = mcp.SSEServerParameters(url=self.config.url)
else:
raise ValueError(f"Unsupported transport type: {self.config.transport_type}")
# 创建客户端和会话
self.client = mcp.Client(transport)
self.session = await self.client.connect()
# 获取服务器信息
server_info = await self.session.get_server_info()
logger.info(f"Connected to MCP server: {server_info.name}")
async def get_available_tools(self) -> list[MCPTool]:
"""获取服务器提供的所有工具"""
if not self.session:
raise RuntimeError("MCP server not connected")
# 列出可用工具
tools_response = await self.session.list_tools()
mcp_tools = []
for tool_info in tools_response.tools:
mcp_tool = MCPTool(
server=self,
tool_name=tool_info.name,
tool_schema=tool_info.inputSchema,
description=tool_info.description or f"MCP tool: {tool_info.name}",
)
mcp_tools.append(mcp_tool)
return mcp_tools
async def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
"""调用MCP工具"""
if not self.session:
raise RuntimeError("MCP server not connected")
# 调用工具
result = await self.session.call_tool(name, arguments)
return result
async def cleanup(self) -> None:
"""清理连接资源"""
if self.session:
await self.session.close()
if self.client:
await self.client.close()
@dataclass
class MCPServerConfig:
"""MCP服务器配置"""
name: str # 服务器名称
transport_type: Literal["stdio", "sse"] # 传输类型
command: str | None = None # 命令(stdio传输)
args: list[str] | None = None # 命令参数
env: dict[str, str] | None = None # 环境变量
url: str | None = None # URL(SSE传输)
6.7 工具错误处理和最佳实践
6.7.1 工具错误处理策略
# 位于 src/agents/tool.py
def default_tool_error_function(
error: Exception,
tool_context: ToolContext[Any],
arguments: dict[str, Any],
) -> str:
"""
默认的工具错误处理函数
提供统一的错误处理和日志记录
"""
tool_name = tool_context.tool_name
error_msg = str(error)
# 根据错误类型提供不同的处理
if isinstance(error, json.JSONDecodeError):
return f"Invalid JSON arguments for tool '{tool_name}': {error_msg}"
elif isinstance(error, TypeError):
return f"Invalid arguments for tool '{tool_name}': {error_msg}"
elif isinstance(error, FileNotFoundError):
return f"Required file not found for tool '{tool_name}': {error_msg}"
elif isinstance(error, PermissionError):
return f"Permission denied for tool '{tool_name}': {error_msg}"
elif isinstance(error, TimeoutError):
return f"Tool '{tool_name}' timed out: {error_msg}"
else:
# 通用错误处理
logger.error(f"Tool '{tool_name}' execution failed", exc_info=True)
return f"Tool '{tool_name}' failed: {error_msg}"
def tool_error_handler(handler_func: Callable[[Exception, ToolContext[Any], dict], str]):
"""
工具错误处理装饰器
允许为特定工具定义自定义错误处理逻辑
"""
def decorator(tool_func):
tool_func._tool_error_handler = handler_func
return tool_func
return decorator
# 使用示例
@tool_error_handler(lambda e, ctx, args: f"Custom error handling: {e}")
@function_tool
def risky_operation(value: int) -> int:
"""可能出错的操作"""
if value < 0:
raise ValueError("Value must be non-negative")
return value * 2
这个工具系统为OpenAI Agents SDK提供了强大而灵活的扩展能力,支持从简单的Python函数到复杂的外部服务集成,使代理能够与真实世界进行交互。