Dify-01-Workflow工作流引擎-完整剖析

目录


1. 模块概览

1.1 职责与边界

Workflow工作流引擎是Dify平台的核心编排系统,负责将复杂的AI任务分解为可视化的节点图,并按照依赖关系和条件逻辑进行调度执行。

核心职责:

  • 图结构定义与验证:支持DAG(有向无环图)和有条件分支的工作流
  • 节点编排与调度:根据节点依赖关系动态调度执行
  • 并行执行:无依赖节点自动并行,提升执行效率
  • 状态管理:支持暂停/恢复/停止,状态持久化
  • 事件驱动:通过事件总线实现节点间解耦
  • 变量传递:全局变量池支持节点间数据流转
  • 错误处理:支持节点级别重试和失败处理策略
  • 外部控制:通过命令通道实现外部实时控制

输入/输出:

输入:

  • Graph定义(节点+边)
  • 用户输入变量
  • 系统变量(用户ID、租户ID等)
  • 环境变量

输出:

  • GraphEngineEvent流:节点执行事件、错误事件、完成事件
  • 变量池:最终的所有变量值
  • 执行状态:成功/失败/中断
  • 执行元数据:耗时、Token消耗等

上下游依赖:

上游调用方:

  • WorkflowAppRunner:工作流应用运行器
  • Console API:控制台手动触发
  • Service API:外部API调用

下游依赖:

  • 节点执行器:各类节点实现(LLM、Code、HTTP等)
  • Model Runtime:LLM调用
  • PostgreSQL:状态持久化
  • Redis:命令通道(分布式场景)

1.2 生命周期

stateDiagram-v2
    [*] --> 初始化: 创建GraphEngine
    初始化 --> 就绪: 加载Graph定义
    
    就绪 --> 运行中: run()
    
    state 运行中 {
        [*] --> 加载起始节点
        加载起始节点 --> 节点执行
        节点执行 --> 处理事件
        处理事件 --> 边遍历
        边遍历 --> 更新就绪队列
        更新就绪队列 --> 节点执行: 有就绪节点
        更新就绪队列 --> 完成检查: 无就绪节点
        完成检查 --> 节点执行: 有运行中节点
    }
    
    运行中 --> 成功: 所有节点执行完成
    运行中 --> 失败: 节点执行失败
    运行中 --> 中断: 收到停止命令
    
    成功 --> [*]
    失败 --> [*]
    中断 --> [*]: 可恢复

生命周期阶段说明:

  1. 初始化阶段:创建GraphEngine实例,初始化所有子系统,从持久化状态恢复(如果是断点续传),验证Graph一致性

  2. 就绪阶段:等待外部调用run()方法,此时可以添加Layer扩展功能

  3. 运行阶段:发送GraphRunStartedEvent,循环处理节点执行,实时响应外部命令,流式输出事件

  4. 终止阶段:正常完成、失败或中断,保存最终状态到数据库


2. 整体架构

2.1 架构图

