CrewAI-04-Flow-完整剖析

模块概览

职责与定位

Flow(流程)是 CrewAI 框架中的事件驱动工作流引擎,负责:

  1. 工作流编排:定义方法间的依赖关系和执行顺序
  2. 状态管理:维护结构化或非结构化的流程状态
  3. 条件路由:根据方法输出动态选择执行路径
  4. 事件监听:响应方法完成事件并触发后续方法
  5. 持久化支持:可选地持久化流程状态和执行历史
  6. 可视化:生成流程结构的可视化图表
  7. Crew 集成:在流程中调用 Crew 执行复杂任务

输入与输出

输入

  • 初始输入参数(通过 @start() 方法接收)
  • 前序方法的返回值(通过 @listen() 方法接收)

输出

  • 最后一个方法的返回值
  • 流程状态(flow.state

上下游依赖

上游依赖(被调用)

  • 用户代码:通过 flow.kickoff()flow.kickoff_async() 启动
  • CLI 工具:crewai flow run 命令

下游依赖(调用)

  • Crew:在流程方法中调用 Crew
  • Task:直接执行任务
  • Events:发送流程执行事件

生命周期

stateDiagram-v2
    [*] --> 创建: Flow() 或 Flow[StateT]()
    创建 --> 配置: 装饰方法(@start/@listen/@router)
    配置 --> 就绪: 等待 kickoff
    就绪 --> 启动: kickoff() 或 kickoff_async()

    启动 --> 执行开始方法: @start() 方法
    执行开始方法 --> 事件触发: 方法完成事件

    事件触发 --> Listen判断: 是否有 @listen()
    Listen判断 --> 执行监听方法: 条件满足

    执行监听方法 --> Router判断: 是否为 @router()
    Router判断 --> 执行路由方法: 是 router
    Router判断 --> 事件触发: 非 router

    执行路由方法 --> 路由决策: 返回路由目标
    路由决策 --> 事件触发: 触发路由目标事件

    事件触发 --> Listen判断: 有后续监听器
    Listen判断 --> 完成: 无后续监听器
    完成 --> [*]

整体服务架构图

1) 分层架构视图

flowchart TB
    subgraph "用户接口层"
        UserCode[用户代码<br/>定义 Flow 类]
        CLI[CLI 工具<br/>crewai flow run]
    end

    subgraph "API 层"
        KickoffAPI[kickoff / kickoff_async<br/>流程启动 API]
        PlotAPI[plot<br/>可视化 API]
        StateAPI[state 属性<br/>状态访问]
    end

    subgraph "装饰器注册层"
        StartDec[@start 装饰器]
        ListenDec[@listen 装饰器]
        RouterDec[@router 装饰器]
        PersistDec[@persist 装饰器]
        FlowMeta[FlowMeta 元类<br/>装饰器元数据收集]
    end

    subgraph "核心执行引擎"
        FlowCore[Flow 核心类]
        ExecutionMgr[执行管理器<br/>_execute_start_method<br/>_execute_method<br/>_execute_listeners]
        ConditionEval[条件评估器<br/>_evaluate_condition<br/>_find_triggered_methods]
        MethodRouter[方法路由器<br/>Router 结果处理]
    end

    subgraph "条件系统"
        AndOr[and_ / or_ 函数]
        ConditionDict[FlowCondition 字典]
        PendingTracker[待定 AND 条件追踪<br/>_pending_and_listeners]
    end

    subgraph "状态管理层"
        StateHolder[状态容器<br/>_state: T | dict]
        MethodOutputs[方法输出缓存<br/>_method_outputs]
        CompletedMethods[已完成方法追踪<br/>_completed_methods]
        ExecutionCounts[执行计数器<br/>_method_execution_counts]
    end

    subgraph "持久化层"
        PersistenceAPI[FlowPersistence 接口]
        SQLiteImpl[SQLiteFlowPersistence]
        StateSerializer[状态序列化器]
        StateLoader[状态加载器]
    end

    subgraph "事件系统"
        EventBus[crewai_event_bus]
        FlowEvents[Flow 事件<br/>FlowStartedEvent<br/>FlowFinishedEvent]
        MethodEvents[Method 事件<br/>MethodExecutionStartedEvent<br/>MethodExecutionFinishedEvent<br/>MethodExecutionFailedEvent]
    end

    subgraph "可视化层"
        FlowPlot[FlowPlot 可视化器]
        LevelCalc[层级计算<br/>calculate_node_levels]
        PositionCalc[位置计算<br/>compute_positions]
        PyVisNet[PyVis Network]
        HTMLGen[HTML 生成器]
    end

    subgraph "外部集成"
        CrewIntegration[Crew 集成]
        AsyncSupport[异步支持<br/>asyncio]
        TracingSupport[追踪支持<br/>OpenTelemetry]
    end

    UserCode --> StartDec
    UserCode --> ListenDec
    UserCode --> RouterDec
    UserCode --> PersistDec

    StartDec --> FlowMeta
    ListenDec --> FlowMeta
    RouterDec --> FlowMeta
    PersistDec --> FlowMeta

    FlowMeta --> FlowCore

    UserCode --> KickoffAPI
    CLI --> KickoffAPI
    UserCode --> PlotAPI

    KickoffAPI --> FlowCore
    PlotAPI --> FlowPlot
    StateAPI --> StateHolder

    FlowCore --> ExecutionMgr
    FlowCore --> StateHolder
    FlowCore --> EventBus

    ExecutionMgr --> ConditionEval
    ExecutionMgr --> MethodRouter
    ExecutionMgr --> MethodOutputs
    ExecutionMgr --> CompletedMethods

    ConditionEval --> AndOr
    ConditionEval --> ConditionDict
    ConditionEval --> PendingTracker

    FlowCore --> PersistenceAPI
    PersistenceAPI --> SQLiteImpl
    SQLiteImpl --> StateSerializer
    SQLiteImpl --> StateLoader

    ExecutionMgr --> EventBus
    EventBus --> FlowEvents
    EventBus --> MethodEvents

    FlowPlot --> LevelCalc
    FlowPlot --> PositionCalc
    FlowPlot --> PyVisNet
    PyVisNet --> HTMLGen

    ExecutionMgr --> CrewIntegration
    ExecutionMgr --> AsyncSupport
    EventBus --> TracingSupport

2) 模块间交互矩阵

调用方 被调方 调用方式 数据流向 错误传播 说明
用户代码 Flow.init() 直接调用 inputs → state 抛出异常 初始化 Flow 实例
用户代码 Flow.kickoff() 同步调用 inputs → result 抛出异常 启动 Flow 执行
kickoff() kickoff_async() asyncio.run() inputs → result 抛出异常 同步包装异步执行
kickoff_async() _execute_start_method() await 并行 - → result 抛出异常 执行所有 start 方法
_execute_start_method() _execute_method() await method → result 抛出异常 执行单个方法
_execute_start_method() _execute_listeners() await result → - 抛出异常 触发监听器链
_execute_listeners() _find_triggered_methods() 直接调用 trigger → list[methods] 返回空列表 查找触发的方法
_execute_listeners() _execute_single_listener() await 并行 result → - 抛出异常 执行单个监听器
_execute_listeners() _evaluate_condition() 直接调用 condition → bool 返回 False 评估条件
_execute_method() crewai_event_bus.emit() 异步触发 event → - 不阻塞 发送执行事件
_execute_method() user_method() 调用/await args → result 抛出异常 执行用户方法
Flow FlowPersistence 条件调用 state → - 抛出异常 持久化状态
Flow.plot() FlowPlot.plot() 直接调用 flow → HTML 抛出异常 生成可视化
FlowMeta 装饰器元数据 类创建时 decorators → metadata - 收集装饰器信息

3) 详细架构图(核心执行流)

flowchart TB
    subgraph "Flow 核心层"
        Flow[Flow 类<br/>流程定义与编排]
        FlowState[状态容器<br/>_state: T \| dict]
    end

    subgraph "装饰器层"
        Start[@start<br/>标记开始方法]
        Listen[@listen<br/>标记监听方法]
        Router[@router<br/>标记路由方法]
    end

    subgraph "条件系统"
        And[and_<br/>与条件组合]
        Or[or_<br/>或条件组合]
        FlowCondition[FlowCondition<br/>条件字典]
        ConditionEval[条件评估器<br/>递归评估嵌套条件]
    end

    subgraph "执行引擎"
        FlowMeta[FlowMeta 元类<br/>收集装饰器元数据]
        ExecuteStart[_execute_start_method<br/>执行 start 方法]
        ExecuteMethod[_execute_method<br/>执行单个方法]
        ExecuteListeners[_execute_listeners<br/>执行监听器链]
        FindTriggered[_find_triggered_methods<br/>查找触发的方法]
        ExecuteSingle[_execute_single_listener<br/>执行单个监听器]
    end

    subgraph "持久化层"
        Persist[@persist 装饰器<br/>类级/方法级]
        PersistAPI[FlowPersistence 接口]
        SQLite[SQLiteFlowPersistence<br/>SQLite 存储]
    end

    subgraph "可视化层"
        FlowPlot[FlowPlot 可视化器<br/>生成流程图]
        PyVis[PyVis Network<br/>图形渲染]
        HTMLTemplate[HTML 模板<br/>最终输出]
    end

    subgraph "事件系统"
        EventBus[crewai_event_bus<br/>事件总线]
        FlowStarted[FlowStartedEvent]
        MethodStarted[MethodExecutionStartedEvent]
        MethodFinished[MethodExecutionFinishedEvent]
        FlowFinished[FlowFinishedEvent]
    end

    subgraph "Crew 集成"
        CrewCall[在方法中调用 Crew.kickoff]
        StateShare[状态共享<br/>Flow.state ↔ Crew inputs]
    end

    Flow --> Start
    Flow --> Listen
    Flow --> Router

    Start --> FlowMeta
    Listen --> FlowMeta
    Router --> FlowMeta

    Listen --> And
    Listen --> Or
    And --> FlowCondition
    Or --> FlowCondition

    FlowMeta --> Flow
    Flow --> ExecuteStart
    ExecuteStart --> ExecuteMethod
    ExecuteStart --> ExecuteListeners

    ExecuteListeners --> FindTriggered
    FindTriggered --> ConditionEval
    ConditionEval --> FlowCondition

    ExecuteListeners --> ExecuteSingle
    ExecuteSingle --> ExecuteMethod

    Flow --> FlowState

    ExecuteMethod --> EventBus
    EventBus --> MethodStarted
    EventBus --> MethodFinished
    Flow --> EventBus
    EventBus --> FlowStarted
    EventBus --> FlowFinished

    Flow --> Persist
    Persist --> PersistAPI
    PersistAPI --> SQLite

    Flow --> FlowPlot
    FlowPlot --> PyVis
    PyVis --> HTMLTemplate

    ExecuteMethod --> CrewCall
    CrewCall --> StateShare
    StateShare --> FlowState

