8.1 会话管理模块 (Memory/Session)

8.1.1 会话系统架构

会话管理系统负责自动维护对话历史，使代理能够在多轮对话中保持上下文连续性。

graph TD
    A[Runner] --> B[Session Manager]
    B --> C{Session Type}
    
    C --> D[SQLiteSession]
    C --> E[RedisSession]
    C --> F[OpenAIConversationsSession]
    C --> G[Custom Session]
    
    D --> H[SQLite Database]
    E --> I[Redis Server]
    F --> J[OpenAI Conversations API]
    G --> K[User Implementation]
    
    B --> L[Session Operations]
    L --> M[get_items 获取历史]
    L --> N[add_items 添加项目]
    L --> O[pop_item 移除项目]
    L --> P[clear_session 清空会话]
    
    style B fill:#e3f2fd
    style L fill:#f3e5f5

8.1.2 Session接口定义

# 位于 src/agents/memory/session.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

class SessionABC(ABC):
    """会话管理的抽象基类"""
    
    @abstractmethod
    async def get_items(self, limit: int | None = None) -> list[dict]:
        """
        获取会话历史项目
        
        Args:
            limit: 限制返回的项目数量,None表示返回全部
            
        Returns:
            list[dict]: 历史消息项目列表
        """
        pass
    
    @abstractmethod
    async def add_items(self, items: list[dict]) -> None:
        """
        添加新的项目到会话
        
        Args:
            items: 要添加的项目列表
        """
        pass
    
    @abstractmethod
    async def pop_item(self) -> dict | None:
        """
        移除并返回最近的一个项目
        
        Returns:
            dict | None: 被移除的项目,如果会话为空则返回None
        """
        pass
    
    @abstractmethod
    async def clear_session(self) -> None:
        """清空会话中的所有项目"""
        pass

# Type alias that gives callers a shorter public name for the interface.
Session = SessionABC

# Callback type for session input: a callable taking two lists of item dicts
# and returning a list of item dicts (presumably either directly or as an
# awaitable, judging by MaybeAwaitable — confirm).
# NOTE(review): Callable and MaybeAwaitable are not imported in the visible
# snippet — confirm they are in scope where this line lives.
SessionInputCallback = Callable[[list[dict], list[dict]], MaybeAwaitable[list[dict]]]

8.1.3 SQLite会话实现

# 位于 src/agents/memory/sqlite_session.py
import sqlite3
import json
import asyncio
from typing import Any, Dict, List, Optional
from contextlib import asynccontextmanager

