概述

本文档汇总了AutoGPT平台在开发、部署、运维过程中积累的最佳实践和实战经验。通过这些经过验证的方法和策略,可以帮助开发者和运维人员更好地理解和使用AutoGPT平台,避免常见的陷阱,提高开发效率和系统稳定性。

1. 开发最佳实践

1.1 代码规范与质量

1.1.1 Python代码规范

基础规范

# 遵循PEP 8规范
# 使用类型注解提高代码可读性和IDE支持

from typing import Dict, List, Optional, Union
from datetime import datetime
from pydantic import BaseModel, Field

class UserProfile(BaseModel):
    """
    User profile model.

    Naming conventions demonstrated:
    1. Class names use PascalCase
    2. Functions and variables use snake_case
    3. Constants use UPPER_CASE
    4. Private members use a leading underscore
    """
    
    id: str = Field(..., description="用户唯一标识符")
    email: str = Field(..., description="用户邮箱地址")
    created_at: datetime = Field(..., description="创建时间")
    preferences: Dict[str, Union[str, int, bool]] = Field(
        default_factory=dict, 
        description="用户偏好设置"
    )
    
    def get_display_name(self) -> str:
        """
        Return the user's display name.

        Returns:
            str: the local part of the email address (text before '@')
        """
        return self.email.split('@')[0]
    
    def _validate_preferences(self) -> bool:
        """Private method: validate the preferences dict."""
        # Validation logic goes here (stubbed in this example)
        return True

# Module-level constants (UPPER_CASE per convention)
DEFAULT_PAGE_SIZE = 20  # default number of items per page in list endpoints
MAX_RETRY_ATTEMPTS = 3  # upper bound on retries for transient failures
CACHE_EXPIRY_SECONDS = 3600  # default cache TTL (1 hour)

错误处理规范

import logging
from typing import Optional
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)

class AutoGPTError(Exception):
    """Base exception for all AutoGPT platform errors.

    Carries a human-readable message plus an optional machine-readable
    error code that callers can branch on.
    """

    def __init__(self, message: str, error_code: Optional[str] = None):
        super().__init__(message)
        self.message = message
        self.error_code = error_code

class ValidationError(AutoGPTError):
    """Raised when input data fails validation."""
    pass

class ExecutionError(AutoGPTError):
    """Raised when a graph execution fails."""
    pass

async def execute_graph_safely(graph_id: str, inputs: Dict) -> Dict:
    """
    Example of executing a graph safely.

    Best practices demonstrated:
    1. Specific exception types
    2. Detailed error logging
    3. Graceful error handling
    4. Resource cleanup
    """
    try:
        logger.info(f"开始执行图: {graph_id}")

        # Reject an empty graph id up front.
        if not graph_id:
            raise ValidationError("图ID不能为空", "INVALID_GRAPH_ID")

        # Delegate to the internal executor.
        output = await _execute_graph_internal(graph_id, inputs)

        logger.info(f"图执行成功: {graph_id}")
        return output

    except (ValidationError, ExecutionError) as e:
        # Known domain errors: log with the wording specific to each type,
        # then propagate unchanged.
        label = "输入验证失败" if isinstance(e, ValidationError) else "图执行失败"
        logger.error(f"{label}: {e.message}")
        raise
    except Exception as e:
        # Unknown failure: log with traceback and wrap in a domain error.
        logger.error(f"未知错误: {str(e)}", exc_info=True)
        raise ExecutionError(f"图执行异常: {str(e)}", "EXECUTION_FAILED")

@asynccontextmanager
async def database_transaction():
    """Async context manager wrapping a database transaction.

    Commits on normal exit; rolls back, logs, and re-raises on any error.
    """
    txn = None
    try:
        txn = await db.begin()
        yield txn
        await txn.commit()
    except Exception as exc:
        # Roll back only if the transaction was actually opened.
        if txn:
            await txn.rollback()
        logger.error(f"数据库事务失败: {exc}")
        raise

1.1.2 TypeScript/JavaScript代码规范

React组件规范

// TypeScript provides compile-time type safety
import React, { useState, useEffect, useCallback } from 'react';
import { useQuery, useMutation } from '@tanstack/react-query';

// Interface definitions
interface GraphExecutionProps {
  graphId: string;
  onExecutionComplete?: (result: ExecutionResult) => void; // optional completion callback
  className?: string;
}

interface ExecutionResult {
  id: string;
  status: 'completed' | 'failed' | 'cancelled';
  outputs: Record<string, unknown>;
  error?: string;
}

// Component implementation
export const GraphExecution: React.FC<GraphExecutionProps> = ({
  graphId,
  onExecutionComplete,
  className = ''
}) => {
  // Local state
  const [isExecuting, setIsExecuting] = useState(false);
  const [executionId, setExecutionId] = useState<string | null>(null);
  
  // Data fetching (only runs once graphId is truthy)
  const { data: graph, isLoading } = useQuery({
    queryKey: ['graph', graphId],
    queryFn: () => fetchGraph(graphId),
    enabled: !!graphId
  });
  
  // Execution mutation
  const executeMutation = useMutation({
    mutationFn: executeGraph,
    onSuccess: (result) => {
      setIsExecuting(false);
      onExecutionComplete?.(result);
    },
    onError: (error) => {
      setIsExecuting(false);
      console.error('执行失败:', error);
    }
  });
  
  // Event handler
  const handleExecute = useCallback(async () => {
    if (!graph || isExecuting) return;
    
    setIsExecuting(true);
    try {
      const result = await executeMutation.mutateAsync({
        graphId,
        inputs: {}
      });
      setExecutionId(result.id);
    } catch (error) {
      // Error already handled in the mutation's onError
    }
  }, [graph, isExecuting, executeMutation, graphId]);
  
  // Cleanup side effect.
  // NOTE(review): this cleanup also fires on unmount after the execution
  // has already completed — confirm cancelExecution is a safe no-op then.
  useEffect(() => {
    return () => {
      if (executionId) {
        // Cancel the tracked execution
        cancelExecution(executionId);
      }
    };
  }, [executionId]);
  
  if (isLoading) {
    return <div className="loading">加载中...</div>;
  }
  
  return (
    <div className={`graph-execution ${className}`}>
      <h3>{graph?.name}</h3>
      <button 
        onClick={handleExecute}
        disabled={isExecuting}
        className="execute-button"
      >
        {isExecuting ? '执行中...' : '执行'}
      </button>
    </div>
  );
};

// Utility functions

// Fetch a single graph definition from the backend API.
// Throws when the server responds with a non-2xx status.
async function fetchGraph(graphId: string): Promise<Graph> {
  const res = await fetch(`/api/graphs/${graphId}`);
  if (res.ok) {
    return res.json();
  }
  throw new Error(`获取图失败: ${res.statusText}`);
}

// Kick off a graph execution on the server and return the created record.
// Throws when the server responds with a non-2xx status.
async function executeGraph(params: {
  graphId: string;
  inputs: Record<string, unknown>;
}): Promise<ExecutionResult> {
  const res = await fetch('/api/executions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(params),
  });

  if (!res.ok) {
    throw new Error(`执行失败: ${res.statusText}`);
  }

  return res.json();
}

1.2 架构设计最佳实践

1.2.1 模块化设计原则

单一职责原则

# Good example: a service class with a single responsibility
class UserAuthenticationService:
    """User authentication service — handles authentication logic only."""

    def __init__(self, jwt_service: JWTService, user_repository: UserRepository):
        self.jwt_service = jwt_service
        self.user_repository = user_repository

    async def authenticate_user(self, email: str, password: str) -> Optional[User]:
        """Return the user when email/password match, otherwise None."""
        user = await self.user_repository.get_by_email(email)
        if not user:
            return None
        if not self._verify_password(password, user.password_hash):
            return None
        return user

    def _verify_password(self, password: str, password_hash: str) -> bool:
        """Check a plaintext password against its stored hash."""
        # Password verification logic (stubbed in this example)
        pass

class UserProfileService:
    """User profile service — responsible for profile management only."""
    
    def __init__(self, user_repository: UserRepository):
        self.user_repository = user_repository
    
    async def update_profile(self, user_id: str, profile_data: Dict) -> User:
        """Update the user's profile."""
        # Profile update logic goes here (stubbed in this example)
        pass

