OpenAI Agents Python SDK 源码剖析
目录
框架使用手册
1. 快速开始
OpenAI Agents SDK 是一个用于构建AI智能体的Python框架,支持工具调用、多智能体协作、实时语音交互等功能。
安装
pip install openai-agents
基本使用
from openai_agents import Agent, Tool

# Create the agent
agent = Agent(
    name="assistant",
    model="gpt-4",
    instructions="你是一个有用的助手"
)


# Define a tool by wrapping a plain function with Tool
@Tool
def get_weather(location: str) -> str:
    """Return weather information for the given location."""
    return f"{location}的天气是晴天"


# Register the tool with the agent
agent.add_tool(get_weather)

# Run one conversation turn and print the reply
# NOTE(review): later in this document Agent.run is declared `async def`; if that
# is the version in use, this call returns a coroutine and must be awaited — confirm.
response = agent.run("北京的天气怎么样?")
print(response)
2. 核心概念
Agent(智能体)
智能体是框架的核心概念,代表一个具有特定能力和职责的AI实体。
class Agent:
    """Plain holder for one agent's configuration.

    Attributes:
        name: Agent name.
        model: Model name used for generation.
        instructions: System instructions given to the model.
        tools: Tools available to the agent (always a list owned by this agent).
        temperature: Sampling temperature.
        max_tokens: Maximum number of tokens to generate.
    """

    def __init__(
        self,
        name: str,
        model: str = "gpt-4",
        instructions: str = "",
        # String annotation: neither `List` nor `Tool` is defined at this point,
        # and the default is None, so the parameter is optional.
        tools: "Optional[List[Tool]]" = None,
        temperature: float = 0.7,
        max_tokens: int = 1000
    ):
        self.name = name
        self.model = model
        self.instructions = instructions
        # Copy so that later changes to agent.tools never leak into the caller's list.
        self.tools = list(tools) if tools else []
        self.temperature = temperature
        self.max_tokens = max_tokens
Tool(工具)
工具是智能体可以调用的函数,用于执行特定任务。
from typing import Callable, Any
import inspect
class Tool:
    """Wrap a plain function so that it can be described and invoked as a tool.

    Attributes:
        func: The wrapped callable.
        name: ``func.__name__``.
        description: The function's docstring, or "" when it has none.
        parameters: Mapping of parameter name to ``{"type": ..., "required": ...}``.
    """

    def __init__(self, func: Callable):
        self.func = func
        self.name = func.__name__
        self.description = func.__doc__ or ""
        self.parameters = self._extract_parameters()

    def _extract_parameters(self) -> dict:
        """Describe the wrapped function's parameters from its signature.

        Returns:
            A dict keyed by parameter name. ``type`` is the annotation's class
            name (``"str"`` when unannotated, ``str(annotation)`` for
            non-class annotations); ``required`` is True when the parameter
            has no default value.
        """
        sig = inspect.signature(self.func)
        parameters = {}
        for name, param in sig.parameters.items():
            annotation = param.annotation
            if annotation is param.empty:
                type_name = "str"
            elif isinstance(annotation, type):
                # str(int) would give "<class 'int'>", which does not match the
                # plain "str" used for unannotated parameters.
                type_name = annotation.__name__
            else:
                type_name = str(annotation)
            parameters[name] = {
                "type": type_name,
                # Identity check: a default value may define its own __eq__.
                "required": param.default is param.empty
            }
        return parameters

    def call(self, **kwargs) -> Any:
        """Invoke the wrapped function with keyword arguments and return its result."""
        return self.func(**kwargs)
Conversation(对话)
对话管理智能体与用户或其他智能体之间的交互。
class Conversation:
    """Track the exchange between one agent and its counterpart.

    Attributes:
        agent: The agent that answers.
        messages: Chronological list of message dicts.
        context: Free-form context dict (not used by the methods shown here).
    """

    def __init__(self, agent: Agent):
        self.agent = agent
        self.messages = []
        self.context = {}

    def add_message(self, role: str, content: str):
        """Append a timestamped message to the history."""
        entry = {
            "role": role,
            "content": content,
            "timestamp": datetime.now()
        }
        self.messages.append(entry)

    def get_response(self, user_input: str) -> str:
        """Record the user input, ask the agent for a reply, record and return it."""
        self.add_message("user", user_input)
        # Build the prompt, then hand it to the agent's model call
        prompt = self._build_prompt()
        reply = self.agent.generate_response(prompt)
        self.add_message("assistant", reply)
        return reply
3. 高级功能
多智能体协作
from openai_agents import MultiAgentSystem

# Create the multi-agent system
# NOTE(review): Agent is used below but not imported in this snippet — confirm.
system = MultiAgentSystem()

# Create the agents and add them to the system
researcher = Agent(
    name="researcher",
    instructions="你负责研究和收集信息"
)
writer = Agent(
    name="writer",
    instructions="你负责根据研究结果写作文章"
)
system.add_agent(researcher)
system.add_agent(writer)

# Define the workflow: each step names an agent and the task it is given
workflow = [
    {"agent": "researcher", "task": "研究AI发展趋势"},
    {"agent": "writer", "task": "写一篇关于AI发展趋势的文章"}
]

# Execute the workflow
result = system.execute_workflow(workflow)
实时语音交互
from openai_agents import VoiceAgent

# Create the voice agent, naming the models passed as voice_model and tts_model
voice_agent = VoiceAgent(
    name="voice_assistant",
    voice_model="whisper-1",
    tts_model="tts-1"
)

# Start the voice conversation
voice_agent.start_voice_conversation()
整体架构分析
1. 架构概览
OpenAI Agents SDK采用模块化架构设计,主要包含以下核心组件:
graph TB
A[Client Application] --> B[Agent Manager]
B --> C[Agent Core]
B --> D[Tool System]
B --> E[Conversation Manager]
C --> F[Model Interface]
C --> G[Memory System]
C --> H[Planning Engine]
D --> I[Tool Registry]
D --> J[Tool Executor]
E --> K[Message Handler]
E --> L[Context Manager]
F --> M[OpenAI API]
F --> N[Local Models]
G --> O[Short-term Memory]
G --> P[Long-term Memory]
2. 核心模块
Agent Core(智能体核心)
负责智能体的核心逻辑,包括决策制定、任务规划和执行控制。
class AgentCore:
    """Core request pipeline of an agent: understand, plan, execute, respond.

    Attributes:
        config: The AgentConfig this core was built from.
        model_interface: ModelInterface built from ``config.model``.
        memory: MemorySystem built from ``config.memory_config``.
        planner: PlanningEngine built from ``config.planning_config``.
        tool_system: ToolSystem holding the tools registered on this core.
    """

    def __init__(self, config: AgentConfig):
        self.config = config
        self.model_interface = ModelInterface(config.model)
        self.memory = MemorySystem(config.memory_config)
        self.planner = PlanningEngine(config.planning_config)
        # Agent.add_tool / Agent.remove_tool go through ``core.tool_system``;
        # without this attribute every tool registration raised AttributeError.
        self.tool_system = ToolSystem()

    async def process_request(self, request: AgentRequest) -> AgentResponse:
        """Process one agent request and return the generated response.

        Args:
            request: The request to handle.

        Returns:
            The response produced from the executed plan.
        """
        # 1. Understand the request
        understanding = await self._understand_request(request)
        # 2. Create a plan
        plan = await self.planner.create_plan(understanding)
        # 3. Execute the plan
        result = await self._execute_plan(plan)
        # 4. Generate the response
        response = await self._generate_response(result)
        return response
