AutoGPT平台关键数据结构UML分析
概述
AutoGPT平台采用了复杂的数据结构来支持智能体的创建、执行和管理。本文档通过UML图和详细说明,深入分析平台的核心数据模型,包括图结构、执行模型、Block系统、用户管理等关键组件。
核心数据模型概览
整体数据架构图
erDiagram
User ||--o{ Graph : creates
User ||--o{ GraphExecution : executes
User ||--o{ UserCredit : has
User ||--o{ IntegrationCredentials : owns
Graph ||--o{ Node : contains
Graph ||--o{ Link : contains
Graph ||--o{ GraphExecution : executed_as
Graph ||--o{ GraphVersion : has_versions
Node ||--o{ NodeExecution : executed_as
Node }|--|| Block : implements
GraphExecution ||--o{ NodeExecution : contains
GraphExecution }|--|| ExecutionStatus : has_status
Block ||--o{ BlockSchema : defines
Block ||--o{ BlockCost : has_costs
IntegrationCredentials }|--|| CredentialsType : has_type
GraphExecution ||--o{ ExecutionLog : generates
NodeExecution ||--o{ ExecutionMetrics : produces
图结构数据模型
Graph相关类图
classDiagram
class Graph {
+id: str
+name: str
+description: str
+user_id: str
+version: int
+is_active: bool
+is_template: bool
+created_at: datetime
+updated_at: datetime
+nodes: List[Node]
+links: List[Link]
+validate_structure() bool
+get_entry_nodes() List[Node]
+get_exit_nodes() List[Node]
}
class Node {
+id: str
+graph_id: str
+block_id: str
+label: str
+input_default: dict
+metadata: dict
+position: Position
+block: Block
+validate_inputs(inputs: dict) bool
+get_required_inputs() List[str]
+get_output_schema() dict
}
class Link {
+id: str
+graph_id: str
+source_id: str
+sink_id: str
+source_name: str
+sink_name: str
+is_static: bool
+validate_connection() bool
}
class Position {
+x: float
+y: float
}
class GraphVersion {
+id: str
+graph_id: str
+version: int
+is_active: bool
+created_at: datetime
+snapshot: dict
}
Graph ||--o{ Node : contains
Graph ||--o{ Link : contains
Graph ||--o{ GraphVersion : has_versions
Node ||--|| Position : has_position
Node }|--|| Block : implements
Graph类详细说明
class Graph(BaseModel):
"""
智能体图数据模型
表示一个完整的智能体工作流,包含节点、连接和元数据。
支持版本管理、模板功能和结构验证。
字段说明:
- id: 图的唯一标识符,UUID格式
- name: 图名称,用户可见的标识
- description: 图描述,说明图的功能和用途
- user_id: 创建者用户ID,关联到User表
- version: 当前版本号,支持版本管理
- is_active: 是否为活跃版本,用于版本切换
- is_template: 是否为模板,模板可被其他用户复制
- created_at: 创建时间戳
- updated_at: 最后更新时间戳
- nodes: 包含的节点列表
- links: 节点间的连接列表
核心方法:
- validate_structure(): 验证图结构的完整性和正确性
- get_entry_nodes(): 获取入口节点(无输入连接的节点)
- get_exit_nodes(): 获取出口节点(无输出连接的节点)
"""
id: str = Field(..., description="图唯一标识符")
name: str = Field(..., max_length=100, description="图名称")
description: Optional[str] = Field(None, max_length=500, description="图描述")
user_id: str = Field(..., description="创建者用户ID")
version: int = Field(default=1, ge=1, description="版本号")
is_active: bool = Field(default=True, description="是否为活跃版本")
is_template: bool = Field(default=False, description="是否为模板")
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
nodes: List[Node] = Field(default_factory=list, description="节点列表")
links: List[Link] = Field(default_factory=list, description="连接列表")
def validate_structure(self) -> bool:
"""
验证图结构的完整性
验证规则:
1. 至少包含一个节点
2. 所有连接的源节点和目标节点都存在
3. 没有循环依赖
4. 入口节点至少有一个
5. 所有节点都可达
返回:
bool: 结构是否有效
"""
if not self.nodes:
return False
# 验证连接的有效性
node_ids = {node.id for node in self.nodes}
for link in self.links:
if link.source_id not in node_ids or link.sink_id not in node_ids:
return False
# 检查循环依赖
if self._has_cycles():
return False
# 验证入口节点存在
entry_nodes = self.get_entry_nodes()
if not entry_nodes:
return False
return True
def get_entry_nodes(self) -> List[Node]:
"""获取入口节点(没有输入连接的节点)"""
sink_ids = {link.sink_id for link in self.links}
return [node for node in self.nodes if node.id not in sink_ids]
def get_exit_nodes(self) -> List[Node]:
"""获取出口节点(没有输出连接的节点)"""
source_ids = {link.source_id for link in self.links}
return [node for node in self.nodes if node.id not in source_ids]
Node类详细说明
class Node(BaseModel):
"""
图节点数据模型
表示图中的一个执行单元,封装了Block的实例化配置。
每个节点都关联到一个Block类型,并包含特定的输入配置。
字段说明:
- id: 节点唯一标识符
- graph_id: 所属图ID
- block_id: 关联的Block类型ID
- label: 节点显示标签
- input_default: 默认输入值配置
- metadata: 节点元数据,包含位置、样式等信息
- position: 节点在画布上的位置
- block: 关联的Block实例(运行时加载)
核心方法:
- validate_inputs(): 验证输入数据的有效性
- get_required_inputs(): 获取必需的输入字段
- get_output_schema(): 获取输出数据结构定义
"""
id: str = Field(..., description="节点唯一标识符")
graph_id: str = Field(..., description="所属图ID")
block_id: str = Field(..., description="Block类型ID")
label: str = Field(..., max_length=100, description="节点标签")
input_default: dict = Field(default_factory=dict, description="默认输入配置")
metadata: dict = Field(default_factory=dict, description="节点元数据")
position: Position = Field(..., description="节点位置")
block: Optional[Block] = Field(None, description="关联的Block实例")
def validate_inputs(self, inputs: dict) -> bool:
"""
验证输入数据的有效性
参数:
inputs: 要验证的输入数据
返回:
bool: 输入是否有效
"""
if not self.block:
return False
try:
# 合并默认值和实际输入
merged_inputs = {**self.input_default, **inputs}
# 使用Block的schema验证
self.block.input_schema.model_validate(merged_inputs)
return True
except ValidationError:
return False
def get_required_inputs(self) -> List[str]:
"""获取必需的输入字段列表"""
if not self.block:
return []
required_fields = []
for field_name, field_info in self.block.input_schema.model_fields.items():
if field_info.is_required() and field_name not in self.input_default:
required_fields.append(field_name)
return required_fields
def get_output_schema(self) -> dict:
"""获取输出数据结构定义"""
if not self.block:
return {}
return self.block.output_schema.model_json_schema()
执行引擎数据模型
执行相关类图
classDiagram
class GraphExecution {
+id: str
+graph_id: str
+graph_version: int
+user_id: str
+status: ExecutionStatus
+inputs: dict
+outputs: dict
+created_at: datetime
+started_at: datetime
+ended_at: datetime
+error_message: str
+stats: ExecutionStats
+user_context: UserContext
+is_shared: bool
+share_token: str
+get_duration() timedelta
+is_running() bool
+can_be_cancelled() bool
}
class NodeExecution {
+id: str
+graph_execution_id: str
+node_id: str
+status: ExecutionStatus
+inputs: dict
+outputs: dict
+started_at: datetime
+ended_at: datetime
+error_message: str
+execution_stats: NodeExecutionStats
+get_duration() timedelta
+get_cost() int
}
class ExecutionStatus {
<<enumeration>>
QUEUED
RUNNING
COMPLETED
FAILED
CANCELLED
TIMEOUT
}
class ExecutionStats {
+total_nodes: int
+completed_nodes: int
+failed_nodes: int
+total_cost: int
+execution_time: float
+memory_usage: int
+calculate_success_rate() float
}
class NodeExecutionStats {
+node_id: str
+block_name: str
+execution_time: float
+input_size: int
+output_size: int
+cost: int
+memory_usage: int
+cpu_usage: float
}
class UserContext {
+timezone: str
+locale: str
+preferences: dict
+get_local_time() datetime
}
GraphExecution ||--o{ NodeExecution : contains
GraphExecution }|--|| ExecutionStatus : has_status
GraphExecution ||--|| ExecutionStats : has_stats
GraphExecution ||--|| UserContext : has_context
NodeExecution }|--|| ExecutionStatus : has_status
NodeExecution ||--|| NodeExecutionStats : has_stats
GraphExecution类详细说明
class GraphExecution(BaseModel):
"""
图执行数据模型
表示一次完整的图执行过程,包含执行状态、输入输出、
统计信息和错误信息。支持执行分享和用户上下文。
字段说明:
- id: 执行唯一标识符
- graph_id: 执行的图ID
- graph_version: 执行的图版本
- user_id: 执行用户ID
- status: 执行状态(排队、运行、完成、失败等)
- inputs: 执行输入数据
- outputs: 执行输出数据
- created_at: 创建时间
- started_at: 开始执行时间
- ended_at: 结束时间
- error_message: 错误信息(如果失败)
- stats: 执行统计信息
- user_context: 用户上下文信息
- is_shared: 是否公开分享
- share_token: 分享令牌
核心方法:
- get_duration(): 计算执行持续时间
- is_running(): 检查是否正在运行
- can_be_cancelled(): 检查是否可以取消
"""
id: str = Field(..., description="执行唯一标识符")
graph_id: str = Field(..., description="图ID")
graph_version: int = Field(..., description="图版本")
user_id: str = Field(..., description="用户ID")
status: ExecutionStatus = Field(default=ExecutionStatus.QUEUED, description="执行状态")
inputs: dict = Field(default_factory=dict, description="输入数据")
outputs: dict = Field(default_factory=dict, description="输出数据")
created_at: datetime = Field(default_factory=datetime.utcnow)
started_at: Optional[datetime] = Field(None, description="开始时间")
ended_at: Optional[datetime] = Field(None, description="结束时间")
error_message: Optional[str] = Field(None, description="错误信息")
stats: Optional[ExecutionStats] = Field(None, description="执行统计")
user_context: UserContext = Field(default_factory=UserContext)
is_shared: bool = Field(default=False, description="是否分享")
share_token: Optional[str] = Field(None, description="分享令牌")
def get_duration(self) -> Optional[timedelta]:
"""计算执行持续时间"""
if not self.started_at:
return None
end_time = self.ended_at or datetime.utcnow()
return end_time - self.started_at
def is_running(self) -> bool:
"""检查是否正在运行"""
return self.status in [ExecutionStatus.QUEUED, ExecutionStatus.RUNNING]
def can_be_cancelled(self) -> bool:
"""检查是否可以取消"""
return self.status in [ExecutionStatus.QUEUED, ExecutionStatus.RUNNING]
ExecutionStatus枚举详细说明
class ExecutionStatus(str, Enum):
"""
执行状态枚举
定义了图执行和节点执行的所有可能状态。
状态转换遵循严格的生命周期规则。
状态说明:
- QUEUED: 已排队等待执行
- RUNNING: 正在执行中
- COMPLETED: 执行成功完成
- FAILED: 执行失败
- CANCELLED: 被用户取消
- TIMEOUT: 执行超时
状态转换规则:
QUEUED -> RUNNING -> (COMPLETED | FAILED | CANCELLED | TIMEOUT)
QUEUED -> CANCELLED (可以在排队时取消)
RUNNING -> CANCELLED (可以在运行时取消)
"""
QUEUED = "queued" # 已排队,等待执行
RUNNING = "running" # 正在执行
COMPLETED = "completed" # 执行完成
FAILED = "failed" # 执行失败
CANCELLED = "cancelled" # 被取消
TIMEOUT = "timeout" # 执行超时
def is_terminal(self) -> bool:
"""检查是否为终止状态"""
return self in [
ExecutionStatus.COMPLETED,
ExecutionStatus.FAILED,
ExecutionStatus.CANCELLED,
ExecutionStatus.TIMEOUT
]
def is_successful(self) -> bool:
"""检查是否为成功状态"""
return self == ExecutionStatus.COMPLETED
Block系统数据模型
Block相关类图
classDiagram
class Block {
<<abstract>>
+id: str
+name: str
+description: str
+categories: List[BlockCategory]
+block_type: BlockType
+input_schema: Type[BaseModel]
+output_schema: Type[BaseModel]
+costs: List[BlockCost]
+credentials_provider_names: List[str]
+execute(input_data, credentials, **kwargs) AsyncGenerator
+validate_credentials(credentials) bool
+get_cost_estimate(input_data) int
}
class BlockSchema {
+block_id: str
+input_schema: dict
+output_schema: dict
+ui_schema: dict
+examples: List[dict]
+validate_input(data) bool
+validate_output(data) bool
+generate_example() dict
}
class BlockCost {
+cost_type: BlockCostType
+cost_amount: int
+cost_currency: str
+cost_filter: dict
+description: str
+calculate_cost(input_data) int
}
class BlockCostType {
<<enumeration>>
RUN
BYTE
SECOND
TOKEN
REQUEST
}
class BlockCategory {
<<enumeration>>
INPUT
OUTPUT
PROCESSING
AI_ML
DATA
INTEGRATION
UTILITY
CUSTOM
}
class BlockType {
<<enumeration>>
STANDARD
INPUT
OUTPUT
WEBHOOK
AGENT
AI
}
Block ||--|| BlockSchema : defines
Block ||--o{ BlockCost : has_costs
Block }|--|| BlockType : has_type
Block }|--o{ BlockCategory : belongs_to
BlockCost }|--|| BlockCostType : has_cost_type
Block抽象基类详细说明
class Block(ABC):
"""
Block抽象基类
定义了所有Block必须实现的接口和通用功能。
Block是AutoGPT平台的核心执行单元,封装了特定的功能逻辑。
类属性:
- id: Block唯一标识符
- name: Block显示名称
- description: Block功能描述
- categories: Block所属分类
- block_type: Block类型
- input_schema: 输入数据结构定义
- output_schema: 输出数据结构定义
- costs: 成本配置列表
- credentials_provider_names: 需要的凭据提供商
抽象方法:
- execute(): 执行Block逻辑的核心方法
通用方法:
- validate_credentials(): 验证凭据有效性
- get_cost_estimate(): 估算执行成本
"""
# 类属性定义
id: str
name: str
description: str
categories: List[BlockCategory]
block_type: BlockType = BlockType.STANDARD
input_schema: Type[BaseModel]
output_schema: Type[BaseModel]
costs: List[BlockCost] = []
credentials_provider_names: List[str] = []
@abstractmethod
async def execute(
self,
input_data: dict,
credentials: dict[str, Any] = {},
**kwargs
) -> AsyncGenerator[tuple[str, Any], None]:
"""
执行Block逻辑的核心方法
这是一个异步生成器方法,支持流式输出。
每次yield返回一个(output_name, output_data)元组。
参数:
input_data: 输入数据字典
credentials: 凭据字典,键为提供商名称
**kwargs: 额外的执行参数
生成:
tuple[str, Any]: (输出名称, 输出数据)元组
异常:
ValidationError: 输入数据验证失败
CredentialsError: 凭据验证失败
ExecutionError: 执行过程中的错误
"""
pass
def validate_credentials(self, credentials: dict[str, Any]) -> bool:
"""
验证凭据有效性
参数:
credentials: 凭据字典
返回:
bool: 凭据是否有效
"""
for provider_name in self.credentials_provider_names:
if provider_name not in credentials:
return False
credential = credentials[provider_name]
if not credential or not hasattr(credential, 'is_valid'):
return False
if not credential.is_valid():
return False
return True
def get_cost_estimate(self, input_data: dict) -> int:
"""
估算执行成本
参数:
input_data: 输入数据
返回:
int: 估算成本(以分为单位)
"""
total_cost = 0
for cost_config in self.costs:
if cost_config.cost_filter:
# 检查是否匹配过滤条件
if not self._matches_filter(input_data, cost_config.cost_filter):
continue
cost = cost_config.calculate_cost(input_data)
total_cost += cost
return total_cost
def _matches_filter(self, input_data: dict, cost_filter: dict) -> bool:
"""检查输入数据是否匹配成本过滤条件"""
for key, expected_value in cost_filter.items():
if input_data.get(key) != expected_value:
return False
return True
BlockCost类详细说明
class BlockCost(BaseModel):
"""
Block成本配置模型
定义了Block执行的成本计算规则。
支持多种成本类型和过滤条件。
字段说明:
- cost_type: 成本类型(按次、按字节、按秒等)
- cost_amount: 成本数量(以分为单位)
- cost_currency: 成本货币(默认USD)
- cost_filter: 成本过滤条件,只有匹配的输入才计费
- description: 成本描述
核心方法:
- calculate_cost(): 根据输入数据计算实际成本
"""
cost_type: BlockCostType = Field(..., description="成本类型")
cost_amount: int = Field(..., ge=0, description="成本数量(分)")
cost_currency: str = Field(default="USD", description="货币类型")
cost_filter: dict = Field(default_factory=dict, description="成本过滤条件")
description: str = Field(..., description="成本描述")
def calculate_cost(self, input_data: dict) -> int:
"""
根据输入数据计算实际成本
参数:
input_data: 输入数据字典
返回:
int: 计算出的成本(以分为单位)
"""
if self.cost_type == BlockCostType.RUN:
# 按次计费:固定成本
return self.cost_amount
elif self.cost_type == BlockCostType.BYTE:
# 按字节计费:根据输入数据大小
import json
data_size = len(json.dumps(input_data).encode('utf-8'))
return self.cost_amount * data_size
elif self.cost_type == BlockCostType.TOKEN:
# 按Token计费:根据文本长度估算
text_content = self._extract_text_content(input_data)
estimated_tokens = len(text_content.split()) * 1.3 # 粗略估算
return int(self.cost_amount * estimated_tokens)
elif self.cost_type == BlockCostType.REQUEST:
# 按请求计费:根据请求数量
request_count = input_data.get('request_count', 1)
return self.cost_amount * request_count
else:
# 其他类型:返回基础成本
return self.cost_amount
def _extract_text_content(self, input_data: dict) -> str:
"""从输入数据中提取文本内容"""
text_fields = ['text', 'content', 'message', 'prompt', 'query']
for field in text_fields:
if field in input_data and isinstance(input_data[field], str):
return input_data[field]
# 如果没有找到明显的文本字段,返回JSON字符串
import json
return json.dumps(input_data)
用户与认证数据模型
用户相关类图
classDiagram
class User {
+id: str
+email: str
+name: str
+role: UserRole
+timezone: str
+locale: str
+created_at: datetime
+updated_at: datetime
+last_login_at: datetime
+is_active: bool
+preferences: UserPreferences
+is_admin() bool
+get_display_name() str
+update_last_login() void
}
class UserRole {
<<enumeration>>
USER
ADMIN
MODERATOR
DEVELOPER
}
class UserPreferences {
+theme: str
+language: str
+notifications_enabled: bool
+auto_save_enabled: bool
+default_graph_settings: dict
+update_preference(key, value) void
}
class UserCredit {
+id: str
+user_id: str
+balance: int
+currency: str
+created_at: datetime
+updated_at: datetime
+add_credits(amount) void
+deduct_credits(amount) bool
+get_balance() int
}
class CreditTransaction {
+id: str
+user_id: str
+amount: int
+transaction_type: TransactionType
+description: str
+reference_id: str
+created_at: datetime
+is_debit() bool
+is_credit() bool
}
class TransactionType {
<<enumeration>>
PURCHASE
EXECUTION_COST
REFUND
BONUS
ADJUSTMENT
}
User }|--|| UserRole : has_role
User ||--|| UserPreferences : has_preferences
User ||--o{ UserCredit : has_credits
User ||--o{ CreditTransaction : has_transactions
CreditTransaction }|--|| TransactionType : has_type
User类详细说明
class User(BaseModel):
"""
用户数据模型
表示平台用户的基本信息和配置。
支持角色管理、偏好设置和活动跟踪。
字段说明:
- id: 用户唯一标识符
- email: 用户邮箱地址(唯一)
- name: 用户显示名称
- role: 用户角色(普通用户、管理员等)
- timezone: 用户时区
- locale: 用户语言环境
- created_at: 账户创建时间
- updated_at: 最后更新时间
- last_login_at: 最后登录时间
- is_active: 账户是否激活
- preferences: 用户偏好设置
核心方法:
- is_admin(): 检查是否为管理员
- get_display_name(): 获取显示名称
- update_last_login(): 更新最后登录时间
"""
id: str = Field(..., description="用户唯一标识符")
email: EmailStr = Field(..., description="用户邮箱")
name: Optional[str] = Field(None, max_length=100, description="用户名称")
role: UserRole = Field(default=UserRole.USER, description="用户角色")
timezone: str = Field(default="UTC", description="用户时区")
locale: str = Field(default="en-US", description="语言环境")
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
last_login_at: Optional[datetime] = Field(None, description="最后登录时间")
is_active: bool = Field(default=True, description="是否激活")
preferences: UserPreferences = Field(default_factory=UserPreferences)
def is_admin(self) -> bool:
"""检查用户是否为管理员"""
return self.role in [UserRole.ADMIN, UserRole.MODERATOR]
def get_display_name(self) -> str:
"""获取用户显示名称"""
if self.name:
return self.name
return self.email.split('@')[0]
def update_last_login(self) -> None:
"""更新最后登录时间"""
self.last_login_at = datetime.utcnow()
self.updated_at = datetime.utcnow()
@property
def is_new_user(self) -> bool:
"""检查是否为新用户(注册7天内)"""
if not self.created_at:
return False
days_since_creation = (datetime.utcnow() - self.created_at).days
return days_since_creation <= 7
UserCredit类详细说明
class UserCredit(BaseModel):
"""
用户积分数据模型
管理用户的积分余额和交易记录。
支持多币种和事务性操作。
字段说明:
- id: 积分记录唯一标识符
- user_id: 关联的用户ID
- balance: 当前余额(以分为单位)
- currency: 货币类型(默认USD)
- created_at: 创建时间
- updated_at: 最后更新时间
核心方法:
- add_credits(): 增加积分
- deduct_credits(): 扣除积分
- get_balance(): 获取当前余额
"""
id: str = Field(..., description="积分记录ID")
user_id: str = Field(..., description="用户ID")
balance: int = Field(default=0, ge=0, description="余额(分)")
currency: str = Field(default="USD", description="货币类型")
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
def add_credits(self, amount: int, description: str = "") -> CreditTransaction:
"""
增加积分
参数:
amount: 增加的积分数量(分)
description: 交易描述
返回:
CreditTransaction: 交易记录
"""
if amount <= 0:
raise ValueError("Amount must be positive")
self.balance += amount
self.updated_at = datetime.utcnow()
# 创建交易记录
transaction = CreditTransaction(
id=str(uuid.uuid4()),
user_id=self.user_id,
amount=amount,
transaction_type=TransactionType.PURCHASE,
description=description or f"Added {amount} credits",
created_at=datetime.utcnow()
)
return transaction
def deduct_credits(self, amount: int, description: str = "") -> tuple[bool, Optional[CreditTransaction]]:
"""
扣除积分
参数:
amount: 扣除的积分数量(分)
description: 交易描述
返回:
tuple[bool, CreditTransaction]: (是否成功, 交易记录)
"""
if amount <= 0:
raise ValueError("Amount must be positive")
if self.balance < amount:
return False, None
self.balance -= amount
self.updated_at = datetime.utcnow()
# 创建交易记录
transaction = CreditTransaction(
id=str(uuid.uuid4()),
user_id=self.user_id,
amount=-amount, # 负数表示扣除
transaction_type=TransactionType.EXECUTION_COST,
description=description or f"Deducted {amount} credits",
created_at=datetime.utcnow()
)
return True, transaction
def get_balance(self) -> int:
"""获取当前余额"""
return self.balance
集成与凭据数据模型
凭据相关类图
classDiagram
class IntegrationCredentials {
<<abstract>>
+id: str
+user_id: str
+provider: str
+created_at: datetime
+updated_at: datetime
+expires_at: datetime
+is_active: bool
+metadata: dict
+is_expired() bool
+is_valid() bool
+refresh() bool
}
class OAuth2Credentials {
+access_token: str
+refresh_token: str
+token_type: str
+scopes: List[str]
+username: str
+refresh_access_token() bool
+get_bearer_token() str
}
class APIKeyCredentials {
+api_key: str
+key_name: str
+permissions: List[str]
+validate_key() bool
+get_bearer_token() str
}
class CredentialsProvider {
+name: str
+display_name: str
+auth_type: AuthType
+config: dict
+supported_scopes: List[str]
+create_credentials(auth_data) IntegrationCredentials
+validate_credentials(credentials) bool
}
class AuthType {
<<enumeration>>
OAUTH2
API_KEY
BASIC_AUTH
CUSTOM
}
IntegrationCredentials <|-- OAuth2Credentials
IntegrationCredentials <|-- APIKeyCredentials
CredentialsProvider }|--|| AuthType : uses_auth_type
CredentialsProvider ||--o{ IntegrationCredentials : creates
IntegrationCredentials抽象基类
class IntegrationCredentials(BaseModel, ABC):
"""
集成凭据抽象基类
定义了所有凭据类型的通用接口和属性。
支持凭据的生命周期管理和验证。
字段说明:
- id: 凭据唯一标识符
- user_id: 关联的用户ID
- provider: 服务提供商名称
- created_at: 创建时间
- updated_at: 最后更新时间
- expires_at: 过期时间(可选)
- is_active: 是否激活
- metadata: 额外元数据
抽象方法:
- is_valid(): 检查凭据是否有效
- refresh(): 刷新凭据
"""
id: str = Field(..., description="凭据ID")
user_id: str = Field(..., description="用户ID")
provider: str = Field(..., description="服务提供商")
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
expires_at: Optional[datetime] = Field(None, description="过期时间")
is_active: bool = Field(default=True, description="是否激活")
metadata: dict = Field(default_factory=dict, description="元数据")
def is_expired(self) -> bool:
"""检查凭据是否过期"""
if not self.expires_at:
return False
return datetime.utcnow() > self.expires_at
@abstractmethod
def is_valid(self) -> bool:
"""检查凭据是否有效"""
pass
@abstractmethod
async def refresh(self) -> bool:
"""刷新凭据"""
pass
OAuth2Credentials类详细说明
class OAuth2Credentials(IntegrationCredentials):
"""
OAuth2凭据数据模型
实现OAuth2认证流程的凭据管理。
支持访问令牌刷新和权限范围管理。
字段说明:
- access_token: 访问令牌
- refresh_token: 刷新令牌
- token_type: 令牌类型(通常为Bearer)
- scopes: 权限范围列表
- username: 关联的用户名(可选)
核心方法:
- refresh_access_token(): 使用刷新令牌获取新的访问令牌
- get_bearer_token(): 获取Bearer格式的认证头
"""
access_token: SecretStr = Field(..., description="访问令牌")
refresh_token: Optional[SecretStr] = Field(None, description="刷新令牌")
token_type: str = Field(default="Bearer", description="令牌类型")
scopes: List[str] = Field(default_factory=list, description="权限范围")
username: Optional[str] = Field(None, description="用户名")
def is_valid(self) -> bool:
"""检查OAuth2凭据是否有效"""
if not self.is_active:
return False
if self.is_expired():
return False
if not self.access_token.get_secret_value():
return False
return True
async def refresh(self) -> bool:
"""刷新OAuth2访问令牌"""
if not self.refresh_token:
return False
try:
# 这里应该调用具体的OAuth2提供商API
# 以下是示例代码
refresh_data = await self._call_refresh_api()
if refresh_data:
self.access_token = SecretStr(refresh_data['access_token'])
if 'refresh_token' in refresh_data:
self.refresh_token = SecretStr(refresh_data['refresh_token'])
# 更新过期时间
if 'expires_in' in refresh_data:
expires_in = refresh_data['expires_in']
self.expires_at = datetime.utcnow() + timedelta(seconds=expires_in)
self.updated_at = datetime.utcnow()
return True
except Exception as e:
logger.error(f"Failed to refresh OAuth2 token: {e}")
return False
def get_bearer_token(self) -> str:
"""获取Bearer格式的认证头"""
return f"Bearer {self.access_token.get_secret_value()}"
async def _call_refresh_api(self) -> Optional[dict]:
"""调用OAuth2刷新API(需要具体实现)"""
# 这里应该根据具体的OAuth2提供商实现
# 返回包含新令牌的字典
pass
监控与日志数据模型
监控相关类图
classDiagram
class ExecutionLog {
+id: str
+execution_id: str
+node_id: str
+level: LogLevel
+message: str
+timestamp: datetime
+metadata: dict
+user_id: str
+get_formatted_message() str
}
class LogLevel {
<<enumeration>>
DEBUG
INFO
WARNING
ERROR
CRITICAL
}
class ExecutionMetrics {
+id: str
+execution_id: str
+node_id: str
+metric_name: str
+metric_value: float
+metric_unit: str
+timestamp: datetime
+tags: dict
+get_prometheus_format() str
}
class SystemMetrics {
+timestamp: datetime
+cpu_usage: float
+memory_usage: float
+disk_usage: float
+active_executions: int
+queue_size: int
+error_rate: float
+calculate_health_score() float
}
class AlertRule {
+id: str
+name: str
+condition: str
+threshold: float
+severity: AlertSeverity
+is_active: bool
+notification_channels: List[str]
+evaluate(metrics) bool
+trigger_alert() void
}
class AlertSeverity {
<<enumeration>>
LOW
MEDIUM
HIGH
CRITICAL
}
ExecutionLog }|--|| LogLevel : has_level
ExecutionMetrics ||--|| SystemMetrics : aggregates_to
AlertRule }|--|| AlertSeverity : has_severity
ExecutionLog类详细说明
class ExecutionLog(BaseModel):
"""
执行日志数据模型
记录图执行和节点执行过程中的日志信息。
支持结构化日志和元数据存储。
字段说明:
- id: 日志记录唯一标识符
- execution_id: 关联的执行ID(图或节点)
- node_id: 节点ID(可选,节点级日志)
- level: 日志级别
- message: 日志消息
- timestamp: 时间戳
- metadata: 结构化元数据
- user_id: 关联的用户ID
核心方法:
- get_formatted_message(): 获取格式化的日志消息
"""
id: str = Field(..., description="日志ID")
execution_id: str = Field(..., description="执行ID")
node_id: Optional[str] = Field(None, description="节点ID")
level: LogLevel = Field(..., description="日志级别")
message: str = Field(..., description="日志消息")
timestamp: datetime = Field(default_factory=datetime.utcnow)
metadata: dict = Field(default_factory=dict, description="元数据")
user_id: str = Field(..., description="用户ID")
def get_formatted_message(self) -> str:
"""获取格式化的日志消息"""
timestamp_str = self.timestamp.strftime("%Y-%m-%d %H:%M:%S")
level_str = self.level.value.upper()
if self.node_id:
return f"[{timestamp_str}] {level_str} [Node:{self.node_id}] {self.message}"
else:
return f"[{timestamp_str}] {level_str} [Execution:{self.execution_id}] {self.message}"
def to_structured_log(self) -> dict:
"""转换为结构化日志格式"""
return {
"timestamp": self.timestamp.isoformat(),
"level": self.level.value,
"message": self.message,
"execution_id": self.execution_id,
"node_id": self.node_id,
"user_id": self.user_id,
"metadata": self.metadata,
}
SystemMetrics类详细说明
class SystemMetrics(BaseModel):
"""
系统指标数据模型
收集和存储系统级别的性能指标。
用于监控平台健康状态和性能趋势。
字段说明:
- timestamp: 指标收集时间
- cpu_usage: CPU使用率(百分比)
- memory_usage: 内存使用率(百分比)
- disk_usage: 磁盘使用率(百分比)
- active_executions: 活跃执行数量
- queue_size: 队列大小
- error_rate: 错误率(百分比)
核心方法:
- calculate_health_score(): 计算系统健康评分
"""
timestamp: datetime = Field(default_factory=datetime.utcnow)
cpu_usage: float = Field(..., ge=0, le=100, description="CPU使用率")
memory_usage: float = Field(..., ge=0, le=100, description="内存使用率")
disk_usage: float = Field(..., ge=0, le=100, description="磁盘使用率")
active_executions: int = Field(..., ge=0, description="活跃执行数")
queue_size: int = Field(..., ge=0, description="队列大小")
error_rate: float = Field(..., ge=0, le=100, description="错误率")
def calculate_health_score(self) -> float:
"""
计算系统健康评分
评分规则:
- CPU使用率权重:0.2
- 内存使用率权重:0.3
- 磁盘使用率权重:0.1
- 错误率权重:0.4
返回:
float: 健康评分(0-100)
"""
# 将使用率转换为健康分数(使用率越低,分数越高)
cpu_score = max(0, 100 - self.cpu_usage)
memory_score = max(0, 100 - self.memory_usage)
disk_score = max(0, 100 - self.disk_usage)
error_score = max(0, 100 - self.error_rate)
# 加权平均
health_score = (
cpu_score * 0.2 +
memory_score * 0.3 +
disk_score * 0.1 +
error_score * 0.4
)
return round(health_score, 2)
def is_healthy(self) -> bool:
"""检查系统是否健康"""
return self.calculate_health_score() >= 70.0
def get_alerts(self) -> List[str]:
"""获取需要告警的指标"""
alerts = []
if self.cpu_usage > 80:
alerts.append(f"High CPU usage: {self.cpu_usage}%")
if self.memory_usage > 85:
alerts.append(f"High memory usage: {self.memory_usage}%")
if self.disk_usage > 90:
alerts.append(f"High disk usage: {self.disk_usage}%")
if self.error_rate > 5:
alerts.append(f"High error rate: {self.error_rate}%")
if self.queue_size > 100:
alerts.append(f"Large queue size: {self.queue_size}")
return alerts
数据关系总览
完整数据关系图
erDiagram
%% 用户相关
User {
string id PK
string email UK
string name
enum role
string timezone
datetime created_at
boolean is_active
}
UserCredit {
string id PK
string user_id FK
int balance
string currency
datetime updated_at
}
%% 图相关
Graph {
string id PK
string name
string user_id FK
int version
boolean is_active
boolean is_template
datetime created_at
}
Node {
string id PK
string graph_id FK
string block_id
string label
json input_default
json metadata
}
Link {
string id PK
string graph_id FK
string source_id FK
string sink_id FK
string source_name
string sink_name
}
%% 执行相关
GraphExecution {
string id PK
string graph_id FK
string user_id FK
enum status
json inputs
json outputs
datetime created_at
datetime started_at
datetime ended_at
}
NodeExecution {
string id PK
string graph_execution_id FK
string node_id FK
enum status
json inputs
json outputs
datetime started_at
datetime ended_at
}
%% Block相关
Block {
string id PK
string name
string description
enum block_type
json input_schema
json output_schema
}
BlockCost {
string id PK
string block_id FK
enum cost_type
int cost_amount
json cost_filter
}
%% 凭据相关
IntegrationCredentials {
string id PK
string user_id FK
string provider
enum auth_type
datetime expires_at
boolean is_active
}
%% 监控相关
ExecutionLog {
string id PK
string execution_id FK
string node_id FK
enum level
string message
datetime timestamp
json metadata
}
ExecutionMetrics {
string id PK
string execution_id FK
string metric_name
float metric_value
datetime timestamp
}
%% 关系定义
User ||--o{ Graph : creates
User ||--o{ GraphExecution : executes
User ||--|| UserCredit : has
User ||--o{ IntegrationCredentials : owns
Graph ||--o{ Node : contains
Graph ||--o{ Link : contains
Graph ||--o{ GraphExecution : executed_as
Node }|--|| Block : implements
Node ||--o{ NodeExecution : executed_as
Link }|--|| Node : source
Link }|--|| Node : sink
GraphExecution ||--o{ NodeExecution : contains
GraphExecution ||--o{ ExecutionLog : generates
GraphExecution ||--o{ ExecutionMetrics : produces
NodeExecution ||--o{ ExecutionLog : generates
NodeExecution ||--o{ ExecutionMetrics : produces
Block ||--o{ BlockCost : has_costs
数据一致性约束
1. 引用完整性约束
-- 用户相关约束
ALTER TABLE graphs ADD CONSTRAINT fk_graph_user
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE;
ALTER TABLE graph_executions ADD CONSTRAINT fk_execution_user
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE;
ALTER TABLE graph_executions ADD CONSTRAINT fk_execution_graph
FOREIGN KEY (graph_id) REFERENCES graphs(id) ON DELETE CASCADE;
-- 图结构约束
ALTER TABLE nodes ADD CONSTRAINT fk_node_graph
FOREIGN KEY (graph_id) REFERENCES graphs(id) ON DELETE CASCADE;
ALTER TABLE links ADD CONSTRAINT fk_link_graph
FOREIGN KEY (graph_id) REFERENCES graphs(id) ON DELETE CASCADE;
ALTER TABLE links ADD CONSTRAINT fk_link_source
FOREIGN KEY (source_id) REFERENCES nodes(id) ON DELETE CASCADE;
ALTER TABLE links ADD CONSTRAINT fk_link_sink
FOREIGN KEY (sink_id) REFERENCES nodes(id) ON DELETE CASCADE;
-- 执行相关约束
ALTER TABLE node_executions ADD CONSTRAINT fk_node_exec_graph_exec
FOREIGN KEY (graph_execution_id) REFERENCES graph_executions(id) ON DELETE CASCADE;
ALTER TABLE node_executions ADD CONSTRAINT fk_node_exec_node
FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE;
2. 业务逻辑约束
-- 确保每个图只有一个活跃版本
CREATE UNIQUE INDEX idx_graph_active_version
ON graphs(user_id, name) WHERE is_active = true;
-- 确保用户邮箱唯一
CREATE UNIQUE INDEX idx_user_email ON users(email);
-- 确保执行状态的有效性
ALTER TABLE graph_executions ADD CONSTRAINT chk_execution_status
CHECK (status IN ('queued', 'running', 'completed', 'failed', 'cancelled'));
-- 确保时间逻辑的正确性
ALTER TABLE graph_executions ADD CONSTRAINT chk_execution_times
CHECK (started_at >= created_at AND (ended_at IS NULL OR ended_at >= started_at));
3. 数据完整性检查
async def validate_graph_integrity(graph_id: str) -> List[str]:
"""
验证图数据完整性
检查项目:
1. 所有节点都有对应的Block定义
2. 所有连接的源节点和目标节点都存在
3. 没有孤立的节点(除了入口和出口节点)
4. 没有循环依赖
参数:
graph_id: 要验证的图ID
返回:
List[str]: 发现的问题列表
"""
issues = []
# 获取图数据
graph = await get_graph_by_id(graph_id)
if not graph:
return ["Graph not found"]
# 检查节点Block定义
for node in graph.nodes:
block = get_block(node.block_id)
if not block:
issues.append(f"Node {node.id} references unknown block {node.block_id}")
# 检查连接有效性
node_ids = {node.id for node in graph.nodes}
for link in graph.links:
if link.source_id not in node_ids:
issues.append(f"Link {link.id} references unknown source node {link.source_id}")
if link.sink_id not in node_ids:
issues.append(f"Link {link.id} references unknown sink node {link.sink_id}")
# 检查循环依赖
if has_cycles(graph):
issues.append("Graph contains circular dependencies")
# 检查节点可达性
unreachable_nodes = find_unreachable_nodes(graph)
if unreachable_nodes:
issues.append(f"Unreachable nodes found: {', '.join(unreachable_nodes)}")
return issues
总结
AutoGPT平台的数据结构设计体现了以下特点:
设计优势
- 模块化设计:各个数据模型职责清晰,相互独立又有机关联
- 类型安全:使用Pydantic进行严格的数据验证和类型检查
- 扩展性强:支持版本管理、模板系统和插件化架构
- 监控完备:完整的日志、指标和告警数据模型
- 安全可靠:多层次的数据验证和完整性约束
核心特性
- 图结构管理:支持复杂的工作流定义和版本控制
- 执行引擎:完整的执行生命周期管理和状态跟踪
- Block系统:灵活的插件化架构和成本管理
- 用户管理:完善的用户体系和权限控制
- 集成支持:多种认证方式和凭据管理
- 可观测性:全面的监控、日志和指标收集
最佳实践
- 数据一致性:通过外键约束和业务逻辑验证确保数据完整性
- 性能优化:合理的索引设计和查询优化
- 安全性:敏感数据加密存储和访问控制
- 可维护性:清晰的数据模型文档和变更管理
- 可扩展性:预留扩展字段和版本兼容性设计
通过这套完整的数据结构设计,AutoGPT平台能够支持复杂的AI工作流管理、高效的执行调度和全面的系统监控,为用户提供稳定可靠的智能体开发和运行环境。