Dify-01-Workflow工作流引擎-完整剖析
目录
1. 模块概览
1.1 职责与边界
Workflow工作流引擎是Dify平台的核心编排系统,负责将复杂的AI任务分解为可视化的节点图,并按照依赖关系和条件逻辑进行调度执行。
核心职责:
- 图结构定义与验证:支持DAG(有向无环图)和有条件分支的工作流
- 节点编排与调度:根据节点依赖关系动态调度执行
- 并行执行:无依赖节点自动并行,提升执行效率
- 状态管理:支持暂停/恢复/停止,状态持久化
- 事件驱动:通过事件总线实现节点间解耦
- 变量传递:全局变量池支持节点间数据流转
- 错误处理:支持节点级别重试和失败处理策略
- 外部控制:通过命令通道实现外部实时控制
输入/输出:
输入:
- Graph定义(节点+边)
- 用户输入变量
- 系统变量(用户ID、租户ID等)
- 环境变量
输出:
- GraphEngineEvent流:节点执行事件、错误事件、完成事件
- 变量池:最终的所有变量值
- 执行状态:成功/失败/中断
- 执行元数据:耗时、Token消耗等
上下游依赖:
上游调用方:
WorkflowAppRunner:工作流应用运行器Console API:控制台手动触发Service API:外部API调用
下游依赖:
- 节点执行器:各类节点实现(LLM、Code、HTTP等)
- Model Runtime:LLM调用
- PostgreSQL:状态持久化
- Redis:命令通道(分布式场景)
1.2 生命周期
stateDiagram-v2
[*] --> 初始化: 创建GraphEngine
初始化 --> 就绪: 加载Graph定义
就绪 --> 运行中: run()
state 运行中 {
[*] --> 加载起始节点
加载起始节点 --> 节点执行
节点执行 --> 处理事件
处理事件 --> 边遍历
边遍历 --> 更新就绪队列
更新就绪队列 --> 节点执行: 有就绪节点
更新就绪队列 --> 完成检查: 无就绪节点
完成检查 --> 节点执行: 有运行中节点
}
运行中 --> 成功: 所有节点执行完成
运行中 --> 失败: 节点执行失败
运行中 --> 中断: 收到停止命令
成功 --> [*]
失败 --> [*]
中断 --> [*]: 可恢复
生命周期阶段说明:
-
初始化阶段:创建GraphEngine实例,初始化所有子系统,从持久化状态恢复(如果是断点续传),验证Graph一致性
-
就绪阶段:等待外部调用
run()方法,此时可以添加Layer扩展功能 -
运行阶段:发送GraphRunStartedEvent,循环处理节点执行,实时响应外部命令,流式输出事件
-
终止阶段:正常完成、失败或中断,保存最终状态到数据库
2. 整体架构
2.1 架构图
flowchart TB
subgraph "GraphEngine 核心引擎"
Dispatcher[Dispatcher<br/>事件分发器]
ExecutionCoordinator[Execution Coordinator<br/>执行协调器]
end
subgraph "状态管理 State Management"
GraphExecution[Graph Execution<br/>执行聚合根]
GraphStateManager[Graph State Manager<br/>状态管理器]
VariablePool[Variable Pool<br/>变量池]
end
subgraph "队列系统 Queue System"
ReadyQueue[Ready Queue<br/>就绪队列]
EventQueue[Event Queue<br/>事件队列]
end
subgraph "节点执行 Node Execution"
WorkerPool[Worker Pool<br/>工作池]
NodeExecutor[Node Executor<br/>节点执行器]
RetryManager[Retry Manager<br/>重试管理器]
end
subgraph "事件系统 Event System"
EventManager[Event Manager<br/>事件管理器]
EventHandler[Event Handler<br/>事件处理器]
end
subgraph "图遍历 Graph Traversal"
EdgeProcessor[Edge Processor<br/>边处理器]
SkipPropagator[Skip Propagator<br/>跳过传播器]
end
subgraph "响应协调 Response Coordination"
ResponseCoordinator[Response Coordinator<br/>响应流协调器]
end
subgraph "命令处理 Command Processing"
CommandChannel[Command Channel<br/>命令通道]
CommandProcessor[Command Processor<br/>命令处理器]
end
subgraph "错误处理 Error Handling"
ErrorHandler[Error Handler<br/>错误处理器]
end
subgraph "扩展层 Extensibility"
Layers[Graph Engine Layers<br/>可插拔层]
end
Dispatcher --> ExecutionCoordinator
Dispatcher --> EventQueue
Dispatcher --> EventHandler
ExecutionCoordinator --> GraphStateManager
ExecutionCoordinator --> CommandProcessor
ExecutionCoordinator --> WorkerPool
GraphStateManager --> GraphExecution
GraphStateManager --> ReadyQueue
GraphStateManager --> VariablePool
WorkerPool --> NodeExecutor
NodeExecutor --> RetryManager
NodeExecutor --> EventQueue
EventHandler --> EdgeProcessor
EventHandler --> ErrorHandler
EventHandler --> EventManager
EdgeProcessor --> SkipPropagator
EdgeProcessor --> GraphStateManager
EdgeProcessor --> ResponseCoordinator
CommandProcessor --> CommandChannel
CommandProcessor --> GraphExecution
Layers -.扩展.-> Dispatcher
2.2 架构分层说明
1. 核心引擎层 (GraphEngine)
GraphEngine是整个工作流引擎的门面,对外提供简单的run()接口。内部通过模块化设计,将职责分配给各个专门的子系统。
- Dispatcher(事件分发器):核心调度循环,负责从事件队列中取出事件,分发给对应的处理器,驱动整个执行流程
- ExecutionCoordinator(执行协调器):高层协调逻辑,检查命令、触发Worker扩缩容、判断执行完成
2. 状态管理层
负责工作流执行过程中所有状态的维护和变更。
- GraphExecution(执行聚合根):DDD聚合根,维护工作流运行级别的状态
- GraphStateManager(状态管理器):统一的状态管理门面
- VariablePool(变量池):全局变量存储,支持节点间数据传递
3. 队列系统
解耦节点依赖,支持并行执行。
- ReadyQueue(就绪队列):存储可立即执行的节点,支持优先级调度
- EventQueue(事件队列):节点执行产生的事件临时缓冲
4. 节点执行层
负责节点的实际执行。
- WorkerPool(工作池):线程池管理,动态扩缩容
- NodeExecutor(节点执行器):在Worker线程中运行,调用节点的
_run()方法 - RetryManager(重试管理器):节点失败时根据配置执行重试逻辑
5. 事件系统
事件驱动架构,解耦各模块。
- EventManager(事件管理器):收集和发布事件,支持流式输出给外部调用方
- EventHandler(事件处理器):注册各类事件的处理逻辑
6. 图遍历层
负责根据执行结果决定下一步执行路径。
- EdgeProcessor(边处理器):处理节点执行后的出边,判断条件分支
- SkipPropagator(跳过传播器):传播跳过状态
7. 响应协调层
管理流式响应的生成和路径追踪。
8. 命令处理层
接收外部控制命令。
- CommandChannel(命令通道):命令传输通道(内存/Redis)
- CommandProcessor(命令处理器):处理AbortCommand等命令
9. 错误处理层
统一的错误处理和恢复。
10. 扩展层
通过Layer模式扩展功能。
3. 核心数据结构
3.1 Graph(图结构)
3.1.1 UML类图
classDiagram
class Graph {
+dict~str,Node~ nodes
+dict~str,Edge~ edges
+dict~str,list[str]~ in_edges
+dict~str,list[str]~ out_edges
+Node root_node
+add_node(node)
+add_edge(edge)
+get_node(node_id) Node
}
class Node {
<<abstract>>
+str id
+str tenant_id
+NodeState state
+GraphRuntimeState graph_runtime_state
+run() Generator
#_run() NodeRunResult
}
class Edge {
+str id
+str tail
+str head
+str source_handle
+NodeState state
}
Graph "1" *-- "*" Node
Graph "1" *-- "*" Edge
Edge ..> Node
3.1.2 字段说明
Graph:
| 字段 | 类型 | 说明 |
|---|---|---|
nodes |
dict[str, Node] | 节点字典,key为node_id |
edges |
dict[str, Edge] | 边字典,key为edge_id |
in_edges |
dict[str, list[str]] | 入边映射 |
out_edges |
dict[str, list[str]] | 出边映射 |
root_node |
Node | 根节点(起始节点) |
Node:
| 字段 | 类型 | 说明 |
|---|---|---|
id |
str | 节点执行实例ID |
_node_id |
str | 节点配置ID |
state |
NodeState | 节点执行状态 |
graph_runtime_state |
GraphRuntimeState | 共享的运行时状态引用 |
Edge:
| 字段 | 类型 | 说明 |
|---|---|---|
tail |
str | 源节点ID |
head |
str | 目标节点ID |
source_handle |
str | 源句柄(条件分支) |
3.2 GraphRuntimeState(运行时状态)
3.2.1 UML类图
classDiagram
class GraphRuntimeState {
-VariablePool _variable_pool
-float _start_at
-int _total_tokens
-dict _outputs
+variable_pool VariablePool
+set_output(key, value)
+add_tokens(tokens)
}
class VariablePool {
+defaultdict variable_dictionary
+dict user_inputs
+SystemVariable system_variables
+add(selector, value)
+get(selector) Segment
+convert_template(template) SegmentGroup
}
GraphRuntimeState "1" *-- "1" VariablePool
3.2.2 核心方法
class VariablePool:
def add(self, selector: Sequence[str], value: Any, /):
"""添加变量到变量池
Args:
selector: [node_id, variable_name]
value: 任意可序列化的值
"""
node_id, name = selector
segment = variable_factory.build_segment(value)
self.variable_dictionary[node_id][name] = segment
def get(self, selector: Sequence[str], /) -> Segment | None:
"""获取变量
Args:
selector: 支持多种格式
- [node_id, var_name]:获取完整变量
- [node_id, var_name, field]:获取对象字段
Returns:
Segment对象或None
"""
node_id, name = selector[:2]
segment = self.variable_dictionary[node_id].get(name)
if len(selector) > 2:
# 处理嵌套属性访问
for attr in selector[2:]:
segment = self._get_nested_attribute(segment, attr)
return segment
4. API详细规格
4.1 WorkflowEntry.run()
4.1.1 基本信息
- 名称:
WorkflowEntry.run() - 用途:启动工作流执行,流式输出执行事件
- 调用场景:应用运行时调用
4.1.2 方法签名
def run(self) -> Generator[GraphEngineEvent, None, None]:
"""运行工作流,生成器返回事件流"""
4.1.3 返回值
Generator[GraphEngineEvent, None, None]:生成器,逐个产出事件
事件类型:
| 事件类 | 说明 | 包含信息 |
|---|---|---|
GraphRunStartedEvent |
工作流开始 | 开始时间 |
GraphNodeRunStartedEvent |
节点开始执行 | node_id, inputs |
GraphNodeRunSucceededEvent |
节点成功完成 | node_id, outputs |
GraphNodeRunFailedEvent |
节点执行失败 | node_id, error |
GraphRunSucceededEvent |
工作流成功完成 | outputs |
GraphRunFailedEvent |
工作流失败 | error |
4.1.4 核心实现
def run(self) -> Generator[GraphEngineEvent, None, None]:
graph_engine = self.graph_engine
try:
# 运行图引擎,yield所有事件
generator = graph_engine.run()
yield from generator
except GenerateTaskStoppedError:
# 任务被外部停止,正常退出
pass
except Exception as e:
# (此处省略日志记录)
yield GraphRunFailedEvent(error=str(e))
return
4.1.5 调用示例
# 创建WorkflowEntry
workflow_entry = WorkflowEntry(
tenant_id="tenant-123",
app_id="app-456",
workflow_id="wf-789",
graph_config=graph_config,
graph=graph,
user_id="user-001",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE,
call_depth=0,
variable_pool=variable_pool,
graph_runtime_state=graph_runtime_state,
)
# 执行工作流并处理事件
for event in workflow_entry.run():
if isinstance(event, GraphRunStartedEvent):
print("Workflow started")
elif isinstance(event, GraphNodeRunSucceededEvent):
print(f"Node {event.node_id} completed")
elif isinstance(event, GraphRunSucceededEvent):
print(f"Workflow succeeded: {event.outputs}")
4.2 GraphEngine.run()
4.2.1 核心执行流程
def run(self) -> Generator[GraphEngineEvent, None, None]:
try:
# 1. 初始化扩展层
self._initialize_layers()
# 2. 标记开始执行
self._graph_execution.start()
yield GraphRunStartedEvent()
# 3. 启动子系统
self._start_execution()
# 4. 流式输出事件(核心循环)
yield from self._event_manager.emit_events()
# 5. 处理完成状态
if self._graph_execution.aborted:
yield GraphRunAbortedEvent(
reason="Workflow execution aborted",
outputs=self._graph_runtime_state.outputs,
)
elif self._graph_execution.has_error:
if self._graph_execution.error:
raise self._graph_execution.error
else:
outputs = self._graph_runtime_state.outputs
if self._graph_execution.exceptions_count > 0:
yield GraphRunPartialSucceededEvent(
exceptions_count=self._graph_execution.exceptions_count,
outputs=outputs,
)
else:
yield GraphRunSucceededEvent(outputs=outputs)
except Exception as e:
yield GraphRunFailedEvent(error=str(e))
raise
finally:
# 6. 停止子系统
self._stop_execution()
4.2.2 启动子系统
def _start_execution(self):
# 初始化图状态(将起始节点加入就绪队列)
self._state_manager.initialize_graph_state()
# 启动Worker Pool
self._worker_pool.start()
# 启动Dispatcher主循环(在单独线程中)
self._dispatcher_thread = threading.Thread(
target=self._dispatcher.run,
daemon=True
)
self._dispatcher_thread.start()
4.3 VariablePool.add() 和 get()
4.3.1 add() - 添加变量
def add(self, selector: Sequence[str], value: Any, /):
# 1. 校验选择器长度
if len(selector) != 2:
raise ValueError("Invalid selector")
# 2. 转换为Variable对象
if isinstance(value, Segment):
variable = value
else:
# 自动包装为Segment
segment = variable_factory.build_segment(value)
variable = segment
# 3. 存储到二级字典
node_id, name = selector
self.variable_dictionary[node_id][name] = variable
4.3.2 get() - 获取变量
def get(self, selector: Sequence[str], /) -> Segment | None:
# 1. 获取基础变量
node_id, name = selector[:2]
segment = self.variable_dictionary[node_id].get(name)
if segment is None:
return None
# 2. 如果只有2个元素,直接返回
if len(selector) == 2:
return segment
# 3. 处理文件属性
if isinstance(segment, FileSegment):
attr = selector[2]
attr_value = file_manager.get_attr(segment.value, attr)
return variable_factory.build_segment(attr_value)
# 4. 处理对象嵌套属性
result = segment
for attr in selector[2:]:
result = self._get_nested_attribute(result, attr)
if result is None:
return None
return result
5. 执行流程与时序
5.1 基础工作流执行流程
5.1.1 时序图
sequenceDiagram
autonumber
participant C as Caller
participant WE as WorkflowEntry
participant GE as GraphEngine
participant SM as StateManager
participant RQ as ReadyQueue
participant WP as WorkerPool
participant N as Node (LLM)
participant VP as VariablePool
C->>WE: WorkflowEntry.run()
WE->>GE: GraphEngine.run()
GE->>GE: 初始化Layers
GE-->>WE: yield GraphRunStartedEvent
GE->>SM: initialize_graph_state()
SM->>RQ: enqueue(start_node)
GE->>WP: start()
loop 执行循环
WP->>N: start_node.run()
N->>VP: 获取user_inputs
VP-->>N: {"query": "What is AI?"}
N->>VP: 添加输出变量
N-->>WP: yield NodeRunSucceededEvent
WP->>SM: 更新节点状态
WP->>EdgeProcessor: process_edges(start_node)
EdgeProcessor->>RQ: enqueue(llm_node)
WP->>N: llm_node.run()
N->>VP: 获取上游变量
N->>ModelManager: invoke_llm(query)
loop 流式输出
ModelManager-->>N: chunk
N-->>WE: yield StreamChunkEvent
end
N->>VP: 添加输出变量
N-->>WP: yield NodeRunSucceededEvent
WP->>EdgeProcessor: process_edges(llm_node)
EdgeProcessor->>RQ: enqueue(end_node)
WP->>N: end_node.run()
N->>VP: 获取输出变量
N->>GraphRuntimeState: set_output('answer', text)
N-->>WP: yield NodeRunSucceededEvent
end
GE->>GE: 检查执行完成
GE-->>WE: yield GraphRunSucceededEvent
WE-->>C: 完成
5.2 并行节点执行流程
5.2.1 时序图
sequenceDiagram
autonumber
participant D as Dispatcher
participant RQ as ReadyQueue
participant WP as WorkerPool
participant W1 as Worker 1
participant W2 as Worker 2
participant SM as StateManager
Note over D,SM: Start节点完成后
D->>EdgeProcessor: process_edges(start_node)
EdgeProcessor->>RQ: enqueue(llm1)
EdgeProcessor->>RQ: enqueue(llm2)
Note over RQ: 就绪队列: [llm1, llm2]
par 并行执行
D->>WP: submit_task(llm1)
WP->>W1: 分配Worker 1
W1->>LLM1: llm1.run()
LLM1->>LLM1: 调用LLM (5s)
LLM1-->>W1: NodeRunSucceededEvent
and
D->>WP: submit_task(llm2)
WP->>W2: 分配Worker 2
W2->>LLM2: llm2.run()
LLM2->>LLM2: 调用LLM (5s)
LLM2-->>W2: NodeRunSucceededEvent
end
Note over D,SM: 等待两者都完成
D->>SM: 检查aggregator依赖
SM-->>D: llm1, llm2都已完成
D->>RQ: enqueue(aggregator)
关键点说明:
- 依赖检查:EdgeProcessor检查后继节点的所有入边是否都已完成
- 并行度:llm1和llm2同时在就绪队列中,WorkerPool分配不同Worker并行执行
- 性能提升:串行10s,并行5s,提升50%效率
5.3 条件分支执行流程
5.3.1 时序图
sequenceDiagram
autonumber
participant D as Dispatcher
participant C as Classifier Node
participant EP as EdgeProcessor
participant SP as SkipPropagator
participant SM as StateManager
D->>C: classifier.run()
C->>C: 分析问题类型
C->>C: 判定为"creative"
C->>VP: add(['classifier', 'type'], "creative")
C-->>D: NodeRunSucceededEvent
D->>EP: process_edges(classifier)
Note over EP: 边1: creative分支
EP->>EP: 判断source_handle == "creative"
EP->>EP: ✅ 条件满足
EP->>RQ: enqueue(llm_creative)
Note over EP: 边2: factual分支
EP->>EP: 判断source_handle == "factual"
EP->>EP: ❌ 条件不满足
EP->>SP: propagate_skip(llm_factual)
SP->>SM: 标记llm_factual为SKIPPED
SP->>SM: 检查后继节点
SP->>SM: end_node有其他路径,不跳过
跳过传播机制:
- 触发条件:条件分支未选中、If-Else条件不满足
- 传播规则:
- 标记节点为SKIPPED状态
- 递归检查该节点的后继节点
- 如果后继节点的所有前驱都被跳过,则继续传播
- 如果后继节点有其他活跃路径,则停止传播
6. 关键功能深入分析
6.1 Worker Pool动态扩缩容
6.1.1 功能说明
WorkerPool根据就绪队列长度动态调整Worker数量,优化资源使用。
6.1.2 核心实现
class WorkerPool:
def __init__(
self,
min_workers: int = 1,
max_workers: int = 10,
scale_up_threshold: int = 5,
scale_down_idle_time: float = 5.0,
):
self.min_workers = min_workers
self.max_workers = max_workers
self.scale_up_threshold = scale_up_threshold
self.scale_down_idle_time = scale_down_idle_time
self.workers: list[Worker] = []
self.ready_queue = ready_queue
def _scale_workers(self):
"""动态扩缩容"""
queue_length = self.ready_queue.size()
active_workers = len([w for w in self.workers if w.is_busy()])
# 扩容条件:队列长度超过阈值且未达上限
if queue_length > self.scale_up_threshold and len(self.workers) < self.max_workers:
new_worker = Worker(self.ready_queue, self.event_queue)
new_worker.start()
self.workers.append(new_worker)
# (此处省略日志)
# 缩容条件:Worker空闲时间超过阈值且超过最小数量
for worker in self.workers[:]:
if worker.idle_time > self.scale_down_idle_time and len(self.workers) > self.min_workers:
worker.stop()
self.workers.remove(worker)
# (此处省略日志)
6.2 节点重试机制
6.2.1 重试配置
nodes:
- id: http_request
data:
error_strategy: continue
retry:
enabled: true
max_attempts: 3
backoff_factor: 2
retry_on:
- TimeoutError
- ConnectionError
6.2.2 重试实现
class RetryManager:
def should_retry(self, node, error, retry_count):
retry_config = node.retry_config
if not retry_config.enabled:
return False
if retry_count >= retry_config.max_attempts:
return False
# 检查错误类型是否匹配
error_type = type(error).__name__
if error_type not in retry_config.retry_on:
return False
return True
def execute_with_retry(self, node):
retry_count = 0
last_error = None
while retry_count <= node.retry_config.max_attempts:
try:
result = node._run()
return result
except Exception as e:
last_error = e
if not self.should_retry(node, e, retry_count):
raise
# 等待退避时间
wait_time = node.retry_config.backoff_factor ** retry_count
time.sleep(wait_time)
retry_count += 1
raise last_error
6.3 命令通道实现
6.3.1 Redis通道(分布式场景)
class RedisChannel(CommandChannel):
def __init__(self, redis_client: Redis, channel_key: str):
self.redis = redis_client
self.channel_key = channel_key
def send_command(self, command: Command) -> None:
"""发送命令到Redis队列"""
command_json = command.model_dump_json()
self.redis.rpush(self.channel_key, command_json)
def receive_command(self) -> Command | None:
"""从Redis队列接收命令(非阻塞)"""
command_json = self.redis.lpop(self.channel_key)
if not command_json:
return None
return Command.model_validate_json(command_json)
6.3.2 内存通道(单进程场景)
class InMemoryChannel(CommandChannel):
def __init__(self):
self.queue = queue.Queue()
def send_command(self, command: Command) -> None:
self.queue.put(command)
def receive_command(self) -> Command | None:
try:
return self.queue.get_nowait()
except queue.Empty:
return None
7. 实战案例与最佳实践
7.1 案例1:构建多步骤工作流
7.1.1 场景描述
构建一个多步骤工作流:接收用户输入 → 知识库检索 → LLM生成 → 代码执行 → 返回结果。
7.1.2 工作流DSL
graph:
nodes:
- id: start
type: start
data:
variables:
- variable: query
type: string
- id: knowledge_retrieval
type: knowledge-retrieval
data:
dataset_ids: ["dataset-xxx"]
query_variable_selector: ["start", "query"]
top_k: 3
- id: llm
type: llm
data:
model:
provider: openai
name: gpt-4
prompt_template:
- role: system
text: "You are a code generator."
- role: user
text: "Based on: {{#knowledge_retrieval.result#}}\nQuestion: {{#start.query#}}"
- id: code
type: code
data:
code_language: python3
code: "{{#llm.text#}}"
- id: end
type: end
data:
outputs:
- variable: result
value_selector: ["code", "result"]
edges:
- source: start
target: knowledge_retrieval
- source: knowledge_retrieval
target: llm
- source: llm
target: code
- source: code
target: end
7.1.3 执行示例
POST /console/api/workflows/{workflow_id}/run
{
"inputs": {
"query": "Calculate factorial of 5"
},
"response_mode": "streaming"
}
输出:
event: node_started
data: {"node_id": "start", ...}
event: node_finished
data: {"node_id": "start", "outputs": {"query": "Calculate factorial of 5"}}
event: node_started
data: {"node_id": "knowledge_retrieval", ...}
event: node_finished
data: {"node_id": "knowledge_retrieval", "outputs": {"result": [...]}}
event: node_started
data: {"node_id": "llm", ...}
event: text_chunk
data: {"node_id": "llm", "text": "def"}
event: text_chunk
data: {"node_id": "llm", "text": " factorial"}
...
event: workflow_finished
data: {"outputs": {"result": 120}}
7.2 案例2:并行处理优化
7.2.1 场景描述
需要调用多个API获取数据,串行执行耗时长,通过并行优化性能。
7.2.2 优化前(串行)
edges:
- source: start
target: api1
- source: api1
target: api2
- source: api2
target: api3
- source: api3
target: aggregator
耗时:3s + 3s + 3s = 9s
7.2.3 优化后(并行)
edges:
- source: start
target: api1
- source: start
target: api2
- source: start
target: api3
- source: api1
target: aggregator
- source: api2
target: aggregator
- source: api3
target: aggregator
耗时:max(3s, 3s, 3s) = 3s
性能提升:67%
7.3 最佳实践总结
7.3.1 变量命名规范
- 节点ID:使用描述性名称+序号,如
llm-summarize-1 - 变量名:使用蛇形命名,如
user_query,api_response - 避免冲突:不同节点可以有同名变量,利用命名空间隔离
7.3.2 错误处理策略
1. TERMINATE(终止)
- 适用:关键节点失败,无法继续
- 示例:鉴权失败、必需参数缺失
2. CONTINUE(继续)
- 适用:可选节点失败,不影响主流程
- 示例:发送通知失败、记录日志失败
3. SKIP(跳过)
- 适用:某条路径失败,但不影响其他路径
- 示例:多路检索中某个数据源失败
7.3.3 性能优化
1. 减少变量池查找
# 不推荐:多次查找相同变量
for i in range(100):
value = variable_pool.get(['node', 'var'])
process(value)
# 推荐:缓存变量引用
cached_value = variable_pool.get(['node', 'var'])
for i in range(100):
process(cached_value)
2. Worker Pool配置
worker_pool = WorkerPool(
min_workers=1,
max_workers=10,
scale_up_threshold=5, # 队列长度超过5时扩容
scale_down_idle_time=5.0, # 空闲5秒后缩容
)
3. 状态持久化
# 保存状态(暂停场景)
graph_runtime_state._ready_queue_json = ready_queue.dumps()
graph_runtime_state._graph_execution_json = graph_execution.dumps()
# 存入数据库
save_workflow_state(workflow_run_id, graph_runtime_state)
# 恢复状态
loaded_state = load_workflow_state(workflow_run_id)
graph_engine = GraphEngine(
workflow_id=workflow_id,
graph=graph,
graph_runtime_state=loaded_state,
)
7.3.4 监控指标
metrics = {
'total_nodes': 0,
'succeeded': 0,
'failed': 0,
'skipped': 0,
'total_time': 0,
'llm_tokens': 0,
}
for event in workflow_entry.run():
if isinstance(event, NodeRunSucceededEvent):
metrics['succeeded'] += 1
elif isinstance(event, NodeRunFailedEvent):
metrics['failed'] += 1
elif isinstance(event, GraphRunSucceededEvent):
metrics['llm_tokens'] = event.total_tokens
附录
A. 节点类型清单
| 类型 | 说明 | 主要功能 |
|---|---|---|
| START | 起始节点 | 接收用户输入 |
| END | 结束节点 | 收集最终输出 |
| LLM | LLM节点 | 调用大语言模型 |
| CODE | 代码执行 | Python/JavaScript沙箱 |
| HTTP_REQUEST | HTTP请求 | 调用外部API |
| IF_ELSE | 条件分支 | 根据条件选择路径 |
| KNOWLEDGE_RETRIEVAL | 知识库检索 | 检索数据集 |
| TOOL | 工具调用 | 调用注册的工具 |
| ITERATION | 迭代循环 | 批量处理列表 |
| VARIABLE_AGGREGATOR | 变量聚合 | 合并多个变量 |
B. 常见问题
Q1: 工作流执行卡住不动?
A:
- 检查节点依赖是否形成循环
- 检查条件分支是否有未覆盖的情况
- 查看Worker Pool是否有可用Worker
- 检查LLM调用是否超时
Q2: 如何调试工作流?
A:
- 使用
single_step_run()单独测试节点 - 添加
DebugLoggingLayer记录详细日志 - 检查变量池内容
- 查看节点执行记录
Q3: 如何优化工作流性能?
A:
- 使用并行执行减少总耗时
- 优化LLM提示词减少Token消耗
- 缓存重复的API调用结果
- 调整Worker Pool参数
文档版本:v1.0
生成日期:2025-10-04
维护者:Backend Team