Tool System(工具系统)
管理和执行智能体可用的工具函数。
class ToolSystem:
    """Registry plus executor for the tools an agent may call."""

    def __init__(self):
        self.registry = ToolRegistry()
        self.executor = ToolExecutor()

    def register_tool(self, tool: Tool):
        """Add ``tool`` to the registry."""
        self.registry.add(tool)

    async def execute_tool(self, tool_name: str, parameters: dict) -> Any:
        """Look up ``tool_name`` and run it with ``parameters``.

        Raises:
            ToolNotFoundError: If the registry returns a falsy value for the name.
        """
        found = self.registry.get(tool_name)
        if found:
            return await self.executor.execute(found, parameters)
        raise ToolNotFoundError(f"Tool {tool_name} not found")
Memory System(记忆系统)
管理智能体的短期和长期记忆。
class MemorySystem:
    """Combine a short-term and a long-term memory store."""

    def __init__(self, config: MemoryConfig):
        self.short_term = ShortTermMemory(config.short_term_size)
        self.long_term = LongTermMemory(config.long_term_config)

    def store_interaction(self, interaction: Interaction):
        """Store an interaction; also persist it long-term when it is important."""
        # Short-term memory always receives the interaction
        self.short_term.add(interaction)
        # Long-term memory only receives what _is_important accepts
        if not self._is_important(interaction):
            return
        self.long_term.store(interaction)

    def retrieve_relevant_memories(self, query: str) -> List[Memory]:
        """Search both stores for ``query`` and return the merged, ranked result."""
        recent_hits = self.short_term.search(query)
        archived_hits = self.long_term.search(query)
        return self._merge_and_rank(recent_hits, archived_hits)
3. 数据流分析
请求处理流程
async def handle_user_request(user_input: str) -> str:
    """Run the full request pipeline for one user input and return the reply.

    Steps: preprocess, recognise intent, retrieve context, select tools,
    generate, postprocess, then update memory with the raw input and the
    final reply.
    """
    # 1. Preprocess the input
    cleaned_input = preprocess_input(user_input)
    # 2. Recognise the intent
    detected_intent = await recognize_intent(cleaned_input)
    # 3. Retrieve context for that intent
    retrieved_context = memory_system.retrieve_context(detected_intent)
    # 4. Select the tools for that intent
    candidate_tools = tool_system.select_tools(detected_intent)
    # 5. Generate the response
    raw_reply = await agent_core.generate_response(
        input=cleaned_input,
        intent=detected_intent,
        context=retrieved_context,
        tools=candidate_tools
    )
    # 6. Postprocess
    reply = postprocess_response(raw_reply)
    # 7. Update memory
    memory_system.update_memory(user_input, reply)
    return reply
核心API深入分析
1. Agent API
创建和配置智能体
class Agent:
    """Agent facade that owns its configuration, core and conversation manager."""

    def __init__(
        self,
        name: str,
        model: str = "gpt-4",
        instructions: str = "",
        tools: List[Tool] = None,
        functions: List[Function] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        memory_config: MemoryConfig = None
    ):
        """
        Initialise the agent.

        Args:
            name: Agent name.
            model: Name of the model to use.
            instructions: System instructions.
            tools: Available tools.
            functions: Available functions.
            temperature: Sampling temperature.
            max_tokens: Maximum number of tokens.
            memory_config: Memory configuration; a default MemoryConfig is
                used when omitted.
        """
        self.name = name
        self.model = model
        self.instructions = instructions
        self.tools = tools or []
        self.functions = functions or []
        self.temperature = temperature
        self.max_tokens = max_tokens
        # Must be stored before _build_config() runs: that method previously read
        # the bare name `memory_config`, which is not in its scope (NameError).
        self.memory_config = memory_config
        # Initialise the core components
        self.core = AgentCore(self._build_config())
        self.conversation = ConversationManager()

    def _build_config(self) -> AgentConfig:
        """Build the AgentConfig handed to the core from this agent's settings."""
        return AgentConfig(
            name=self.name,
            model=self.model,
            instructions=self.instructions,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            memory_config=self.memory_config or MemoryConfig()
        )
运行和交互
async def run(
    self,
    message: str,
    context: dict = None,
    stream: bool = False
) -> Union[str, AsyncGenerator[str, None]]:
    """
    Handle one message with the agent.

    Args:
        message: The user message.
        context: Extra context; an empty dict is used when omitted.
        stream: Whether to return a streaming generator.

    Returns:
        The response content, or the generator from ``_stream_response``
        when ``stream`` is true.
    """
    request = AgentRequest(
        message=message,
        context=context or {},
        stream=stream
    )
    if not stream:
        response = await self.core.process_request(request)
        return response.content
    return self._stream_response(request)
async def _stream_response(self, request: AgentRequest) -> AsyncGenerator[str, None]:
    """Yield the content of each chunk the core streams for ``request``."""
    chunk_stream = self.core.stream_process(request)
    async for piece in chunk_stream:
        yield piece.content
工具管理
def add_tool(self, tool: Union[Tool, Callable]):
    """Add a tool to the agent, wrapping callables in Tool first."""
    wrapped = Tool(tool) if callable(tool) else tool
    self.tools.append(wrapped)
    self.core.tool_system.register_tool(wrapped)
def remove_tool(self, tool_name: str):
    """Drop every tool named ``tool_name`` and unregister it from the core."""
    remaining = list(filter(lambda entry: entry.name != tool_name, self.tools))
    self.tools = remaining
    self.core.tool_system.unregister_tool(tool_name)
def list_tools(self) -> List[str]:
    """Return the names of the agent's tools, in registration order."""
    return [registered.name for registered in self.tools]