class SQLiteSession(SessionABC):
    """SQLite-backed session that persists conversation history.

    Several independent conversations can share one database file; rows are
    scoped by ``session_id``.

    The table and its index are created lazily on the first database access
    instead of from ``__init__``. This lets instances be constructed outside a
    running event loop and guarantees the schema exists before the first
    query runs.
    """

    def __init__(
        self,
        session_id: str,
        db_path: str = "sessions.db",
        table_name: str = "session_items",
    ):
        """Configure the session; no database I/O happens here.

        Args:
            session_id: Identifier that scopes the rows of this conversation.
            db_path: Path of the SQLite database file.
            table_name: Name of the table that stores the items.

        Raises:
            ValueError: If ``table_name`` is not a plain ASCII identifier.
        """
        # The table name is interpolated into SQL text (identifiers cannot be
        # bound as parameters), so only plain identifiers are accepted.
        if not (table_name.isascii() and table_name.isidentifier()):
            raise ValueError(f"Invalid table name: {table_name!r}")

        self.session_id = session_id
        self.db_path = db_path
        self.table_name = table_name
        self._lock = asyncio.Lock()
        # Flipped to True once the CREATE statements have run successfully.
        self._schema_ready = False

    async def _ensure_table_exists(self) -> None:
        """Make sure the table and its index exist."""
        # Opening a connection creates the schema on first use.
        async with self._get_connection():
            pass

    async def _create_schema(self, conn) -> None:
        """Run the idempotent CREATE TABLE / CREATE INDEX statements on ``conn``."""
        await conn.execute(f"""
            CREATE TABLE IF NOT EXISTS {self.table_name} (
                session_id TEXT NOT NULL,
                item_id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                content TEXT NOT NULL,
                metadata TEXT
            )
        """)

        # Index on session_id to speed up per-session lookups.
        await conn.execute(f"""
            CREATE INDEX IF NOT EXISTS idx_{self.table_name}_session_id
            ON {self.table_name} (session_id)
        """)

        await conn.commit()

    @asynccontextmanager
    async def _get_connection(self):
        """Yield an aiosqlite connection with WAL enabled and the schema in place."""
        import aiosqlite

        async with aiosqlite.connect(self.db_path) as conn:
            # WAL mode improves concurrent access.
            await conn.execute("PRAGMA journal_mode=WAL")
            if not self._schema_ready:
                await self._create_schema(conn)
                self._schema_ready = True
            yield conn

    async def get_items(self, limit: int | None = None) -> list[dict]:
        """Return the session history in chronological order.

        Args:
            limit: When given, only the latest ``limit`` items are returned
                (still oldest-first); ``None`` returns the full history.

        Returns:
            The decoded items. Rows whose JSON cannot be parsed are logged
            and skipped.
        """
        params: list = [self.session_id]
        if limit is None:
            query = f"""
                SELECT content, metadata FROM {self.table_name}
                WHERE session_id = ?
                ORDER BY item_id ASC
            """
        else:
            # Select newest-first so LIMIT keeps the most recent rows; the
            # result is flipped back to chronological order below.
            query = f"""
                SELECT content, metadata FROM {self.table_name}
                WHERE session_id = ?
                ORDER BY item_id DESC
                LIMIT ?
            """
            params.append(limit)

        async with self._lock, self._get_connection() as conn:
            async with conn.execute(query, params) as cursor:
                rows = list(await cursor.fetchall())

        if limit is not None:
            rows.reverse()

        items = []
        for content_json, metadata_json in rows:
            try:
                item = json.loads(content_json)
                if metadata_json:
                    item["_metadata"] = json.loads(metadata_json)
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse session item: {e}")
                continue
            items.append(item)

        return items

    async def add_items(self, items: list[dict]) -> None:
        """Append items to the session.

        The optional ``_metadata`` key of each item is stored in its own
        column; the rest of the item is stored as JSON content.

        Args:
            items: Items to add; an empty list is a no-op.
        """
        if not items:
            return

        insert_data = []
        for item in items:
            # Split the metadata off a copy so the caller's dict is untouched.
            item_copy = item.copy()
            metadata = item_copy.pop("_metadata", None)

            content_json = json.dumps(item_copy, ensure_ascii=False)
            metadata_json = json.dumps(metadata) if metadata else None
            insert_data.append((self.session_id, content_json, metadata_json))

        async with self._lock, self._get_connection() as conn:
            await conn.executemany(f"""
                INSERT INTO {self.table_name} (session_id, content, metadata)
                VALUES (?, ?, ?)
            """, insert_data)
            await conn.commit()

    async def pop_item(self) -> dict | None:
        """Remove and return the most recent item of this session.

        Returns:
            The removed item, or ``None`` if the session is empty or the
            removed row could not be decoded. An undecodable row is still
            deleted so that it cannot block every later ``pop_item`` call.
        """
        async with self._lock, self._get_connection() as conn:
            async with conn.execute(f"""
                SELECT item_id, content, metadata FROM {self.table_name}
                WHERE session_id = ?
                ORDER BY item_id DESC
                LIMIT 1
            """, [self.session_id]) as cursor:
                row = await cursor.fetchone()

            if not row:
                return None

            item_id, content_json, metadata_json = row

            await conn.execute(f"""
                DELETE FROM {self.table_name}
                WHERE item_id = ?
            """, [item_id])
            await conn.commit()

        try:
            item = json.loads(content_json)
            if metadata_json:
                item["_metadata"] = json.loads(metadata_json)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse popped session item: {e}")
            return None

        return item

    async def clear_session(self) -> None:
        """Delete every item that belongs to this session."""
        async with self._lock, self._get_connection() as conn:
            await conn.execute(f"""
                DELETE FROM {self.table_name}
                WHERE session_id = ?
            """, [self.session_id])
            await conn.commit()

    async def get_session_stats(self) -> dict[str, Any]:
        """Return the item count and first/last timestamps of this session.

        Returns:
            ``{"item_count": 0}`` for an empty session, otherwise a dict with
            ``item_count``, ``first_item_time`` and ``last_item_time``.
        """
        async with self._get_connection() as conn:
            async with conn.execute(f"""
                SELECT
                    COUNT(*) as item_count,
                    MIN(timestamp) as first_item_time,
                    MAX(timestamp) as last_item_time
                FROM {self.table_name}
                WHERE session_id = ?
            """, [self.session_id]) as cursor:
                row = await cursor.fetchone()

        if row and row[0] > 0:
            return {
                "item_count": row[0],
                "first_item_time": row[1],
                "last_item_time": row[2],
            }
        return {"item_count": 0}

8.1.4 Redis会话实现

# 位于 src/agents/extensions/memory/redis_session.py
import json
import redis.asyncio as redis
from typing import Any, Dict, List, Optional