# Counter-example: a service class with mixed responsibilities
class UserService:  # bad design
    """User service — too many responsibilities (authentication, profiles,
    notifications, and billing all in one class). Shown as what to avoid."""
    
    async def authenticate_user(self, email: str, password: str):
        pass
    
    async def update_profile(self, user_id: str, profile_data: Dict):
        pass
    
    async def send_notification(self, user_id: str, message: str):
        pass
    
    async def calculate_usage_cost(self, user_id: str):
        pass

依赖注入模式

from abc import ABC, abstractmethod
from typing import Protocol

# Interface definitions (structural typing via Protocol)
class EmailServiceProtocol(Protocol):
    async def send_email(self, to: str, subject: str, body: str) -> bool:
        ...

class UserRepositoryProtocol(Protocol):
    # Look a user up by primary key; None when absent.
    async def get_by_id(self, user_id: str) -> Optional[User]:
        ...
    
    # Persist the user and return the stored entity.
    async def save(self, user: User) -> User:
        ...

# Concrete implementations
class SMTPEmailService:
    """SMTP-backed email service implementation."""
    
    def __init__(self, smtp_config: SMTPConfig):
        self.smtp_config = smtp_config
    
    async def send_email(self, to: str, subject: str, body: str) -> bool:
        # SMTP delivery logic goes here (stubbed in this example)
        pass

class DatabaseUserRepository:
    """Database-backed user repository implementation."""
    
    def __init__(self, db_connection: DatabaseConnection):
        self.db = db_connection
    
    async def get_by_id(self, user_id: str) -> Optional[User]:
        # Database lookup logic goes here (stubbed in this example)
        pass

# Service classes receive their collaborators via dependency injection
class NotificationService:
    """Notification service.

    Depends only on the protocols above, so implementations can be
    swapped freely (e.g. in tests).
    """

    def __init__(
        self,
        email_service: EmailServiceProtocol,
        user_repository: UserRepositoryProtocol
    ):
        self.email_service = email_service
        self.user_repository = user_repository

    async def notify_user(self, user_id: str, message: str) -> bool:
        """Email *message* to the user; False when the user is unknown."""
        user = await self.user_repository.get_by_id(user_id)
        if not user:
            return False

        delivered = await self.email_service.send_email(
            to=user.email,
            subject="AutoGPT通知",
            body=message,
        )
        return delivered

# Dependency-injection container wiring
def create_notification_service() -> NotificationService:
    """Build a fully wired NotificationService instance."""
    # Construction order matches the original: email side first, then DB.
    email_service = SMTPEmailService(SMTPConfig.from_env())
    user_repository = DatabaseUserRepository(create_db_connection())
    return NotificationService(email_service, user_repository)

1.2.2 异步编程最佳实践

异步函数设计

import asyncio
from typing import List, Dict, Any
from contextlib import asynccontextmanager

class GraphExecutionService:
    """Graph execution service — async best practices.

    Responsibilities:
    - bound the number of concurrently running graph executions
    - track running executions so they can be cancelled
    - execute a graph's nodes concurrently in topological order
    """

    def __init__(self, max_concurrent_executions: int = 10):
        # Limits how many graph executions may run at the same time.
        self.semaphore = asyncio.Semaphore(max_concurrent_executions)
        # execution_id -> running task, for cancellation and cleanup.
        self.active_executions: Dict[str, asyncio.Task] = {}

    async def execute_graph(self, graph_id: str, inputs: Dict) -> str:
        """Start a graph execution and return its execution ID immediately."""
        execution_id = generate_execution_id()

        async def _run() -> Dict:
            # BUGFIX: acquire the semaphore inside the task and hold it for
            # the WHOLE execution. The previous version held it only while
            # *creating* the task and released it right after, so the
            # concurrency limit was never actually enforced.
            async with self.semaphore:
                return await self._execute_graph_internal(
                    execution_id, graph_id, inputs
                )

        task = asyncio.create_task(_run())
        self.active_executions[execution_id] = task

        # Drop the bookkeeping entry as soon as the task finishes.
        task.add_done_callback(
            lambda t: self.active_executions.pop(execution_id, None)
        )

        return execution_id

    async def _execute_graph_internal(
        self, 
        execution_id: str, 
        graph_id: str, 
        inputs: Dict
    ) -> Dict:
        """Load the graph and run its nodes; returns the per-node results."""
        try:
            # Load the graph definition.
            # NOTE(review): _get_graph is not defined in this snippet — it
            # is assumed to exist on the real service.
            graph = await self._get_graph(graph_id)

            # Execute the nodes with maximal safe concurrency.
            results = await self._execute_nodes_concurrently(
                execution_id, 
                graph.nodes, 
                inputs
            )

            return {"execution_id": execution_id, "results": results}

        except Exception as e:
            logger.error(f"图执行失败 {execution_id}: {e}")
            raise

    async def _execute_nodes_concurrently(
        self, 
        execution_id: str, 
        nodes: List[Node], 
        inputs: Dict
    ) -> Dict:
        """Run nodes in dependency order, batching independent nodes into
        concurrent waves.

        Raises ExecutionError on a dependency cycle or a failed node.
        """
        # node_id -> list of prerequisite node ids.
        # NOTE(review): _build_dependency_graph and _execute_node are not
        # defined in this snippet — assumed to exist on the real service.
        dependency_graph = self._build_dependency_graph(nodes)

        results = {}
        executed_nodes = set()

        while len(executed_nodes) < len(nodes):
            # Nodes whose prerequisites have all completed.
            ready_nodes = [
                node for node in nodes
                if node.id not in executed_nodes
                and all(dep in executed_nodes for dep in dependency_graph.get(node.id, []))
            ]

            # No progress possible -> the dependency graph has a cycle.
            if not ready_nodes:
                raise ExecutionError("检测到循环依赖")

            # Run the whole ready wave concurrently.
            tasks = [
                self._execute_node(execution_id, node, inputs, results)
                for node in ready_nodes
            ]

            node_results = await asyncio.gather(*tasks, return_exceptions=True)

            # Record results; any exception aborts the whole execution.
            for node, result in zip(ready_nodes, node_results):
                if isinstance(result, Exception):
                    raise ExecutionError(f"节点执行失败 {node.id}: {result}")

                results[node.id] = result
                executed_nodes.add(node.id)

        return results

    async def cancel_execution(self, execution_id: str) -> bool:
        """Cancel a running execution; True when a task was cancelled."""
        task = self.active_executions.get(execution_id)
        if task and not task.done():
            task.cancel()
            try:
                # Wait for the cancellation to actually take effect.
                await task
            except asyncio.CancelledError:
                pass
            return True
        return False

    @asynccontextmanager
    async def execution_context(self, execution_id: str):
        """Context manager that logs start/end and wall-clock duration."""
        logger.info(f"开始执行 {execution_id}")
        # get_running_loop() is the modern API inside a coroutine;
        # get_event_loop() is deprecated for this use since Python 3.10.
        start_time = asyncio.get_running_loop().time()

        try:
            yield
        finally:
            end_time = asyncio.get_running_loop().time()
            duration = end_time - start_time
            logger.info(f"执行完成 {execution_id}, 耗时: {duration:.2f}s")

1.3 测试最佳实践

1.3.1 单元测试

测试结构和命名

import pytest
from unittest.mock import Mock, AsyncMock, patch
from typing import Dict, Any

class TestUserAuthenticationService:
    """Tests for the user authentication service.
    
    Test naming convention: test_<method_name>_<scenario>_<expected_result>
    """
    
    @pytest.fixture
    def mock_jwt_service(self):
        """Mock JWT service."""
        return Mock(spec=JWTService)
    
    @pytest.fixture
    def mock_user_repository(self):
        """Mock user repository (async)."""
        return AsyncMock(spec=UserRepository)
    
    @pytest.fixture
    def auth_service(self, mock_jwt_service, mock_user_repository):
        """Authentication service wired with the mocks above."""
        return UserAuthenticationService(mock_jwt_service, mock_user_repository)
    
    async def test_authenticate_user_valid_credentials_returns_user(
        self, 
        auth_service, 
        mock_user_repository
    ):
        """Valid credentials: authentication returns the user."""
        # Arrange
        email = "test@example.com"
        password = "password123"
        expected_user = User(id="user1", email=email)
        
        mock_user_repository.get_by_email.return_value = expected_user
        
        # Act
        result = await auth_service.authenticate_user(email, password)
        
        # Assert
        assert result == expected_user
        mock_user_repository.get_by_email.assert_called_once_with(email)
    
    async def test_authenticate_user_invalid_credentials_returns_none(
        self, 
        auth_service, 
        mock_user_repository
    ):
        """Invalid credentials: authentication returns None."""
        # Arrange
        email = "test@example.com"
        password = "wrong_password"
        
        mock_user_repository.get_by_email.return_value = None
        
        # Act
        result = await auth_service.authenticate_user(email, password)
        
        # Assert
        assert result is None
    
    async def test_authenticate_user_database_error_raises_exception(
        self, 
        auth_service, 
        mock_user_repository
    ):
        """Database failure: the underlying exception propagates."""
        # Arrange
        email = "test@example.com"
        password = "password123"
        
        mock_user_repository.get_by_email.side_effect = DatabaseError("连接失败")
        
        # Act & Assert
        with pytest.raises(DatabaseError):
            await auth_service.authenticate_user(email, password)