架构要点说明


详细时序图与调用链路

时序图 1:Flow 完整执行流程(kickoff)

sequenceDiagram
    autonumber
    participant User as 用户代码
    participant Flow as Flow 实例
    participant Kickoff as kickoff()
    participant KickoffAsync as kickoff_async()
    participant EventBus as 事件总线
    participant StartExec as _execute_start_method()
    participant MethodExec as _execute_method()
    participant ListenerExec as _execute_listeners()
    participant FindTriggered as _find_triggered_methods()
    participant UserMethod as 用户方法
    participant Persistence as 持久化层

    User->>Flow: flow = MyFlow()
    Note over Flow: FlowMeta 已收集装饰器元数据
    User->>Kickoff: flow.kickoff(inputs)

    Kickoff->>KickoffAsync: asyncio.run(_run_flow())
    Note over KickoffAsync: 同步包装异步执行

    KickoffAsync->>KickoffAsync: 重置状态<br/>_completed_methods.clear()

    alt 存在 inputs['id'] 且有持久化
        KickoffAsync->>Persistence: load_state(restore_uuid)
        Persistence-->>KickoffAsync: stored_state
        KickoffAsync->>KickoffAsync: _restore_state(stored_state)
    end

    KickoffAsync->>KickoffAsync: _initialize_state(inputs)

    KickoffAsync->>EventBus: emit(FlowStartedEvent)
    Note over EventBus: 触发 Flow 启动事件

    KickoffAsync->>StartExec: 并行执行所有 @start 方法
    Note over StartExec: asyncio.gather(*tasks)

    loop 每个 start 方法
        StartExec->>StartExec: 检查是否已完成<br/>_completed_methods

        alt 方法未完成
            StartExec->>StartExec: 注入 trigger_payload
            StartExec->>MethodExec: await _execute_method(method_name, method)

            MethodExec->>EventBus: emit(MethodExecutionStartedEvent)

            alt 异步方法
                MethodExec->>UserMethod: await method(*args, **kwargs)
            else 同步方法
                MethodExec->>UserMethod: method(*args, **kwargs)
            end

            UserMethod-->>MethodExec: result

            MethodExec->>MethodExec: 记录输出<br/>_method_outputs.append(result)
            MethodExec->>MethodExec: 更新计数<br/>_method_execution_counts
            MethodExec->>MethodExec: 标记完成<br/>_completed_methods.add()

            MethodExec->>EventBus: emit(MethodExecutionFinishedEvent)

            MethodExec-->>StartExec: result

            StartExec->>ListenerExec: await _execute_listeners(method_name, result)

            Note over ListenerExec: 执行监听器链(详见时序图2

        end
    end

    KickoffAsync->>KickoffAsync: final_output = _method_outputs[-1]

    KickoffAsync->>EventBus: emit(FlowFinishedEvent)

    KickoffAsync-->>Kickoff: final_output
    Kickoff-->>User: final_output

时序图说明

图意概述: 该时序图展示了 Flow 从用户调用 kickoff() 到返回最终结果的完整执行流程,包括状态初始化、事件发送、start 方法并行执行、监听器链触发等关键步骤。

关键路径分解

  1. 同步到异步转换(步骤 1-3)

    • 用户调用同步的 kickoff() 方法
    • 内部通过 asyncio.run() 包装异步的 kickoff_async()
    • 这样用户代码无需关心异步,但内部享受异步的并发优势
  2. 状态管理(步骤 4-8)

    • 重置执行状态:清空已完成方法、方法输出
    • 尝试从持久化存储恢复状态(如果提供了 id
    • 用输入参数初始化/更新状态
  3. 事件驱动(步骤 9、25)

    • Flow 启动前发送 FlowStartedEvent
    • Flow 完成后发送 FlowFinishedEvent
    • 每个方法执行前后发送 MethodExecutionStartedEventMethodExecutionFinishedEvent
    • 事件系统支持追踪、监控、调试
  4. 并行执行 start 方法(步骤 10-23)

    • 使用 asyncio.gather() 并行执行所有 @start() 方法
    • 每个 start 方法独立执行,互不阻塞
    • 支持异步方法(自动 await)和同步方法
  5. 监听器链触发(步骤 23)

    • 每个 start 方法完成后,触发其监听器
    • 监听器可以链式触发更多监听器
    • 支持条件组合(AND/OR)

边界与约束

边界 说明
并发度 len(_start_methods) start 方法并行执行
超时 无内置超时 需在用户方法中自行处理
幂等性 每次 kickoff 重新执行
顺序保证 部分保证 start 方法无序,listen 链有序

异常处理

  • 任何方法抛出异常会立即中断执行
  • 发送 MethodExecutionFailedEvent
  • 异常向上传播到 kickoff() 调用方
  • 已完成的方法状态不会回滚

性能考虑

  • 并行优势:多个 start 方法并行执行,提高吞吐量
  • 事件开销:每个方法执行触发 2 个事件,有一定性能成本
  • 状态持久化:仅在配置 @persist 时才会持久化,避免不必要的 I/O

时序图 2:监听器执行与条件评估

sequenceDiagram
    autonumber
    participant Caller as 调用方<br/>(_execute_start_method)
    participant ListenerExec as _execute_listeners()
    participant FindTriggered as _find_triggered_methods()
    participant ConditionEval as _evaluate_condition()
    participant PendingTracker as _pending_and_listeners
    participant SingleExec as _execute_single_listener()
    participant MethodExec as _execute_method()
    participant UserMethod as 用户方法

    Caller->>ListenerExec: _execute_listeners(trigger_method, result)
    Note over ListenerExec: trigger_method = "step_one"<br/>result = "step1_result"

    rect rgb(255, 245, 235)
        Note over ListenerExec,FindTriggered: 阶段 1: 处理 Router(顺序执行)

        ListenerExec->>ListenerExec: current_trigger = trigger_method

        loop 直到没有 router 被触发
            ListenerExec->>FindTriggered: _find_triggered_methods(current_trigger, router_only=True)

            FindTriggered->>FindTriggered: 遍历 _listeners

            loop 每个监听器
                FindTriggered->>FindTriggered: 检查是否为 router<br/>listener_name in _routers

                alt 是 router
                    FindTriggered->>ConditionEval: _evaluate_condition(condition, trigger_method)

                    alt OR 条件
                        ConditionEval->>ConditionEval: 检查 trigger_method in methods
                        ConditionEval-->>FindTriggered: True/False
                    else AND 条件
                        ConditionEval->>PendingTracker: 检查/更新待定集合
                        PendingTracker->>PendingTracker: 移除 trigger_method
                        alt 所有条件满足
                            PendingTracker-->>ConditionEval: True
                        else 仍有待定
                            PendingTracker-->>ConditionEval: False
                        end
                        ConditionEval-->>FindTriggered: True/False
                    end
                end
            end

            FindTriggered-->>ListenerExec: triggered_routers = ["decision_router"]

            alt 有 router 被触发
                ListenerExec->>SingleExec: await _execute_single_listener(router_name, result)

                SingleExec->>SingleExec: 检查方法签名<br/>是否接受 result 参数

                alt 接受参数
                    SingleExec->>MethodExec: _execute_method(router_name, method, result=result)
                else 不接受参数
                    SingleExec->>MethodExec: _execute_method(router_name, method)
                end

                MethodExec->>UserMethod: await/call method()
                UserMethod-->>MethodExec: router_result = "high_confidence"
                MethodExec-->>SingleExec: router_result

                SingleExec->>ListenerExec: 递归 _execute_listeners(router_name, router_result)

                ListenerExec->>ListenerExec: current_trigger = router_result
                ListenerExec->>ListenerExec: router_results.append(router_result)
            else 无 router 触发
                Note over ListenerExec: 退出 router 循环
            end
        end
    end

    rect rgb(235, 245, 255)
        Note over ListenerExec,SingleExec: 阶段 2: 处理普通 Listener(并行执行)

        ListenerExec->>ListenerExec: all_triggers = [trigger_method, *router_results]

        loop 每个 trigger
            ListenerExec->>FindTriggered: _find_triggered_methods(current_trigger, router_only=False)

            FindTriggered->>FindTriggered: 遍历 _listeners(跳过 router)

            loop 每个监听器
                alt 不是 router 且不是 start
                    FindTriggered->>ConditionEval: _evaluate_condition(condition, trigger)
                    Note over ConditionEval: 同样的条件评估逻辑
                    ConditionEval-->>FindTriggered: True/False
                end
            end

            FindTriggered-->>ListenerExec: triggered_listeners = ["handle_a", "handle_b"]

            alt 有监听器被触发
                par 并行执行所有监听器
                    ListenerExec->>SingleExec: _execute_single_listener("handle_a", result)
                    SingleExec->>MethodExec: _execute_method("handle_a", method)
                    MethodExec->>UserMethod: await/call handle_a()
                    UserMethod-->>MethodExec: result_a
                    MethodExec-->>SingleExec: result_a
                    SingleExec->>ListenerExec: 递归 _execute_listeners("handle_a", result_a)
                and
                    ListenerExec->>SingleExec: _execute_single_listener("handle_b", result)
                    SingleExec->>MethodExec: _execute_method("handle_b", method)
                    MethodExec->>UserMethod: await/call handle_b()
                    UserMethod-->>MethodExec: result_b
                    MethodExec-->>SingleExec: result_b
                    SingleExec->>ListenerExec: 递归 _execute_listeners("handle_b", result_b)
                end

                Note over ListenerExec,SingleExec: asyncio.gather(*tasks)
            end
        end
    end

    ListenerExec-->>Caller: 完成

    rect rgb(245, 255, 245)
        Note over Caller,UserMethod: 特殊情况:循环流程

        alt router 结果触发已完成的 start 方法
            ListenerExec->>ListenerExec: 检查 start 方法条件
            alt 条件满足且方法已完成
                ListenerExec->>ListenerExec: 临时清除 _is_execution_resuming
                ListenerExec->>Caller: await _execute_start_method(method_name)
                Note over Caller: 循环重新执行
            end
        end
    end

时序图说明

图意概述: 该时序图详细展示了监听器执行的三个阶段:Router 顺序执行、普通 Listener 并行执行、以及循环流程的特殊处理。涵盖了 OR/AND 条件评估的完整逻辑。

关键路径分解

  1. 阶段 1:Router 顺序执行(步骤 3-29)

    • 目的:Router 返回的路由路径决定后续流程,必须顺序执行
    • 流程
      • 循环查找被当前 trigger 触发的 router
      • 执行 router 方法,获取路由结果(如 "high_confidence"
      • 将路由结果作为新的 trigger,继续查找 router
      • 直到没有 router 被触发
    • 示例
      @start()
      def analyze(self): return "analyzed"
      
      @router("analyzed")
      def decide(self):
          return "path_a" if condition else "path_b"
      
      @router("path_a")
      def sub_decide(self):
          return "final_path"
      
      执行顺序:analyze → decide → sub_decide
  2. 阶段 2:Listener 并行执行(步骤 30-52)

    • 目的:普通监听器之间无依赖,可以并行执行提高性能
    • 流程
      • 收集所有 trigger(原始 trigger + router 结果)
      • 对每个 trigger,查找触发的非 router 监听器
      • 使用 asyncio.gather() 并行执行所有监听器
    • 示例
      @listen("data_ready")
      def process_a(self): pass  # 并行
      
      @listen("data_ready")
      def process_b(self): pass  # 并行
      
  3. 阶段 3:循环流程处理(步骤 53-58)

    • 目的:支持循环/迭代模式的 Flow
    • 流程
      • Router 结果可能触发已完成的 start 方法
      • 临时清除 _is_execution_resuming 标志
      • 重新执行 start 方法
    • 示例
      @start("loop")
      def iterate(self):
          self.state["count"] += 1
          return "processed"
      
      @router("processed")
      def check(self):
          return "loop" if self.state["count"] < 5 else "done"
      

条件评估详解

OR 条件(步骤 17):

@listen(or_("method_a", "method_b"))
def handler(self): pass
  • 任一 trigger 匹配即触发
  • 实现:trigger_method in methods

AND 条件(步骤 18-26):

@listen(and_("method_a", "method_b"))
def handler(self): pass
  • 所有 trigger 都匹配才触发
  • 实现:
    1. 初始化待定集合:{"method_a", "method_b"}
    2. 每次 trigger 移除一个:discard(trigger_method)
    3. 集合为空时触发:not pending_set

嵌套条件

@listen(or_(and_("a", "b"), "c"))
def handler(self): pass
  • 递归评估:_evaluate_condition() 处理嵌套结构

边界与约束

场景 行为 说明
Router 无返回值 跳过 返回 None 不触发路由
Router 返回列表 多路由 每个路由目标都会触发
Listener 并发数 无限制 asyncio.gather 并发数取决于监听器数量
循环深度 无限制 需在用户代码中控制退出条件
AND 条件超时 永不触发 如果某个 trigger 永不到达,永远等待

性能优化点

  1. Router 顺序执行:避免路由决策的竞态条件
  2. Listener 并行执行:充分利用异步并发能力
  3. 递归触发:每个监听器完成后递归触发下游,形成调用链

时序图 3:状态持久化与恢复

sequenceDiagram
    autonumber
    participant User as 用户代码
    participant Flow as Flow 实例
    participant Kickoff as kickoff_async()
    participant Persistence as FlowPersistence
    participant SQLite as SQLiteFlowPersistence
    participant MethodExec as _execute_method()
    participant UserMethod as 用户方法

    rect rgb(255, 245, 245)
        Note over User,SQLite: 场景 1: 首次执行(无持久化状态)

        User->>Flow: flow = MyFlow()
        User->>Kickoff: flow.kickoff()

        Kickoff->>Kickoff: 检查 inputs['id']
        Note over Kickoff: inputs = None   'id'

        Kickoff->>Kickoff: _initialize_state(inputs)
        Note over Kickoff: 使用默认状态

        Kickoff->>MethodExec: 执行 start 方法
        MethodExec->>UserMethod: method()
        UserMethod->>UserMethod: 修改 self.state
        UserMethod-->>MethodExec: result

        alt @persist 装饰
            MethodExec->>Persistence: save_state(flow_id, state)
            Persistence->>SQLite: 序列化并存储
            SQLite->>SQLite: INSERT INTO flow_state
            SQLite-->>Persistence: success
            Persistence-->>MethodExec: success
        end

        MethodExec-->>Kickoff: result
        Kickoff-->>User: final_output
    end

    rect rgb(245, 255, 245)
        Note over User,SQLite: 场景 2: 恢复执行(有持久化状态)

        User->>Flow: flow = MyFlow()
        Note over Flow: 新实例,状态为默认值

        User->>Kickoff: flow.kickoff({"id": "uuid-123"})

        Kickoff->>Kickoff: 检查 inputs['id']  persistence
        Note over Kickoff: 发现 id = "uuid-123"

        Kickoff->>Persistence: load_state("uuid-123")
        Persistence->>SQLite: 从数据库加载
        SQLite->>SQLite: SELECT * FROM flow_state WHERE id='uuid-123'
        SQLite-->>Persistence: stored_state = {<br/>"state": {...},<br/>"completed_methods": [...],<br/>"method_outputs": [...]<br/>}

        Persistence-->>Kickoff: stored_state

        Kickoff->>Kickoff: _restore_state(stored_state)

        rect rgb(255, 255, 235)
            Note over Kickoff: 恢复状态字段

            Kickoff->>Kickoff: self._state = stored_state["state"]
            Kickoff->>Kickoff: self._completed_methods = set(stored_state["completed_methods"])
            Kickoff->>Kickoff: self._method_outputs = stored_state["method_outputs"]
            Kickoff->>Kickoff: self._method_execution_counts = stored_state["execution_counts"]
            Kickoff->>Kickoff: self._is_execution_resuming = True
        end

        Kickoff->>MethodExec: 执行 start 方法

        alt 方法已在 _completed_methods
            MethodExec->>MethodExec: 跳过执行
            Note over MethodExec: 但仍触发监听器
            MethodExec->>MethodExec: _execute_listeners(method_name, last_output)
        else 方法未完成
            MethodExec->>UserMethod: 正常执行 method()
            UserMethod-->>MethodExec: result

            alt @persist 装饰
                MethodExec->>Persistence: save_state(flow_id, state)
                Persistence->>SQLite: UPDATE flow_state
                SQLite-->>Persistence: success
            end
        end

        MethodExec-->>Kickoff: result
        Kickoff->>Kickoff: self._is_execution_resuming = False
        Kickoff-->>User: final_output
    end

    rect rgb(245, 245, 255)
        Note over User,SQLite: 场景 3: 持久化配置(类级/方法级)

        Note over Flow: 类级持久化
        User->>User: @persist(verbose=True)<br/>class MyFlow(Flow): ...
        Note over Flow: 所有方法自动持久化

        Note over Flow: 方法级持久化
        User->>User: @persist<br/>@start()<br/>def critical_step(self): ...
        Note over Flow:  critical_step 持久化
    end

时序图说明

图意概述: 该时序图展示了 Flow 状态持久化的三种场景:首次执行(无持久化)、恢复执行(有持久化状态)、以及不同级别的持久化配置。

关键路径分解

  1. 场景 1:首次执行(步骤 1-16)

    • 用户创建 Flow 实例并调用 kickoff()
    • 没有提供 id 或持久化未配置
    • 使用默认状态初始化
    • 执行方法时,如果有 @persist 装饰,保存状态到 SQLite
    • 保存内容包括:
      • state:Flow 状态
      • completed_methods:已完成方法列表
      • method_outputs:方法输出历史
      • execution_counts:方法执行计数
  2. 场景 2:恢复执行(步骤 17-43)

    • 用户提供 id 参数:flow.kickoff({"id": "uuid-123"})
    • Flow 从持久化存储加载状态
    • 恢复所有状态字段
    • 设置 _is_execution_resuming = True
    • 已完成的方法跳过执行,但仍触发监听器
    • 未完成的方法正常执行
    • 用途
      • 断点续传:Flow 执行失败后从中断点恢复
      • 长时间运行:分多次执行复杂 Flow
  3. 场景 3:持久化配置(步骤 44-50)

    • 类级持久化@persist 装饰类
      • 所有方法自动持久化
      • 适用于需要完整恢复的 Flow
    • 方法级持久化@persist 装饰方法
      • 仅特定方法持久化
      • 适用于仅关键步骤需要恢复的场景

持久化数据结构

stored_state = {
    "id": "uuid-123",
    "state": {
        "counter": 5,
        "results": ["r1", "r2"],
        # ... 其他状态字段
    },
    "completed_methods": ["start_method", "process_method"],
    "method_outputs": ["result1", "result2"],
    "execution_counts": {
        "start_method": 1,
        "process_method": 3  # 可能在循环中执行多次
    },
    "timestamp": 1234567890.123
}

边界与约束

边界 值/行为 说明
状态大小 无硬性限制 SQLite 默认最大 1GB,可配置
恢复超时 无超时 加载状态是同步操作
并发写入 串行化 SQLite 文件锁避免冲突
状态版本 不支持 状态结构变更需手动处理
ID 唯一性 必须唯一 重复 ID 会覆盖旧状态

异常处理

  • 加载失败:状态不存在或损坏
    • 记录警告日志
    • 使用默认状态继续执行
  • 保存失败:磁盘满或权限不足
    • 抛出 RuntimeError
    • 中断 Flow 执行

性能考虑

  • I/O 开销:每个持久化方法执行后都会写入 SQLite
  • 优化策略
    • 仅对关键方法使用方法级持久化
    • 批量写入:收集多个状态更新后一次写入(当前未实现)
    • 异步持久化:不阻塞方法执行(当前未实现)

时序图 4:Flow 可视化生成

sequenceDiagram
    autonumber
    participant User as 用户代码
    participant Flow as Flow 实例
    participant PlotAPI as plot()
    participant EventBus as 事件总线
    participant FlowPlot as FlowPlot 类
    participant LevelCalc as calculate_node_levels()
    participant PositionCalc as compute_positions()
    participant PyVis as PyVis Network
    participant NodeAdder as add_nodes_to_network()
    participant EdgeAdder as add_edges()
    participant HTMLGen as HTML 生成器
    participant FileSystem as 文件系统

    User->>PlotAPI: flow.plot("my_workflow")

    PlotAPI->>EventBus: emit(FlowPlotEvent)
    Note over EventBus: 触发可视化事件

    PlotAPI->>FlowPlot: FlowPlot(flow)
    FlowPlot->>FlowPlot: 初始化配置<br/>colors, node_styles

    FlowPlot->>PyVis: Network(directed=True, height="750px")
    PyVis-->>FlowPlot: net

    FlowPlot->>PyVis: net.set_options({physics: false})
    Note over PyVis: 禁用物理引擎<br/>使用固定布局

    rect rgb(255, 245, 235)
        Note over FlowPlot,PositionCalc: 阶段 1: 计算节点布局

        FlowPlot->>LevelCalc: calculate_node_levels(flow)

        LevelCalc->>LevelCalc: 分析 _start_methods
        Note over LevelCalc: start 方法在第 0 层

        LevelCalc->>LevelCalc: 遍历 _listeners 构建层级

        loop 广度优先遍历
            LevelCalc->>LevelCalc: 父节点层级 + 1
            Note over LevelCalc: 处理 OR/AND 条件
        end

        LevelCalc-->>FlowPlot: node_levels = {<br/>"start": 0,<br/>"process": 1,<br/>"decide": 2,<br/>...}

        FlowPlot->>PositionCalc: compute_positions(flow, node_levels)

        PositionCalc->>PositionCalc: 计算每层节点数
        PositionCalc->>PositionCalc: 计算每层宽度

        loop 每个节点
            PositionCalc->>PositionCalc: x = layer_index * X_SPACING
            PositionCalc->>PositionCalc: y = node_index * Y_SPACING - layer_height/2
            Note over PositionCalc: 居中对齐
        end

        PositionCalc-->>FlowPlot: node_positions = {<br/>"start": (0, 0),<br/>"process": (300, 0),<br/>...}
    end

    rect rgb(245, 255, 235)
        Note over FlowPlot,NodeAdder: 阶段 2: 添加节点

        FlowPlot->>NodeAdder: add_nodes_to_network(net, flow, positions, styles)

        loop 每个方法
            NodeAdder->>NodeAdder: 确定节点类型<br/>(start/listen/router)

            NodeAdder->>NodeAdder: 确定节点样式<br/>color, shape, border

            alt 方法调用 Crew
                NodeAdder->>NodeAdder: 添加虚线边框<br/>borderWidthSelected: 4
            end

            NodeAdder->>NodeAdder: 格式化标签<br/><b>方法名</b><br/><i>类型</i>

            NodeAdder->>PyVis: net.add_node(<br/>id=method_name,<br/>label=formatted_label,<br/>x=pos[0], y=pos[1],<br/>**style_options)
        end

        NodeAdder-->>FlowPlot: 完成
    end

    rect rgb(245, 235, 255)
        Note over FlowPlot,EdgeAdder: 阶段 3: 添加边

        FlowPlot->>EdgeAdder: add_edges(net, flow, positions, colors)

        loop 每个监听器
            EdgeAdder->>EdgeAdder: 获取 trigger 方法

            alt OR 条件
                EdgeAdder->>EdgeAdder: 实线箭头
                EdgeAdder->>PyVis: net.add_edge(<br/>from=trigger,<br/>to=listener,<br/>dashes=False,<br/>color="purple")
            else AND 条件
                EdgeAdder->>EdgeAdder: 虚线箭头
                EdgeAdder->>PyVis: net.add_edge(<br/>from=trigger,<br/>to=listener,<br/>dashes=True,<br/>color="blue")
            end
        end

        loop 每个 router
            EdgeAdder->>EdgeAdder: 获取 router_paths

            loop 每个路由路径
                EdgeAdder->>PyVis: net.add_edge(<br/>from=router,<br/>to=path,<br/>dashes=False,<br/>color="green",<br/>label=path_name)
            end
        end

        EdgeAdder-->>FlowPlot: 完成
    end

    rect rgb(255, 255, 235)
        Note over FlowPlot,FileSystem: 阶段 4: 生成 HTML

        FlowPlot->>PyVis: net.generate_html()
        PyVis-->>FlowPlot: network_html

        FlowPlot->>HTMLGen: 读取模板<br/>crewai_flow_visual_template.html
        HTMLGen-->>FlowPlot: html_template

        FlowPlot->>HTMLGen: 生成图例 HTML
        HTMLGen-->>FlowPlot: legend_html

        FlowPlot->>FlowPlot: 替换模板变量<br/>{{ title }}<br/>{{ network_content }}<br/>{{ legend }}

        FlowPlot->>FileSystem: 写入文件<br/>"my_workflow.html"
        FileSystem-->>FlowPlot: success

        FlowPlot->>User: 打印消息<br/>"Plot saved as my_workflow.html"
    end

    User->>User: 在浏览器打开<br/>"my_workflow.html"

时序图说明

图意概述: 该时序图展示了 Flow 可视化生成的完整流程,从用户调用 plot() 到生成交互式 HTML 文件的四个阶段:布局计算、节点添加、边添加、HTML 生成。

关键路径分解

  1. 阶段 1:计算节点布局(步骤 8-26)

    • 层级计算
      • Start 方法在第 0 层
      • 广度优先遍历监听器,每层 +1
      • 处理 AND/OR 条件的最大层级
    • 位置计算
      • X 坐标:layer_index * X_SPACING (默认 300px)
      • Y 坐标:层内居中对齐
      • 避免节点重叠
  2. 阶段 2:添加节点(步骤 27-42)

    • 节点类型识别
      • Start:圆形,绿色边框
      • Listen:椭圆形,紫色边框
      • Router:菱形,蓝色边框
    • 特殊标记
      • 调用 Crew 的方法:虚线边框
    • 标签格式化
      <b>method_name</b>
      <i>@start</i>
      
  3. 阶段 3:添加边(步骤 43-60)

    • 边类型
      • OR 条件:实线箭头(紫色)
      • AND 条件:虚线箭头(蓝色)
      • Router 路径:实线箭头(绿色),带标签
    • 边权重
      • 根据层级差计算边的长度
  4. 阶段 4:生成 HTML(步骤 61-72)

    • 模板替换
      • {{ title }}:流程名称
      • {{ network_content }}:PyVis 生成的网络图
      • {{ legend }}:图例 HTML
    • 输出文件
      • 独立的 HTML 文件,可直接在浏览器打开
      • 包含交互功能:缩放、拖拽、节点点击

可视化元素说明

元素类型 样式 说明
Start 节点 ⭕ 绿色圆形 流程入口
Listen 节点 ⬭ 紫色椭圆 监听器
Router 节点 ◆ 蓝色菱形 路由器
Crew 调用节点 - - 虚线边框 调用 Crew 的方法
OR 边 ─→ 紫色实线 或条件
AND 边 ┄→ 蓝色虚线 与条件
Router 边 ═→ 绿色实线 路由路径

边界与约束

边界 说明
最大节点数 ~100 超过后布局可能混乱
最大层级 无限制 但深层级可读性差
文件大小 ~1-5 MB 包含 PyVis 库和网络数据
浏览器兼容性 现代浏览器 需支持 JavaScript

性能考虑

  • 布局计算:O(n) 时间复杂度,n 为方法数
  • 节点渲染:PyVis 使用 vis.js,支持大规模网络
  • 交互性能:节点数 < 100 时流畅,> 200 时可能卡顿

调用链路详解

调用链路 1:用户代码 → kickoff → 方法执行

用户代码
   flow.kickoff(inputs)
Flow.kickoff()
   asyncio.run(_run_flow())
Flow.kickoff_async()
   1. 状态初始化
     - _initialize_state(inputs)
     - load_state(id) [可选]
     - _restore_state(stored_state) [可选]
   2. 发送 FlowStartedEvent
     - crewai_event_bus.emit(FlowStartedEvent)
   3. 并行执行 start 方法
     - asyncio.gather(*[_execute_start_method(m) for m in _start_methods])
  
_execute_start_method(start_method_name)
   1. 检查是否已完成
     - if start_method_name in _completed_methods
   2. 注入 trigger_payload
     - _inject_trigger_payload_for_start_method(method)
   3. 执行方法
     - _execute_method(start_method_name, enhanced_method)
  
_execute_method(method_name, method, *args, **kwargs)
   1. 发送 MethodExecutionStartedEvent
     - crewai_event_bus.emit(MethodExecutionStartedEvent)
   2. 调用用户方法
     - result = await method()  method()
   3. 记录结果
     - _method_outputs.append(result)
     - _method_execution_counts[method_name] += 1
     - _completed_methods.add(method_name)
   4. 发送 MethodExecutionFinishedEvent
     - crewai_event_bus.emit(MethodExecutionFinishedEvent)
   5. 持久化(如果配置)
     - persistence.save_state(flow_id, state)
   返回 result
_execute_start_method()
   4. 触发监听器
     - _execute_listeners(start_method_name, result)
  
[进入调用链路 2]

关键代码

# lib/crewai/src/crewai/flow/flow.py:906-917
def kickoff(self, inputs: dict[str, Any] | None = None) -> Any:
    """同步启动 Flow,内部包装异步执行"""
    async def _run_flow() -> Any:
        return await self.kickoff_async(inputs)
    return asyncio.run(_run_flow())

# lib/crewai/src/crewai/flow/flow.py:919-1036
async def kickoff_async(self, inputs: dict[str, Any] | None = None) -> Any:
    """异步启动 Flow,核心执行逻辑"""
    # 1. 状态初始化和恢复
    is_restoring = inputs and "id" in inputs and self._persistence is not None
    if not is_restoring:
        self._completed_methods.clear()
        self._method_outputs.clear()
        self._pending_and_listeners.clear()

    # 2. 从持久化存储恢复状态
    if "id" in inputs and self._persistence is not None:
        stored_state = self._persistence.load_state(inputs["id"])
        if stored_state:
            self._restore_state(stored_state)

    # 3. 发送 FlowStartedEvent
    crewai_event_bus.emit(self, FlowStartedEvent(...))

    # 4. 并行执行所有 start 方法
    tasks = [
        self._execute_start_method(start_method)
        for start_method in self._start_methods
    ]
    await asyncio.gather(*tasks)

    # 5. 返回最后一个输出
    final_output = self._method_outputs[-1] if self._method_outputs else None
    crewai_event_bus.emit(self, FlowFinishedEvent(...))
    return final_output

# lib/crewai/src/crewai/flow/flow.py:1039-1072
async def _execute_start_method(self, start_method_name: FlowMethodName) -> None:
    """执行单个 start 方法"""
    # 1. 检查是否已完成(恢复场景)
    if start_method_name in self._completed_methods:
        if self._is_execution_resuming:
            # 跳过执行,但触发监听器
            last_output = self._method_outputs[-1] if self._method_outputs else None
            await self._execute_listeners(start_method_name, last_output)
            return

    # 2. 获取方法并注入 payload
    method = self._methods[start_method_name]
    enhanced_method = self._inject_trigger_payload_for_start_method(method)

    # 3. 执行方法
    result = await self._execute_method(start_method_name, enhanced_method)

    # 4. 触发监听器
    await self._execute_listeners(start_method_name, result)

# lib/crewai/src/crewai/flow/flow.py:1110-1172
async def _execute_method(
    self, method_name: FlowMethodName, method: Callable, *args, **kwargs
) -> Any:
    """执行单个方法的核心逻辑"""
    try:
        # 1. 发送开始事件
        crewai_event_bus.emit(self, MethodExecutionStartedEvent(...))

        # 2. 执行方法(支持同步和异步)
        result = (
            await method(*args, **kwargs)
            if asyncio.iscoroutinefunction(method)
            else method(*args, **kwargs)
        )

        # 3. 记录结果
        self._method_outputs.append(result)
        self._method_execution_counts[method_name] = (
            self._method_execution_counts.get(method_name, 0) + 1
        )
        self._completed_methods.add(method_name)

        # 4. 发送完成事件
        crewai_event_bus.emit(self, MethodExecutionFinishedEvent(...))

        return result
    except Exception as e:
        # 5. 发送失败事件
        crewai_event_bus.emit(self, MethodExecutionFailedEvent(...))
        raise e

调用链路 2:监听器触发 → 条件评估 → 递归执行

_execute_listeners(trigger_method, result)
  ↓ 阶段 1: 处理 Router(顺序执行)
  │   ↓ 循环直到没有 router 被触发
  │   ├→ _find_triggered_methods(current_trigger, router_only=True)
  │   │   ↓ 遍历 _listeners
  │   │   ├→ 检查 listener 是否为 router
  │   │   ├→ _evaluate_condition(condition, trigger_method, listener_name)
  │   │   │   ↓ 判断条件类型
  │   │   │   ├→ OR 条件:any(trigger in methods)
  │   │   │   └→ AND 条件:
  │   │   │       ├─ 初始化 _pending_and_listeners[key] = set(methods)
  │   │   │       ├─ discard(trigger_method)
  │   │   │       └─ 返回 len(pending_set) == 0
  │   │   └→ 返回 triggered_routers = [...]
  │   ↓
  │   ├→ _execute_single_listener(router_name, result)
  │   │   ↓ 1. 检查方法签名
  │   │   │   - inspect.signature(method)
  │   │   │   - 决定是否传递 result 参数
  │   │   ↓ 2. 执行方法
  │   │   │   - _execute_method(router_name, method, ...)
  │   │   ↓ 3. 获取 router 结果
  │   │   │   - router_result = _method_outputs[-1]
  │   │   ↓ 4. 递归触发监听器
  │   │   │   - _execute_listeners(router_name, router_result)
  │   │   └→ 返回
  │   ↓
  │   ├─ current_trigger = router_result
  │   └─ router_results.append(router_result)
  ↓ 阶段 2: 处理普通 Listener(并行执行)
  │   ↓ all_triggers = [trigger_method, *router_results]
  │   ↓ 遍历每个 trigger
  │   ├→ _find_triggered_methods(current_trigger, router_only=False)
  │   │   ↓ 遍历 _listeners(跳过 router 和 start)
  │   │   ├→ _evaluate_condition(condition, trigger_method, listener_name)
  │   │   └→ 返回 triggered_listeners = [...]
  │   ↓
  │   ├→ asyncio.gather(*[
  │   │     _execute_single_listener(listener_name, result)
  │   │     for listener_name in triggered_listeners
  │   │  ])
  │   │   ↓ 并行执行所有监听器
  │   │   ├→ _execute_single_listener(listener_1, result)
  │   │   │   ├─ _execute_method(listener_1, method)
  │   │   │   └─ _execute_listeners(listener_1, result_1) [递归]
  │   │   ├→ _execute_single_listener(listener_2, result)
  │   │   │   ├─ _execute_method(listener_2, method)
  │   │   │   └─ _execute_listeners(listener_2, result_2) [递归]
  │   │   └→ ...
  │   └→ 完成
  ↓ 阶段 3: 处理循环流程(可选)
  │   ↓ 如果 router 结果触发已完成的 start 方法
  │   ├→ 遍历 _start_methods
  │   ├→ 检查条件是否满足
  │   └→ _execute_start_method(method_name) [循环重新执行]
  └→ 返回

关键代码

# lib/crewai/src/crewai/flow/flow.py:1174-1261
async def _execute_listeners(
    self, trigger_method: FlowMethodName, result: Any
) -> None:
    """执行所有被触发的监听器和路由器"""

    # 阶段 1: 处理 Router(顺序执行)
    router_results = []
    current_trigger = trigger_method

    while True:
        routers_triggered = self._find_triggered_methods(
            current_trigger, router_only=True
        )
        if not routers_triggered:
            break

        for router_name in routers_triggered:
            await self._execute_single_listener(router_name, result)
            router_result = self._method_outputs[-1] if self._method_outputs else None
            if router_result:
                router_results.append(router_result)
            current_trigger = FlowMethodName(str(router_result)) if router_result else FlowMethodName("")

    # 阶段 2: 处理普通 Listener(并行执行)
    all_triggers = [trigger_method, *router_results]

    for current_trigger in all_triggers:
        if current_trigger:
            listeners_triggered = self._find_triggered_methods(
                current_trigger, router_only=False
            )
            if listeners_triggered:
                tasks = [
                    self._execute_single_listener(listener_name, result)
                    for listener_name in listeners_triggered
                ]
                await asyncio.gather(*tasks)

        # 阶段 3: 处理循环流程
        if current_trigger in router_results:
            for method_name in self._start_methods:
                if method_name in self._listeners:
                    condition_data = self._listeners[method_name]
                    should_trigger = False
                    # ... 条件检查 ...
                    if should_trigger and method_name in self._completed_methods:
                        was_resuming = self._is_execution_resuming
                        self._is_execution_resuming = False
                        await self._execute_start_method(method_name)
                        self._is_execution_resuming = was_resuming

# lib/crewai/src/crewai/flow/flow.py:1311-1373
def _find_triggered_methods(
    self, trigger_method: FlowMethodName, router_only: bool
) -> list[FlowMethodName]:
    """查找被触发的方法"""
    triggered: list[FlowMethodName] = []

    for listener_name, condition_data in self._listeners.items():
        is_router = listener_name in self._routers

        # 根据 router_only 过滤
        if router_only != is_router:
            continue

        # 跳过非 router 的 start 方法
        if not router_only and listener_name in self._start_methods:
            continue

        # 评估条件
        if is_simple_flow_condition(condition_data):
            condition_type, methods = condition_data

            if condition_type == "OR":
                if trigger_method in methods:
                    triggered.append(listener_name)
            elif condition_type == "AND":
                pending_key = PendingListenerKey(listener_name)
                if pending_key not in self._pending_and_listeners:
                    self._pending_and_listeners[pending_key] = set(methods)
                if trigger_method in self._pending_and_listeners[pending_key]:
                    self._pending_and_listeners[pending_key].discard(trigger_method)

                if not self._pending_and_listeners[pending_key]:
                    triggered.append(listener_name)
                    self._pending_and_listeners.pop(pending_key, None)

        elif is_flow_condition_dict(condition_data):
            if self._evaluate_condition(condition_data, trigger_method, listener_name):
                triggered.append(listener_name)

    return triggered

# lib/crewai/src/crewai/flow/flow.py:1263-1309
def _evaluate_condition(
    self,
    condition: FlowMethodName | FlowCondition,
    trigger_method: FlowMethodName,
    listener_name: FlowMethodName,
) -> bool:
    """递归评估条件(支持嵌套)"""

    # 简单条件:直接比较方法名
    if is_flow_method_name(condition):
        return condition == trigger_method

    # 复杂条件:递归评估
    if is_flow_condition_dict(condition):
        normalized = _normalize_condition(condition)
        cond_type = normalized.get("type", "OR")
        sub_conditions = normalized.get("conditions", [])

        if cond_type == "OR":
            # 任一子条件满足
            return any(
                self._evaluate_condition(sub_cond, trigger_method, listener_name)
                for sub_cond in sub_conditions
            )

        if cond_type == "AND":
            # 所有子条件满足
            pending_key = PendingListenerKey(f"{listener_name}:{id(condition)}")

            if pending_key not in self._pending_and_listeners:
                all_methods = set(_extract_all_methods(condition))
                self._pending_and_listeners[pending_key] = all_methods

            if trigger_method in self._pending_and_listeners[pending_key]:
                self._pending_and_listeners[pending_key].discard(trigger_method)

            if not self._pending_and_listeners[pending_key]:
                self._pending_and_listeners.pop(pending_key, None)
                return True

            return False

    return False

# lib/crewai/src/crewai/flow/flow.py:1375-1427
async def _execute_single_listener(
    self, listener_name: FlowMethodName, result: Any
) -> None:
    """执行单个监听器"""

    # 1. 检查是否已完成(避免重复执行)
    if listener_name in self._completed_methods:
        return

    # 2. 获取方法
    method = self._methods[listener_name]

    # 3. 检查方法签名,决定是否传递 result
    sig = inspect.signature(method)
    params = list(sig.parameters.values())

    # 跳过 self 参数
    non_self_params = [p for p in params if p.name != "self"]

    # 4. 执行方法
    if non_self_params:
        # 方法接受参数,传递 result
        listener_result = await self._execute_method(listener_name, method, result=result)
    else:
        # 方法不接受参数
        listener_result = await self._execute_method(listener_name, method)

    # 5. 递归触发监听器
    await self._execute_listeners(listener_name, listener_result)

架构要点说明(续)

1) Flow 与 Crew 的区别

Flow

  • 用途:编排方法间的依赖和顺序
  • 粒度:方法级
  • 状态:显式状态管理
  • 适用场景:复杂业务逻辑、条件分支、循环

Crew

  • 用途:协调 Agent 执行任务
  • 粒度:任务级
  • 状态:隐式(通过任务输出)
  • 适用场景:多 Agent 协作、AI 生成内容

组合使用

class MyFlow(Flow):
    @start()
    def data_collection(self):
        crew = DataCollectionCrew()
        result = crew.crew().kickoff()
        self.state["data"] = result.raw
        return "collected"

    @listen("collected")
    def data_analysis(self):
        crew = AnalysisCrew()
        result = crew.crew().kickoff(inputs={"data": self.state["data"]})
        return result.raw

2) 状态管理的两种模式

结构化状态(Pydantic)

class WorkflowState(BaseModel):
    step: str = "init"
    results: list[str] = []
    user_id: str

class MyFlow(Flow[WorkflowState]):
    def __init__(self):
        super().__init__(initial_state=WorkflowState(user_id="123"))

非结构化状态(字典)

class MyFlow(Flow):
    @start()
    def init(self):
        self.state["counter"] = 0
        self.state["results"] = []

3) 条件组合的三种方式