class RedisSession(SessionABC):
    """Redis-backed session.

    Items are stored as JSON strings in a single Redis list whose key is
    ``key_prefix + session_id``. An optional TTL is refreshed whenever items
    are added.
    """

    def __init__(
        self,
        session_id: str,
        redis_client: redis.Redis | None = None,
        key_prefix: str = "agents:session:",
        ttl: int | None = 86400 * 7,  # expires after 7 days by default
    ):
        """Configure the session.

        Args:
            session_id: Identifier of the conversation.
            redis_client: Client to use; a client for
                ``redis://localhost:6379/0`` is created when omitted.
            key_prefix: Prefix prepended to ``session_id`` to form the key.
            ttl: Expiry in seconds applied on every ``add_items`` call; a
                falsy value disables expiry.
        """
        self.session_id = session_id
        self.redis_client = redis_client or self._create_default_client()
        self.key = f"{key_prefix}{session_id}"
        self.ttl = ttl

    @classmethod
    def from_url(
        cls,
        session_id: str,
        url: str = "redis://localhost:6379/0",
        **kwargs
    ) -> "RedisSession":
        """Build a session whose client is created from a Redis URL."""
        redis_client = redis.from_url(url, decode_responses=True)
        return cls(session_id, redis_client, **kwargs)

    def _create_default_client(self) -> redis.Redis:
        """Create the default client (local Redis, database 0)."""
        return redis.from_url("redis://localhost:6379/0", decode_responses=True)

    async def get_items(self, limit: int | None = None) -> list[dict]:
        """Return the session history in chronological order.

        Args:
            limit: When given, only the latest ``limit`` items are returned
                (still oldest-first); ``None`` returns the full history and a
                value ``<= 0`` returns an empty list.

        Returns:
            The decoded items. Entries that are not valid JSON are logged and
            skipped; a Redis error is logged and yields an empty list.
        """
        if limit is not None and limit <= 0:
            # LRANGE with a zero offset would address the whole list, so an
            # empty request is answered without touching Redis.
            return []

        # Negative start index counts from the tail: the last `limit` entries.
        start = 0 if limit is None else -limit

        try:
            items_json = await self.redis_client.lrange(self.key, start, -1)
        except redis.RedisError as e:
            logger.error(f"Redis error in get_items: {e}")
            return []

        items = []
        for item_json in items_json:
            try:
                items.append(json.loads(item_json))
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Redis session item: {e}")
        return items

    async def add_items(self, items: list[dict]) -> None:
        """Append items to the session and refresh the TTL.

        Args:
            items: Items to add; an empty list is a no-op.

        Raises:
            redis.RedisError: Re-raised after logging when the write fails.
        """
        if not items:
            return

        payloads = [json.dumps(item, ensure_ascii=False) for item in items]

        try:
            # Push and expire go through one pipeline so they are sent together.
            pipe = self.redis_client.pipeline()
            pipe.rpush(self.key, *payloads)

            if self.ttl:
                pipe.expire(self.key, self.ttl)

            await pipe.execute()

        except redis.RedisError as e:
            logger.error(f"Redis error in add_items: {e}")
            raise

    async def pop_item(self) -> dict | None:
        """Remove and return the most recent item.

        Returns:
            The removed item, or ``None`` when the list is empty, the entry
            is not valid JSON, or Redis reports an error (both are logged).
        """
        try:
            item_json = await self.redis_client.rpop(self.key)
        except redis.RedisError as e:
            logger.error(f"Redis error in pop_item: {e}")
            return None

        if not item_json:
            return None

        try:
            return json.loads(item_json)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse popped Redis item: {e}")
            return None

    async def clear_session(self) -> None:
        """Delete the session key.

        Raises:
            redis.RedisError: Re-raised after logging when the delete fails.
        """
        try:
            await self.redis_client.delete(self.key)
        except redis.RedisError as e:
            logger.error(f"Redis error in clear_session: {e}")
            raise

    async def get_session_info(self) -> dict[str, Any]:
        """Return the item count and remaining TTL of the session.

        Returns:
            A dict with ``item_count`` and ``ttl_remaining`` (``None`` when
            the reported TTL is not positive); ``{"item_count": 0}`` when
            Redis reports an error.
        """
        try:
            pipe = self.redis_client.pipeline()
            pipe.llen(self.key)
            pipe.ttl(self.key)

            item_count, ttl_remaining = await pipe.execute()

        except redis.RedisError as e:
            logger.error(f"Redis error in get_session_info: {e}")
            return {"item_count": 0}

        return {
            "item_count": item_count,
            "ttl_remaining": ttl_remaining if ttl_remaining > 0 else None,
        }

8.2 追踪监控模块 (Tracing)

8.2.1 追踪系统架构

graph TD
    A[Agent Execution] --> B[Trace Manager]
    B --> C[Span Creation]
    C --> D{Span Types}
    
    D --> E[AgentSpan 代理跨度]
    D --> F[GenerationSpan 生成跨度]
    D --> G[FunctionSpan 函数跨度]
    D --> H[GuardrailSpan 防护跨度]
    D --> I[HandoffSpan 切换跨度]
    
    B --> J[Tracing Processors]
    J --> K[内置处理器]
    J --> L[外部处理器]
    
    K --> M[ConsoleProcessor 控制台]
    K --> N[FileProcessor 文件]
    
    L --> O[LogfireProcessor]
    L --> P[AgentOpsProcessor]
    L --> Q[BraintrustProcessor]
    L --> R[Custom Processors]
    
    style B fill:#e3f2fd
    style J fill:#f3e5f5