# Parametrized tests
@pytest.mark.parametrize("email,password,expected", [
    ("valid@example.com", "password123", True),
    ("", "password123", False),
    ("valid@example.com", "", False),
    ("invalid-email", "password123", False),
])
async def test_validate_credentials_various_inputs(email, password, expected):
    """Credential validation across a range of inputs."""
    result = validate_credentials(email, password)
    assert result == expected

# Integration tests
@pytest.mark.integration
class TestGraphExecutionIntegration:
    """Graph execution integration tests."""
    
    @pytest.fixture
    async def test_database(self):
        """Provision a test database, torn down after the test."""
        # Create the test database connection
        db = await create_test_database()
        yield db
        await cleanup_test_database(db)
    
    async def test_complete_graph_execution_flow(self, test_database):
        """End-to-end: create a graph, execute it, verify the result."""
        # Create a test graph
        graph = await create_test_graph(test_database)
        
        # Execute the graph
        execution_service = GraphExecutionService(test_database)
        execution_id = await execution_service.execute_graph(
            graph.id, 
            {"input": "test_value"}
        )
        
        # Wait for the execution to finish
        result = await wait_for_execution_completion(execution_id)
        
        # Verify the outcome
        assert result.status == "completed"
        assert "output" in result.outputs

1.3.2 端到端测试

Playwright测试示例

// tests/e2e/graph-execution.spec.ts
import { test, expect } from '@playwright/test';

test.describe('图执行功能', () => {
  test.beforeEach(async ({ page }) => {
    // Log in as the test user
    await page.goto('/login');
    await page.fill('[data-testid="email-input"]', 'test@example.com');
    await page.fill('[data-testid="password-input"]', 'password123');
    await page.click('[data-testid="login-button"]');
    
    // Wait for the redirect to the dashboard
    await expect(page).toHaveURL('/dashboard');
  });

  test('应该能够创建和执行简单图', async ({ page }) => {
    // Navigate to the graph editor
    await page.click('[data-testid="create-graph-button"]');
    await expect(page).toHaveURL('/graphs/new');
    
    // Add a node
    await page.click('[data-testid="add-node-button"]');
    await page.selectOption('[data-testid="node-type-select"]', 'text-input');
    await page.click('[data-testid="confirm-add-node"]');
    
    // Configure the node
    await page.fill('[data-testid="node-input-text"]', 'Hello, World!');
    
    // Save the graph
    await page.fill('[data-testid="graph-name-input"]', '测试图');
    await page.click('[data-testid="save-graph-button"]');
    
    // Wait for the save confirmation
    await expect(page.locator('[data-testid="success-message"]')).toBeVisible();
    
    // Execute the graph
    await page.click('[data-testid="execute-graph-button"]');
    
    // Wait for the execution to finish
    await expect(page.locator('[data-testid="execution-status"]')).toHaveText('已完成');
    
    // Verify the output
    const output = await page.textContent('[data-testid="execution-output"]');
    expect(output).toContain('Hello, World!');
  });

  test('应该能够处理执行错误', async ({ page }) => {
    // Build a graph that is expected to fail
    await page.goto('/graphs/new');
    await page.click('[data-testid="add-node-button"]');
    await page.selectOption('[data-testid="node-type-select"]', 'api-call');
    
    // Configure an invalid API call
    await page.fill('[data-testid="api-url-input"]', 'invalid-url');
    
    // Save and execute
    await page.fill('[data-testid="graph-name-input"]', '错误测试图');
    await page.click('[data-testid="save-graph-button"]');
    await page.click('[data-testid="execute-graph-button"]');
    
    // Verify the error handling
    await expect(page.locator('[data-testid="execution-status"]')).toHaveText('失败');
    await expect(page.locator('[data-testid="error-message"]')).toBeVisible();
  });
});

2. 性能优化最佳实践

2.1 数据库优化

2.1.1 查询优化

索引策略

-- Users table indexes
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
CREATE INDEX CONCURRENTLY idx_users_created_at ON users(created_at);
CREATE INDEX CONCURRENTLY idx_users_active ON users(is_active) WHERE is_active = true;

-- Graphs table indexes (partial index keeps the active-graphs path small)
CREATE INDEX CONCURRENTLY idx_graphs_user_id ON graphs(user_id);
CREATE INDEX CONCURRENTLY idx_graphs_user_active ON graphs(user_id, is_active) 
  WHERE is_active = true;

-- Executions table indexes
CREATE INDEX CONCURRENTLY idx_executions_user_status ON graph_executions(user_id, status);
CREATE INDEX CONCURRENTLY idx_executions_created_at ON graph_executions(created_at);
CREATE INDEX CONCURRENTLY idx_executions_graph_id ON graph_executions(graph_id);

-- Composite index for the common "latest executions per user" query
CREATE INDEX CONCURRENTLY idx_executions_user_created ON graph_executions(user_id, created_at DESC);

查询优化示例

class OptimizedGraphRepository:
    """Optimized graph repository implementation."""
    
    async def get_user_graphs_with_stats(
        self, 
        user_id: str, 
        limit: int = 20,
        offset: int = 0
    ) -> List[Dict]:
        """
        Fetch the user's graphs together with execution statistics.
        
        Optimization strategy:
        1. One query fetches all required data (graphs + aggregates)
        2. Avoids the N+1 query problem via LEFT JOIN + GROUP BY
        3. Relies on the (user_id, is_active) partial index
        """
        query = """
        SELECT 
            g.id,
            g.name,
            g.description,
            g.created_at,
            g.updated_at,
            COUNT(ge.id) as execution_count,
            MAX(ge.created_at) as last_execution_at,
            AVG(CASE WHEN ge.status = 'completed' THEN 1.0 ELSE 0.0 END) as success_rate
        FROM graphs g
        LEFT JOIN graph_executions ge ON g.id = ge.graph_id
        WHERE g.user_id = $1 AND g.is_active = true
        GROUP BY g.id, g.name, g.description, g.created_at, g.updated_at
        ORDER BY g.updated_at DESC
        LIMIT $2 OFFSET $3
        """
        
        return await self.db.fetch_all(query, user_id, limit, offset)
    
    async def get_execution_history_paginated(
        self,
        user_id: str,
        page: int = 1,
        page_size: int = 20,
        status_filter: Optional[str] = None
    ) -> Dict:
        """
        Fetch execution history, paginated.
        
        Optimization strategy:
        1. LIMIT/OFFSET pagination backed by the (user_id, created_at DESC)
           index. NOTE(review): the original comment claimed cursor-based
           pagination, but this is plain OFFSET — for very deep pages a
           keyset/cursor approach would perform better.
        2. Data query and count query run in parallel
        3. WHERE conditions match the available indexes
        """
        offset = (page - 1) * page_size
        
        # Build the WHERE conditions ($1, and optionally $2)
        where_conditions = ["user_id = $1"]
        params = [user_id]
        
        if status_filter:
            where_conditions.append("status = $2")
            params.append(status_filter)
        
        where_clause = " AND ".join(where_conditions)
        
        # LIMIT/OFFSET placeholders are numbered after the filter params.
        data_query = f"""
        SELECT id, graph_id, status, created_at, started_at, ended_at
        FROM graph_executions
        WHERE {where_clause}
        ORDER BY created_at DESC
        LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
        """
        
        count_query = f"""
        SELECT COUNT(*) as total
        FROM graph_executions
        WHERE {where_clause}
        """
        
        # Run both queries concurrently with asyncio.gather
        data_task = self.db.fetch_all(data_query, *params, page_size, offset)
        count_task = self.db.fetch_one(count_query, *params)
        
        data_results, count_result = await asyncio.gather(data_task, count_task)
        
        return {
            "data": data_results,
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total": count_result["total"],
                "total_pages": (count_result["total"] + page_size - 1) // page_size
            }
        }

