OpenAI Agents Python SDK 源码剖析

目录

  1. 框架使用手册
  2. 整体架构分析
  3. 核心API深入分析
  4. 模块化架构分析
  5. 关键函数和继承关系
  6. 实战经验总结

框架使用手册

1. 快速开始

OpenAI Agents SDK 是一个用于构建AI智能体的Python框架,支持工具调用、多智能体协作、实时语音交互等功能。

安装

pip install openai-agents

基本使用

from openai_agents import Agent, Tool

# Create the agent
agent = Agent(
    name="assistant",
    model="gpt-4",
    instructions="你是一个有用的助手"
)

# Define a tool. The docstring is left untouched on purpose: the Tool wrapper
# shown in this document copies func.__doc__ into the tool description.
@Tool
def get_weather(location: str) -> str:
    """获取指定地点的天气信息"""
    return f"{location}的天气是晴天"

# Attach the tool to the agent
agent.add_tool(get_weather)

# Run one conversation turn and print the reply
response = agent.run("北京的天气怎么样?")
print(response)

2. 核心概念

Agent(智能体)

智能体是框架的核心概念,代表一个具有特定能力和职责的AI实体。

class Agent:
    """An AI agent: a named model configuration plus the tools it may call.

    Args:
        name: Agent name.
        model: Name of the model to use.
        instructions: System instructions for the agent.
        tools: Tools available to the agent; ``None`` means no tools.
        temperature: Sampling temperature.
        max_tokens: Maximum number of tokens to generate.
    """

    def __init__(
        self,
        name: str,
        model: str = "gpt-4",
        instructions: str = "",
        # String annotation: ``Tool`` is introduced after this class in the
        # document, so an unquoted annotation would fail when the class is
        # created. ``Optional`` matches the ``None`` default.
        tools: "Optional[List[Tool]]" = None,
        temperature: float = 0.7,
        max_tokens: int = 1000
    ):
        self.name = name
        self.model = model
        self.instructions = instructions
        # A fresh list per instance so agents never share tool state.
        self.tools = tools or []
        self.temperature = temperature
        self.max_tokens = max_tokens

Tool(工具)

工具是智能体可以调用的函数,用于执行特定任务。

from typing import Callable, Any
import inspect

class Tool:
    """Wrap a plain function so an agent can describe and invoke it.

    Attributes:
        func: The wrapped callable.
        name: The callable's ``__name__``.
        description: The callable's docstring, or ``""`` when it has none.
        parameters: Mapping of parameter name to ``{"type", "required"}``.
    """

    def __init__(self, func: Callable):
        self.func = func
        self.name = func.__name__
        self.description = func.__doc__ or ""
        self.parameters = self._extract_parameters()

    def _extract_parameters(self) -> dict:
        """Describe each parameter of the wrapped function.

        Returns:
            ``{name: {"type": <type name>, "required": <bool>}}``. The type is
            the bare class name for class annotations (``"int"``, not
            ``"<class 'int'>"``), the string form for any other annotation,
            and ``"str"`` when the parameter is not annotated.
        """
        signature = inspect.signature(self.func)
        parameters = {}

        for name, param in signature.parameters.items():
            annotation = param.annotation
            # ``empty`` is a sentinel, so compare by identity; ``==`` can be
            # hijacked by annotations/defaults that overload equality.
            if annotation is param.empty:
                type_name = "str"
            elif isinstance(annotation, type):
                type_name = annotation.__name__
            else:
                type_name = str(annotation)

            # *args / **kwargs never have to be supplied by the caller.
            variadic = param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
            parameters[name] = {
                "type": type_name,
                "required": param.default is param.empty and not variadic
            }

        return parameters

    def call(self, **kwargs) -> Any:
        """Invoke the wrapped function with keyword arguments."""
        return self.func(**kwargs)

Conversation(对话)

对话管理智能体与用户或其他智能体之间的交互。

class Conversation:
    """Message exchange between one agent and its counterpart."""

    def __init__(self, agent: Agent):
        self.agent = agent
        self.messages = []   # chronological list of message dicts
        self.context = {}

    def add_message(self, role: str, content: str):
        """Append a timestamped message to the history."""
        entry = {
            "role": role,
            "content": content,
            "timestamp": datetime.now()
        }
        self.messages.append(entry)

    def get_response(self, user_input: str) -> str:
        """Record the user input, ask the agent for a reply and record it.

        Returns:
            The value returned by ``agent.generate_response``.
        """
        self.add_message("user", user_input)

        # Build the prompt and hand it to the model in one step.
        reply = self.agent.generate_response(self._build_prompt())

        self.add_message("assistant", reply)
        return reply

3. 高级功能

多智能体协作

from openai_agents import MultiAgentSystem

# Create the multi-agent system
system = MultiAgentSystem()

# Create the agents that will take part in the workflow
researcher = Agent(
    name="researcher",
    instructions="你负责研究和收集信息"
)

writer = Agent(
    name="writer", 
    instructions="你负责根据研究结果写作文章"
)

system.add_agent(researcher)
system.add_agent(writer)

# Define the workflow: each step names an agent (by its `name`) and a task
workflow = [
    {"agent": "researcher", "task": "研究AI发展趋势"},
    {"agent": "writer", "task": "写一篇关于AI发展趋势的文章"}
]

# Execute the workflow
result = system.execute_workflow(workflow)

实时语音交互

from openai_agents import VoiceAgent

# Create the voice agent
voice_agent = VoiceAgent(
    name="voice_assistant",
    voice_model="whisper-1",
    tts_model="tts-1"
)

# Start the voice conversation
voice_agent.start_voice_conversation()

整体架构分析

1. 架构概览

OpenAI Agents SDK采用模块化架构设计,主要包含以下核心组件:

graph TB
    A[Client Application] --> B[Agent Manager]
    B --> C[Agent Core]
    B --> D[Tool System]
    B --> E[Conversation Manager]
    
    C --> F[Model Interface]
    C --> G[Memory System]
    C --> H[Planning Engine]
    
    D --> I[Tool Registry]
    D --> J[Tool Executor]
    
    E --> K[Message Handler]
    E --> L[Context Manager]
    
    F --> M[OpenAI API]
    F --> N[Local Models]
    
    G --> O[Short-term Memory]
    G --> P[Long-term Memory]

2. 核心模块

Agent Core(智能体核心)

负责智能体的核心逻辑,包括决策制定、任务规划和执行控制。

class AgentCore:
    """Core agent pipeline: understand the request, plan, execute, respond.

    Attributes:
        config: The AgentConfig this core was built from.
        model_interface: ModelInterface built from ``config.model``.
        memory: MemorySystem built from ``config.memory_config``.
        planner: PlanningEngine built from ``config.planning_config``.
        tool_system: ToolSystem holding the tools registered on this core.
    """

    def __init__(self, config: AgentConfig):
        self.config = config
        self.model_interface = ModelInterface(config.model)
        self.memory = MemorySystem(config.memory_config)
        self.planner = PlanningEngine(config.planning_config)
        # Agent.add_tool()/remove_tool() in this document reach for
        # ``self.core.tool_system``, so the core has to own a ToolSystem.
        self.tool_system = ToolSystem()

    async def process_request(self, request: AgentRequest) -> AgentResponse:
        """Run one request through the four pipeline stages.

        Args:
            request: The incoming agent request.

        Returns:
            The response produced by ``_generate_response``.
        """
        # 1. Understand the request
        understanding = await self._understand_request(request)

        # 2. Create a plan
        plan = await self.planner.create_plan(understanding)

        # 3. Execute the plan
        result = await self._execute_plan(plan)

        # 4. Generate the response
        response = await self._generate_response(result)

        return response