单条件监听

@listen("method_a")
def method_b(self, result):
    # method_a 完成后执行
    pass

AND 条件

@listen(and_("method_a", "method_b"))
def method_c(self):
    # method_a 和 method_b 都完成后执行
    pass

OR 条件

@listen(or_("method_a", "method_b"))
def method_c(self, result):
    # method_a 或 method_b 其中一个完成就执行
    pass

4) Router 的动态路由机制

@router("analyze")
def decision(self):
    if self.state["score"] > 0.8:
        return "high_confidence"
    elif self.state["errors"]:
        return "error_handling"
    return "manual_review"

@listen("high_confidence")
def auto_process(self):
    # 自动处理路径
    pass

@listen("error_handling")
def handle_errors(self):
    # 错误处理路径
    pass

@listen("manual_review")
def manual_check(self):
    # 人工审核路径
    pass

5) 持久化的两种级别

类级持久化(所有方法):

@persist(verbose=True)
class MyFlow(Flow):
    # 所有方法的状态变更都会持久化
    pass

方法级持久化(选择性):

class MyFlow(Flow):
    @persist
    @start()
    def critical_step(self):
        # 仅此方法的状态变更会持久化
        pass

    @start()
    def temp_step(self):
        # 此方法不持久化
        pass

数据结构 UML 图