8.2.2 追踪核心接口

# 位于 src/agents/tracing/__init__.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, TypeVar, Generic
from dataclasses import dataclass, field
from datetime import datetime
import uuid

TSpanData = TypeVar("TSpanData", bound="SpanData")

@dataclass
class SpanData:
    """追踪跨度的基础数据"""
    span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    start_time: datetime = field(default_factory=datetime.utcnow)
    end_time: datetime | None = None
    parent_span_id: str | None = None
    trace_id: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
    tags: Dict[str, str] = field(default_factory=dict)

@dataclass
class AgentSpanData(SpanData):
    """Span payload describing one agent."""

    agent_name: str = ""
    # Names listed as handoffs; a fresh list per instance.
    handoffs: list[str] = field(default_factory=list)
    output_type: str = ""
    # Names listed as tools; a fresh list per instance.
    tools: list[str] = field(default_factory=list)
    # Error text, or None when no error was recorded.
    error: str | None = None

@dataclass
class GenerationSpanData(SpanData):
    """Span payload describing one model generation."""

    model: str = ""
    provider: str = ""
    # Untyped input/output payloads of the generation.
    input: Any = None
    output: Any = None
    # Integer usage counters keyed by name, when available.
    usage: dict[str, int] | None = None
    # Settings used for the call; a fresh dict per instance.
    settings: dict[str, Any] = field(default_factory=dict)

@dataclass
class FunctionSpanData(SpanData):
    """Span payload describing one function call."""

    function_name: str = ""
    # Call arguments keyed by name; a fresh dict per instance.
    arguments: dict[str, Any] = field(default_factory=dict)
    result: Any = None
    # Defaults to 0.0; the unit is not fixed by this class.
    execution_time: float = 0.0

class Span(Generic[TSpanData]):
    """A tracing span: wraps a span-data payload and tracks its lifecycle."""

    def __init__(self, span_data: TSpanData):
        self.span_data = span_data
        self._parent: Span | None = None
        self._children: list[Span] = []
        # True only while this span has been installed as the current one.
        self._is_current = False

    def start(self, mark_as_current: bool = False) -> "Span[TSpanData]":
        """Stamp the start time, optionally become the current span, notify.

        Args:
            mark_as_current: When true, install this span as the current one.

        Returns:
            This span, so calls can be chained.
        """
        self.span_data.start_time = datetime.utcnow()

        if mark_as_current:
            _set_current_span(self)
            self._is_current = True

        _notify_span_start(self)  # let the processors know
        return self

    def finish(self, reset_current: bool = False) -> None:
        """Stamp the end time, optionally reset the current span, notify.

        Args:
            reset_current: When true and this span was marked as current,
                the current span is reset.
        """
        self.span_data.end_time = datetime.utcnow()

        should_reset = reset_current and self._is_current
        if should_reset:
            _reset_current_span()
            self._is_current = False

        _notify_span_end(self)  # let the processors know

    def add_child(self, child_span: "Span") -> None:
        """Attach ``child_span`` under this span and link its parent id."""
        child_span.span_data.parent_span_id = self.span_data.span_id
        child_span._parent = self
        self._children.append(child_span)

    def set_error(self, error: Exception | str) -> None:
        """Record an error in the span's metadata and tag the span.

        Args:
            error: An exception (stored as ``"TypeName: message"``) or any
                other value (stored as its string form).
        """
        if isinstance(error, Exception):
            message = f"{type(error).__name__}: {error!s}"
        else:
            message = str(error)

        data = self.span_data
        data.metadata["error"] = message
        data.tags["error"] = "true"

class TracingProcessor(ABC):
    """Abstract interface for consumers of tracing events."""

    @abstractmethod
    async def process_span_start(self, span: Span[Any]) -> None:
        """Handle the event emitted when ``span`` starts."""

    @abstractmethod
    async def process_span_end(self, span: Span[Any]) -> None:
        """Handle the event emitted when ``span`` ends."""

    @abstractmethod
    async def process_trace_complete(self, trace: "Trace") -> None:
        """Handle the event emitted when ``trace`` completes."""

class Trace:
    """追踪的顶级容器"""
    
    def __init__(
        self,
        workflow_name: str = "Agent Workflow",
        trace_id: str | None = None,
        group_id: str | None = None,
        metadata: Dict[str, Any] | None = None,
    ):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.workflow_name = workflow_name
        self.group_id = group_id
        self.metadata = metadata or {}
        self.start_time = datetime.utcnow()
        self.end_time: datetime | None = None
        self.root_spans: List[Span] = []
        self._is_current = False
    
    def start(self, mark_as_current: bool = False) -> "Trace":
        """开始追踪"""
        self.start_time = datetime.utcnow()
        
        if mark_as_current:
            _set_current_trace(self)
            self._is_current = True
        
        return self
    
    def finish(self, reset_current: bool = False) -> None:
        """完成追踪"""
        self.end_time = datetime.utcnow()
        
        if reset_current and self._is_current:
            _reset_current_trace()
            self._is_current = False
        
        # 通知处理器
        _notify_trace_complete(self)

