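Overview

The AutoGPT execution engine is the platform's core execution module, responsible for distributed scheduling and execution of agent workflows. It combines RabbitMQ message queues with a thread pool for concurrent task processing, uses a Redis-based cluster lock to keep execution consistent across nodes, and supports node-level state tracking and error recovery. Each graph runs on a worker thread while its nodes run asynchronously, which lets the engine process complex graph-structured workflows.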

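1. Overall Architecture of the Execution Engine

1.1 Design Principles

The AutoGPT execution engine follows these core design principles:

  • Distributed coordination: tasks are dispatched across nodes through RabbitMQ message queues
  • High concurrency: a thread pool model runs multiple graphs in parallel
  • Traceable state: execution state is recorded at fine granularity and updated in real time
  • Fault tolerance: a cluster lock prevents duplicate execution, with support for retry on failure and graceful degradation
  • Resource management: credit billing, file cleanup, and memory control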

1.2 Architecture Diagram

graph TB
    subgraph "AutoGPT Execution Engine Architecture"
        subgraph "Message Queue Layer"
            RabbitMQ[RabbitMQ Message Queue]
            ExecQueue[Execution Queue]
            CancelQueue[Cancel Queue]
        end

        subgraph "Execution Management Layer"
            ExecManager[ExecutionManager]
            ThreadPool[Thread Pool Executor]
            RunConsumer[Run Consumer Thread]
            CancelConsumer[Cancel Consumer Thread]
        end

        subgraph "Task Processing Layer"
            ExecProcessor[ExecutionProcessor]
            GraphExecutor[Graph Executor]
            NodeExecutor[Node Executor]
            BlockExecutor[Block Executor]
        end

        subgraph "State Management Layer"
            ClusterLock[Cluster Lock Manager]
            ExecStatus[Execution Status Tracking]
            ProgressMonitor[Progress Monitor]
            StatsCollector[Stats Collector]
        end

        subgraph "Resource Management Layer"
            CreditManager[Credit Manager]
            FileManager[File Manager]
            MemoryManager[Memory Manager]
            CredManager[Credentials Manager]
        end

        subgraph "Data Persistence Layer"
            ExecDB[Execution Database]
            GraphDB[Graph Database]
            UserDB[User Database]
            RedisCache[Redis Cache]
        end

        subgraph "Notification Layer"
            EventBus[Event Bus]
            WSServer[WebSocket Server]
            NotificationMgr[Notification Manager]
        end
    end

    %% Connections
    RabbitMQ --> ExecQueue
    RabbitMQ --> CancelQueue
    
    ExecQueue --> RunConsumer
    CancelQueue --> CancelConsumer
    
    RunConsumer --> ExecManager
    CancelConsumer --> ExecManager
    
    ExecManager --> ThreadPool
    ThreadPool --> ExecProcessor
    
    ExecProcessor --> GraphExecutor
    GraphExecutor --> NodeExecutor
    NodeExecutor --> BlockExecutor
    
    ExecProcessor --> ClusterLock
    ExecProcessor --> ExecStatus
    ExecProcessor --> ProgressMonitor
    
    GraphExecutor --> CreditManager
    NodeExecutor --> FileManager
    BlockExecutor --> CredManager
    
    ExecProcessor --> ExecDB
    GraphExecutor --> GraphDB
    CreditManager --> UserDB
    ClusterLock --> RedisCache
    
    ExecProcessor --> EventBus
    EventBus --> WSServer
    EventBus --> NotificationMgr

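Figure 1-1: AutoGPT execution engine architecture

This diagram shows the full layered structure of the execution engine. The message queue layer distributes and coordinates tasks; the execution management layer owns the thread pool and the consumers; the task processing layer implements the execution logic; the state management layer tracks execution state; the resource management layer controls resource usage; the data persistence layer provides storage; and the notification layer pushes status updates in real time.

1.3 Execution Flow Overview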

sequenceDiagram
    participant Client as Client/API
    participant Queue as RabbitMQ Queue
    participant Manager as ExecutionManager
    participant Processor as ExecutionProcessor
    participant Executor as GraphExecutor
    participant Node as NodeExecutor
    participant DB as Database
    participant WS as WebSocket

    Client->>Queue: Submit graph execution request
    Queue->>Manager: Consume execution message
    Manager->>Manager: Acquire cluster lock
    Manager->>Processor: Submit to thread pool
    Processor->>DB: Update status to RUNNING
    Processor->>Executor: Start graph execution

    loop For each node
        Executor->>Node: Execute node
        Node->>Node: Run Block logic
        Node->>DB: Save node output
        Node->>WS: Push node status
        Node->>Executor: Return output
        Executor->>Executor: Determine next batch of nodes
    end

    Executor->>DB: Update status to COMPLETED
    Executor->>WS: Push completion event
    Executor-->>Manager: Execution finished
    Manager->>Manager: Release cluster lock

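Figure 1-2: Execution engine workflow sequence diagram

This sequence diagram shows the full path from task submission to completion. An execution request arrives through the RabbitMQ queue; ExecutionManager acquires the cluster lock and submits the task to the thread pool; ExecutionProcessor updates the status and starts the graph run; GraphExecutor executes nodes in topological order; NodeExecutor runs the Block logic of each node. Throughout the run, the database is updated and WebSocket events are pushed in real time.

2. ExecutionManager (Execution Manager)

2.1 Core Responsibilities of ExecutionManager

ExecutionManager is the entry point of the execution engine. It manages the message queues, schedules work onto the thread pool, and coordinates with the rest of the cluster.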

# /autogpt_platform/backend/backend/executor/manager.py

class ExecutionManager(AppProcess):
    """
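    Main class of the execution manager.

    Core responsibilities:
    1. Manage the RabbitMQ connections and message consumption
    2. Maintain the execution thread pool and the table of active runs
    3. Coordinate the cluster lock so that a task is not run twice
    4. Handle task start, cancellation, and cleanup
    5. Monitor system resources and execution metrics

    Architecture notes:
    - Dual-consumer model: the run consumer and the cancel consumer work in parallel
    - Thread pool isolation: each graph execution runs on its own thread
    - Cluster lock: a Redis-based distributed lock prevents duplicate execution
    - Graceful shutdown: waits for in-flight tasks to finish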
    """
    
    def __init__(self):
        super().__init__()
        self.pool_size = settings.config.num_graph_workers
        self.active_graph_runs: dict[str, tuple[Future, threading.Event]] = {}
        self.executor_id = str(uuid.uuid4())
        
        self._executor = None
        self._stop_consuming = None
        self._cancel_thread = None
        self._cancel_client = None
        self._run_thread = None
        self._run_client = None
        self._execution_locks = {}

    @property
    def executor(self) -> ThreadPoolExecutor:
        """
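        Return the thread pool executor.

        Lazily initialized: the pool is created on first access.
        The pool size comes from configuration and bounds how many graphs run in parallel.

        Returns:
            ThreadPoolExecutor: the configured thread pool instance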
        """
        if self._executor is None:
            self._executor = ThreadPoolExecutor(
                max_workers=self.pool_size,
                initializer=init_worker,
            )
        return self._executor

2.2 Message Queue Consumption

ExecutionManager uses two independent consumer threads, one for run messages and one for cancel messages.

def run(self):
    """
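    Start the execution manager service.

    Startup sequence:
    1. Initialize the Prometheus metrics endpoint
    2. Start the run-message consumer thread
    3. Start the cancel-message consumer thread
    4. Wait for the stop signal
    5. Shut down gracefully, waiting for in-flight tasks to finish
    """
    # Start the Prometheus metrics server
    if settings.config.enable_prometheus:
        start_http_server(settings.config.prometheus_port)

    # Start the consumer threads
    self.run_thread.start()
    self.cancel_thread.start()

    # Wait for the stop signal
    while not self.stop_consuming.is_set():
        time.sleep(1)

    # Graceful shutdown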
    self._graceful_shutdown()

def _consume_execution_run(self):
    """
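    Core loop that consumes the execution queue.

    Workflow:
    1. Connect to the RabbitMQ execution queue
    2. Set the message prefetch count
    3. Register the message callback
    4. Run the consume loop until a stop is requested
    5. Close the connection on exit
    """
    configure_logging()
    set_service_name("ExecutionManager-RunConsumer")

    # Connect to the execution queue
    self.run_client.connect()

    # Set the prefetch count to limit concurrent deliveries
    self.run_client.channel.basic_qos(
        prefetch_count=self.pool_size
    )

    # Register the message handler; auto_ack=False means messages are acked manually
    self.run_client.channel.basic_consume(
        queue=GRAPH_EXECUTION_QUEUE_NAME,
        on_message_callback=self._handle_run_message,
        auto_ack=False,
    )

    # Consume in 1-second slices until a stop is requested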
    while not self.stop_consuming.is_set():
        self.run_client.connection.process_data_events(time_limit=1)
    
    self.run_client.close()
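
The two-consumer arrangement can be tried out without a broker. The sketch below is a minimal stand-in, assuming in-process `queue.Queue` objects in place of the RabbitMQ run and cancel queues; `consume` and `run_consumers` are hypothetical names and are not part of the AutoGPT codebase. It shows two threads polling independently in short time slices and both stopping on one shared `threading.Event`.

```python
import queue
import threading


def consume(name, source, handled, stop):
    """Poll `source` in short slices until `stop` is set."""
    while not stop.is_set():
        try:
            # The short timeout plays the role of process_data_events(time_limit=1)
            message = source.get(timeout=0.05)
        except queue.Empty:
            continue
        handled.append((name, message))  # list.append is thread-safe in CPython
        source.task_done()


def run_consumers(run_messages, cancel_messages):
    """Start both consumers, feed them, then stop them with one shared event."""
    run_q, cancel_q = queue.Queue(), queue.Queue()
    handled = []
    stop = threading.Event()
    threads = [
        threading.Thread(target=consume, args=("run", run_q, handled, stop)),
        threading.Thread(target=consume, args=("cancel", cancel_q, handled, stop)),
    ]
    for thread in threads:
        thread.start()
    for message in run_messages:
        run_q.put(message)
    for message in cancel_messages:
        cancel_q.put(message)
    run_q.join()     # block until every queued message was handled
    cancel_q.join()
    stop.set()       # one signal stops both loops
    for thread in threads:
        thread.join()
    return sorted(handled)


if __name__ == "__main__":
    print(run_consumers(["g1", "g2"], ["g1"]))
```

Keeping the cancel consumer on its own thread means a cancel request is not stuck behind a backlog of run messages.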

2.3 Handling Run Requests

def _handle_run_message(
    self,
    _channel: BlockingChannel,
    method: Basic.Deliver,
    _properties: BasicProperties,
    body: bytes,
):
    """
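    Core handler for messages from the execution queue.

    Steps:
    1. Parse the message body into an execution request
    2. Check whether the same execution is already running locally
    3. Try to acquire the cluster lock
    4. Submit the task to the thread pool
    5. Register the completion callback
    6. Acknowledge the message

    Args:
        method: RabbitMQ delivery metadata
        body: raw message body bytes
    """
    # Helper that acks the message, or nacks it when reject=True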
    def _ack_message(reject: bool = False, requeue: bool = False):
        if reject:
            _channel.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=requeue
            )
        else:
            _channel.basic_ack(delivery_tag=method.delivery_tag)
    
    try:
        # Parse the execution request
        graph_exec_entry = GraphExecutionEntry.model_validate_json(body)
        graph_exec_id = graph_exec_entry.graph_exec_id
        
        logger.info(
            f"Received RUN for graph_exec_id={graph_exec_id}"
        )
        
        # Reject duplicates that are already running on this executor
        if graph_exec_id in self.active_graph_runs:
            logger.warning(
                f"Graph {graph_exec_id} already running locally"
            )
            _ack_message(reject=True, requeue=True)
            return
        
        # Try to acquire the cluster lock
        cluster_lock = ClusterLock(
            redis=redis.get_redis(),
            key=f"exec_lock:{graph_exec_id}",
            owner_id=self.executor_id,
            timeout=settings.config.cluster_lock_timeout,
        )
        
        current_owner = cluster_lock.try_acquire()
        if current_owner != self.executor_id:
            if current_owner is not None:
                logger.warning(
                    f"Graph {graph_exec_id} already running on pod {current_owner}"
                )
            else:
                logger.warning(
                    f"Could not acquire lock for {graph_exec_id}"
                )
            _ack_message(reject=True, requeue=True)
            return
        
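        # Keep a reference to the lock so the done-callback can release it
        self._execution_locks[graph_exec_id] = cluster_lock
        
        logger.info(
            f"Acquired cluster lock for {graph_exec_id}"
        )
        
        # Create the cancel event shared with the worker thread
        cancel_event = threading.Event()
        
        # Submit to the thread pool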
        future = self.executor.submit(
            execute_graph, 
            graph_exec_entry, 
            cancel_event, 
            cluster_lock
        )
        
        # Track the active run so a cancel message can find it
        self.active_graph_runs[graph_exec_id] = (future, cancel_event)
        self._update_prompt_metrics()
        
        # Register the completion callback
        def _on_run_done(f: Future):
            logger.info(
                f"Graph execution {graph_exec_id} completed"
            )
            # Clean up: drop the active-run entry and release the cluster lock
            self.active_graph_runs.pop(graph_exec_id, None)
            lock = self._execution_locks.pop(graph_exec_id, None)
            if lock:
                lock.release()
            self._update_prompt_metrics()
        
        future.add_done_callback(_on_run_done)
        
        # Acknowledge the message
        _ack_message()
        
    except Exception as e:
        logger.exception(f"Error handling run message: {e}")
        _ack_message(reject=True, requeue=False)
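
The bookkeeping around the thread pool (register the run, reject a duplicate, clean up in a done-callback) can be isolated from RabbitMQ and Redis. The following is a minimal sketch under that assumption; `RunRegistry` is a hypothetical helper, and the `released` list merely stands in for the `lock.release()` call made by the real callback.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class RunRegistry:
    """Tracks active runs and cleans up when each one finishes (hypothetical helper)."""

    def __init__(self, pool: ThreadPoolExecutor):
        self.pool = pool
        self.active: dict[str, Future] = {}
        self.released: list[str] = []  # stands in for lock.release() calls

    def start(self, run_id: str, fn) -> bool:
        """Return False for a duplicate; the real handler would nack and requeue."""
        if run_id in self.active:
            return False
        future = self.pool.submit(fn)
        self.active[run_id] = future

        def _on_done(_future: Future) -> None:
            # Runs when the task finishes, whether it returned or raised
            self.active.pop(run_id, None)
            self.released.append(run_id)

        future.add_done_callback(_on_done)
        return True


def demo():
    gate = threading.Event()
    with ThreadPoolExecutor(max_workers=2) as pool:
        registry = RunRegistry(pool)
        first = registry.start("exec-1", gate.wait)
        duplicate = registry.start("exec-1", gate.wait)  # first run is still active
        gate.set()  # let the first run finish
    # Leaving the `with` block joins the workers, so the callback has already run
    return first, duplicate, registry.active, registry.released


if __name__ == "__main__":
    print(demo())
```

Doing the cleanup in a done-callback rather than at the end of the task body means it also runs when the task raises.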

2.4 Handling Cancel Requests

def _handle_cancel_message(
    self,
    _channel: BlockingChannel,
    method: Basic.Deliver,
    _properties: BasicProperties,
    body: bytes,
):
    """
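    Handle a message from the cancel queue.

    Cancellation steps:
    1. Parse the cancel event
    2. Look up the matching active execution
    3. Set its cancel flag
    4. Leave it to the execution thread to observe the flag (this handler does not block)
    5. Acknowledge the message

    Args:
        method: RabbitMQ delivery metadata
        body: raw message body bytes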
    """
    def _ack_message():
        _channel.basic_ack(delivery_tag=method.delivery_tag)
    
    try:
        # Parse the cancel event
        cancel_event_data = CancelExecutionEvent.model_validate_json(body)
        graph_exec_id = cancel_event_data.graph_exec_id
        
        logger.info(f"Received CANCEL for graph_exec_id={graph_exec_id}")
        
        # Look up the active execution
        if graph_exec_id in self.active_graph_runs:
            future, cancel_event = self.active_graph_runs[graph_exec_id]
            
            # Set the cancel flag; the worker thread checks it cooperatively
            cancel_event.set()
            
            logger.info(
                f"Cancelled graph execution {graph_exec_id}"
            )
        else:
            logger.warning(
                f"Graph execution {graph_exec_id} not found in active runs"
            )
        
        _ack_message()
        
    except Exception as e:
        logger.exception(f"Error handling cancel message: {e}")
        _ack_message()
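
Cancellation here is cooperative: the handler only sets a flag, and the running task has to check it. The sketch below illustrates that pattern with the standard library alone; `long_running` and `demo_cancel` are hypothetical names, and the loop body is a placeholder for real node execution.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def long_running(cancel: threading.Event, steps: int) -> str:
    """Do `steps` units of work, checking the cancel flag before each one."""
    for _ in range(steps):
        if cancel.is_set():
            return "CANCELLED"
        cancel.wait(0.01)  # placeholder work; wakes early if cancel is set
    return "COMPLETED"


def demo_cancel() -> str:
    cancel = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(long_running, cancel, 10_000)
        cancel.set()  # what the cancel handler does for an active run
        return future.result(timeout=5)


if __name__ == "__main__":
    print(demo_cancel())
```

A task that never checks the flag cannot be stopped this way, because `Future.cancel()` has no effect once a thread-pool task has started.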

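3. ExecutionProcessor (Execution Processor)

3.1 Core Functions of ExecutionProcessor

ExecutionProcessor is the core processing unit of the execution engine. It holds the actual execution logic for graphs and nodes.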

class ExecutionProcessor:
    """
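    Execution processor.

    Core responsibilities:
    1. Initialize the graph execution environment
    2. Execute nodes in topological order
    3. Manage data flow between nodes
    4. Handle execution errors and status updates
    5. Collect execution statistics

    Execution model:
    - Graph execution: runs on a separate worker thread
    - Node execution: runs asynchronously on a dedicated event loop
    - Block execution: supports both synchronous and asynchronous modes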
    """
    
    def on_graph_executor_start(self):
        """
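        Initialize a graph executor worker.

        Initialization steps:
        1. Configure logging
        2. Set the service name
        3. Create the credentials manager
        4. Start the node execution event loop
        5. Start the node evaluation event loop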
        """
        configure_logging()
        set_service_name("GraphExecutor")
        
        self.tid = threading.get_ident()
        self.creds_manager = IntegrationCredentialsManager()
        
        # Create the node execution and node evaluation event loops
        self.node_execution_loop = asyncio.new_event_loop()
        self.node_evaluation_loop = asyncio.new_event_loop()
        
        # Run each event loop on its own daemon thread
        self.node_execution_thread = threading.Thread(
            target=self.node_execution_loop.run_forever, 
            daemon=True
        )
        self.node_evaluation_thread = threading.Thread(
            target=self.node_evaluation_loop.run_forever, 
            daemon=True
        )
        
        self.node_execution_thread.start()
        self.node_evaluation_thread.start()
        
        logger.info(f"GraphExecutor {self.tid} started")
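
Running an event loop on a background thread and handing it coroutines from synchronous code is a standard asyncio pattern. The sketch below shows it in isolation; `start_background_loop`, `double`, and `demo` are hypothetical names used only for illustration.

```python
import asyncio
import threading


def start_background_loop():
    """Create an event loop and run it forever on a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def double(x: int) -> int:
    await asyncio.sleep(0)  # yield to the loop once, as real async work would
    return x * 2


def demo() -> int:
    loop, thread = start_background_loop()
    try:
        # Schedule the coroutine on the other thread's loop and block for its result
        future = asyncio.run_coroutine_threadsafe(double(21), loop)
        return future.result(timeout=5)
    finally:
        loop.call_soon_threadsafe(loop.stop)  # stop must be requested thread-safely
        thread.join(timeout=5)
        loop.close()


if __name__ == "__main__":
    print(demo())
```

`run_coroutine_threadsafe` returns a `concurrent.futures.Future`, which is why the synchronous graph worker can call `.result()` on it.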

3.2 Main Graph Execution Flow

def on_graph_execution(
    self,
    graph_exec: GraphExecutionEntry,
    cancel: threading.Event,
    cluster_lock: ClusterLock,
):
    """
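    Main entry point for executing a graph.

    Steps:
    1. Initialize log metadata and the database client
    2. Check the execution status and the user's balance
    3. Load the graph definition and validate its structure
    4. Create the node execution queue
    5. Start the node execution loop
    6. Wait for all nodes to finish
    7. Update the final status and statistics

    Args:
        graph_exec: graph execution request
        cancel: cancel event flag
        cluster_lock: cluster lock object
    """
    # Initialize log metadata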
    log_metadata = LogMetadata(
        logger=_logger,
        user_id=graph_exec.user_id,
        graph_eid=graph_exec.graph_exec_id,
        graph_id=graph_exec.graph_id,
    )
    
    db_client = get_db_client()
    
    # Fetch the execution metadata
    exec_meta = db_client.get_graph_execution_meta(
        user_id=graph_exec.user_id,
        execution_id=graph_exec.graph_exec_id,
    )
    
    if exec_meta is None:
        log_metadata.warning("Graph execution not found")
        return
    
    # Only QUEUED or RUNNING executions may proceed
    if exec_meta.status not in [ExecutionStatus.QUEUED, ExecutionStatus.RUNNING]:
        log_metadata.info(f"Execution already in {exec_meta.status} state")
        return
    
    # Check the user's balance
    user_credit = db_client.get_user_credit(graph_exec.user_id)
    if user_credit.balance <= 0:
        log_metadata.warning("Insufficient balance")
        db_client.update_graph_execution_status(
            graph_exec.user_id,
            graph_exec.graph_exec_id,
            ExecutionStatus.FAILED,
            error="Insufficient balance"
        )
        return
    
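    # Fetch the graph definition
    graph = db_client.get_graph(
        graph_exec.graph_id,
        graph_exec.graph_version,
        user_id=graph_exec.user_id,
    )
    
    if not graph:
        log_metadata.error("Graph not found")
        # Mark the run FAILED so it does not stay stuck in QUEUED or RUNNING
        db_client.update_graph_execution_status(
            graph_exec.user_id,
            graph_exec.graph_exec_id,
            ExecutionStatus.FAILED,
            error="Graph not found"
        )
        return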
    
    # Validate the graph structure
    try:
        graph.validate_graph(for_run=True)
    except Exception as e:
        log_metadata.error(f"Graph validation failed: {e}")
        db_client.update_graph_execution_status(
            graph_exec.user_id,
            graph_exec.graph_exec_id,
            ExecutionStatus.FAILED,
            error=str(e)
        )
        return
    
    # Update the status to RUNNING
    db_client.update_graph_execution_status(
        graph_exec.user_id,
        graph_exec.graph_exec_id,
        ExecutionStatus.RUNNING,
        started_at=datetime.utcnow()
    )
    
    # Publish the start event (relayed to WebSocket clients)
    get_execution_event_bus().publish(
        graph_exec.user_id,
        {
            "type": "execution_started",
            "execution_id": graph_exec.graph_exec_id,
            "graph_id": graph_exec.graph_id,
        }
    )
    
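    # Initialize execution statistics and the lock that guards them
    graph_stats = GraphExecutionStats()
    graph_stats_lock = threading.Lock()
    
    # Create the node execution queue
    node_queue = asyncio.Queue()
    
    # Seed the queue with the graph's entry nodes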
    entry_nodes = graph.get_entry_nodes()
    for node in entry_nodes:
        node_exec_entry = NodeExecutionEntry(
            node_id=node.id,
            graph_exec_id=graph_exec.graph_exec_id,
            inputs=graph_exec.inputs.get(node.id, {}),
        )
        node_queue.put_nowait(node_exec_entry)
    
    # Run the node loop on the node execution event loop and block until it finishes
    asyncio.run_coroutine_threadsafe(
        self._execute_nodes(
            node_queue,
            graph,
            graph_exec,
            cancel,
            (graph_stats, graph_stats_lock),
        ),
        self.node_execution_loop
    ).result()
    
    # Derive the final status: cancellation wins, then any failed node means FAILED
    final_status = (
        ExecutionStatus.CANCELLED if cancel.is_set()
        else ExecutionStatus.COMPLETED if graph_stats.failed_nodes == 0
        else ExecutionStatus.FAILED
    )
    
    db_client.update_graph_execution_status(
        graph_exec.user_id,
        graph_exec.graph_exec_id,
        final_status,
        ended_at=datetime.utcnow(),
        stats=graph_stats
    )
    
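    # Publish the completion event
    get_execution_event_bus().publish(
        graph_exec.user_id,
        {
            "type": "execution_completed",
            "execution_id": graph_exec.graph_exec_id,
            "status": final_status.value,
            # model_dump() is the Pydantic v2 replacement for the deprecated .dict()
            "stats": graph_stats.model_dump(),
        }
    )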

3.3 Node Execution Loop

async def _execute_nodes(
    self,
    node_queue: asyncio.Queue,
    graph: Graph,
    graph_exec: GraphExecutionEntry,
    cancel: threading.Event,
    graph_stats_pair: tuple[GraphExecutionStats, threading.Lock],
):
    """
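    Main loop for executing nodes.

    Loop logic:
    1. Take pending nodes from the queue
    2. Check the cancel flag
    3. Execute the node logic
    4. Determine and enqueue the next batch of nodes
    5. Repeat until the queue is empty and no task is in flight

    Characteristics:
    - Runs multiple nodes concurrently
    - Handles dependencies between nodes automatically
    - Updates execution progress in real time

    Args:
        node_queue: node execution queue
        graph: graph definition object
        graph_exec: graph execution request
        cancel: cancel event
        graph_stats_pair: statistics object and its lock
    """
    # Set of in-flight node tasks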
    pending_tasks = set()
    max_concurrent = settings.config.max_concurrent_nodes
    
    while True:
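        # Check the cancel flag; stop in-flight node tasks before leaving the loop
        if cancel.is_set():
            logger.info("Execution cancelled")
            for task in pending_tasks:
                task.cancel()
            await asyncio.gather(*pending_tasks, return_exceptions=True)
            break
        
        # Exit once the queue is drained and no task is in flight
        if node_queue.empty() and not pending_tasks:
            break
        
        # Start queued nodes up to the concurrency limit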
        while len(pending_tasks) < max_concurrent and not node_queue.empty():
            try:
                node_exec = node_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            
            # Create the execution task
            task = asyncio.create_task(
                self.on_node_execution(
                    node_exec,
                    graph,
                    graph_exec,
                    graph_stats_pair,
                )
            )
            pending_tasks.add(task)
        
        # Wait for at least one task to finish
        if pending_tasks:
            done, pending_tasks = await asyncio.wait(
                pending_tasks,
                return_when=asyncio.FIRST_COMPLETED
            )
            
            # Process the finished tasks
            for task in done:
                try:
                    node_stats = task.result()
                    
                    # Enqueue the next batch of nodes
                    next_nodes = await self._enqueue_next_nodes(
                        graph,
                        node_stats,
                        graph_exec,
                        node_queue
                    )
                    
                except Exception as e:
                    logger.exception(f"Node execution error: {e}")

3.4 Single-Node Execution Logic

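async def on_node_execution(
    self,
    node_exec: NodeExecutionEntry,
    graph: Graph,
    graph_exec: GraphExecutionEntry,
    graph_stats_pair: tuple[GraphExecutionStats, threading.Lock],
) -> NodeExecutionStats:
    """
    Execute a single node.

    Steps:
    1. Look up the node definition and its Block instance
    2. Prepare input data and credentials
    3. Create the execution record
    4. Call the Block's execute method
    5. Collect the output data
    6. Update execution status and statistics
    7. Deduct the credit cost

    Args:
        node_exec: node execution request
        graph: graph definition that contains the node
        graph_exec: graph execution request this node belongs to
        graph_stats_pair: graph statistics and the lock guarding them

    Returns:
        NodeExecutionStats: statistics for this node execution
    """
    # Look up the node definition
    node = graph.get_node(node_exec.node_id)
    if not node:
        raise ValueError(f"Node {node_exec.node_id} not found")
    
    # Look up the Block instance
    block = get_block(node.block_id)
    if not block:
        raise ValueError(f"Block {node.block_id} not found")
    
    # Progress tracker that publishes this node's outputs as events
    node_exec_progress = NodeExecutionProgress(
        user_id=graph_exec.user_id,
        graph_exec_id=graph_exec.graph_exec_id,
        node_id=node.id,
        node_exec_id=node_exec.id,
    )
    
    # Initialize log metadata
    log_metadata = LogMetadata(
        logger=_logger,
        user_id=graph_exec.user_id,
        graph_eid=graph_exec.graph_exec_id,
        node_id=node.id,
        node_eid=node_exec.id,
        block_name=block.name,
    )
    
    # Create the stats object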
    stats = NodeExecutionStats(
        node_id=node.id,
        block_name=block.name,
        started_at=datetime.utcnow(),
    )
    
    db_client = get_db_client()
    
    try:
        # Create the node execution record
        db_client.create_node_execution(
            node_exec.id,
            graph_exec.graph_exec_id,
            node.id,
            ExecutionStatus.RUNNING,
            inputs=node_exec.inputs,
        )
        
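        # Prepare the execution input. Merge into one dict first so that a key
        # present in both does not raise "multiple values for keyword argument";
        # values from node_exec.inputs override the node defaults.
        block_input = BlockInput(
            **{**node.input_default, **node_exec.inputs},
        )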
        
        # Fetch credentials
        credentials = self.creds_manager.get_credentials(
            graph_exec.user_id,
            node_exec.credentials_inputs or {},
        )
        
        # Execute the Block
        log_metadata.info(f"Executing block {block.name}")
        
        outputs = {}
        async for output_name, output_data in block.execute(
            block_input,
            credentials=credentials,
            user_context=graph_exec.user_context,
        ):
            outputs[output_name] = output_data
            
            # Push each output as soon as it is produced
            node_exec_progress.add_output(output_name, output_data)
        
        # Record the successful completion
        stats.status = ExecutionStatus.COMPLETED
        stats.outputs = outputs
        stats.ended_at = datetime.utcnow()
        
        db_client.update_node_execution(
            node_exec.id,
            ExecutionStatus.COMPLETED,
            outputs=outputs,
            ended_at=stats.ended_at,
        )
        
        # Compute and deduct the credit cost
        cost = block_usage_cost(block, block_input, outputs)
        if cost > 0:
            db_client.deduct_credits(
                graph_exec.user_id,
                cost,
                f"Node execution: {node.id}",
            )
            stats.cost = cost
        
        log_metadata.info(f"Node completed, cost: {cost}")
        
    except Exception as e:
        # Record the error
        stats.status = ExecutionStatus.FAILED
        stats.error = str(e)
        stats.ended_at = datetime.utcnow()
        
        db_client.update_node_execution(
            node_exec.id,
            ExecutionStatus.FAILED,
            error=str(e),
            ended_at=stats.ended_at,
        )
        
        log_metadata.error(f"Node failed: {e}")
    
    # Update the graph-level statistics under the lock
    graph_stats, lock = graph_stats_pair
    with lock:
        graph_stats.total_nodes += 1
        if stats.status == ExecutionStatus.COMPLETED:
            graph_stats.completed_nodes += 1
        else:
            graph_stats.failed_nodes += 1
        graph_stats.total_cost += stats.cost
    
    return stats
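
Two details of the node path are easy to get wrong: merging defaults with per-run inputs, and draining a block that yields outputs as an async generator. The sketch below isolates both. It is an illustration only; `add_block` and `run_block` are hypothetical, and a plain dict stands in for `BlockInput` and for the stats object.

```python
import asyncio
from typing import Any, AsyncIterator


async def add_block(inputs: dict) -> AsyncIterator[tuple[str, Any]]:
    """Toy block: yields (output_name, output_data) pairs one at a time."""
    total = inputs["a"] + inputs["b"]
    yield "sum", total
    yield "is_positive", total > 0


async def run_block(block, defaults: dict, inputs: dict) -> dict:
    # Per-run inputs override node defaults; merging first avoids duplicate-key errors
    merged = {**defaults, **inputs}
    outputs = {}
    try:
        async for name, data in block(merged):
            outputs[name] = data  # a progress event could be published here
        return {"status": "COMPLETED", "outputs": outputs, "error": None}
    except Exception as exc:
        # Failures are captured in the result instead of propagating to the loop
        return {"status": "FAILED", "outputs": outputs, "error": str(exc)}


if __name__ == "__main__":
    print(asyncio.run(run_block(add_block, {"a": 1, "b": 2}, {"b": 5})))
    print(asyncio.run(run_block(add_block, {"a": 1}, {})))
```

Returning a result record on failure mirrors how the node function above records FAILED in its stats rather than raising.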

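4. Cluster Lock Management

4.1 ClusterLock Implementation

ClusterLock implements a distributed lock on top of Redis so that a given graph execution runs only once across the cluster.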

# /autogpt_platform/backend/backend/executor/cluster_lock.py

class ClusterLock:
    """
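    Redis-based cluster lock.

    Features:
    1. Distributed mutual exclusion: execution is unique across nodes
    2. Timeout: the lock expires automatically, which prevents deadlock
    3. Ownership check: only the holder can release the lock
    4. Health check: the lock state can be queried

    Use cases:
    - Preventing duplicate execution of the same graph
    - Coordinating multiple execution manager nodes
    - Synchronizing state during failure recovery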
    """
    
    def __init__(
        self,
        redis: Redis,
        key: str,
        owner_id: str,
        timeout: int = 300,
    ):
        """
        Initialize the cluster lock.

        Args:
            redis: Redis client instance
            key: key name of the lock
            owner_id: identifier of the lock holder
            timeout: lock timeout in seconds
        """
        self.redis = redis
        self.key = key
        self.owner_id = owner_id
        self.timeout = timeout
        self._acquired = False
    
    def try_acquire(self) -> Optional[str]:
        """
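        Try to acquire the lock.

        Steps:
        1. Try to set the key with SET NX
        2. On success, return this instance's owner_id
        3. On failure, return the current lock holder
        4. Return None if Redis is unavailable

        Returns:
            str: ID of the lock holder; equals our own owner_id on success
            None: Redis is unavailable or the operation failed
        """
        try:
            # Try to set the lock; NX sets the key only if it does not exist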
            result = self.redis.set(
                self.key,
                self.owner_id,
                nx=True,
                ex=self.timeout,
            )
            
            if result:
                self._acquired = True
                logger.info(
                    f"Acquired lock {self.key} for {self.owner_id}"
                )
                return self.owner_id
            else:
                # The lock is already held by another process
                current_owner = self.redis.get(self.key)
                if current_owner:
                    logger.debug(
                        f"Lock {self.key} held by {current_owner}"
                    )
                    return current_owner.decode() if isinstance(current_owner, bytes) else current_owner
                return None
                
        except Exception as e:
            logger.error(f"Failed to acquire lock {self.key}: {e}")
            return None
    
    def release(self) -> bool:
        """
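        Release the lock.

        Steps:
        1. Verify that this process is the lock holder
        2. Delete the Redis key
        3. Update internal state

        Returns:
            bool: whether the lock was released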
        """
        if not self._acquired:
            logger.warning(f"Attempting to release unacquired lock {self.key}")
            return False
        
        try:
            # A Lua script makes the owner check and the delete atomic
            lua_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
            """
            
            result = self.redis.eval(
                lua_script,
                1,
                self.key,
                self.owner_id,
            )
            
            if result:
                self._acquired = False
                logger.info(f"Released lock {self.key}")
                return True
            else:
                logger.warning(
                    f"Failed to release lock {self.key}: not owner"
                )
                return False
                
        except Exception as e:
            logger.error(f"Error releasing lock {self.key}: {e}")
            return False
    
    def is_locked(self) -> bool:
        """
        Check whether the lock key still exists (regardless of who holds it).

        Returns:
            bool: whether the lock exists
        """
        try:
            return self.redis.exists(self.key) > 0
        except Exception as e:
            logger.error(f"Error checking lock {self.key}: {e}")
            return False
    
    def extend(self, additional_time: int) -> bool:
        """
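        Extend the lock's timeout.

        Intended for long-running tasks, so the lock does not expire mid-execution.
        The new TTL is set to timeout + additional_time, counted from now.

        Args:
            additional_time: extra timeout in seconds

        Returns:
            bool: whether the lock was extended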
        """
        if not self._acquired:
            return False
        
        try:
            # A Lua script checks ownership and extends the TTL atomically
            lua_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("expire", KEYS[1], ARGV[2])
            else
                return 0
            end
            """
            
            result = self.redis.eval(
                lua_script,
                1,
                self.key,
                self.owner_id,
                self.timeout + additional_time,
            )
            
            return bool(result)
            
        except Exception as e:
            logger.error(f"Error extending lock {self.key}: {e}")
            return False

4.2 Cluster Lock Sequence Diagram

sequenceDiagram
    participant EM1 as ExecutionManager1
    participant EM2 as ExecutionManager2
    participant Redis as Redis
    participant Graph as GraphExecutor

    par Concurrent execution requests
        EM1->>Redis: SET NX exec_lock:123
        EM2->>Redis: SET NX exec_lock:123
    end

    Redis-->>EM1: OK (acquired)
    Redis-->>EM2: NULL (already held)

    EM2->>EM2: Requeue message

    EM1->>Graph: Submit execution task
    Graph->>Graph: Run graph logic

    alt Lock needs extending
        Graph->>Redis: EXPIRE exec_lock:123 600
        Redis-->>Graph: OK
    end

    Graph->>Graph: Execution finished
    Graph->>Redis: DEL exec_lock:123 (if owner)
    Redis-->>Graph: OK
    Graph-->>EM1: Execution result

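Figure 4-1: Cluster lock coordinating execution across nodes

This sequence diagram shows how the cluster lock coordinates multiple ExecutionManager nodes. When two nodes receive the same execution request at the same time, only one acquires the lock and runs the task; the other requeues the message. The lock's timeout can be extended during execution, and the lock is released once execution finishes.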
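
The acquire and release semantics can be exercised without a Redis server. The sketch below is a single-threaded, in-memory approximation and not the real client: `FakeRedis`, `set_nx`, and `delete_if_owner` are hypothetical stand-ins for `SET key value NX EX` and the compare-and-delete Lua script, and they give none of the atomicity guarantees that Redis provides across processes.

```python
import time
from typing import Optional


class FakeRedis:
    """Single-threaded, in-memory stand-in for the few Redis operations used here."""

    def __init__(self):
        self._data: dict[str, tuple[str, float]] = {}

    def get(self, key: str) -> Optional[str]:
        item = self._data.get(key)
        if item is None:
            return None
        value, expires_at = item
        if time.monotonic() >= expires_at:  # emulate key expiry
            del self._data[key]
            return None
        return value

    def set_nx(self, key: str, value: str, ex: float) -> bool:
        """Emulates SET key value NX EX: succeeds only if the key is absent."""
        if self.get(key) is not None:
            return False
        self._data[key] = (value, time.monotonic() + ex)
        return True

    def delete_if_owner(self, key: str, owner: str) -> bool:
        """Emulates the compare-and-delete Lua script."""
        if self.get(key) == owner:
            del self._data[key]
            return True
        return False


def try_acquire(store: FakeRedis, key: str, owner: str, timeout: float = 300.0):
    """Return our own ID on success, otherwise the current holder (or None)."""
    if store.set_nx(key, owner, timeout):
        return owner
    return store.get(key)


def demo():
    store, key = FakeRedis(), "exec_lock:123"
    return [
        try_acquire(store, key, "em-1"),     # em-1 wins the lock
        try_acquire(store, key, "em-2"),     # em-2 is told who holds it
        store.delete_if_owner(key, "em-2"),  # a non-owner cannot release
        store.delete_if_owner(key, "em-1"),  # the owner can
        try_acquire(store, key, "em-2"),     # now em-2 can acquire
    ]


if __name__ == "__main__":
    print(demo())
```

The expiry path matters as much as the release path: if a holder crashes without releasing, the timeout is what eventually lets another node take over.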

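5. Execution Status Tracking and Notifications

5.1 Real-Time Status Push

The execution engine pushes execution status to the WebSocket server in real time through the event bus.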

# /autogpt_platform/backend/backend/executor/utils.py

class NodeExecutionProgress:
    """
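    Node execution progress tracker.

    Features:
    1. Collects outputs as they are produced
    2. Pushes WebSocket events
    3. Notifies on status changes
    4. Records error information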
    """
    
    def __init__(
        self,
        user_id: str,
        graph_exec_id: str,
        node_id: str,
        node_exec_id: str,
    ):
        self.user_id = user_id
        self.graph_exec_id = graph_exec_id
        self.node_id = node_id
        self.node_exec_id = node_exec_id
        self.outputs = {}
    
    def add_output(self, name: str, data: Any):
        """
        Add a node output and publish an event for it.

        Args:
            name: output port name
            data: output data
        """
        self.outputs[name] = data
        
        # Publish the output event
        get_execution_event_bus().publish(
            self.user_id,
            {
                "type": "node_output",
                "execution_id": self.graph_exec_id,
                "node_id": self.node_id,
                "node_execution_id": self.node_exec_id,
                "output_name": name,
                "output_data": data,
            }
        )
    
    def update_status(self, status: ExecutionStatus):
        """
        Publish a node execution status update.

        Args:
            status: new execution status
        """
        get_execution_event_bus().publish(
            self.user_id,
            {
                "type": "node_status_update",
                "execution_id": self.graph_exec_id,
                "node_id": self.node_id,
                "node_execution_id": self.node_exec_id,
                "status": status.value,
            }
        )
    
    def report_error(self, error: Exception):
        """
        Report a node execution error.

        Args:
            error: exception object
        """
        get_execution_event_bus().publish(
            self.user_id,
            {
                "type": "node_error",
                "execution_id": self.graph_exec_id,
                "node_id": self.node_id,
                "node_execution_id": self.node_exec_id,
                "error": str(error),
                "error_type": type(error).__name__,
            }
        )
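
The tracker above only needs something with a `publish(user_id, event)` method. As a rough illustration of that contract, here is a minimal in-memory bus; `InMemoryEventBus` and its `subscribe` method are assumptions made for this sketch and say nothing about how the platform's real event bus is implemented.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], None]


class InMemoryEventBus:
    """Delivers each published event to the handlers registered for that user."""

    def __init__(self):
        self._subscribers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, user_id: str, handler: Handler) -> None:
        self._subscribers[user_id].append(handler)

    def publish(self, user_id: str, event: dict[str, Any]) -> None:
        # Copy the list so a handler may subscribe or unsubscribe while we iterate
        for handler in list(self._subscribers.get(user_id, [])):
            handler(event)


def demo():
    bus = InMemoryEventBus()
    received = []
    bus.subscribe("user-1", received.append)
    bus.publish("user-1", {"type": "node_output", "output_name": "sum", "output_data": 6})
    bus.publish("user-2", {"type": "node_output", "output_name": "ignored"})
    return received


if __name__ == "__main__":
    print(demo())
```

Keying subscriptions by user ID means one user's execution events are never delivered to another user's handlers.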

5.2 Execution Completion Notification

# /autogpt_platform/backend/backend/notifications/notifications.py

async def send_execution_notification(
    user_id: str,
    execution_id: str,
    graph_name: str,
    status: ExecutionStatus,
    stats: GraphExecutionStats,
):
    """
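    Send an execution completion notification.

    Notification channels:
    1. Real-time WebSocket push
    2. Email notification (if enabled)
    3. Discord notification (if configured)

    Args:
        user_id: user ID
        execution_id: execution ID
        graph_name: graph name
        status: execution status
        stats: execution statistics
    """
    # Build the notification payload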
    notification = NotificationEventModel(
        type=NotificationType.AGENT_RUN_COMPLETED,
        user_id=user_id,
        data=AgentRunData(
            execution_id=execution_id,
            graph_name=graph_name,
            status=status.value,
            total_nodes=stats.total_nodes,
            completed_nodes=stats.completed_nodes,
            failed_nodes=stats.failed_nodes,
            total_cost=stats.total_cost,
            execution_time=stats.execution_time,
        ),
        timestamp=datetime.utcnow(),
    )
    
    # Queue the notification for delivery
    await queue_notification(notification)

6. Scheduler

6.1 Core Functions of Scheduler

Scheduler is responsible for scheduling and running timed tasks. It supports periodic tasks defined by cron expressions.

# /autogpt_platform/backend/backend/executor/scheduler.py

class Scheduler(AppService):
    """
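    Task scheduler.

    Core functions:
    1. Trigger graph executions on a schedule
    2. Support cron expressions for defining the period
    3. Persist the state of scheduled jobs
    4. Failure recovery and retry

    Storage:
    - Jobs are persisted with the SQLAlchemy JobStore
    - Supports job scheduling in a clustered environment
    - Expired jobs are cleaned up automatically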
    """
    
    scheduler: BackgroundScheduler
    
    def run_service(self):
        """
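        Start the scheduler service.

        Initialization steps:
        1. Create the asyncio event loop
        2. Configure the scheduler's executors
        3. Set up the JobStore
        4. Load existing scheduled jobs
        5. Start the scheduler
        """
        # Initialize the event loop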
        global _event_loop
        _event_loop = asyncio.new_event_loop()
        
        global _event_loop_thread
        _event_loop_thread = threading.Thread(
            target=_event_loop.run_forever, 
            daemon=True,
            name="SchedulerEventLoop"
        )
        _event_loop_thread.start()
        
        # Read the database configuration
        db_schema, db_url = _extract_schema_from_url(
            os.getenv("DIRECT_URL")
        )
        
        # Configure the scheduler
        from apscheduler.executors.pool import ThreadPoolExecutor
        from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
        
        self.scheduler = BackgroundScheduler(
            executors={
                "default": ThreadPoolExecutor(
                    max_workers=self.db_pool_size()
                ),
            },
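            job_defaults={
                "coalesce": True,  # collapse several missed runs into a single run
                "max_instances": 1000,  # max concurrently running instances per job
                "misfire_grace_time": None,  # None: run a missed job no matter how late
            },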
            jobstores={
                Jobstores.EXECUTION.value: SQLAlchemyJobStore(
                    url=db_url,
                    tablename=f"{db_schema}.apscheduler_execution_jobs",
                ),
            },
        )
        
        # Start the scheduler
        self.scheduler.start()
        logger.info("Scheduler started")
        
        # Keep the service alive until a stop is requested
        while not self.stop_event.is_set():
            time.sleep(1)
        
        # Shut down, waiting for running jobs to finish
        self.scheduler.shutdown(wait=True)
        logger.info("Scheduler stopped")
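
The listing above calls `_extract_schema_from_url` without showing it. The following is a minimal sketch of what such a helper might look like, assuming the database URL carries the schema as a `?schema=...` query parameter and that `public` is the fallback; the function name, the fallback, and the sample URL are illustrative assumptions, not the project's actual code.

```python
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse


def extract_schema_from_url(database_url: str) -> tuple[str, str]:
    """Split a ?schema=... query parameter off a database URL.

    Returns (schema, url_without_schema_parameter).
    """
    parsed = urlparse(database_url)
    query = parse_qs(parsed.query)
    # Assumption: fall back to "public" when the URL names no schema.
    schema = query.pop("schema", ["public"])[0]
    # Rebuild the URL from the remaining query parameters.
    cleaned = parsed._replace(query=urlencode(query, doseq=True))
    return schema, urlunparse(cleaned)


schema, url = extract_schema_from_url(
    "postgresql://user:pw@localhost:5432/app?schema=platform&connect_timeout=60"
)
print(schema)  # platform
print(url)     # postgresql://user:pw@localhost:5432/app?connect_timeout=60
```

Separating the schema from the URL lets the job store receive a plain connection URL while the schema is used to qualify the table name, as in `f"{db_schema}.apscheduler_execution_jobs"`.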

6.2 Creating Scheduled Jobs

async def create_scheduled_execution(
    user_id: str,
    graph_id: str,
    schedule_cron: str,
    inputs: dict[str, Any],
    name: Optional[str] = None,
) -> str:
    """
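    Create a scheduled execution job.
    
    Steps:
    1. Validate the cron expression
    2. Create the scheduled job record
    3. Add the job to the scheduler
    4. Return the job ID
    
    Args:
        user_id: User ID
        graph_id: ID of the graph to execute
        schedule_cron: Cron expression
        inputs: Execution input data
        name: Job name
        
    Returns:
        str: Job ID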
    """
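    # Validate the cron expression and build the trigger once
    try:
        trigger = CronTrigger.from_crontab(schedule_cron)
    except Exception as e:
        raise ValueError(f"Invalid cron expression: {e}") from e
    
    # Generate the job ID
    job_id = str(uuid.uuid4())
    
    # Add the job to the scheduler.
    # NOTE: _execute_scheduled_graph is a coroutine function, while the
    # thread-pool executor configured in run_service calls job functions
    # synchronously. The coroutine therefore has to be submitted to
    # _event_loop (for example through a synchronous wrapper) to run.
    scheduler = get_scheduler()
    scheduler.scheduler.add_job(
        func=_execute_scheduled_graph,
        trigger=trigger,
        args=[user_id, graph_id, inputs],
        id=job_id,
        name=name or f"Scheduled execution for {graph_id}",
        jobstore=Jobstores.EXECUTION.value,
        replace_existing=True,
    )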
    
    logger.info(
        f"Created scheduled job {job_id} for graph {graph_id}"
    )
    
    return job_id

async def _execute_scheduled_graph(
    user_id: str,
    graph_id: str,
    inputs: dict[str, Any],
):
    """
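    Execute a scheduled graph job.
    
    Steps:
    1. Check the user's balance
    2. Create the execution request
    3. Submit it to the execution queue
    4. Log the scheduling result
    
    Args:
        user_id: User ID
        graph_id: Graph ID
        inputs: Execution inputs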
    """
    logger.info(
        f"Executing scheduled graph {graph_id} for user {user_id}"
    )
    
    try:
        # Check the user's balance; skip the run if no credits remain
        db_client = get_db_client()
        user_credit = db_client.get_user_credit(user_id)
        
        if user_credit.balance <= 0:
            logger.warning(
                f"Insufficient balance for scheduled execution: {user_id}"
            )
            return
        
        # Create the execution request
        from backend.util.execution import add_graph_execution
        
        execution_meta = await add_graph_execution(
            graph_id=graph_id,
            user_id=user_id,
            inputs=inputs,
        )
        
        logger.info(
            f"Scheduled execution created: {execution_meta.id}"
        )
        
    except Exception as e:
        logger.exception(
            f"Error executing scheduled graph: {e}"
        )
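
`_execute_scheduled_graph` is a coroutine function, whereas a thread-pool executor invokes job functions as ordinary callables, so calling it directly would only create a coroutine object without running it. One common way to bridge the two is to submit the coroutine to the long-lived event loop that `run_service` starts in a background thread. The sketch below shows that pattern with the standard library only; `execute_scheduled_graph` and `run_job` are hypothetical stand-ins, not the project's functions.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# A long-lived event loop running in a daemon thread.
event_loop = asyncio.new_event_loop()
threading.Thread(target=event_loop.run_forever, daemon=True).start()


async def execute_scheduled_graph(user_id: str, graph_id: str) -> str:
    """Hypothetical stand-in coroutine; it only simulates async work."""
    await asyncio.sleep(0.01)
    return f"queued {graph_id} for {user_id}"


def run_job(user_id: str, graph_id: str) -> str:
    """Synchronous entry point that a thread-pool worker can call."""
    future = asyncio.run_coroutine_threadsafe(
        execute_scheduled_graph(user_id, graph_id), event_loop
    )
    # Block this worker thread until the coroutine finishes on the loop.
    return future.result(timeout=5)


# Simulate a scheduler worker thread invoking the job.
with ThreadPoolExecutor(max_workers=2) as pool:
    result = pool.submit(run_job, "user-1", "graph-42").result()
print(result)  # queued graph-42 for user-1
```

With this shape, the synchronous wrapper is what gets registered as the job function, and the coroutine body runs on the shared loop rather than in the worker thread.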

7. Execution Engine Performance Optimization

7.1 Thread Pool Configuration

# /autogpt_platform/backend/backend/util/settings.py

class ExecutorSettings(BaseSettings):
    """
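    Executor settings.
    
    Performance parameters:
    - num_graph_workers: size of the graph execution thread pool
    - max_concurrent_nodes: maximum number of concurrent nodes within one graph
    - cluster_lock_timeout: cluster lock timeout
    - execution_timeout: maximum duration of a single execution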
    """
    
    num_graph_workers: int = Field(
        default=10,
        description="Size of the graph execution thread pool",
        ge=1,
        le=100,
    )
    
    max_concurrent_nodes: int = Field(
        default=5,
        description="Maximum number of concurrent nodes within one graph",
        ge=1,
        le=50,
    )
    
    cluster_lock_timeout: int = Field(
        default=300,
        description="Cluster lock timeout (seconds)",
        ge=30,
        le=3600,
    )
    
    execution_timeout: int = Field(
        default=1800,
        description="Maximum duration of a single execution (seconds)",
        ge=60,
        le=7200,
    )
    
    @property
    def optimal_thread_pool_size(self) -> int:
        """
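        Compute the recommended thread pool size.
        
        The workload is I/O-bound, so the pool may exceed the number of
        CPU cores: the result is four threads per core, capped at
        num_graph_workers. Memory limits are not checked here.
        
        Returns:
            int: Recommended thread pool size
        """
        import os
        cpu_count = os.cpu_count() or 4  # fall back to 4 if undetectable
        
        # I/O-bound tasks: the thread count may exceed the CPU core count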
        optimal_size = min(
            cpu_count * 4,
            self.num_graph_workers
        )
        
        return optimal_size
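
To make the sizing rule concrete, here is the same calculation as a standalone function with two worked cases. This is a sketch outside the settings class; `recommended_pool_size` and its parameters are illustrative names, and the multiplier of 4 simply mirrors the property above.

```python
import os
from typing import Optional


def recommended_pool_size(
    configured_cap: int,
    cpu_count: Optional[int] = None,
    io_multiplier: int = 4,
) -> int:
    """Threads per core times the core count, capped at the configured size."""
    cores = cpu_count or os.cpu_count() or 4
    return min(cores * io_multiplier, configured_cap)


# On a 2-core host the CPU-based bound wins: 2 * 4 = 8 < 10.
small_host = recommended_pool_size(configured_cap=10, cpu_count=2)
# On a 16-core host the configured cap wins: 16 * 4 = 64 > 10.
large_host = recommended_pool_size(configured_cap=10, cpu_count=16)
print(small_host, large_host)  # 8 10
```

With the default cap of 10, the CPU-based bound only matters on hosts with one or two cores; on anything larger the configured `num_graph_workers` is the effective limit.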

7.2 Execution Monitoring Metrics

# Prometheus metric definitions

active_runs_gauge = Gauge(
    "autogpt_active_executions",
    "Number of currently active graph executions",
    ["executor_id"]
)

execution_duration_histogram = Histogram(
    "autogpt_execution_duration_seconds",
    "Distribution of graph execution durations",
    ["graph_id", "status"],
    buckets=(1, 5, 10, 30, 60, 120, 300, 600, 1800, 3600)
)

node_execution_counter = Counter(
    "autogpt_node_executions_total",
    "Total number of node executions",
    ["block_type", "status"]
)

# Note: a per-user label creates one time series per user, which can
# become expensive when the user base is large.
credit_deduction_counter = Counter(
    "autogpt_credits_deducted_total",
    "Total credits deducted",
    ["user_id"]
)
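
Prometheus histogram buckets are cumulative: each bucket counts every observation less than or equal to its upper bound, and an implicit `+Inf` bucket counts all of them. The sketch below reproduces that bookkeeping in plain Python for the bucket bounds defined above, to show where a given execution duration lands. It does not use `prometheus_client`; `cumulative_counts` is an illustrative helper and the sample durations are made up.

```python
from bisect import bisect_left

BUCKETS = (1, 5, 10, 30, 60, 120, 300, 600, 1800, 3600)


def cumulative_counts(durations, bounds=BUCKETS):
    """Return {upper_bound: number of observations <= upper_bound}."""
    per_bucket = [0] * (len(bounds) + 1)  # the last slot is the +Inf bucket
    for duration in durations:
        # Index of the first bound that is >= duration (bounds are inclusive).
        per_bucket[bisect_left(bounds, duration)] += 1
    counts, running = {}, 0
    for bound, n in zip(list(bounds) + [float("inf")], per_bucket):
        running += n
        counts[bound] = running
    return counts


# Four sample executions lasting 0.5 s, 45 s, 60 s, and 4000 s.
counts = cumulative_counts([0.5, 45, 60, 4000])
print(counts[30])            # 1: only the 0.5 s run
print(counts[60])            # 3: the 45 s and 60 s runs are included
print(counts[float("inf")])  # 4: every observation
```

An execution longer than 3600 seconds appears only in the `+Inf` bucket, so if runs regularly approach the two-hour maximum allowed for `execution_timeout`, a larger top bucket may be worth adding.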