Tool System(工具系统)

管理和执行智能体可用的工具函数。

class ToolSystem:
    """Facade over the tool registry and the tool executor."""

    def __init__(self):
        self.registry = ToolRegistry()
        self.executor = ToolExecutor()

    def register_tool(self, tool: Tool):
        """Add ``tool`` to the registry."""
        self.registry.add(tool)

    async def execute_tool(self, tool_name: str, parameters: dict) -> Any:
        """Look up a tool by name and execute it.

        Raises:
            ToolNotFoundError: If the registry lookup yields a falsy value.
        """
        found = self.registry.get(tool_name)
        if found:
            return await self.executor.execute(found, parameters)

        raise ToolNotFoundError(f"Tool {tool_name} not found")

Memory System(记忆系统)

管理智能体的短期和长期记忆。

class MemorySystem:
    """Short-term and long-term memory behind a single interface."""

    def __init__(self, config: MemoryConfig):
        self.short_term = ShortTermMemory(config.short_term_size)
        self.long_term = LongTermMemory(config.long_term_config)

    def store_interaction(self, interaction: Interaction):
        """Record an interaction.

        Every interaction goes to short-term memory; it is also written to
        long-term memory when ``_is_important`` says so.
        """
        self.short_term.add(interaction)

        if not self._is_important(interaction):
            return
        self.long_term.store(interaction)

    def retrieve_relevant_memories(self, query: str) -> List[Memory]:
        """Search both stores for ``query`` and return the merged ranking."""
        recent_hits = self.short_term.search(query)
        archived_hits = self.long_term.search(query)

        return self._merge_and_rank(recent_hits, archived_hits)

3. 数据流分析

请求处理流程

async def handle_user_request(user_input: str) -> str:
    """End-to-end handling of a single user request.

    Stages, in order: preprocess the input, recognise the intent, fetch
    context for that intent, select tools, generate the response,
    postprocess it, and finally update memory with the raw input and the
    final response.

    Returns:
        The postprocessed response.
    """
    cleaned_input = preprocess_input(user_input)
    intent = await recognize_intent(cleaned_input)

    # Keyword arguments are evaluated left to right, so context retrieval
    # still happens before tool selection.
    raw_response = await agent_core.generate_response(
        input=cleaned_input,
        intent=intent,
        context=memory_system.retrieve_context(intent),
        tools=tool_system.select_tools(intent)
    )

    answer = postprocess_response(raw_response)
    memory_system.update_memory(user_input, answer)
    return answer

核心API深入分析

1. Agent API

创建和配置智能体

class Agent:
    """User-facing agent that owns an AgentCore and a ConversationManager."""

    def __init__(
        self,
        name: str,
        model: str = "gpt-4",
        instructions: str = "",
        tools: "Optional[List[Tool]]" = None,
        functions: "Optional[List[Function]]" = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        memory_config: "Optional[MemoryConfig]" = None
    ):
        """
        Initialise the agent.

        Args:
            name: Agent name.
            model: Name of the model to use.
            instructions: System instructions.
            tools: Available tools; ``None`` means none.
            functions: Available functions; ``None`` means none.
            temperature: Sampling temperature.
            max_tokens: Maximum number of tokens.
            memory_config: Memory configuration; a default ``MemoryConfig()``
                is used when omitted.
        """
        self.name = name
        self.model = model
        self.instructions = instructions
        self.tools = tools or []
        self.functions = functions or []
        self.temperature = temperature
        self.max_tokens = max_tokens
        # Must be stored before _build_config() runs: that method cannot see
        # the constructor's local ``memory_config`` variable.
        self.memory_config = memory_config

        # Core components
        self.core = AgentCore(self._build_config())
        self.conversation = ConversationManager()

    def _build_config(self) -> AgentConfig:
        """Build the AgentConfig passed to AgentCore from this agent's fields."""
        return AgentConfig(
            name=self.name,
            model=self.model,
            instructions=self.instructions,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            memory_config=self.memory_config or MemoryConfig()
        )

运行和交互

    async def run(
        self,
        message: str,
        context: dict = None,
        stream: bool = False
    ) -> Union[str, AsyncGenerator[str, None]]:
        """
        Process one message with this agent.

        Args:
            message: The user message.
            context: Extra context; an empty dict is used when omitted.
            stream: When true, return an async generator of text chunks
                instead of the complete text.

        Returns:
            The response content, or the streaming generator.
        """
        request = AgentRequest(
            message=message,
            context=context or {},
            stream=stream
        )

        if not stream:
            reply = await self.core.process_request(request)
            return reply.content

        # The generator is returned un-awaited; the caller iterates it.
        return self._stream_response(request)
    
    async def _stream_response(self, request: AgentRequest) -> AsyncGenerator[str, None]:
        """Yield the ``content`` of every chunk the core streams for ``request``."""
        chunk_stream = self.core.stream_process(request)
        async for piece in chunk_stream:
            yield piece.content

工具管理

    def add_tool(self, tool: Union[Tool, Callable]):
        """Add a tool to the agent and register it with the core's tool system.

        Args:
            tool: A ``Tool`` instance, or a plain callable that is wrapped in
                a ``Tool`` first.
        """
        # Check the type before callability so an existing Tool is never
        # wrapped a second time, even if Tool itself is (or becomes) callable.
        if not isinstance(tool, Tool) and callable(tool):
            tool = Tool(tool)

        self.tools.append(tool)
        self.core.tool_system.register_tool(tool)
        
    def remove_tool(self, tool_name: str):
        """Drop every tool named ``tool_name`` and unregister it from the core."""
        remaining = [entry for entry in self.tools if entry.name != tool_name]
        self.tools = remaining
        self.core.tool_system.unregister_tool(tool_name)
        
    def list_tools(self) -> List[str]:
        """Return the names of all tools currently attached to the agent."""
        names = [entry.name for entry in self.tools]
        return names

2. Tool API

工具定义和注册

def tool(
    name: str = None,
    description: str = None,
    parameters: dict = None
):
    """
    Decorator that turns a function into a ``Tool``.

    Works both with arguments (``@tool(name=...)``) and bare (``@tool``).

    Args:
        name: Tool name; defaults to the function's ``__name__``.
        description: Tool description; defaults to the function's docstring.
        parameters: Parameter definitions; extracted from the function when
            omitted.
    """
    def decorator(func: Callable) -> Tool:
        tool_name = name or func.__name__
        tool_description = description or func.__doc__ or ""

        return Tool(
            name=tool_name,
            description=tool_description,
            function=func,
            parameters=parameters or _extract_parameters(func)
        )

    # Bare ``@tool`` passes the decorated function as the first positional
    # argument. Without this branch the function would be mistaken for the
    # tool name and the inner decorator would be returned in its place.
    if callable(name):
        func, name = name, None
        return decorator(func)

    return decorator