# Global state management: module-level holders for the active trace, the
# active span, and the list of tracing processors.
_current_trace: Optional[Trace] = None
_current_span: Optional[Span] = None
_trace_processors: List[TracingProcessor] = []

def get_current_trace() -> Trace | None:
    """Return the value of the module-level current trace (may be ``None``)."""
    return _current_trace

def get_current_span() -> Span | None:
    """Return the value of the module-level current span (may be ``None``)."""
    return _current_span

def _set_current_trace(trace: Trace) -> None:
    """Store ``trace`` in the module-level current-trace holder."""
    global _current_trace
    _current_trace = trace

def _set_current_span(span: Span) -> None:
    """Store ``span`` in the module-level current-span holder."""
    global _current_span
    _current_span = span

8.2.3 内置追踪处理器

# 位于 src/agents/tracing/processors.py
import json
import logging
from pathlib import Path
from typing import Any, Dict, TextIO

class ConsoleTracingProcessor(TracingProcessor):
    """Tracing processor that reports events through a console logger."""

    def __init__(self, level: str = "INFO", include_data: bool = False):
        """Set up the ``agents.tracing.console`` logger.

        Args:
            level: Name of a ``logging`` level (case-insensitive).
            include_data: When true, span metadata is also logged (at DEBUG)
                when a span starts.
        """
        self.logger = logging.getLogger("agents.tracing.console")
        self.logger.setLevel(getattr(logging, level.upper()))
        self.include_data = include_data

        # The logger is shared by name, so a handler is attached only once.
        if self.logger.handlers:
            return

        console = logging.StreamHandler()
        console.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(console)

    async def process_span_start(self, span: Span[Any]) -> None:
        """Log the start of a span, plus its metadata when enabled."""
        data = span.span_data
        self.logger.info(f"[SPAN START] {data.name} (ID: {data.span_id})")

        if self.include_data and data.metadata:
            self.logger.debug(f"  Metadata: {data.metadata}")

    async def process_span_end(self, span: Span[Any]) -> None:
        """Log the end of a span with its duration and any recorded error."""
        data = span.span_data
        if data.end_time and data.start_time:
            duration = (data.end_time - data.start_time).total_seconds()
        else:
            # Either timestamp missing: report a zero duration.
            duration = 0.0

        self.logger.info(f"[SPAN END] {data.name} (Duration: {duration:.3f}s)")

        if "error" in data.metadata:
            self.logger.error(f"  Error: {data.metadata['error']}")

    async def process_trace_complete(self, trace: Trace) -> None:
        """Log the completion of a trace with its duration."""
        if trace.end_time and trace.start_time:
            duration = (trace.end_time - trace.start_time).total_seconds()
        else:
            duration = 0.0

        self.logger.info(f"[TRACE COMPLETE] {trace.workflow_name} (Duration: {duration:.3f}s)")

class FileTracingProcessor(TracingProcessor):
    """Tracing processor that appends events to a file as JSON Lines."""

    def __init__(
        self,
        file_path: str | Path,
        format: str = "jsonl",
        include_sensitive_data: bool = True,
    ):
        """Configure the processor and create the target directory.

        Args:
            file_path: File the events are appended to.
            format: Output format. Only ``"jsonl"`` is implemented; any other
                value makes the processor write nothing.
            include_sensitive_data: When false, the ``input``, ``output`` and
                ``arguments`` attributes of serialized objects are replaced
                by ``"<redacted>"``.
        """
        self.file_path = Path(file_path)
        self.format = format
        self.include_sensitive_data = include_sensitive_data

        # Make sure the parent directory exists.
        self.file_path.parent.mkdir(parents=True, exist_ok=True)

    async def process_span_start(self, span: Span[Any]) -> None:
        """Write a ``span_start`` event for the span's data."""
        await self._write_event("span_start", span.span_data)

    async def process_span_end(self, span: Span[Any]) -> None:
        """Write a ``span_end`` event for the span's data."""
        await self._write_event("span_end", span.span_data)

    async def process_trace_complete(self, trace: Trace) -> None:
        """Write a ``trace_complete`` event summarizing the trace."""
        trace_data = {
            "trace_id": trace.trace_id,
            "workflow_name": trace.workflow_name,
            "group_id": trace.group_id,
            "start_time": trace.start_time.isoformat(),
            "end_time": trace.end_time.isoformat() if trace.end_time else None,
            "metadata": trace.metadata,
        }

        await self._write_event("trace_complete", trace_data)

    async def _write_event(self, event_type: str, data: Any) -> None:
        """Append one event to the file.

        The event is serialized to a complete line before the file is opened,
        so a serialization failure can never leave a truncated JSON line
        behind. I/O errors are logged and swallowed.
        """
        if self.format != "jsonl":
            # No other format is implemented; skip without touching the file.
            return

        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "data": self._serialize_data(data),
        }
        line = json.dumps(event, ensure_ascii=False, default=str)

        try:
            with open(self.file_path, "a", encoding="utf-8") as f:
                f.write(line + "\n")
        except OSError as e:
            logger.error(f"Failed to write tracing event to file: {e}")

    def _serialize_data(self, data: Any) -> Any:
        """Recursively convert ``data`` into plain dicts, lists and leaves.

        Objects with a ``__dict__`` become dicts of their public attributes
        (names starting with ``_`` are skipped, sensitive attributes are
        redacted when configured); lists/tuples and dicts are converted
        element-wise; everything else is returned unchanged.
        """
        if hasattr(data, '__dict__'):
            result = {}
            for key, value in data.__dict__.items():
                if key.startswith('_'):
                    continue

                if not self.include_sensitive_data and key in ('input', 'output', 'arguments'):
                    result[key] = "<redacted>"
                else:
                    result[key] = self._serialize_data(value)

            return result

        if isinstance(data, (list, tuple)):
            return [self._serialize_data(item) for item in data]

        if isinstance(data, dict):
            return {k: self._serialize_data(v) for k, v in data.items()}

        return data