flowchart TB
    subgraph "GraphEngine 核心引擎"
        Dispatcher[Dispatcher<br/>事件分发器]
        ExecutionCoordinator[Execution Coordinator<br/>执行协调器]
    end
    
    subgraph "状态管理 State Management"
        GraphExecution[Graph Execution<br/>执行聚合根]
        GraphStateManager[Graph State Manager<br/>状态管理器]
        VariablePool[Variable Pool<br/>变量池]
    end
    
    subgraph "队列系统 Queue System"
        ReadyQueue[Ready Queue<br/>就绪队列]
        EventQueue[Event Queue<br/>事件队列]
    end
    
    subgraph "节点执行 Node Execution"
        WorkerPool[Worker Pool<br/>工作池]
        NodeExecutor[Node Executor<br/>节点执行器]
        RetryManager[Retry Manager<br/>重试管理器]
    end
    
    subgraph "事件系统 Event System"
        EventManager[Event Manager<br/>事件管理器]
        EventHandler[Event Handler<br/>事件处理器]
    end
    
    subgraph "图遍历 Graph Traversal"
        EdgeProcessor[Edge Processor<br/>边处理器]
        SkipPropagator[Skip Propagator<br/>跳过传播器]
    end
    
    subgraph "响应协调 Response Coordination"
        ResponseCoordinator[Response Coordinator<br/>响应流协调器]
    end
    
    subgraph "命令处理 Command Processing"
        CommandChannel[Command Channel<br/>命令通道]
        CommandProcessor[Command Processor<br/>命令处理器]
    end
    
    subgraph "错误处理 Error Handling"
        ErrorHandler[Error Handler<br/>错误处理器]
    end
    
    subgraph "扩展层 Extensibility"
        Layers[Graph Engine Layers<br/>可插拔层]
    end
    
    Dispatcher --> ExecutionCoordinator
    Dispatcher --> EventQueue
    Dispatcher --> EventHandler
    
    ExecutionCoordinator --> GraphStateManager
    ExecutionCoordinator --> CommandProcessor
    ExecutionCoordinator --> WorkerPool
    
    GraphStateManager --> GraphExecution
    GraphStateManager --> ReadyQueue
    GraphStateManager --> VariablePool
    
    WorkerPool --> NodeExecutor
    NodeExecutor --> RetryManager
    NodeExecutor --> EventQueue
    
    EventHandler --> EdgeProcessor
    EventHandler --> ErrorHandler
    EventHandler --> EventManager
    
    EdgeProcessor --> SkipPropagator
    EdgeProcessor --> GraphStateManager
    EdgeProcessor --> ResponseCoordinator
    
    CommandProcessor --> CommandChannel
    CommandProcessor --> GraphExecution
    
    Layers -.扩展.-> Dispatcher

2.2 架构分层说明

1. 核心引擎层 (GraphEngine)

GraphEngine是整个工作流引擎的门面,对外提供简单的run()接口。内部通过模块化设计,将职责分配给各个专门的子系统。

  • Dispatcher(事件分发器):核心调度循环,负责从事件队列中取出事件,分发给对应的处理器,驱动整个执行流程
  • ExecutionCoordinator(执行协调器):高层协调逻辑,检查命令、触发Worker扩缩容、判断执行完成

2. 状态管理层

负责工作流执行过程中所有状态的维护和变更。

  • GraphExecution(执行聚合根):DDD聚合根,维护工作流运行级别的状态
  • GraphStateManager(状态管理器):统一的状态管理门面
  • VariablePool(变量池):全局变量存储,支持节点间数据传递

3. 队列系统

解耦节点依赖,支持并行执行。

  • ReadyQueue(就绪队列):存储可立即执行的节点,支持优先级调度
  • EventQueue(事件队列):节点执行产生的事件临时缓冲

4. 节点执行层

负责节点的实际执行。

  • WorkerPool(工作池):线程池管理,动态扩缩容
  • NodeExecutor(节点执行器):在Worker线程中运行,调用节点的_run()方法
  • RetryManager(重试管理器):节点失败时根据配置执行重试逻辑

5. 事件系统

事件驱动架构,解耦各模块。

  • EventManager(事件管理器):收集和发布事件,支持流式输出给外部调用方
  • EventHandler(事件处理器):注册各类事件的处理逻辑

6. 图遍历层

负责根据执行结果决定下一步执行路径。

  • EdgeProcessor(边处理器):处理节点执行后的出边,判断条件分支
  • SkipPropagator(跳过传播器):传播跳过状态

7. 响应协调层

管理流式响应的生成和路径追踪。

8. 命令处理层

接收外部控制命令。

  • CommandChannel(命令通道):命令传输通道(内存/Redis)
  • CommandProcessor(命令处理器):处理AbortCommand等命令

9. 错误处理层