Flow 核心类图

classDiagram
    class Flow {
        +StateT state
        +dict _methods
        +dict _listeners
        +dict _routers
        +dict _method_execution_counts
        +bool _state_already_persisted
        +kickoff(inputs)
        +kickoff_async(inputs)
        +plot(filename)
        -_build_execution_graph()
        -_execute_start_methods(start_methods)
        -_execute_method(method_name, inputs)
        -_execute_listeners(method_name, result)
        -_trigger_router(router_name, result)
    }

    class FlowState {
        +str id
    }

    class StartMethod {
        +str name
        +Callable func
        +FlowCondition condition
    }

    class ListenMethod {
        +str name
        +Callable func
        +FlowCondition condition
    }

    class RouterMethod {
        +str name
        +Callable func
        +FlowCondition condition
    }

    class FlowCondition {
        <<interface>>
        +evaluate(context)
    }

    class AndCondition {
        +list[FlowCondition] conditions
        +evaluate(context)
    }

    class OrCondition {
        +list[FlowCondition] conditions
        +evaluate(context)
    }

    class FlowPlot {
        +Flow flow
        +dict colors
        +NodeStyles node_styles
        +plot(filename)
        -_generate_final_html(network_html)
    }

    Flow --> FlowState : 管理
    Flow --> StartMethod : 注册
    Flow --> ListenMethod : 注册
    Flow --> RouterMethod : 注册
    Flow --> FlowPlot : 生成可视化

    FlowCondition <|-- AndCondition : 实现
    FlowCondition <|-- OrCondition : 实现

    StartMethod --> FlowCondition : 使用
    ListenMethod --> FlowCondition : 使用
    RouterMethod --> FlowCondition : 使用

