10.1 架构设计最佳实践

10.1.1 代理设计原则

单一职责原则 每个代理应该专注于一个特定的领域或任务,避免创建"万能代理"。

# ❌ 不好的做法:万能代理
mega_agent = Agent(
    name="Super Agent",
    instructions="You can do everything: answer questions, write code, analyze data, send emails, book flights...",
    tools=[email_tool, code_tool, data_tool, booking_tool, weather_tool, ...]
)

# ✅ 好的做法:专门化代理
customer_service_agent = Agent(
    name="Customer Service",
    instructions="You are a helpful customer service representative. Focus on resolving customer issues and providing support.",
    tools=[ticket_system_tool, knowledge_base_tool, escalation_tool]
)

technical_support_agent = Agent(
    name="Technical Support", 
    instructions="You are a technical support specialist. Help with technical issues and troubleshooting.",
    tools=[system_diagnostics_tool, log_analysis_tool, deployment_tool]
)

triage_agent = Agent(
    name="Support Triage",
    instructions="Route customer requests to the appropriate specialized agent based on the issue type.",
    handoffs=[customer_service_agent, technical_support_agent]
)

层次化代理设计 使用分层的代理结构,从通用到专门,实现清晰的责任分工。

# 实战案例:电商客服系统
class ECommerceCustomerService:
    def __init__(self):
        # 专门代理
        self.order_agent = Agent(
            name="Order Specialist",
            instructions="Handle order-related inquiries: status, modifications, cancellations.",
            tools=[order_lookup_tool, order_modify_tool, shipping_tracker_tool]
        )
        
        self.product_agent = Agent(
            name="Product Specialist", 
            instructions="Answer product questions: features, compatibility, recommendations.",
            tools=[product_catalog_tool, inventory_tool, recommendation_engine_tool]
        )
        
        self.billing_agent = Agent(
            name="Billing Specialist",
            instructions="Handle billing and payment issues: refunds, payment failures, invoices.",
            tools=[payment_processor_tool, refund_tool, invoice_generator_tool]
        )
        
        # 主路由代理
        self.main_agent = Agent(
            name="Customer Service Router",
            instructions="""
            You are the main customer service agent. Analyze customer requests and:
            1. For order-related issues, handoff to Order Specialist
            2. For product questions, handoff to Product Specialist  
            3. For billing/payment issues, handoff to Billing Specialist
            4. For simple greetings or general info, handle directly
            """,
            handoffs=[self.order_agent, self.product_agent, self.billing_agent],
            tools=[general_info_tool, store_hours_tool]
        )

10.1.2 工具设计最佳实践

幂等性设计 工具应该设计为幂等的,多次调用相同参数产生相同结果。

@function_tool
def get_user_info(user_id: str) -> UserInfo:
    """
    获取用户信息 - 幂等操作
    
    多次调用返回相同结果,不会产生副作用
    """
    return database.get_user(user_id)

@function_tool
def create_user_safely(
    email: str, 
    name: str,
    phone: str | None = None
) -> UserInfo:
    """
    安全创建用户 - 处理重复调用
    
    如果用户已存在,返回现有用户而不是报错
    """
    existing_user = database.find_user_by_email(email)
    if existing_user:
        logger.info(f"User {email} already exists, returning existing user")
        return existing_user
    
    return database.create_user(email=email, name=name, phone=phone)

@function_tool
def update_user_email(
    user_id: str, 
    new_email: str,
    confirmation_required: bool = True
) -> UpdateResult:
    """
    更新用户邮箱 - 带确认机制
    
    包含状态检查,避免无效更新
    """
    user = database.get_user(user_id)
    if user.email == new_email:
        return UpdateResult(success=True, message="Email unchanged", user=user)
    
    if confirmation_required and not user.email_verified:
        return UpdateResult(
            success=False, 
            message="Please verify current email before changing",
            requires_verification=True
        )
    
    updated_user = database.update_user_email(user_id, new_email)
    return UpdateResult(success=True, message="Email updated", user=updated_user)

错误处理和优雅降级