统一的错误处理和恢复。

10. 扩展层

通过Layer模式扩展功能。


3. 核心数据结构

3.1 Graph(图结构)

3.1.1 UML类图

classDiagram
    class Graph {
        +dict~str,Node~ nodes
        +dict~str,Edge~ edges
        +dict~str,list[str]~ in_edges
        +dict~str,list[str]~ out_edges
        +Node root_node
        +add_node(node)
        +add_edge(edge)
        +get_node(node_id) Node
    }
    
    class Node {
        <<abstract>>
        +str id
        +str tenant_id
        +NodeState state
        +GraphRuntimeState graph_runtime_state
        +run() Generator
        #_run() NodeRunResult
    }
    
    class Edge {
        +str id
        +str tail
        +str head
        +str source_handle
        +NodeState state
    }
    
    Graph "1" *-- "*" Node
    Graph "1" *-- "*" Edge
    Edge ..> Node

3.1.2 字段说明

Graph:

字段 类型 说明
nodes dict[str, Node] 节点字典,key为node_id
edges dict[str, Edge] 边字典,key为edge_id
in_edges dict[str, list[str]] 入边映射
out_edges dict[str, list[str]] 出边映射
root_node Node 根节点(起始节点)

Node:

字段 类型 说明
id str 节点执行实例ID
_node_id str 节点配置ID
state NodeState 节点执行状态
graph_runtime_state GraphRuntimeState 共享的运行时状态引用

Edge:

字段 类型 说明
tail str 源节点ID
head str 目标节点ID
source_handle str 源句柄(条件分支)

3.2 GraphRuntimeState(运行时状态)

3.2.1 UML类图

classDiagram
    class GraphRuntimeState {
        -VariablePool _variable_pool
        -float _start_at
        -int _total_tokens
        -dict _outputs
        +variable_pool VariablePool
        +set_output(key, value)
        +add_tokens(tokens)
    }
    
    class VariablePool {
        +defaultdict variable_dictionary
        +dict user_inputs
        +SystemVariable system_variables
        +add(selector, value)
        +get(selector) Segment
        +convert_template(template) SegmentGroup
    }
    
    GraphRuntimeState "1" *-- "1" VariablePool

3.2.2 核心方法

class VariablePool:
    def add(self, selector: Sequence[str], value: Any, /):
        """添加变量到变量池
        
        Args:
            selector: [node_id, variable_name]
            value: 任意可序列化的值
        """
        node_id, name = selector
        segment = variable_factory.build_segment(value)
        self.variable_dictionary[node_id][name] = segment
    
    def get(self, selector: Sequence[str], /) -> Segment | None:
        """获取变量
        
        Args:
            selector: 支持多种格式
                - [node_id, var_name]:获取完整变量
                - [node_id, var_name, field]:获取对象字段
        
        Returns:
            Segment对象或None
        """
        node_id, name = selector[:2]
        segment = self.variable_dictionary[node_id].get(name)
        
        if len(selector) > 2:
            # 处理嵌套属性访问
            for attr in selector[2:]:
                segment = self._get_nested_attribute(segment, attr)
        
        return segment

4. API详细规格

4.1 WorkflowEntry.run()

4.1.1 基本信息

  • 名称WorkflowEntry.run()
  • 用途:启动工作流执行,流式输出执行事件
  • 调用场景:应用运行时调用

4.1.2 方法签名

def run(self) -> Generator[GraphEngineEvent, None, None]:
    """运行工作流,生成器返回事件流"""

4.1.3 返回值

Generator[GraphEngineEvent, None, None]:生成器,逐个产出事件

事件类型:

事件类 说明 包含信息
GraphRunStartedEvent 工作流开始 开始时间
GraphNodeRunStartedEvent 节点开始执行 node_id, inputs
GraphNodeRunSucceededEvent 节点成功完成 node_id, outputs
GraphNodeRunFailedEvent 节点执行失败 node_id, error
GraphRunSucceededEvent 工作流成功完成 outputs
GraphRunFailedEvent 工作流失败 error