2. Tool API
工具定义和注册
def tool(
    name: str = None,
    description: str = None,
    parameters: dict = None,
    permissions: list = None
):
    """
    Tool decorator, usable both as ``@tool`` and as ``@tool(...)``.

    Args:
        name: Tool name; defaults to the function's ``__name__``.
        description: Tool description; defaults to the function's docstring.
        parameters: Parameter definitions; extracted from the function when omitted.
        permissions: Optional permission names, stored on the resulting tool
            as ``permissions`` when given.

    Returns:
        The Tool itself for bare ``@tool`` usage, otherwise a decorator that
        turns a function into a Tool.
    """
    # Bare usage (`@tool` without parentheses) passes the decorated function as
    # the first positional argument; previously it was mistaken for the name.
    bare_func = None
    if callable(name):
        bare_func, name = name, None

    def decorator(func: Callable) -> "Tool":
        tool_name = name or func.__name__
        tool_description = description or func.__doc__ or ""
        # NOTE(review): _extract_parameters is not defined at module level in the
        # visible snippets (only as a Tool method) — confirm it exists.
        built = Tool(
            name=tool_name,
            description=tool_description,
            function=func,
            parameters=parameters or _extract_parameters(func)
        )
        if permissions is not None:
            built.permissions = list(permissions)
        return built

    if bare_func is not None:
        return decorator(bare_func)
    return decorator
# 使用示例
@tool(
    name="search_web",
    description="搜索网络信息",
    parameters={
        "query": {"type": "string", "description": "搜索查询"},
        "limit": {"type": "integer", "description": "结果数量限制", "default": 10}
    }
)
def search_web(query: str, limit: int = 10) -> List[dict]:
    """Search the web for information."""
    # Search logic goes here
    # NOTE(review): search_results is not defined in this snippet — placeholder.
    return search_results
工具执行
class ToolExecutor:
    """Run tools with parameter validation, a timeout and uniform result objects."""

    def __init__(self):
        self.execution_context = ExecutionContext()

    async def execute(
        self,
        tool: Tool,
        parameters: dict,
        timeout: float = 30.0
    ) -> ToolResult:
        """
        Execute a tool.

        Args:
            tool: The tool to execute.
            parameters: Execution parameters.
            timeout: Maximum number of seconds to wait, for coroutine and
                synchronous tools alike.

        Returns:
            A ToolResult; failures (including timeouts) are reported through
            ``success=False`` rather than raised.
        """
        try:
            # Validate the parameters
            validated_params = self._validate_parameters(tool, parameters)
            # perf_counter is monotonic, so the duration cannot go negative
            # when the wall clock is adjusted.
            start_time = time.perf_counter()
            if asyncio.iscoroutinefunction(tool.function):
                pending = tool.function(**validated_params)
            else:
                # Run the synchronous function in the default executor.
                pending = asyncio.get_running_loop().run_in_executor(
                    None,
                    lambda: tool.function(**validated_params)
                )
            # The timeout used to apply to coroutine tools only. For executor
            # calls only the wait is abandoned; the worker thread keeps running.
            result = await asyncio.wait_for(pending, timeout=timeout)
            execution_time = time.perf_counter() - start_time
            return ToolResult(
                success=True,
                result=result,
                execution_time=execution_time,
                tool_name=tool.name
            )
        except asyncio.TimeoutError:
            # str(TimeoutError()) is empty, so give the caller a usable message.
            return ToolResult(
                success=False,
                error=f"Tool {tool.name} timed out after {timeout} seconds",
                tool_name=tool.name
            )
        except Exception as e:
            return ToolResult(
                success=False,
                error=str(e),
                tool_name=tool.name
            )
3. Conversation API
对话管理
class ConversationManager:
    """Keep a bounded message history plus a free-form context dict.

    Attributes:
        messages: Message objects, oldest first.
        max_history: Maximum number of messages kept.
        context: Free-form context dict, reset by ``clear_history``.
    """

    def __init__(self, max_history: int = 100):
        self.messages = []
        self.max_history = max_history
        self.context = {}

    def add_message(
        self,
        role: str,
        content: str,
        metadata: dict = None
    ):
        """Append a timestamped message and trim the history to ``max_history``."""
        message = Message(
            role=role,
            content=content,
            timestamp=datetime.now(),
            metadata=metadata or {}
        )
        self.messages.append(message)
        self._trim_history()

    def _trim_history(self):
        """Drop the oldest messages so that at most ``max_history`` remain."""
        # Slicing with [-max_history:] kept everything when max_history was 0,
        # because [-0:] is the whole list; compute the excess explicitly.
        excess = len(self.messages) - max(self.max_history, 0)
        if excess > 0:
            self.messages = self.messages[excess:]

    def get_context(self, include_system: bool = True) -> List[dict]:
        """Return the history as role/content dicts.

        Args:
            include_system: When false, messages with role "system" are skipped.
        """
        return [
            {"role": message.role, "content": message.content}
            for message in self.messages
            if include_system or message.role != "system"
        ]

    def clear_history(self):
        """Clear both the message history and the context."""
        self.messages = []
        self.context = {}
模块化架构分析
1. 模块组织结构
openai_agents/
├── __init__.py
├── core/
│ ├── __init__.py
│ ├── agent.py # 智能体核心
│ ├── memory.py # 记忆系统
│ ├── planning.py # 规划引擎
│ └── execution.py # 执行引擎
├── tools/
│ ├── __init__.py
│ ├── base.py # 工具基类
│ ├── registry.py # 工具注册表
│ ├── executor.py # 工具执行器
│ └── builtin/ # 内置工具
│ ├── web_search.py
│ ├── file_ops.py
│ └── math_tools.py
├── conversation/
│ ├── __init__.py
│ ├── manager.py # 对话管理器
│ ├── message.py # 消息处理
│ └── context.py # 上下文管理
├── models/
│ ├── __init__.py
│ ├── interface.py # 模型接口
│ ├── openai_client.py # OpenAI客户端
│ └── local_models.py # 本地模型
├── utils/
│ ├── __init__.py
│ ├── logging.py # 日志工具
│ ├── config.py # 配置管理
│ └── validation.py # 参数验证
└── examples/
├── basic_agent.py
├── multi_agent.py
└── voice_agent.py
2. 核心模块详解
Agent Core Module
# core/agent.py
from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod
class BaseAgent(ABC):
    """Abstract base class for agents."""

    @abstractmethod
    async def process_message(self, message: str) -> str:
        """Process one message and return the reply."""
        pass

    @abstractmethod
    def add_tool(self, tool: 'Tool'):
        """Make ``tool`` available to the agent."""
        pass