# Usage example
@tool(
    name="search_web",
    description="搜索网络信息",
    parameters={
        "query": {"type": "string", "description": "搜索查询"},
        "limit": {"type": "integer", "description": "结果数量限制", "default": 10}
    }
)
def search_web(query: str, limit: int = 10) -> List[dict]:
    """Search the web for information."""
    # Search logic goes here.
    # NOTE(review): `search_results` is not defined anywhere in this snippet;
    # as written the call would fail with NameError. Presumably a placeholder.
    return search_results

工具执行

class ToolExecutor:
    """Runs tools with parameter validation, a timeout and error capture."""

    def __init__(self):
        self.execution_context = ExecutionContext()

    async def execute(
        self,
        tool: Tool,
        parameters: dict,
        timeout: float = 30.0
    ) -> ToolResult:
        """
        Execute a tool.

        Args:
            tool: The tool to execute.
            parameters: Raw parameters; validated before the call.
            timeout: Maximum number of seconds to wait for the result. It
                applies to coroutine tools and to synchronous tools alike.

        Returns:
            A ``ToolResult`` with ``success=True`` and the result, or
            ``success=False`` and an error description. Exceptions are
            captured in the result rather than raised.
        """
        try:
            # Parameter validation
            validated_params = self._validate_parameters(tool, parameters)

            # perf_counter is monotonic, so the measured duration cannot be
            # distorted by wall-clock adjustments.
            start_time = time.perf_counter()

            if asyncio.iscoroutinefunction(tool.function):
                pending = tool.function(**validated_params)
            else:
                # Synchronous tools run in the default thread pool so they do
                # not block the event loop.
                loop = asyncio.get_running_loop()
                pending = loop.run_in_executor(
                    None,
                    lambda: tool.function(**validated_params)
                )

            # One wait_for for both paths. For a synchronous tool the timeout
            # stops the wait; it cannot interrupt the worker thread itself.
            result = await asyncio.wait_for(pending, timeout=timeout)

            execution_time = time.perf_counter() - start_time

            return ToolResult(
                success=True,
                result=result,
                execution_time=execution_time,
                tool_name=tool.name
            )

        except Exception as e:
            return ToolResult(
                success=False,
                # Some exceptions (a timeout, for instance) have an empty
                # message; fall back to the class name so the error is never
                # blank.
                error=str(e) or type(e).__name__,
                tool_name=tool.name
            )

3. Conversation API

对话管理

class ConversationManager:
    """Bounded conversation history plus a free-form context dict.

    Args:
        max_history: Maximum number of messages kept; older ones are dropped.
    """

    def __init__(self, max_history: int = 100):
        self.messages = []
        self.max_history = max_history
        self.context = {}

    def add_message(
        self,
        role: str,
        content: str,
        metadata: dict = None
    ):
        """Append a timestamped message, then trim the history to the limit.

        Args:
            role: Message role (e.g. ``"user"``, ``"assistant"``, ``"system"``).
            content: Message text.
            metadata: Optional extra data; an empty dict is used when omitted.
        """
        message = Message(
            role=role,
            content=content,
            timestamp=datetime.now(),
            metadata=metadata or {}
        )

        self.messages.append(message)
        self._trim_history()

    def _trim_history(self):
        """Drop the oldest messages so that at most ``max_history`` remain."""
        # Slice from the front by the surplus count. The seemingly equivalent
        # ``messages[-max_history:]`` keeps *everything* when max_history is
        # 0, because ``-0`` is just ``0``.
        excess = len(self.messages) - self.max_history
        if excess > 0:
            self.messages = self.messages[excess:]

    def get_context(self, include_system: bool = True) -> List[dict]:
        """Return the history as ``{"role", "content"}`` dicts.

        Args:
            include_system: When false, messages with role ``"system"`` are
                left out.
        """
        return [
            {"role": message.role, "content": message.content}
            for message in self.messages
            if include_system or message.role != "system"
        ]

    def clear_history(self):
        """Forget all messages and reset the context."""
        self.messages = []
        self.context = {}

模块化架构分析

1. 模块组织结构

openai_agents/
├── __init__.py
├── core/
│   ├── __init__.py
│   ├── agent.py          # 智能体核心
│   ├── memory.py         # 记忆系统
│   ├── planning.py       # 规划引擎
│   └── execution.py      # 执行引擎
├── tools/
│   ├── __init__.py
│   ├── base.py          # 工具基类
│   ├── registry.py      # 工具注册表
│   ├── executor.py      # 工具执行器
│   └── builtin/         # 内置工具
│       ├── web_search.py
│       ├── file_ops.py
│       └── math_tools.py
├── conversation/
│   ├── __init__.py
│   ├── manager.py       # 对话管理器
│   ├── message.py       # 消息处理
│   └── context.py       # 上下文管理
├── models/
│   ├── __init__.py
│   ├── interface.py     # 模型接口
│   ├── openai_client.py # OpenAI客户端
│   └── local_models.py  # 本地模型
├── utils/
│   ├── __init__.py
│   ├── logging.py       # 日志工具
│   ├── config.py        # 配置管理
│   └── validation.py    # 参数验证
└── examples/
    ├── basic_agent.py
    ├── multi_agent.py
    └── voice_agent.py

2. 核心模块详解

Agent Core Module

# core/agent.py
from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod

class BaseAgent(ABC):
    """Abstract contract that every agent implementation must fulfil."""

    @abstractmethod
    async def process_message(self, message: str) -> str:
        """Handle one incoming message and return the reply text."""

    @abstractmethod
    def add_tool(self, tool: 'Tool'):
        """Make ``tool`` available to this agent."""

class Agent(BaseAgent):
    """Concrete agent: memory, planner, executor and model behind one API."""

    def __init__(self, config: AgentConfig):
        self.config = config
        self.memory = MemorySystem(config.memory_config)
        self.planner = PlanningEngine(config.planning_config)
        self.executor = ExecutionEngine(config.execution_config)
        self.model_interface = ModelInterface(config.model_config)
        # Tools attached through add_tool().
        self.tools = []

    def add_tool(self, tool: 'Tool'):
        """Attach ``tool`` to this agent.

        BaseAgent declares ``add_tool`` abstract, so without this override
        the class could not be instantiated at all.
        """
        # NOTE(review): the tool is only recorded here; whether it must also
        # be registered with the planner/executor is not shown - confirm.
        self.tools.append(tool)

    async def process_message(self, message: str) -> str:
        """Process a user message and return the response.

        Steps: understand the message, retrieve relevant memories, create a
        plan, execute it, generate the response, then store the interaction.
        """
        # 1. Understand the message
        understanding = await self._understand_message(message)

        # 2. Retrieve relevant memories
        relevant_memories = self.memory.retrieve_relevant(understanding)

        # 3. Create the execution plan
        plan = await self.planner.create_plan(understanding, relevant_memories)

        # 4. Execute the plan
        execution_result = await self.executor.execute_plan(plan)

        # 5. Generate the response
        response = await self._generate_response(execution_result)

        # 6. Update memory
        self.memory.store_interaction(message, response)

        return response

Memory System Module