4.1.4 核心实现

def run(self) -> Generator[GraphEngineEvent, None, None]:
    graph_engine = self.graph_engine

    try:
        # 运行图引擎,yield所有事件
        generator = graph_engine.run()
        yield from generator
    except GenerateTaskStoppedError:
        # 任务被外部停止,正常退出
        pass
    except Exception as e:
        # (此处省略日志记录)
        yield GraphRunFailedEvent(error=str(e))
        return

4.1.5 调用示例

# 创建WorkflowEntry
workflow_entry = WorkflowEntry(
    tenant_id="tenant-123",
    app_id="app-456",
    workflow_id="wf-789",
    graph_config=graph_config,
    graph=graph,
    user_id="user-001",
    user_from=UserFrom.ACCOUNT,
    invoke_from=InvokeFrom.SERVICE,
    call_depth=0,
    variable_pool=variable_pool,
    graph_runtime_state=graph_runtime_state,
)

# 执行工作流并处理事件
for event in workflow_entry.run():
    if isinstance(event, GraphRunStartedEvent):
        print("Workflow started")
    elif isinstance(event, GraphNodeRunSucceededEvent):
        print(f"Node {event.node_id} completed")
    elif isinstance(event, GraphRunSucceededEvent):
        print(f"Workflow succeeded: {event.outputs}")

4.2 GraphEngine.run()

4.2.1 核心执行流程

def run(self) -> Generator[GraphEngineEvent, None, None]:
    try:
        # 1. 初始化扩展层
        self._initialize_layers()

        # 2. 标记开始执行
        self._graph_execution.start()
        yield GraphRunStartedEvent()

        # 3. 启动子系统
        self._start_execution()

        # 4. 流式输出事件(核心循环)
        yield from self._event_manager.emit_events()

        # 5. 处理完成状态
        if self._graph_execution.aborted:
            yield GraphRunAbortedEvent(
                reason="Workflow execution aborted",
                outputs=self._graph_runtime_state.outputs,
            )
        elif self._graph_execution.has_error:
            if self._graph_execution.error:
                raise self._graph_execution.error
        else:
            outputs = self._graph_runtime_state.outputs
            if self._graph_execution.exceptions_count > 0:
                yield GraphRunPartialSucceededEvent(
                    exceptions_count=self._graph_execution.exceptions_count,
                    outputs=outputs,
                )
            else:
                yield GraphRunSucceededEvent(outputs=outputs)

    except Exception as e:
        yield GraphRunFailedEvent(error=str(e))
        raise

    finally:
        # 6. 停止子系统
        self._stop_execution()

4.2.2 启动子系统

def _start_execution(self):
    # 初始化图状态(将起始节点加入就绪队列)
    self._state_manager.initialize_graph_state()
    
    # 启动Worker Pool
    self._worker_pool.start()
    
    # 启动Dispatcher主循环(在单独线程中)
    self._dispatcher_thread = threading.Thread(
        target=self._dispatcher.run,
        daemon=True
    )
    self._dispatcher_thread.start()

4.3 VariablePool.add() 和 get()

4.3.1 add() - 添加变量

def add(self, selector: Sequence[str], value: Any, /):
    # 1. 校验选择器长度
    if len(selector) != 2:
        raise ValueError("Invalid selector")

    # 2. 转换为Variable对象
    if isinstance(value, Segment):
        variable = value
    else:
        # 自动包装为Segment
        segment = variable_factory.build_segment(value)
        variable = segment

    # 3. 存储到二级字典
    node_id, name = selector
    self.variable_dictionary[node_id][name] = variable

4.3.2 get() - 获取变量