class Agent(BaseAgent):
    """Concrete agent built from memory, planner, executor and model interface."""

    def __init__(self, config: "AgentConfig"):
        self.config = config
        self.memory = MemorySystem(config.memory_config)
        self.planner = PlanningEngine(config.planning_config)
        self.executor = ExecutionEngine(config.execution_config)
        self.model_interface = ModelInterface(config.model_config)
        self.tools = []

    def add_tool(self, tool: 'Tool'):
        """Record ``tool`` in ``self.tools``.

        BaseAgent declares add_tool abstract; without this override the class
        could not be instantiated at all.
        """
        self.tools.append(tool)

    async def process_message(self, message: str) -> str:
        """Process a user message and return the generated response."""
        # 1. Understand the message
        understanding = await self._understand_message(message)
        # 2. Retrieve relevant memories
        relevant_memories = self.memory.retrieve_relevant(understanding)
        # 3. Create the execution plan
        plan = await self.planner.create_plan(understanding, relevant_memories)
        # 4. Execute the plan
        execution_result = await self.executor.execute_plan(plan)
        # 5. Generate the response
        response = await self._generate_response(execution_result)
        # 6. Update memory
        self.memory.store_interaction(message, response)
        return response
Memory System Module
# core/memory.py
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Memory:
    """One stored memory record."""

    content: str
    timestamp: datetime
    importance: float
    tags: List[str]
    metadata: Dict[str, Any]
    # The retrieval code in this document reads ``memory.embedding``, so the
    # record needs the field. It is optional and last, so existing positional
    # construction keeps working. The vector type is left open.
    embedding: Any = None
class MemorySystem:
    """Memory facade over short-term, long-term and working memory."""

    def __init__(self, config: MemoryConfig):
        # store_interaction reads self.config.long_term_threshold; the config
        # was never stored, so that read raised AttributeError.
        self.config = config
        self.short_term = ShortTermMemory(config.short_term_size)
        self.long_term = LongTermMemory(config.long_term_config)
        self.working_memory = WorkingMemory(config.working_memory_size)

    def store_interaction(self, input_msg: str, output_msg: str):
        """Store an input/output pair; persist it long-term when important enough."""
        interaction = Interaction(
            input=input_msg,
            output=output_msg,
            timestamp=datetime.now()
        )
        # Always store in short-term memory
        self.short_term.add(interaction)
        # Store long-term only above the configured importance threshold
        importance = self._calculate_importance(interaction)
        if importance > self.config.long_term_threshold:
            self.long_term.store(interaction, importance)

    def retrieve_relevant(self, query: str, limit: int = 5) -> List[Memory]:
        """Search all three stores for ``query`` and return the top ``limit`` ranked hits."""
        # Search each store
        short_term_results = self.short_term.search(query, limit)
        long_term_results = self.long_term.search(query, limit)
        working_memory_results = self.working_memory.search(query, limit)
        # Merge and rank
        all_results = short_term_results + long_term_results + working_memory_results
        return self._rank_memories(all_results, query)[:limit]
Tool System Module
# tools/base.py
from typing import Callable, Dict, Any, List
from abc import ABC, abstractmethod
class BaseTool(ABC):
    """Abstract base class for tools."""

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    @abstractmethod
    async def execute(self, **kwargs) -> Any:
        """Execute the tool."""
        pass

    @abstractmethod
    def get_parameters(self) -> Dict[str, Any]:
        """Return the tool's parameter definitions."""
        pass