# core/memory.py
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Memory:
    """A single stored memory item."""

    content: str               # text of the memory
    timestamp: datetime        # when the memory was recorded
    importance: float          # importance score used when ranking
    tags: List[str]
    metadata: Dict[str, Any]
    # Vector representation read by the retrieval code in this document
    # (``memory.embedding``). Optional and last, so existing positional
    # construction keeps working.
    embedding: Any = None

class MemorySystem:
    """Short-term, long-term and working memory behind one interface."""

    def __init__(self, config: MemoryConfig):
        # Kept because store_interaction() reads config.long_term_threshold.
        self.config = config
        self.short_term = ShortTermMemory(config.short_term_size)
        self.long_term = LongTermMemory(config.long_term_config)
        self.working_memory = WorkingMemory(config.working_memory_size)

    def store_interaction(self, input_msg: str, output_msg: str):
        """Record one input/output exchange.

        The interaction always goes to short-term memory. It is also written
        to long-term memory when its computed importance exceeds
        ``config.long_term_threshold``.
        """
        interaction = Interaction(
            input=input_msg,
            output=output_msg,
            timestamp=datetime.now()
        )

        # Always keep it in short-term memory
        self.short_term.add(interaction)

        # Promote to long-term memory only when important enough
        importance = self._calculate_importance(interaction)
        if importance > self.config.long_term_threshold:
            self.long_term.store(interaction, importance)

    def retrieve_relevant(self, query: str, limit: int = 5) -> List[Memory]:
        """Search all three stores and return the ``limit`` best matches.

        Args:
            query: The search query.
            limit: Maximum number of results, also passed to each store.
        """
        # Query each memory store
        short_term_results = self.short_term.search(query, limit)
        long_term_results = self.long_term.search(query, limit)
        working_memory_results = self.working_memory.search(query, limit)

        # Merge, rank, and keep the top ``limit``
        all_results = short_term_results + long_term_results + working_memory_results
        return self._rank_memories(all_results, query)[:limit]

Tool System Module

# tools/base.py
from typing import Callable, Dict, Any, List
from abc import ABC, abstractmethod

class BaseTool(ABC):
    """Base class for tools: a name, a description and an execute contract."""

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    @abstractmethod
    async def execute(self, **kwargs) -> Any:
        """Run the tool with the given keyword arguments."""
        pass

    @abstractmethod
    def get_parameters(self) -> Dict[str, Any]:
        """Return the tool's parameter definitions."""
        pass


class FunctionTool(BaseTool):
    """Tool backed by a plain (sync or async) Python function.

    Args:
        func: The function to wrap.
        name: Tool name; defaults to ``func.__name__``.
        description: Tool description; defaults to the function's docstring.
    """

    def __init__(self, func: Callable, name: str = None, description: str = None):
        self.function = func
        name = name or func.__name__
        description = description or func.__doc__ or ""
        super().__init__(name, description)
        self.parameters = self._extract_parameters()

    def _extract_parameters(self) -> Dict[str, Any]:
        """Describe the wrapped function's parameters.

        Returns:
            ``{name: {"type": <type name>, "required": <bool>}}``; parameters
            without an annotation are reported as ``"str"``.
        """
        # Local import keeps this block self-contained.
        import inspect

        described = {}
        for param_name, param in inspect.signature(self.function).parameters.items():
            annotation = param.annotation
            if annotation is param.empty:
                type_name = "str"
            elif isinstance(annotation, type):
                type_name = annotation.__name__
            else:
                type_name = str(annotation)

            # *args / **kwargs never have to be supplied by the caller.
            variadic = param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
            described[param_name] = {
                "type": type_name,
                "required": param.default is param.empty and not variadic,
            }
        return described

    async def execute(self, **kwargs) -> Any:
        """Call the wrapped function, awaiting it when it is a coroutine function."""
        import inspect

        if inspect.iscoroutinefunction(self.function):
            return await self.function(**kwargs)
        return self.function(**kwargs)

    def get_parameters(self) -> Dict[str, Any]:
        """Return the parameter definitions extracted at construction time."""
        return self.parameters

3. 模块间通信

事件系统

# utils/events.py
from typing import Callable, Dict, List
from enum import Enum

class EventType(Enum):
    """Kinds of events that can be published on the bus."""

    AGENT_STARTED = "agent_started"
    MESSAGE_RECEIVED = "message_received"
    TOOL_EXECUTED = "tool_executed"
    MEMORY_UPDATED = "memory_updated"
    ERROR_OCCURRED = "error_occurred"


class EventBus:
    """Minimal synchronous publish/subscribe hub keyed by EventType."""

    def __init__(self):
        self.listeners: Dict[EventType, List[Callable]] = {}

    def subscribe(self, event_type: EventType, callback: Callable):
        """Register ``callback`` to be called with the data of ``event_type`` events."""
        self.listeners.setdefault(event_type, []).append(callback)

    # ``object`` accepts any payload and, unlike ``Any``, needs no import.
    def publish(self, event_type: EventType, data: object):
        """Call every listener of ``event_type`` with ``data``.

        A listener that raises is reported on stdout and does not prevent
        the remaining listeners from running.
        """
        # Iterate over a snapshot: a callback may subscribe or unsubscribe
        # while we are publishing, and mutating the live list mid-iteration
        # would skip listeners.
        for callback in tuple(self.listeners.get(event_type, ())):
            try:
                callback(data)
            except Exception as e:
                print(f"Event callback error: {e}")


# Global event bus
event_bus = EventBus()

依赖注入

# utils/di.py
from typing import Type, Any, Dict, Callable

class Container:
    """Tiny service locator with eager instances and lazy factories."""

    def __init__(self):
        self._services: Dict[str, Any] = {}
        self._factories: Dict[str, Callable] = {}

    def register(self, name: str, service: Any):
        """Register a ready-made service instance under ``name``."""
        self._services[name] = service

    def register_factory(self, name: str, factory: Callable):
        """Register a zero-argument factory that builds the service on first use."""
        self._factories[name] = factory

    def get(self, name: str) -> Any:
        """Return the service registered under ``name``.

        An existing instance wins over a factory. A factory is called once;
        its result is stored and returned on later lookups.

        Raises:
            ValueError: If neither an instance nor a factory is registered.
        """
        if name in self._services:
            return self._services[name]

        if name not in self._factories:
            raise ValueError(f"Service {name} not found")

        built = self._factories[name]()
        self._services[name] = built
        return built


# Global container
container = Container()

关键函数和继承关系

1. 类继承体系

# Agent class hierarchy
class BaseAgent(ABC):
    """Root of the agent hierarchy."""


class Agent(BaseAgent):
    """Standard agent."""


class VoiceAgent(Agent):
    """Voice-enabled agent."""


class MultiAgent(BaseAgent):
    """Multi-agent system."""

# Tool class hierarchy
class BaseTool(ABC):
    """Root of the tool hierarchy."""


class FunctionTool(BaseTool):
    """Tool backed by a function."""


class APITool(BaseTool):
    """Tool backed by an API."""


class DatabaseTool(BaseTool):
    """Tool backed by a database."""

# Memory class hierarchy
class BaseMemory(ABC):
    """Root of the memory hierarchy."""


class ShortTermMemory(BaseMemory):
    """Short-term memory."""