2.1.2 连接池优化

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool

class DatabaseManager:
    """Database manager with tuned connection pooling for an async engine."""
    
    def __init__(self, database_url: str):
        # FIX: do not pass poolclass=QueuePool here — the synchronous
        # QueuePool is incompatible with asyncio engines and SQLAlchemy
        # rejects it. create_async_engine already selects
        # AsyncAdaptedQueuePool by default, and all pool_* settings below
        # apply to it.
        self.engine = create_async_engine(
            database_url,
            # Pool settings
            pool_size=20,          # base number of pooled connections
            max_overflow=30,       # extra connections allowed beyond pool_size
            pool_timeout=30,       # seconds to wait for a free connection
            pool_recycle=3600,     # recycle connections after 1 hour
            pool_pre_ping=True,    # validate connections before handing them out
            
            # Driver-level connect arguments (asyncpg server settings)
            connect_args={
                "server_settings": {
                    "application_name": "autogpt_backend",
                    "jit": "off"  # disable JIT to speed up small queries
                }
            },
            
            # Engine-wide execution options
            execution_options={
                "isolation_level": "READ_COMMITTED"
            }
        )
        
        # expire_on_commit=False keeps ORM objects usable after commit,
        # which matters for async code that reads attributes lazily.
        self.async_session = sessionmaker(
            self.engine,
            class_=AsyncSession,
            expire_on_commit=False
        )
    
    async def get_session(self) -> AsyncSession:
        """Return a new AsyncSession bound to the engine."""
        return self.async_session()
    
    async def close(self):
        """Dispose the engine, closing all pooled connections."""
        await self.engine.dispose()

2.2 缓存策略优化

2.2.1 多层缓存架构

from typing import Optional, Any, Dict
import json
import hashlib
from datetime import timedelta

class MultiLevelCache:
    """Two-level cache: an in-process LRU dict in front of Redis.

    Lookup order is local cache -> Redis (the original docstring also
    claimed a database tier, but this class never touches a database —
    on a miss the caller falls back to its own backing store and writes
    the result back via set()).
    """
    
    def __init__(
        self,
        redis_client,
        local_cache_size: int = 1000,
        default_ttl: int = 3600
    ):
        self.redis = redis_client
        # Plain dict used as an LRU: insertion order == recency order.
        self.local_cache = {}
        self.local_cache_size = local_cache_size
        self.default_ttl = default_ttl
    
    async def get(self, key: str) -> Optional[Any]:
        """
        Return the cached value for *key*, or None on a miss.

        NOTE: local entries carry no TTL of their own, so a local hit can
        outlive the Redis TTL until evicted or overwritten.
        """
        # 1. Local cache hit: re-insert to mark it most-recently-used.
        #    FIX: the original never touched entries on access, which made
        #    the "LRU" eviction actually FIFO.
        if key in self.local_cache:
            value = self.local_cache.pop(key)
            self.local_cache[key] = value
            return value
        
        # 2. Fall back to Redis.
        redis_value = await self.redis.get(key)
        if redis_value:
            try:
                value = json.loads(redis_value)
                # Promote the Redis hit into the local cache.
                self._update_local_cache(key, value)
                return value
            except json.JSONDecodeError:
                # Corrupt payload: treat as a miss.
                pass
        
        return None
    
    async def set(
        self, 
        key: str, 
        value: Any, 
        ttl: Optional[int] = None
    ) -> bool:
        """
        Store *value* under *key* in Redis (with TTL) and locally.

        Returns True on success, False when serialization or Redis fails.
        """
        ttl = ttl or self.default_ttl
        
        try:
            # Values must be JSON-serializable; default=str stringifies
            # anything else (e.g. datetimes).
            serialized_value = json.dumps(value, default=str)
            
            await self.redis.setex(key, ttl, serialized_value)
            self._update_local_cache(key, value)
            
            return True
        except Exception as e:
            logger.error(f"缓存设置失败: {e}")
            return False
    
    async def delete(self, key: str) -> bool:
        """Remove *key* from both levels; True when Redis held the key."""
        self.local_cache.pop(key, None)
        return await self.redis.delete(key) > 0
    
    def _update_local_cache(self, key: str, value: Any):
        """Insert *key* as most-recently-used, evicting the LRU entry when full."""
        if key in self.local_cache:
            # Overwrite: drop the stale position so re-insertion refreshes it.
            self.local_cache.pop(key)
        elif len(self.local_cache) >= self.local_cache_size:
            # Evict the least-recently-used entry (first in insertion order).
            oldest_key = next(iter(self.local_cache))
            del self.local_cache[oldest_key]
        
        self.local_cache[key] = value

class CacheService:
    """Business-level caching facade built on top of MultiLevelCache."""

    def __init__(self, cache: MultiLevelCache):
        self.cache = cache

    async def get_user_profile(self, user_id: str) -> Optional[Dict]:
        """Return the user's profile, served from cache when possible."""
        cache_key = f"user_profile:{user_id}"

        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        # Cache miss: load from the database and cache for one hour.
        profile = await self._fetch_user_profile_from_db(user_id)
        if profile:
            await self.cache.set(cache_key, profile, ttl=3600)

        return profile

    async def invalidate_user_cache(self, user_id: str):
        """Drop every cached entry derived from this user's data."""
        for key in (
            f"user_profile:{user_id}",
            f"user_graphs:{user_id}",
            f"user_executions:{user_id}",
        ):
            await self.cache.delete(key)

    def cache_key_for_graph_list(
        self, 
        user_id: str, 
        page: int, 
        filters: Dict
    ) -> str:
        """Build a cache key for one page of the user's graph list."""
        # Hash the (sorted) filter dict so equal filters share a key.
        digest = hashlib.md5(
            json.dumps(filters, sort_keys=True).encode()
        ).hexdigest()[:8]
        return f"graph_list:{user_id}:{page}:{digest}"

2.2.2 缓存预热和更新策略

class CacheWarmupService:
    """Cache warm-up service."""
    
    def __init__(self, cache_service: CacheService):
        self.cache_service = cache_service
    
    async def warmup_user_data(self, user_id: str):
        """Warm all caches for one user; individual failures are swallowed."""
        # NOTE(review): _warmup_recent_executions is referenced below but not
        # defined in this snippet — confirm it exists on the real service.
        tasks = [
            self._warmup_user_profile(user_id),
            self._warmup_user_graphs(user_id),
            self._warmup_recent_executions(user_id),
        ]
        
        # return_exceptions=True: one failed warm-up must not abort the rest.
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _warmup_user_profile(self, user_id: str):
        """Warm the user's profile cache."""
        await self.cache_service.get_user_profile(user_id)
    
    async def _warmup_user_graphs(self, user_id: str):
        """Warm the first few pages of the user's graph list."""
        # NOTE(review): CacheService as shown in this document does not
        # define get_user_graphs — confirm against the real implementation.
        for page in range(1, 4):
            await self.cache_service.get_user_graphs(
                user_id, 
                page=page, 
                filters={}
            )
    
    async def schedule_cache_refresh(self):
        """Periodically re-warm caches for currently active users."""
        while True:
            try:
                # Fetch the currently active users.
                # NOTE(review): _get_active_users is not defined in this snippet.
                active_users = await self._get_active_users()
                
                # Warm users concurrently, bounded by a semaphore.
                semaphore = asyncio.Semaphore(10)  # concurrency limit
                
                async def warmup_with_semaphore(user_id: str):
                    async with semaphore:
                        await self.warmup_user_data(user_id)
                
                tasks = [
                    warmup_with_semaphore(user_id) 
                    for user_id in active_users
                ]
                
                await asyncio.gather(*tasks, return_exceptions=True)
                
                # Sleep until the next refresh cycle
                await asyncio.sleep(300)  # 5 minutes
                
            except Exception as e:
                logger.error(f"缓存刷新失败: {e}")
                await asyncio.sleep(60)  # back off for 1 minute on error

2.3 前端性能优化

2.3.1 React组件优化