class FunctionTool(BaseTool):
    """Tool backed by a plain or coroutine function."""

    def __init__(self, func: Callable, name: str = None, description: str = None):
        self.function = func
        name = name or func.__name__
        description = description or func.__doc__ or ""
        super().__init__(name, description)
        self.parameters = self._extract_parameters()

    def _extract_parameters(self) -> Dict[str, Any]:
        """Describe the function's parameters from its signature.

        This method was called by __init__ but never defined. The format is
        ``{name: {"type": <type name>, "required": <bool>}}``, with "str" used
        for unannotated parameters.
        """
        import inspect  # local import: this module does not import inspect

        described = {}
        for param_name, param in inspect.signature(self.function).parameters.items():
            annotation = param.annotation
            if annotation is param.empty:
                type_name = "str"
            elif isinstance(annotation, type):
                type_name = annotation.__name__
            else:
                type_name = str(annotation)
            described[param_name] = {
                "type": type_name,
                "required": param.default is param.empty
            }
        return described

    async def execute(self, **kwargs) -> Any:
        """Call the function with ``kwargs``, awaiting the result when it is awaitable."""
        import inspect  # local import: neither asyncio nor inspect is imported here

        result = self.function(**kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result

    def get_parameters(self) -> Dict[str, Any]:
        """Return the parameter definitions extracted at construction time."""
        return self.parameters
3. 模块间通信
事件系统
# utils/events.py
from typing import Callable, Dict, List
from enum import Enum
class EventType(Enum):
    """Event identifiers; EventBus keys its listener lists by these members."""

    AGENT_STARTED = "agent_started"
    MESSAGE_RECEIVED = "message_received"
    TOOL_EXECUTED = "tool_executed"
    MEMORY_UPDATED = "memory_updated"
    ERROR_OCCURRED = "error_occurred"
class EventBus:
    """Minimal synchronous publish/subscribe bus.

    Attributes:
        listeners: Maps an event type to the callbacks subscribed to it.
    """

    def __init__(self):
        self.listeners: Dict["EventType", List[Callable]] = {}

    def subscribe(self, event_type: "EventType", callback: Callable):
        """Subscribe ``callback`` to ``event_type``."""
        self.listeners.setdefault(event_type, []).append(callback)

    # `data` was annotated with `Any`, which this module never imports
    # (NameError when the class is defined); `object` needs no import.
    def publish(self, event_type: "EventType", data: object):
        """Call every callback subscribed to ``event_type`` with ``data``.

        A failing callback is reported on stdout and does not stop delivery
        to the remaining callbacks.
        """
        # Iterate over a snapshot: a callback that subscribes during dispatch
        # would otherwise grow the list being iterated.
        for callback in list(self.listeners.get(event_type, ())):
            try:
                callback(data)
            except Exception as e:
                print(f"Event callback error: {e}")


# Global event bus
event_bus = EventBus()
依赖注入
# utils/di.py
from typing import Type, Any, Dict, Callable
class Container:
    """Tiny service container holding ready instances and lazy factories."""

    def __init__(self):
        self._services: Dict[str, Any] = {}
        self._factories: Dict[str, Callable] = {}

    def register(self, name: str, service: Any):
        """Register a ready-made service instance under ``name``."""
        self._services[name] = service

    def register_factory(self, name: str, factory: Callable):
        """Register a zero-argument factory under ``name``."""
        self._factories[name] = factory

    def get(self, name: str) -> Any:
        """Return the service registered under ``name``.

        A factory-backed service is created on first access and stored, so
        later calls return the same instance.

        Raises:
            ValueError: If neither an instance nor a factory is registered.
        """
        if name in self._services:
            return self._services[name]
        if name not in self._factories:
            raise ValueError(f"Service {name} not found")
        created = self._factories[name]()
        self._services[name] = created
        return created


# Global container
container = Container()
关键函数和继承关系
1. 类继承体系
# Agent hierarchy
class BaseAgent(ABC):
    """Base class for agents."""


class Agent(BaseAgent):
    """Standard agent."""


class VoiceAgent(Agent):
    """Voice agent."""


class MultiAgent(BaseAgent):
    """Multi-agent system."""


# Tool hierarchy
class BaseTool(ABC):
    """Base class for tools."""


class FunctionTool(BaseTool):
    """Function-backed tool."""


class APITool(BaseTool):
    """API tool."""


class DatabaseTool(BaseTool):
    """Database tool."""


# Memory hierarchy
class BaseMemory(ABC):
    """Base class for memories."""


class ShortTermMemory(BaseMemory):
    """Short-term memory."""


class LongTermMemory(BaseMemory):
    """Long-term memory."""


class WorkingMemory(BaseMemory):
    """Working memory."""
2. 关键函数分析
Agent.run() 函数
async def run(
    self,
    message: str,
    context: Dict[str, Any] = None,
    stream: bool = False,
    tools: List[str] = None
) -> Union[str, AsyncGenerator[str, None]]:
    """
    Main entry point for running the agent.

    Flow: validate the input, build the request context, select tools,
    build the system prompt, prepare the messages, call the model, and —
    on the non-streaming path — update memory.

    Args:
        message: The user's input message.
        context: Extra context information.
        stream: Whether to return a streaming generator.
        tools: Names of the tools to make available.

    Returns:
        The response string, or the streaming generator when ``stream`` is true.

    Raises:
        ValueError: If ``message`` is empty or whitespace only.
    """
    # 1. Validate the input
    if not (message and message.strip()):
        raise ValueError("Message cannot be empty")
    # 2. Build the request context
    request_context = self._build_context(message, context)
    # 3. Select the available tools
    usable_tools = self._select_tools(tools)
    # 4. Build the system prompt
    prompt = self._build_system_prompt(usable_tools)
    # 5. Prepare the message history
    history = self._prepare_messages(prompt, message, request_context)
    # 6. Call the model
    if stream:
        return self._stream_generate(history, usable_tools)
    reply = await self._generate_response(history, usable_tools)
    # 7. Update memory
    await self._update_memory(message, reply, request_context)
    return reply
async def _generate_response(
    self,
    messages: List[Dict],
    tools: List[Tool]
) -> str:
    """Run the model/tool loop until the model answers without tool calls.

    ``messages`` is extended in place with the assistant's tool-call message
    and one "tool" message per tool result on every round.

    Raises:
        RuntimeError: If the model still requests tools after 10 rounds.
    """
    max_iterations = 10  # guards against an endless tool-calling loop
    for _round in range(max_iterations):
        # Call the model
        response = await self.model_interface.chat_completion(
            messages=messages,
            tools=[tool.to_openai_format() for tool in tools],
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )
        # No tool calls: this is the final answer
        if not response.tool_calls:
            return response.content
        # Execute the requested tool calls
        tool_results = await self._execute_tools(response.tool_calls)
        # Add the assistant turn and the tool results to the history
        messages.append({
            "role": "assistant",
            "content": response.content,
            "tool_calls": response.tool_calls
        })
        messages.extend(
            {
                "role": "tool",
                "content": outcome.content,
                "tool_call_id": outcome.tool_call_id
            }
            for outcome in tool_results
        )
    raise RuntimeError("Max iterations reached in tool calling loop")
Tool.execute() 函数
async def execute(
    self,
    parameters: Dict[str, Any],
    context: ExecutionContext = None
) -> ToolResult:
    """
    Execute the tool.

    Args:
        parameters: Execution parameters.
        context: Execution context.

    Returns:
        A ToolResult; failures are reported through ``success=False`` together
        with ``error`` and ``error_type`` rather than raised.
    """
    try:
        # 1. Validate the parameters
        validated_params = self._validate_parameters(parameters)
        # 2. Check permissions
        if not self._check_permissions(context):
            raise PermissionError(f"Permission denied for tool {self.name}")
        # 3. Pre-execution hook
        await self._before_execute(validated_params, context)
        # 4. Run the tool function; perf_counter is monotonic, so the measured
        #    duration is not affected by wall-clock adjustments.
        start_time = time.perf_counter()
        if asyncio.iscoroutinefunction(self.function):
            result = await self.function(**validated_params)
        else:
            # Run the synchronous function in the default executor. Inside a
            # coroutine the running loop is the right one to use;
            # get_event_loop() is deprecated for this purpose.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                lambda: self.function(**validated_params)
            )
        execution_time = time.perf_counter() - start_time
        # 5. Post-execution hook
        await self._after_execute(result, execution_time, context)
        # 6. Build the result
        return ToolResult(
            success=True,
            result=result,
            execution_time=execution_time,
            tool_name=self.name,
            metadata={
                "parameters": validated_params,
                "context": context.to_dict() if context else {}
            }
        )
    except Exception as e:
        # Error handling: report the raw (unvalidated) parameters
        await self._handle_error(e, parameters, context)
        return ToolResult(
            success=False,
            error=str(e),
            error_type=type(e).__name__,
            tool_name=self.name,
            metadata={
                "parameters": parameters,
                "context": context.to_dict() if context else {}
            }
        )
MemorySystem.retrieve_relevant() 函数
def retrieve_relevant(
    self,
    query: str,
    limit: int = 5,
    similarity_threshold: float = 0.7,
    time_decay: bool = True
) -> "List[Memory]":
    """
    Retrieve the memories most relevant to ``query``.

    Args:
        query: Query string.
        limit: Maximum number of results; a non-positive value yields [].
        similarity_threshold: Passed to the short- and long-term searches and
            also used as the minimum combined score.
        time_decay: Whether to apply time decay to the score.

    Returns:
        Up to ``limit`` distinct memories, best score first.
    """
    if limit <= 0:
        # A negative limit would otherwise slice from the end of the ranking.
        return []
    # 1. Embed the query
    query_embedding = self.embedding_model.encode(query)
    # 2. Collect candidates; the stores are asked for twice the limit so that
    #    scoring has more to choose from.
    candidates = []
    candidates.extend(self.short_term.search(
        query_embedding,
        limit * 2,
        similarity_threshold
    ))
    candidates.extend(self.long_term.search(
        query_embedding,
        limit * 2,
        similarity_threshold
    ))
    candidates.extend(self.working_memory.get_current_context())
    # 3. Score each distinct candidate. The same object can come back from
    #    more than one store; it used to appear several times in the result.
    #    Identity is used because memory objects need not be hashable.
    seen_ids = set()
    scored_memories = []
    for memory in candidates:
        if id(memory) in seen_ids:
            continue
        seen_ids.add(id(memory))
        semantic_score = self._calculate_similarity(
            query_embedding,
            memory.embedding
        )
        importance_score = memory.importance
        time_score = 1.0
        if time_decay:
            time_score = self._calculate_time_decay(memory.timestamp)
        # Weighted combination: similarity 0.5, importance 0.3, recency 0.2
        final_score = (
            semantic_score * 0.5 +
            importance_score * 0.3 +
            time_score * 0.2
        )
        if final_score >= similarity_threshold:
            scored_memories.append((memory, final_score))
    # 4. Sort by score, best first
    scored_memories.sort(key=lambda x: x[1], reverse=True)
    # 5. Return the top-k memories
    return [memory for memory, score in scored_memories[:limit]]