def get(self, selector: Sequence[str], /) -> Segment | None:
    # 1. 获取基础变量
    node_id, name = selector[:2]
    segment = self.variable_dictionary[node_id].get(name)

    if segment is None:
        return None

    # 2. 如果只有2个元素,直接返回
    if len(selector) == 2:
        return segment

    # 3. 处理文件属性
    if isinstance(segment, FileSegment):
        attr = selector[2]
        attr_value = file_manager.get_attr(segment.value, attr)
        return variable_factory.build_segment(attr_value)

    # 4. 处理对象嵌套属性
    result = segment
    for attr in selector[2:]:
        result = self._get_nested_attribute(result, attr)
        if result is None:
            return None

    return result

5. 执行流程与时序

5.1 基础工作流执行流程

5.1.1 时序图

sequenceDiagram
    autonumber
    participant C as Caller
    participant WE as WorkflowEntry
    participant GE as GraphEngine
    participant SM as StateManager
    participant RQ as ReadyQueue
    participant WP as WorkerPool
    participant N as Node (LLM)
    participant VP as VariablePool
    
    C->>WE: WorkflowEntry.run()
    WE->>GE: GraphEngine.run()
    GE->>GE: 初始化Layers
    GE-->>WE: yield GraphRunStartedEvent
    
    GE->>SM: initialize_graph_state()
    SM->>RQ: enqueue(start_node)
    GE->>WP: start()
    
    loop 执行循环
        WP->>N: start_node.run()
        N->>VP: 获取user_inputs
        VP-->>N: {"query": "What is AI?"}
        N->>VP: 添加输出变量
        N-->>WP: yield NodeRunSucceededEvent
        
        WP->>SM: 更新节点状态
        WP->>EdgeProcessor: process_edges(start_node)
        EdgeProcessor->>RQ: enqueue(llm_node)
        
        WP->>N: llm_node.run()
        N->>VP: 获取上游变量
        N->>ModelManager: invoke_llm(query)
        
        loop 流式输出
            ModelManager-->>N: chunk
            N-->>WE: yield StreamChunkEvent
        end
        
        N->>VP: 添加输出变量
        N-->>WP: yield NodeRunSucceededEvent
        
        WP->>EdgeProcessor: process_edges(llm_node)
        EdgeProcessor->>RQ: enqueue(end_node)
        
        WP->>N: end_node.run()
        N->>VP: 获取输出变量
        N->>GraphRuntimeState: set_output('answer', text)
        N-->>WP: yield NodeRunSucceededEvent
    end
    
    GE->>GE: 检查执行完成
    GE-->>WE: yield GraphRunSucceededEvent
    WE-->>C: 完成

5.2 并行节点执行流程

5.2.1 时序图

sequenceDiagram
    autonumber
    participant D as Dispatcher
    participant RQ as ReadyQueue
    participant WP as WorkerPool
    participant W1 as Worker 1
    participant W2 as Worker 2
    participant SM as StateManager
    
    Note over D,SM: Start节点完成后
    D->>EdgeProcessor: process_edges(start_node)
    EdgeProcessor->>RQ: enqueue(llm1)
    EdgeProcessor->>RQ: enqueue(llm2)
    
    Note over RQ: 就绪队列: [llm1, llm2]
    
    par 并行执行
        D->>WP: submit_task(llm1)
        WP->>W1: 分配Worker 1
        W1->>LLM1: llm1.run()
        LLM1->>LLM1: 调用LLM (5s)
        LLM1-->>W1: NodeRunSucceededEvent
    and
        D->>WP: submit_task(llm2)
        WP->>W2: 分配Worker 2
        W2->>LLM2: llm2.run()
        LLM2->>LLM2: 调用LLM (5s)
        LLM2-->>W2: NodeRunSucceededEvent
    end
    
    Note over D,SM: 等待两者都完成
    D->>SM: 检查aggregator依赖
    SM-->>D: llm1, llm2都已完成
    D->>RQ: enqueue(aggregator)