import React, { memo, useMemo, useCallback, useState } from 'react';
import { useVirtualizer } from '@tanstack/react-virtual';

// memo: skip re-rendering when props are unchanged
const GraphListItem = memo<{
  graph: Graph;
  onExecute: (graphId: string) => void;
  onEdit: (graphId: string) => void;
}>(({ graph, onExecute, onEdit }) => {
  // useCallback keeps handler identity stable across renders
  const handleExecute = useCallback(() => {
    onExecute(graph.id);
  }, [graph.id, onExecute]);
  
  const handleEdit = useCallback(() => {
    onEdit(graph.id);
  }, [graph.id, onEdit]);
  
  return (
    <div className="graph-item">
      <h3>{graph.name}</h3>
      <p>{graph.description}</p>
      <div className="actions">
        <button onClick={handleExecute}>执行</button>
        <button onClick={handleEdit}>编辑</button>
      </div>
    </div>
  );
});

// Virtualized scrolling for large lists: only visible rows are rendered
const VirtualizedGraphList: React.FC<{
  graphs: Graph[];
  onExecute: (graphId: string) => void;
  onEdit: (graphId: string) => void;
}> = ({ graphs, onExecute, onEdit }) => {
  const parentRef = React.useRef<HTMLDivElement>(null);
  
  const virtualizer = useVirtualizer({
    count: graphs.length,
    getScrollElement: () => parentRef.current,
    estimateSize: () => 120, // estimated row height in px
    overscan: 5, // rows rendered beyond the viewport
  });
  
  return (
    <div
      ref={parentRef}
      className="graph-list-container"
      style={{ height: '600px', overflow: 'auto' }}
    >
      <div
        style={{
          height: `${virtualizer.getTotalSize()}px`,
          width: '100%',
          position: 'relative',
        }}
      >
        {virtualizer.getVirtualItems().map((virtualItem) => {
          const graph = graphs[virtualItem.index];
          return (
            <div
              key={virtualItem.key}
              style={{
                position: 'absolute',
                top: 0,
                left: 0,
                width: '100%',
                height: `${virtualItem.size}px`,
                transform: `translateY(${virtualItem.start}px)`,
              }}
            >
              <GraphListItem
                graph={graph}
                onExecute={onExecute}
                onEdit={onEdit}
              />
            </div>
          );
        })}
      </div>
    </div>
  );
};

// useMemo caches derived data between renders
const GraphDashboard: React.FC = () => {
  const [graphs, setGraphs] = useState<Graph[]>([]);
  const [filter, setFilter] = useState('');
  
  // Memoize the filtered list (recomputed only when inputs change)
  const filteredGraphs = useMemo(() => {
    if (!filter) return graphs;
    
    return graphs.filter(graph =>
      graph.name.toLowerCase().includes(filter.toLowerCase()) ||
      graph.description.toLowerCase().includes(filter.toLowerCase())
    );
  }, [graphs, filter]);
  
  // Memoize the summary statistics
  const stats = useMemo(() => {
    return {
      total: graphs.length,
      active: graphs.filter(g => g.isActive).length,
      templates: graphs.filter(g => g.isTemplate).length,
    };
  }, [graphs]);
  
  const handleExecute = useCallback((graphId: string) => {
    // Execution logic
  }, []);
  
  const handleEdit = useCallback((graphId: string) => {
    // Edit logic
  }, []);
  
  return (
    <div className="dashboard">
      <div className="stats">
        <div>总计: {stats.total}</div>
        <div>活跃: {stats.active}</div>
        <div>模板: {stats.templates}</div>
      </div>
      
      <input
        type="text"
        placeholder="搜索图..."
        value={filter}
        onChange={(e) => setFilter(e.target.value)}
      />
      
      <VirtualizedGraphList
        graphs={filteredGraphs}
        onExecute={handleExecute}
        onEdit={handleEdit}
      />
    </div>
  );
};

2.3.2 代码分割和懒加载

import { lazy, Suspense } from 'react';
import { Routes, Route } from 'react-router-dom';

// 懒加载组件
// Route-level code splitting: each chunk is downloaded only when the
// component is first rendered.
const GraphEditor = lazy(() => import('./components/GraphEditor'));
const GraphList = lazy(() => import('./components/GraphList'));
const Settings = lazy(() => import('./components/Settings'));
const Analytics = lazy(() => import('./components/Analytics'));

// 加载组件
// Shared fallback shown while a lazy route chunk is loading.
const LoadingSpinner = () => {
  return (
    <div className="loading-spinner">
      <div className="spinner" />
      <p>加载中...</p>
    </div>
  );
};

// 错误边界组件
class ErrorBoundary extends React.Component<
  { children: React.ReactNode },
  { hasError: boolean }
> {
  constructor(props: { children: React.ReactNode }) {
    super(props);
    this.state = { hasError: false };
  }
  
  static getDerivedStateFromError(error: Error) {
    return { hasError: true };
  }
  
  componentDidCatch(error: Error, errorInfo: React.ErrorInfo) {
    console.error('组件错误:', error, errorInfo);
  }
  
  render() {
    if (this.state.hasError) {
      return (
        <div className="error-boundary">
          <h2>出错了</h2>
          <p>页面加载失败,请刷新重试。</p>
        </div>
      );
    }
    
    return this.props.children;
  }
}

// 应用路由
const App: React.FC = () => {
  return (
    <ErrorBoundary>
      <Routes>
        <Route path="/" element={
          <Suspense fallback={<LoadingSpinner />}>
            <GraphList />
          </Suspense>
        } />
        
        <Route path="/editor/:graphId?" element={
          <Suspense fallback={<LoadingSpinner />}>
            <GraphEditor />
          </Suspense>
        } />
        
        <Route path="/settings" element={
          <Suspense fallback={<LoadingSpinner />}>
            <Settings />
          </Suspense>
        } />
        
        <Route path="/analytics" element={
          <Suspense fallback={<LoadingSpinner />}>
            <Analytics />
          </Suspense>
        } />
      </Routes>
    </ErrorBoundary>
  );
};

// 预加载关键路由
// Preload heavyweight route chunks during idle time so the first
// navigation to them is instant.
const preloadRoutes = () => {
  const load = () => {
    import('./components/GraphEditor');
    import('./components/Settings');
  };
  
  if (typeof window === 'undefined') return; // SSR guard
  
  if ('requestIdleCallback' in window) {
    requestIdleCallback(load);
  } else {
    // Fallback for browsers without requestIdleCallback (e.g. Safari):
    // defer the preload instead of silently skipping it.
    setTimeout(load, 2000);
  }
};

// 在应用启动时预加载
export { App, preloadRoutes };

3. 安全最佳实践

3.1 认证和授权

3.1.1 JWT安全实践

import jwt
import secrets
from datetime import datetime, timedelta
from typing import Dict, Optional

class SecureJWTService:
    """JWT issuing/verification service with short-lived access tokens.

    Fixes over the original snippet:
    - ``verify_token`` used ``await`` inside a plain ``def`` (a
      SyntaxError); it is now declared ``async``.
    - ``List`` was referenced without being imported; the builtin generic
      ``list[str]`` is used instead.
    - ``setex`` is given an integer TTL (``total_seconds()`` is a float).
    """
    
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        """Validate key strength and configure token lifetimes."""
        if len(secret_key) < 32:
            raise ValueError("JWT密钥长度至少32字符")
        
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expire = timedelta(minutes=15)  # short-lived access token
        self.refresh_token_expire = timedelta(days=7)     # long-lived refresh token
    
    def create_access_token(self, user_id: str, permissions: list[str]) -> str:
        """Create a signed access token carrying the user's permissions."""
        now = datetime.utcnow()
        payload = {
            "sub": user_id,
            "iat": now,
            "exp": now + self.access_token_expire,
            "type": "access",
            "permissions": permissions,
            "jti": secrets.token_urlsafe(16),  # unique id: enables replay/blacklist checks
        }
        
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
    
    def create_refresh_token(self, user_id: str) -> str:
        """Create a signed refresh token (carries no permissions)."""
        now = datetime.utcnow()
        payload = {
            "sub": user_id,
            "iat": now,
            "exp": now + self.refresh_token_expire,
            "type": "refresh",
            "jti": secrets.token_urlsafe(16),
        }
        
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
    
    async def verify_token(self, token: str, token_type: str = "access") -> Optional[Dict]:
        """Decode and validate a token; return the payload or None.

        Async because the blacklist lookup goes to Redis.
        """
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                options={
                    "verify_exp": True,
                    "verify_iat": True,
                    "verify_signature": True,
                }
            )
            
            # Reject when the token kind (access/refresh) doesn't match.
            if payload.get("type") != token_type:
                return None
            
            # Reject revoked tokens.
            if await self._is_token_blacklisted(payload.get("jti")):
                return None
            
            return payload
            
        except jwt.ExpiredSignatureError:
            logger.warning("JWT令牌已过期")
            return None
        except jwt.InvalidTokenError as e:
            logger.warning(f"JWT令牌无效: {e}")
            return None
    
    async def blacklist_token(self, jti: str):
        """Revoke a token id for the longest possible token lifetime."""
        # Redis-backed blacklist; key expires with the refresh-token TTL.
        await redis_client.setex(
            f"blacklist:{jti}",
            int(self.refresh_token_expire.total_seconds()),
            "1"
        )
    
    async def _is_token_blacklisted(self, jti: str) -> bool:
        """True when the token id has been revoked."""
        # redis EXISTS returns an int count; normalize to bool.
        return bool(await redis_client.exists(f"blacklist:{jti}"))