关键字段说明

Flow 核心字段

字段 类型 约束 默认值 说明
state StateT | dict - {} 流程状态
_methods dict[str, Method] private {} 方法注册表
_listeners dict[str, list[Listener]] private {} 监听器映射
_routers dict[str, Router] private {} 路由器映射
_method_execution_counts dict[str, int] private {} 方法执行次数统计
_state_already_persisted bool private False 状态是否已持久化

FlowState 字段

字段 类型 约束 默认值 说明
id str 自动生成 uuid4() 流程实例唯一标识符

Method 装饰器属性

属性 类型 说明
name str 方法名称
func Callable 实际方法函数
condition FlowCondition 执行条件(可选)
trigger str | Condition 触发条件(listen/router)

API 详细规格

API 1: kickoff

基本信息

  • 方法签名Flow.kickoff(**inputs) -> Any
  • 调用方式:实例方法
  • 幂等性:否

功能说明

同步启动 Flow 执行,执行所有 @start() 方法并触发后续的监听器和路由器。

请求结构体

字段 类型 必填 默认 约束 说明
inputs Any - - 传递给 start 方法的参数

核心代码

def kickoff(self, **inputs: Any) -> Any:
    """同步启动 Flow"""
    # 1. 构建执行图
    self._build_execution_graph()

    # 2. 获取所有 start 方法
    start_methods = self._get_start_methods()

    if not start_methods:
        raise ValueError("No @start() methods found in Flow")

    # 3. 执行所有 start 方法
    return self._execute_start_methods(start_methods, inputs)