@function_tool
def robust_weather_lookup(
    city: str,
    country_code: str = "US"
) -> WeatherInfo:
    """
    健壮的天气查询工具 - 多重错误处理
    """
    
    try:
        # 主要API服务
        result = primary_weather_api.get_weather(city, country_code)
        return WeatherInfo(
            temperature=result.temperature,
            conditions=result.conditions,
            source="primary_api",
            reliability="high"
        )
        
    except APIRateLimitError:
        # 备用API服务
        try:
            result = backup_weather_api.get_weather(city, country_code)
            return WeatherInfo(
                temperature=result.temp,
                conditions=result.weather,
                source="backup_api", 
                reliability="medium"
            )
        except Exception as backup_error:
            logger.warning(f"Backup weather API failed: {backup_error}")
            
    except APIKeyError:
        return WeatherInfo(
            error="Weather service temporarily unavailable due to authentication issue",
            source="error",
            reliability="none"
        )
        
    except Exception as e:
        logger.error(f"Weather lookup failed for {city}: {e}")
        
        # 使用缓存的数据作为最后手段
        cached_data = weather_cache.get(f"{city}_{country_code}")
        if cached_data and not cached_data.is_expired():
            return WeatherInfo(
                temperature=cached_data.temperature,
                conditions=cached_data.conditions,
                source="cache",
                reliability="low",
                note="Data from cache due to service unavailability"
            )
        
        # 最终降级:返回通用信息
        return WeatherInfo(
            error=f"Unable to get weather for {city}. Please try again later.",
            source="error",
            reliability="none"
        )

10.2 性能优化实战

10.2.1 批处理和并行执行

class OptimizedAgentWorkflow:
    """优化的代理工作流示例"""
    
    @function_tool
    async def batch_user_lookup(
        user_ids: List[str]
    ) -> List[UserInfo]:
        """
        批量用户查询 - 减少数据库往返
        """
        # 批量查询而不是循环单次查询
        users = await database.get_users_batch(user_ids)
        return users
    
    @function_tool  
    async def parallel_data_enrichment(
        user_id: str
    ) -> EnrichedUserData:
        """
        并行数据增强 - 同时调用多个API
        """
        
        # 并行执行多个独立的API调用
        user_task = database.get_user(user_id)
        orders_task = order_service.get_user_orders(user_id)
        preferences_task = preferences_service.get_user_preferences(user_id)
        activity_task = activity_service.get_recent_activity(user_id)
        
        user, orders, preferences, activity = await asyncio.gather(
            user_task, orders_task, preferences_task, activity_task,
            return_exceptions=True
        )
        
        # 处理部分失败的情况
        enriched_data = EnrichedUserData(user=user)
        
        if not isinstance(orders, Exception):
            enriched_data.orders = orders
        if not isinstance(preferences, Exception):
            enriched_data.preferences = preferences  
        if not isinstance(activity, Exception):
            enriched_data.recent_activity = activity
        
        return enriched_data

10.2.2 智能缓存策略

class CacheOptimizedTools:
    """带缓存优化的工具集"""
    
    def __init__(self):
        # 多层缓存:内存 + Redis
        self.memory_cache = {}
        self.redis_cache = redis.Redis()
        
    @function_tool
    async def cached_api_call(
        self,
        endpoint: str,
        params: Dict[str, Any],
        ttl: int = 300  # 5分钟缓存
    ) -> ApiResponse:
        """
        带缓存的API调用
        """
        
        # 生成缓存键
        cache_key = self._generate_cache_key(endpoint, params)
        
        # L1: 内存缓存查找
        if cache_key in self.memory_cache:
            cached_data, timestamp = self.memory_cache[cache_key]
            if time.time() - timestamp < ttl:
                return cached_data
            else:
                del self.memory_cache[cache_key]
        
        # L2: Redis缓存查找
        cached_json = await self.redis_cache.get(cache_key)
        if cached_json:
            try:
                cached_data = ApiResponse.from_json(cached_json)
                # 同时更新内存缓存
                self.memory_cache[cache_key] = (cached_data, time.time())
                return cached_data
            except Exception:
                await self.redis_cache.delete(cache_key)
        
        # 缓存未命中,执行实际API调用
        response = await self._make_api_call(endpoint, params)
        
        # 更新两级缓存
        self.memory_cache[cache_key] = (response, time.time())
        await self.redis_cache.setex(
            cache_key, 
            ttl, 
            response.to_json()
        )
        
        return response
    
    def _generate_cache_key(self, endpoint: str, params: Dict[str, Any]) -> str:
        """生成一致的缓存键"""
        sorted_params = sorted(params.items())
        params_str = "&".join(f"{k}={v}" for k, v in sorted_params)
        return f"api_cache:{endpoint}:{hashlib.md5(params_str.encode()).hexdigest()}"