# 权限装饰器
def require_permissions(*required_permissions: str):
    """Decorator factory: reject the call with HTTP 403 unless the current
    user holds every listed permission."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Permissions come from the request context.
            granted = get_current_user_permissions()
            
            # Any missing permission means the caller is not authorized.
            missing = [perm for perm in required_permissions if perm not in granted]
            if missing:
                raise HTTPException(
                    status_code=403,
                    detail="权限不足"
                )
            
            return await func(*args, **kwargs)
        return wrapper
    return decorator

# 使用示例
@require_permissions("graph:execute", "user:read")
async def execute_user_graph(graph_id: str, user_id: str):
    """Execute a user's graph — requires graph-execute and user-read permissions."""
    pass

3.1.2 输入验证和清理

from pydantic import BaseModel, Field, validator
import re
import html
from typing import Optional, List

class SecureUserInput(BaseModel):
    """Validated and sanitized user-profile input.

    Fix: the original ran ``html.escape`` *before* checking for dangerous
    characters/markup, so the checks could never match (``<`` had already
    become ``&lt;``). Validation now inspects the raw value first, then
    escapes the accepted value.
    """
    
    email: str = Field(..., max_length=254)
    name: Optional[str] = Field(None, max_length=100)
    bio: Optional[str] = Field(None, max_length=500)
    website: Optional[str] = Field(None, max_length=200)
    
    @validator('email')
    def validate_email(cls, v):
        """Check the email shape, then normalize to lowercase."""
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_pattern, v):
            raise ValueError('邮箱格式无效')
        return v.lower().strip()
    
    @validator('name')
    def validate_name(cls, v):
        """Reject names containing markup characters, then HTML-escape."""
        if v is None:
            return v
        
        v = v.strip()
        
        # Check the RAW value — after escaping these characters no longer exist.
        if re.search(r'[<>"\']', v):
            raise ValueError('姓名包含无效字符')
        
        return html.escape(v)
    
    @validator('bio')
    def validate_bio(cls, v):
        """Reject bios containing script-like markup, then HTML-escape."""
        if v is None:
            return v
        
        v = v.strip()
        
        # Patterns are matched against the RAW value for the same reason.
        malicious_patterns = [
            r'<script.*?>.*?</script>',
            r'javascript:',
            r'on\w+\s*=',
        ]
        
        for pattern in malicious_patterns:
            if re.search(pattern, v, re.IGNORECASE):
                raise ValueError('内容包含不安全字符')
        
        return html.escape(v)
    
    @validator('website')
    def validate_website(cls, v):
        """Require an http(s) URL and reject known-bad domains."""
        if v is None:
            return v
        
        # Basic URL shape validation
        url_pattern = r'^https?://[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(/.*)?$'
        if not re.match(url_pattern, v):
            raise ValueError('网站URL格式无效')
        
        # Denylist of known-malicious domains (illustrative)
        if any(domain in v.lower() for domain in ['malware.com', 'phishing.net']):
            raise ValueError('不允许的域名')
        
        return v

class SQLInjectionPrevention:
    """Guards for user-supplied SQL query fragments."""
    
    @staticmethod
    def sanitize_search_query(query: str) -> str:
        """Strip known-dangerous SQL tokens, truncate, and HTML-escape.

        Returns "" for empty/falsy input.
        """
        if not query:
            return ""
        
        # Drop SQL comment/terminator tokens and stored-procedure prefixes.
        for token in (';', '--', '/*', '*/', 'xp_', 'sp_'):
            query = query.replace(token, '')
        
        # Cap the length, then trim and escape for safe echoing in HTML.
        return html.escape(query[:100].strip())
    
    @staticmethod
    def validate_order_by(order_by: str, allowed_columns: List[str]) -> str:
        """Allow only whitelisted column names for ORDER BY interpolation."""
        if order_by in allowed_columns:
            return order_by
        raise ValueError(f"无效的排序字段: {order_by}")

# 使用参数化查询
async def get_user_graphs_safe(
    user_id: str, 
    search_query: Optional[str] = None,
    order_by: str = "created_at"
) -> List[Dict]:
    """Safely query a user's active graphs.

    Injection defenses: the search text is sanitized and bound as a query
    parameter, and the ORDER BY column (which cannot be parameterized) is
    checked against a whitelist before being interpolated.
    """
    
    # Validate the caller-supplied user id
    if not user_id or not isinstance(user_id, str):
        raise ValueError("无效的用户ID")
    
    # Sanitize the free-text search query
    if search_query:
        search_query = SQLInjectionPrevention.sanitize_search_query(search_query)
    
    # Restrict ORDER BY to known columns — it is interpolated, not bound
    allowed_order_columns = ["created_at", "updated_at", "name"]
    order_by = SQLInjectionPrevention.validate_order_by(order_by, allowed_order_columns)
    
    # Parameterized queries; only the whitelisted order_by is interpolated
    if search_query:
        query = f"""
        SELECT id, name, description, created_at
        FROM graphs
        WHERE user_id = $1 
          AND is_active = true
          AND (name ILIKE $2 OR description ILIKE $2)
        ORDER BY {order_by} DESC
        """
        return await db.fetch_all(query, user_id, f"%{search_query}%")
    else:
        query = f"""
        SELECT id, name, description, created_at
        FROM graphs
        WHERE user_id = $1 AND is_active = true
        ORDER BY {order_by} DESC
        """
        return await db.fetch_all(query, user_id)

3.2 数据保护

3.2.1 敏感数据加密

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
from typing import Union

class DataEncryption:
    """Symmetric encryption helper backed by Fernet.

    The Fernet key is derived from ``master_key`` with PBKDF2-HMAC-SHA256.
    """
    
    def __init__(self, master_key: str):
        self.master_key = master_key.encode()
        self._fernet = self._create_fernet()
    
    def _create_fernet(self) -> Fernet:
        """Derive the Fernet key from the master key via PBKDF2."""
        # NOTE: the salt is fixed; production deployments should use a
        # random, persisted salt instead.
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=b'autogpt_salt',
            iterations=100000,
        )
        derived = kdf.derive(self.master_key)
        return Fernet(base64.urlsafe_b64encode(derived))
    
    def encrypt(self, data: Union[str, bytes]) -> str:
        """Encrypt text or bytes; returns a urlsafe-base64 string."""
        raw = data.encode('utf-8') if isinstance(data, str) else data
        token = self._fernet.encrypt(raw)
        return base64.urlsafe_b64encode(token).decode('utf-8')
    
    def decrypt(self, encrypted_data: str) -> str:
        """Invert ``encrypt``; raises ValueError on any failure."""
        try:
            token = base64.urlsafe_b64decode(encrypted_data.encode('utf-8'))
            return self._fernet.decrypt(token).decode('utf-8')
        except Exception as e:
            logger.error(f"解密失败: {e}")
            raise ValueError("数据解密失败")