class LongTermMemory(BaseMemory):
    """Long-term memory."""


class WorkingMemory(BaseMemory):
    """Working memory."""

2. 关键函数分析

Agent.run() 函数

async def run(
    self,
    message: str,
    context: Dict[str, Any] = None,
    stream: bool = False,
    tools: List[str] = None
) -> Union[str, AsyncGenerator[str, None]]:
    """
    Main entry point for running the agent on one message.

    Steps performed here: validate the input, build the request context,
    select the tools, build the system prompt, prepare the message list,
    then call the model. In the non-streaming case memory is updated with
    the exchange before returning.

    Args:
        message: The user's input message.
        context: Extra context information.
        stream: Whether to return a streaming generator.
        tools: Names of the tools to make available.

    Returns:
        The response string, or the streaming generator when ``stream`` is
        true.

    Raises:
        ValueError: If ``message`` is empty or only whitespace.
    """
    # Reject empty and whitespace-only input
    if not (message and message.strip()):
        raise ValueError("Message cannot be empty")

    request_context = self._build_context(message, context)
    available_tools = self._select_tools(tools)
    messages = self._prepare_messages(
        self._build_system_prompt(available_tools),
        message,
        request_context
    )

    if stream:
        # Streaming: hand the generator straight back to the caller.
        return self._stream_generate(messages, available_tools)

    reply = await self._generate_response(messages, available_tools)
    await self._update_memory(message, reply, request_context)
    return reply

async def _generate_response(
    self,
    messages: List[Dict],
    tools: List[Tool]
) -> str:
    """Call the model, executing requested tools, until it answers in text.

    ``messages`` is extended in place with the assistant's tool-call turn
    and one ``"tool"`` message per tool result on every round.

    Returns:
        The content of the first model response that requests no tool calls.

    Raises:
        RuntimeError: If the model is still requesting tools after 10 rounds.
    """
    max_iterations = 10  # guards against an endless tool-calling loop

    for _ in range(max_iterations):
        response = await self.model_interface.chat_completion(
            messages=messages,
            tools=[tool.to_openai_format() for tool in tools],
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )

        # No tool calls: this is the final answer.
        if not response.tool_calls:
            return response.content

        tool_results = await self._execute_tools(response.tool_calls)

        # Record the assistant turn, then each tool's output, so the next
        # model call sees them.
        messages.append({
            "role": "assistant",
            "content": response.content,
            "tool_calls": response.tool_calls
        })
        messages.extend(
            {
                "role": "tool",
                "content": outcome.content,
                "tool_call_id": outcome.tool_call_id
            }
            for outcome in tool_results
        )

    raise RuntimeError("Max iterations reached in tool calling loop")

Tool.execute() 函数

async def execute(
    self,
    parameters: Dict[str, Any],
    context: ExecutionContext = None
) -> ToolResult:
    """
    Execute the tool.

    Validates the parameters, checks permissions, runs the before-hook,
    calls the tool function (synchronous functions run in the default thread
    pool), runs the after-hook, and wraps the outcome in a ``ToolResult``.

    Args:
        parameters: Raw execution parameters.
        context: Optional execution context.

    Returns:
        A ``ToolResult`` with ``success=True`` and the result, or
        ``success=False`` with the error text and error type. Exceptions
        raised during the steps above are captured rather than propagated.
    """

    try:
        # 1. Parameter validation
        validated_params = self._validate_parameters(parameters)

        # 2. Permission check
        if not self._check_permissions(context):
            raise PermissionError(f"Permission denied for tool {self.name}")

        # 3. Before-execute hook
        await self._before_execute(validated_params, context)

        # 4. Run the tool function. perf_counter is monotonic, so the
        #    duration is immune to wall-clock adjustments.
        start_time = time.perf_counter()

        if asyncio.iscoroutinefunction(self.function):
            result = await self.function(**validated_params)
        else:
            # Run the synchronous function in the thread pool. We are inside
            # a coroutine, so get_running_loop() is the right accessor.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                lambda: self.function(**validated_params)
            )

        execution_time = time.perf_counter() - start_time

        # 5. After-execute hook
        await self._after_execute(result, execution_time, context)

        # 6. Build the result
        return ToolResult(
            success=True,
            result=result,
            execution_time=execution_time,
            tool_name=self.name,
            metadata={
                "parameters": validated_params,
                "context": context.to_dict() if context else {}
            }
        )

    except Exception as e:
        # Error handling
        await self._handle_error(e, parameters, context)

        return ToolResult(
            success=False,
            # Never report a blank error: fall back to the exception class
            # name when the exception carries no message.
            error=str(e) or type(e).__name__,
            error_type=type(e).__name__,
            tool_name=self.name,
            metadata={
                "parameters": parameters,
                "context": context.to_dict() if context else {}
            }
        )

MemorySystem.retrieve_relevant() 函数

def retrieve_relevant(
    self,
    query: str,
    limit: int = 5,
    similarity_threshold: float = 0.7,
    time_decay: bool = True
) -> List["Memory"]:
    """
    Retrieve the memories most relevant to a query.

    Candidates come from short-term and long-term search plus the current
    working-memory context. Each distinct candidate gets a combined score
    (0.5 * semantic similarity + 0.3 * importance + 0.2 * time score), and
    those scoring at least ``similarity_threshold`` are returned best first.

    Args:
        query: The query string.
        limit: Maximum number of results; non-positive values yield ``[]``.
        similarity_threshold: Passed to the store searches and also used as
            the minimum combined score.
        time_decay: Whether to apply time decay; when false the time score
            is fixed at 1.0.

    Returns:
        Up to ``limit`` memories, highest score first.
    """
    if limit <= 0:
        return []

    # 1. Embed the query
    query_embedding = self.embedding_model.encode(query)

    # 2. Collect candidates; the stores are asked for twice the limit so
    #    there is room to filter afterwards.
    candidates = []
    candidates.extend(self.short_term.search(
        query_embedding,
        limit * 2,
        similarity_threshold
    ))
    candidates.extend(self.long_term.search(
        query_embedding,
        limit * 2,
        similarity_threshold
    ))
    candidates.extend(self.working_memory.get_current_context())

    # 3. Score each distinct candidate. The same memory object can come back
    #    from more than one store; identity is used for de-duplication so no
    #    assumption is made about the objects being hashable.
    scored_memories = []
    seen_ids = set()
    for memory in candidates:
        if id(memory) in seen_ids:
            continue
        seen_ids.add(id(memory))

        semantic_score = self._calculate_similarity(
            query_embedding,
            memory.embedding
        )
        importance_score = memory.importance
        time_score = (
            self._calculate_time_decay(memory.timestamp) if time_decay else 1.0
        )

        final_score = (
            semantic_score * 0.5 +
            importance_score * 0.3 +
            time_score * 0.2
        )

        if final_score >= similarity_threshold:
            scored_memories.append((memory, final_score))

    # 4. Best first
    scored_memories.sort(key=lambda pair: pair[1], reverse=True)

    # 5. Top-k
    return [memory for memory, _ in scored_memories[:limit]]

3. 设计模式应用

策略模式 - 模型接口