10.2.3 连接池和资源管理

class ResourceOptimizedAgent:
    """资源优化的代理实现"""
    
    def __init__(self):
        # 数据库连接池
        self.db_pool = asyncpg.create_pool(
            database_url,
            min_size=5,
            max_size=20,
            command_timeout=60
        )
        
        # HTTP会话复用
        self.http_session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(limit=100, limit_per_host=10)
        )
    
    @function_tool
    async def efficient_database_query(
        self,
        query: str,
        params: List[Any] | None = None
    ) -> List[Dict[str, Any]]:
        """
        高效的数据库查询 - 使用连接池
        """
        
        async with self.db_pool.acquire() as connection:
            async with connection.transaction():
                result = await connection.fetch(query, *(params or []))
                return [dict(row) for row in result]
    
    @function_tool
    async def efficient_http_request(
        self,
        url: str,
        method: str = "GET",
        headers: Dict[str, str] | None = None,
        data: Any = None
    ) -> HttpResponse:
        """
        高效的HTTP请求 - 复用连接
        """
        
        async with self.http_session.request(
            method, url, headers=headers, json=data
        ) as response:
            content = await response.text()
            return HttpResponse(
                status_code=response.status,
                headers=dict(response.headers),
                content=content
            )
    
    async def __aenter__(self):
        """异步上下文管理器入口"""
        await self.db_pool
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步上下文管理器出口 - 清理资源"""
        await self.db_pool.close()
        await self.http_session.close()

10.3 安全性最佳实践

10.3.1 输入验证和清洗

@input_guardrail
async def comprehensive_input_validation(
    context: RunContextWrapper,
    agent: Agent,
    input_data: str | List[TResponseInputItem]
) -> GuardrailFunctionOutput:
    """
    综合输入验证防护
    """
    
    # 将输入转换为文本进行分析
    if isinstance(input_data, list):
        text = "\n".join(
            item.get("content", "") if isinstance(item, dict) else str(item)
            for item in input_data
        )
    else:
        text = input_data
    
    # 1. SQL注入检测
    sql_patterns = [
        r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b)",
        r"(UNION\s+SELECT)",
        r"(OR\s+1\s*=\s*1)",
        r"(;\s*(DROP|DELETE))",
    ]
    
    for pattern in sql_patterns:
        if re.search(pattern, text, re.IGNORECASE):
            return GuardrailFunctionOutput(
                tripwire_triggered=True,
                message="Potential SQL injection attempt detected",
                severity="high",
                metadata={"pattern_matched": pattern, "input_sample": text[:100]}
            )
    
    # 2. XSS检测
    xss_patterns = [
        r"<script[^>]*>.*?</script>",
        r"javascript:",
        r"on\w+\s*=",
        r"<iframe[^>]*>",
    ]
    
    for pattern in xss_patterns:
        if re.search(pattern, text, re.IGNORECASE):
            return GuardrailFunctionOutput(
                tripwire_triggered=True,
                message="Potential XSS attempt detected",
                severity="high",
                metadata={"pattern_matched": pattern}
            )
    
    # 3. 敏感信息检测
    sensitive_patterns = {
        "credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
        "phone": r"\b\d{3}-\d{3}-\d{4}\b",
    }
    
    detected_sensitive = []
    for info_type, pattern in sensitive_patterns.items():
        matches = re.findall(pattern, text)
        if matches:
            detected_sensitive.append({
                "type": info_type,
                "count": len(matches)
            })
    
    if detected_sensitive:
        return GuardrailFunctionOutput(
            tripwire_triggered=False,  # 警告但不阻止
            message="Sensitive information detected in input",
            severity="medium",
            metadata={"sensitive_info": detected_sensitive}
        )
    
    # 4. 内容长度检查
    if len(text) > 50000:  # 50KB限制
        return GuardrailFunctionOutput(
            tripwire_triggered=True,
            message="Input exceeds maximum allowed length",
            severity="medium",
            metadata={"input_length": len(text), "max_length": 50000}
        )
    
    return GuardrailFunctionOutput(
        tripwire_triggered=False,
        message="Input validation passed"
    )

10.3.2 权限控制和访问管理

class SecureAgentSystem:
    """安全的代理系统实现"""
    
    def __init__(self):
        self.permission_manager = PermissionManager()
        self.audit_logger = AuditLogger()
    
    @function_tool
    async def secure_database_operation(
        self,
        context: ToolContext,
        operation: str,
        table: str,
        data: Dict[str, Any] | None = None,
        user_id: str | None = None
    ) -> OperationResult:
        """
        安全的数据库操作工具
        """
        
        # 1. 用户认证
        if not user_id:
            user_id = context.context.get("current_user_id")
            
        if not user_id:
            await self.audit_logger.log_security_event(
                event_type="unauthorized_access_attempt",
                details={"operation": operation, "table": table},
                severity="high"
            )
            return OperationResult(
                success=False,
                error="User authentication required",
                error_code="AUTH_REQUIRED"
            )
        
        # 2. 权限检查
        required_permission = f"database.{table}.{operation}"
        if not await self.permission_manager.check_permission(user_id, required_permission):
            await self.audit_logger.log_security_event(
                event_type="permission_denied",
                user_id=user_id,
                details={
                    "operation": operation,
                    "table": table,
                    "required_permission": required_permission
                },
                severity="medium"
            )
            return OperationResult(
                success=False,
                error=f"Insufficient permissions for {operation} on {table}",
                error_code="PERMISSION_DENIED"
            )
        
        # 3. 数据验证和清洗
        if data:
            validation_result = await self._validate_data(table, data)
            if not validation_result.valid:
                return OperationResult(
                    success=False,
                    error=f"Data validation failed: {validation_result.error}",
                    error_code="VALIDATION_FAILED"
                )
        
        # 4. 执行操作(在事务中)
        try:
            async with database.transaction():
                result = await self._execute_database_operation(
                    operation, table, data
                )
                
                # 5. 审计日志
                await self.audit_logger.log_database_operation(
                    user_id=user_id,
                    operation=operation,
                    table=table,
                    data_hash=hashlib.sha256(
                        json.dumps(data, sort_keys=True).encode()
                    ).hexdigest() if data else None,
                    success=True
                )
                
                return OperationResult(
                    success=True,
                    data=result,
                    operation_id=str(uuid.uuid4())
                )
                
        except Exception as e:
            await self.audit_logger.log_database_operation(
                user_id=user_id,
                operation=operation,
                table=table,
                error=str(e),
                success=False
            )
            
            return OperationResult(
                success=False,
                error=f"Database operation failed: {str(e)}",
                error_code="OPERATION_FAILED"
            )

class PermissionManager:
    """权限管理器"""
    
    async def check_permission(self, user_id: str, permission: str) -> bool:
        """检查用户权限"""
        user_permissions = await self._get_user_permissions(user_id)
        return permission in user_permissions or self._check_wildcard_permissions(
            user_permissions, permission
        )
    
    def _check_wildcard_permissions(
        self, 
        user_permissions: Set[str], 
        required_permission: str
    ) -> bool:
        """检查通配符权限"""
        permission_parts = required_permission.split(".")
        
        for i in range(len(permission_parts)):
            wildcard_permission = ".".join(permission_parts[:i + 1]) + ".*"
            if wildcard_permission in user_permissions:
                return True
                
        return "admin.*" in user_permissions

class AuditLogger:
    """审计日志记录器"""
    
    async def log_security_event(
        self,
        event_type: str,
        user_id: str | None = None,
        details: Dict[str, Any] | None = None,
        severity: str = "medium"
    ) -> None:
        """记录安全事件"""
        
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "details": details or {},
            "severity": severity,
            "source": "agents_security_system"
        }
        
        # 写入安全日志
        await self._write_to_security_log(event)
        
        # 高危事件实时告警
        if severity == "high":
            await self._send_security_alert(event)

10.4 监控和可观测性实战

10.4.1 自定义追踪处理器

class ProductionTracingProcessor(TracingProcessor):
    """生产环境追踪处理器"""
    
    def __init__(self, config: TracingConfig):
        self.config = config
        self.metrics_collector = MetricsCollector()
        self.alert_manager = AlertManager()
        
    async def process_span_start(self, span: Span[Any]) -> None:
        """处理跨度开始 - 收集指标"""
        
        # 记录开始时间指标
        self.metrics_collector.record_span_start(
            span_type=type(span.span_data).__name__,
            agent_name=getattr(span.span_data, "agent_name", "unknown"),
            trace_id=span.span_data.trace_id
        )
    
    async def process_span_end(self, span: Span[Any]) -> None:
        """处理跨度结束 - 性能分析和告警"""
        
        duration = self._calculate_duration(span)
        
        # 记录持续时间指标
        self.metrics_collector.record_span_duration(
            span_type=type(span.span_data).__name__,
            duration=duration,
            success="error" not in span.span_data.tags
        )
        
        # 检查性能阈值
        if duration > self.config.slow_span_threshold:
            await self._handle_slow_span(span, duration)
        
        # 检查错误
        if "error" in span.span_data.tags:
            await self._handle_error_span(span)
    
    async def _handle_slow_span(self, span: Span[Any], duration: float) -> None:
        """处理慢查询跨度"""
        
        alert = {
            "type": "performance_degradation",
            "severity": "warning",
            "message": f"Slow span detected: {span.span_data.name}",
            "duration": duration,
            "threshold": self.config.slow_span_threshold,
            "trace_id": span.span_data.trace_id,
            "span_id": span.span_data.span_id,
            "metadata": span.span_data.metadata
        }
        
        await self.alert_manager.send_alert(alert)
    
    async def _handle_error_span(self, span: Span[Any]) -> None:
        """处理错误跨度"""
        
        error_rate = await self.metrics_collector.get_recent_error_rate(
            span_type=type(span.span_data).__name__,
            time_window=300  # 5分钟窗口
        )
        
        if error_rate > self.config.error_rate_threshold:
            alert = {
                "type": "high_error_rate",
                "severity": "critical",
                "message": f"High error rate detected: {error_rate:.2%}",
                "span_type": type(span.span_data).__name__,
                "error_rate": error_rate,
                "threshold": self.config.error_rate_threshold,
                "recent_errors": await self._get_recent_error_samples(span)
            }
            
            await self.alert_manager.send_alert(alert)

class MetricsCollector:
    """指标收集器"""
    
    def __init__(self):
        # 使用时序数据库存储指标
        self.timeseries_db = InfluxDBClient()
        
    async def record_span_duration(
        self,
        span_type: str,
        duration: float,
        success: bool,
        tags: Dict[str, str] | None = None
    ) -> None:
        """记录跨度持续时间"""
        
        measurement = {
            "measurement": "agent_span_duration",
            "tags": {
                "span_type": span_type,
                "success": str(success),
                **(tags or {})
            },
            "fields": {
                "duration_seconds": duration
            },
            "time": datetime.utcnow()
        }
        
        await self.timeseries_db.write_point(measurement)
    
    async def get_recent_error_rate(
        self,
        span_type: str,
        time_window: int = 300
    ) -> float:
        """获取最近的错误率"""
        
        query = f"""
        SELECT 
            COUNT(*) as total,
            COUNT(CASE WHEN success = 'false' THEN 1 END) as errors
        FROM agent_span_duration 
        WHERE span_type = '{span_type}'
        AND time >= now() - {time_window}s
        """
        
        result = await self.timeseries_db.query(query)
        
        if result and len(result) > 0:
            total = result[0].get("total", 0)
            errors = result[0].get("errors", 0)
            return errors / total if total > 0 else 0.0
        
        return 0.0

10.4.2 健康检查和状态监控

class AgentHealthMonitor:
    """代理健康监控器"""
    
    def __init__(self, agents: List[Agent]):
        self.agents = agents
        self.health_checks = {}
        self.last_check_time = {}
        
    async def register_health_check(
        self,
        agent_name: str,
        check_function: Callable[[], Awaitable[HealthStatus]]
    ) -> None:
        """注册健康检查函数"""
        self.health_checks[agent_name] = check_function
    
    async def check_all_agents_health(self) -> Dict[str, HealthStatus]:
        """检查所有代理的健康状态"""
        
        health_results = {}
        
        for agent in self.agents:
            try:
                health_status = await self._check_agent_health(agent)
                health_results[agent.name] = health_status
                
            except Exception as e:
                health_results[agent.name] = HealthStatus(
                    status="unhealthy",
                    message=f"Health check failed: {str(e)}",
                    last_check=datetime.utcnow(),
                    checks={}
                )
        
        return health_results
    
    async def _check_agent_health(self, agent: Agent) -> HealthStatus:
        """检查单个代理的健康状态"""
        
        checks = {}
        overall_status = "healthy"
        
        # 1. 基础连接检查
        model_check = await self._check_model_connectivity(agent)
        checks["model_connectivity"] = model_check
        if model_check["status"] != "ok":
            overall_status = "degraded"
        
        # 2. 工具可用性检查
        tools_check = await self._check_tools_availability(agent)
        checks["tools_availability"] = tools_check
        if tools_check["status"] != "ok":
            overall_status = "degraded"
        
        # 3. 自定义健康检查
        if agent.name in self.health_checks:
            custom_check = await self.health_checks[agent.name]()
            checks["custom"] = {
                "status": "ok" if custom_check.is_healthy else "error",
                "message": custom_check.message,
                "details": custom_check.details
            }
            if not custom_check.is_healthy:
                overall_status = "unhealthy"
        
        # 4. 性能指标检查
        performance_check = await self._check_performance_metrics(agent)
        checks["performance"] = performance_check
        if performance_check["status"] == "warning":
            overall_status = max(overall_status, "degraded")
        
        return HealthStatus(
            status=overall_status,
            message=self._generate_health_summary(checks),
            last_check=datetime.utcnow(),
            checks=checks
        )
    
    async def _check_model_connectivity(self, agent: Agent) -> Dict[str, Any]:
        """检查模型连接性"""
        
        try:
            # 发送简单的测试请求
            test_result = await Runner.run(
                agent.clone(
                    instructions="Reply with exactly 'OK' and nothing else.",
                    tools=[],  # 移除工具以简化测试
                    handoffs=[],
                    input_guardrails=[],
                    output_guardrails=[]
                ),
                input="Health check",
                max_turns=1
            )
            
            response_time = test_result.context_wrapper.usage.total_tokens / 1000  # 估算
            
            if "OK" in str(test_result.final_output):
                return {
                    "status": "ok",
                    "response_time": response_time,
                    "message": "Model responding normally"
                }
            else:
                return {
                    "status": "warning", 
                    "response_time": response_time,
                    "message": "Model response unexpected",
                    "actual_response": str(test_result.final_output)[:100]
                }
                
        except Exception as e:
            return {
                "status": "error",
                "message": f"Model connectivity failed: {str(e)}"
            }
    
    async def _check_tools_availability(self, agent: Agent) -> Dict[str, Any]:
        """检查工具可用性"""
        
        unavailable_tools = []
        
        for tool in agent.tools:
            try:
                # 尝试基本的工具调用(如果工具支持健康检查)
                if hasattr(tool, 'health_check'):
                    is_healthy = await tool.health_check()
                    if not is_healthy:
                        unavailable_tools.append(tool.name)
                        
            except Exception as e:
                unavailable_tools.append(f"{tool.name} ({str(e)})")
        
        if unavailable_tools:
            return {
                "status": "error" if len(unavailable_tools) == len(agent.tools) else "warning",
                "message": f"Some tools unavailable: {', '.join(unavailable_tools)}",
                "unavailable_tools": unavailable_tools,
                "total_tools": len(agent.tools)
            }
        else:
            return {
                "status": "ok",
                "message": "All tools available",
                "total_tools": len(agent.tools)
            }

@dataclass
class HealthStatus:
    """健康状态数据类"""
    status: Literal["healthy", "degraded", "unhealthy"]
    message: str
    last_check: datetime
    checks: Dict[str, Any]
    
    @property
    def is_healthy(self) -> bool:
        return self.status == "healthy"

这些实战经验和最佳实践涵盖了OpenAI Agents SDK在生产环境中的关键考虑因素,包括架构设计、性能优化、安全性、监控等各个方面,为开发者提供了实用的指导。