class SecureCredentialsStore:
    """Encrypted-at-rest storage of per-user provider API keys."""
    
    def __init__(self, encryption_service: DataEncryption):
        self.encryption = encryption_service
    
    async def store_api_key(
        self, 
        user_id: str, 
        provider: str, 
        api_key: str
    ) -> str:
        """Encrypt and persist an API key; returns the generated key id."""
        ciphertext = self.encryption.encrypt(api_key)
        credential_id = secrets.token_urlsafe(16)
        
        await db.execute(
            """
            INSERT INTO user_credentials (id, user_id, provider, encrypted_data, created_at)
            VALUES ($1, $2, $3, $4, $5)
            """,
            credential_id, user_id, provider, ciphertext, datetime.utcnow()
        )
        
        return credential_id
    
    async def retrieve_api_key(self, user_id: str, key_id: str) -> Optional[str]:
        """Load and decrypt an API key; None when missing or undecryptable."""
        row = await db.fetch_one(
            """
            SELECT encrypted_data
            FROM user_credentials
            WHERE id = $1 AND user_id = $2
            """,
            key_id, user_id
        )
        
        if not row:
            return None
        
        try:
            return self.encryption.decrypt(row['encrypted_data'])
        except ValueError:
            logger.error(f"无法解密凭据: {key_id}")
            return None
    
    async def delete_api_key(self, user_id: str, key_id: str) -> bool:
        """Delete a stored key; True when a row was actually removed."""
        affected = await db.execute(
            """
            DELETE FROM user_credentials
            WHERE id = $1 AND user_id = $2
            """,
            key_id, user_id
        )
        
        return affected > 0

# 数据脱敏
class DataMasking:
    """Masking helpers for displaying sensitive values in logs and UIs."""
    
    @staticmethod
    def mask_email(email: str) -> str:
        """Mask the local part: ``john@x.com`` -> ``j**n@x.com``.

        Strings without ``@`` and 1-2 char local parts pass through unchanged.
        """
        if '@' not in email:
            return email
        
        local, domain = email.split('@', 1)
        if len(local) <= 2:
            masked_local = local
        else:
            masked_local = local[0] + '*' * (len(local) - 2) + local[-1]
        
        return f"{masked_local}@{domain}"
    
    @staticmethod
    def mask_api_key(api_key: str) -> str:
        """Keep the first/last 4 chars; keys of <= 8 chars are fully masked."""
        if len(api_key) <= 8:
            return '*' * len(api_key)
        
        return api_key[:4] + '*' * (len(api_key) - 8) + api_key[-4:]
    
    @staticmethod
    def mask_phone(phone: str) -> str:
        """Keep the first/last 3 digits; numbers of <= 6 chars are fully masked.

        Fix: the original only fully masked <= 4 chars, so 5-6 char numbers
        came back as ``phone[:3] + phone[-3:]`` with overlapping slices and
        leaked EVERY digit (e.g. "123456" was returned unmasked).
        """
        if len(phone) <= 6:
            return '*' * len(phone)
        
        return phone[:3] + '*' * (len(phone) - 6) + phone[-3:]

4. 运维最佳实践

4.1 监控和告警

4.1.1 应用性能监控

import time
import psutil
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from typing import Dict, Any

# Prometheus指标定义
# Counter: every HTTP request, labelled by method/endpoint/status
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

# Histogram: HTTP request latency per method/endpoint
REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

# Gauge: currently open connections
ACTIVE_CONNECTIONS = Gauge(
    'active_connections_total',
    'Active connections'
)

# Counter: graph executions by outcome and user
# NOTE(review): a user_id label can explode metric cardinality on busy systems.
EXECUTION_COUNT = Counter(
    'graph_executions_total',
    'Total graph executions',
    ['status', 'user_id']
)

# Histogram: graph execution latency per graph
EXECUTION_DURATION = Histogram(
    'graph_execution_duration_seconds',
    'Graph execution duration',
    ['graph_id']
)

class PerformanceMonitor:
    """Collects Prometheus metrics plus host/application statistics.

    NOTE(review): ``collect_application_metrics`` calls
    ``_get_db_pool_stats`` / ``_get_redis_stats`` /
    ``_get_active_executions_count``, which are not defined in this
    snippet — they must be provided elsewhere.
    """
    
    def __init__(self):
        # Process start time, used to report uptime.
        self.start_time = time.time()
    
    def record_request(self, method: str, endpoint: str, status: int, duration: float):
        """Record one HTTP request into the Prometheus counters/histograms."""
        REQUEST_COUNT.labels(
            method=method,
            endpoint=endpoint,
            status=str(status)
        ).inc()
        
        REQUEST_DURATION.labels(
            method=method,
            endpoint=endpoint
        ).observe(duration)
    
    def record_execution(self, graph_id: str, status: str, user_id: str, duration: float):
        """Record one graph execution into the Prometheus counters/histograms."""
        EXECUTION_COUNT.labels(
            status=status,
            user_id=user_id
        ).inc()
        
        EXECUTION_DURATION.labels(
            graph_id=graph_id
        ).observe(duration)
    
    def get_system_metrics(self) -> Dict[str, Any]:
        """Sample host-level CPU/memory/disk usage.

        NOTE: ``cpu_percent(interval=1)`` blocks the caller for one second.
        """
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        
        return {
            'cpu_usage_percent': cpu_percent,
            'memory_usage_percent': memory.percent,
            'memory_available_bytes': memory.available,
            'disk_usage_percent': disk.percent,
            'disk_free_bytes': disk.free,
            'uptime_seconds': time.time() - self.start_time,
        }
    
    async def collect_application_metrics(self) -> Dict[str, Any]:
        """Gather application-level stats (DB pool, Redis, active executions)."""
        # Database connection-pool status
        db_pool_stats = await self._get_db_pool_stats()
        
        # Redis connection status
        redis_stats = await self._get_redis_stats()
        
        # Number of in-flight graph executions
        active_executions = await self._get_active_executions_count()
        
        return {
            'database': db_pool_stats,
            'redis': redis_stats,
            'active_executions': active_executions,
        }

# FastAPI中间件集成
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

class MetricsMiddleware(BaseHTTPMiddleware):
    """Starlette middleware that reports per-request Prometheus metrics."""
    
    def __init__(self, app, monitor: PerformanceMonitor):
        super().__init__(app)
        self.monitor = monitor
    
    async def dispatch(self, request: Request, call_next):
        started = time.time()
        
        # Run the downstream handler first, then report its outcome.
        response = await call_next(request)
        
        self.monitor.record_request(
            method=request.method,
            endpoint=request.url.path,
            status=response.status_code,
            duration=time.time() - started,
        )
        
        return response

# 健康检查端点
@app.get("/health")
async def health_check():
    """Liveness/readiness probe: checks DB, Redis and host resources."""
    try:
        # Round-trip the database and Redis
        await db.execute("SELECT 1")
        await redis.ping()
        
        system_metrics = monitor.get_system_metrics()
        
        # Report "degraded" once any resource crosses 90% utilization.
        resources_ok = all(
            system_metrics[metric] < 90
            for metric in (
                'cpu_usage_percent',
                'memory_usage_percent',
                'disk_usage_percent',
            )
        )
        
        return {
            "status": "healthy" if resources_ok else "degraded",
            "timestamp": datetime.utcnow().isoformat(),
            "metrics": system_metrics
        }
        
    except Exception as e:
        logger.error(f"健康检查失败: {e}")
        return {
            "status": "unhealthy",
            "timestamp": datetime.utcnow().isoformat(),
            "error": str(e)
        }

@app.get("/metrics")
async def metrics():
    """Expose Prometheus metrics in the text exposition format."""
    return Response(
        generate_latest(),
        media_type="text/plain"
    )

4.1.2 告警规则配置

