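CrewAI-04-Flow: A Complete Walkthrough
Module Overview
Responsibilities and Positioning
Flow is the event-driven workflow engine of the CrewAI framework. It is responsible for:
- Workflow orchestration: defining dependencies and execution order between methods
- State management: maintaining structured or unstructured flow state
- Conditional routing: choosing the execution path dynamically from method output
- Event listening: reacting to method-completion events and triggering follow-up methods
- Persistence support: optionally persisting flow state and execution history
- Visualization: generating a diagram of the flow structure
- Crew integration: calling a Crew from within the flow to run complex tasks
Inputs and Outputs
Inputs:
- Initial input parameters (received by @start() methods)
- Return values of preceding methods (received by @listen() methods)
Outputs:
- The return value of the last method
- The flow state (flow.state)
Upstream and Downstream Dependencies
Upstream dependencies (callers of Flow):
- User code: starts the flow with flow.kickoff() or flow.kickoff_async()
- CLI tool: the crewai flow run command
Downstream dependencies (called by Flow):
- Crew: invoked from within flow methods
- Task: executed directly
- Events: flow execution events are emitted
Lifecycle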
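stateDiagram-v2
[*] --> Created: Flow() or Flow[StateT]()
Created --> Configured: decorate methods (@start/@listen/@router)
Configured --> Ready: awaiting kickoff
Ready --> Started: kickoff() or kickoff_async()
Started --> RunStartMethods: @start() methods
RunStartMethods --> EventFired: method-completed event
EventFired --> ListenCheck: is there a @listen()
ListenCheck --> RunListener: condition met
RunListener --> RouterCheck: is it a @router()
RouterCheck --> RunRouter: is a router
RouterCheck --> EventFired: not a router
RunRouter --> RouteDecision: returns the route target
RouteDecision --> EventFired: fires the route-target event
EventFired --> ListenCheck: downstream listeners exist
ListenCheck --> Done: no downstream listeners
Done --> [*]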
Overall Service Architecture
1) Layered architecture view
flowchart TB
subgraph "User interface layer"
UserCode[User code<br/>defines the Flow class]
CLI[CLI tool<br/>crewai flow run]
end
subgraph "API layer"
KickoffAPI[kickoff / kickoff_async<br/>flow start API]
PlotAPI[plot<br/>visualization API]
StateAPI[state property<br/>state access]
end
subgraph "Decorator registration layer"
StartDec[@start decorator]
ListenDec[@listen decorator]
RouterDec[@router decorator]
PersistDec[@persist decorator]
FlowMeta[FlowMeta metaclass<br/>collects decorator metadata]
end
subgraph "Core execution engine"
FlowCore[Flow core class]
ExecutionMgr[Execution manager<br/>_execute_start_method<br/>_execute_method<br/>_execute_listeners]
ConditionEval[Condition evaluator<br/>_evaluate_condition<br/>_find_triggered_methods]
MethodRouter[Method router<br/>router result handling]
end
subgraph "Condition system"
AndOr[and_ / or_ functions]
ConditionDict[FlowCondition dict]
PendingTracker[Pending AND-condition tracking<br/>_pending_and_listeners]
end
subgraph "State management layer"
StateHolder[State container<br/>_state: T | dict]
MethodOutputs[Method output cache<br/>_method_outputs]
CompletedMethods[Completed-method tracking<br/>_completed_methods]
ExecutionCounts[Execution counters<br/>_method_execution_counts]
end
subgraph "Persistence layer"
PersistenceAPI[FlowPersistence interface]
SQLiteImpl[SQLiteFlowPersistence]
StateSerializer[State serializer]
StateLoader[State loader]
end
subgraph "Event system"
EventBus[crewai_event_bus]
FlowEvents[Flow events<br/>FlowStartedEvent<br/>FlowFinishedEvent]
MethodEvents[Method events<br/>MethodExecutionStartedEvent<br/>MethodExecutionFinishedEvent<br/>MethodExecutionFailedEvent]
end
subgraph "Visualization layer"
FlowPlot[FlowPlot visualizer]
LevelCalc[Level calculation<br/>calculate_node_levels]
PositionCalc[Position calculation<br/>compute_positions]
PyVisNet[PyVis Network]
HTMLGen[HTML generator]
end
subgraph "External integrations"
CrewIntegration[Crew integration]
AsyncSupport[Async support<br/>asyncio]
TracingSupport[Tracing support<br/>OpenTelemetry]
end
UserCode --> StartDec
UserCode --> ListenDec
UserCode --> RouterDec
UserCode --> PersistDec
StartDec --> FlowMeta
ListenDec --> FlowMeta
RouterDec --> FlowMeta
PersistDec --> FlowMeta
FlowMeta --> FlowCore
UserCode --> KickoffAPI
CLI --> KickoffAPI
UserCode --> PlotAPI
KickoffAPI --> FlowCore
PlotAPI --> FlowPlot
StateAPI --> StateHolder
FlowCore --> ExecutionMgr
FlowCore --> StateHolder
FlowCore --> EventBus
ExecutionMgr --> ConditionEval
ExecutionMgr --> MethodRouter
ExecutionMgr --> MethodOutputs
ExecutionMgr --> CompletedMethods
ConditionEval --> AndOr
ConditionEval --> ConditionDict
ConditionEval --> PendingTracker
FlowCore --> PersistenceAPI
PersistenceAPI --> SQLiteImpl
SQLiteImpl --> StateSerializer
SQLiteImpl --> StateLoader
ExecutionMgr --> EventBus
EventBus --> FlowEvents
EventBus --> MethodEvents
FlowPlot --> LevelCalc
FlowPlot --> PositionCalc
FlowPlot --> PyVisNet
PyVisNet --> HTMLGen
ExecutionMgr --> CrewIntegration
ExecutionMgr --> AsyncSupport
EventBus --> TracingSupport
2) Module interaction matrix
| Caller | Callee | Invocation | Data flow | Error propagation | Notes |
|---|---|---|---|---|---|
| User code | Flow.__init__() | Direct call | inputs → state | Raises | Initializes the Flow instance |
| User code | Flow.kickoff() | Synchronous call | inputs → result | Raises | Starts Flow execution |
| kickoff() | kickoff_async() | asyncio.run() | inputs → result | Raises | Synchronous wrapper around async execution |
| kickoff_async() | _execute_start_method() | await, in parallel | - → result | Raises | Runs all start methods |
| _execute_start_method() | _execute_method() | await | method → result | Raises | Runs a single method |
| _execute_start_method() | _execute_listeners() | await | result → - | Raises | Triggers the listener chain |
| _execute_listeners() | _find_triggered_methods() | Direct call | trigger → list[methods] | Returns an empty list | Finds the triggered methods |
| _execute_listeners() | _execute_single_listener() | await, in parallel | result → - | Raises | Runs a single listener |
| _execute_listeners() | _evaluate_condition() | Direct call | condition → bool | Returns False | Evaluates a condition |
| _execute_method() | crewai_event_bus.emit() | Asynchronous dispatch | event → - | Non-blocking | Emits execution events |
| _execute_method() | user_method() | call / await | args → result | Raises | Runs the user method |
| Flow | FlowPersistence | Conditional call | state → - | Raises | Persists state |
| Flow.plot() | FlowPlot.plot() | Direct call | flow → HTML | Raises | Generates the visualization |
| FlowMeta | Decorator metadata | At class creation | decorators → metadata | - | Collects decorator information |
3) Detailed architecture (core execution flow)
flowchart TB
subgraph "Flow core layer"
Flow[Flow class<br/>flow definition and orchestration]
FlowState[State container<br/>_state: T \| dict]
end
subgraph "Decorator layer"
Start[@start<br/>marks a start method]
Listen[@listen<br/>marks a listener method]
Router[@router<br/>marks a router method]
end
subgraph "Condition system"
And[and_<br/>AND combination]
Or[or_<br/>OR combination]
FlowCondition[FlowCondition<br/>condition dict]
ConditionEval[Condition evaluator<br/>recursively evaluates nested conditions]
end
subgraph "Execution engine"
FlowMeta[FlowMeta metaclass<br/>collects decorator metadata]
ExecuteStart[_execute_start_method<br/>runs a start method]
ExecuteMethod[_execute_method<br/>runs a single method]
ExecuteListeners[_execute_listeners<br/>runs the listener chain]
FindTriggered[_find_triggered_methods<br/>finds triggered methods]
ExecuteSingle[_execute_single_listener<br/>runs a single listener]
end
subgraph "Persistence layer"
Persist[@persist decorator<br/>class level / method level]
PersistAPI[FlowPersistence interface]
SQLite[SQLiteFlowPersistence<br/>SQLite storage]
end
subgraph "Visualization layer"
FlowPlot[FlowPlot visualizer<br/>generates the flow diagram]
PyVis[PyVis Network<br/>graph rendering]
HTMLTemplate[HTML template<br/>final output]
end
subgraph "Event system"
EventBus[crewai_event_bus<br/>event bus]
FlowStarted[FlowStartedEvent]
MethodStarted[MethodExecutionStartedEvent]
MethodFinished[MethodExecutionFinishedEvent]
FlowFinished[FlowFinishedEvent]
end
subgraph "Crew integration"
CrewCall[Calling Crew.kickoff inside a method]
StateShare[State sharing<br/>Flow.state ↔ Crew inputs]
end
Flow --> Start
Flow --> Listen
Flow --> Router
Start --> FlowMeta
Listen --> FlowMeta
Router --> FlowMeta
Listen --> And
Listen --> Or
And --> FlowCondition
Or --> FlowCondition
FlowMeta --> Flow
Flow --> ExecuteStart
ExecuteStart --> ExecuteMethod
ExecuteStart --> ExecuteListeners
ExecuteListeners --> FindTriggered
FindTriggered --> ConditionEval
ConditionEval --> FlowCondition
ExecuteListeners --> ExecuteSingle
ExecuteSingle --> ExecuteMethod
Flow --> FlowState
ExecuteMethod --> EventBus
EventBus --> MethodStarted
EventBus --> MethodFinished
Flow --> EventBus
EventBus --> FlowStarted
EventBus --> FlowFinished
Flow --> Persist
Persist --> PersistAPI
PersistAPI --> SQLite
Flow --> FlowPlot
FlowPlot --> PyVis
PyVis --> HTMLTemplate
ExecuteMethod --> CrewCall
CrewCall --> StateShare
StateShare --> FlowState
Key Architecture Notes
Detailed Sequence Diagrams and Call Chains
Sequence diagram 1: Full Flow execution (kickoff)
sequenceDiagram
autonumber
participant User as User code
participant Flow as Flow instance
participant Kickoff as kickoff()
participant KickoffAsync as kickoff_async()
participant EventBus as Event bus
participant StartExec as _execute_start_method()
participant MethodExec as _execute_method()
participant ListenerExec as _execute_listeners()
participant FindTriggered as _find_triggered_methods()
participant UserMethod as User method
participant Persistence as Persistence layer
User->>Flow: flow = MyFlow()
Note over Flow: FlowMeta has already collected decorator metadata
User->>Kickoff: flow.kickoff(inputs)
Kickoff->>KickoffAsync: asyncio.run(_run_flow())
Note over KickoffAsync: synchronous wrapper around async execution
KickoffAsync->>KickoffAsync: reset state<br/>_completed_methods.clear()
alt inputs['id'] present and persistence configured
KickoffAsync->>Persistence: load_state(restore_uuid)
Persistence-->>KickoffAsync: stored_state
KickoffAsync->>KickoffAsync: _restore_state(stored_state)
end
KickoffAsync->>KickoffAsync: _initialize_state(inputs)
KickoffAsync->>EventBus: emit(FlowStartedEvent)
Note over EventBus: fires the Flow-started event
KickoffAsync->>StartExec: run all @start methods in parallel
Note over StartExec: asyncio.gather(*tasks)
loop each start method
StartExec->>StartExec: check whether already completed<br/>_completed_methods
alt method not completed
StartExec->>StartExec: inject trigger_payload
StartExec->>MethodExec: await _execute_method(method_name, method)
MethodExec->>EventBus: emit(MethodExecutionStartedEvent)
alt async method
MethodExec->>UserMethod: await method(*args, **kwargs)
else sync method
MethodExec->>UserMethod: method(*args, **kwargs)
end
UserMethod-->>MethodExec: result
MethodExec->>MethodExec: record output<br/>_method_outputs.append(result)
MethodExec->>MethodExec: update count<br/>_method_execution_counts
MethodExec->>MethodExec: mark completed<br/>_completed_methods.add()
MethodExec->>EventBus: emit(MethodExecutionFinishedEvent)
MethodExec-->>StartExec: result
StartExec->>ListenerExec: await _execute_listeners(method_name, result)
Note over ListenerExec: runs the listener chain (see sequence diagram 2)
end
end
KickoffAsync->>KickoffAsync: final_output = _method_outputs[-1]
KickoffAsync->>EventBus: emit(FlowFinishedEvent)
KickoffAsync-->>Kickoff: final_output
Kickoff-->>User: final_output
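Sequence diagram notes
Overview:
This sequence diagram shows the full execution path of a Flow, from the user's call to kickoff() through to the final result being returned. It covers state initialization, event emission, parallel execution of start methods, and the triggering of listener chains.
Key path breakdown:
- Sync-to-async bridge (steps 1-3)
  - The user calls the synchronous kickoff() method
  - Internally, asyncio.run() wraps the asynchronous kickoff_async()
  - User code never has to deal with async directly, while the engine still gets the concurrency benefits of async
- State management (steps 4-8)
  - Reset execution state: clear the completed methods and method outputs
  - Try to restore state from persistent storage (if an id was provided)
  - Initialize or update the state from the input parameters
- Event-driven execution (steps 9 and 25)
  - FlowStartedEvent is emitted before the Flow starts
  - FlowFinishedEvent is emitted after the Flow completes
  - MethodExecutionStartedEvent and MethodExecutionFinishedEvent are emitted before and after each method
  - The event system supports tracing, monitoring, and debugging
- Parallel execution of start methods (steps 10-23)
  - asyncio.gather() runs all @start() methods in parallel
  - Each start method runs independently and does not block the others
  - Both async methods (awaited automatically) and sync methods are supported
- Listener chain triggering (step 23)
  - When a start method finishes, its listeners are triggered
  - Listeners can trigger further listeners in a chain
  - Condition combinations (AND/OR) are supported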
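The sync-to-async bridge and the parallel start step can be illustrated with a small standalone model. This is a simplified sketch, not CrewAI's Flow class: `MiniFlow`, `fetch`, and `load` are hypothetical names, and only the `asyncio.run()` / `asyncio.gather()` pattern is taken from the description above.

```python
import asyncio
from typing import Any, Callable


class MiniFlow:
    """Toy stand-in for a flow engine (hypothetical, not crewai.flow.Flow)."""

    def __init__(self, start_methods: list[Callable[[], Any]]) -> None:
        self._start_methods = start_methods
        self._method_outputs: list[Any] = []

    async def _run_one(self, method: Callable[[], Any]) -> None:
        # Await coroutine functions; call plain functions directly.
        if asyncio.iscoroutinefunction(method):
            result = await method()
        else:
            result = method()
        self._method_outputs.append(result)

    async def kickoff_async(self) -> Any:
        self._method_outputs.clear()
        # All start methods are scheduled together and run concurrently.
        await asyncio.gather(*(self._run_one(m) for m in self._start_methods))
        return self._method_outputs[-1] if self._method_outputs else None

    def kickoff(self) -> Any:
        # Synchronous entry point wrapping the async implementation.
        return asyncio.run(self.kickoff_async())


async def fetch() -> str:
    await asyncio.sleep(0.01)  # simulated I/O
    return "fetched"


def load() -> str:
    return "loaded"


flow = MiniFlow([fetch, load])
final = flow.kickoff()
outputs = sorted(flow._method_outputs)
```

Because the final output is simply the last recorded result, it depends on completion order rather than declaration order, which is one reason the ordering of start methods should not be relied upon.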
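Boundaries and constraints:
| Boundary | Value | Notes |
|---|---|---|
| Concurrency | len(_start_methods) | Start methods run in parallel |
| Timeout | No built-in timeout | Must be handled inside user methods |
| Idempotency | No | Every kickoff re-executes the flow |
| Ordering guarantee | Partial | Start methods are unordered; listener chains are ordered |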
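Since there is no built-in timeout, a user method has to enforce its own. A minimal sketch using the standard library's `asyncio.wait_for`, assuming the slow work is itself a coroutine; `slow_step` and `step_with_timeout` are hypothetical names standing in for the body of an async flow method:

```python
import asyncio


async def slow_step() -> str:
    # Stand-in for a long-running call (LLM request, HTTP call, ...).
    await asyncio.sleep(1.0)
    return "done"


async def step_with_timeout(timeout_s: float) -> str:
    try:
        return await asyncio.wait_for(slow_step(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # Returning a sentinel lets a downstream router choose a fallback path.
        return "timed_out"


result = asyncio.run(step_with_timeout(0.05))
```

Returning a sentinel instead of raising is a design choice: raising would abort the whole flow, while a sentinel value keeps the decision inside the flow's own routing logic.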
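Exception handling:
- An exception raised by any method interrupts execution immediately
- MethodExecutionFailedEvent is emitted
- The exception propagates up to the caller of kickoff()
- The state of methods that already completed is not rolled back
Performance considerations:
- Parallelism: multiple start methods run in parallel, which improves throughput
- Event overhead: each method execution emits 2 events, which carries some cost
- State persistence: state is persisted only when @persist is configured, which avoids unnecessary I/O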
Sequence diagram 2: Listener execution and condition evaluation
sequenceDiagram
autonumber
participant Caller as Caller<br/>(_execute_start_method)
participant ListenerExec as _execute_listeners()
participant FindTriggered as _find_triggered_methods()
participant ConditionEval as _evaluate_condition()
participant PendingTracker as _pending_and_listeners
participant SingleExec as _execute_single_listener()
participant MethodExec as _execute_method()
participant UserMethod as User method
Caller->>ListenerExec: _execute_listeners(trigger_method, result)
Note over ListenerExec: trigger_method = "step_one"<br/>result = "step1_result"
rect rgb(255, 245, 235)
Note over ListenerExec,FindTriggered: Phase 1: handle routers (sequential)
ListenerExec->>ListenerExec: current_trigger = trigger_method
loop until no router is triggered
ListenerExec->>FindTriggered: _find_triggered_methods(current_trigger, router_only=True)
FindTriggered->>FindTriggered: iterate over _listeners
loop each listener
FindTriggered->>FindTriggered: check whether it is a router<br/>listener_name in _routers
alt is a router
FindTriggered->>ConditionEval: _evaluate_condition(condition, trigger_method)
alt OR condition
ConditionEval->>ConditionEval: check trigger_method in methods
ConditionEval-->>FindTriggered: True/False
else AND condition
ConditionEval->>PendingTracker: check/update the pending set
PendingTracker->>PendingTracker: remove trigger_method
alt all conditions met
PendingTracker-->>ConditionEval: True
else still pending
PendingTracker-->>ConditionEval: False
end
ConditionEval-->>FindTriggered: True/False
end
end
end
FindTriggered-->>ListenerExec: triggered_routers = ["decision_router"]
alt a router was triggered
ListenerExec->>SingleExec: await _execute_single_listener(router_name, result)
SingleExec->>SingleExec: inspect the method signature<br/>does it accept a result parameter
alt accepts a parameter
SingleExec->>MethodExec: _execute_method(router_name, method, result=result)
else accepts no parameter
SingleExec->>MethodExec: _execute_method(router_name, method)
end
MethodExec->>UserMethod: await/call method()
UserMethod-->>MethodExec: router_result = "high_confidence"
MethodExec-->>SingleExec: router_result
SingleExec->>ListenerExec: recursive _execute_listeners(router_name, router_result)
ListenerExec->>ListenerExec: current_trigger = router_result
ListenerExec->>ListenerExec: router_results.append(router_result)
else no router triggered
Note over ListenerExec: exit the router loop
end
end
end
rect rgb(235, 245, 255)
Note over ListenerExec,SingleExec: Phase 2: handle regular listeners (parallel)
ListenerExec->>ListenerExec: all_triggers = [trigger_method, *router_results]
loop each trigger
ListenerExec->>FindTriggered: _find_triggered_methods(current_trigger, router_only=False)
FindTriggered->>FindTriggered: iterate over _listeners (skipping routers)
loop each listener
alt neither a router nor a start method
FindTriggered->>ConditionEval: _evaluate_condition(condition, trigger)
Note over ConditionEval: same condition evaluation logic
ConditionEval-->>FindTriggered: True/False
end
end
FindTriggered-->>ListenerExec: triggered_listeners = ["handle_a", "handle_b"]
alt listeners were triggered
par run all listeners in parallel
ListenerExec->>SingleExec: _execute_single_listener("handle_a", result)
SingleExec->>MethodExec: _execute_method("handle_a", method)
MethodExec->>UserMethod: await/call handle_a()
UserMethod-->>MethodExec: result_a
MethodExec-->>SingleExec: result_a
SingleExec->>ListenerExec: recursive _execute_listeners("handle_a", result_a)
and
ListenerExec->>SingleExec: _execute_single_listener("handle_b", result)
SingleExec->>MethodExec: _execute_method("handle_b", method)
MethodExec->>UserMethod: await/call handle_b()
UserMethod-->>MethodExec: result_b
MethodExec-->>SingleExec: result_b
SingleExec->>ListenerExec: recursive _execute_listeners("handle_b", result_b)
end
Note over ListenerExec,SingleExec: asyncio.gather(*tasks)
end
end
end
ListenerExec-->>Caller: done
rect rgb(245, 255, 245)
Note over Caller,UserMethod: Special case: cyclic flows
alt router result triggers an already completed start method
ListenerExec->>ListenerExec: check the start method's condition
alt condition met and method already completed
ListenerExec->>ListenerExec: temporarily clear _is_execution_resuming
ListenerExec->>Caller: await _execute_start_method(method_name)
Note over Caller: re-executed as a loop
end
end
end
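Sequence diagram notes
Overview: This sequence diagram details the three phases of listener execution: sequential router execution, parallel execution of regular listeners, and the special handling of cyclic flows. It also covers the full OR/AND condition evaluation logic.
Key path breakdown:
- Phase 1: sequential router execution (steps 3-29)
  - Purpose: the path returned by a router determines what runs next, so routers must run sequentially
  - Flow:
    - Repeatedly look up the routers triggered by the current trigger
    - Run the router method and obtain its routing result (for example "high_confidence")
    - Use the routing result as the new trigger and keep looking for routers
    - Stop when no router is triggered
  - Example (execution order: analyze → decide → sub_decide):
    @start()
    def analyze(self):
        return "analyzed"

    # A router is triggered by the upstream method, not by that method's return value
    @router(analyze)
    def decide(self):
        return "path_a" if condition else "path_b"  # condition: placeholder for user logic

    # A router's return value, however, does become a trigger
    @router("path_a")
    def sub_decide(self):
        return "final_path"
- Phase 2: parallel listener execution (steps 30-52)
  - Purpose: regular listeners do not depend on each other, so they can run in parallel for better performance
  - Flow:
    - Collect all triggers (the original trigger plus the router results)
    - For each trigger, find the non-router listeners it triggers
    - Run all of those listeners in parallel with asyncio.gather()
  - Example:
    @listen("data_ready")
    def process_a(self): pass  # runs in parallel

    @listen("data_ready")
    def process_b(self): pass  # runs in parallel
- Phase 3: cyclic flow handling (steps 53-58)
  - Purpose: support looping/iterative Flow patterns
  - Flow:
    - A router result may trigger a start method that has already completed
    - The _is_execution_resuming flag is cleared temporarily
    - The start method is executed again
  - Example:
    @start("loop")
    def iterate(self):
        self.state["count"] = self.state.get("count", 0) + 1
        return "processed"

    @router(iterate)
    def check(self):
        return "loop" if self.state["count"] < 5 else "done"
Condition evaluation in detail:
OR condition (step 17):
@listen(or_("method_a", "method_b"))
def handler(self): pass
- Fires as soon as any one trigger matches
- Implementation: trigger_method in methods
AND condition (steps 18-26):
@listen(and_("method_a", "method_b"))
def handler(self): pass
- Fires only after all triggers have matched
- Implementation:
  - Initialize the pending set: {"method_a", "method_b"}
  - Remove one entry per trigger: discard(trigger_method)
  - Fire when the set is empty: not pending_set
Nested conditions:
@listen(or_(and_("a", "b"), "c"))
def handler(self): pass
- Recursive evaluation: _evaluate_condition() handles the nested structure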
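The pending-set mechanics described above can be reproduced in a few lines. The following is a simplified model, not CrewAI's code: `toy_or`, `toy_and`, and `ConditionTracker` are hypothetical names, and the AND branch assumes its sub-conditions are plain method names.

```python
from typing import Union

# A condition is a method name, or {"type": "AND" | "OR", "conditions": [...]}.
Condition = Union[str, dict]


def toy_or(*conds: Condition) -> dict:
    return {"type": "OR", "conditions": list(conds)}


def toy_and(*names: str) -> dict:
    return {"type": "AND", "conditions": list(names)}


class ConditionTracker:
    """Tracks which AND triggers are still awaited, per listener."""

    def __init__(self) -> None:
        self._pending: dict[str, set[str]] = {}

    def fires(self, listener: str, condition: Condition, trigger: str) -> bool:
        if isinstance(condition, str):
            return condition == trigger
        subs = condition["conditions"]
        if condition["type"] == "OR":
            # Any matching sub-condition is enough.
            return any(self.fires(listener, s, trigger) for s in subs)
        # AND: start from the full set and discard triggers as they arrive.
        key = f"{listener}:{id(condition)}"
        pending = self._pending.setdefault(key, set(subs))
        pending.discard(trigger)
        if not pending:
            del self._pending[key]  # reset so the condition can fire again later
            return True
        return False


tracker = ConditionTracker()
cond = toy_or(toy_and("a", "b"), "c")
after_a = tracker.fires("handler", cond, "a")  # AND still waits for "b"
after_b = tracker.fires("handler", cond, "b")  # AND branch is now complete
after_c = tracker.fires("handler", cond, "c")  # OR branch matches directly
```

Note that an AND condition whose second trigger never arrives simply stays in the pending map forever, which is the "never fires" behavior that any flow using `and_` has to design around.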
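Boundaries and constraints:
| Scenario | Behavior | Notes |
|---|---|---|
| Router returns nothing | Skipped | Returning None triggers no route |
| Router returns a list | Multiple routes | Every route target is triggered |
| Listener concurrency | Unlimited | asyncio.gather concurrency equals the number of triggered listeners |
| Loop depth | Unlimited | The exit condition must be controlled in user code |
| AND condition timeout | Never fires | If one trigger never arrives, the listener waits forever |
Performance notes:
- Sequential router execution: avoids race conditions in routing decisions
- Parallel listener execution: makes full use of async concurrency
- Recursive triggering: each listener recursively triggers its downstream listeners when it finishes, forming a call chain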
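The router-chaining loop from phase 1, together with the "unlimited loop depth" caveat, can be sketched as a standalone function. This is an illustrative model only: `run_router_chain` and its `max_hops` guard are hypothetical and are not part of CrewAI, which leaves exit conditions to user code.

```python
from typing import Callable, Optional


def run_router_chain(
    first_trigger: str,
    routers: dict[str, Callable[[], Optional[str]]],
    max_hops: int = 10,
) -> list[str]:
    """Follow router results until no router matches the current trigger.

    `routers` maps a trigger name to a router function that returns the
    next route label (or None to stop).
    """
    results: list[str] = []
    current = first_trigger
    for _ in range(max_hops):  # explicit guard against endless cycles
        router = routers.get(current)
        if router is None:
            break
        label = router()
        if not label:  # a router returning None triggers no route
            break
        results.append(label)
        current = label  # the router result becomes the next trigger
    return results


chain = run_router_chain(
    "analyze",
    {"analyze": lambda: "path_a", "path_a": lambda: "final_path"},
)
looping = run_router_chain("x", {"x": lambda: "x"}, max_hops=3)
```

In a real flow the equivalent of `max_hops` would typically be a counter kept in the flow state and checked by the router itself.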
Sequence diagram 3: State persistence and restore
sequenceDiagram
autonumber
participant User as User code
participant Flow as Flow instance
participant Kickoff as kickoff_async()
participant Persistence as FlowPersistence
participant SQLite as SQLiteFlowPersistence
participant MethodExec as _execute_method()
participant UserMethod as User method
rect rgb(255, 245, 245)
Note over User,SQLite: Scenario 1: first run (no persisted state)
User->>Flow: flow = MyFlow()
User->>Kickoff: flow.kickoff()
Kickoff->>Kickoff: check inputs['id']
Note over Kickoff: inputs = None, or no 'id'
Kickoff->>Kickoff: _initialize_state(inputs)
Note over Kickoff: uses the default state
Kickoff->>MethodExec: run start methods
MethodExec->>UserMethod: method()
UserMethod->>UserMethod: modify self.state
UserMethod-->>MethodExec: result
alt decorated with @persist
MethodExec->>Persistence: save_state(flow_id, state)
Persistence->>SQLite: serialize and store
SQLite->>SQLite: INSERT INTO flow_state
SQLite-->>Persistence: success
Persistence-->>MethodExec: success
end
MethodExec-->>Kickoff: result
Kickoff-->>User: final_output
end
rect rgb(245, 255, 245)
Note over User,SQLite: Scenario 2: resumed run (persisted state exists)
User->>Flow: flow = MyFlow()
Note over Flow: new instance, state at defaults
User->>Kickoff: flow.kickoff({"id": "uuid-123"})
Kickoff->>Kickoff: check inputs['id'] and persistence
Note over Kickoff: found id = "uuid-123"
Kickoff->>Persistence: load_state("uuid-123")
Persistence->>SQLite: load from the database
SQLite->>SQLite: SELECT * FROM flow_state WHERE id='uuid-123'
SQLite-->>Persistence: stored_state = {<br/>"state": {...},<br/>"completed_methods": [...],<br/>"method_outputs": [...]<br/>}
Persistence-->>Kickoff: stored_state
Kickoff->>Kickoff: _restore_state(stored_state)
rect rgb(255, 255, 235)
Note over Kickoff: restore the state fields
Kickoff->>Kickoff: self._state = stored_state["state"]
Kickoff->>Kickoff: self._completed_methods = set(stored_state["completed_methods"])
Kickoff->>Kickoff: self._method_outputs = stored_state["method_outputs"]
Kickoff->>Kickoff: self._method_execution_counts = stored_state["execution_counts"]
Kickoff->>Kickoff: self._is_execution_resuming = True
end
Kickoff->>MethodExec: run start methods
alt method already in _completed_methods
MethodExec->>MethodExec: skip execution
Note over MethodExec: listeners are still triggered
MethodExec->>MethodExec: _execute_listeners(method_name, last_output)
else method not completed
MethodExec->>UserMethod: run method() normally
UserMethod-->>MethodExec: result
alt decorated with @persist
MethodExec->>Persistence: save_state(flow_id, state)
Persistence->>SQLite: UPDATE flow_state
SQLite-->>Persistence: success
end
end
MethodExec-->>Kickoff: result
Kickoff->>Kickoff: self._is_execution_resuming = False
Kickoff-->>User: final_output
end
rect rgb(245, 245, 255)
Note over User,SQLite: Scenario 3: persistence configuration (class level / method level)
Note over Flow: class-level persistence
User->>User: @persist(verbose=True)<br/>class MyFlow(Flow): ...
Note over Flow: all methods are persisted automatically
Note over Flow: method-level persistence
User->>User: @persist<br/>@start()<br/>def critical_step(self): ...
Note over Flow: only critical_step is persisted
end
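Sequence diagram notes
Overview: This sequence diagram shows three scenarios for Flow state persistence: a first run (no persisted state), a resumed run (persisted state exists), and persistence configured at different levels.
Key path breakdown:
- Scenario 1: first run (steps 1-16)
  - The user creates a Flow instance and calls kickoff()
  - No id is provided, or persistence is not configured
  - The flow is initialized with the default state
  - When a method runs and is decorated with @persist, the state is saved to SQLite
  - The saved content includes:
    - state: the Flow state
    - completed_methods: the list of completed methods
    - method_outputs: the history of method outputs
    - execution_counts: per-method execution counts
- Scenario 2: resumed run (steps 17-43)
  - The user provides an id parameter: flow.kickoff({"id": "uuid-123"})
  - The Flow loads its state from persistent storage
  - All state fields are restored
  - _is_execution_resuming is set to True
  - Completed methods are skipped, but their listeners are still triggered
  - Methods that have not completed run normally
  - Use cases:
    - Checkpoint and resume: recover from the point of interruption after a failed run
    - Long-running work: execute a complex Flow across several runs
- Scenario 3: persistence configuration (steps 44-50)
  - Class-level persistence: @persist decorates the class
    - All methods are persisted automatically
    - Suited to Flows that need full recovery
  - Method-level persistence: @persist decorates a method
    - Only the chosen methods are persisted
    - Suited to cases where only critical steps need recovery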
Persisted data structure:
stored_state = {
    "id": "uuid-123",
    "state": {
        "counter": 5,
        "results": ["r1", "r2"],
        # ... other state fields
    },
    "completed_methods": ["start_method", "process_method"],
    "method_outputs": ["result1", "result2"],
    "execution_counts": {
        "start_method": 1,
        "process_method": 3  # may run several times inside a loop
    },
    "timestamp": 1234567890.123
}
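To make the save/load round trip concrete, here is a minimal sketch that stores such a dictionary as JSON in SQLite using only the standard library. The table layout (`flow_state` with `id` and `payload` columns) and the helper names are assumptions made for illustration; they are not the schema or API of SQLiteFlowPersistence.

```python
import json
import sqlite3
from typing import Any, Optional


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    # Hypothetical schema: one JSON payload per flow id.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS flow_state "
        "(id TEXT PRIMARY KEY, payload TEXT NOT NULL)"
    )
    return conn


def save_state(conn: sqlite3.Connection, flow_id: str, state: dict[str, Any]) -> None:
    # INSERT OR REPLACE means a repeated id overwrites the earlier state.
    conn.execute(
        "INSERT OR REPLACE INTO flow_state (id, payload) VALUES (?, ?)",
        (flow_id, json.dumps(state)),
    )
    conn.commit()


def load_state(conn: sqlite3.Connection, flow_id: str) -> Optional[dict[str, Any]]:
    row = conn.execute(
        "SELECT payload FROM flow_state WHERE id = ?", (flow_id,)
    ).fetchone()
    return json.loads(row[0]) if row else None


conn = open_store()
save_state(conn, "uuid-123", {"state": {"counter": 5}, "completed_methods": ["start_method"]})
save_state(conn, "uuid-123", {"state": {"counter": 6}, "completed_methods": ["start_method", "process_method"]})
restored = load_state(conn, "uuid-123")
missing = load_state(conn, "no-such-id")
```

JSON serialization only works when every state field is JSON-compatible; method outputs that are arbitrary Python objects would need a custom encoder in a real implementation.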
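Boundaries and constraints:
| Boundary | Value / behavior | Notes |
|---|---|---|
| State size | No hard limit | SQLite's default maximum size for a single string or BLOB value is about 1 GB and is configurable |
| Restore timeout | None | Loading state is a synchronous operation |
| Concurrent writes | Serialized | SQLite file locking avoids conflicts |
| State versioning | Not supported | Changes to the state structure must be handled manually |
| ID uniqueness | Must be unique | A repeated ID overwrites the old state |
Exception handling:
- Load failure: the state does not exist or is corrupted
  - A warning is logged
  - Execution continues with the default state
- Save failure: the disk is full or permissions are insufficient
  - RuntimeError is raised
  - Flow execution is aborted
Performance considerations:
- I/O overhead: every persisted method writes to SQLite after it runs
- Optimization strategies:
  - Use method-level persistence only on critical methods
  - Batched writes: collect several state updates and write them once (not currently implemented)
  - Asynchronous persistence: do not block method execution (not currently implemented)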
Sequence diagram 4: Flow visualization generation
sequenceDiagram
autonumber
participant User as User code
participant Flow as Flow instance
participant PlotAPI as plot()
participant EventBus as Event bus
participant FlowPlot as FlowPlot class
participant LevelCalc as calculate_node_levels()
participant PositionCalc as compute_positions()
participant PyVis as PyVis Network
participant NodeAdder as add_nodes_to_network()
participant EdgeAdder as add_edges()
participant HTMLGen as HTML generator
participant FileSystem as File system
User->>PlotAPI: flow.plot("my_workflow")
PlotAPI->>EventBus: emit(FlowPlotEvent)
Note over EventBus: fires the visualization event
PlotAPI->>FlowPlot: FlowPlot(flow)
FlowPlot->>FlowPlot: initialize configuration<br/>colors, node_styles
FlowPlot->>PyVis: Network(directed=True, height="750px")
PyVis-->>FlowPlot: net
FlowPlot->>PyVis: net.set_options({physics: false})
Note over PyVis: physics engine disabled<br/>fixed layout is used
rect rgb(255, 245, 235)
Note over FlowPlot,PositionCalc: Phase 1: compute the node layout
FlowPlot->>LevelCalc: calculate_node_levels(flow)
LevelCalc->>LevelCalc: analyze _start_methods
Note over LevelCalc: start methods are on level 0
LevelCalc->>LevelCalc: walk _listeners to build levels
loop breadth-first traversal
LevelCalc->>LevelCalc: parent level + 1
Note over LevelCalc: handles OR/AND conditions
end
LevelCalc-->>FlowPlot: node_levels = {<br/>"start": 0,<br/>"process": 1,<br/>"decide": 2,<br/>...}
FlowPlot->>PositionCalc: compute_positions(flow, node_levels)
PositionCalc->>PositionCalc: count the nodes per level
PositionCalc->>PositionCalc: compute the width of each level
loop each node
PositionCalc->>PositionCalc: x = layer_index * X_SPACING
PositionCalc->>PositionCalc: y = node_index * Y_SPACING - layer_height/2
Note over PositionCalc: centered alignment
end
PositionCalc-->>FlowPlot: node_positions = {<br/>"start": (0, 0),<br/>"process": (300, 0),<br/>...}
end
rect rgb(245, 255, 235)
Note over FlowPlot,NodeAdder: Phase 2: add nodes
FlowPlot->>NodeAdder: add_nodes_to_network(net, flow, positions, styles)
loop each method
NodeAdder->>NodeAdder: determine the node type<br/>(start/listen/router)
NodeAdder->>NodeAdder: determine the node style<br/>color, shape, border
alt method calls a Crew
NodeAdder->>NodeAdder: add a dashed border<br/>borderWidthSelected: 4
end
NodeAdder->>NodeAdder: format the label<br/><b>method name</b><br/><i>type</i>
NodeAdder->>PyVis: net.add_node(<br/>id=method_name,<br/>label=formatted_label,<br/>x=pos[0], y=pos[1],<br/>**style_options)
end
NodeAdder-->>FlowPlot: done
end
rect rgb(245, 235, 255)
Note over FlowPlot,EdgeAdder: Phase 3: add edges
FlowPlot->>EdgeAdder: add_edges(net, flow, positions, colors)
loop each listener
EdgeAdder->>EdgeAdder: get the trigger methods
alt OR condition
EdgeAdder->>EdgeAdder: solid arrow
EdgeAdder->>PyVis: net.add_edge(<br/>from=trigger,<br/>to=listener,<br/>dashes=False,<br/>color="purple")
else AND condition
EdgeAdder->>EdgeAdder: dashed arrow
EdgeAdder->>PyVis: net.add_edge(<br/>from=trigger,<br/>to=listener,<br/>dashes=True,<br/>color="blue")
end
end
loop each router
EdgeAdder->>EdgeAdder: get router_paths
loop each route path
EdgeAdder->>PyVis: net.add_edge(<br/>from=router,<br/>to=path,<br/>dashes=False,<br/>color="green",<br/>label=path_name)
end
end
EdgeAdder-->>FlowPlot: done
end
rect rgb(255, 255, 235)
Note over FlowPlot,FileSystem: Phase 4: generate the HTML
FlowPlot->>PyVis: net.generate_html()
PyVis-->>FlowPlot: network_html
FlowPlot->>HTMLGen: read the template<br/>crewai_flow_visual_template.html
HTMLGen-->>FlowPlot: html_template
FlowPlot->>HTMLGen: generate the legend HTML
HTMLGen-->>FlowPlot: legend_html
FlowPlot->>FlowPlot: substitute template variables<br/>{{ title }}<br/>{{ network_content }}<br/>{{ legend }}
FlowPlot->>FileSystem: write the file<br/>"my_workflow.html"
FileSystem-->>FlowPlot: success
FlowPlot->>User: print message<br/>"Plot saved as my_workflow.html"
end
User->>User: open in a browser<br/>"my_workflow.html"
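Sequence diagram notes
Overview:
This sequence diagram shows how a Flow visualization is generated, from the user's call to plot() through to an interactive HTML file, in four phases: layout calculation, node creation, edge creation, and HTML generation.
Key path breakdown:
- Phase 1: compute the node layout (steps 8-26)
  - Level calculation:
    - Start methods are on level 0
    - Listeners are traversed breadth-first, adding 1 per level
    - For AND/OR conditions, the maximum level is used
  - Position calculation:
    - X coordinate: layer_index * X_SPACING (300px by default)
    - Y coordinate: centered within the level
    - Node overlap is avoided
- Phase 2: add nodes (steps 27-42)
  - Node type detection:
    - Start: circle, green border
    - Listen: ellipse, purple border
    - Router: diamond, blue border
  - Special marking:
    - Methods that call a Crew: dashed border
  - Label formatting: <b>method_name</b> <i>@start</i>
- Phase 3: add edges (steps 43-60)
  - Edge types:
    - OR condition: solid arrow (purple)
    - AND condition: dashed arrow (blue)
    - Router path: solid arrow (green), labeled
  - Edge weight:
    - Edge length is derived from the level difference
- Phase 4: generate the HTML (steps 61-72)
  - Template substitution:
    - {{ title }}: the flow name
    - {{ network_content }}: the network graph generated by PyVis
    - {{ legend }}: the legend HTML
  - Output file:
    - A standalone HTML file that opens directly in a browser
    - Includes interactive features: zoom, drag, node click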
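The layout step from phase 1 can be approximated with a breadth-first pass followed by the two position formulas. This is a simplified sketch: the function names mirror those in the diagram but the bodies are written for illustration, the edge map is a hypothetical input format, the 150 value for the vertical spacing is an assumption, and the BFS assigns the first (shortest) level it finds rather than the maximum level used for AND/OR conditions.

```python
from collections import deque


def calculate_levels(start_methods: list[str], edges: dict[str, list[str]]) -> dict[str, int]:
    """edges maps a trigger method to the listeners it triggers."""
    levels = {name: 0 for name in start_methods}  # start methods sit on level 0
    queue = deque(start_methods)
    while queue:
        node = queue.popleft()
        for child in edges.get(node, []):
            if child not in levels:  # first visit wins; also stops cycles
                levels[child] = levels[node] + 1
                queue.append(child)
    return levels


def compute_positions(
    levels: dict[str, int], x_spacing: int = 300, y_spacing: int = 150
) -> dict[str, tuple[float, float]]:
    by_level: dict[int, list[str]] = {}
    for name, level in levels.items():
        by_level.setdefault(level, []).append(name)
    positions: dict[str, tuple[float, float]] = {}
    for level, names in by_level.items():
        layer_height = (len(names) - 1) * y_spacing
        for index, name in enumerate(names):
            # x grows with the level; y is centered around 0 within the level.
            positions[name] = (level * x_spacing, index * y_spacing - layer_height / 2)
    return positions


edges = {
    "start": ["process_a", "process_b"],
    "process_a": ["decide"],
    "process_b": ["decide"],
}
levels = calculate_levels(["start"], edges)
positions = compute_positions(levels)
```

With physics disabled in the network, these fixed coordinates are what keep the rendered graph stable between runs.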
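Visual elements:
| Element | Style | Notes |
|---|---|---|
| Start node | ⭕ green circle | Flow entry point |
| Listen node | ⬭ purple ellipse | Listener |
| Router node | ◆ blue diamond | Router |
| Crew-calling node | - - dashed border | Method that calls a Crew |
| OR edge | ─→ purple solid line | OR condition |
| AND edge | ┄→ blue dashed line | AND condition |
| Router edge | ═→ green solid line | Route path |
Boundaries and constraints:
| Boundary | Value | Notes |
|---|---|---|
| Maximum node count | ~100 | The layout may become cluttered beyond this |
| Maximum depth | Unlimited | Deep hierarchies are hard to read |
| File size | ~1-5 MB | Includes the PyVis library and the network data |
| Browser compatibility | Modern browsers | JavaScript support is required |
Performance considerations:
- Layout calculation: O(n) time, where n is the number of methods
- Node rendering: PyVis uses vis.js, which supports large networks
- Interaction: smooth below 100 nodes, may lag above 200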
Call Chains in Detail
Call chain 1: user code → kickoff → method execution
User code
↓ flow.kickoff(inputs)
Flow.kickoff()
↓ asyncio.run(_run_flow())
Flow.kickoff_async()
↓ 1. State initialization
│ - _initialize_state(inputs)
│ - load_state(id) [optional]
│ - _restore_state(stored_state) [optional]
↓ 2. Emit FlowStartedEvent
│ - crewai_event_bus.emit(FlowStartedEvent)
↓ 3. Run start methods in parallel
│ - asyncio.gather(*[_execute_start_method(m) for m in _start_methods])
↓
_execute_start_method(start_method_name)
↓ 1. Check whether already completed
│ - if start_method_name in _completed_methods
↓ 2. Inject trigger_payload
│ - _inject_trigger_payload_for_start_method(method)
↓ 3. Run the method
│ - _execute_method(start_method_name, enhanced_method)
↓
_execute_method(method_name, method, *args, **kwargs)
↓ 1. Emit MethodExecutionStartedEvent
│ - crewai_event_bus.emit(MethodExecutionStartedEvent)
↓ 2. Call the user method
│ - result = await method() or method()
↓ 3. Record the result
│ - _method_outputs.append(result)
│ - _method_execution_counts[method_name] += 1
│ - _completed_methods.add(method_name)
↓ 4. Emit MethodExecutionFinishedEvent
│ - crewai_event_bus.emit(MethodExecutionFinishedEvent)
↓ 5. Persist (if configured)
│ - persistence.save_state(flow_id, state)
↓ return result
_execute_start_method()
↓ 4. Trigger listeners
│ - _execute_listeners(start_method_name, result)
↓
[continues in call chain 2]
Key code:
# lib/crewai/src/crewai/flow/flow.py:906-917
def kickoff(self, inputs: dict[str, Any] | None = None) -> Any:
    """Start the Flow synchronously by wrapping the async implementation."""
    async def _run_flow() -> Any:
        return await self.kickoff_async(inputs)
    return asyncio.run(_run_flow())

# lib/crewai/src/crewai/flow/flow.py:919-1036
async def kickoff_async(self, inputs: dict[str, Any] | None = None) -> Any:
    """Start the Flow asynchronously; this is the core execution logic."""
    # 1. State initialization and restore
    is_restoring = inputs and "id" in inputs and self._persistence is not None
    if not is_restoring:
        self._completed_methods.clear()
        self._method_outputs.clear()
        self._pending_and_listeners.clear()
    # 2. Restore state from persistent storage
    #    (guarded by is_restoring so that inputs=None does not raise)
    if is_restoring:
        stored_state = self._persistence.load_state(inputs["id"])
        if stored_state:
            self._restore_state(stored_state)
    # 3. Emit FlowStartedEvent
    crewai_event_bus.emit(self, FlowStartedEvent(...))
    # 4. Run all start methods in parallel
    tasks = [
        self._execute_start_method(start_method)
        for start_method in self._start_methods
    ]
    await asyncio.gather(*tasks)
    # 5. Return the last output
    final_output = self._method_outputs[-1] if self._method_outputs else None
    crewai_event_bus.emit(self, FlowFinishedEvent(...))
    return final_output

# lib/crewai/src/crewai/flow/flow.py:1039-1072
async def _execute_start_method(self, start_method_name: FlowMethodName) -> None:
    """Run a single start method."""
    # 1. Check whether it already completed (resume scenario)
    if start_method_name in self._completed_methods:
        if self._is_execution_resuming:
            # Skip execution but still trigger the listeners
            last_output = self._method_outputs[-1] if self._method_outputs else None
            await self._execute_listeners(start_method_name, last_output)
            return
    # 2. Look up the method and inject the payload
    method = self._methods[start_method_name]
    enhanced_method = self._inject_trigger_payload_for_start_method(method)
    # 3. Run the method
    result = await self._execute_method(start_method_name, enhanced_method)
    # 4. Trigger the listeners
    await self._execute_listeners(start_method_name, result)

# lib/crewai/src/crewai/flow/flow.py:1110-1172
async def _execute_method(
    self, method_name: FlowMethodName, method: Callable, *args, **kwargs
) -> Any:
    """Core logic for running a single method."""
    try:
        # 1. Emit the started event
        crewai_event_bus.emit(self, MethodExecutionStartedEvent(...))
        # 2. Run the method (sync and async are both supported)
        result = (
            await method(*args, **kwargs)
            if asyncio.iscoroutinefunction(method)
            else method(*args, **kwargs)
        )
        # 3. Record the result
        self._method_outputs.append(result)
        self._method_execution_counts[method_name] = (
            self._method_execution_counts.get(method_name, 0) + 1
        )
        self._completed_methods.add(method_name)
        # 4. Emit the finished event
        crewai_event_bus.emit(self, MethodExecutionFinishedEvent(...))
        return result
    except Exception as e:
        # 5. Emit the failed event, then re-raise to the caller
        crewai_event_bus.emit(self, MethodExecutionFailedEvent(...))
        raise e
Call chain 2: listener triggering → condition evaluation → recursive execution
_execute_listeners(trigger_method, result)
↓ Phase 1: handle routers (sequential)
│ ↓ loop until no router is triggered
│ ├→ _find_triggered_methods(current_trigger, router_only=True)
│ │ ↓ iterate over _listeners
│ │ ├→ check whether the listener is a router
│ │ ├→ _evaluate_condition(condition, trigger_method, listener_name)
│ │ │ ↓ determine the condition type
│ │ │ ├→ OR condition: any(trigger in methods)
│ │ │ └→ AND condition:
│ │ │ ├─ initialize _pending_and_listeners[key] = set(methods)
│ │ │ ├─ discard(trigger_method)
│ │ │ └─ return len(pending_set) == 0
│ │ └→ return triggered_routers = [...]
│ ↓
│ ├→ _execute_single_listener(router_name, result)
│ │ ↓ 1. Inspect the method signature
│ │ │ - inspect.signature(method)
│ │ │ - decide whether to pass the result argument
│ │ ↓ 2. Run the method
│ │ │ - _execute_method(router_name, method, ...)
│ │ ↓ 3. Get the router result
│ │ │ - router_result = _method_outputs[-1]
│ │ ↓ 4. Trigger listeners recursively
│ │ │ - _execute_listeners(router_name, router_result)
│ │ └→ return
│ ↓
│ ├─ current_trigger = router_result
│ └─ router_results.append(router_result)
↓
↓ Phase 2: handle regular listeners (parallel)
│ ↓ all_triggers = [trigger_method, *router_results]
│ ↓ iterate over each trigger
│ ├→ _find_triggered_methods(current_trigger, router_only=False)
│ │ ↓ iterate over _listeners (skipping routers and start methods)
│ │ ├→ _evaluate_condition(condition, trigger_method, listener_name)
│ │ └→ return triggered_listeners = [...]
│ ↓
│ ├→ asyncio.gather(*[
│ │ _execute_single_listener(listener_name, result)
│ │ for listener_name in triggered_listeners
│ │ ])
│ │ ↓ run all listeners in parallel
│ │ ├→ _execute_single_listener(listener_1, result)
│ │ │ ├─ _execute_method(listener_1, method)
│ │ │ └─ _execute_listeners(listener_1, result_1) [recursive]
│ │ ├→ _execute_single_listener(listener_2, result)
│ │ │ ├─ _execute_method(listener_2, method)
│ │ │ └─ _execute_listeners(listener_2, result_2) [recursive]
│ │ └→ ...
│ └→ done
↓
↓ Phase 3: handle cyclic flows (optional)
│ ↓ if a router result triggers an already completed start method
│ ├→ iterate over _start_methods
│ ├→ check whether the condition is met
│ └→ _execute_start_method(method_name) [re-executed as a loop]
↓
└→ return
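The signature check in `_execute_single_listener` decides whether the upstream result is passed along. A minimal sketch of that idea with `inspect.signature` follows; `call_listener` and the two sample functions are hypothetical, and the real method may apply different rules when deciding what counts as accepting a parameter.

```python
import inspect
from typing import Any, Callable


def call_listener(method: Callable[..., Any], result: Any) -> Any:
    """Pass `result` only if the listener can accept a positional argument."""
    params = inspect.signature(method).parameters.values()
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.VAR_POSITIONAL,
    )
    # For bound methods, inspect.signature already omits `self`.
    accepts_arg = any(p.kind in positional_kinds for p in params)
    return method(result) if accepts_arg else method()


def with_arg(previous: str) -> str:
    return f"got {previous}"


def no_arg() -> str:
    return "no input"


passed = call_listener(with_arg, "step1_result")
skipped = call_listener(no_arg, "step1_result")
```

This is what allows a listener to be declared either as `def handler(self)` or `def handler(self, result)` without any extra configuration.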
关键代码:
# lib/crewai/src/crewai/flow/flow.py:1174-1261
async def _execute_listeners(
self, trigger_method: FlowMethodName, result: Any
) -> None:
"""执行所有被触发的监听器和路由器"""
# 阶段 1: 处理 Router(顺序执行)
router_results = []
current_trigger = trigger_method
while True:
routers_triggered = self._find_triggered_methods(
current_trigger, router_only=True
)
if not routers_triggered:
break
for router_name in routers_triggered:
await self._execute_single_listener(router_name, result)
router_result = self._method_outputs[-1] if self._method_outputs else None
if router_result:
router_results.append(router_result)
current_trigger = FlowMethodName(str(router_result)) if router_result else FlowMethodName("")
# 阶段 2: 处理普通 Listener(并行执行)
all_triggers = [trigger_method, *router_results]
for current_trigger in all_triggers:
if current_trigger:
listeners_triggered = self._find_triggered_methods(
current_trigger, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
# 阶段 3: 处理循环流程
if current_trigger in router_results:
for method_name in self._start_methods:
if method_name in self._listeners:
condition_data = self._listeners[method_name]
should_trigger = False
# ... 条件检查 ...
if should_trigger and method_name in self._completed_methods:
was_resuming = self._is_execution_resuming
self._is_execution_resuming = False
await self._execute_start_method(method_name)
self._is_execution_resuming = was_resuming
# lib/crewai/src/crewai/flow/flow.py:1311-1373
def _find_triggered_methods(
    self, trigger_method: FlowMethodName, router_only: bool
) -> list[FlowMethodName]:
    """Find the methods triggered by trigger_method."""
    triggered: list[FlowMethodName] = []
    for listener_name, condition_data in self._listeners.items():
        is_router = listener_name in self._routers
        # Keep only routers (or only non-routers), depending on router_only
        if router_only != is_router:
            continue
        # Skip start methods when collecting ordinary listeners
        if not router_only and listener_name in self._start_methods:
            continue
        # Evaluate the condition
        if is_simple_flow_condition(condition_data):
            condition_type, methods = condition_data
            if condition_type == "OR":
                if trigger_method in methods:
                    triggered.append(listener_name)
            elif condition_type == "AND":
                pending_key = PendingListenerKey(listener_name)
                if pending_key not in self._pending_and_listeners:
                    self._pending_and_listeners[pending_key] = set(methods)
                if trigger_method in self._pending_and_listeners[pending_key]:
                    self._pending_and_listeners[pending_key].discard(trigger_method)
                if not self._pending_and_listeners[pending_key]:
                    triggered.append(listener_name)
                    self._pending_and_listeners.pop(pending_key, None)
        elif is_flow_condition_dict(condition_data):
            if self._evaluate_condition(condition_data, trigger_method, listener_name):
                triggered.append(listener_name)
    return triggered
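The AND branch above depends on a per-listener set of triggers that have not fired yet. The bookkeeping can be sketched in isolation as follows; `AndTracker` and `notify` are hypothetical names for illustration only and are not part of CrewAI.

```python
class AndTracker:
    """Tracks which trigger methods an AND listener is still waiting for."""

    def __init__(self) -> None:
        # listener name -> set of trigger names not yet seen
        self._pending: dict = {}

    def notify(self, listener: str, required: list, trigger: str) -> bool:
        """Record that `trigger` finished; return True once all have been seen."""
        remaining = self._pending.setdefault(listener, set(required))
        remaining.discard(trigger)
        if remaining:
            return False
        # Every trigger arrived: drop the entry so the listener can fire again later.
        del self._pending[listener]
        return True


tracker = AndTracker()
first = tracker.notify("combine", ["task_a", "task_b"], "task_a")
second = tracker.notify("combine", ["task_a", "task_b"], "task_b")
print(first, second)
```

Removing the entry once the set is empty mirrors the `pop(pending_key, None)` call above, which is what lets an AND listener be re-armed on a later loop iteration.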
# lib/crewai/src/crewai/flow/flow.py:1263-1309
def _evaluate_condition(
    self,
    condition: FlowMethodName | FlowCondition,
    trigger_method: FlowMethodName,
    listener_name: FlowMethodName,
) -> bool:
    """Recursively evaluate a condition (nesting is supported)."""
    # Simple condition: compare the method name directly
    if is_flow_method_name(condition):
        return condition == trigger_method
    # Compound condition: evaluate recursively
    if is_flow_condition_dict(condition):
        normalized = _normalize_condition(condition)
        cond_type = normalized.get("type", "OR")
        sub_conditions = normalized.get("conditions", [])
        if cond_type == "OR":
            # Any sub-condition may be satisfied
            return any(
                self._evaluate_condition(sub_cond, trigger_method, listener_name)
                for sub_cond in sub_conditions
            )
        if cond_type == "AND":
            # Every sub-condition must be satisfied
            pending_key = PendingListenerKey(f"{listener_name}:{id(condition)}")
            if pending_key not in self._pending_and_listeners:
                all_methods = set(_extract_all_methods(condition))
                self._pending_and_listeners[pending_key] = all_methods
            if trigger_method in self._pending_and_listeners[pending_key]:
                self._pending_and_listeners[pending_key].discard(trigger_method)
            if not self._pending_and_listeners[pending_key]:
                self._pending_and_listeners.pop(pending_key, None)
                return True
            return False
    return False
# lib/crewai/src/crewai/flow/flow.py:1375-1427
async def _execute_single_listener(
    self, listener_name: FlowMethodName, result: Any
) -> None:
    """Execute a single listener."""
    # 1. Skip listeners that already completed (avoids duplicate execution)
    if listener_name in self._completed_methods:
        return
    # 2. Look up the method
    method = self._methods[listener_name]
    # 3. Inspect the signature to decide whether to pass result
    sig = inspect.signature(method)
    params = list(sig.parameters.values())
    # Ignore the self parameter
    non_self_params = [p for p in params if p.name != "self"]
    # 4. Execute the method
    if non_self_params:
        # The method accepts a parameter, so pass result
        listener_result = await self._execute_method(listener_name, method, result=result)
    else:
        # The method accepts no parameters
        listener_result = await self._execute_method(listener_name, method)
    # 5. Recursively trigger downstream listeners
    await self._execute_listeners(listener_name, listener_result)
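Step 3 above decides from the signature whether a listener receives the upstream result. A minimal standalone sketch of that check follows; `call_with_optional_result` and `Demo` are hypothetical names, and only the standard-library `inspect.signature` call is taken from the code above.

```python
import inspect
from typing import Any, Callable


def call_with_optional_result(method: Callable[..., Any], result: Any) -> Any:
    """Pass `result` only if `method` declares a parameter besides `self`."""
    params = [
        p for p in inspect.signature(method).parameters.values() if p.name != "self"
    ]
    return method(result) if params else method()


class Demo:
    def takes_result(self, result):
        return f"got {result}"

    def takes_nothing(self):
        return "no args"


demo = Demo()
with_arg = call_with_optional_result(demo.takes_result, "payload")
without_arg = call_with_optional_result(demo.takes_nothing, "payload")
print(with_arg, "|", without_arg)
```

This is why a listener may be declared either as `def step(self)` or as `def step(self, result)` without any extra configuration.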
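Architecture notes (continued)
1) Differences between Flow and Crew
Flow:
- Purpose: orchestrates dependencies and ordering between methods
- Granularity: method level
- State: explicit state management
- Use cases: complex business logic, conditional branches, loops
Crew:
- Purpose: coordinates Agents that execute tasks
- Granularity: task level
- State: implicit (carried by task outputs)
- Use cases: multi-Agent collaboration, AI-generated content
Combined use: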
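# DataCollectionCrew and AnalysisCrew are placeholder crew classes, not defined here
class MyFlow(Flow):
    @start()
    def data_collection(self):
        crew = DataCollectionCrew()
        result = crew.crew().kickoff()
        self.state["data"] = result.raw
        return "collected"

    # Listeners are keyed by the upstream method name, not by its return value
    @listen("data_collection")
    def data_analysis(self):
        crew = AnalysisCrew()
        result = crew.crew().kickoff(inputs={"data": self.state["data"]})
        return result.raw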
2) Two modes of state management
Structured state (Pydantic):
class WorkflowState(BaseModel):
    step: str = "init"
    results: list[str] = []
    user_id: str

class MyFlow(Flow[WorkflowState]):
    def __init__(self):
        super().__init__(initial_state=WorkflowState(user_id="123"))
Unstructured state (dictionary):
class MyFlow(Flow):
    @start()
    def init(self):
        self.state["counter"] = 0
        self.state["results"] = []
3) Three ways to combine conditions
Single-condition listener:
@listen("method_a")
def method_b(self, result):
    # Runs after method_a completes
    pass
AND condition:
@listen(and_("method_a", "method_b"))
def method_c(self):
    # Runs after both method_a and method_b complete
    pass
OR condition:
@listen(or_("method_a", "method_b"))
def method_c(self, result):
    # Runs as soon as either method_a or method_b completes
    pass
4) Dynamic routing with Router
@router("analyze")
def decision(self):
    if self.state["score"] > 0.8:
        return "high_confidence"
    elif self.state["errors"]:
        return "error_handling"
    return "manual_review"

@listen("high_confidence")
def auto_process(self):
    # Automatic processing path
    pass

@listen("error_handling")
def handle_errors(self):
    # Error handling path
    pass

@listen("manual_review")
def manual_check(self):
    # Manual review path
    pass
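The routing idea can be shown without the framework: a decision function returns a label, and the label selects a handler. The sketch below is a plain-Python analogy rather than CrewAI code; `decide`, `HANDLERS`, and `dispatch` are hypothetical names.

```python
from typing import Callable


def decide(score: float, errors: list) -> str:
    """Return a route label, mirroring the branch order of the router above."""
    if score > 0.8:
        return "high_confidence"
    if errors:
        return "error_handling"
    return "manual_review"


# Route label -> handler; in a Flow these would be @listen("<label>") methods.
HANDLERS: dict = {
    "high_confidence": lambda: "auto processed",
    "error_handling": lambda: "errors handled",
    "manual_review": lambda: "queued for review",
}


def dispatch(score: float, errors: list) -> str:
    """Pick the handler named by the route label and run it."""
    handler: Callable[[], str] = HANDLERS[decide(score, errors)]
    return handler()


print(dispatch(0.9, []), "|", dispatch(0.3, ["timeout"]), "|", dispatch(0.3, []))
```

Branch order matters: because the score is tested first, a high score takes the `high_confidence` route even when errors are present.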
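5) Two levels of persistence
Class-level persistence (all methods):
@persist(verbose=True)
class MyFlow(Flow):
    # State changes made by every method are persisted
    pass
Method-level persistence (selective):
class MyFlow(Flow):
    @persist
    @start()
    def critical_step(self):
        # Only state changes made by this method are persisted
        pass

    @start()
    def temp_step(self):
        # This method is not persisted
        pass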
Data structure UML diagrams
Flow core class diagram
classDiagram
class Flow {
+StateT state
+dict _methods
+dict _listeners
+dict _routers
+dict _method_execution_counts
+bool _state_already_persisted
+kickoff(inputs)
+kickoff_async(inputs)
+plot(filename)
-_build_execution_graph()
-_execute_start_methods(start_methods)
-_execute_method(method_name, inputs)
-_execute_listeners(method_name, result)
-_trigger_router(router_name, result)
}
class FlowState {
+str id
}
class StartMethod {
+str name
+Callable func
+FlowCondition condition
}
class ListenMethod {
+str name
+Callable func
+FlowCondition condition
}
class RouterMethod {
+str name
+Callable func
+FlowCondition condition
}
class FlowCondition {
<<interface>>
+evaluate(context)
}
class AndCondition {
+list[FlowCondition] conditions
+evaluate(context)
}
class OrCondition {
+list[FlowCondition] conditions
+evaluate(context)
}
class FlowPlot {
+Flow flow
+dict colors
+NodeStyles node_styles
+plot(filename)
-_generate_final_html(network_html)
}
Flow --> FlowState : manages
Flow --> StartMethod : registers
Flow --> ListenMethod : registers
Flow --> RouterMethod : registers
Flow --> FlowPlot : renders visualization
FlowCondition <|-- AndCondition : implements
FlowCondition <|-- OrCondition : implements
StartMethod --> FlowCondition : uses
ListenMethod --> FlowCondition : uses
RouterMethod --> FlowCondition : uses
Key fields
Flow core fields
| Field | Type | Constraint | Default | Description |
|---|---|---|---|---|
| state | StateT \| dict | - | {} | Flow state |
| _methods | dict[str, Method] | private | {} | Method registry |
| _listeners | dict[str, list[Listener]] | private | {} | Listener mapping |
| _routers | dict[str, Router] | private | {} | Router mapping |
| _method_execution_counts | dict[str, int] | private | {} | Per-method execution counts |
| _state_already_persisted | bool | private | False | Whether the state has already been persisted |
FlowState fields
| Field | Type | Constraint | Default | Description |
|---|---|---|---|---|
| id | str | auto-generated | uuid4() | Unique identifier of the flow instance |
Method decorator attributes
| Attribute | Type | Description |
|---|---|---|
| name | str | Method name |
| func | Callable | The underlying method function |
| condition | FlowCondition | Execution condition (optional) |
| trigger | str \| Condition | Trigger condition (listen/router) |
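Detailed API specification
API 1: kickoff
Basic information
- Method signature: Flow.kickoff(**inputs) -> Any
- Invocation: instance method
- Idempotent: no
Description
Starts the Flow synchronously: runs every @start() method and then triggers the downstream listeners and routers.
Request structure
| Field | Type | Required | Default | Constraint | Description |
|---|---|---|---|---|---|
| inputs | Any | No | - | - | Arguments passed to the start methods |
Core code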
def kickoff(self, **inputs: Any) -> Any:
"""同步启动 Flow"""
# 1. 构建执行图
self._build_execution_graph()
# 2. 获取所有 start 方法
start_methods = self._get_start_methods()
if not start_methods:
raise ValueError("No @start() methods found in Flow")
# 3. 执行所有 start 方法
return self._execute_start_methods(start_methods, inputs)
def _execute_start_methods(
self, start_methods: list[StartMethod], inputs: dict[str, Any]
) -> Any:
"""执行 start 方法"""
results = []
for start_method in start_methods:
# 检查条件
if start_method.condition:
if not start_method.condition.evaluate({"inputs": inputs, "state": self.state}):
continue
# 执行方法
result = self._execute_method(start_method.name, inputs)
results.append(result)
# 触发监听器
self._execute_listeners(start_method.name, result)
# 返回最后一个结果
return results[-1] if results else None
def _execute_method(
self, method_name: str, inputs: dict[str, Any] | None = None
) -> Any:
"""执行单个方法"""
method = self._methods[method_name]
# 更新执行计数
self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1
)
# 调用方法
if inputs:
result = method.func(self, **inputs)
else:
result = method.func(self)
# 持久化状态(如果需要)
if self._should_persist_method(method_name):
self._persist_state()
return result
def _execute_listeners(
self, method_name: str, result: Any
) -> None:
"""执行监听器"""
if method_name not in self._listeners:
return
for listener in self._listeners[method_name]:
# 检查条件
if listener.condition:
if not listener.condition.evaluate({
"result": result,
"state": self.state,
"method": method_name
}):
continue
# 执行监听方法
listener_result = self._execute_method(
listener.name,
{"result": result} if listener.needs_result else None
)
# 检查是否为路由器
if listener.name in self._routers:
self._trigger_router(listener.name, listener_result)
else:
# 递归触发后续监听器
self._execute_listeners(listener.name, listener_result)
def _trigger_router(
self, router_name: str, router_result: Any
) -> None:
"""触发路由器"""
router = self._routers[router_name]
# router_result 应该是路由目标(字符串或条件)
if isinstance(router_result, str):
# 触发路由目标的监听器
self._execute_listeners(router_result, None)
elif isinstance(router_result, list):
# 多个路由目标
for target in router_result:
self._execute_listeners(target, None)
Usage example
class SimpleFlow(Flow):
    @start()
    def step_one(self):
        print("Step 1 executing")
        self.state["data"] = "processed"
        return "step1_done"

    # Listen on the method name; the return value arrives as `result`
    @listen("step_one")
    def step_two(self, result):
        print(f"Step 2 received: {result}")
        return "final_result"

# Run
flow = SimpleFlow()
result = flow.kickoff()
print(result)  # "final_result"
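API 2: kickoff_async
Basic information
- Method signature: Flow.kickoff_async(**inputs) -> Awaitable[Any]
- Invocation: instance method (asynchronous)
- Idempotent: no
Description
Starts the Flow asynchronously and supports the async/await pattern.
Core code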
async def kickoff_async(self, **inputs: Any) -> Any:
"""异步启动 Flow"""
# 1. 构建执行图
self._build_execution_graph()
# 2. 获取所有 start 方法
start_methods = self._get_start_methods()
if not start_methods:
raise ValueError("No @start() methods found in Flow")
# 3. 异步执行所有 start 方法
return await self._execute_start_methods_async(start_methods, inputs)
async def _execute_start_methods_async(
self, start_methods: list[StartMethod], inputs: dict[str, Any]
) -> Any:
"""异步执行 start 方法"""
results = []
for start_method in start_methods:
# 检查条件
if start_method.condition:
if not start_method.condition.evaluate({"inputs": inputs, "state": self.state}):
continue
# 异步执行方法
result = await self._execute_method_async(start_method.name, inputs)
results.append(result)
# 触发监听器
await self._execute_listeners_async(start_method.name, result)
return results[-1] if results else None
async def _execute_method_async(
self, method_name: str, inputs: dict[str, Any] | None = None
) -> Any:
"""异步执行单个方法"""
method = self._methods[method_name]
# 更新执行计数
self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1
)
# 调用方法(支持同步和异步方法)
if inspect.iscoroutinefunction(method.func):
# 异步方法
if inputs:
result = await method.func(self, **inputs)
else:
result = await method.func(self)
else:
# 同步方法
if inputs:
result = method.func(self, **inputs)
else:
result = method.func(self)
# 持久化状态
if self._should_persist_method(method_name):
await self._persist_state_async()
return result
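The branch on `inspect.iscoroutinefunction` above is what lets synchronous and asynchronous methods coexist in one flow. A reduced sketch of the same idea, using only the standard library (`invoke`, `sync_step`, and `async_step` are hypothetical names):

```python
import asyncio
import inspect
from typing import Any, Callable


async def invoke(func: Callable[..., Any], *args: Any) -> Any:
    """Await coroutine functions; call plain functions directly."""
    if inspect.iscoroutinefunction(func):
        return await func(*args)
    return func(*args)


def sync_step(x: int) -> int:
    return x + 1


async def async_step(x: int) -> int:
    await asyncio.sleep(0)  # yield control once, as real async I/O would
    return x * 2


async def main() -> list:
    return [await invoke(sync_step, 1), await invoke(async_step, 5)]


results = asyncio.run(main())
print(results)
```

One trade-off of this approach: a blocking synchronous method still occupies the event loop while it runs, so long-running sync work gains no concurrency from being called this way.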
Usage example
import asyncio

# fetch_data_from_api and process_async are placeholder coroutines, not defined here
class AsyncFlow(Flow):
    @start()
    async def async_step_one(self):
        # Asynchronous API call
        data = await fetch_data_from_api()
        self.state["data"] = data
        return "step1_done"

    @listen("async_step_one")
    async def async_step_two(self, result):
        # Asynchronous processing
        processed = await process_async(self.state["data"])
        return processed

# Run asynchronously
async def main():
    flow = AsyncFlow()
    result = await flow.kickoff_async()
    print(result)

asyncio.run(main())
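API 3: plot
Basic information
- Method signature: Flow.plot(filename: str = "flow_plot") -> None
- Invocation: instance method
- Idempotent: yes
Description
Generates a visualization of the Flow and saves it as an HTML file.
Request structure
| Field | Type | Required | Default | Constraint | Description |
|---|---|---|---|---|---|
| filename | str | No | "flow_plot" | - | Output file name (without extension) |
Core code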
def plot(self, filename: str = "flow_plot") -> None:
"""生成流程可视化"""
visualizer = FlowPlot(self)
visualizer.plot(filename)
class FlowPlot:
"""流程可视化生成器"""
def plot(self, filename: str) -> None:
"""生成并保存 HTML 可视化"""
# 1. 初始化 PyVis 网络
net = Network(directed=True, height="750px", bgcolor=self.colors["bg"])
# 2. 禁用物理引擎
net.set_options("""
var options = {
"physics": {"enabled": false}
}
""")
# 3. 计算节点层级
node_levels = calculate_node_levels(self.flow)
# 4. 计算节点位置
node_positions = compute_positions(self.flow, node_levels)
# 5. 添加节点到网络
add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
# 6. 添加边到网络
add_edges(net, self.flow, node_positions, self.colors)
# 7. 生成 HTML
network_html = net.generate_html()
final_html = self._generate_final_html(network_html)
# 8. 保存文件
with open(f"{filename}.html", "w", encoding="utf-8") as f:
f.write(final_html)
print(f"Plot saved as {filename}.html")
Usage example
# Define the Flow
class MyFlow(Flow):
    @start()
    def init(self):
        return "start"

    @listen("init")
    def process(self, result):
        return "processed"

    @router("process")
    def decide(self):
        return "path_a" if self.state.get("flag") else "path_b"

    @listen("path_a")
    def handle_a(self):
        pass

    @listen("path_b")
    def handle_b(self):
        pass

# Generate the visualization
flow = MyFlow()
flow.plot("my_workflow")  # writes my_workflow.html
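API 4: @start decorator
Basic information
- Decorator signature: @start(condition: str | FlowCondition | Callable | None = None)
- Scope: methods of a Flow class
- Effect: marks a method as an entry point of the Flow
Description
Marks a method as a start method; when the Flow starts, every @start() method is executed.
Core code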
def start(
condition: str | FlowCondition | Callable[..., Any] | None = None,
) -> Callable[[Callable[P, R]], StartMethod[P, R]]:
"""标记 Flow 的开始方法"""
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
# 创建 StartMethod 对象
start_method = StartMethod(
name=func.__name__,
func=func,
condition=_parse_condition(condition) if condition else None,
)
# 设置标记
func._is_start_method = True # type: ignore[attr-defined]
func._start_condition = start_method.condition # type: ignore[attr-defined]
return func # type: ignore[return-value]
return decorator
Usage examples
Basic usage:
class MyFlow(Flow):
    @start()
    def initialize(self):
        self.state["initialized"] = True
        return "ready"
Conditional start:
class ConditionalFlow(Flow):
    @start(condition=lambda ctx: ctx["inputs"].get("mode") == "auto")
    def auto_start(self):
        # Runs only when inputs.mode == "auto"
        pass

    @start(condition=lambda ctx: ctx["inputs"].get("mode") == "manual")
    def manual_start(self):
        # Runs only when inputs.mode == "manual"
        pass

# Usage
flow = ConditionalFlow()
flow.kickoff(mode="auto")  # runs only auto_start
Multiple start methods:
class MultiStartFlow(Flow):
    @start()
    def start_one(self):
        self.state["one"] = True
        return "one_done"

    @start()
    def start_two(self):
        self.state["two"] = True
        return "two_done"

# Both start methods are executed
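API 5: @listen decorator
Basic information
- Decorator signature: @listen(condition: str | FlowCondition | Callable)
- Scope: methods of a Flow class
- Effect: listens for the completion of a given method or condition
Description
Marks a method as a listener that runs when the named method completes or the condition is satisfied.
Core code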
def listen(
condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], ListenMethod[P, R]]:
"""创建监听器方法"""
def decorator(func: Callable[P, R]) -> ListenMethod[P, R]:
# 创建 ListenMethod 对象
listen_method = ListenMethod(
name=func.__name__,
func=func,
trigger=_parse_condition(condition),
)
# 设置标记
func._is_listen_method = True # type: ignore[attr-defined]
func._listen_trigger = listen_method.trigger # type: ignore[attr-defined]
return func # type: ignore[return-value]
return decorator
Usage examples
Listening on a method name:
class MyFlow(Flow):
    @start()
    def step_one(self):
        return "result_1"

    @listen("step_one")
    def step_two(self, result):
        # Runs after step_one completes
        print(f"Received: {result}")
        return "result_2"
Listening on an AND condition:
class ParallelFlow(Flow):
    @start()
    def task_a(self):
        self.state["a"] = "done"
        return "a_done"

    @start()
    def task_b(self):
        self.state["b"] = "done"
        return "b_done"

    @listen(and_("task_a", "task_b"))
    def combine(self):
        # Runs after both task_a and task_b complete
        print("Both tasks completed")
Listening on an OR condition:
class AlternativeFlow(Flow):
    @start()
    def path_a(self):
        return "a_result"

    @start()
    def path_b(self):
        return "b_result"

    @listen(or_("path_a", "path_b"))
    def handle_either(self, result):
        # Runs as soon as either path_a or path_b completes
        print(f"Got result: {result}")
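API 6: @router decorator
Basic information
- Decorator signature: @router(condition: str | FlowCondition | Callable)
- Scope: methods of a Flow class
- Effect: routes dynamically to different methods based on the return value
Description
Marks a method as a router; its return value determines which listeners run next.
Core code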
def router(
condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
"""创建路由器方法"""
def decorator(func: Callable[P, R]) -> RouterMethod[P, R]:
# 创建 RouterMethod 对象
router_method = RouterMethod(
name=func.__name__,
func=func,
trigger=_parse_condition(condition),
)
# 设置标记
func._is_router_method = True # type: ignore[attr-defined]
func._router_trigger = router_method.trigger # type: ignore[attr-defined]
return func # type: ignore[return-value]
return decorator
Usage examples
Basic routing:
class RoutingFlow(Flow):
    @start()
    def analyze(self):
        self.state["score"] = 0.85
        return "analyzed"

    # The router is triggered by the method name; its return value is the route label
    @router("analyze")
    def decision(self):
        if self.state["score"] > 0.8:
            return "high_quality"
        elif self.state["score"] > 0.5:
            return "medium_quality"
        else:
            return "low_quality"

    @listen("high_quality")
    def auto_approve(self):
        print("Auto-approved")

    @listen("medium_quality")
    def manual_review(self):
        print("Needs review")

    @listen("low_quality")
    def reject(self):
        print("Rejected")
Returning multiple routes:
class MultiRouteFlow(Flow):
    @router("process")
    def complex_decision(self):
        routes = []
        if self.state.get("notify_user"):
            routes.append("send_notification")
        if self.state.get("update_db"):
            routes.append("update_database")
        if self.state.get("log_event"):
            routes.append("log_to_system")
        return routes  # returns several route targets
Loop routing:
class LoopFlow(Flow):
    def __init__(self):
        super().__init__()
        self.state["counter"] = 0
        self.state["max_iterations"] = 5

    @start("loop")
    def process_iteration(self):
        self.state["counter"] += 1
        print(f"Iteration {self.state['counter']}")
        return "processed"

    @router("process_iteration")
    def should_continue(self):
        if self.state["counter"] < self.state["max_iterations"]:
            return "loop"  # continue the loop
        return "complete"

    @listen("complete")
    def finalize(self):
        print("Loop completed")
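Key feature walkthroughs
Feature 1: building the execution graph
Description
Before starting, the Flow builds an execution graph by analyzing the dependencies and trigger conditions between methods.
Core code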
def _build_execution_graph(self) -> None:
"""构建执行图"""
# 1. 收集所有装饰的方法
for attr_name in dir(self):
attr = getattr(self, attr_name)
# Start 方法
if hasattr(attr, "_is_start_method"):
self._methods[attr_name] = StartMethod(
name=attr_name,
func=attr,
condition=getattr(attr, "_start_condition", None),
)
# Listen 方法
elif hasattr(attr, "_is_listen_method"):
listen_method = ListenMethod(
name=attr_name,
func=attr,
trigger=getattr(attr, "_listen_trigger"),
)
self._methods[attr_name] = listen_method
# 注册监听器
trigger = listen_method.trigger
if isinstance(trigger, str):
# 监听方法名
self._listeners.setdefault(trigger, []).append(listen_method)
elif isinstance(trigger, AndCondition):
# AND 条件:注册到所有触发方法
for condition in trigger.conditions:
if isinstance(condition, str):
self._listeners.setdefault(condition, []).append(listen_method)
elif isinstance(trigger, OrCondition):
# OR 条件:注册到所有触发方法
for condition in trigger.conditions:
if isinstance(condition, str):
self._listeners.setdefault(condition, []).append(listen_method)
# Router 方法
elif hasattr(attr, "_is_router_method"):
router_method = RouterMethod(
name=attr_name,
func=attr,
trigger=getattr(attr, "_router_trigger"),
)
self._methods[attr_name] = router_method
self._routers[attr_name] = router_method
# Router 也是监听器
trigger = router_method.trigger
if isinstance(trigger, str):
self._listeners.setdefault(trigger, []).append(router_method)
# 2. 验证执行图
self._validate_execution_graph()
def _validate_execution_graph(self) -> None:
"""验证执行图的正确性"""
# 检查是否有 start 方法
start_methods = [m for m in self._methods.values() if isinstance(m, StartMethod)]
if not start_methods:
raise ValueError("Flow must have at least one @start() method")
# 检查循环依赖
visited = set()
def check_cycles(method_name: str, path: list[str]):
if method_name in path:
raise ValueError(f"Circular dependency detected: {' -> '.join(path + [method_name])}")
if method_name in visited:
return
visited.add(method_name)
path.append(method_name)
# 检查监听器
if method_name in self._listeners:
for listener in self._listeners[method_name]:
check_cycles(listener.name, path.copy())
path.pop()
for start_method in start_methods:
check_cycles(start_method.name, [])
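The cycle check above is a depth-first traversal that tracks the current path. A standalone sketch on a plain adjacency dict follows (`find_cycle` and both sample graphs are hypothetical). A strict check of this kind would also flag the router-driven loops shown elsewhere in this document, so it is offered purely as an illustration of the traversal.

```python
from typing import Optional


def find_cycle(graph: dict, start: str) -> Optional[list]:
    """Return one cycle reachable from `start` as a list of nodes, or None."""
    path: list = []       # nodes on the current DFS path, in order
    on_path: set = set()  # same nodes, for O(1) membership tests
    done: set = set()     # nodes fully explored without finding a cycle

    def visit(node: str) -> Optional[list]:
        if node in on_path:
            # Back edge: the cycle runs from the first visit of `node` to here.
            return path[path.index(node):] + [node]
        if node in done:
            return None
        on_path.add(node)
        path.append(node)
        for nxt in graph.get(node, []):
            cycle = visit(nxt)
            if cycle:
                return cycle
        path.pop()
        on_path.discard(node)
        done.add(node)
        return None

    return visit(start)


acyclic = {"start": ["process"], "process": ["save"]}
looping = {"generate": ["check"], "check": ["generate", "finish"]}
print(find_cycle(acyclic, "start"), find_cycle(looping, "generate"))
```

Keeping `on_path` separate from `done` is the important design choice: a node reached twice through different branches is not a cycle unless it is still on the active path.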
Feature 2: AND/OR condition combinations
Description
Conditions can be combined and nested to express complex execution logic.
Core code
def and_(*conditions: str | FlowCondition | Callable) -> AndCondition:
"""创建 AND 条件组合"""
parsed_conditions = [_parse_condition(c) for c in conditions]
return AndCondition(conditions=parsed_conditions)
def or_(*conditions: str | FlowCondition | Callable) -> OrCondition:
"""创建 OR 条件组合"""
parsed_conditions = [_parse_condition(c) for c in conditions]
return OrCondition(conditions=parsed_conditions)
class AndCondition(FlowCondition):
"""AND 条件:所有条件都满足"""
conditions: list[FlowCondition]
def evaluate(self, context: dict[str, Any]) -> bool:
# 所有条件都必须满足
return all(c.evaluate(context) for c in self.conditions)
class OrCondition(FlowCondition):
"""OR 条件:任一条件满足"""
conditions: list[FlowCondition]
def evaluate(self, context: dict[str, Any]) -> bool:
# 任一条件满足即可
return any(c.evaluate(context) for c in self.conditions)
Complex combination example
class ComplexFlow(Flow):
    @start()
    def task_a(self):
        return "a_done"

    @start()
    def task_b(self):
        return "b_done"

    @start()
    def task_c(self):
        return "c_done"

    # (A AND B) OR C
    @listen(or_(and_("task_a", "task_b"), "task_c"))
    def combined(self):
        # Runs when (task_a and task_b have both completed) or (task_c has completed)
        pass
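To make the semantics of `(A AND B) OR C` concrete, here is a simplified evaluator that checks a nested condition against the set of completed method names. It is a sketch under stated assumptions: the `and_` and `or_` functions below are local stand-ins that build tuples, and `satisfied` is a hypothetical helper; none of it is the framework's implementation.

```python
from typing import Union

# A condition is either a method name or an ("AND" | "OR", [sub-conditions]) pair.
Condition = Union[str, tuple]


def and_(*conds: Condition) -> tuple:
    """Local stand-in: all sub-conditions must hold."""
    return ("AND", list(conds))


def or_(*conds: Condition) -> tuple:
    """Local stand-in: at least one sub-condition must hold."""
    return ("OR", list(conds))


def satisfied(cond: Condition, completed: set) -> bool:
    """Recursively check `cond` against the set of completed method names."""
    if isinstance(cond, str):
        return cond in completed
    kind, subs = cond
    check = all if kind == "AND" else any
    return check(satisfied(sub, completed) for sub in subs)


rule = or_(and_("task_a", "task_b"), "task_c")
print(satisfied(rule, {"task_a"}), satisfied(rule, {"task_a", "task_b"}))
```

With only `task_a` finished the rule does not hold; it holds once `task_b` also finishes, or as soon as `task_c` finishes on its own.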
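Feature 3: state persistence
Description
Persists the Flow state to storage so that an interrupted run can be resumed and its state restored.
Core code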
@persist(verbose=True)
class PersistentFlow(Flow):
"""类级持久化"""
pass
class SelectiveFlow(Flow):
"""方法级持久化"""
@persist
@start()
def critical_step(self):
# 仅此方法持久化
pass
def persist(
target: type[Flow] | Callable | None = None,
verbose: bool = False,
) -> Any:
"""持久化装饰器"""
def class_decorator(cls: type[Flow]) -> type[Flow]:
# 类级持久化:所有方法都持久化
cls._persist_all_methods = True
cls._persist_verbose = verbose
return cls
def method_decorator(func: Callable) -> Callable:
# 方法级持久化:仅此方法持久化
func._persist_state = True
func._persist_verbose = verbose
return func
if target is None:
# 带参数调用:@persist(verbose=True)
return class_decorator
elif isinstance(target, type):
# 类装饰器:@persist
return class_decorator(target)
else:
# 方法装饰器:@persist
return method_decorator(target)
def _persist_state(self) -> None:
"""持久化状态到存储"""
if self._state_already_persisted:
return
# 1. 序列化状态
if isinstance(self.state, BaseModel):
state_data = self.state.model_dump()
else:
state_data = dict(self.state)
# 2. 添加元数据
persist_data = {
"state": state_data,
"execution_counts": self._method_execution_counts,
"timestamp": time.time(),
"flow_id": self.state.get("id") if isinstance(self.state, dict) else self.state.id,
}
# 3. 保存到文件
storage_dir = os.getenv("CREWAI_STORAGE_DIR", appdirs.user_data_dir("crewai"))
persist_file = os.path.join(storage_dir, f"flow_{persist_data['flow_id']}.json")
with open(persist_file, "w") as f:
json.dump(persist_data, f, indent=2)
if self._persist_verbose:
print(f"State persisted to {persist_file}")
self._state_already_persisted = True
def _load_state(self, flow_id: str) -> bool:
"""从存储加载状态"""
storage_dir = os.getenv("CREWAI_STORAGE_DIR", appdirs.user_data_dir("crewai"))
persist_file = os.path.join(storage_dir, f"flow_{flow_id}.json")
if not os.path.exists(persist_file):
return False
with open(persist_file, "r") as f:
persist_data = json.load(f)
# 恢复状态
if isinstance(self.state, BaseModel):
self.state = self.state.__class__(**persist_data["state"])
else:
self.state.update(persist_data["state"])
# 恢复执行计数
self._method_execution_counts = persist_data["execution_counts"]
if self._persist_verbose:
print(f"State loaded from {persist_file}")
return True
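A decorator that works both bare (`@persist`) and with arguments (`@persist(verbose=True)`), on classes as well as methods, has to distinguish those call shapes. One common way to do that is sketched below; `mark_persist` is a hypothetical name, and the attributes it sets simply echo the ones used in the code above.

```python
import inspect
from typing import Any, Optional


def mark_persist(target: Optional[Any] = None, *, verbose: bool = False) -> Any:
    """Usable as @mark_persist or @mark_persist(verbose=True), on classes or functions."""

    def apply(obj: Any) -> Any:
        if inspect.isclass(obj):
            obj._persist_all_methods = True  # class level: every method
        else:
            obj._persist_state = True        # method level: this method only
        obj._persist_verbose = verbose
        return obj

    # Bare usage receives the decorated object directly; otherwise return the decorator.
    return apply if target is None else apply(target)


@mark_persist(verbose=True)
class WholeFlow:
    pass


class PartialFlow:
    @mark_persist
    def critical_step(self):
        return "done"

    def temp_step(self):
        return "skipped"


print(WholeFlow._persist_verbose, PartialFlow.critical_step._persist_state)
```

Deciding between class and function inside `apply`, rather than when the arguments are parsed, means `@mark_persist(verbose=True)` behaves correctly on a method as well as on a class.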
Usage example
@persist(verbose=True)
class CheckpointFlow(Flow[WorkflowState]):
    @start()
    def step_one(self):
        self.state.step = "step_one"
        self.state.results.append("Step 1 done")
        return "step1_done"

    @listen("step_one")
    def step_two(self):
        # If this step fails, execution can resume from the state saved after step_one
        self.state.step = "step_two"
        self.state.results.append("Step 2 done")
        return "step2_done"

# First run (user_id is a required field of WorkflowState)
flow = CheckpointFlow(initial_state=WorkflowState(user_id="123"))
try:
    result = flow.kickoff()
except Exception as e:
    print(f"Flow failed: {e}")
    # Restore from the saved state
    flow_id = flow.state.id
    recovered_flow = CheckpointFlow(initial_state=WorkflowState(user_id="123"))
    if recovered_flow._load_state(flow_id):
        # Continue execution
        result = recovered_flow.kickoff()
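The save-and-restore cycle can be exercised without the framework. The sketch below writes a small state object to a JSON file and reads it back. It assumes a plain dataclass in place of the Pydantic state and a temporary directory in place of the storage directory; `State`, `save`, and `load` are hypothetical names.

```python
import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Optional


@dataclass
class State:
    id: str
    step: str = "init"
    results: list = field(default_factory=list)


def save(state: State, directory: Path) -> Path:
    """Write the state to flow_<id>.json and return the file path."""
    path = directory / f"flow_{state.id}.json"
    path.write_text(json.dumps(asdict(state)), encoding="utf-8")
    return path


def load(flow_id: str, directory: Path) -> Optional[State]:
    """Rebuild the state from disk, or return None if nothing was saved."""
    path = directory / f"flow_{flow_id}.json"
    if not path.exists():
        return None
    return State(**json.loads(path.read_text(encoding="utf-8")))


with tempfile.TemporaryDirectory() as tmp:
    original = State(id="demo", step="step_one", results=["Step 1 done"])
    save(original, Path(tmp))
    restored = load("demo", Path(tmp))
    missing = load("unknown", Path(tmp))

print(restored == original, missing)
```

Returning `None` for an unknown id keeps the caller's recovery branch explicit, in the same way the `if recovered_flow._load_state(flow_id):` check does above.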
Practical examples and best practices
Example 1: basic linear flow
class SimpleWorkflow(Flow):
    @start()
    def collect_data(self):
        print("Collecting data...")
        self.state["data"] = ["item1", "item2", "item3"]
        return "data_collected"

    @listen("collect_data")
    def process_data(self, result):
        print(f"Processing data: {self.state['data']}")
        self.state["processed"] = [item.upper() for item in self.state["data"]]
        return "data_processed"

    @listen("process_data")
    def save_results(self, result):
        print(f"Saving results: {self.state['processed']}")
        return "complete"

# Run
flow = SimpleWorkflow()
result = flow.kickoff()
print(result)  # "complete"
Example 2: parallel tasks and aggregation
class ParallelWorkflow(Flow):
    @start()
    def fetch_from_api_a(self):
        # Simulated API call
        self.state["api_a_data"] = {"source": "A", "value": 100}
        return "api_a_done"

    @start()
    def fetch_from_api_b(self):
        # Simulated API call
        self.state["api_b_data"] = {"source": "B", "value": 200}
        return "api_b_done"

    @start()
    def fetch_from_api_c(self):
        # Simulated API call
        self.state["api_c_data"] = {"source": "C", "value": 300}
        return "api_c_done"

    # The AND condition names the upstream methods, not their return values
    @listen(and_("fetch_from_api_a", "fetch_from_api_b", "fetch_from_api_c"))
    def aggregate_results(self):
        # Aggregate once every API call has completed
        total = (
            self.state["api_a_data"]["value"]
            + self.state["api_b_data"]["value"]
            + self.state["api_c_data"]["value"]
        )
        self.state["total"] = total
        print(f"Total: {total}")
        return total

# Run (the three start methods execute in parallel, then the results are aggregated)
flow = ParallelWorkflow()
result = flow.kickoff()
print(result)  # 600
Example 3: conditional routing and decisions
from pydantic import BaseModel

# Dedicated state model: every attribute assigned below must be declared as a field
class DecisionState(BaseModel):
    score: float = 0.0
    has_errors: bool = False
    status: str = "pending"

class DecisionFlow(Flow[DecisionState]):
    @start()
    def analyze_request(self):
        # Analyze the request
        self.state.score = 0.75  # assume this score was computed
        self.state.has_errors = False
        return "analyzed"

    @router("analyze_request")
    def route_decision(self):
        if self.state.has_errors:
            return "error_path"
        elif self.state.score > 0.8:
            return "high_confidence"
        elif self.state.score > 0.5:
            return "medium_confidence"
        else:
            return "low_confidence"

    @listen("high_confidence")
    def auto_approve(self):
        print("✅ Auto-approved")
        self.state.status = "approved"

    @listen("medium_confidence")
    def manual_review(self):
        print("⚠️ Needs manual review")
        self.state.status = "review"

    @listen("low_confidence")
    def auto_reject(self):
        print("❌ Auto-rejected")
        self.state.status = "rejected"

    @listen("error_path")
    def handle_errors(self):
        print("🔧 Handling errors")
        self.state.status = "error"

# Run
flow = DecisionFlow()
flow.kickoff()
print(f"Final status: {flow.state.status}")
Example 4: integrating Flow with Crew
from crewai import Crew, Agent, Task
from crewai.flow.flow import Flow, listen, start

class IntegratedFlow(Flow):
    @start()
    def data_preparation(self):
        # Prepare the data
        self.state["topic"] = "AI Safety"
        self.state["requirements"] = "5 key findings"
        return "data_ready"

    @listen("data_preparation")
    def run_research_crew(self, result):
        # Call a Crew to do the research
        researcher = Agent(
            role="Research Analyst",
            goal="Find relevant information",
            backstory="Expert researcher",
        )
        research_task = Task(
            description=f"Research {self.state['topic']}",
            expected_output=self.state["requirements"],
            agent=researcher,
        )
        crew = Crew(
            agents=[researcher],
            tasks=[research_task],
        )
        result = crew.kickoff()
        self.state["research_output"] = result.raw
        return "research_done"

    @listen("run_research_crew")
    def run_writing_crew(self, result):
        # Call a Crew to do the writing
        writer = Agent(
            role="Content Writer",
            goal="Write engaging content",
            backstory="Skilled writer",
        )
        write_task = Task(
            description=f"Write a blog post about: {self.state['research_output']}",
            expected_output="500-word blog post",
            agent=writer,
            markdown=True,
        )
        crew = Crew(
            agents=[writer],
            tasks=[write_task],
        )
        result = crew.kickoff()
        self.state["final_output"] = result.raw
        return result.raw

# Run
flow = IntegratedFlow()
final_result = flow.kickoff()
print(final_result)
Example 5: cyclic flow (iterative refinement)
import random

class IterativeFlow(Flow):
    def __init__(self):
        super().__init__()
        self.state["iteration"] = 0
        self.state["max_iterations"] = 5
        self.state["quality_threshold"] = 0.9
        self.state["current_quality"] = 0.0

    @start("loop")
    def generate_content(self):
        self.state["iteration"] += 1
        print(f"\nIteration {self.state['iteration']}")
        # Generate content (simulated)
        self.state["current_quality"] = random.uniform(0.5, 1.0)
        print(f"Quality: {self.state['current_quality']:.2f}")
        return "generated"

    @router("generate_content")
    def check_quality(self):
        if self.state["current_quality"] >= self.state["quality_threshold"]:
            print("✅ Quality threshold met!")
            return "success"
        elif self.state["iteration"] >= self.state["max_iterations"]:
            print("⚠️ Max iterations reached")
            return "max_iterations"
        else:
            print("🔄 Quality not met, retrying...")
            return "loop"  # continue the loop

    @listen("success")
    def finalize_success(self):
        print(f"Completed successfully in {self.state['iteration']} iterations")

    @listen("max_iterations")
    def finalize_failure(self):
        print("Could not reach quality threshold")

# Run
flow = IterativeFlow()
flow.kickoff()
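Best practices summary
1. When to use Flow vs Crew
Use Flow when:
- You need complex conditional branches and loops
- You need precise control over execution order
- You need to share and modify state between steps
- You need to integrate non-AI business logic
Use Crew when:
- Several AI Agents need to collaborate
- Task dependencies are simple (sequential or hierarchical)
- The work is mainly AI generation
- You need the memory system and knowledge base
Combined use:
- Flow acts as the main orchestrator and calls a Crew in individual steps
- Flow handles business logic; Crew handles AI tasks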
2. State management strategy
Structured state (recommended):
from pydantic import BaseModel, Field

class MyState(BaseModel):
    step: str = "init"
    results: list[str] = Field(default_factory=list)
    metadata: dict = Field(default_factory=dict)

class MyFlow(Flow[MyState]):
    pass
Unstructured state (simple scenarios):
class MyFlow(Flow):
    @start()
    def init(self):
        self.state["key"] = "value"
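3. Error handling
class RobustFlow(Flow):
    @start()
    def risky_operation(self):
        try:
            result = some_risky_function()  # placeholder helper, not defined here
            self.state["success"] = True
        except Exception as e:
            self.state["error"] = str(e)
            self.state["success"] = False

    # A router turns the outcome into a label that listeners can match;
    # a plain return value from a start method does not select listeners
    @router("risky_operation")
    def check_outcome(self):
        return "success" if self.state["success"] else "error"

    @listen("success")
    def handle_success(self):
        print("Operation succeeded")

    @listen("error")
    def handle_error(self):
        print(f"Operation failed: {self.state['error']}")
        # Implement the error recovery logic here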
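4. Performance optimization
- Parallel execution: use multiple @start() methods
- Async support: use kickoff_async() and async methods
- Avoid overly deep listener chains: consider refactoring beyond 10 levels
- State persistence: use it only when necessary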
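The benefit of running independent steps concurrently can be seen in a framework-free sketch built on `asyncio.gather`, the same primitive the listener execution code uses. The `fetch` and `aggregate` coroutines are hypothetical, with a short sleep standing in for a network call.

```python
import asyncio


async def fetch(source: str, value: int) -> dict:
    await asyncio.sleep(0.01)  # stand-in for a network call
    return {"source": source, "value": value}


async def aggregate() -> int:
    # The three fetches overlap instead of running back to back.
    results = await asyncio.gather(
        fetch("A", 100), fetch("B", 200), fetch("C", 300)
    )
    return sum(item["value"] for item in results)


total = asyncio.run(aggregate())
print(total)
```

Because the waits overlap, total wall time is close to that of the slowest call rather than the sum of all three. The gain applies to I/O-bound steps; CPU-bound work inside a coroutine still runs one step at a time.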
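5. Visualization and debugging
# Generate the flow diagram
flow = MyFlow()
flow.plot("my_workflow")  # writes an HTML visualization
# Inspect the execution graph
print(flow._methods)    # all methods
print(flow._listeners)  # listener mapping
print(flow._routers)    # router mapping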
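Summary
The Flow module is the event-driven workflow engine of the CrewAI framework. It provides flexible orchestration through the following mechanisms:
- Decorator-driven definition: the @start, @listen, and @router decorators define the flow
- Flexible condition combinations: AND/OR conditions express complex logic
- Dynamic routing: conditional branches based on runtime state
- State management: structured or unstructured state, with optional persistence
- Crew integration: a Crew can be called from any step to run AI tasks
- Visualization: flow diagrams are generated automatically
- Async support: the async/await pattern is supported
The module follows an event-driven architecture: dependencies between methods are decoupled and expressed through event listening and routing, which suits scenarios that need complex control flow.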