def _execute_start_methods(
    self, start_methods: list[StartMethod], inputs: dict[str, Any]
) -> Any:
    """执行 start 方法"""
    results = []

    for start_method in start_methods:
        # 检查条件
        if start_method.condition:
            if not start_method.condition.evaluate({"inputs": inputs, "state": self.state}):
                continue

        # 执行方法
        result = self._execute_method(start_method.name, inputs)
        results.append(result)

        # 触发监听器
        self._execute_listeners(start_method.name, result)

    # 返回最后一个结果
    return results[-1] if results else None

def _execute_method(
    self, method_name: str, inputs: dict[str, Any] | None = None
) -> Any:
    """执行单个方法"""
    method = self._methods[method_name]

    # 更新执行计数
    self._method_execution_counts[method_name] = (
        self._method_execution_counts.get(method_name, 0) + 1
    )

    # 调用方法
    if inputs:
        result = method.func(self, **inputs)
    else:
        result = method.func(self)

    # 持久化状态(如果需要)
    if self._should_persist_method(method_name):
        self._persist_state()

    return result

def _execute_listeners(
    self, method_name: str, result: Any
) -> None:
    """执行监听器"""
    if method_name not in self._listeners:
        return

    for listener in self._listeners[method_name]:
        # 检查条件
        if listener.condition:
            if not listener.condition.evaluate({
                "result": result,
                "state": self.state,
                "method": method_name
            }):
                continue

        # 执行监听方法
        listener_result = self._execute_method(
            listener.name,
            {"result": result} if listener.needs_result else None
        )

        # 检查是否为路由器
        if listener.name in self._routers:
            self._trigger_router(listener.name, listener_result)
        else:
            # 递归触发后续监听器
            self._execute_listeners(listener.name, listener_result)

def _trigger_router(
    self, router_name: str, router_result: Any
) -> None:
    """触发路由器"""
    router = self._routers[router_name]

    # router_result 应该是路由目标(字符串或条件)
    if isinstance(router_result, str):
        # 触发路由目标的监听器
        self._execute_listeners(router_result, None)
    elif isinstance(router_result, list):
        # 多个路由目标
        for target in router_result:
            self._execute_listeners(target, None)

使用示例

class SimpleFlow(Flow):
    @start()
    def step_one(self):
        print("Step 1 executing")
        self.state["data"] = "processed"
        return "step1_done"

    @listen("step1_done")
    def step_two(self, result):
        print(f"Step 2 received: {result}")
        return "final_result"

# 执行
flow = SimpleFlow()
result = flow.kickoff()
print(result)  # "final_result"

API 2: kickoff_async

基本信息

  • 方法签名Flow.kickoff_async(**inputs) -> Awaitable[Any]
  • 调用方式:实例方法(异步)
  • 幂等性:否

功能说明

异步启动 Flow 执行,支持 async/await 模式。

核心代码

async def kickoff_async(self, **inputs: Any) -> Any:
    """异步启动 Flow"""
    # 1. 构建执行图
    self._build_execution_graph()

    # 2. 获取所有 start 方法
    start_methods = self._get_start_methods()

    if not start_methods:
        raise ValueError("No @start() methods found in Flow")

    # 3. 异步执行所有 start 方法
    return await self._execute_start_methods_async(start_methods, inputs)

async def _execute_start_methods_async(
    self, start_methods: list[StartMethod], inputs: dict[str, Any]
) -> Any:
    """异步执行 start 方法"""
    results = []

    for start_method in start_methods:
        # 检查条件
        if start_method.condition:
            if not start_method.condition.evaluate({"inputs": inputs, "state": self.state}):
                continue

        # 异步执行方法
        result = await self._execute_method_async(start_method.name, inputs)
        results.append(result)

        # 触发监听器
        await self._execute_listeners_async(start_method.name, result)

    return results[-1] if results else None

async def _execute_method_async(
    self, method_name: str, inputs: dict[str, Any] | None = None
) -> Any:
    """异步执行单个方法"""
    method = self._methods[method_name]

    # 更新执行计数
    self._method_execution_counts[method_name] = (
        self._method_execution_counts.get(method_name, 0) + 1
    )

    # 调用方法(支持同步和异步方法)
    if inspect.iscoroutinefunction(method.func):
        # 异步方法
        if inputs:
            result = await method.func(self, **inputs)
        else:
            result = await method.func(self)
    else:
        # 同步方法
        if inputs:
            result = method.func(self, **inputs)
        else:
            result = method.func(self)

    # 持久化状态
    if self._should_persist_method(method_name):
        await self._persist_state_async()

    return result

使用示例

class AsyncFlow(Flow):
    @start()
    async def async_step_one(self):
        # 异步 API 调用
        data = await fetch_data_from_api()
        self.state["data"] = data
        return "step1_done"

    @listen("step1_done")
    async def async_step_two(self, result):
        # 异步处理
        processed = await process_async(self.state["data"])
        return processed

# 异步执行
async def main():
    flow = AsyncFlow()
    result = await flow.kickoff_async()
    print(result)

import asyncio
asyncio.run(main())

API 3: plot

基本信息

  • 方法签名Flow.plot(filename: str = "flow_plot") -> None
  • 调用方式:实例方法
  • 幂等性:是

功能说明

生成 Flow 的可视化图表,保存为 HTML 文件。

请求结构体

字段 类型 必填 默认 约束 说明
filename str “flow_plot” - 输出文件名(不含扩展名)

核心代码

def plot(self, filename: str = "flow_plot") -> None:
    """生成流程可视化"""
    visualizer = FlowPlot(self)
    visualizer.plot(filename)

class FlowPlot:
    """流程可视化生成器"""

    def plot(self, filename: str) -> None:
        """生成并保存 HTML 可视化"""
        # 1. 初始化 PyVis 网络
        net = Network(directed=True, height="750px", bgcolor=self.colors["bg"])

        # 2. 禁用物理引擎
        net.set_options("""
            var options = {
                "physics": {"enabled": false}
            }
        """)

        # 3. 计算节点层级
        node_levels = calculate_node_levels(self.flow)

        # 4. 计算节点位置
        node_positions = compute_positions(self.flow, node_levels)

        # 5. 添加节点到网络
        add_nodes_to_network(net, self.flow, node_positions, self.node_styles)

        # 6. 添加边到网络
        add_edges(net, self.flow, node_positions, self.colors)

        # 7. 生成 HTML
        network_html = net.generate_html()
        final_html = self._generate_final_html(network_html)

        # 8. 保存文件
        with open(f"{filename}.html", "w", encoding="utf-8") as f:
            f.write(final_html)

        print(f"Plot saved as {filename}.html")

使用示例

# 定义 Flow
class MyFlow(Flow):
    @start()
    def init(self):
        return "start"

    @listen("start")
    def process(self, result):
        return "processed"

    @router("processed")
    def decide(self):
        return "path_a" if self.state.get("flag") else "path_b"

    @listen("path_a")
    def handle_a(self):
        pass

    @listen("path_b")
    def handle_b(self):
        pass

# 生成可视化
flow = MyFlow()
flow.plot("my_workflow")  # 生成 my_workflow.html

API 4: @start 装饰器

基本信息

  • 装饰器签名@start(condition: str | FlowCondition | Callable | None = None)
  • 应用范围:Flow 类的方法
  • 作用:标记方法为 Flow 的入口点

功能说明

将方法标记为 Flow 的开始方法,Flow 启动时会执行所有 @start() 方法。

核心代码