class ModelStrategy(ABC):
    """Strategy interface: how a response is generated from messages."""

    @abstractmethod
    async def generate_response(self, messages: List[Dict]) -> str:
        """Produce a response for the given message list."""


class OpenAIStrategy(ModelStrategy):
    """Strategy backed by an OpenAI model."""

    async def generate_response(self, messages: List[Dict]) -> str:
        # Placeholder for the OpenAI API call
        return None


class LocalModelStrategy(ModelStrategy):
    """Strategy backed by a local model."""

    async def generate_response(self, messages: List[Dict]) -> str:
        # Placeholder for local model inference
        return None


class ModelInterface:
    """Context object that delegates generation to the configured strategy."""

    def __init__(self, strategy: ModelStrategy):
        self.strategy = strategy

    async def generate_response(self, messages: List[Dict]) -> str:
        reply = await self.strategy.generate_response(messages)
        return reply

观察者模式 - 事件系统

class Observable:
    """Subject side of the observer pattern."""

    def __init__(self):
        self._observers = []

    def attach(self, observer):
        """Start notifying ``observer``."""
        self._observers.append(observer)

    def detach(self, observer):
        """Stop notifying ``observer``.

        Raises:
            ValueError: If ``observer`` is not currently attached.
        """
        self._observers.remove(observer)

    def notify(self, event):
        """Call ``update(event)`` on every observer attached at call time."""
        # Iterate over a snapshot: an observer may attach or detach (itself
        # included) from inside update(), and mutating the live list while
        # iterating would skip the next observer.
        for observer in tuple(self._observers):
            observer.update(event)

class Agent(Observable):
    """Agent that announces every processed message to its observers."""

    def process_message(self, message):
        """Process ``message``, notify observers, and return the result."""
        outcome = self._internal_process(message)

        # Observers receive the message together with its result.
        event = MessageProcessedEvent(message, outcome)
        self.notify(event)

        return outcome

工厂模式 - 智能体创建

class AgentFactory:
    """Factory that maps an agent type name to the matching agent class."""

    @staticmethod
    def create_agent(agent_type: str, config: Dict) -> BaseAgent:
        """Create an agent of the requested type.

        Args:
            agent_type: One of ``"standard"``, ``"voice"`` or ``"multi"``.
            config: Configuration handed to the agent's constructor.

        Raises:
            ValueError: If ``agent_type`` is not one of the known names.
        """
        if agent_type == "standard":
            return Agent(config)
        if agent_type == "voice":
            return VoiceAgent(config)
        if agent_type == "multi":
            return MultiAgent(config)

        raise ValueError(f"Unknown agent type: {agent_type}")

# Create an agent through the factory
agent = AgentFactory.create_agent("voice", voice_config)

实战经验总结

1. 最佳实践

智能体设计原则

# 1. Single-responsibility principle
class ResearchAgent(Agent):
    """Agent dedicated to research tasks."""

    def __init__(self):
        super().__init__(
            instructions="你是一个专业的研究员,擅长收集和分析信息",
            name="researcher"
        )

        # Research-related tools only, nothing else.
        self.add_tool(web_search_tool)
        self.add_tool(paper_search_tool)
        self.add_tool(data_analysis_tool)

# 2. Explicit interface definition
class AgentInterface(ABC):
    """Interface that task-processing agents are expected to implement."""

    @abstractmethod
    async def process_task(self, task: Task) -> TaskResult:
        """Process ``task`` and return its result."""

    @abstractmethod
    def get_capabilities(self) -> List[str]:
        """Return the capabilities offered by this agent."""

# 3. Error handling and recovery
class RobustAgent(Agent):
    """Agent that degrades gracefully instead of propagating failures."""

    async def process_message(self, message: str) -> str:
        """Process ``message``, recovering from tool, model and other errors."""
        try:
            reply = await super().process_message(message)
        except ToolExecutionError as tool_error:
            # A tool failed: try an alternative.
            return await self._handle_tool_failure(tool_error)
        except ModelError:
            # The model call failed: fall back to the backup model.
            return await self._fallback_to_backup_model(message)
        except Exception as e:
            # Anything else: return a friendly error message.
            return f"抱歉,处理您的请求时遇到了问题:{str(e)}"
        return reply

工具开发指南

# 1. Tools should be idempotent
@tool
def get_user_info(user_id: str) -> dict:
    """获取用户信息 - 幂等操作"""
    # The docstring above is kept verbatim: with bare @tool it doubles as the
    # tool description. This is a read-only lookup by user id.
    user_record = database.get_user(user_id)
    return user_record

# 2. Tools should handle errors clearly
@tool
def send_email(to: str, subject: str, body: str) -> bool:
    """发送邮件"""
    # The docstring above is kept verbatim: with bare @tool it doubles as the
    # tool description. Returns True on success; a failure of the email
    # service is logged and reported as False.
    try:
        email_service.send(to, subject, body)
    except EmailServiceError as e:
        logger.error(f"Failed to send email: {e}")
        return False
    return True

# 3. Tools should enforce appropriate permissions
@tool(permissions=["file_read"])
def read_file(file_path: str) -> str:
    """读取文件内容"""
    # The docstring above is kept verbatim: the decorator may use it as the
    # tool description.
    #
    # Raises PermissionError when security.check_file_access() rejects the
    # path.
    if not security.check_file_access(file_path):
        raise PermissionError("Access denied")

    # Explicit encoding: without it the result depends on the platform's
    # locale default, so the same file can decode differently per machine.
    with open(file_path, 'r', encoding="utf-8") as f:
        return f.read()

# 4. Tools should be documented in detail
@tool(
    description="搜索网络信息并返回相关结果",
    parameters={
        "query": {
            "type": "string",
            "description": "搜索查询字符串",
            "required": True
        },
        "limit": {
            "type": "integer", 
            "description": "返回结果的最大数量",
            "default": 10,
            "minimum": 1,
            "maximum": 50
        }
    }
)
def web_search(query: str, limit: int = 10) -> List[dict]:
    """
    Search the web for information.

    Args:
        query: The search query string.
        limit: Maximum number of results to return.

    Returns:
        A list of search results; each result contains fields such as
        title, url and snippet.

    Raises:
        SearchServiceError: When the search service is unavailable.
        ValidationError: When parameter validation fails.
    """
    # Search logic goes here. The body is a stub, so the function currently
    # returns None rather than the documented list.
    pass

2. 性能优化

异步处理优化

# 1. 并行工具执行
async def execute_tools_parallel(self, tool_calls: List[ToolCall]) -> List[ToolResult]:
    """Run every requested tool call concurrently and collect the outcomes.

    Args:
        tool_calls: The calls to execute. Each one is resolved through
            ``self.get_tool(call.name)`` and run with ``call.parameters``.

    Returns:
        One entry per call, in the same order as ``tool_calls``. A call that
        raised (or was cancelled) is represented by a failed ``ToolResult``
        carrying the error text and the tool name; every other entry is
        whatever the tool's ``execute`` returned.
    """
    # Schedule everything up front so the executions overlap.
    tasks = [
        asyncio.create_task(
            self.get_tool(tool_call.name).execute(tool_call.parameters)
        )
        for tool_call in tool_calls
    ]

    # return_exceptions=True: one failing tool must not abort the others.
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)

    tool_results = []
    for tool_call, outcome in zip(tool_calls, outcomes):
        # BaseException, not Exception: a cancelled task comes back as
        # asyncio.CancelledError, which is not an Exception subclass on
        # Python 3.8+, and would otherwise be passed off as a real result.
        if isinstance(outcome, BaseException):
            tool_results.append(ToolResult(
                success=False,
                # Some exceptions stringify to "", so fall back to the type name.
                error=str(outcome) or type(outcome).__name__,
                tool_name=tool_call.name
            ))
        else:
            tool_results.append(outcome)

    return tool_results