8.3 扩展功能模块

8.3.1 语音处理模块

# 位于 src/agents/voice/__init__.py
from typing import Any, AsyncIterator, Optional
from dataclasses import dataclass

@dataclass
class VoicePipelineConfig:
    """语音处理管道配置"""
    
    # 输入配置
    input_audio_format: str = "pcm16"           # 输入音频格式
    input_sample_rate: int = 16000              # 采样率
    
    # TTS配置
    tts_model: str = "tts-1"                   # TTS模型
    tts_voice: str = "alloy"                   # 语音角色
    
    # STT配置  
    stt_model: str = "whisper-1"               # STT模型
    stt_language: str | None = None            # 语音识别语言
    
    # 处理配置
    silence_threshold: float = 0.01            # 静音阈值
    silence_duration_ms: int = 500             # 静音持续时间

class VoicePipeline:
    """Voice pipeline: speech-to-text, agent run, text-to-speech.

    NOTE(review): ``_create_default_tts_model``, ``_create_default_stt_model``
    and ``_is_complete_utterance`` are called below but are not defined in the
    visible class — confirm they are provided elsewhere (e.g. by a subclass).
    """

    def __init__(
        self,
        agent: Agent,
        config: VoicePipelineConfig,
        tts_model: TTSModel | None = None,
        stt_model: STTModel | None = None,
    ):
        """Store the agent and config and resolve the TTS/STT models.

        Args:
            agent: Agent that answers the transcribed text.
            config: Pipeline configuration.
            tts_model: Text-to-speech model; a default is created when omitted.
            stt_model: Speech-to-text model; a default is created when omitted.
        """
        self.agent = agent
        self.config = config
        self.tts_model = tts_model or self._create_default_tts_model()
        self.stt_model = stt_model or self._create_default_stt_model()

    async def process_audio_stream(
        self,
        audio_stream: AsyncIterator[bytes],
        context: Any = None,
    ) -> AsyncIterator[VoiceStreamEvent]:
        """Process an audio stream end to end.

        Flow:
        1. audio stream -> STT -> text
        2. text -> agent -> response
        3. response -> TTS -> audio stream

        Final transcripts are buffered until ``_is_complete_utterance``
        accepts the buffer. Text still buffered when the stream ends is
        processed as well, so the tail of the user's speech is not dropped.

        Args:
            audio_stream: Incoming audio chunks.
            context: Passed through to the agent run.

        Yields:
            Transcription events, then the agent-response and audio events
            for every processed utterance.
        """
        text_buffer = ""
        async for audio_chunk in audio_stream:
            stt_result = await self.stt_model.transcribe_chunk(audio_chunk)

            if not stt_result.is_final:
                continue

            text_buffer += stt_result.text

            # Report the transcription.
            yield VoiceStreamEventTranscription(
                text=stt_result.text,
                is_final=True
            )

            if self._is_complete_utterance(text_buffer):
                async for agent_event in self._process_text_with_agent(
                    text_buffer, context
                ):
                    yield agent_event

                text_buffer = ""

        # The stream is over: answer whatever final text is still pending.
        if text_buffer.strip():
            async for agent_event in self._process_text_with_agent(
                text_buffer, context
            ):
                yield agent_event

    async def _process_text_with_agent(
        self,
        text: str,
        context: Any
    ) -> AsyncIterator[VoiceStreamEvent]:
        """Run the agent on ``text`` and stream the spoken response.

        Yields:
            One agent-response event, followed by one audio event per
            synthesized chunk.
        """
        result = await Runner.run(
            self.agent,
            input=text,
            context=context,
        )

        response_text = result.final_output

        yield VoiceStreamEventAgentResponse(
            text=response_text,
            agent_name=self.agent.name,
        )

        # VoicePipelineConfig as shown declares no ``tts_output_format``
        # field, so a plain attribute access would raise. Fall back to
        # "pcm16" (the config's input-format default) —
        # TODO confirm the TTS model's actual output format.
        audio_format = getattr(self.config, "tts_output_format", "pcm16")

        async for audio_chunk in self.tts_model.synthesize_streaming(response_text):
            yield VoiceStreamEventAudio(
                audio_data=audio_chunk,
                format=audio_format,
            )