关键点说明:

  1. 依赖检查:EdgeProcessor检查后继节点的所有入边是否都已完成
  2. 并行度:llm1和llm2同时在就绪队列中,WorkerPool分配不同Worker并行执行
  3. 性能提升:串行10s,并行5s,提升50%效率

5.3 条件分支执行流程

5.3.1 时序图

sequenceDiagram
    autonumber
    participant D as Dispatcher
    participant C as Classifier Node
    participant EP as EdgeProcessor
    participant SP as SkipPropagator
    participant SM as StateManager
    
    D->>C: classifier.run()
    C->>C: 分析问题类型
    C->>C: 判定为"creative"
    C->>VP: add(['classifier', 'type'], "creative")
    C-->>D: NodeRunSucceededEvent
    
    D->>EP: process_edges(classifier)
    
    Note over EP: 边1: creative分支
    EP->>EP: 判断source_handle == "creative"
    EP->>EP: ✅ 条件满足
    EP->>RQ: enqueue(llm_creative)
    
    Note over EP: 边2: factual分支
    EP->>EP: 判断source_handle == "factual"
    EP->>EP: ❌ 条件不满足
    EP->>SP: propagate_skip(llm_factual)
    
    SP->>SM: 标记llm_factual为SKIPPED
    SP->>SM: 检查后继节点
    SP->>SM: end_node有其他路径,不跳过

跳过传播机制:

  • 触发条件:条件分支未选中、If-Else条件不满足
  • 传播规则
    • 标记节点为SKIPPED状态
    • 递归检查该节点的后继节点
    • 如果后继节点的所有前驱都被跳过,则继续传播
    • 如果后继节点有其他活跃路径,则停止传播

6. 关键功能深入分析

6.1 Worker Pool动态扩缩容

6.1.1 功能说明

WorkerPool根据就绪队列长度动态调整Worker数量,优化资源使用。

6.1.2 核心实现

class WorkerPool:
    def __init__(
        self,
        min_workers: int = 1,
        max_workers: int = 10,
        scale_up_threshold: int = 5,
        scale_down_idle_time: float = 5.0,
    ):
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.scale_up_threshold = scale_up_threshold
        self.scale_down_idle_time = scale_down_idle_time
        self.workers: list[Worker] = []
        self.ready_queue = ready_queue
    
    def _scale_workers(self):
        """动态扩缩容"""
        queue_length = self.ready_queue.size()
        active_workers = len([w for w in self.workers if w.is_busy()])
        
        # 扩容条件:队列长度超过阈值且未达上限
        if queue_length > self.scale_up_threshold and len(self.workers) < self.max_workers:
            new_worker = Worker(self.ready_queue, self.event_queue)
            new_worker.start()
            self.workers.append(new_worker)
            # (此处省略日志)
        
        # 缩容条件:Worker空闲时间超过阈值且超过最小数量
        for worker in self.workers[:]:
            if worker.idle_time > self.scale_down_idle_time and len(self.workers) > self.min_workers:
                worker.stop()
                self.workers.remove(worker)
                # (此处省略日志)

6.2 节点重试机制

6.2.1 重试配置

nodes:
  - id: http_request
    data:
      error_strategy: continue
      retry:
        enabled: true
        max_attempts: 3
        backoff_factor: 2
        retry_on:
          - TimeoutError
          - ConnectionError

6.2.2 重试实现

class RetryManager:
    def should_retry(self, node, error, retry_count):
        retry_config = node.retry_config
        
        if not retry_config.enabled:
            return False
        
        if retry_count >= retry_config.max_attempts:
            return False
        
        # 检查错误类型是否匹配
        error_type = type(error).__name__
        if error_type not in retry_config.retry_on:
            return False
        
        return True
    
    def execute_with_retry(self, node):
        retry_count = 0
        last_error = None
        
        while retry_count <= node.retry_config.max_attempts:
            try:
                result = node._run()
                return result
            except Exception as e:
                last_error = e
                
                if not self.should_retry(node, e, retry_count):
                    raise
                
                # 等待退避时间
                wait_time = node.retry_config.backoff_factor ** retry_count
                time.sleep(wait_time)
                retry_count += 1
        
        raise last_error