3. 设计模式应用
策略模式 - 模型接口
class ModelStrategy(ABC):
    """Interface every model strategy implements."""

    @abstractmethod
    async def generate_response(self, messages: List[Dict]) -> str:
        ...


class OpenAIStrategy(ModelStrategy):
    """Strategy for OpenAI models."""

    async def generate_response(self, messages: List[Dict]) -> str:
        # OpenAI API call goes here
        return None


class LocalModelStrategy(ModelStrategy):
    """Strategy for local models."""

    async def generate_response(self, messages: List[Dict]) -> str:
        # Local model inference goes here
        return None


class ModelInterface:
    """Delegate response generation to the strategy supplied at construction."""

    def __init__(self, strategy: ModelStrategy):
        self.strategy = strategy

    async def generate_response(self, messages: List[Dict]) -> str:
        reply = await self.strategy.generate_response(messages)
        return reply
观察者模式 - 事件系统
class Observable:
    """Subject side of the observer pattern.

    Observers are objects with an ``update(event)`` method.
    """

    def __init__(self):
        self._observers = []

    def attach(self, observer):
        """Start notifying ``observer``."""
        self._observers.append(observer)

    def detach(self, observer):
        """Stop notifying ``observer``.

        Raises:
            ValueError: If ``observer`` is not attached.
        """
        self._observers.remove(observer)

    def notify(self, event):
        """Deliver ``event`` to every observer attached when the call started."""
        # Iterate over a snapshot: an observer that detaches itself inside
        # update() used to make the loop skip the next observer.
        for observer in list(self._observers):
            observer.update(event)
class Agent(Observable):
    """Agent that notifies its observers after each processed message."""

    def process_message(self, message):
        """Process ``message``, notify observers with the outcome, and return it."""
        outcome = self._internal_process(message)
        # Tell observers what was processed and what came out
        processed_event = MessageProcessedEvent(message, outcome)
        self.notify(processed_event)
        return outcome
工厂模式 - 智能体创建
class AgentFactory:
    """Factory that builds an agent from a type name."""

    @staticmethod
    def create_agent(agent_type: str, config: Dict) -> BaseAgent:
        """Create the agent registered for ``agent_type``.

        Args:
            agent_type: One of "standard", "voice" or "multi".
            config: Configuration passed to the agent's constructor.

        Raises:
            ValueError: If ``agent_type`` is not one of the known names.
        """
        # Lambdas keep each class lookup lazy, as in an if/elif chain.
        builders = {
            "standard": lambda: Agent(config),
            "voice": lambda: VoiceAgent(config),
            "multi": lambda: MultiAgent(config),
        }
        if agent_type not in builders:
            raise ValueError(f"Unknown agent type: {agent_type}")
        return builders[agent_type]()
# 使用工厂创建智能体
agent = AgentFactory.create_agent("voice", voice_config)
实战经验总结
1. 最佳实践
智能体设计原则
# 1. 单一职责原则
class ResearchAgent(Agent):
    """Agent dedicated to research tasks."""

    def __init__(self):
        super().__init__(
            name="researcher",
            instructions="你是一个专业的研究员,擅长收集和分析信息"
        )
        # Only research-related tools are added
        for research_tool in (web_search_tool, paper_search_tool, data_analysis_tool):
            self.add_tool(research_tool)
# 2. 明确的接口定义
class AgentInterface(ABC):
    """Explicit interface an agent exposes: task processing and capability listing."""

    @abstractmethod
    async def process_task(self, task: Task) -> TaskResult:
        ...

    @abstractmethod
    def get_capabilities(self) -> List[str]:
        ...
# 3. 错误处理和恢复
class RobustAgent(Agent):
    """Agent that turns failures into fallbacks or a friendly message."""

    async def process_message(self, message: str) -> str:
        """Process ``message``, recovering from tool, model and other errors."""
        try:
            return await super().process_message(message)
        except ToolExecutionError as tool_failure:
            # A tool failed: try an alternative
            return await self._handle_tool_failure(tool_failure)
        except ModelError as model_failure:
            # The model call failed: fall back to the backup model
            return await self._fallback_to_backup_model(message)
        except Exception as e:
            # Anything else: return a friendly error message
            return f"抱歉,处理您的请求时遇到了问题:{str(e)}"
工具开发指南
# 1. 工具应该是幂等的
@tool
def get_user_info(user_id: str) -> dict:
    """Get user information - an idempotent operation."""
    # Delegates to database.get_user
    return database.get_user(user_id)
# 2. 工具应该有清晰的错误处理
@tool
def send_email(to: str, subject: str, body: str) -> bool:
    """Send an email; return True on success, False if the email service fails."""
    try:
        email_service.send(to, subject, body)
    except EmailServiceError as e:
        logger.error(f"Failed to send email: {e}")
        return False
    else:
        return True
# 3. 工具应该有适当的权限控制
@tool(permissions=["file_read"])
def read_file(file_path: str) -> str:
    """Read and return the content of a text file.

    Args:
        file_path: Path of the file to read.

    Raises:
        PermissionError: If security.check_file_access rejects the path.
    """
    if not security.check_file_access(file_path):
        raise PermissionError("Access denied")
    # Explicit encoding: the platform default differs between systems, so the
    # same file could otherwise decode differently or fail.
    with open(file_path, 'r', encoding="utf-8") as f:
        return f.read()