# 2. 记忆检索优化
class OptimizedMemorySystem:
    """Memory retrieval that caches ranked results per query.

    ``retrieve_relevant`` reads ``self.short_term``, ``self.long_term`` and
    ``self.working_memory``; none of them is created in ``__init__``, so they
    must be supplied elsewhere before the first lookup.
    """

    def __init__(self):
        # Reserved for embedding vectors; not read anywhere in this class.
        self.embedding_cache = {}
        # Maps the MD5 hex digest of a query to its ranked memory list.
        self.similarity_cache = {}

    async def retrieve_relevant(self, query: str) -> List[Memory]:
        """Return the memories relevant to ``query``, ranked.

        The first lookup for a query embeds it, searches the three memory
        stores concurrently, ranks the merged hits and caches that list.
        Later lookups for the same query return the cached list object.
        """
        cache_key = hashlib.md5(query.encode()).hexdigest()
        try:
            return self.similarity_cache[cache_key]
        except KeyError:
            pass  # Cache miss: compute below.

        query_embedding = await self._get_embedding_async(query)

        # The stores are independent, so query them all at once.
        hits_per_store = await asyncio.gather(
            self.short_term.search_async(query_embedding),
            self.long_term.search_async(query_embedding),
            self.working_memory.search_async(query_embedding),
        )

        # Flatten in store order, then rank the combined candidates.
        candidates = [memory for hits in hits_per_store for memory in hits]
        ranked_memories = self._rank_memories(candidates, query_embedding)

        self.similarity_cache[cache_key] = ranked_memories
        return ranked_memories

# 3. 流式响应优化
async def stream_response(self, message: str) -> AsyncGenerator[str, None]:
    """Yield a placeholder immediately, then stream the model's reply.

    Args:
        message: The message to answer.

    Yields:
        First a fixed "thinking" placeholder, then each chunk produced by
        ``self.model_interface.stream_generate``.
    """
    # Give the caller something to show before any real work starts.
    yield "正在思考..."

    # Context building and tool selection do not depend on each other, so
    # both are started as tasks and awaited together.
    pending = (
        asyncio.create_task(self._build_context(message)),
        asyncio.create_task(self._select_tools(message)),
    )
    context, tools = await asyncio.gather(*pending)

    stream_generate = self.model_interface.stream_generate
    chunk_stream = stream_generate(
        messages=self._prepare_messages(message, context),
        tools=tools,
    )
    async for chunk in chunk_stream:
        yield chunk

内存管理优化

# 1. 智能记忆清理
class MemoryManager:
    """Trims stored memories once usage approaches a configured ceiling.

    ``get_all_memories``, ``_compress_memories`` and
    ``_update_memory_storage`` are used by ``cleanup_memory`` but are not
    defined in this class body.
    """

    def __init__(self, max_memory_size: int = 1000):
        self.memory_usage = 0
        self.max_memory_size = max_memory_size

    async def cleanup_memory(self):
        """Keep the top-ranked memories and hand the rest over for compression.

        Does nothing while ``memory_usage`` is below 80% of
        ``max_memory_size``. Otherwise memories are ordered by
        ``(importance, timestamp)`` descending, the first 60% of
        ``max_memory_size`` entries are kept, and the remainder is passed
        through ``_compress_memories`` before storage is updated.
        """
        trigger_level = self.max_memory_size * 0.8
        if self.memory_usage < trigger_level:
            return

        # Highest importance first; newer timestamps win ties.
        ranked = list(self.get_all_memories())
        ranked.sort(key=lambda m: (m.importance, m.timestamp), reverse=True)

        keep_count = int(self.max_memory_size * 0.6)
        memories_to_keep = ranked[:keep_count]
        memories_to_remove = ranked[keep_count:]

        # The tail is compressed rather than discarded outright.
        compressed_memories = self._compress_memories(memories_to_remove)
        self._update_memory_storage(memories_to_keep, compressed_memories)

# 2. 连接池管理
class ModelConnectionPool:
    """Fixed-size pool of model connections backed by an ``asyncio.Queue``.

    Connections are created lazily, on the first ``get_connection`` call.
    ``__init__`` is synchronous and therefore cannot await the async
    ``_initialize_connections``; calling it there only created a coroutine
    that was never run, leaving the pool empty forever.

    ``_create_connection`` is used to build each connection but is not
    defined in this class body.
    """

    def __init__(self, max_connections: int = 10):
        self.max_connections = max_connections
        self.connections = asyncio.Queue(maxsize=max_connections)
        # Number of connections created so far; the pool is ready once this
        # reaches ``max_connections``.
        self._created = 0
        # Serialises initialisation so concurrent first callers cannot
        # over-fill the bounded queue.
        self._init_lock = asyncio.Lock()

    async def _initialize_connections(self):
        """Create connections until the pool holds ``max_connections``.

        Safe to call repeatedly: connections that already exist are counted,
        so a retry after a partial failure only creates the missing ones.
        """
        async with self._init_lock:
            while self._created < self.max_connections:
                connection = await self._create_connection()
                await self.connections.put(connection)
                self._created += 1

    async def get_connection(self):
        """Return a connection, waiting until one is available."""
        if self._created < self.max_connections:
            await self._initialize_connections()
        return await self.connections.get()

    async def return_connection(self, connection):
        """Put ``connection`` back into the pool."""
        await self.connections.put(connection)

    async def execute_with_connection(self, func, *args, **kwargs):
        """Await ``func(connection, *args, **kwargs)`` with a pooled connection.

        The connection is returned to the pool even when ``func`` raises.
        """
        connection = await self.get_connection()
        try:
            return await func(connection, *args, **kwargs)
        finally:
            await self.return_connection(connection)

3. 错误处理和调试

全面的错误处理

# 1. 自定义异常类型
class AgentError(Exception):
    """Base class for agent-related errors."""

class ToolExecutionError(AgentError):
    """Raised when a tool fails while executing.

    Both constructor arguments are stored on the instance as ``tool_name``
    and ``error_message``; the exception text combines them.
    """

    def __init__(self, tool_name: str, error_message: str):
        summary = f"Tool {tool_name} execution failed: {error_message}"
        super().__init__(summary)
        self.tool_name = tool_name
        self.error_message = error_message

class ModelError(AgentError):
    """Raised when a model call fails."""

# NOTE(review): this name shadows Python's builtin ``MemoryError`` for the
# rest of the module. It is kept because other code refers to it by this name.
class MemoryError(AgentError):
    """Raised when the memory system fails."""