6.3 命令通道实现

6.3.1 Redis通道(分布式场景)

class RedisChannel(CommandChannel):
    def __init__(self, redis_client: Redis, channel_key: str):
        self.redis = redis_client
        self.channel_key = channel_key
    
    def send_command(self, command: Command) -> None:
        """发送命令到Redis队列"""
        command_json = command.model_dump_json()
        self.redis.rpush(self.channel_key, command_json)
    
    def receive_command(self) -> Command | None:
        """从Redis队列接收命令(非阻塞)"""
        command_json = self.redis.lpop(self.channel_key)
        if not command_json:
            return None
        return Command.model_validate_json(command_json)

6.3.2 内存通道(单进程场景)

class InMemoryChannel(CommandChannel):
    def __init__(self):
        self.queue = queue.Queue()
    
    def send_command(self, command: Command) -> None:
        self.queue.put(command)
    
    def receive_command(self) -> Command | None:
        try:
            return self.queue.get_nowait()
        except queue.Empty:
            return None

7. 实战案例与最佳实践

7.1 案例1:构建多步骤工作流

7.1.1 场景描述

构建一个多步骤工作流:接收用户输入 → 知识库检索 → LLM生成 → 代码执行 → 返回结果。

7.1.2 工作流DSL

graph:
  nodes:
    - id: start
      type: start
      data:
        variables:
          - variable: query
            type: string
    
    - id: knowledge_retrieval
      type: knowledge-retrieval
      data:
        dataset_ids: ["dataset-xxx"]
        query_variable_selector: ["start", "query"]
        top_k: 3
    
    - id: llm
      type: llm
      data:
        model:
          provider: openai
          name: gpt-4
        prompt_template:
          - role: system
            text: "You are a code generator."
          - role: user
            text: "Based on: {{#knowledge_retrieval.result#}}\nQuestion: {{#start.query#}}"
    
    - id: code
      type: code
      data:
        code_language: python3
        code: "{{#llm.text#}}"
    
    - id: end
      type: end
      data:
        outputs:
          - variable: result
            value_selector: ["code", "result"]
  
  edges:
    - source: start
      target: knowledge_retrieval
    - source: knowledge_retrieval
      target: llm
    - source: llm
      target: code
    - source: code
      target: end

7.1.3 执行示例

POST /console/api/workflows/{workflow_id}/run
{
    "inputs": {
        "query": "Calculate factorial of 5"
    },
    "response_mode": "streaming"
}

输出:

event: node_started
data: {"node_id": "start", ...}

event: node_finished
data: {"node_id": "start", "outputs": {"query": "Calculate factorial of 5"}}

event: node_started
data: {"node_id": "knowledge_retrieval", ...}

event: node_finished
data: {"node_id": "knowledge_retrieval", "outputs": {"result": [...]}}

event: node_started
data: {"node_id": "llm", ...}

event: text_chunk
data: {"node_id": "llm", "text": "def"}

event: text_chunk
data: {"node_id": "llm", "text": " factorial"}

...

event: workflow_finished
data: {"outputs": {"result": 120}}

7.2 案例2:并行处理优化

7.2.1 场景描述

需要调用多个API获取数据,串行执行耗时长,通过并行优化性能。

7.2.2 优化前(串行)

edges:
  - source: start
    target: api1
  - source: api1
    target: api2
  - source: api2
    target: api3
  - source: api3
    target: aggregator

耗时:3s + 3s + 3s = 9s

7.2.3 优化后(并行)