def start(
    condition: str | FlowCondition | Callable[..., Any] | None = None,
) -> Callable[[Callable[P, R]], StartMethod[P, R]]:
    """标记 Flow 的开始方法"""
    def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
        # 创建 StartMethod 对象
        start_method = StartMethod(
            name=func.__name__,
            func=func,
            condition=_parse_condition(condition) if condition else None,
        )

        # 设置标记
        func._is_start_method = True  # type: ignore[attr-defined]
        func._start_condition = start_method.condition  # type: ignore[attr-defined]

        return func  # type: ignore[return-value]

    return decorator

使用示例

基础用法

class MyFlow(Flow):
    @start()
    def initialize(self):
        self.state["initialized"] = True
        return "ready"

条件启动

class ConditionalFlow(Flow):
    @start(condition=lambda ctx: ctx["inputs"].get("mode") == "auto")
    def auto_start(self):
        # 仅当 inputs.mode == "auto" 时执行
        pass

    @start(condition=lambda ctx: ctx["inputs"].get("mode") == "manual")
    def manual_start(self):
        # 仅当 inputs.mode == "manual" 时执行
        pass

# 使用
flow = ConditionalFlow()
flow.kickoff(mode="auto")  # 仅执行 auto_start

多个 start 方法

class MultiStartFlow(Flow):
    @start()
    def start_one(self):
        self.state["one"] = True
        return "one_done"

    @start()
    def start_two(self):
        self.state["two"] = True
        return "two_done"

    # 两个 start 方法都会执行

API 5: @listen 装饰器

基本信息

  • 装饰器签名@listen(condition: str | FlowCondition | Callable)
  • 应用范围:Flow 类的方法
  • 作用:监听指定方法或条件的完成事件

功能说明

将方法标记为监听器,当指定的方法完成或条件满足时执行。

核心代码

def listen(
    condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], ListenMethod[P, R]]:
    """创建监听器方法"""
    def decorator(func: Callable[P, R]) -> ListenMethod[P, R]:
        # 创建 ListenMethod 对象
        listen_method = ListenMethod(
            name=func.__name__,
            func=func,
            trigger=_parse_condition(condition),
        )

        # 设置标记
        func._is_listen_method = True  # type: ignore[attr-defined]
        func._listen_trigger = listen_method.trigger  # type: ignore[attr-defined]

        return func  # type: ignore[return-value]

    return decorator

使用示例

监听方法名

class MyFlow(Flow):
    @start()
    def step_one(self):
        return "result_1"

    @listen("step_one")
    def step_two(self, result):
        # step_one 完成后执行
        print(f"Received: {result}")
        return "result_2"

监听 AND 条件

class ParallelFlow(Flow):
    @start()
    def task_a(self):
        self.state["a"] = "done"
        return "a_done"

    @start()
    def task_b(self):
        self.state["b"] = "done"
        return "b_done"

    @listen(and_("task_a", "task_b"))
    def combine(self):
        # task_a 和 task_b 都完成后执行
        print("Both tasks completed")

监听 OR 条件

class AlternativeFlow(Flow):
    @start()
    def path_a(self):
        return "a_result"

    @start()
    def path_b(self):
        return "b_result"

    @listen(or_("path_a", "path_b"))
    def handle_either(self, result):
        # path_a 或 path_b 其中一个完成就执行
        print(f"Got result: {result}")

API 6: @router 装饰器

基本信息

  • 装饰器签名@router(condition: str | FlowCondition | Callable)
  • 应用范围:Flow 类的方法
  • 作用:根据返回值动态路由到不同的方法

功能说明

将方法标记为路由器,方法的返回值决定接下来执行哪些监听器。

核心代码