# prometheus/alerts.yml
groups:
  - name: autogpt_alerts
    rules:
      # High HTTP 5xx error-rate alert
      - alert: HighErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "高错误率检测"
          description: "5分钟内HTTP 5xx错误率超过10%"
      
      # Slow-response alert (p95 latency)
      - alert: HighResponseTime
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "响应时间过长"
          description: "95%的请求响应时间超过2秒"
      
      # Host resource alerts
      - alert: HighCPUUsage
        expr: cpu_usage_percent > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CPU使用率过高"
          description: "CPU使用率持续5分钟超过80%"
      
      - alert: HighMemoryUsage
        expr: memory_usage_percent > 85
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "内存使用率过高"
          description: "内存使用率持续3分钟超过85%"
      
      # Backend availability alert (scrape target down)
      - alert: DatabaseConnectionIssue
        expr: up{job="autogpt-backend"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "数据库连接异常"
          description: "无法连接到数据库"
      
      # Execution-queue backlog alert
      - alert: ExecutionQueueBacklog
        expr: execution_queue_size > 100
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "执行队列积压"
          description: "执行队列中待处理任务超过100个"

4.2 日志管理

4.2.1 结构化日志实践

import logging
import json
from datetime import datetime
from typing import Dict, Any, Optional

class StructuredLogger:
    """Thin wrapper around ``logging`` that emits structured JSON records.

    Fix: the original attached a new StreamHandler on every instantiation,
    so constructing ``StructuredLogger("x")`` twice duplicated every log
    line. A handler is now attached only when the named logger has none.
    """
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # Attach the handler once per named logger to avoid duplicate output
        # when the same name is instantiated repeatedly.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(StructuredFormatter())
            self.logger.addHandler(handler)
    
    def info(self, message: str, **kwargs):
        """Log at INFO with structured extra fields."""
        self._log(logging.INFO, message, **kwargs)
    
    def error(self, message: str, **kwargs):
        """Log at ERROR with structured extra fields."""
        self._log(logging.ERROR, message, **kwargs)
    
    def warning(self, message: str, **kwargs):
        """Log at WARNING with structured extra fields."""
        self._log(logging.WARNING, message, **kwargs)
    
    def _log(self, level: int, message: str, **kwargs):
        """Attach common fields plus caller kwargs and emit the record.

        NOTE(review): kwargs keys that collide with built-in LogRecord
        attributes (e.g. "module", "name") make ``logging`` raise — callers
        must avoid reserved names.
        """
        extra = {
            'timestamp': datetime.utcnow().isoformat(),
            'service': 'autogpt-backend',
            'version': '1.0.0',
            **kwargs
        }
        
        self.logger.log(level, message, extra=extra)

class StructuredFormatter(logging.Formatter):
    """Render LogRecords as single-line JSON objects."""
    
    # Standard LogRecord attributes that must not leak into the payload as
    # "extra" fields. The original list missed exc_info/exc_text/stack_info/
    # taskName, which leaked null-valued keys into every record.
    _RESERVED = frozenset({
        'name', 'msg', 'args', 'levelname', 'levelno', 'pathname',
        'filename', 'module', 'lineno', 'funcName', 'created', 'msecs',
        'relativeCreated', 'thread', 'threadName', 'processName', 'process',
        'exc_info', 'exc_text', 'stack_info', 'taskName', 'timestamp',
    })
    
    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            'timestamp': getattr(record, 'timestamp', datetime.utcnow().isoformat()),
            'level': record.levelname,
            'message': record.getMessage(),
            'logger': record.name,
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }
        
        # Merge extras supplied via ``extra=``; skip LogRecord internals.
        for key, value in record.__dict__.items():
            if key not in self._RESERVED:
                log_data[key] = value
        
        # default=str keeps logging from crashing on non-JSON-serializable
        # extras (datetimes, UUIDs, ...) — they are stringified instead.
        return json.dumps(log_data, ensure_ascii=False, default=str)

# 业务日志记录
class BusinessLogger:
    """Domain-event logger built on top of StructuredLogger."""
    
    def __init__(self):
        self.logger = StructuredLogger('business')
    
    def log_user_action(
        self, 
        user_id: str, 
        action: str, 
        resource_type: str,
        resource_id: str,
        result: str,
        **kwargs
    ):
        """Emit one structured record per user-initiated action."""
        self.logger.info(
            f"用户操作: {action}",
            user_id=user_id,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            result=result,
            **kwargs
        )
    
    def log_graph_execution(
        self,
        execution_id: str,
        graph_id: str,
        user_id: str,
        status: str,
        duration: Optional[float] = None,
        error: Optional[str] = None
    ):
        """Emit a success or failure record for one graph execution."""
        fields = {
            'execution_id': execution_id,
            'graph_id': graph_id,
            'user_id': user_id,
            'status': status,
        }
        if duration is not None:
            fields['duration_seconds'] = duration
        
        # Failure path carries the error and logs at ERROR level.
        if error:
            fields['error'] = error
            self.logger.error(f"图执行失败: {execution_id}", **fields)
        else:
            self.logger.info(f"图执行{status}: {execution_id}", **fields)
    
    def log_api_call(
        self,
        provider: str,
        endpoint: str,
        user_id: str,
        status_code: int,
        duration: float,
        cost: Optional[int] = None
    ):
        """Emit one record per outbound provider API call."""
        self.logger.info(
            f"API调用: {provider}/{endpoint}",
            provider=provider,
            endpoint=endpoint,
            user_id=user_id,
            status_code=status_code,
            duration_seconds=duration,
            cost_cents=cost
        )

# 使用示例
# Module-level singleton used by the execution wrapper below.
business_logger = BusinessLogger()

async def execute_graph_with_logging(graph_id: str, user_id: str, inputs: Dict):
    """Execute a graph while emitting structured start/success/failure logs.

    Re-raises any execution error after logging it, so callers still see
    the original exception.
    """
    execution_id = generate_execution_id()
    start_time = time.time()
    
    try:
        business_logger.log_user_action(
            user_id=user_id,
            action="execute_graph",
            resource_type="graph",
            resource_id=graph_id,
            result="started",
            execution_id=execution_id
        )
        
        # Run the graph itself
        result = await _execute_graph_internal(graph_id, inputs)
        
        duration = time.time() - start_time
        business_logger.log_graph_execution(
            execution_id=execution_id,
            graph_id=graph_id,
            user_id=user_id,
            status="completed",
            duration=duration
        )
        
        return result
        
    except Exception as e:
        duration = time.time() - start_time
        business_logger.log_graph_execution(
            execution_id=execution_id,
            graph_id=graph_id,
            user_id=user_id,
            status="failed",
            duration=duration,
            error=str(e)
        )
        raise

4.3 部署和发布

4.3.1 Docker化部署

# Dockerfile
FROM python:3.11-slim

# Working directory
WORKDIR /app

# System deps for building wheels (gcc/g++) plus curl, which the
# HEALTHCHECK below requires — python:*-slim does not ship curl, so the
# original health check always failed with "curl: not found".
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest first to keep the pip layer cacheable
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Run as a non-root user
RUN useradd --create-home --shell /bin/bash autogpt
RUN chown -R autogpt:autogpt /app
USER autogpt

# Container-level health check against the API's /health endpoint
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Expose the API port
EXPOSE 8000

# Start the server
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  # FastAPI backend
  backend:
    build: .
    ports:
      - "8000:8000"
    environment:
      # NOTE(review): credentials are hard-coded for local development;
      # use an .env file or secrets in real deployments.
      - DATABASE_URL=postgresql://user:password@postgres:5432/autogpt
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    deploy:
      # NOTE(review): "deploy" resource limits are honored by Swarm and by
      # recent docker compose versions; classic docker-compose ignores them.
      resources:
        limits:
          memory: 1G
          cpus: '0.5'
        reservations:
          memory: 512M
          cpus: '0.25'

  # PostgreSQL database
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=autogpt
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  # Redis cache / queue
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

  # TLS-terminating reverse proxy in front of the backend
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - backend
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

4.3.2 Kubernetes部署

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: autogpt-backend
  labels:
    app: autogpt-backend
spec:
  replicas: 3
  selector:
    matchLabels:
      app: autogpt-backend
  template:
    metadata:
      labels:
        app: autogpt-backend
    spec:
      containers:
      - name: backend
        image: autogpt/backend:latest
        ports:
        - containerPort: 8000
        env:
        # Connection strings are injected from the autogpt-secrets Secret
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: autogpt-secrets
              key: database-url
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: autogpt-secrets
              key: redis-url
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        # Restart the container when /health stops answering
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        # Gate traffic until /health answers
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
# ClusterIP service fronting the deployment
apiVersion: v1
kind: Service
metadata:
  name: autogpt-backend-service
spec:
  selector:
    app: autogpt-backend
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: ClusterIP

---
# Public ingress with TLS certificates issued by cert-manager
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: autogpt-ingress
  annotations:
    # NOTE(review): the kubernetes.io/ingress.class annotation is deprecated
    # in favor of spec.ingressClassName — confirm the cluster version.
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  tls:
  - hosts:
    - api.autogpt.com
    secretName: autogpt-tls
  rules:
  - host: api.autogpt.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: autogpt-backend-service
            port:
              number: 80