edges:
  - source: start
    target: api1
  - source: start
    target: api2
  - source: start
    target: api3
  - source: api1
    target: aggregator
  - source: api2
    target: aggregator
  - source: api3
    target: aggregator

耗时:max(3s, 3s, 3s) = 3s

性能提升:67%

7.3 最佳实践总结

7.3.1 变量命名规范

  • 节点ID:使用描述性名称+序号,如llm-summarize-1
  • 变量名:使用蛇形命名,如user_query, api_response
  • 避免冲突:不同节点可以有同名变量,利用命名空间隔离

7.3.2 错误处理策略

1. TERMINATE(终止)

  • 适用:关键节点失败,无法继续
  • 示例:鉴权失败、必需参数缺失

2. CONTINUE(继续)

  • 适用:可选节点失败,不影响主流程
  • 示例:发送通知失败、记录日志失败

3. SKIP(跳过)

  • 适用:某条路径失败,但不影响其他路径
  • 示例:多路检索中某个数据源失败

7.3.3 性能优化

1. 减少变量池查找

# 不推荐:多次查找相同变量
for i in range(100):
    value = variable_pool.get(['node', 'var'])
    process(value)

# 推荐:缓存变量引用
cached_value = variable_pool.get(['node', 'var'])
for i in range(100):
    process(cached_value)

2. Worker Pool配置

worker_pool = WorkerPool(
    min_workers=1,
    max_workers=10,
    scale_up_threshold=5,     # 队列长度超过5时扩容
    scale_down_idle_time=5.0, # 空闲5秒后缩容
)

3. 状态持久化

# 保存状态(暂停场景)
graph_runtime_state._ready_queue_json = ready_queue.dumps()
graph_runtime_state._graph_execution_json = graph_execution.dumps()

# 存入数据库
save_workflow_state(workflow_run_id, graph_runtime_state)

# 恢复状态
loaded_state = load_workflow_state(workflow_run_id)
graph_engine = GraphEngine(
    workflow_id=workflow_id,
    graph=graph,
    graph_runtime_state=loaded_state,
)

7.3.4 监控指标

metrics = {
    'total_nodes': 0,
    'succeeded': 0,
    'failed': 0,
    'skipped': 0,
    'total_time': 0,
    'llm_tokens': 0,
}

for event in workflow_entry.run():
    if isinstance(event, NodeRunSucceededEvent):
        metrics['succeeded'] += 1
    elif isinstance(event, NodeRunFailedEvent):
        metrics['failed'] += 1
    elif isinstance(event, GraphRunSucceededEvent):
        metrics['llm_tokens'] = event.total_tokens

附录

A. 节点类型清单

类型 说明 主要功能
START 起始节点 接收用户输入
END 结束节点 收集最终输出
LLM LLM节点 调用大语言模型
CODE 代码执行 Python/JavaScript沙箱
HTTP_REQUEST HTTP请求 调用外部API
IF_ELSE 条件分支 根据条件选择路径
KNOWLEDGE_RETRIEVAL 知识库检索 检索数据集
TOOL 工具调用 调用注册的工具
ITERATION 迭代循环 批量处理列表
VARIABLE_AGGREGATOR 变量聚合 合并多个变量

B. 常见问题

Q1: 工作流执行卡住不动?

A:

  1. 检查节点依赖是否形成循环
  2. 检查条件分支是否有未覆盖的情况
  3. 查看Worker Pool是否有可用Worker
  4. 检查LLM调用是否超时

Q2: 如何调试工作流?

A:

  1. 使用single_step_run()单独测试节点
  2. 添加DebugLoggingLayer记录详细日志
  3. 检查变量池内容
  4. 查看节点执行记录

Q3: 如何优化工作流性能?

A:

  1. 使用并行执行减少总耗时
  2. 优化LLM提示词减少Token消耗
  3. 缓存重复的API调用结果
  4. 调整Worker Pool参数

文档版本:v1.0
生成日期:2025-10-04
维护者:Backend Team