Overview
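The AutoGPT execution engine is the platform's core execution module. It handles distributed scheduling and execution of agent workflows. It combines a RabbitMQ message queue with a thread pool for highly concurrent task processing. A Redis-based distributed lock (ClusterLock) guarantees that each graph execution runs on only one node in the cluster. Status tracking and error recovery work at the granularity of individual nodes. Graph executions run on worker threads, and the nodes inside a graph run asynchronously on dedicated event loops, so the engine can process complex graph-structured workflows efficiently.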
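1. Overall Execution Engine Architecture
1.1 Design Principles
The AutoGPT execution engine follows these core design principles:
- Distributed coordination: RabbitMQ message queues distribute tasks across nodes
- High concurrency: a thread-pool model runs multiple graphs in parallel
- Traceable state: fine-grained execution status is recorded and updated in real time
- Fault tolerance: a cluster lock prevents duplicate execution, with support for retry on failure and graceful degradation
- Resource management: credit charging, file cleanup, and memory control
1.2 Architecture Diagram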
graph TB
subgraph "AutoGPT Execution Engine Architecture"
subgraph "Message Queue Layer"
RabbitMQ[RabbitMQ Message Queue]
ExecQueue[Execution Queue]
CancelQueue[Cancel Queue]
end
subgraph "Execution Management Layer"
ExecManager[ExecutionManager]
ThreadPool[Thread Pool Executor]
RunConsumer[Run Consumer Thread]
CancelConsumer[Cancel Consumer Thread]
end
subgraph "Task Processing Layer"
ExecProcessor[ExecutionProcessor]
GraphExecutor[Graph Executor]
NodeExecutor[Node Executor]
BlockExecutor[Block Executor]
end
subgraph "State Management Layer"
ClusterLock[Cluster Lock Manager]
ExecStatus[Execution Status Tracking]
ProgressMonitor[Progress Monitor]
StatsCollector[Statistics Collector]
end
subgraph "Resource Management Layer"
CreditManager[Credit Manager]
FileManager[File Manager]
MemoryManager[Memory Manager]
CredManager[Credentials Manager]
end
subgraph "Data Persistence Layer"
ExecDB[Execution Database]
GraphDB[Graph Database]
UserDB[User Database]
RedisCache[Redis Cache]
end
subgraph "Notification Layer"
EventBus[Event Bus]
WSServer[WebSocket Server]
NotificationMgr[Notification Manager]
end
end
%% Connections
RabbitMQ --> ExecQueue
RabbitMQ --> CancelQueue
ExecQueue --> RunConsumer
CancelQueue --> CancelConsumer
RunConsumer --> ExecManager
CancelConsumer --> ExecManager
ExecManager --> ThreadPool
ThreadPool --> ExecProcessor
ExecProcessor --> GraphExecutor
GraphExecutor --> NodeExecutor
NodeExecutor --> BlockExecutor
ExecProcessor --> ClusterLock
ExecProcessor --> ExecStatus
ExecProcessor --> ProgressMonitor
GraphExecutor --> CreditManager
NodeExecutor --> FileManager
BlockExecutor --> CredManager
ExecProcessor --> ExecDB
GraphExecutor --> GraphDB
CreditManager --> UserDB
ClusterLock --> RedisCache
ExecProcessor --> EventBus
EventBus --> WSServer
EventBus --> NotificationMgr
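Figure 1-1: AutoGPT execution engine architecture
The diagram shows the engine's full layered structure. The message queue layer distributes and coordinates tasks. The execution management layer owns the thread pool and the consumers. The task processing layer implements the actual execution logic, and the state management layer tracks execution status. The resource management layer controls resource usage, the data persistence layer stores data, and the notification layer pushes status updates in real time.
1.3 Execution Flow Overview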
sequenceDiagram
participant Client as Client/API
participant Queue as RabbitMQ Queue
participant Manager as ExecutionManager
participant Processor as ExecutionProcessor
participant Executor as GraphExecutor
participant Node as NodeExecutor
participant DB as Database
participant WS as WebSocket
Client->>Queue: Submit graph execution request
Queue->>Manager: Consume execution message
Manager->>Manager: Acquire cluster lock
Manager->>Processor: Submit to thread pool
Processor->>DB: Set status to RUNNING
Processor->>Executor: Start graph execution
loop For each node
Executor->>Node: Execute node
Node->>Node: Run block logic
Node->>DB: Save node outputs
Node->>WS: Push node status
Node->>Executor: Return outputs
Executor->>Executor: Determine next nodes
end
Executor->>DB: Set status to COMPLETED
Executor->>WS: Push completion event
Executor-->>Manager: Execution finished
Manager->>Manager: Release cluster lock
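Figure 1-2: Execution engine workflow sequence diagram
The sequence diagram traces a request from submission to completion. The request enters through the RabbitMQ queue. ExecutionManager acquires the cluster lock and submits the task to the thread pool. ExecutionProcessor updates the status and starts the graph run. GraphExecutor executes nodes in dependency order, and NodeExecutor runs each node's block logic. Throughout the run, the database is updated and WebSocket events are pushed in real time.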
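The flow above can be condensed into a small in-process sketch. This is not the platform's code. A `queue.Queue` stands in for the RabbitMQ execution queue and a lock-guarded dict stands in for the Redis lock keys. `drain`, `try_acquire`, `release`, and `execute_graph` are hypothetical helpers. The sketch shows the ordering that matters: acquire the lock, submit to the thread pool, and release the lock from the future's completion callback.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins: queue.Queue for the RabbitMQ execution queue,
# a dict guarded by a threading.Lock for the Redis lock keys.
exec_queue: "queue.Queue[str]" = queue.Queue()
lock_store: dict[str, str] = {}
store_guard = threading.Lock()
statuses: dict[str, str] = {}


def try_acquire(key: str, owner: str) -> str:
    """SET NX semantics: set the key only if absent; return the current owner."""
    with store_guard:
        return lock_store.setdefault(key, owner)


def release(key: str, owner: str) -> None:
    """Delete the key only if `owner` still holds it."""
    with store_guard:
        if lock_store.get(key) == owner:
            del lock_store[key]


def execute_graph(graph_exec_id: str) -> None:
    statuses[graph_exec_id] = "RUNNING"
    # ... nodes would run here ...
    statuses[graph_exec_id] = "COMPLETED"


def drain(owner: str, pool: ThreadPoolExecutor) -> None:
    while True:
        try:
            graph_exec_id = exec_queue.get_nowait()
        except queue.Empty:
            return
        key = f"exec_lock:{graph_exec_id}"
        if try_acquire(key, owner) != owner:
            continue  # the real manager nacks and requeues here
        future = pool.submit(execute_graph, graph_exec_id)
        # Release the lock once the run finishes, like _on_run_done
        future.add_done_callback(lambda _f, k=key: release(k, owner))


for gid in ("g1", "g2"):
    exec_queue.put(gid)
with ThreadPoolExecutor(max_workers=2) as pool:
    drain("executor-A", pool)
print(statuses)
```

Leaving the `with` block waits for the workers, and done-callbacks run on the worker thread before it exits, so every lock has been released by then.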
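2. ExecutionManager
2.1 Core Responsibilities
ExecutionManager is the entry component of the execution engine. It manages the message queues, schedules work onto the thread pool, and coordinates with the other nodes in the cluster.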
# /autogpt_platform/backend/backend/executor/manager.py
class ExecutionManager(AppProcess):
"""
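Main execution manager class.
Core responsibilities:
1. Manage RabbitMQ connections and message consumption
2. Maintain the execution thread pool and the list of active runs
3. Coordinate the cluster lock so that each graph execution runs only once
4. Handle starting, cancelling, and cleaning up tasks
5. Monitor system resources and execution metrics
Architectural traits:
- Dual consumers: a run consumer and a cancel consumer work in parallel
- Thread pool isolation: each graph execution runs on its own worker thread
- Cluster lock: a Redis-based distributed lock prevents duplicate execution
- Graceful shutdown: waits for running tasks to finish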
"""
def __init__(self):
super().__init__()
self.pool_size = settings.config.num_graph_workers
self.active_graph_runs: dict[str, tuple[Future, threading.Event]] = {}
self.executor_id = str(uuid.uuid4())
self._executor = None
self._stop_consuming = None
self._cancel_thread = None
self._cancel_client = None
self._run_thread = None
self._run_client = None
self._execution_locks = {}
@property
def executor(self) -> ThreadPoolExecutor:
"""
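Return the thread pool executor.
Lazily initialized: the pool is created on first access.
The pool size comes from configuration and sets how many graphs can run in parallel.
Returns:
ThreadPoolExecutor: the configured thread pool instance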
"""
if self._executor is None:
self._executor = ThreadPoolExecutor(
max_workers=self.pool_size,
initializer=init_worker,
)
return self._executor
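The lazy `executor` property can be tried in isolation. In this sketch `LazyPoolOwner` is a hypothetical stand-in for `ExecutionManager`. `init_worker` is a placeholder for whatever per-thread setup the real initializer performs. `ThreadPoolExecutor` runs its `initializer` once in each worker thread before that thread takes any task.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

initialized_threads: set[int] = set()


def init_worker() -> None:
    # Placeholder for per-thread setup; runs once in each worker thread at startup
    initialized_threads.add(threading.get_ident())


class LazyPoolOwner:
    """Hypothetical stand-in for ExecutionManager's lazy executor property."""

    def __init__(self, pool_size: int) -> None:
        self.pool_size = pool_size
        self._executor: Optional[ThreadPoolExecutor] = None

    @property
    def executor(self) -> ThreadPoolExecutor:
        if self._executor is None:  # created on first access only
            self._executor = ThreadPoolExecutor(
                max_workers=self.pool_size, initializer=init_worker
            )
        return self._executor


owner = LazyPoolOwner(pool_size=2)
created_before_use = owner._executor is not None
idents = [owner.executor.submit(threading.get_ident).result() for _ in range(4)]
same_instance = owner.executor is owner.executor
owner.executor.shutdown(wait=True)
```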
2.2 Message Queue Consumption
ExecutionManager uses two independent consumer threads: one for run messages and one for cancel messages.
def run(self):
"""
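Start the execution manager service.
Startup sequence:
1. Start the Prometheus metrics endpoint
2. Start the run-message consumer thread
3. Start the cancel-message consumer thread
4. Wait for the stop signal
5. Shut down gracefully, waiting for running tasks to finish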
"""
# Start the Prometheus metrics server
if settings.config.enable_prometheus:
start_http_server(settings.config.prometheus_port)
# Start the consumer threads
self.run_thread.start()
self.cancel_thread.start()
# Wait for the stop signal
while not self.stop_consuming.is_set():
time.sleep(1)
# Graceful shutdown
self._graceful_shutdown()
def _consume_execution_run(self):
"""
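Core consumption loop for the execution queue.
Workflow:
1. Connect to the RabbitMQ execution queue
2. Set the prefetch count
3. Register the message callback
4. Poll for messages until the stop flag is set
5. Close the connection on exit
"""
configure_logging()
set_service_name("ExecutionManager-RunConsumer")
# Connect to the execution queue
self.run_client.connect()
# Limit unacknowledged deliveries to pool_size
self.run_client.channel.basic_qos(
prefetch_count=self.pool_size
)
# Register the message handler (manual acknowledgements)
self.run_client.channel.basic_consume(
queue=GRAPH_EXECUTION_QUEUE_NAME,
on_message_callback=self._handle_run_message,
auto_ack=False,
)
# Consume loop: process_data_events returns after at most 1s, so the stop flag is re-checked regularly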
while not self.stop_consuming.is_set():
self.run_client.connection.process_data_events(time_limit=1)
self.run_client.close()
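The consume loop relies on a bounded wait, so the stop flag is re-checked at least once per interval. The sketch below reproduces that pattern with the standard library only. `queue.Queue.get(timeout=...)` plays the role of `process_data_events(time_limit=1)`, and `consume` and `handle` are hypothetical names.

```python
import queue
import threading
from typing import Callable


def consume(
    q: "queue.Queue[bytes]",
    stop: threading.Event,
    handle: Callable[[bytes], None],
    time_limit: float = 0.05,
) -> None:
    """Poll with a bounded wait so the stop flag is re-checked regularly."""
    while not stop.is_set():
        try:
            body = q.get(timeout=time_limit)
        except queue.Empty:
            continue  # nothing arrived in this interval; check the flag again
        handle(body)


received: list[bytes] = []
q: "queue.Queue[bytes]" = queue.Queue()
stop = threading.Event()


def handle(body: bytes) -> None:
    received.append(body)
    if len(received) == 3:
        stop.set()  # ask the consumer loop to exit after this message


for i in range(5):
    q.put(f"msg-{i}".encode())
consumer = threading.Thread(target=consume, args=(q, stop, handle))
consumer.start()
consumer.join(timeout=5)
```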
2.3 Handling Run Requests
def _handle_run_message(
self,
_channel: BlockingChannel,
method: Basic.Deliver,
_properties: BasicProperties,
body: bytes,
):
"""
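Core handler for messages from the execution queue.
Processing steps:
1. Parse the message body into an execution request
2. Check whether the same execution is already running locally
3. Try to acquire the cluster lock
4. Submit the task to the thread pool
5. Register a completion callback
6. Acknowledge the message
Args:
method: RabbitMQ delivery metadata (carries the delivery_tag)
body: raw message body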
"""
# Helper to ack this delivery, or nack it (optionally requeueing)
def _ack_message(reject: bool = False, requeue: bool = False):
if reject:
_channel.basic_nack(
delivery_tag=method.delivery_tag,
requeue=requeue
)
else:
_channel.basic_ack(delivery_tag=method.delivery_tag)
try:
# Parse the execution request
graph_exec_entry = GraphExecutionEntry.model_validate_json(body)
graph_exec_id = graph_exec_entry.graph_exec_id
logger.info(
f"Received RUN for graph_exec_id={graph_exec_id}"
)
# Reject duplicates already running on this pod; requeue so the message is not lost
if graph_exec_id in self.active_graph_runs:
logger.warning(
f"Graph {graph_exec_id} already running locally"
)
_ack_message(reject=True, requeue=True)
return
# Try to acquire the cluster lock
cluster_lock = ClusterLock(
redis=redis.get_redis(),
key=f"exec_lock:{graph_exec_id}",
owner_id=self.executor_id,
timeout=settings.config.cluster_lock_timeout,
)
current_owner = cluster_lock.try_acquire()
if current_owner != self.executor_id:
if current_owner is not None:
logger.warning(
f"Graph {graph_exec_id} already running on pod {current_owner}"
)
else:
logger.warning(
f"Could not acquire lock for {graph_exec_id}"
)
_ack_message(reject=True, requeue=True)
return
# Keep a reference to the lock so it can be released later
self._execution_locks[graph_exec_id] = cluster_lock
logger.info(
f"Acquired cluster lock for {graph_exec_id}"
)
# Create the cancel event
cancel_event = threading.Event()
# Submit to the thread pool
future = self.executor.submit(
execute_graph,
graph_exec_entry,
cancel_event,
cluster_lock
)
# Track the active run
self.active_graph_runs[graph_exec_id] = (future, cancel_event)
self._update_prompt_metrics()
# Register the completion callback
def _on_run_done(f: Future):
logger.info(
f"Graph execution {graph_exec_id} completed"
)
# Clean up: drop the run from the active set and release the lock
self.active_graph_runs.pop(graph_exec_id, None)
lock = self._execution_locks.pop(graph_exec_id, None)
if lock:
lock.release()
self._update_prompt_metrics()
future.add_done_callback(_on_run_done)
# Acknowledge the message
_ack_message()
except Exception as e:
logger.exception(f"Error handling run message: {e}")
_ack_message(reject=True, requeue=False)
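The ack/nack branches of `_handle_run_message` reduce to a small decision table. The function below is a hypothetical restatement for reasoning and testing, not code from the platform. Any exception leads to a nack without requeue, because retrying a malformed message would loop forever. Local duplicates and lock conflicts are requeued so a later attempt can pick them up.

```python
from enum import Enum
from typing import Optional


class Action(Enum):
    SUBMIT_AND_ACK = "submit to pool, then basic_ack"
    NACK_REQUEUE = "basic_nack(requeue=True)"
    NACK_DROP = "basic_nack(requeue=False)"


def decide_run_message(
    handled_ok: bool,
    running_locally: bool,
    lock_owner: Optional[str],
    executor_id: str,
) -> Action:
    """Mirror the branch order of _handle_run_message (hypothetical helper)."""
    if not handled_ok:
        # Exception path: drop the message so a poison message is not retried forever
        return Action.NACK_DROP
    if running_locally:
        # Same graph_exec_id already in active_graph_runs on this pod
        return Action.NACK_REQUEUE
    if lock_owner != executor_id:
        # Held by another pod, or None when Redis was unavailable
        return Action.NACK_REQUEUE
    return Action.SUBMIT_AND_ACK
```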
2.4 Handling Cancel Requests
def _handle_cancel_message(
self,
_channel: BlockingChannel,
method: Basic.Deliver,
_properties: BasicProperties,
body: bytes,
):
"""
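Handle messages from the cancel queue.
Cancellation steps:
1. Parse the cancel event
2. Look up the matching active run
3. Set the cancel event flag
4. Return without blocking; the worker thread checks the flag and stops cooperatively
5. Acknowledge the message
Args:
method: RabbitMQ delivery metadata
body: raw message body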
"""
def _ack_message():
_channel.basic_ack(delivery_tag=method.delivery_tag)
try:
# Parse the cancel event
cancel_event_data = CancelExecutionEvent.model_validate_json(body)
graph_exec_id = cancel_event_data.graph_exec_id
logger.info(f"Received CANCEL for graph_exec_id={graph_exec_id}")
# Look up the active run
if graph_exec_id in self.active_graph_runs:
future, cancel_event = self.active_graph_runs[graph_exec_id]
# Set the cancel flag (cooperative: the running graph checks it between steps)
cancel_event.set()
logger.info(
f"Cancelled graph execution {graph_exec_id}"
)
else:
logger.warning(
f"Graph execution {graph_exec_id} not found in active runs"
)
_ack_message()
except Exception as e:
logger.exception(f"Error handling cancel message: {e}")
_ack_message()
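Cancellation is cooperative. The cancel consumer only sets a `threading.Event`, and the graph run notices it at its next check. The minimal sketch below assumes a hypothetical `run_graph` that checks the flag between nodes. Node `b` sets the flag itself to simulate a cancel message arriving while it runs.

```python
import threading
from typing import Callable


def run_graph(
    nodes: list[tuple[str, Callable[[], None]]], cancel: threading.Event
) -> tuple[str, list[str]]:
    """Run nodes in order, checking the cancel flag before each one."""
    finished: list[str] = []
    for name, fn in nodes:
        if cancel.is_set():
            return "CANCELLED", finished
        fn()  # a node that is already running is allowed to finish
        finished.append(name)
    return "COMPLETED", finished


cancel = threading.Event()
nodes = [
    ("a", lambda: None),
    ("b", cancel.set),  # simulates _handle_cancel_message firing mid-run
    ("c", lambda: None),
]
status, finished = run_graph(nodes, cancel)
```

Because the check happens only between nodes, a long-running block delays cancellation until it returns.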
3. ExecutionProcessor
3.1 Core Functions
ExecutionProcessor is the core processing unit of the execution engine. It implements the actual execution logic for graphs and nodes.
class ExecutionProcessor:
"""
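Execution processor.
Core responsibilities:
1. Initialize the graph execution environment
2. Execute nodes in dependency order
3. Pass data between nodes
4. Handle execution errors and status updates
5. Collect execution statistics
Execution model:
- Graph execution: runs on its own worker thread
- Node execution: runs asynchronously on a dedicated event loop
- Block execution: supports both synchronous and asynchronous modes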
"""
def on_graph_executor_start(self):
"""
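Initialize the graph executor worker.
Initialization steps:
1. Configure logging
2. Set the service name
3. Create the credentials manager
4. Start the node execution event loop
5. Start the node evaluation event loop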
"""
configure_logging()
set_service_name("GraphExecutor")
self.tid = threading.get_ident()
self.creds_manager = IntegrationCredentialsManager()
# Create the node execution and node evaluation event loops
self.node_execution_loop = asyncio.new_event_loop()
self.node_evaluation_loop = asyncio.new_event_loop()
# Run each event loop forever on its own daemon thread
self.node_execution_thread = threading.Thread(
target=self.node_execution_loop.run_forever,
daemon=True
)
self.node_evaluation_thread = threading.Thread(
target=self.node_evaluation_loop.run_forever,
daemon=True
)
self.node_execution_thread.start()
self.node_evaluation_thread.start()
logger.info(f"GraphExecutor {self.tid} started")
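The pattern used in `on_graph_executor_start` is an event loop running forever on a daemon thread and fed from another thread. It can be exercised on its own. This sketch uses only `asyncio.new_event_loop`, `run_coroutine_threadsafe`, and `call_soon_threadsafe`. `run_node` is a hypothetical coroutine. The explicit shutdown at the end is an addition that the excerpt above does not show.

```python
import asyncio
import threading

# A dedicated loop running forever on a daemon thread, as in on_graph_executor_start
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()


async def run_node(x: int) -> int:
    # Hypothetical node coroutine
    await asyncio.sleep(0.01)
    return x * 2


# From the calling (graph worker) thread: schedule onto the loop and block for the result
future = asyncio.run_coroutine_threadsafe(run_node(21), loop)
result = future.result(timeout=5)

# Shutdown: stop() must run on the loop's own thread, then the loop can be closed
loop.call_soon_threadsafe(loop.stop)
loop_thread.join(timeout=5)
loop.close()
```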
3.2 Main Graph Execution Flow
def on_graph_execution(
self,
graph_exec: GraphExecutionEntry,
cancel: threading.Event,
cluster_lock: ClusterLock,
):
"""
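Main entry point for a graph execution.
Steps:
1. Initialize log metadata and the database client
2. Check the execution status and the user's credit balance
3. Load the graph definition and validate its structure
4. Create the node execution queue
5. Run the node execution loop
6. Wait for all nodes to finish
7. Update the final status and statistics
Args:
graph_exec: the graph execution request
cancel: cancel event flag
cluster_lock: the cluster lock held for this execution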
"""
# Initialize log metadata
log_metadata = LogMetadata(
logger=_logger,
user_id=graph_exec.user_id,
graph_eid=graph_exec.graph_exec_id,
graph_id=graph_exec.graph_id,
)
db_client = get_db_client()
# Fetch execution metadata
exec_meta = db_client.get_graph_execution_meta(
user_id=graph_exec.user_id,
execution_id=graph_exec.graph_exec_id,
)
if exec_meta is None:
log_metadata.warning("Graph execution not found")
return
# Only QUEUED or RUNNING executions may proceed
if exec_meta.status not in [ExecutionStatus.QUEUED, ExecutionStatus.RUNNING]:
log_metadata.info(f"Execution already in {exec_meta.status} state")
return
# Check the user's credit balance
user_credit = db_client.get_user_credit(graph_exec.user_id)
if user_credit.balance <= 0:
log_metadata.warning("Insufficient balance")
db_client.update_graph_execution_status(
graph_exec.user_id,
graph_exec.graph_exec_id,
ExecutionStatus.FAILED,
error="Insufficient balance"
)
return
# Load the graph definition
graph = db_client.get_graph(
graph_exec.graph_id,
graph_exec.graph_version,
user_id=graph_exec.user_id,
)
if not graph:
log_metadata.error("Graph not found")
return
# Validate the graph structure
try:
graph.validate_graph(for_run=True)
except Exception as e:
log_metadata.error(f"Graph validation failed: {e}")
db_client.update_graph_execution_status(
graph_exec.user_id,
graph_exec.graph_exec_id,
ExecutionStatus.FAILED,
error=str(e)
)
return
# Set status to RUNNING
db_client.update_graph_execution_status(
graph_exec.user_id,
graph_exec.graph_exec_id,
ExecutionStatus.RUNNING,
started_at=datetime.utcnow()
)
# Publish a WebSocket event
get_execution_event_bus().publish(
graph_exec.user_id,
{
"type": "execution_started",
"execution_id": graph_exec.graph_exec_id,
"graph_id": graph_exec.graph_id,
}
)
# Initialize execution statistics (shared by node tasks, guarded by a lock)
graph_stats = GraphExecutionStats()
graph_stats_lock = threading.Lock()
# Create the node execution queue
node_queue = asyncio.Queue()
# Seed the queue with the entry nodes
entry_nodes = graph.get_entry_nodes()
for node in entry_nodes:
node_exec_entry = NodeExecutionEntry(
node_id=node.id,
graph_exec_id=graph_exec.graph_exec_id,
inputs=graph_exec.inputs.get(node.id, {}),
)
node_queue.put_nowait(node_exec_entry)
# Run the node loop on the node execution event loop and block until it finishes
asyncio.run_coroutine_threadsafe(
self._execute_nodes(
node_queue,
graph,
graph_exec,
cancel,
(graph_stats, graph_stats_lock),
),
self.node_execution_loop
).result()
# Determine and store the final status
final_status = (
ExecutionStatus.CANCELLED if cancel.is_set()
else ExecutionStatus.COMPLETED if graph_stats.failed_nodes == 0
else ExecutionStatus.FAILED
)
db_client.update_graph_execution_status(
graph_exec.user_id,
graph_exec.graph_exec_id,
final_status,
ended_at=datetime.utcnow(),
stats=graph_stats
)
# Publish the completion event
get_execution_event_bus().publish(
graph_exec.user_id,
{
"type": "execution_completed",
"execution_id": graph_exec.graph_exec_id,
"status": final_status.value,
"stats": graph_stats.model_dump(),
}
)
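The guard checks at the start of `on_graph_execution` and the status decision at its end are pure logic, so they can be stated on their own. The helpers `preflight` and `final_status` are hypothetical. Plain strings stand in for the platform's `ExecutionStatus` enum. Cancellation takes precedence over node failures.

```python
from typing import Optional


def preflight(status: str, balance: int) -> Optional[str]:
    """Return a reason to stop before running, or None to proceed."""
    if status not in ("QUEUED", "RUNNING"):
        return f"skip: already {status}"
    if balance <= 0:
        return "fail: Insufficient balance"
    return None


def final_status(cancelled: bool, failed_nodes: int) -> str:
    # Same precedence as the conditional expression above: CANCELLED first, then COMPLETED/FAILED
    if cancelled:
        return "CANCELLED"
    return "COMPLETED" if failed_nodes == 0 else "FAILED"
```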
3.3 Node Execution Loop
async def _execute_nodes(
self,
node_queue: asyncio.Queue,
graph: Graph,
graph_exec: GraphExecutionEntry,
cancel: threading.Event,
graph_stats_pair: tuple[GraphExecutionStats, threading.Lock],
):
"""
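Main node execution loop.
Loop logic:
1. Take pending nodes from the queue
2. Check the cancel flag
3. Execute the nodes
4. Determine and enqueue the next batch of nodes
5. Repeat until the queue is empty and no tasks are pending
Characteristics:
- Runs multiple nodes concurrently (up to max_concurrent_nodes)
- Handles inter-node dependencies via _enqueue_next_nodes
- Updates execution progress in real time
Args:
node_queue: node execution queue
graph: graph definition
graph_exec: graph execution request
cancel: cancel event
graph_stats_pair: statistics object and its lock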
"""
# Set of in-flight node tasks
pending_tasks = set()
max_concurrent = settings.config.max_concurrent_nodes
while True:
# Stop scheduling new nodes once cancellation is requested
if cancel.is_set():
logger.info("Execution cancelled")
break
# Exit once the queue is empty and nothing is in flight
if node_queue.empty() and not pending_tasks:
break
# Start queued nodes up to the concurrency limit
while len(pending_tasks) < max_concurrent and not node_queue.empty():
try:
node_exec = node_queue.get_nowait()
except asyncio.QueueEmpty:
break
# Create a task for the node
task = asyncio.create_task(
self.on_node_execution(
node_exec,
graph,
graph_exec,
graph_stats_pair,
)
)
pending_tasks.add(task)
# Wait until at least one task finishes
if pending_tasks:
done, pending_tasks = await asyncio.wait(
pending_tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Handle finished tasks
for task in done:
try:
node_stats = task.result()
# Enqueue successor nodes whose inputs are now satisfied
await self._enqueue_next_nodes(
graph,
node_stats,
graph_exec,
node_queue
)
except Exception as e:
logger.exception(f"Node execution error: {e}")
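The loop above delegates dependency handling to `_enqueue_next_nodes`, which is not shown. The runnable sketch below fills that gap with one plausible rule, stated as an assumption: a successor is enqueued once all of its predecessors have finished. `GRAPH`, `run_graph`, and the sleeping `execute` coroutine are hypothetical. The bounded `asyncio.wait(..., return_when=FIRST_COMPLETED)` loop has the same structure as above.

```python
import asyncio

# Hypothetical diamond-shaped graph: node -> successors
GRAPH: dict[str, list[str]] = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}


async def run_graph(graph: dict[str, list[str]], max_concurrent: int = 2):
    preds: dict[str, set[str]] = {n: set() for n in graph}
    for node, succs in graph.items():
        for succ in succs:
            preds[succ].add(node)

    queue: asyncio.Queue = asyncio.Queue()
    for node, p in preds.items():
        if not p:
            queue.put_nowait(node)  # entry nodes have no predecessors

    async def execute(node: str) -> str:
        await asyncio.sleep(0.01)  # stand-in for running the block
        return node

    finished: set[str] = set()
    order: list[str] = []
    pending: set[asyncio.Task] = set()
    peak = 0
    while not (queue.empty() and not pending):
        # Start queued nodes up to the concurrency limit
        while len(pending) < max_concurrent and not queue.empty():
            pending.add(asyncio.create_task(execute(queue.get_nowait())))
        peak = max(peak, len(pending))
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            node = task.result()
            finished.add(node)
            order.append(node)
            for succ in graph[node]:
                # Only the last predecessor to finish satisfies this check
                if preds[succ] <= finished:
                    queue.put_nowait(succ)
    return order, peak


order, peak = asyncio.run(run_graph(GRAPH))
```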
3.4 Single-Node Execution
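async def on_node_execution(
self,
node_exec: NodeExecutionEntry,
graph: Graph,
graph_exec: GraphExecutionEntry,
graph_stats_pair: tuple[GraphExecutionStats, threading.Lock],
) -> NodeExecutionStats:
"""
Execute a single node.
Steps:
1. Look up the node definition and its Block instance
2. Prepare input data and credentials
3. Create the execution record
4. Call the Block's execute method
5. Collect output data
6. Update execution status and statistics
7. Deduct credits
Args:
node_exec: node execution request
graph: graph definition (used to look up the node)
graph_exec: the parent graph execution request
graph_stats_pair: graph statistics and their lock
Returns:
NodeExecutionStats: statistics for this node execution
"""
# Look up the node definition
node = graph.get_node(node_exec.node_id)
if not node:
raise ValueError(f"Node {node_exec.node_id} not found")
# Look up the Block instance
block = get_block(node.block_id)
if not block:
raise ValueError(f"Block {node.block_id} not found")
# Initialize log metadata
log_metadata = LogMetadata(
logger=_logger,
user_id=graph_exec.user_id,
graph_eid=graph_exec.graph_exec_id,
node_id=node.id,
node_eid=node_exec.id,
block_name=block.name,
)
# Progress tracker that pushes each output to clients in real time (see 5.1)
node_exec_progress = NodeExecutionProgress(
user_id=graph_exec.user_id,
graph_exec_id=graph_exec.graph_exec_id,
node_id=node.id,
node_exec_id=node_exec.id,
)
# Create the stats object
stats = NodeExecutionStats(
node_id=node.id,
block_name=block.name,
started_at=datetime.utcnow(),
)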
db_client = get_db_client()
try:
# Create the node execution record
db_client.create_node_execution(
node_exec.id,
graph_exec.graph_exec_id,
node.id,
ExecutionStatus.RUNNING,
inputs=node_exec.inputs,
)
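# Prepare the block input; runtime inputs override node defaults
# (merge into one dict first: two ** unpackings with a shared key raise TypeError)
block_input = BlockInput(
**{**node.input_default, **node_exec.inputs},
)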
# Resolve credentials
credentials = self.creds_manager.get_credentials(
graph_exec.user_id,
node_exec.credentials_inputs or {},
)
# Execute the Block, consuming outputs as they are produced
log_metadata.info(f"Executing block {block.name}")
outputs = {}
async for output_name, output_data in block.execute(
block_input,
credentials=credentials,
user_context=graph_exec.user_context,
):
outputs[output_name] = output_data
# Push each output in real time
node_exec_progress.add_output(output_name, output_data)
# Record success
stats.status = ExecutionStatus.COMPLETED
stats.outputs = outputs
stats.ended_at = datetime.utcnow()
db_client.update_node_execution(
node_exec.id,
ExecutionStatus.COMPLETED,
outputs=outputs,
ended_at=stats.ended_at,
)
# Compute and deduct credits
cost = block_usage_cost(block, block_input, outputs)
if cost > 0:
db_client.deduct_credits(
graph_exec.user_id,
cost,
f"Node execution: {node.id}",
)
stats.cost = cost
log_metadata.info(f"Node completed, cost: {cost}")
except Exception as e:
# Record the error
stats.status = ExecutionStatus.FAILED
stats.error = str(e)
stats.ended_at = datetime.utcnow()
db_client.update_node_execution(
node_exec.id,
ExecutionStatus.FAILED,
error=str(e),
ended_at=stats.ended_at,
)
log_metadata.error(f"Node failed: {e}")
# Update graph statistics under the lock
graph_stats, lock = graph_stats_pair
with lock:
graph_stats.total_nodes += 1
if stats.status == ExecutionStatus.COMPLETED:
graph_stats.completed_nodes += 1
else:
graph_stats.failed_nodes += 1
graph_stats.total_cost += stats.cost
return stats
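Two details of `on_node_execution` are easy to check in isolation: how defaults and runtime inputs combine, and how outputs are consumed from an async generator as they are produced. The `echo_block` generator and `run_node` helper are hypothetical. The sketch also shows why two `**` unpackings with a shared key fail, while merging into one dict first lets runtime inputs override defaults.

```python
import asyncio
from typing import Any, AsyncIterator, Callable


async def echo_block(data: dict[str, Any]) -> AsyncIterator[tuple[str, Any]]:
    # Hypothetical block: yields (output_name, output_data) pairs one at a time
    yield "greeting", f"Hello, {data['name']}"
    yield "length", len(data["name"])


async def run_node(
    defaults: dict[str, Any],
    runtime_inputs: dict[str, Any],
    on_output: Callable[[str, Any], None],
) -> dict[str, Any]:
    block_input = {**defaults, **runtime_inputs}  # the later mapping wins on shared keys
    outputs: dict[str, Any] = {}
    async for name, value in echo_block(block_input):
        outputs[name] = value
        on_output(name, value)  # push each output as soon as it is produced
    return outputs


streamed: list[str] = []
outputs = asyncio.run(
    run_node({"name": "default"}, {"name": "Ada"}, lambda n, v: streamed.append(n))
)

# Passing both mappings as separate ** arguments rejects the duplicate key instead
try:
    dict(**{"name": "default"}, **{"name": "Ada"})
    duplicate_rejected = False
except TypeError:
    duplicate_rejected = True
```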
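4. Cluster Lock Mechanism
4.1 ClusterLock Implementation
ClusterLock implements a distributed lock on top of Redis. It ensures that a given graph execution runs only once across the cluster.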
# /autogpt_platform/backend/backend/executor/cluster_lock.py
class ClusterLock:
"""
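Redis-based cluster lock.
Features:
1. Distributed mutual exclusion: guarantees an execution runs on only one node
2. TTL: the key expires automatically, so a crashed holder cannot deadlock others
3. Ownership check: only the holder can release or extend the lock
4. Status check: the lock state can be queried
Use cases:
- Prevent the same graph from being executed twice
- Coordinate multiple execution manager nodes
- Synchronize state during failure recovery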
"""
def __init__(
self,
redis: Redis,
key: str,
owner_id: str,
timeout: int = 300,
):
"""
Initialize the cluster lock.
Args:
redis: Redis client instance
key: lock key name
owner_id: identifier of the lock holder
timeout: lock TTL in seconds
"""
self.redis = redis
self.key = key
self.owner_id = owner_id
self.timeout = timeout
self._acquired = False
def try_acquire(self) -> Optional[str]:
"""
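Try to acquire the lock.
Steps:
1. Try to set the key with SET NX EX (only if it does not exist, with a TTL)
2. On success, return this instance's owner_id
3. On failure, read and return the current holder
4. Return None if Redis is unavailable
Returns:
str: ID of the lock holder; equals this owner_id when acquisition succeeded
None: Redis unavailable, the operation failed, or the key expired between SET and GET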
"""
try:
# NX: set only if the key does not exist; EX: expire after timeout seconds
result = self.redis.set(
self.key,
self.owner_id,
nx=True,
ex=self.timeout,
)
if result:
self._acquired = True
logger.info(
f"Acquired lock {self.key} for {self.owner_id}"
)
return self.owner_id
else:
# The lock is held by another process; report who holds it
current_owner = self.redis.get(self.key)
if current_owner:
logger.debug(
f"Lock {self.key} held by {current_owner}"
)
return current_owner.decode() if isinstance(current_owner, bytes) else current_owner
return None
except Exception as e:
logger.error(f"Failed to acquire lock {self.key}: {e}")
return None
def release(self) -> bool:
"""
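Release the lock.
Steps:
1. Atomically (in a Lua script) check that this owner still holds the key, and delete it
2. Update internal state
Returns:
bool: whether the lock was released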
"""
if not self._acquired:
logger.warning(f"Attempting to release unacquired lock {self.key}")
return False
try:
# A Lua script makes the ownership check and the delete atomic
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.redis.eval(
lua_script,
1,
self.key,
self.owner_id,
)
if result:
self._acquired = False
logger.info(f"Released lock {self.key}")
return True
else:
logger.warning(
f"Failed to release lock {self.key}: not owner"
)
return False
except Exception as e:
logger.error(f"Error releasing lock {self.key}: {e}")
return False
def is_locked(self) -> bool:
"""
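Check whether the lock key still exists.
Note: this only checks existence; it does not verify that this owner holds the lock.
Returns:
bool: whether the lock key exists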
"""
try:
return self.redis.exists(self.key) > 0
except Exception as e:
logger.error(f"Error checking lock {self.key}: {e}")
return False
def extend(self, additional_time: int) -> bool:
"""
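Extend the lock's TTL.
Used by long-running tasks so the lock does not expire mid-execution.
Note: the TTL is reset to timeout + additional_time seconds from now;
it is not added to the remaining TTL.
Args:
additional_time: extra seconds on top of the base timeout
Returns:
bool: whether the TTL was extended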
"""
if not self._acquired:
return False
try:
# A Lua script atomically verifies ownership and resets the TTL
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("expire", KEYS[1], ARGV[2])
else
return 0
end
"""
result = self.redis.eval(
lua_script,
1,
self.key,
self.owner_id,
self.timeout + additional_time,
)
return bool(result)
except Exception as e:
logger.error(f"Error extending lock {self.key}: {e}")
return False
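To reason about `try_acquire`, `release`, and `extend` without a Redis server, the commands they rely on can be modeled in memory. `FakeLockStore` is a hypothetical test double, not part of the platform or of redis-py. It mimics `SET NX EX`, `GET`, and the two compare-and-act Lua scripts. Its clock is injectable so that expiry can be tested deterministically.

```python
import time
from typing import Callable, Optional


class FakeLockStore:
    """In-memory model of the Redis operations ClusterLock uses."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._data: dict[str, tuple[str, float]] = {}  # key -> (owner, expires_at)
        self._clock = clock

    def get(self, key: str) -> Optional[str]:
        item = self._data.get(key)
        if item is None:
            return None
        if item[1] <= self._clock():
            del self._data[key]  # expired keys behave as absent
            return None
        return item[0]

    def set_nx_ex(self, key: str, owner: str, ttl: float) -> bool:
        # SET key owner NX EX ttl
        if self.get(key) is not None:
            return False
        self._data[key] = (owner, self._clock() + ttl)
        return True

    def delete_if_owner(self, key: str, owner: str) -> bool:
        # Equivalent of the release() Lua script
        if self.get(key) != owner:
            return False
        del self._data[key]
        return True

    def expire_if_owner(self, key: str, owner: str, ttl: float) -> bool:
        # Equivalent of the extend() Lua script: reset the TTL from now
        if self.get(key) != owner:
            return False
        self._data[key] = (owner, self._clock() + ttl)
        return True
```

If a lock holder crashes, its key simply expires, which is why the TTL doubles as deadlock protection.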
4.2 Cluster Lock Sequence Diagram
sequenceDiagram
participant EM1 as ExecutionManager1
participant EM2 as ExecutionManager2
participant Redis as Redis
participant Graph as GraphExecutor
par Concurrent execution requests
EM1->>Redis: SET NX exec_lock:123
EM2->>Redis: SET NX exec_lock:123
end
Redis-->>EM1: OK (acquired)
Redis-->>EM2: nil (already held)
EM2->>EM2: Nack and requeue message
EM1->>Graph: Submit execution task
Graph->>Graph: Run graph logic
alt Lock needs extending
Graph->>Redis: EXPIRE exec_lock:123 600
Redis-->>Graph: OK
end
Graph->>Graph: Execution finished
Graph-->>EM1: Execution result
EM1->>Redis: DEL exec_lock:123 (Lua: only if owner)
Redis-->>EM1: 1 (deleted)
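Figure 4-1: Cluster lock coordinating execution across multiple nodes
The diagram shows how the cluster lock coordinates multiple ExecutionManager nodes. When two nodes receive the same execution request at the same time, only one acquires the lock and runs the task, and the other requeues the message. The lock's TTL can be extended during execution, and the lock is released once execution completes.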
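The race in Figure 4-1 can be reproduced with two threads. This sketch assumes the check-and-set is atomic, as a single Redis `SET NX` is. A `threading.Lock` provides that atomicity, and a `threading.Barrier` releases both "managers" at the same moment. `set_nx` and `manager` are hypothetical names.

```python
import threading

store: dict[str, str] = {}
guard = threading.Lock()  # plays the role of Redis executing one command at a time
barrier = threading.Barrier(2)
results: dict[str, bool] = {}


def set_nx(key: str, value: str) -> bool:
    with guard:
        if key in store:
            return False
        store[key] = value
        return True


def manager(name: str) -> None:
    barrier.wait()  # both managers receive the same message simultaneously
    results[name] = set_nx("exec_lock:123", name)


threads = [threading.Thread(target=manager, args=(n,)) for n in ("EM1", "EM2")]
for t in threads:
    t.start()
for t in threads:
    t.join()
winner = next(name for name, won in results.items() if won)
```

Which manager wins is nondeterministic. The invariant is that exactly one does.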
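5. Execution Status Tracking and Notification
5.1 Real-Time Status Push
The execution engine pushes execution status through the event bus to the WebSocket server in real time.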
# /autogpt_platform/backend/backend/executor/utils.py
class NodeExecutionProgress:
"""
Node execution progress tracker.
Features:
1. Real-time output collection
2. WebSocket event push
3. Status change notifications
4. Error reporting
"""
def __init__(
self,
user_id: str,
graph_exec_id: str,
node_id: str,
node_exec_id: str,
):
self.user_id = user_id
self.graph_exec_id = graph_exec_id
self.node_id = node_id
self.node_exec_id = node_exec_id
self.outputs = {}
def add_output(self, name: str, data: Any):
"""
Record a node output and publish an event.
Args:
name: output port name
data: output data
"""
self.outputs[name] = data
# Publish the output event
get_execution_event_bus().publish(
self.user_id,
{
"type": "node_output",
"execution_id": self.graph_exec_id,
"node_id": self.node_id,
"node_execution_id": self.node_exec_id,
"output_name": name,
"output_data": data,
}
)
def update_status(self, status: ExecutionStatus):
"""
Publish a node status update.
Args:
status: the new execution status
"""
get_execution_event_bus().publish(
self.user_id,
{
"type": "node_status_update",
"execution_id": self.graph_exec_id,
"node_id": self.node_id,
"node_execution_id": self.node_exec_id,
"status": status.value,
}
)
def report_error(self, error: Exception):
"""
Report a node execution error.
Args:
error: the exception that was raised
"""
get_execution_event_bus().publish(
self.user_id,
{
"type": "node_error",
"execution_id": self.graph_exec_id,
"node_id": self.node_id,
"node_execution_id": self.node_exec_id,
"error": str(error),
"error_type": type(error).__name__,
}
)
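`NodeExecutionProgress` depends only on a `publish(user_id, event)` method. The in-memory bus below is a hypothetical stand-in for `get_execution_event_bus()` and is useful in tests. Subscribers register per user, and each published event is delivered synchronously to that user's subscribers only.

```python
from collections import defaultdict
from typing import Any, Callable

Event = dict[str, Any]


class InMemoryEventBus:
    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[Event], None]]] = defaultdict(list)

    def subscribe(self, user_id: str, callback: Callable[[Event], None]) -> None:
        self._subscribers[user_id].append(callback)

    def publish(self, user_id: str, event: Event) -> None:
        # Fan out to this user's subscribers only (e.g. their WebSocket sessions)
        for callback in self._subscribers[user_id]:
            callback(event)


bus = InMemoryEventBus()
alice_events: list[Event] = []
bob_events: list[Event] = []
bus.subscribe("alice", alice_events.append)
bus.subscribe("bob", bob_events.append)
bus.publish("alice", {"type": "node_output", "output_name": "result", "output_data": 42})
```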
5.2 Execution Completion Notification
# /autogpt_platform/backend/backend/notifications/notifications.py
async def send_execution_notification(
user_id: str,
execution_id: str,
graph_name: str,
status: ExecutionStatus,
stats: GraphExecutionStats,
):
"""
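Send an execution-completed notification.
Notification channels:
1. WebSocket real-time push
2. Email (if enabled)
3. Discord (if configured)
Args:
user_id: user ID
execution_id: execution ID
graph_name: graph name
status: execution status
stats: execution statistics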
"""
# Build the notification payload
notification = NotificationEventModel(
type=NotificationType.AGENT_RUN_COMPLETED,
user_id=user_id,
data=AgentRunData(
execution_id=execution_id,
graph_name=graph_name,
status=status.value,
total_nodes=stats.total_nodes,
completed_nodes=stats.completed_nodes,
failed_nodes=stats.failed_nodes,
total_cost=stats.total_cost,
execution_time=stats.execution_time,
),
timestamp=datetime.utcnow(),
)
# Queue the notification for delivery
await queue_notification(notification)
6. Scheduler
6.1 Core Functions
The Scheduler triggers graph executions on a schedule and supports periodic jobs defined by cron expressions.
# /autogpt_platform/backend/backend/executor/scheduler.py
class Scheduler(AppService):
"""
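Job scheduler.
Core features:
1. Trigger graph executions on a schedule
2. Define periods with cron expressions
3. Persist scheduled job state
4. Failure recovery and retry
Storage:
- Jobs are persisted with a SQLAlchemy job store
- Supports scheduling in a clustered environment
- Expired jobs are cleaned up automatically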
"""
scheduler: BackgroundScheduler
def run_service(self):
"""
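Start the scheduler service.
Initialization steps:
1. Create the asyncio event loop (on a background thread)
2. Configure the scheduler executors
3. Configure the job store
4. Load existing scheduled jobs
5. Start the scheduler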
"""
# Start a dedicated event loop on a daemon thread for async job bodies
global _event_loop
_event_loop = asyncio.new_event_loop()
global _event_loop_thread
_event_loop_thread = threading.Thread(
target=_event_loop.run_forever,
daemon=True,
name="SchedulerEventLoop"
)
_event_loop_thread.start()
# Read the database configuration
db_schema, db_url = _extract_schema_from_url(
os.getenv("DIRECT_URL")
)
# Configure the scheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
self.scheduler = BackgroundScheduler(
executors={
"default": ThreadPoolExecutor(
max_workers=self.db_pool_size()
),
},
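job_defaults={
"coalesce": True, # collapse a backlog of missed runs into a single run
"max_instances": 1000, # max concurrently running instances of the same job
"misfire_grace_time": None, # no lateness limit: late runs still execute
},
jobstores={
Jobstores.EXECUTION.value: SQLAlchemyJobStore(
url=db_url,
tablename="apscheduler_execution_jobs",
tableschema=db_schema, # pass the schema separately instead of a dotted table name
),
},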
)
# Start the scheduler
self.scheduler.start()
logger.info("Scheduler started")
# Keep running until the stop event is set
while not self.stop_event.is_set():
time.sleep(1)
# Stop the scheduler, waiting for running jobs to finish
self.scheduler.shutdown(wait=True)
logger.info("Scheduler stopped")
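The `coalesce` and `misfire_grace_time` defaults interact when the scheduler wakes up late. The function below is a simplified, hypothetical model of how APScheduler 3.x treats a backlog of missed fire times. With `coalesce=True` only the most recent one is kept. A run later than `misfire_grace_time` is skipped, and `None` means no limit. The model is for reasoning about the settings, not APScheduler code.

```python
from typing import Optional


def runs_to_execute(
    missed: list[float], now: float, coalesce: bool, grace: Optional[float]
) -> list[float]:
    """Which missed fire times (seconds, ascending) actually run at time `now`."""
    selected = missed[-1:] if coalesce else list(missed)
    # A run is dropped when it is later than the grace period; None disables the check
    return [t for t in selected if grace is None or now - t <= grace]
```

With the settings above (`coalesce=True`, `misfire_grace_time=None`), a scheduler that was down across three fire times runs the job exactly once when it comes back.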
6.2 Creating Scheduled Jobs
async def create_scheduled_execution(
user_id: str,
graph_id: str,
schedule_cron: str,
inputs: dict[str, Any],
name: Optional[str] = None,
) -> str:
"""
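Create a scheduled execution job.
Steps:
1. Validate the cron expression
2. Build the job (APScheduler persists it in the job store)
3. Add it to the scheduler
4. Return the job ID
Args:
user_id: user ID
graph_id: ID of the graph to execute
schedule_cron: cron expression
inputs: execution input data
name: job name
Returns:
str: job ID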
"""
# Validate the cron expression (standard 5-field crontab syntax)
try:
CronTrigger.from_crontab(schedule_cron)
except Exception as e:
raise ValueError(f"Invalid cron expression: {e}")
# Generate the job ID
job_id = str(uuid.uuid4())
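# Add the job to the scheduler. APScheduler's thread-pool executor calls job functions
# synchronously, so register the sync wrapper rather than the coroutine function
scheduler = get_scheduler()
scheduler.scheduler.add_job(
func=_execute_scheduled_graph_sync,
trigger=CronTrigger.from_crontab(schedule_cron),
args=[user_id, graph_id, inputs],
id=job_id,
name=name or f"Scheduled execution for {graph_id}",
jobstore=Jobstores.EXECUTION.value,
replace_existing=True,
)
logger.info(
f"Created scheduled job {job_id} for graph {graph_id}"
)
return job_id
def _execute_scheduled_graph_sync(
user_id: str,
graph_id: str,
inputs: dict[str, Any],
):
"""
Synchronous job entry point: run the async job on the scheduler's
event loop (_event_loop, started in run_service) and wait for it to finish.
"""
asyncio.run_coroutine_threadsafe(
_execute_scheduled_graph(user_id, graph_id, inputs),
_event_loop,
).result()
async def _execute_scheduled_graph(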
user_id: str,
graph_id: str,
inputs: dict[str, Any],
):
"""
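Run a scheduled graph job.
Steps:
1. Check the user's credit balance
2. Create the execution request
3. Submit it to the execution queue
4. Log the result
Args:
user_id: user ID
graph_id: graph ID
inputs: execution inputs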
"""
logger.info(
f"Executing scheduled graph {graph_id} for user {user_id}"
)
try:
# Check the user's credit balance
db_client = get_db_client()
user_credit = db_client.get_user_credit(user_id)
if user_credit.balance <= 0:
logger.warning(
f"Insufficient balance for scheduled execution: {user_id}"
)
return
# Create the execution request
from backend.util.execution import add_graph_execution
execution_meta = await add_graph_execution(
graph_id=graph_id,
user_id=user_id,
inputs=inputs,
)
logger.info(
f"Scheduled execution created: {execution_meta.id}"
)
except Exception as e:
logger.exception(
f"Error executing scheduled graph: {e}"
)
7. Performance Tuning
7.1 Thread Pool Configuration
# /autogpt_platform/backend/backend/util/settings.py
class ExecutorSettings(BaseSettings):
"""
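Executor settings.
Performance parameters:
- num_graph_workers: size of the graph execution thread pool
- max_concurrent_nodes: maximum concurrent nodes within one graph
- cluster_lock_timeout: cluster lock TTL
- execution_timeout: maximum duration of a single execution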
"""
num_graph_workers: int = Field(
default=10,
description="Size of the graph execution thread pool",
ge=1,
le=100,
)
max_concurrent_nodes: int = Field(
default=5,
description="Maximum concurrent nodes within one graph",
ge=1,
le=50,
)
cluster_lock_timeout: int = Field(
default=300,
description="Cluster lock TTL (seconds)",
ge=30,
le=3600,
)
execution_timeout: int = Field(
default=1800,
description="Maximum duration of a single execution (seconds)",
ge=60,
le=7200,
)
@property
def optimal_thread_pool_size(self) -> int:
"""
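Compute a recommended thread pool size.
Based on:
1. The number of CPU cores
2. The I/O-bound nature of the workload (threads may exceed cores)
3. The configured upper bound (num_graph_workers)
Returns:
int: the recommended thread pool size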
"""
import os
cpu_count = os.cpu_count() or 4
# I/O-bound work: allow up to 4 threads per core, capped by num_graph_workers
optimal_size = min(
cpu_count * 4,
self.num_graph_workers
)
return optimal_size
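The arithmetic in `optimal_thread_pool_size` is easy to check by hand: the result is the smaller of four threads per core and the configured `num_graph_workers`. Below is a standalone version that takes `cpu_count` as a parameter so it can be tested. The function signature is hypothetical.

```python
import os
from typing import Optional


def optimal_thread_pool_size(num_graph_workers: int, cpu_count: Optional[int] = None) -> int:
    cores = cpu_count if cpu_count is not None else (os.cpu_count() or 4)
    # I/O-bound work: allow up to 4 threads per core, never more than configured
    return min(cores * 4, num_graph_workers)
```

On a 2-core machine with the default `num_graph_workers=10`, the core count is the binding limit (8). From 3 cores upward, the configured 10 wins.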
7.2 Execution Metrics
# Prometheus metric definitions
active_runs_gauge = Gauge(
"autogpt_active_executions",
"Number of currently active graph executions",
["executor_id"]
)
execution_duration_histogram = Histogram(
"autogpt_execution_duration_seconds",
"Distribution of graph execution durations",
["graph_id", "status"],
buckets=(1, 5, 10, 30, 60, 120, 300, 600, 1800, 3600)
)
node_execution_counter = Counter(
"autogpt_node_executions_total",
"Total number of node executions",
["block_type", "status"]
)
credit_deduction_counter = Counter(
"autogpt_credits_deducted_total",
"Total credits deducted",
["user_id"]
)
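Prometheus histograms such as `execution_duration_histogram` expose cumulative buckets. Each `le` bucket counts every observation less than or equal to its bound. There is also an implicit `+Inf` bucket, plus a `_sum` and a `_count`. `TinyHistogram` below is a hypothetical, dependency-free illustration of those semantics, not the prometheus_client implementation.

```python
import bisect


class TinyHistogram:
    """Cumulative `le` buckets plus sum and count, as Prometheus exposes them."""

    def __init__(self, buckets: tuple[float, ...]) -> None:
        self.bounds = sorted(buckets)
        self.counts = [0] * (len(self.bounds) + 1)  # last slot is the +Inf bucket
        self.sum = 0.0
        self.count = 0

    def observe(self, value: float) -> None:
        # bisect_left finds the first bound >= value, matching the inclusive `le`
        self.counts[bisect.bisect_left(self.bounds, value)] += 1
        self.sum += value
        self.count += 1

    def cumulative(self) -> dict[float, int]:
        out: dict[float, int] = {}
        running = 0
        for bound, c in zip(self.bounds + [float("inf")], self.counts):
            running += c
            out[bound] = running
        return out


h = TinyHistogram((1, 5, 10, 30))
for seconds in (0.5, 7, 10, 45):
    h.observe(seconds)
```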