8.3.2 实时交互模块

# 位于 src/agents/realtime/__init__.py
from typing import Any, AsyncIterator, Dict, List
from dataclasses import dataclass

@dataclass
class RealtimeConfig:
    """实时交互配置"""
    
    model: str = "gpt-4-realtime"              # 实时模型
    voice: str = "alloy"                       # 语音角色
    input_audio_format: str = "pcm16"          # 输入音频格式
    output_audio_format: str = "pcm16"         # 输出音频格式
    turn_detection: Dict[str, Any] | None = None  # 轮换检测配置
    
class RealtimeAgent:
    """Agent for realtime interaction."""

    def __init__(
        self,
        name: str,
        instructions: str,
        config: RealtimeConfig,
        tools: List[Tool] | None = None,
    ):
        """Store the agent definition.

        Args:
            name: Agent name.
            instructions: Instructions sent with the session configuration.
            config: Realtime settings.
            tools: Tools offered to the model; defaults to an empty list.
        """
        self.name = name
        self.instructions = instructions
        self.config = config
        self.tools = tools or []

    async def start_realtime_session(
        self,
        context: Any = None,
    ) -> AsyncIterator[RealtimeEvent]:
        """Open a realtime connection, send the config, and stream events.

        Args:
            context: Accepted for interface symmetry; not used here.

        Yields:
            Framework events converted from the raw connection events.

        Raises:
            NotImplementedError: If ``_create_realtime_connection`` has not
                been overridden.
        """
        async with self._create_realtime_connection() as connection:

            # Send the session configuration first.
            await connection.send_config({
                "model": self.config.model,
                "voice": self.config.voice,
                "instructions": self.instructions,
                "tools": [tool.to_openai_format() for tool in self.tools],
                "input_audio_format": self.config.input_audio_format,
                "output_audio_format": self.config.output_audio_format,
                "turn_detection": self.config.turn_detection,
            })

            # Then relay the realtime event stream.
            async for event in connection.receive_events():
                yield self._convert_realtime_event(event)

    def _create_realtime_connection(self):
        """Return an async context manager that yields the realtime connection.

        ``start_realtime_session`` uses the return value directly in an
        ``async with`` statement, so this must be a plain method returning an
        async context manager (a coroutine function would hand ``async with``
        a coroutine object and fail). The connection itself is not
        implemented here.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError(
            "RealtimeAgent._create_realtime_connection must be overridden to "
            "return an async context manager yielding the realtime connection"
        )

    def _convert_realtime_event(self, raw_event: Dict[str, Any]) -> RealtimeEvent:
        """Convert a raw event dict into the matching framework event.

        Unrecognized event types are wrapped in ``RealtimeRawEvent``.
        """
        event_type = raw_event.get("type")

        if event_type == "response.audio.delta":
            return RealtimeAudioEvent(
                audio_data=raw_event["delta"],
                format=self.config.output_audio_format,
            )

        if event_type == "response.text.delta":
            return RealtimeTextEvent(
                text_delta=raw_event["delta"],
            )

        if event_type == "response.function_call":
            return RealtimeFunctionCallEvent(
                function_name=raw_event["name"],
                arguments=raw_event["arguments"],
                call_id=raw_event["call_id"],
            )

        return RealtimeRawEvent(raw_event)

8.4 最佳实践和性能优化

8.4.1 性能优化技巧

# 1. 工具并行执行
async def execute_tools_parallel(
    tools_and_args: "list[tuple[Tool, str]]",
    context: Any = None,
) -> list[Any]:
    """Run several tools concurrently and collect their results in order.

    Args:
        tools_and_args: ``(tool, args)`` pairs; each tool is invoked as
            ``tool.execute(context, args)``.
        context: Value passed as the first argument of every ``execute``
            call. It is an explicit parameter because the function has no
            other source for it.

    Returns:
        One entry per input pair, in input order: the tool's result, or the
        string ``"Tool execution failed: <error>"`` when the tool raised.
    """
    tasks = [tool.execute(context, args) for tool, args in tools_and_args]

    # return_exceptions=True keeps one failing tool from cancelling the rest.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return [
        f"Tool execution failed: {result}" if isinstance(result, Exception) else result
        for result in results
    ]

# 2. 会话数据压缩
class CompressedSession(SessionABC):
    """Session wrapper that stores large string fields zlib-compressed.

    On write, every string field longer than 1000 characters whose compressed
    form is shorter is moved out of the item into a ``"_compressed"`` mapping
    (base64 text of the zlib data). On read, the mapping is expanded back
    into the original fields, so callers always see the items they added.
    All storage is delegated to ``base_session``.
    """

    # Item key that holds the mapping of compressed fields.
    _COMPRESSED_KEY = "_compressed"

    def __init__(self, base_session: SessionABC, compression_level: int = 6):
        """Wrap ``base_session``.

        Args:
            base_session: Session that actually stores the items.
            compression_level: zlib compression level.
        """
        self.base_session = base_session
        self.compression_level = compression_level

    async def get_items(self, limit: int | None = None) -> list[dict]:
        """Return the stored items with compressed fields restored."""
        stored = await self.base_session.get_items(limit)
        return [self._restore_item(item) for item in stored]

    async def add_items(self, items: list[dict]) -> None:
        """Store ``items``, compressing their large string fields."""
        await self.base_session.add_items([self._pack_item(item) for item in items])

    async def pop_item(self) -> dict | None:
        """Remove and return the most recent item with its fields restored."""
        stored = await self.base_session.pop_item()
        if stored is None:
            return None
        return self._restore_item(stored)

    async def clear_session(self) -> None:
        """Clear the wrapped session."""
        await self.base_session.clear_session()

    def _compress_large_fields(self, item: dict) -> dict:
        """Return ``{field: base64(zlib(value))}`` for the large string fields.

        A field is included only when the encoded form is shorter than the
        original text, so compression never increases the stored size.
        """
        import base64
        import zlib

        compressed_fields = {}

        for key, value in item.items():
            if isinstance(value, str) and len(value) > 1000:
                packed = zlib.compress(value.encode('utf-8'), self.compression_level)
                encoded = base64.b64encode(packed).decode('ascii')
                if len(encoded) < len(value):
                    compressed_fields[key] = encoded

        return compressed_fields

    def _pack_item(self, item: dict) -> dict:
        """Return a copy of ``item`` with large fields replaced by compressed data."""
        compressed = self._compress_large_fields(item)
        if not compressed:
            return dict(item)

        # Drop the plain text of every compressed field; keeping both copies
        # would make the stored item larger, not smaller.
        packed = {key: value for key, value in item.items() if key not in compressed}
        packed[self._COMPRESSED_KEY] = compressed
        return packed

    def _restore_item(self, item: dict) -> dict:
        """Return a copy of ``item`` with the compressed fields expanded."""
        import base64
        import zlib

        compressed = item.get(self._COMPRESSED_KEY)
        if not isinstance(compressed, dict):
            # Not produced by this wrapper; hand the item back unchanged.
            return dict(item)

        restored = {key: value for key, value in item.items() if key != self._COMPRESSED_KEY}
        for key, encoded in compressed.items():
            restored[key] = zlib.decompress(base64.b64decode(encoded)).decode('utf-8')
        return restored

# 3. 智能缓存
class CachedAgent(Agent):
    """Agent with a bounded least-recently-used response cache."""

    def __init__(self, *args, cache_size: int = 100, **kwargs):
        """Initialize the agent and an empty cache.

        Args:
            *args: Forwarded to ``Agent.__init__``.
            cache_size: Maximum number of cached responses; a value ``<= 0``
                disables caching.
            **kwargs: Forwarded to ``Agent.__init__``.
        """
        super().__init__(*args, **kwargs)
        # Insertion-ordered dict: the first key is the least recently used.
        self._cache = {}
        self._cache_size = cache_size
        self._cache_hits = 0
        self._cache_misses = 0

    async def get_cached_response(
        self,
        input_hash: str,
        context: Any
    ) -> Any | None:
        """Return the cached response for ``input_hash``, or ``None`` on a miss.

        A hit marks the entry as most recently used. Hit and miss counters
        are updated accordingly.

        Args:
            input_hash: Cache key.
            context: Accepted for interface compatibility; not used.
        """
        try:
            response = self._cache.pop(input_hash)
        except KeyError:
            self._cache_misses += 1
            return None

        # Re-insert at the end so the entry becomes the most recently used.
        self._cache[input_hash] = response
        self._cache_hits += 1
        return response

    def cache_response(self, input_hash: str, response: Any) -> None:
        """Store ``response`` under ``input_hash``, evicting LRU entries if full.

        Args:
            input_hash: Cache key.
            response: Value to cache.
        """
        if self._cache_size <= 0:
            # Nothing can be stored; also avoids evicting from an empty dict.
            return

        # Replacing an existing key must not evict an unrelated entry.
        self._cache.pop(input_hash, None)

        while len(self._cache) >= self._cache_size:
            oldest_key = next(iter(self._cache))
            del self._cache[oldest_key]

        self._cache[input_hash] = response

这个综合模块分析涵盖了 OpenAI Agents SDK 的重要支持模块，包括会话管理、追踪监控、语音处理、实时交互等功能，为构建完整的 AI 代理应用提供了全面的基础设施支持。