# 4. 工具应该有详细的文档
@tool(
    description="搜索网络信息并返回相关结果",
    parameters={
        "query": {
            "type": "string",
            "description": "搜索查询字符串",
            "required": True
        },
        "limit": {
            "type": "integer",
            "description": "返回结果的最大数量",
            "default": 10,
            "minimum": 1,
            "maximum": 50
        }
    }
)
def web_search(query: str, limit: int = 10) -> List[dict]:
    """
    Search the web for information.

    Args:
        query: Search query string.
        limit: Maximum number of results to return.

    Returns:
        A list of search results; each result contains fields such as
        title, url and snippet.

    Raises:
        SearchServiceError: If the search service is unavailable.
        ValidationError: If parameter validation fails.
    """
    # Search logic goes here (not implemented in this example)
    pass
2. 性能优化
异步处理优化
# 1. 并行工具执行
async def execute_tools_parallel(self, tool_calls: List[ToolCall]) -> List[ToolResult]:
    """Run several tool calls concurrently and return one result per call.

    A call whose execution raised is reported as a failed ToolResult carrying
    the exception text, in the same position as the originating call.
    """
    pending = [
        asyncio.create_task(self.get_tool(call.name).execute(call.parameters))
        for call in tool_calls
    ]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)
    # Convert raised exceptions into failed results
    return [
        ToolResult(success=False, error=str(outcome), tool_name=call.name)
        if isinstance(outcome, Exception)
        else outcome
        for call, outcome in zip(tool_calls, outcomes)
    ]
# 2. 记忆检索优化
class OptimizedMemorySystem:
    """Memory retrieval with a bounded per-query result cache.

    Attributes:
        embedding_cache: Embedding cache (not used by the code shown here).
        similarity_cache: Maps a query string to its ranked result list.
        max_cache_size: Maximum number of cached queries; 0 disables caching.
    """

    def __init__(self, max_cache_size: int = 1024):
        self.embedding_cache = {}  # embedding vector cache
        self.similarity_cache = {}  # ranked results per query
        self.max_cache_size = max_cache_size

    async def retrieve_relevant(self, query: str) -> "List[Memory]":
        """Return the ranked memories for ``query``, served from cache when possible.

        The three stores are searched concurrently. Callers always receive a
        list of their own, so mutating the result cannot corrupt the cache.
        """
        # The query string is itself a valid dict key; hashing it first only
        # added a dependency on hashlib, which this snippet never imports.
        if query in self.similarity_cache:
            return list(self.similarity_cache[query])
        # Compute the embedding asynchronously
        query_embedding = await self._get_embedding_async(query)
        # Search the stores in parallel
        results = await asyncio.gather(
            self.short_term.search_async(query_embedding),
            self.long_term.search_async(query_embedding),
            self.working_memory.search_async(query_embedding)
        )
        # Merge and rank
        all_memories = []
        for result_list in results:
            all_memories.extend(result_list)
        ranked_memories = self._rank_memories(all_memories, query_embedding)
        # Cache a copy, evicting the oldest entries once the bound is reached
        # (dicts keep insertion order, so the first key is the oldest).
        if self.max_cache_size > 0:
            while len(self.similarity_cache) >= self.max_cache_size:
                self.similarity_cache.pop(next(iter(self.similarity_cache)))
            self.similarity_cache[query] = list(ranked_memories)
        return ranked_memories
# 3. 流式响应优化
async def stream_response(self, message: str) -> AsyncGenerator[str, None]:
    """Stream a reply: a placeholder first, then the model's chunks.

    Context building and tool selection are started as tasks and awaited
    together before the model stream begins.
    """
    # Give the caller something immediately
    yield "正在思考..."
    # Kick off the preparation work
    preparation = (
        asyncio.create_task(self._build_context(message)),
        asyncio.create_task(self._select_tools(message)),
    )
    # Wait for both pieces of preparation
    built_context, selected_tools = await asyncio.gather(*preparation)
    # Stream the final reply
    model_stream = self.model_interface.stream_generate(
        messages=self._prepare_messages(message, built_context),
        tools=selected_tools
    )
    async for piece in model_stream:
        yield piece
内存管理优化
# 1. 智能记忆清理
class MemoryManager:
    """Keep memory usage in check by compressing the least valuable memories."""

    def __init__(self, max_memory_size: int = 1000):
        self.max_memory_size = max_memory_size
        self.memory_usage = 0

    async def cleanup_memory(self):
        """Compress low-ranked memories once usage reaches 80% of the maximum.

        Memories are ranked by (importance, timestamp), highest first. The top
        60% of ``max_memory_size`` are kept as they are; the rest are passed
        through ``_compress_memories`` before storage is updated.
        """
        if self.memory_usage < self.max_memory_size * 0.8:
            return
        # Rank by importance, then by timestamp
        ranked = sorted(
            self.get_all_memories(),
            key=lambda mem: (mem.importance, mem.timestamp),
            reverse=True
        )
        # Split into what stays and what gets compressed
        cutoff = int(self.max_memory_size * 0.6)
        survivors, leftovers = ranked[:cutoff], ranked[cutoff:]
        compressed = self._compress_memories(leftovers)
        # Update the memory storage
        self._update_memory_storage(survivors, compressed)
# 2. 连接池管理
class ModelConnectionPool:
    """Fixed-size pool of model connections, filled on first use.

    Attributes:
        max_connections: Number of connections created for the pool.
        connections: Queue holding the idle connections.
    """

    def __init__(self, max_connections: int = 10):
        self.max_connections = max_connections
        self.connections = asyncio.Queue(maxsize=max_connections)
        # __init__ cannot await, and calling the coroutine function here only
        # created a coroutine that never ran, leaving the pool empty and every
        # get_connection() waiting forever. Filling is deferred to first use.
        self._initialized = False
        self._init_lock = None  # created inside the running loop on first use

    async def _initialize_connections(self):
        """Create ``max_connections`` connections and put them in the queue."""
        for _ in range(self.max_connections):
            connection = await self._create_connection()
            await self.connections.put(connection)

    async def _ensure_initialized(self):
        """Fill the pool exactly once, even when several callers arrive together."""
        if self._initialized:
            return
        if self._init_lock is None:
            self._init_lock = asyncio.Lock()
        async with self._init_lock:
            # Re-check: another caller may have filled the pool while we waited.
            if not self._initialized:
                await self._initialize_connections()
                self._initialized = True

    async def get_connection(self):
        """Take a connection from the pool, waiting until one is free."""
        await self._ensure_initialized()
        return await self.connections.get()

    async def return_connection(self, connection):
        """Give a connection back to the pool."""
        await self.connections.put(connection)

    async def execute_with_connection(self, func, *args, **kwargs):
        """Await ``func(connection, *args, **kwargs)`` with a pooled connection.

        The connection is returned to the pool whether or not ``func`` raises.
        """
        connection = await self.get_connection()
        try:
            return await func(connection, *args, **kwargs)
        finally:
            await self.return_connection(connection)
