AutoGPT平台框架使用手册
概述
AutoGPT平台是一个强大的AI智能体创建和运行系统,旨在帮助企业和开发者构建自动化的AI解决方案。本手册将详细介绍平台的架构、使用方法、API接口以及最佳实践。
目录
快速开始
系统要求
- Docker 和 Docker Compose V2
- Node.js 18+ (用于前端开发)
- Python 3.11+ (用于后端开发)
- PostgreSQL 14+ (数据库)
- Redis 7+ (缓存和队列)
一键启动
# 1. 克隆仓库
git clone https://github.com/Significant-Gravitas/AutoGPT.git
cd AutoGPT/autogpt_platform
# 2. 配置环境变量
cp .env.default .env
# 编辑 .env 文件,配置必要的环境变量
# 3. 启动所有服务
docker compose up -d
# 4. 访问平台
# 前端: http://localhost:3000
# 后端API: http://localhost:8000
# API文档: http://localhost:8000/docs
验证安装
# 检查服务状态
docker compose ps
# 检查健康状态
curl http://localhost:8000/health
平台架构概览
整体架构图
graph TB
subgraph "前端层 - Frontend Layer"
UI[Next.js Web UI]
Mobile[移动端应用]
end
subgraph "网关层 - Gateway Layer"
LB[负载均衡器]
Auth[认证中间件]
RateLimit[限流中间件]
end
subgraph "应用层 - Application Layer"
API[REST API Server]
WS[WebSocket Server]
External[External API]
end
subgraph "业务层 - Business Layer"
AgentEngine[智能体引擎]
BlockSystem[Block系统]
Workflow[工作流引擎]
Store[应用商店]
Library[智能体库]
end
subgraph "执行层 - Execution Layer"
Executor[执行引擎]
Scheduler[调度器]
Queue[消息队列]
end
subgraph "数据层 - Data Layer"
DB[(PostgreSQL)]
Cache[(Redis)]
Storage[(文件存储)]
Vector[(向量数据库)]
end
subgraph "基础设施层 - Infrastructure Layer"
Monitor[监控系统]
Log[日志系统]
Security[安全系统]
end
UI --> LB
Mobile --> LB
LB --> Auth
Auth --> RateLimit
RateLimit --> API
RateLimit --> WS
RateLimit --> External
API --> AgentEngine
API --> BlockSystem
API --> Store
API --> Library
WS --> Workflow
AgentEngine --> Executor
Workflow --> Scheduler
Scheduler --> Queue
Executor --> DB
Executor --> Cache
AgentEngine --> Vector
Store --> Storage
API --> Monitor
Executor --> Log
Auth --> Security
技术栈
后端技术栈
- 框架: FastAPI (Python)
- 数据库: PostgreSQL + Prisma ORM
- 缓存: Redis
- 消息队列: RabbitMQ
- 认证: JWT + Supabase
- 监控: Prometheus + Grafana
- 日志: 结构化日志 + 云日志集成
前端技术栈
- 框架: Next.js 14 (App Router)
- UI库: React + Radix UI + Tailwind CSS
- 状态管理: React Hooks + Supabase Client
- 图形编辑: @xyflow/react
- 特性开关: LaunchDarkly
基础设施
- 容器化: Docker + Docker Compose
- CI/CD: GitHub Actions
- 云服务: Google Cloud Platform
- 安全: ClamAV病毒扫描
核心概念
1. 智能体 (Agent)
智能体是AutoGPT平台的核心概念,代表一个可执行的AI工作流。
# 智能体数据模型
class AgentGraph:
id: str # 智能体唯一标识
version: int # 版本号
name: str # 智能体名称
description: str # 描述信息
instructions: str # 执行指令
nodes: List[AgentNode] # 节点列表
is_active: bool # 是否激活
user_id: str # 创建者ID
2. 节点 (Node/Block)
节点是智能体工作流中的基本执行单元,每个节点执行特定的功能。
# 节点基类
class Block:
"""
Block是所有节点的基类,定义了节点的基本接口
"""
class Input(BaseModel):
"""输入数据模型"""
pass
class Output(BaseModel):
"""输出数据模型"""
pass
def __init__(self):
self.id = str(uuid.uuid4())
@classmethod
def get_block_type(cls) -> BlockType:
"""获取节点类型"""
raise NotImplementedError
def run(self, input_data: Input, **kwargs) -> BlockOutput:
"""执行节点逻辑"""
raise NotImplementedError
3. 工作流 (Workflow)
工作流定义了节点之间的连接关系和数据流向。
# 工作流执行引擎
class WorkflowEngine:
"""
工作流执行引擎,负责协调节点执行
"""
def __init__(self, graph: AgentGraph):
self.graph = graph
self.execution_context = {}
async def execute(self) -> ExecutionResult:
"""执行工作流"""
try:
# 1. 验证工作流
self._validate_workflow()
# 2. 初始化执行上下文
self._initialize_context()
# 3. 按拓扑顺序执行节点
result = await self._execute_nodes()
return ExecutionResult(
success=True,
output=result,
execution_id=self.execution_id
)
except Exception as e:
return ExecutionResult(
success=False,
error=str(e),
execution_id=self.execution_id
)
开发环境搭建
后端开发环境
# 1. 进入后端目录
cd autogpt_platform/backend
# 2. 安装Poetry (Python包管理器)
curl -sSL https://install.python-poetry.org | python3 -
# 3. 安装依赖
poetry install
# 4. 激活虚拟环境
poetry shell
# 5. 配置数据库
cp .env.default .env
# 编辑数据库连接配置
# 6. 运行数据库迁移
poetry run prisma migrate dev
# 7. 启动开发服务器
poetry run python -m backend.app
前端开发环境
# 1. 进入前端目录
cd autogpt_platform/frontend
# 2. 安装依赖
pnpm install
# 3. 配置环境变量
cp .env.default .env.local
# 编辑API端点配置
# 4. 启动开发服务器
pnpm dev
# 5. 访问开发环境
# http://localhost:3000
开发工具配置
VS Code 配置
// .vscode/settings.json
{
"python.defaultInterpreterPath": "./backend/.venv/bin/python",
"python.linting.enabled": true,
"python.linting.pylintEnabled": false,
"python.linting.flake8Enabled": true,
"python.formatting.provider": "black",
"typescript.preferences.importModuleSpecifier": "relative",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
}
}
代码质量工具
# Python代码格式化和检查
poetry run black .
poetry run flake8 .
poetry run mypy .
# 前端代码格式化和检查
pnpm lint
pnpm format
pnpm type-check
API接口详解
认证系统
JWT认证流程
# 认证中间件实现
from autogpt_libs.auth import requires_user, get_jwt_payload
@app.post("/api/v1/auth/user")
async def get_or_create_user_route(
user_data: dict = Security(get_jwt_payload)
):
"""
获取或创建用户
功能:
1. 验证JWT令牌
2. 提取用户信息
3. 创建或更新用户记录
返回:
- 用户信息对象
"""
user = await get_or_create_user(user_data)
return user.model_dump()
# 使用认证装饰器保护端点
@app.get("/api/v1/protected-endpoint")
async def protected_endpoint(
user: User = Depends(requires_user)
):
"""受保护的端点,需要用户认证"""
return {"message": f"Hello, {user.email}!"}
智能体管理API
创建智能体
@v1_router.post("/graphs")
async def create_agent_graph(
create_graph: CreateGraph,
user_id: str = Security(get_user_id),
) -> Graph:
"""
创建新的智能体图
参数:
- create_graph: 智能体创建请求
- user_id: 用户ID (从JWT中提取)
返回:
- 创建的智能体图对象
"""
try:
# 1. 验证输入数据
if not create_graph.name:
raise ValueError("智能体名称不能为空")
# 2. 创建智能体图
graph_data = {
"name": create_graph.name,
"description": create_graph.description,
"user_id": user_id,
"nodes": [],
"is_active": True
}
# 3. 保存到数据库
graph = await AgentGraphDB.create_graph(graph_data)
# 4. 记录审计日志
logger.info(f"用户 {user_id} 创建智能体: {graph.id}")
return graph
except Exception as e:
logger.error(f"创建智能体失败: {e}")
raise HTTPException(
status_code=500,
detail="创建智能体失败"
)
执行智能体
@v1_router.post("/graphs/{graph_id}/executions")
async def execute_agent_graph(
graph_id: str,
execution_request: ExecutionRequest,
user_id: str = Security(get_user_id),
) -> ExecutionResponse:
"""
执行智能体图
参数:
- graph_id: 智能体图ID
- execution_request: 执行请求参数
- user_id: 用户ID
返回:
- 执行结果
"""
try:
# 1. 验证权限
graph = await AgentGraphDB.get_graph(graph_id, user_id)
if not graph:
raise HTTPException(404, "智能体不存在")
# 2. 验证智能体状态
if not graph.is_active:
raise HTTPException(400, "智能体未激活")
# 3. 创建执行记录
execution = await ExecutionDB.create_execution(
graph_id=graph_id,
user_id=user_id,
input_data=execution_request.input_data
)
# 4. 提交到执行队列
await ExecutionQueue.submit_execution(execution.id)
# 5. 返回执行ID
return ExecutionResponse(
execution_id=execution.id,
status="QUEUED",
message="智能体已提交执行"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"执行智能体失败: {e}")
raise HTTPException(500, "执行智能体失败")
WebSocket实时通信
# WebSocket连接管理
class WebSocketManager:
"""WebSocket连接管理器"""
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.user_connections: Dict[str, Set[str]] = {}
async def connect(self, websocket: WebSocket, user_id: str):
"""建立WebSocket连接"""
await websocket.accept()
connection_id = str(uuid.uuid4())
self.active_connections[connection_id] = websocket
if user_id not in self.user_connections:
self.user_connections[user_id] = set()
self.user_connections[user_id].add(connection_id)
logger.info(f"用户 {user_id} 建立WebSocket连接: {connection_id}")
return connection_id
async def disconnect(self, connection_id: str, user_id: str):
"""断开WebSocket连接"""
if connection_id in self.active_connections:
del self.active_connections[connection_id]
if user_id in self.user_connections:
self.user_connections[user_id].discard(connection_id)
if not self.user_connections[user_id]:
del self.user_connections[user_id]
logger.info(f"WebSocket连接已断开: {connection_id}")
async def send_to_user(self, user_id: str, message: dict):
"""向特定用户发送消息"""
if user_id not in self.user_connections:
return
disconnected = []
for connection_id in self.user_connections[user_id]:
websocket = self.active_connections.get(connection_id)
if websocket:
try:
await websocket.send_json(message)
except ConnectionClosedError:
disconnected.append(connection_id)
# 清理断开的连接
for connection_id in disconnected:
await self.disconnect(connection_id, user_id)
# WebSocket端点
@websocket_router.websocket("/ws/{user_id}")
async def websocket_endpoint(
websocket: WebSocket,
user_id: str,
token: str = Query(...)
):
"""WebSocket端点,用于实时通信"""
try:
# 1. 验证JWT令牌
payload = parse_jwt_token(token)
if payload.get("sub") != user_id:
await websocket.close(code=1008, reason="无效的用户ID")
return
# 2. 建立连接
connection_id = await ws_manager.connect(websocket, user_id)
try:
while True:
# 3. 接收客户端消息
data = await websocket.receive_json()
# 4. 处理消息
await handle_websocket_message(user_id, data)
except WebSocketDisconnect:
pass
finally:
# 5. 清理连接
await ws_manager.disconnect(connection_id, user_id)
except Exception as e:
logger.error(f"WebSocket连接错误: {e}")
await websocket.close(code=1011, reason="服务器错误")
限流和安全
# 限流中间件
from autogpt_libs.rate_limit import rate_limit_middleware, RateLimitConfig
# 配置限流策略
rate_limit_config = RateLimitConfig(
requests_per_minute=60, # 每分钟60次请求
exempt_paths=[ # 豁免路径
r"^/health$",
r"^/docs$",
r"^/static/"
]
)
@app.middleware("http")
async def rate_limit_entry(request: Request, call_next):
"""限流中间件入口"""
return await rate_limit_middleware(request, call_next, rate_limit_config)
# 安全头中间件
class SecurityHeadersMiddleware:
"""安全头中间件"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
# 添加安全响应头
async def send_wrapper(message):
if message["type"] == "http.response.start":
headers = dict(message.get("headers", []))
# 安全头配置
security_headers = {
b"x-content-type-options": b"nosniff",
b"x-frame-options": b"DENY",
b"x-xss-protection": b"1; mode=block",
b"strict-transport-security": b"max-age=31536000; includeSubDomains",
b"content-security-policy": b"default-src 'self'",
b"referrer-policy": b"strict-origin-when-cross-origin"
}
headers.update(security_headers)
message["headers"] = list(headers.items())
await send(message)
await self.app(scope, receive, send_wrapper)
else:
await self.app(scope, receive, send)
模块深度解析
1. 执行引擎模块
架构图
graph TB
subgraph "执行引擎架构"
subgraph "调度层"
Scheduler[调度器]
Queue[任务队列]
Priority[优先级管理]
end
subgraph "执行层"
Executor[执行器]
Worker[工作进程]
Monitor[执行监控]
end
subgraph "存储层"
ExecDB[执行记录]
ResultCache[结果缓存]
LogStore[日志存储]
end
end
Scheduler --> Queue
Queue --> Priority
Priority --> Executor
Executor --> Worker
Worker --> Monitor
Executor --> ExecDB
Worker --> ResultCache
Monitor --> LogStore
核心实现
# 执行管理器
class ExecutionManager(AppService):
"""
执行管理器 - 负责智能体的执行调度和管理
主要功能:
1. 接收执行请求
2. 调度执行任务
3. 监控执行状态
4. 管理执行结果
"""
def __init__(self):
super().__init__(port=8001)
self.execution_queue = asyncio.Queue()
self.active_executions: Dict[str, ExecutionContext] = {}
self.worker_pool = []
async def run_service(self):
"""运行执行服务"""
# 启动工作进程池
for i in range(self.get_worker_count()):
worker = ExecutionWorker(f"worker-{i}")
self.worker_pool.append(worker)
asyncio.create_task(worker.start())
# 启动调度循环
await self._schedule_loop()
async def _schedule_loop(self):
"""调度循环"""
while True:
try:
# 从队列获取执行任务
execution_request = await self.execution_queue.get()
# 分配给可用的工作进程
worker = await self._get_available_worker()
if worker:
await worker.execute(execution_request)
else:
# 重新排队
await self.execution_queue.put(execution_request)
await asyncio.sleep(1)
except Exception as e:
logger.error(f"调度循环错误: {e}")
await asyncio.sleep(5)
@expose
async def submit_execution(self, request: ExecutionRequest) -> ExecutionResponse:
"""
提交执行请求
参数:
- request: 执行请求对象
返回:
- 执行响应对象
"""
try:
# 1. 验证请求
await self._validate_execution_request(request)
# 2. 创建执行上下文
context = ExecutionContext(
execution_id=request.execution_id,
graph_id=request.graph_id,
user_id=request.user_id,
input_data=request.input_data,
created_at=datetime.utcnow()
)
# 3. 加入执行队列
await self.execution_queue.put(context)
self.active_executions[request.execution_id] = context
# 4. 记录执行日志
logger.info(f"执行请求已提交: {request.execution_id}")
return ExecutionResponse(
execution_id=request.execution_id,
status=ExecutionStatus.QUEUED,
message="执行请求已提交到队列"
)
except Exception as e:
logger.error(f"提交执行请求失败: {e}")
raise
@expose
async def get_execution_status(self, execution_id: str) -> ExecutionStatusResponse:
"""
获取执行状态
参数:
- execution_id: 执行ID
返回:
- 执行状态响应
"""
if execution_id in self.active_executions:
context = self.active_executions[execution_id]
return ExecutionStatusResponse(
execution_id=execution_id,
status=context.status,
progress=context.progress,
current_node=context.current_node,
output=context.output,
error=context.error
)
else:
# 从数据库查询历史执行记录
execution = await ExecutionDB.get_execution(execution_id)
if execution:
return ExecutionStatusResponse.from_db_record(execution)
else:
raise NotFoundError(f"执行记录不存在: {execution_id}")
# 执行工作进程
class ExecutionWorker:
"""
执行工作进程 - 负责具体的智能体执行
"""
def __init__(self, worker_id: str):
self.worker_id = worker_id
self.is_busy = False
self.current_execution: Optional[ExecutionContext] = None
async def start(self):
"""启动工作进程"""
logger.info(f"执行工作进程启动: {self.worker_id}")
async def execute(self, context: ExecutionContext):
"""
执行智能体
参数:
- context: 执行上下文
"""
self.is_busy = True
self.current_execution = context
try:
# 1. 更新执行状态
context.status = ExecutionStatus.RUNNING
context.started_at = datetime.utcnow()
# 2. 加载智能体图
graph = await AgentGraphDB.get_graph(context.graph_id)
if not graph:
raise ValueError(f"智能体图不存在: {context.graph_id}")
# 3. 创建执行引擎
engine = WorkflowEngine(graph, context)
# 4. 执行工作流
result = await engine.execute()
# 5. 更新执行结果
context.status = ExecutionStatus.COMPLETED if result.success else ExecutionStatus.FAILED
context.output = result.output
context.error = result.error
context.completed_at = datetime.utcnow()
# 6. 保存执行记录
await ExecutionDB.update_execution(context)
# 7. 发送WebSocket通知
await self._notify_execution_complete(context)
except Exception as e:
# 处理执行异常
context.status = ExecutionStatus.FAILED
context.error = str(e)
context.completed_at = datetime.utcnow()
logger.error(f"执行失败 {context.execution_id}: {e}")
finally:
self.is_busy = False
self.current_execution = None
async def _notify_execution_complete(self, context: ExecutionContext):
"""通知执行完成"""
message = {
"type": "execution_complete",
"execution_id": context.execution_id,
"status": context.status.value,
"output": context.output,
"error": context.error
}
# 通过WebSocket发送通知
await ws_manager.send_to_user(context.user_id, message)
2. Block系统模块
Block基类实现
# Block基类定义
class Block(ABC):
"""
Block基类 - 所有功能块的基础类
设计原则:
1. 单一职责: 每个Block只负责一个特定功能
2. 可组合性: Block可以自由组合形成复杂工作流
3. 类型安全: 使用Pydantic进行输入输出验证
4. 可测试性: 每个Block都可以独立测试
"""
class Input(BaseModel):
"""输入数据模型 - 子类必须重写"""
pass
class Output(BaseModel):
"""输出数据模型 - 子类必须重写"""
pass
def __init__(self):
self.id = str(uuid.uuid4())
self.execution_count = 0
self.last_execution_time: Optional[datetime] = None
@classmethod
@abstractmethod
def get_block_type(cls) -> BlockType:
"""获取Block类型"""
pass
@classmethod
def get_block_category(cls) -> BlockCategory:
"""获取Block分类"""
return BlockCategory.BASIC
@classmethod
def get_block_description(cls) -> str:
"""获取Block描述"""
return cls.__doc__ or "无描述"
@classmethod
def get_input_schema(cls) -> Dict[str, Any]:
"""获取输入数据模式"""
return cls.Input.model_json_schema()
@classmethod
def get_output_schema(cls) -> Dict[str, Any]:
"""获取输出数据模式"""
return cls.Output.model_json_schema()
def run(
self,
input_data: Input,
**kwargs
) -> BlockOutput:
"""
执行Block逻辑
参数:
- input_data: 输入数据
- **kwargs: 额外参数 (user_id, credentials等)
返回:
- BlockOutput: 执行结果
"""
try:
# 1. 记录执行信息
self.execution_count += 1
self.last_execution_time = datetime.utcnow()
# 2. 验证输入数据
validated_input = self._validate_input(input_data)
# 3. 执行核心逻辑
result = self._execute(validated_input, **kwargs)
# 4. 验证输出数据
validated_output = self._validate_output(result)
# 5. 返回执行结果
return BlockOutput(
id=self.id,
block_type=self.get_block_type(),
output_data=validated_output,
execution_time=datetime.utcnow() - self.last_execution_time,
success=True
)
except Exception as e:
# 处理执行异常
logger.error(f"Block执行失败 {self.id}: {e}")
return BlockOutput(
id=self.id,
block_type=self.get_block_type(),
error=str(e),
execution_time=datetime.utcnow() - self.last_execution_time,
success=False
)
@abstractmethod
def _execute(self, input_data: Input, **kwargs) -> Output:
"""
执行核心逻辑 - 子类必须实现
参数:
- input_data: 验证后的输入数据
- **kwargs: 额外参数
返回:
- Output: 执行结果
"""
pass
def _validate_input(self, input_data: Any) -> Input:
"""验证输入数据"""
if isinstance(input_data, dict):
return self.Input(**input_data)
elif isinstance(input_data, self.Input):
return input_data
else:
raise ValueError(f"无效的输入数据类型: {type(input_data)}")
def _validate_output(self, output_data: Any) -> Output:
"""验证输出数据"""
if isinstance(output_data, dict):
return self.Output(**output_data)
elif isinstance(output_data, self.Output):
return output_data
else:
raise ValueError(f"无效的输出数据类型: {type(output_data)}")
# 具体Block实现示例
class TextProcessingBlock(Block):
"""
文本处理Block - 提供基础的文本处理功能
"""
class Input(BaseModel):
text: str = Field(..., description="要处理的文本")
operation: str = Field(..., description="处理操作: upper, lower, strip, reverse")
class Output(BaseModel):
processed_text: str = Field(..., description="处理后的文本")
original_length: int = Field(..., description="原始文本长度")
processed_length: int = Field(..., description="处理后文本长度")
@classmethod
def get_block_type(cls) -> BlockType:
return BlockType.TEXT_PROCESSING
@classmethod
def get_block_category(cls) -> BlockCategory:
return BlockCategory.TEXT
def _execute(self, input_data: Input, **kwargs) -> Output:
"""执行文本处理逻辑"""
text = input_data.text
operation = input_data.operation.lower()
# 执行文本处理操作
if operation == "upper":
processed_text = text.upper()
elif operation == "lower":
processed_text = text.lower()
elif operation == "strip":
processed_text = text.strip()
elif operation == "reverse":
processed_text = text[::-1]
else:
raise ValueError(f"不支持的操作: {operation}")
return self.Output(
processed_text=processed_text,
original_length=len(text),
processed_length=len(processed_text)
)
# LLM Block实现
class LLMBlock(Block):
"""
大语言模型Block - 提供AI文本生成功能
"""
class Input(BaseModel):
prompt: str = Field(..., description="输入提示")
model: str = Field(default="gpt-3.5-turbo", description="使用的模型")
max_tokens: int = Field(default=1000, description="最大token数")
temperature: float = Field(default=0.7, description="温度参数")
class Output(BaseModel):
response: str = Field(..., description="AI生成的响应")
token_usage: Dict[str, int] = Field(..., description="Token使用情况")
model_used: str = Field(..., description="实际使用的模型")
@classmethod
def get_block_type(cls) -> BlockType:
return BlockType.LLM
@classmethod
def get_block_category(cls) -> BlockCategory:
return BlockCategory.AI
def _execute(self, input_data: Input, **kwargs) -> Output:
"""执行LLM推理"""
# 获取用户凭据
user_id = kwargs.get("user_id")
credentials = kwargs.get("credentials")
if not credentials:
raise ValueError("缺少API凭据")
# 调用LLM API
llm_client = self._get_llm_client(credentials)
try:
response = llm_client.chat.completions.create(
model=input_data.model,
messages=[
{"role": "user", "content": input_data.prompt}
],
max_tokens=input_data.max_tokens,
temperature=input_data.temperature
)
return self.Output(
response=response.choices[0].message.content,
token_usage={
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
},
model_used=response.model
)
except Exception as e:
raise RuntimeError(f"LLM调用失败: {e}")
def _get_llm_client(self, credentials):
"""获取LLM客户端"""
# 根据凭据类型创建相应的客户端
if credentials.provider == "openai":
import openai
return openai.OpenAI(api_key=credentials.api_key)
else:
raise ValueError(f"不支持的LLM提供商: {credentials.provider}")
3. 应用商店模块
商店API实现
# 应用商店路由
@store_router.get("/agents")
async def get_store_agents(
featured: bool = False,
creator: Optional[str] = None,
sorted_by: Optional[str] = None,
search_query: Optional[str] = None,
category: Optional[str] = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
) -> PaginatedResponse[StoreAgent]:
"""
获取应用商店智能体列表
参数:
- featured: 是否只显示精选智能体
- creator: 创建者筛选
- sorted_by: 排序方式 (popular, recent, rating)
- search_query: 搜索关键词
- category: 分类筛选
- page: 页码
- page_size: 每页大小
返回:
- 分页的智能体列表
"""
try:
# 1. 构建查询条件
query_params = StoreQueryParams(
featured=featured,
creator=creator,
sorted_by=sorted_by,
search_query=search_query,
category=category,
page=page,
page_size=page_size
)
# 2. 从缓存获取数据
cache_key = f"store_agents:{hash(str(query_params))}"
cached_result = await redis_client.get(cache_key)
if cached_result:
return PaginatedResponse.parse_raw(cached_result)
# 3. 查询数据库
agents, total_count = await StoreDB.get_agents(query_params)
# 4. 构建响应
response = PaginatedResponse(
items=agents,
total=total_count,
page=page,
page_size=page_size,
total_pages=(total_count + page_size - 1) // page_size
)
# 5. 缓存结果
await redis_client.setex(
cache_key,
300, # 5分钟缓存
response.json()
)
return response
except Exception as e:
logger.error(f"获取商店智能体失败: {e}")
raise HTTPException(500, "获取智能体列表失败")
@store_router.post("/submissions")
async def submit_agent_to_store(
submission: StoreSubmission,
user_id: str = Security(get_user_id),
) -> SubmissionResponse:
"""
提交智能体到应用商店
参数:
- submission: 提交数据
- user_id: 用户ID
返回:
- 提交结果
"""
try:
# 1. 验证用户权限
user_profile = await StoreDB.get_user_profile(user_id)
if not user_profile or not user_profile.can_submit:
raise HTTPException(403, "没有提交权限")
# 2. 验证智能体
graph = await AgentGraphDB.get_graph(submission.graph_id, user_id)
if not graph:
raise HTTPException(404, "智能体不存在")
# 3. 检查重复提交
existing_submission = await StoreDB.get_submission_by_graph(
submission.graph_id
)
if existing_submission:
raise HTTPException(409, "智能体已提交")
# 4. 创建提交记录
submission_record = await StoreDB.create_submission(
graph_id=submission.graph_id,
user_id=user_id,
title=submission.title,
description=submission.description,
category=submission.category,
tags=submission.tags,
media_urls=submission.media_urls
)
# 5. 触发审核流程
await ReviewQueue.submit_for_review(submission_record.id)
# 6. 发送通知
await NotificationManager.send_notification(
user_id=user_id,
type="submission_received",
data={
"submission_id": submission_record.id,
"title": submission.title
}
)
return SubmissionResponse(
submission_id=submission_record.id,
status="PENDING_REVIEW",
message="智能体已提交审核"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"提交智能体失败: {e}")
raise HTTPException(500, "提交失败")
# 商店数据库操作
class StoreDB:
"""应用商店数据库操作类"""
@staticmethod
async def get_agents(
query_params: StoreQueryParams
) -> Tuple[List[StoreAgent], int]:
"""获取智能体列表"""
# 构建查询条件
where_conditions = []
if query_params.featured:
where_conditions.append("featured = true")
if query_params.creator:
where_conditions.append("creator_id = %s")
if query_params.category:
where_conditions.append("category = %s")
if query_params.search_query:
where_conditions.append(
"(title ILIKE %s OR description ILIKE %s)"
)
# 构建排序条件
order_by = "created_at DESC" # 默认按创建时间排序
if query_params.sorted_by == "popular":
order_by = "download_count DESC, rating DESC"
elif query_params.sorted_by == "rating":
order_by = "rating DESC, review_count DESC"
elif query_params.sorted_by == "recent":
order_by = "updated_at DESC"
# 执行查询
query = f"""
SELECT
sl.*,
ag.name,
ag.description,
u.email as creator_email,
up.name as creator_name,
up.avatar_url as creator_avatar
FROM store_listings sl
JOIN agent_graphs ag ON sl.graph_id = ag.id
JOIN users u ON sl.creator_id = u.id
LEFT JOIN user_profiles up ON u.id = up.user_id
WHERE sl.status = 'APPROVED'
{' AND ' + ' AND '.join(where_conditions) if where_conditions else ''}
ORDER BY {order_by}
LIMIT %s OFFSET %s
"""
# 查询参数
params = []
if query_params.creator:
params.append(query_params.creator)
if query_params.category:
params.append(query_params.category)
if query_params.search_query:
search_pattern = f"%{query_params.search_query}%"
params.extend([search_pattern, search_pattern])
params.extend([
query_params.page_size,
(query_params.page - 1) * query_params.page_size
])
# 执行查询
results = await database.fetch_all(query, params)
# 获取总数
count_query = f"""
SELECT COUNT(*)
FROM store_listings sl
WHERE sl.status = 'APPROVED'
{' AND ' + ' AND '.join(where_conditions) if where_conditions else ''}
"""
count_params = params[:-2] # 移除LIMIT和OFFSET参数
total_count = await database.fetch_val(count_query, count_params)
# 转换为模型对象
agents = [StoreAgent.from_db_record(record) for record in results]
return agents, total_count
实战经验与最佳实践
1. 性能优化
数据库优化
# 数据库连接池配置
DATABASE_CONFIG = {
"url": "postgresql://user:pass@localhost/autogpt",
"min_size": 10, # 最小连接数
"max_size": 100, # 最大连接数
"max_queries": 50000, # 每个连接最大查询数
"max_inactive_connection_lifetime": 300, # 连接最大空闲时间
}
# 查询优化示例
class OptimizedQueries:
"""优化的数据库查询"""
@staticmethod
async def get_user_agents_with_stats(user_id: str) -> List[AgentWithStats]:
"""
获取用户智能体及统计信息
优化点:
1. 使用JOIN减少查询次数
2. 添加适当的索引
3. 使用子查询预聚合数据
"""
query = """
SELECT
ag.*,
COALESCE(exec_stats.total_executions, 0) as total_executions,
COALESCE(exec_stats.success_rate, 0) as success_rate,
COALESCE(exec_stats.avg_duration, 0) as avg_duration
FROM agent_graphs ag
LEFT JOIN (
SELECT
graph_id,
COUNT(*) as total_executions,
AVG(CASE WHEN status = 'COMPLETED' THEN 1.0 ELSE 0.0 END) as success_rate,
AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_duration
FROM agent_graph_executions
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY graph_id
) exec_stats ON ag.id = exec_stats.graph_id
WHERE ag.user_id = $1 AND ag.is_active = true
ORDER BY ag.updated_at DESC
"""
results = await database.fetch_all(query, [user_id])
return [AgentWithStats.from_db_record(record) for record in results]
# 缓存策略
class CacheManager:
"""缓存管理器"""
def __init__(self, redis_client):
self.redis = redis_client
self.default_ttl = 300 # 5分钟
async def get_or_set(
self,
key: str,
fetch_func: Callable,
ttl: Optional[int] = None
) -> Any:
"""获取缓存或设置缓存"""
# 尝试从缓存获取
cached_value = await self.redis.get(key)
if cached_value:
return json.loads(cached_value)
# 缓存未命中,执行获取函数
value = await fetch_func()
# 设置缓存
await self.redis.setex(
key,
ttl or self.default_ttl,
json.dumps(value, default=str)
)
return value
async def invalidate_pattern(self, pattern: str):
"""批量删除匹配模式的缓存"""
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)
异步处理优化
# 批量处理优化
class BatchProcessor:
"""批量处理器"""
def __init__(self, batch_size: int = 100, flush_interval: float = 5.0):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.batch: List[Any] = []
self.last_flush = time.time()
async def add_item(self, item: Any):
"""添加项目到批次"""
self.batch.append(item)
# 检查是否需要刷新
if (len(self.batch) >= self.batch_size or
time.time() - self.last_flush >= self.flush_interval):
await self.flush()
async def flush(self):
"""刷新批次"""
if not self.batch:
return
try:
await self._process_batch(self.batch.copy())
finally:
self.batch.clear()
self.last_flush = time.time()
async def _process_batch(self, items: List[Any]):
"""处理批次 - 子类实现"""
raise NotImplementedError
# 并发控制
class ConcurrencyLimiter:
"""并发限制器"""
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.active_tasks: Set[asyncio.Task] = set()
async def run_with_limit(self, coro):
"""在并发限制下运行协程"""
async with self.semaphore:
task = asyncio.create_task(coro)
self.active_tasks.add(task)
try:
return await task
finally:
self.active_tasks.discard(task)
async def wait_all(self):
"""等待所有任务完成"""
if self.active_tasks:
await asyncio.gather(*self.active_tasks, return_exceptions=True)
2. 错误处理和监控
# 统一错误处理
class ErrorHandler:
"""统一错误处理器"""
@staticmethod
def handle_database_error(e: Exception) -> HTTPException:
"""处理数据库错误"""
if isinstance(e, IntegrityError):
return HTTPException(409, "数据冲突")
elif isinstance(e, DataError):
return HTTPException(400, "数据格式错误")
else:
logger.error(f"数据库错误: {e}")
return HTTPException(500, "数据库操作失败")
@staticmethod
def handle_external_api_error(e: Exception) -> HTTPException:
"""处理外部API错误"""
if isinstance(e, TimeoutError):
return HTTPException(504, "外部服务超时")
elif isinstance(e, ConnectionError):
return HTTPException(502, "外部服务不可用")
else:
logger.error(f"外部API错误: {e}")
return HTTPException(500, "外部服务调用失败")
# 监控指标收集
class MetricsCollector:
"""指标收集器"""
def __init__(self):
self.request_counter = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
self.request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
self.execution_counter = Counter(
'agent_executions_total',
'Total agent executions',
['status', 'graph_id']
)
def record_request(self, method: str, endpoint: str, status: int, duration: float):
"""记录请求指标"""
self.request_counter.labels(
method=method,
endpoint=endpoint,
status=status
).inc()
self.request_duration.labels(
method=method,
endpoint=endpoint
).observe(duration)
def record_execution(self, status: str, graph_id: str):
"""记录执行指标"""
self.execution_counter.labels(
status=status,
graph_id=graph_id
).inc()
# 健康检查
class HealthChecker:
"""健康检查器"""
def __init__(self):
self.checks = {}
def register_check(self, name: str, check_func: Callable):
"""注册健康检查"""
self.checks[name] = check_func
async def run_checks(self) -> Dict[str, Any]:
"""运行所有健康检查"""
results = {}
overall_healthy = True
for name, check_func in self.checks.items():
try:
start_time = time.time()
result = await check_func()
duration = time.time() - start_time
results[name] = {
"status": "healthy" if result else "unhealthy",
"duration": duration,
"timestamp": datetime.utcnow().isoformat()
}
if not result:
overall_healthy = False
except Exception as e:
results[name] = {
"status": "error",
"error": str(e),
"timestamp": datetime.utcnow().isoformat()
}
overall_healthy = False
return {
"status": "healthy" if overall_healthy else "unhealthy",
"checks": results,
"timestamp": datetime.utcnow().isoformat()
}
# 使用示例
health_checker = HealthChecker()
# 注册数据库健康检查
async def check_database():
try:
await database.fetch_val("SELECT 1")
return True
except:
return False
health_checker.register_check("database", check_database)
# 注册Redis健康检查
async def check_redis():
try:
await redis_client.ping()
return True
except:
return False
health_checker.register_check("redis", check_redis)
3. 安全最佳实践
# 输入验证和清理
class InputValidator:
"""输入验证器"""
@staticmethod
def sanitize_html(text: str) -> str:
"""清理HTML内容"""
import bleach
allowed_tags = ['p', 'br', 'strong', 'em', 'ul', 'ol', 'li']
allowed_attributes = {}
return bleach.clean(
text,
tags=allowed_tags,
attributes=allowed_attributes,
strip=True
)
@staticmethod
def validate_file_upload(file_data: bytes, allowed_types: List[str]) -> bool:
"""验证文件上传"""
import magic
# 检查文件类型
file_type = magic.from_buffer(file_data, mime=True)
if file_type not in allowed_types:
raise ValueError(f"不允许的文件类型: {file_type}")
# 检查文件大小
max_size = 10 * 1024 * 1024 # 10MB
if len(file_data) > max_size:
raise ValueError("文件大小超过限制")
return True
@staticmethod
def validate_sql_injection(query: str) -> bool:
"""检查SQL注入"""
dangerous_patterns = [
r"union\s+select",
r"drop\s+table",
r"delete\s+from",
r"insert\s+into",
r"update\s+set",
r"exec\s*\(",
r"script\s*>",
]
query_lower = query.lower()
for pattern in dangerous_patterns:
if re.search(pattern, query_lower):
raise ValueError("检测到潜在的SQL注入攻击")
return True
# 权限控制
class PermissionManager:
"""权限管理器"""
def __init__(self):
self.permissions = {
"admin": [
"user.create", "user.read", "user.update", "user.delete",
"agent.create", "agent.read", "agent.update", "agent.delete",
"store.approve", "store.reject", "store.manage"
],
"user": [
"agent.create", "agent.read", "agent.update", "agent.delete",
"store.submit", "store.read"
],
"viewer": [
"agent.read", "store.read"
]
}
def check_permission(self, user_role: str, permission: str) -> bool:
"""检查权限"""
if user_role not in self.permissions:
return False
return permission in self.permissions[user_role]
def require_permission(self, permission: str):
"""权限装饰器"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# 从请求中获取用户信息
user = kwargs.get("user") or args[0] if args else None
if not user or not self.check_permission(user.role, permission):
raise HTTPException(403, "权限不足")
return await func(*args, **kwargs)
return wrapper
return decorator
# 审计日志
class AuditLogger:
"""审计日志记录器"""
def __init__(self):
self.logger = logging.getLogger("audit")
def log_action(
self,
user_id: str,
action: str,
resource_type: str,
resource_id: str,
details: Optional[Dict] = None
):
"""记录用户操作"""
audit_record = {
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
"action": action,
"resource_type": resource_type,
"resource_id": resource_id,
"details": details or {},
"ip_address": self._get_client_ip(),
"user_agent": self._get_user_agent()
}
self.logger.info(json.dumps(audit_record))
def _get_client_ip(self) -> str:
"""获取客户端IP"""
# 从请求上下文获取IP
return "unknown"
def _get_user_agent(self) -> str:
"""获取用户代理"""
# 从请求上下文获取User-Agent
return "unknown"
4. 测试策略
# 单元测试示例
import pytest
from unittest.mock import Mock, AsyncMock
class TestTextProcessingBlock:
"""文本处理Block测试"""
def setup_method(self):
"""测试前准备"""
self.block = TextProcessingBlock()
def test_upper_operation(self):
"""测试大写转换"""
input_data = TextProcessingBlock.Input(
text="hello world",
operation="upper"
)
result = self.block.run(input_data)
assert result.success is True
assert result.output_data.processed_text == "HELLO WORLD"
assert result.output_data.original_length == 11
assert result.output_data.processed_length == 11
def test_invalid_operation(self):
"""测试无效操作"""
input_data = TextProcessingBlock.Input(
text="hello world",
operation="invalid"
)
result = self.block.run(input_data)
assert result.success is False
assert "不支持的操作" in result.error
# 集成测试示例
@pytest.mark.asyncio
class TestAgentExecution:
"""智能体执行集成测试"""
async def test_simple_workflow_execution(self, test_client, test_user):
"""测试简单工作流执行"""
# 1. 创建测试智能体
graph_data = {
"name": "测试智能体",
"description": "用于测试的简单智能体",
"nodes": [
{
"id": "node1",
"block_type": "TEXT_PROCESSING",
"input_data": {
"text": "hello world",
"operation": "upper"
}
}
]
}
response = await test_client.post(
"/api/v1/graphs",
json=graph_data,
headers={"Authorization": f"Bearer {test_user.token}"}
)
assert response.status_code == 200
graph = response.json()
# 2. 执行智能体
execution_request = {
"input_data": {}
}
response = await test_client.post(
f"/api/v1/graphs/{graph['id']}/executions",
json=execution_request,
headers={"Authorization": f"Bearer {test_user.token}"}
)
assert response.status_code == 200
execution = response.json()
# 3. 等待执行完成
await asyncio.sleep(2)
# 4. 检查执行结果
response = await test_client.get(
f"/api/v1/executions/{execution['execution_id']}",
headers={"Authorization": f"Bearer {test_user.token}"}
)
assert response.status_code == 200
result = response.json()
assert result["status"] == "COMPLETED"
# 性能测试示例
@pytest.mark.performance
class TestPerformance:
"""性能测试"""
@pytest.mark.asyncio
async def test_concurrent_executions(self, test_client, test_user):
"""测试并发执行性能"""
# 创建多个并发执行请求
tasks = []
for i in range(10):
task = self._execute_agent(test_client, test_user, i)
tasks.append(task)
# 并发执行
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
# 验证结果
assert all(result["success"] for result in results)
assert end_time - start_time < 30 # 30秒内完成
async def _execute_agent(self, client, user, index):
"""执行单个智能体"""
# 实现具体的执行逻辑
pass
总结
AutoGPT平台是一个功能强大、架构完善的AI智能体开发平台。通过本手册的学习,开发者可以:
- 快速上手: 了解平台的核心概念和基本使用方法
- 深入开发: 掌握各个模块的API接口和实现细节
- 优化性能: 应用最佳实践提升系统性能和稳定性
- 保障安全: 实施安全措施保护系统和数据安全
- 持续改进: 通过监控和测试不断优化系统
平台的模块化设计使得开发者可以根据需要扩展功能,丰富的API接口支持各种集成场景,完善的监控和日志系统确保系统的可观测性和可维护性。
希望本手册能够帮助开发者更好地理解和使用AutoGPT平台,构建出优秀的AI应用解决方案。