# 2. 错误恢复机制
class ErrorRecoveryAgent(Agent):
    """Agent that records every failure and tries a per-type recovery strategy.

    ``recovery_strategies`` maps exception classes to async callables taking
    ``(error, message)``. The ``_recover_from_*`` methods and
    ``_generate_user_friendly_error`` are referenced here but are not defined
    in this class body.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.error_history = []
        self.recovery_strategies = {
            ToolExecutionError: self._recover_from_tool_error,
            ModelError: self._recover_from_model_error,
            MemoryError: self._recover_from_memory_error
        }

    async def process_message(self, message: str) -> str:
        """Process ``message``; any exception is routed to ``_handle_error``."""
        try:
            return await super().process_message(message)
        except Exception as e:
            return await self._handle_error(e, message)

    async def _handle_error(self, error: Exception, message: str) -> str:
        """Record ``error``, attempt recovery, and fall back to a friendly reply.

        Args:
            error: The exception raised while processing.
            message: The message that was being processed.

        Returns:
            The recovery strategy's result when one matches and succeeds,
            otherwise the output of ``_generate_user_friendly_error``.
        """
        self.error_history.append({
            'error': error,
            'message': message,
            'timestamp': datetime.now()
        })

        strategy = self._find_recovery_strategy(error)
        if strategy is not None:
            try:
                return await strategy(error, message)
            except Exception as recovery_error:
                # A failed recovery is logged; the friendly reply below is used.
                logger.error(f"Recovery failed: {recovery_error}")

        return self._generate_user_friendly_error(error)

    def _find_recovery_strategy(self, error: Exception):
        """Return the strategy for the most specific registered class of ``error``.

        Walking the MRO instead of looking up the exact ``type(error)`` lets a
        subclass of a registered error type reach its parent's strategy.
        """
        for error_class in type(error).__mro__:
            strategy = self.recovery_strategies.get(error_class)
            if strategy is not None:
                return strategy
        return None

# 3. 调试和监控
class DebuggableAgent(Agent):
    """Agent that can record an execution trace for each processed message.

    ``_end_trace``, ``_trace_error``, ``_get_memory_usage`` and
    ``_get_tool_stats`` are used here but are not defined in this class body.
    """

    def __init__(self, *args, debug_mode: bool = False, **kwargs):
        super().__init__(*args, **kwargs)
        self.debug_mode = debug_mode
        self.execution_trace = []

    async def process_message(self, message: str) -> str:
        """Process ``message``, tracing the call when debug mode is on.

        Exceptions from the parent's processing are re-raised after being
        reported to ``_trace_error`` (when tracing).
        """
        # Read the flag once. It is a mutable attribute and may be switched
        # while the call below is suspended; re-reading it afterwards could
        # reach the tracing hooks with ``trace_id`` never assigned.
        tracing = self.debug_mode
        trace_id = None
        if tracing:
            trace_id = str(uuid.uuid4())
            self._start_trace(trace_id, message)

        try:
            result = await super().process_message(message)

            if tracing:
                self._end_trace(trace_id, result)

            return result
        except Exception as e:
            if tracing:
                self._trace_error(trace_id, e)
            raise

    def _start_trace(self, trace_id: str, message: str):
        """Append a new trace record with an empty ``steps`` list."""
        self.execution_trace.append({
            'trace_id': trace_id,
            'start_time': datetime.now(),
            'message': message,
            'steps': []
        })

    def get_debug_info(self) -> dict:
        """Return the trace plus memory, tool-usage and error information.

        ``error_history`` falls back to an empty list when the instance has
        no such attribute.
        """
        return {
            'execution_trace': self.execution_trace,
            'memory_usage': self._get_memory_usage(),
            'tool_usage_stats': self._get_tool_stats(),
            'error_history': getattr(self, 'error_history', [])
        }

4. 部署和运维

生产环境配置

# 1. 配置管理
class ProductionConfig:
    """Bundle of default settings, grouped into three plain dictionaries."""

    def __init__(self):
        # Model selection and call limits.
        self.model_config: dict = {
            'primary_model': 'gpt-4',
            'fallback_model': 'gpt-3.5-turbo',
            'max_retries': 3,
            'timeout': 30.0,
        }

        # Memory sizing; 3600 is one hour, per the original note on the value.
        self.memory_config: dict = {
            'short_term_size': 100,
            'long_term_threshold': 0.8,
            'cleanup_interval': 3600,
        }

        # Tool permission and execution limits.
        self.security_config: dict = {
            'enable_tool_permissions': True,
            'max_tool_execution_time': 60.0,
            'allowed_domains': ['api.example.com'],
        }

# 2. 健康检查
class HealthChecker:
    """Runs the model, memory and tool-system health checks of one agent."""

    def __init__(self, agent: "Agent"):
        self.agent = agent

    async def check_health(self) -> dict:
        """Probe each subsystem and summarise the outcome.

        Returns:
            A dict with ``'status'`` (``'healthy'`` unless any probe raised,
            then ``'unhealthy'``) and ``'checks'``, which maps ``'model'``,
            ``'memory'`` and ``'tools'`` to ``'ok'`` or ``'error: <details>'``.
        """
        health_status = {
            'status': 'healthy',
            'checks': {}
        }

        # Lambdas defer the attribute lookups, so a missing subsystem is
        # reported as that probe's failure instead of aborting the check.
        probes = (
            ('model', lambda: self.agent.model_interface.health_check()),
            ('memory', lambda: self.agent.memory.health_check()),
            ('tools', lambda: self.agent.tool_system.health_check()),
        )

        for check_name, probe in probes:
            try:
                await self._run_probe(probe)
                health_status['checks'][check_name] = 'ok'
            except Exception as e:
                health_status['checks'][check_name] = f'error: {e}'
                health_status['status'] = 'unhealthy'

        return health_status

    @staticmethod
    async def _run_probe(probe):
        """Call ``probe`` and await its result when it is awaitable.

        This accepts both sync and async ``health_check`` implementations.
        An async one that is not awaited would never run, so its failures
        would go unreported.
        """
        import inspect

        outcome = probe()
        if inspect.isawaitable(outcome):
            await outcome

# 3. 监控指标
class MetricsCollector:
    """In-memory request counters plus a running mean of response time."""

    def __init__(self):
        self.metrics = {
            'requests_total': 0,
            'requests_success': 0,
            'requests_error': 0,
            'response_time_avg': 0.0,
            'tool_calls_total': 0,
            'memory_usage': 0,
        }

    def record_request(self, success: bool, response_time: float):
        """Count one request and fold its response time into the average.

        Args:
            success: Selects the success counter when true, else the error one.
            response_time: Response time of this request.
        """
        stats = self.metrics
        stats['requests_total'] += 1

        outcome_key = 'requests_success' if success else 'requests_error'
        stats[outcome_key] += 1

        # Rebuild the previous sum from the old mean, add the new sample,
        # then divide by the updated request count.
        count = stats['requests_total']
        previous_sum = stats['response_time_avg'] * (count - 1)
        stats['response_time_avg'] = (previous_sum + response_time) / count

    def get_metrics(self) -> dict:
        """Return a shallow copy of the current metrics."""
        return dict(self.metrics)

这个源码剖析涵盖了OpenAI Agents SDK的主要架构、核心API、模块设计和实战经验。通过深入分析代码结构和实现细节,可以更好地理解如何构建高质量的AI智能体应用。