3. 错误处理和调试
全面的错误处理
# 1. 自定义异常类型
class AgentError(Exception):
    """Base exception for agent errors."""


class ToolExecutionError(AgentError):
    """Raised when a tool fails to execute."""

    def __init__(self, tool_name: str, error_message: str):
        self.tool_name = tool_name
        self.error_message = error_message
        summary = f"Tool {tool_name} execution failed: {error_message}"
        super().__init__(summary)


class ModelError(AgentError):
    """Raised when a model call fails."""


# NOTE(review): this name shadows the built-in MemoryError wherever it is
# imported or defined — consider renaming in a coordinated change.
class MemoryError(AgentError):
    """Raised on memory-system errors."""
# 2. 错误恢复机制
class ErrorRecoveryAgent(Agent):
    """Agent that records failures and tries a per-error-type recovery strategy.

    Attributes:
        error_history: One dict per handled error (error, message, timestamp).
        recovery_strategies: Maps an exception class to its recovery coroutine.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.error_history = []
        self.recovery_strategies = {
            ToolExecutionError: self._recover_from_tool_error,
            ModelError: self._recover_from_model_error,
            MemoryError: self._recover_from_memory_error
        }

    async def process_message(self, message: str) -> str:
        """Process ``message``; any exception is routed to ``_handle_error``."""
        try:
            return await super().process_message(message)
        except Exception as e:
            return await self._handle_error(e, message)

    async def _handle_error(self, error: Exception, message: str) -> str:
        """Record the error, attempt recovery, and fall back to a friendly message."""
        # Record the error
        self.error_history.append({
            'error': error,
            'message': message,
            'timestamp': datetime.now()
        })
        # Find a strategy for the error's class or the nearest base class; an
        # exact-type lookup missed subclasses of the registered error types.
        strategy = None
        for error_class in type(error).__mro__:
            strategy = self.recovery_strategies.get(error_class)
            if strategy is not None:
                break
        if strategy is not None:
            try:
                return await strategy(error, message)
            except Exception as recovery_error:
                logger.error(f"Recovery failed: {recovery_error}")
        # Return a friendly error message
        return self._generate_user_friendly_error(error)
# 3. 调试和监控
class DebuggableAgent(Agent):
    """Agent that can record an execution trace for each processed message.

    Attributes:
        debug_mode: Whether tracing is enabled.
        execution_trace: One trace dict per traced message.
    """

    def __init__(self, *args, debug_mode: bool = False, **kwargs):
        super().__init__(*args, **kwargs)
        self.debug_mode = debug_mode
        self.execution_trace = []

    async def process_message(self, message: str) -> str:
        """Process ``message``, tracing start, end and errors when debugging."""
        # Decide once per call: if debug_mode were switched on while the call is
        # awaiting, the later checks would use a trace_id that was never set.
        tracing = self.debug_mode
        trace_id = None
        if tracing:
            trace_id = str(uuid.uuid4())
            self._start_trace(trace_id, message)
        try:
            result = await super().process_message(message)
            if tracing:
                self._end_trace(trace_id, result)
            return result
        except Exception as e:
            if tracing:
                self._trace_error(trace_id, e)
            raise

    def _start_trace(self, trace_id: str, message: str):
        """Open a new trace entry for ``message``."""
        self.execution_trace.append({
            'trace_id': trace_id,
            'start_time': datetime.now(),
            'message': message,
            'steps': []
        })

    def get_debug_info(self) -> dict:
        """Return the trace together with memory, tool and error statistics."""
        return {
            'execution_trace': self.execution_trace,
            'memory_usage': self._get_memory_usage(),
            'tool_usage_stats': self._get_tool_stats(),
            # error_history only exists on agents that define it
            'error_history': getattr(self, 'error_history', [])
        }
4. 部署和运维
生产环境配置
# 1. 配置管理
class ProductionConfig:
    """Production settings grouped into model, memory and security sections."""

    def __init__(self):
        self.model_config = dict(
            primary_model='gpt-4',
            fallback_model='gpt-3.5-turbo',
            max_retries=3,
            timeout=30.0,
        )
        self.memory_config = dict(
            short_term_size=100,
            long_term_threshold=0.8,
            cleanup_interval=3600,  # one hour
        )
        self.security_config = dict(
            enable_tool_permissions=True,
            max_tool_execution_time=60.0,
            allowed_domains=['api.example.com'],
        )
# 2. 健康检查
class HealthChecker:
    """Run health checks against an agent's model, memory and tool systems."""

    def __init__(self, agent: Agent):
        self.agent = agent

    async def check_health(self) -> dict:
        """Check each subsystem and return the overall status.

        Returns:
            ``{'status': 'healthy' | 'unhealthy', 'checks': {...}}`` where each
            entry in ``checks`` is 'ok' or 'error: <exception text>'.
        """
        checks = {}
        report = {
            'status': 'healthy',
            'checks': checks
        }

        def mark_failed(component, exc):
            checks[component] = f'error: {exc}'
            report['status'] = 'unhealthy'

        # Model connection
        try:
            await self.agent.model_interface.health_check()
        except Exception as e:
            mark_failed('model', e)
        else:
            checks['model'] = 'ok'
        # Memory system
        try:
            self.agent.memory.health_check()
        except Exception as e:
            mark_failed('memory', e)
        else:
            checks['memory'] = 'ok'
        # Tool system
        try:
            await self.agent.tool_system.health_check()
        except Exception as e:
            mark_failed('tools', e)
        else:
            checks['tools'] = 'ok'
        return report
# 3. 监控指标
class MetricsCollector:
    """Accumulate request counters and a running average response time."""

    def __init__(self):
        self.metrics = {
            'requests_total': 0,
            'requests_success': 0,
            'requests_error': 0,
            'response_time_avg': 0.0,
            'tool_calls_total': 0,
            'memory_usage': 0
        }

    def record_request(self, success: bool, response_time: float):
        """Count one request and fold its response time into the average."""
        stats = self.metrics
        stats['requests_total'] += 1
        outcome_key = 'requests_success' if success else 'requests_error'
        stats[outcome_key] += 1
        # Running mean: previous mean over n-1 requests plus the new sample
        seen = stats['requests_total']
        previous_avg = stats['response_time_avg']
        stats['response_time_avg'] = (
            (previous_avg * (seen - 1) + response_time) / seen
        )

    def get_metrics(self) -> dict:
        """Return a shallow copy of the current metrics."""
        return dict(self.metrics)
这个源码剖析涵盖了OpenAI Agents SDK的主要架构、核心API、模块设计和实战经验。通过深入分析代码结构和实现细节,可以更好地理解如何构建高质量的AI智能体应用。