def router(
    condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
    """创建路由器方法"""
    def decorator(func: Callable[P, R]) -> RouterMethod[P, R]:
        # 创建 RouterMethod 对象
        router_method = RouterMethod(
            name=func.__name__,
            func=func,
            trigger=_parse_condition(condition),
        )

        # 设置标记
        func._is_router_method = True  # type: ignore[attr-defined]
        func._router_trigger = router_method.trigger  # type: ignore[attr-defined]

        return func  # type: ignore[return-value]

    return decorator

使用示例

基础路由

class RoutingFlow(Flow):
    @start()
    def analyze(self):
        self.state["score"] = 0.85
        return "analyzed"

    @router("analyzed")
    def decision(self):
        if self.state["score"] > 0.8:
            return "high_quality"
        elif self.state["score"] > 0.5:
            return "medium_quality"
        else:
            return "low_quality"

    @listen("high_quality")
    def auto_approve(self):
        print("Auto-approved")

    @listen("medium_quality")
    def manual_review(self):
        print("Needs review")

    @listen("low_quality")
    def reject(self):
        print("Rejected")

多路由返回

class MultiRouteFlow(Flow):
    @router("process")
    def complex_decision(self):
        routes = []
        if self.state.get("notify_user"):
            routes.append("send_notification")
        if self.state.get("update_db"):
            routes.append("update_database")
        if self.state.get("log_event"):
            routes.append("log_to_system")
        return routes  # 返回多个路由目标

循环路由

class LoopFlow(Flow):
    def __init__(self):
        super().__init__()
        self.state["counter"] = 0
        self.state["max_iterations"] = 5

    @start("loop")
    def process_iteration(self):
        self.state["counter"] += 1
        print(f"Iteration {self.state['counter']}")
        return "processed"

    @router("processed")
    def should_continue(self):
        if self.state["counter"] < self.state["max_iterations"]:
            return "loop"  # 继续循环
        return "complete"

    @listen("complete")
    def finalize(self):
        print("Loop completed")

关键功能流程剖析

功能 1:执行图构建

功能描述

Flow 在启动前构建执行图,分析方法间的依赖关系和触发条件。

核心代码

def _build_execution_graph(self) -> None:
    """构建执行图"""
    # 1. 收集所有装饰的方法
    for attr_name in dir(self):
        attr = getattr(self, attr_name)

        # Start 方法
        if hasattr(attr, "_is_start_method"):
            self._methods[attr_name] = StartMethod(
                name=attr_name,
                func=attr,
                condition=getattr(attr, "_start_condition", None),
            )

        # Listen 方法
        elif hasattr(attr, "_is_listen_method"):
            listen_method = ListenMethod(
                name=attr_name,
                func=attr,
                trigger=getattr(attr, "_listen_trigger"),
            )
            self._methods[attr_name] = listen_method

            # 注册监听器
            trigger = listen_method.trigger
            if isinstance(trigger, str):
                # 监听方法名
                self._listeners.setdefault(trigger, []).append(listen_method)
            elif isinstance(trigger, AndCondition):
                # AND 条件:注册到所有触发方法
                for condition in trigger.conditions:
                    if isinstance(condition, str):
                        self._listeners.setdefault(condition, []).append(listen_method)
            elif isinstance(trigger, OrCondition):
                # OR 条件:注册到所有触发方法
                for condition in trigger.conditions:
                    if isinstance(condition, str):
                        self._listeners.setdefault(condition, []).append(listen_method)

        # Router 方法
        elif hasattr(attr, "_is_router_method"):
            router_method = RouterMethod(
                name=attr_name,
                func=attr,
                trigger=getattr(attr, "_router_trigger"),
            )
            self._methods[attr_name] = router_method
            self._routers[attr_name] = router_method

            # Router 也是监听器
            trigger = router_method.trigger
            if isinstance(trigger, str):
                self._listeners.setdefault(trigger, []).append(router_method)

    # 2. 验证执行图
    self._validate_execution_graph()

def _validate_execution_graph(self) -> None:
    """验证执行图的正确性"""
    # 检查是否有 start 方法
    start_methods = [m for m in self._methods.values() if isinstance(m, StartMethod)]
    if not start_methods:
        raise ValueError("Flow must have at least one @start() method")

    # 检查循环依赖
    visited = set()
    def check_cycles(method_name: str, path: list[str]):
        if method_name in path:
            raise ValueError(f"Circular dependency detected: {' -> '.join(path + [method_name])}")

        if method_name in visited:
            return

        visited.add(method_name)
        path.append(method_name)

        # 检查监听器
        if method_name in self._listeners:
            for listener in self._listeners[method_name]:
                check_cycles(listener.name, path.copy())

        path.pop()

    for start_method in start_methods:
        check_cycles(start_method.name, [])

功能 2:AND/OR 条件组合

功能描述

提供灵活的条件组合,支持复杂的执行逻辑。

核心代码

def and_(*conditions: str | FlowCondition | Callable) -> AndCondition:
    """创建 AND 条件组合"""
    parsed_conditions = [_parse_condition(c) for c in conditions]
    return AndCondition(conditions=parsed_conditions)

def or_(*conditions: str | FlowCondition | Callable) -> OrCondition:
    """创建 OR 条件组合"""
    parsed_conditions = [_parse_condition(c) for c in conditions]
    return OrCondition(conditions=parsed_conditions)

class AndCondition(FlowCondition):
    """AND 条件:所有条件都满足"""
    conditions: list[FlowCondition]

    def evaluate(self, context: dict[str, Any]) -> bool:
        # 所有条件都必须满足
        return all(c.evaluate(context) for c in self.conditions)

class OrCondition(FlowCondition):
    """OR 条件:任一条件满足"""
    conditions: list[FlowCondition]

    def evaluate(self, context: dict[str, Any]) -> bool:
        # 任一条件满足即可
        return any(c.evaluate(context) for c in self.conditions)

复杂组合示例

class ComplexFlow(Flow):
    @start()
    def task_a(self):
        return "a_done"

    @start()
    def task_b(self):
        return "b_done"

    @start()
    def task_c(self):
        return "c_done"

    # (A AND B) OR C
    @listen(or_(and_("task_a", "task_b"), "task_c"))
    def combined(self):
        # (task_a 和 task_b 都完成) 或 (task_c 完成)
        pass

功能 3:状态持久化

功能描述

将 Flow 状态持久化到存储,支持断点续传和状态恢复。

核心代码

@persist(verbose=True)
class PersistentFlow(Flow):
    """类级持久化"""
    pass

class SelectiveFlow(Flow):
    """方法级持久化"""
    @persist
    @start()
    def critical_step(self):
        # 仅此方法持久化
        pass

def persist(
    target: type[Flow] | Callable | None = None,
    verbose: bool = False,
) -> Any:
    """持久化装饰器"""
    def class_decorator(cls: type[Flow]) -> type[Flow]:
        # 类级持久化:所有方法都持久化
        cls._persist_all_methods = True
        cls._persist_verbose = verbose
        return cls

    def method_decorator(func: Callable) -> Callable:
        # 方法级持久化:仅此方法持久化
        func._persist_state = True
        func._persist_verbose = verbose
        return func

    if target is None:
        # 带参数调用:@persist(verbose=True)
        return class_decorator
    elif isinstance(target, type):
        # 类装饰器:@persist
        return class_decorator(target)
    else:
        # 方法装饰器:@persist
        return method_decorator(target)

def _persist_state(self) -> None:
    """持久化状态到存储"""
    if self._state_already_persisted:
        return

    # 1. 序列化状态
    if isinstance(self.state, BaseModel):
        state_data = self.state.model_dump()
    else:
        state_data = dict(self.state)

    # 2. 添加元数据
    persist_data = {
        "state": state_data,
        "execution_counts": self._method_execution_counts,
        "timestamp": time.time(),
        "flow_id": self.state.get("id") if isinstance(self.state, dict) else self.state.id,
    }

    # 3. 保存到文件
    storage_dir = os.getenv("CREWAI_STORAGE_DIR", appdirs.user_data_dir("crewai"))
    persist_file = os.path.join(storage_dir, f"flow_{persist_data['flow_id']}.json")

    with open(persist_file, "w") as f:
        json.dump(persist_data, f, indent=2)

    if self._persist_verbose:
        print(f"State persisted to {persist_file}")

    self._state_already_persisted = True

def _load_state(self, flow_id: str) -> bool:
    """从存储加载状态"""
    storage_dir = os.getenv("CREWAI_STORAGE_DIR", appdirs.user_data_dir("crewai"))
    persist_file = os.path.join(storage_dir, f"flow_{flow_id}.json")

    if not os.path.exists(persist_file):
        return False

    with open(persist_file, "r") as f:
        persist_data = json.load(f)

    # 恢复状态
    if isinstance(self.state, BaseModel):
        self.state = self.state.__class__(**persist_data["state"])
    else:
        self.state.update(persist_data["state"])

    # 恢复执行计数
    self._method_execution_counts = persist_data["execution_counts"]

    if self._persist_verbose:
        print(f"State loaded from {persist_file}")

    return True

使用示例

@persist(verbose=True)
class CheckpointFlow(Flow[WorkflowState]):
    @start()
    def step_one(self):
        self.state.step = "step_one"
        self.state.results.append("Step 1 done")
        return "step1_done"

    @listen("step1_done")
    def step_two(self):
        # 如果在此处失败,可以从 step_one 的状态恢复
        self.state.step = "step_two"
        self.state.results.append("Step 2 done")
        return "step2_done"

# 首次执行
flow = CheckpointFlow(initial_state=WorkflowState())
try:
    result = flow.kickoff()
except Exception as e:
    print(f"Flow failed: {e}")

# 从保存的状态恢复
flow_id = flow.state.id
recovered_flow = CheckpointFlow(initial_state=WorkflowState())
if recovered_flow._load_state(flow_id):
    # 继续执行
    result = recovered_flow.kickoff()

实战示例与最佳实践

示例 1:基础线性流程

class SimpleWorkflow(Flow):
    @start()
    def collect_data(self):
        print("Collecting data...")
        self.state["data"] = ["item1", "item2", "item3"]
        return "data_collected"

    @listen("data_collected")
    def process_data(self, result):
        print(f"Processing data: {self.state['data']}")
        self.state["processed"] = [item.upper() for item in self.state["data"]]
        return "data_processed"

    @listen("data_processed")
    def save_results(self, result):
        print(f"Saving results: {self.state['processed']}")
        return "complete"

# 执行
flow = SimpleWorkflow()
result = flow.kickoff()
print(result)  # "complete"

示例 2:并行任务与聚合

class ParallelWorkflow(Flow):
    @start()
    def fetch_from_api_a(self):
        # 模拟 API 调用
        self.state["api_a_data"] = {"source": "A", "value": 100}
        return "api_a_done"

    @start()
    def fetch_from_api_b(self):
        # 模拟 API 调用
        self.state["api_b_data"] = {"source": "B", "value": 200}
        return "api_b_done"

    @start()
    def fetch_from_api_c(self):
        # 模拟 API 调用
        self.state["api_c_data"] = {"source": "C", "value": 300}
        return "api_c_done"

    @listen(and_("api_a_done", "api_b_done", "api_c_done"))
    def aggregate_results(self):
        # 所有 API 都完成后聚合
        total = (
            self.state["api_a_data"]["value"]
            + self.state["api_b_data"]["value"]
            + self.state["api_c_data"]["value"]
        )
        self.state["total"] = total
        print(f"Total: {total}")
        return total

# 执行(三个 start 方法并行执行,然后聚合)
flow = ParallelWorkflow()
result = flow.kickoff()
print(result)  # 600

示例 3:条件路由与决策

class DecisionFlow(Flow[WorkflowState]):
    @start()
    def analyze_request(self):
        # 分析请求
        self.state.score = 0.75  # 假设计算的分数
        self.state.has_errors = False
        return "analyzed"

    @router("analyzed")
    def route_decision(self):
        if self.state.has_errors:
            return "error_path"
        elif self.state.score > 0.8:
            return "high_confidence"
        elif self.state.score > 0.5:
            return "medium_confidence"
        else:
            return "low_confidence"

    @listen("high_confidence")
    def auto_approve(self):
        print("✅ Auto-approved")
        self.state.status = "approved"

    @listen("medium_confidence")
    def manual_review(self):
        print("⚠️ Needs manual review")
        self.state.status = "review"

    @listen("low_confidence")
    def auto_reject(self):
        print("❌ Auto-rejected")
        self.state.status = "rejected"

    @listen("error_path")
    def handle_errors(self):
        print("🔧 Handling errors")
        self.state.status = "error"

# 执行
flow = DecisionFlow(initial_state=WorkflowState())
flow.kickoff()
print(f"Final status: {flow.state.status}")

示例 4:Flow 与 Crew 集成

from crewai import Crew, Agent, Task

class IntegratedFlow(Flow):
    @start()
    def data_preparation(self):
        # 准备数据
        self.state["topic"] = "AI Safety"
        self.state["requirements"] = "5 key findings"
        return "data_ready"

    @listen("data_ready")
    def run_research_crew(self, result):
        # 调用 Crew 进行研究
        researcher = Agent(
            role="Research Analyst",
            goal="Find relevant information",
            backstory="Expert researcher",
        )

        research_task = Task(
            description=f"Research {self.state['topic']}",
            expected_output=self.state["requirements"],
            agent=researcher,
        )

        crew = Crew(
            agents=[researcher],
            tasks=[research_task],
        )

        result = crew.kickoff()
        self.state["research_output"] = result.raw
        return "research_done"

    @listen("research_done")
    def run_writing_crew(self, result):
        # 调用 Crew 进行写作
        writer = Agent(
            role="Content Writer",
            goal="Write engaging content",
            backstory="Skilled writer",
        )

        write_task = Task(
            description=f"Write a blog post about: {self.state['research_output']}",
            expected_output="500-word blog post",
            agent=writer,
            markdown=True,
        )

        crew = Crew(
            agents=[writer],
            tasks=[write_task],
        )

        result = crew.kickoff()
        self.state["final_output"] = result.raw
        return result.raw

# 执行
flow = IntegratedFlow()
final_result = flow.kickoff()
print(final_result)

示例 5:循环流程(迭代优化)

class IterativeFlow(Flow):
    def __init__(self):
        super().__init__()
        self.state["iteration"] = 0
        self.state["max_iterations"] = 5
        self.state["quality_threshold"] = 0.9
        self.state["current_quality"] = 0.0

    @start("loop")
    def generate_content(self):
        self.state["iteration"] += 1
        print(f"\nIteration {self.state['iteration']}")

        # 生成内容(模拟)
        import random
        self.state["current_quality"] = random.uniform(0.5, 1.0)
        print(f"Quality: {self.state['current_quality']:.2f}")

        return "generated"

    @router("generated")
    def check_quality(self):
        if self.state["current_quality"] >= self.state["quality_threshold"]:
            print("✅ Quality threshold met!")
            return "success"
        elif self.state["iteration"] >= self.state["max_iterations"]:
            print("⚠️ Max iterations reached")
            return "max_iterations"
        else:
            print("🔄 Quality not met, retrying...")
            return "loop"  # 继续循环

    @listen("success")
    def finalize_success(self):
        print(f"Completed successfully in {self.state['iteration']} iterations")

    @listen("max_iterations")
    def finalize_failure(self):
        print("Could not reach quality threshold")

# 执行
flow = IterativeFlow()
flow.kickoff()

最佳实践总结

1. 何时使用 Flow vs Crew

使用 Flow 当

  • 需要复杂的条件分支和循环
  • 需要精确控制执行顺序
  • 需要在步骤间共享和修改状态
  • 需要集成非 AI 的业务逻辑

使用 Crew 当

  • 需要多个 AI Agent 协作
  • 任务依赖关系简单(顺序或层级)
  • 主要是 AI 生成任务
  • 需要记忆系统和知识库

组合使用

  • Flow 作为主编排器,在各步骤调用 Crew
  • Flow 处理业务逻辑,Crew 处理 AI 任务

2. 状态管理策略

结构化状态(推荐)

class MyState(BaseModel):
    step: str = "init"
    results: list[str] = Field(default_factory=list)
    metadata: dict = Field(default_factory=dict)

class MyFlow(Flow[MyState]):
    pass

非结构化状态(简单场景)

class MyFlow(Flow):
    @start()
    def init(self):
        self.state["key"] = "value"

3. 错误处理

class RobustFlow(Flow):
    @start()
    def risky_operation(self):
        try:
            result = some_risky_function()
            self.state["success"] = True
            return "success"
        except Exception as e:
            self.state["error"] = str(e)
            self.state["success"] = False
            return "error"

    @listen("success")
    def handle_success(self):
        print("Operation succeeded")

    @listen("error")
    def handle_error(self):
        print(f"Operation failed: {self.state['error']}")
        # 实现错误恢复逻辑

4. 性能优化

  • 并行执行:使用多个 @start() 方法
  • 异步支持:使用 kickoff_async() 和 async 方法
  • 避免过深的监听链:超过 10 层考虑重构
  • 状态持久化:仅在必要时使用

5. 可视化与调试

# 生成流程图
flow = MyFlow()
flow.plot("my_workflow")  # 生成 HTML 可视化

# 查看执行图
print(flow._methods)      # 所有方法
print(flow._listeners)    # 监听器映射
print(flow._routers)      # 路由器映射

总结

Flow 模块是 CrewAI 框架中的事件驱动工作流引擎,通过以下机制实现灵活的流程编排:

  1. 装饰器驱动@start@listen@router 三种装饰器定义流程
  2. 灵活的条件组合:AND/OR 条件支持复杂逻辑
  3. 动态路由:基于运行时状态的条件分支
  4. 状态管理:结构化或非结构化的状态持久化
  5. Crew 集成:无缝调用 Crew 执行 AI 任务
  6. 可视化支持:自动生成流程图
  7. 异步支持:支持 async/await 模式

该模块的设计遵循 事件驱动架构,将方法间的依赖关系解耦,通过事件监听和路由实现灵活的工作流编排,适用于需要复杂控制流的场景。