CrewAI-03-Crew-完整剖析

模块概览

职责与定位

Crew(团队)是 CrewAI 框架中的核心编排器,负责:

  1. 多 Agent 协作:管理多个 Agent 的协同工作
  2. 任务编排:按照指定的流程(sequential/hierarchical)执行任务序列
  3. 上下文管理:在任务之间传递和聚合输出
  4. 资源控制:管理 RPM 限速、缓存、内存系统
  5. 训练与回放:支持 Crew 的训练和任务回放功能
  6. 生命周期钩子:提供 before/after kickoff 回调
  7. 输出聚合:将所有任务输出汇总为 CrewOutput

输入与输出

输入

  • agents:Agent 列表
  • tasks:Task 列表
  • inputs:动态输入参数字典(用于模板插值)
  • process:执行流程(sequential/hierarchical)

输出

  • CrewOutput:包含所有任务输出、token 统计、执行元数据

上下游依赖

上游依赖(被调用)

  • 用户代码:通过 Crew.kickoff() 启动
  • Flow 引擎:在流程中调用 Crew
  • CLI 工具:crewai run 命令

下游依赖(调用)

  • Task:执行任务
  • Agent:通过任务调用 Agent
  • Memory:管理短期、长期、实体、外部记忆
  • Cache:处理结果缓存
  • Knowledge:向 Agent 提供知识库
  • Events:发送 Crew 开始、完成、失败事件

生命周期

stateDiagram-v2
    [*] --> 创建: Crew(...) 或 @crew 装饰器
    创建 --> 配置: 设置 agents/tasks/process
    配置 --> 就绪: 等待 kickoff
    就绪 --> 回调前: @before_kickoff
    回调前 --> 插值: 输入参数插值
    插值 --> 进程选择: sequential 或 hierarchical

    进程选择 --> Sequential: process=sequential
    进程选择 --> Hierarchical: process=hierarchical

    Sequential --> 任务执行: _execute_tasks()
    Hierarchical --> 创建管理者: _create_manager_agent()
    创建管理者 --> 任务执行

    任务执行 --> 同步任务: 逐个执行
    任务执行 --> 异步任务: 后台执行

    同步任务 --> 条件任务: ConditionalTask 判断
    异步任务 --> 等待完成: future.result()

    条件任务 --> 聚合输出: CrewOutput
    等待完成 --> 聚合输出

    聚合输出 --> 回调后: @after_kickoff
    回调后 --> 完成: return CrewOutput
    完成 --> [*]

整体服务架构图

flowchart TB
    subgraph "上游接口层 (Entry Points)"
        UserCode[用户代码<br/>crew.kickoff()]
        Flow[Flow 引擎<br/>调用 Crew]
        CLI[CLI 工具<br/>crewai run]
        API[API 接口<br/>HTTP/RPC]
    end

    subgraph "Crew 核心协调层"
        Crew[Crew 编排器<br/>kickoff/kickoff_async]
        CrewOutput[CrewOutput<br/>输出聚合]
        Callbacks[生命周期回调<br/>before/after_kickoff]
    end

    subgraph "流程选择层 (Process Layer)"
        ProcessRouter{Process 类型}
        Sequential[Sequential Process<br/>顺序执行流程]
        Hierarchical[Hierarchical Process<br/>层级管理流程]
    end

    subgraph "任务执行层 (Task Execution)"
        ExecuteTasks[_execute_tasks<br/>任务循环执行器]
        GetAgent[_get_agent_to_use<br/>Agent 选择]
        PrepareTools[_prepare_tools<br/>工具准备]
        GetContext[_get_context<br/>上下文聚合]
        ConditionalCheck[_handle_conditional_task<br/>条件判断]
    end

    subgraph "Manager 管理层 (Hierarchical Only)"
        ManagerAgent[Manager Agent<br/>自动生成协调者]
        DelegateTools[委派工具<br/>DelegateTaskTool]
        AskTools[询问工具<br/>AskQuestionTool]
    end

    subgraph "Task 执行层"
        TaskSync[Task.execute_sync<br/>同步执行]
        TaskAsync[Task.execute_async<br/>异步执行]
        FutureManager[Future 管理<br/>异步任务协调]
    end

    subgraph "Agent 执行层"
        AgentExecutor[Agent.execute_task<br/>任务执行]
        AgentExecutorImpl[AgentExecutor.invoke<br/>LLM 调用]
        ToolInvoke[Tool 调用<br/>工具执行]
        LLMCall[LLM 调用<br/>生成响应]
    end

    subgraph "资源管理层 (Shared Resources)"
        Memory[Memory 系统<br/>Short/Long/Entity/External]
        Cache[Cache 系统<br/>工具结果缓存]
        Knowledge[Knowledge 系统<br/>RAG 知识库]
        RPMController[RPM Controller<br/>速率限制]
    end

    subgraph "事件与日志层 (Observability)"
        EventBus[Event Bus<br/>事件总线]
        Logger[Logger<br/>日志系统]
        Storage[Task Output Storage<br/>执行记录持久化]
        Tracing[Tracing<br/>OpenTelemetry]
    end

    subgraph "输出处理层"
        TaskOutput[TaskOutput<br/>任务输出]
        Guardrails[Guardrails<br/>输出验证]
        OutputFormat[Output Format<br/>Pydantic/JSON]
    end

    %% 主流程连接
    UserCode --> Crew
    Flow --> Crew
    CLI --> Crew
    API --> Crew

    Crew --> Callbacks
    Callbacks --> ProcessRouter

    ProcessRouter -->|sequential| Sequential
    ProcessRouter -->|hierarchical| Hierarchical

    Sequential --> ExecuteTasks
    Hierarchical --> ManagerAgent
    ManagerAgent --> DelegateTools
    ManagerAgent --> AskTools
    Hierarchical --> ExecuteTasks

    ExecuteTasks --> GetAgent
    ExecuteTasks --> PrepareTools
    ExecuteTasks --> GetContext
    ExecuteTasks --> ConditionalCheck

    ExecuteTasks --> TaskSync
    ExecuteTasks --> TaskAsync
    TaskAsync --> FutureManager

    TaskSync --> AgentExecutor
    FutureManager --> AgentExecutor

    AgentExecutor --> AgentExecutorImpl
    AgentExecutorImpl --> ToolInvoke
    AgentExecutorImpl --> LLMCall

    AgentExecutor --> TaskOutput
    TaskOutput --> Guardrails
    Guardrails --> OutputFormat
    OutputFormat --> CrewOutput

    %% 资源管理连接
    Crew -.->|初始化| Memory
    Crew -.->|初始化| Cache
    Crew -.->|初始化| Knowledge
    Crew -.->|初始化| RPMController

    AgentExecutor -.->|查询| Memory
    AgentExecutor -.->|查询| Knowledge
    ToolInvoke -.->|缓存| Cache
    LLMCall -.->|限速| RPMController

    %% 事件日志连接
    Crew -->|发送| EventBus
    ExecuteTasks -->|发送| EventBus
    AgentExecutor -->|发送| EventBus
    Crew -->|记录| Logger
    ExecuteTasks -->|存储| Storage
    Crew -.->|追踪| Tracing

    style Crew fill:#e1bee7
    style ProcessRouter fill:#fff9c4
    style ManagerAgent fill:#c5e1a5
    style AgentExecutor fill:#b3e5fc
    style Memory fill:#ffccbc
    style EventBus fill:#d1c4e9

架构分层说明

1) 上游接口层 (Entry Points)

职责:提供多种方式启动 Crew 执行

  • 用户代码:最常见的入口,直接调用 crew.kickoff(inputs)
  • Flow 引擎:在 Flow 流程中调用 Crew
  • CLI 工具:通过命令行 crewai run 启动
  • API 接口:通过 HTTP/RPC 接口触发(Enterprise 功能)

2) Crew 核心协调层

职责:统一入口,生命周期管理,流程协调

关键组件

  • Crew.kickoff():同步执行入口
  • Crew.kickoff_async():异步执行入口
  • before_kickoff_callbacks:启动前回调列表
  • after_kickoff_callbacks:完成后回调列表
  • CrewOutput:输出聚合器

核心流程

  1. 执行 before_kickoff 回调
  2. 插值输入参数(interpolate_inputs)
  3. 初始化资源(memory, cache, knowledge)
  4. 选择执行流程(sequential/hierarchical)
  5. 执行任务序列
  6. 聚合输出
  7. 执行 after_kickoff 回调
  8. 返回 CrewOutput

3) 流程选择层 (Process Layer)

职责:根据配置选择执行模式

Sequential Process(顺序流程)

  • 任务按定义顺序依次执行
  • Agent 直接执行分配的任务
  • 适用场景:线性工作流、明确的任务依赖

Hierarchical Process(层级流程)

  • 自动创建 Manager Agent
  • Manager 负责任务委派和协调
  • Worker Agent 接受指令并汇报
  • 适用场景:复杂项目管理、动态任务分配

4) 任务执行层 (Task Execution)

职责:执行任务循环,协调 Agent 和 Task

关键函数

  • _execute_tasks():核心任务执行循环
  • _get_agent_to_use():确定执行 Agent(Sequential 用 task.agent,Hierarchical 用 manager)
  • _prepare_tools():准备工具列表(合并 agent.tools + task.tools,添加缓存包装)
  • _get_context():聚合上下文(从前序任务获取输出)
  • _handle_conditional_task():处理条件任务(检查是否应该执行)

执行模式

  • 同步执行task.execute_sync(),阻塞等待结果
  • 异步执行task.execute_async(),返回 Future,稍后获取结果

5) Agent 执行层

职责:执行具体的 LLM 调用和工具使用

调用链

Agent.execute_task()
  → Agent.create_agent_executor()  # 如果未初始化
  → AgentExecutor.invoke()         # LangChain executor
    → LLM 调用(生成响应)
    → Tool 调用(如果需要)
    → 循环直到完成
  → 返回最终输出

6) 资源管理层 (Shared Resources)

职责:提供共享资源和基础设施

Memory 系统(四种类型):

  • ShortTermMemory:当前会话的上下文(ChromaDB + RAG)
  • LongTermMemory:跨会话的任务结果(SQLite)
  • EntityMemory:实体跟踪(人物、地点、概念)
  • ExternalMemory:外部记忆集成(Mem0)

Cache 系统

  • 缓存工具调用结果
  • 避免重复的昂贵操作
  • 基于工具名称和输入参数的哈希

Knowledge 系统

  • RAG 知识库
  • 为 Agent 提供领域知识
  • 支持多种知识源(文件、URL、字符串)

RPM Controller

  • 速率限制(每分钟请求数)
  • 避免触发 API 限额
  • 自动等待和重试

7) 事件与日志层 (Observability)

职责:提供可观测性和调试支持

事件系统

  • CrewKickoffStartedEvent:Crew 启动
  • CrewKickoffCompletedEvent:Crew 完成
  • CrewKickoffFailedEvent:Crew 失败
  • TaskStartedEvent:任务开始
  • TaskCompletedEvent:任务完成
  • AgentStepEvent:Agent 步骤

日志系统

  • 结构化日志
  • 支持多种日志级别
  • 可配置输出目标

执行记录存储

  • 持久化任务输出
  • 支持回放(replay)功能

追踪系统

  • OpenTelemetry 集成
  • 分布式追踪
  • 性能分析

8) 输出处理层

职责:验证和格式化输出

Guardrails

  • 自定义验证规则
  • 失败时自动重试
  • 支持多轮验证

Output Format

  • Pydantic 模型:结构化输出
  • JSON 字典:灵活输出
  • Raw 字符串:原始输出

模块交互矩阵

调用方 → 被调方 Crew Task Agent Memory Cache Knowledge Events Storage
用户代码 kickoff() - - - - - - -
Flow kickoff() - - - - - - -
Crew - execute_sync/async - 初始化 初始化 初始化 emit() update()
Task - - execute_task() - - - emit() -
Agent - - - search() read/add() query() emit() -
Callbacks - - - - - - - -

交互模式说明

1) 同步调用(Synchronous)

  • Crew → Task.execute_sync():阻塞等待任务完成
  • Task → Agent.execute_task():阻塞等待 Agent 输出
  • Agent → LLM.invoke():阻塞等待 LLM 响应

2) 异步调用(Asynchronous)

  • Crew → Task.execute_async():返回 Future,不阻塞
  • 异步任务并行执行,同步任务等待所有前置异步任务完成

3) 事件发布(Event-Driven)

  • Crew → EventBus.emit(CrewKickoffStartedEvent):非阻塞
  • Task → EventBus.emit(TaskStartedEvent):非阻塞
  • Agent → EventBus.emit(AgentStepEvent):非阻塞

4) 资源查询(Query)

  • Agent → Memory.search(query):查询相关记忆
  • Agent → Knowledge.query(query):查询知识库
  • Tool → Cache.read(tool, input):查询缓存

5) 回调通知(Callback)

  • Crew → before_kickoff_callbacks:启动前通知
  • Crew → after_kickoff_callbacks:完成后通知
  • Task → task_callback:任务完成通知
  • Agent → step_callback:步骤完成通知

架构要点说明

1) Sequential 与 Hierarchical 流程的区别

Sequential Process(顺序流程)

  • 任务按定义顺序依次执行
  • 每个任务使用前序任务的输出作为上下文
  • Agent 直接执行分配给他们的任务
  • 适用场景:线性工作流、明确的任务依赖链

Hierarchical Process(层级流程)

  • 自动创建一个 Manager Agent 作为协调者
  • Manager 负责任务分配、委派和监督
  • Worker Agent 接受委派并汇报结果
  • 适用场景:复杂的项目管理、需要动态任务分配

2) 内存系统的四种类型

  • Short-term Memory:存储当前任务的上下文(ChromaDB + RAG)
  • Long-term Memory:存储跨会话的任务结果(SQLite)
  • Entity Memory:跟踪人物、地点、概念等实体(RAG)
  • External Memory:集成 Mem0 的高级记忆功能

3) 任务上下文的三种来源

  • NOT_SPECIFIED(默认):自动使用所有前序任务的输出
  • 显式 context:任务明确指定 context=[task1, task2]
  • conversation_history:从输入参数注入会话历史

4) 异步任务的执行约束

  • 多个异步任务可以并行执行
  • 异步任务不能依赖其他异步任务的输出
  • 必须有同步任务作为分隔点
  • 同步任务会等待所有前置异步任务完成

5) Manager Agent 的自动生成

在 Hierarchical Process 中,Crew 会自动创建 Manager Agent:

  • 角色:根据 manager_agent 配置或默认角色
  • 工具:包含任务委派工具(DelegateTaskTool)
  • 职责:分析任务、分配给合适的 Agent、聚合结果

数据结构 UML 图

Crew 核心类图

classDiagram
    class Crew {
        +UUID id
        +list[BaseAgent] agents
        +list[Task] tasks
        +Process process
        +bool verbose
        +bool memory
        +ShortTermMemory short_term_memory
        +LongTermMemory long_term_memory
        +EntityMemory entity_memory
        +ExternalMemory external_memory
        +bool cache
        +Cache cache_handler
        +Knowledge knowledge
        +int max_rpm
        +RPMController rpm_controller
        +str prompt_file
        +I18N i18n
        +BaseLLM manager_llm
        +str manager_agent
        +bool step_callback
        +bool task_callback
        +bool full_output
        +bool share_crew
        +dict inputs
        +bool respect_context_window
        +bool planning
        +list[Task] planning_tasks
        +EmbedderConfig embedder
        +bool training
        +dict training_data
        +kickoff(inputs)
        +kickoff_for_each(inputs)
        +train(n_iterations, filename, inputs)
        +replay(task_id, inputs)
        +test(n_iterations, openai_model_name)
        +copy()
        +reset_memories(command_type)
        -_execute_tasks(tasks, start_index)
        -_run_sequential_process()
        -_run_hierarchical_process()
        -_get_context(task, task_outputs)
        -_prepare_tools(agent, task, tools)
        -_create_manager_agent()
        -_handle_conditional_task()
    }

    class CrewOutput {
        +str raw
        +BaseModel pydantic
        +dict json_dict
        +list[TaskOutput] tasks_output
        +UsageMetrics token_usage
        +json()
        +to_dict()
    }

    class Process {
        <<enumeration>>
        sequential
        hierarchical
    }

    class Memory {
        <<interface>>
        +save(value, metadata)
        +search(query, limit, score_threshold)
        +set_crew(crew)
    }

    class ShortTermMemory {
        +save(value, metadata)
        +search(query, limit, score_threshold)
    }

    class LongTermMemory {
        +save(value, metadata)
        +search(query, limit, score_threshold)
    }

    class EntityMemory {
        +save(value, metadata)
        +search(query, limit, score_threshold)
    }

    class ExternalMemory {
        +save(value, metadata)
        +search(query, limit, score_threshold)
    }

    class Cache {
        +read(tool, input)
        +add(tool, input, output)
    }

    class RPMController {
        +int max_rpm
        +int _current_rpm
        +check_or_wait()
        +stop_rpm_counter()
    }

    class I18N {
        +str prompt_file
        +dict _prompts
        +slice(slice)
        +errors(error)
        +tools(tool)
    }

    Crew --> CrewOutput : 生成
    Crew --> Process : 使用
    Crew --> Memory : 管理
    Memory <|-- ShortTermMemory : 实现
    Memory <|-- LongTermMemory : 实现
    Memory <|-- EntityMemory : 实现
    Memory <|-- ExternalMemory : 实现
    Crew --> Cache : 使用
    Crew --> RPMController : 使用
    Crew --> I18N : 使用

关键字段说明

Crew 核心字段

字段 类型 约束 默认值 说明
id UUID frozen=True uuid4() Crew 唯一标识符
agents list[BaseAgent] required - 参与的 Agent 列表
tasks list[Task] required - 任务列表
process Process - sequential 执行流程(sequential/hierarchical)
verbose bool | int | str - 0 日志详细程度(0-2)
memory bool - False 是否启用内存系统
short_term_memory ShortTermMemory | None - None 短期记忆实例
long_term_memory LongTermMemory | None - None 长期记忆实例
entity_memory EntityMemory | None - None 实体记忆实例
external_memory ExternalMemory | None - None 外部记忆实例
contextual_memory bool - False 是否启用上下文记忆
cache bool - True 是否启用缓存
cache_handler Cache | None - None 缓存处理器实例
knowledge Knowledge | None - None 知识库实例
knowledge_sources list[BaseKnowledgeSource] - [] 知识源列表
max_rpm int | None - None 最大 RPM 限制
rpm_controller RPMController | None - None RPM 控制器实例
prompt_file str | None - None 自定义提示词文件路径
i18n I18N - I18N() 国际化支持
manager_llm BaseLLM | None - None Manager Agent 的 LLM
manager_agent str | None - None Manager Agent 配置
step_callback Callable | None - None Agent 步骤回调
task_callback Callable | None - None 任务完成回调
full_output bool - False 是否返回完整输出
share_crew bool - False 是否在 Agent 间共享 Crew 信息
inputs dict - {} 输入参数字典
respect_context_window bool - True 是否尊重 LLM 上下文窗口限制
planning bool - False 是否启用自动规划
planning_tasks list[Task] - [] 规划任务列表
embedder EmbedderConfig | None - None 嵌入模型配置
training bool - False 是否处于训练模式
training_data dict - {} 训练数据

CrewOutput 字段

字段 类型 约束 默认值 说明
raw str - "" 最后一个任务的原始输出
pydantic BaseModel | None - None 最后一个任务的 Pydantic 输出
json_dict dict[str, Any] | None - None 最后一个任务的 JSON 输出
tasks_output list[TaskOutput] - [] 所有任务的输出列表
token_usage UsageMetrics - UsageMetrics() Token 使用统计

UsageMetrics 字段

class UsageMetrics(BaseModel):
    total_tokens: int = 0
    prompt_tokens: int = 0
    completion_tokens: int = 0
    successful_requests: int = 0

API 详细规格

API 1: kickoff

基本信息

  • 方法签名Crew.kickoff(inputs: dict[str, Any] | None = None) -> CrewOutput
  • 调用方式:实例方法
  • 幂等性:否(依赖 LLM 生成和当前状态)

功能说明

启动 Crew 执行,按照配置的流程执行所有任务并返回聚合结果。

请求结构体

class KickoffParams:
    inputs: Optional[dict[str, Any]] = None  # 输入参数字典
字段 类型 必填 默认 约束 说明
inputs dict[str, Any] | None None - 模板变量和上下文参数

输入参数示例

inputs = {
    "topic": "AI Safety",             # 模板变量
    "count": 5,                        # 模板变量
    "crew_chat_messages": [            # 会话历史
        {"role": "user", "content": "Find latest AI news"},
        {"role": "assistant", "content": "Here are the findings..."}
    ],
    "crewai_trigger_payload": {...}   # Trigger payload(特殊用途)
}

响应结构体

class CrewOutput(BaseModel):
    raw: str                         # 最后一个任务的原始输出
    pydantic: Optional[BaseModel]    # 最后一个任务的 Pydantic 输出
    json_dict: Optional[dict]        # 最后一个任务的 JSON 输出
    tasks_output: list[TaskOutput]   # 所有任务的输出列表
    token_usage: UsageMetrics        # Token 使用统计

核心代码

def kickoff(self, inputs: dict[str, Any] | None = None) -> CrewOutput:
    """启动 Crew 执行"""
    self.inputs = inputs or {}

    try:
        # 1. 初始化(首次执行)
        if not self._execution_started:
            self._execution_started = True
            self._execution_logs = []
            self._task_outputs_replayed_from_memory = []

        # 2. 重置内部状态
        self._finished_tasks = set()
        self._tasks_index_map = {str(task.id): i for i, task in enumerate(self.tasks)}
        self._copy_agents()

        # 3. 执行 @before_kickoff 回调
        if hasattr(self.__class__, "before_kickoff"):
            self.__class__.before_kickoff(self)

        # 4. 发送 CrewStartedEvent
        crewai_event_bus.emit(
            self, CrewStartedEvent(inputs=self.inputs, crew=self)
        )

        # 5. 插值任务
        self._interpolate_tasks(self.inputs)

        # 6. 设置 token 处理
        self._set_tokens_process()

        # 7. 初始化内存和知识库
        self._initialize_memory()
        self._initialize_crew_knowledge()

        # 8. 注入关系(crew → agents → tasks)
        self._inject_relationships()

        # 9. 生成规划任务(如果启用)
        if self.planning:
            self.planning_tasks = self._create_planning_tasks()

        # 10. 选择执行流程
        if self.process == Process.sequential:
            result = self._run_sequential_process()
        else:
            result = self._run_hierarchical_process()

        # 11. 聚合 token 使用
        metrics = self._finish_execution(result.tasks_output)
        result.token_usage = metrics

        # 12. 执行 @after_kickoff 回调
        if hasattr(self.__class__, "after_kickoff"):
            self.__class__.after_kickoff(self, result)

        # 13. 发送 CrewCompletedEvent
        crewai_event_bus.emit(
            self, CrewCompletedEvent(output=result, crew=self)
        )

        return result

    except Exception as e:
        # 发送 CrewFailedEvent
        crewai_event_bus.emit(
            self, CrewFailedEvent(error=str(e), crew=self)
        )
        raise e

调用链与上游函数

用户代码调用

from crewai import Crew, Agent, Task

# 定义 Crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    memory=True,
)

# 启动执行
result = crew.kickoff(inputs={"topic": "AI Safety"})

# 访问结果
print(result.raw)
print(result.token_usage.total_tokens)
for task_output in result.tasks_output:
    print(f"{task_output.description}: {task_output.raw}")

Flow 中调用

class MyFlow(Flow):
    @start()
    def run_crew_step(self):
        crew = MyCrew()
        result = crew.crew().kickoff(inputs={"query": self.state.query})
        self.state.crew_result = result.raw
        return result

完整执行时序图(从上游接口到输出)

sequenceDiagram
    autonumber
    participant User as 用户代码
    participant Crew as Crew 编排器
    participant CB as 回调系统
    participant Memory as Memory 系统
    participant Cache as Cache 系统
    participant Knowledge as Knowledge 系统
    participant Process as Process 
    participant TaskExec as 任务执行层
    participant Task as Task
    participant Agent as Agent
    participant AgentExec as AgentExecutor
    participant LLM as LLM
    participant Tool as Tool
    participant Event as Event Bus
    participant Storage as Storage
    participant Output as Output 

    %% 启动阶段
    User->>Crew: kickoff(inputs)
    activate Crew

    Crew->>Event: emit(CrewKickoffStartedEvent)
    Crew->>CB: 执行 before_kickoff_callbacks
    CB-->>Crew: 修改后的 inputs

    %% 初始化阶段
    Crew->>Crew: _interpolate_inputs(inputs)
    Note over Crew: 插值任务和 Agent 的模板变量

    Crew->>Memory: _initialize_memory()
    activate Memory
    Memory->>Memory: 创建 ShortTermMemory
    Memory->>Memory: 创建 LongTermMemory
    Memory->>Memory: 创建 EntityMemory
    Memory->>Memory: 创建 ExternalMemory (如果配置)
    Memory-->>Crew: Memory 系统就绪
    deactivate Memory

    Crew->>Cache: 初始化 CacheHandler
    Cache-->>Crew: Cache 就绪

    Crew->>Knowledge: _initialize_crew_knowledge()
    Knowledge-->>Crew: Knowledge 就绪

    Crew->>Crew:  Agent 注入资源
    Note over Crew: agent.crew = self<br/>agent.set_knowledge()<br/>agent.create_agent_executor()

    %% 流程选择
    alt process == sequential
        Crew->>Process: _run_sequential_process()
    else process == hierarchical
        Crew->>Process: _run_hierarchical_process()
        Process->>Process: _create_manager_agent()
        Note over Process: 创建 Manager Agent<br/>添加 DelegateTaskTool
    end

    %% 任务执行循环
    Process->>TaskExec: _execute_tasks(tasks)
    activate TaskExec

    loop 遍历每个任务
        TaskExec->>TaskExec: _get_agent_to_use(task)
        Note over TaskExec: Sequential: task.agent<br/>Hierarchical: manager_agent

        TaskExec->>TaskExec: _prepare_tools(agent, task)
        Note over TaskExec: 合并 agent.tools + task.tools<br/>添加缓存包装<br/>添加委派工具(如果允许)

        TaskExec->>TaskExec: _get_context(task, task_outputs)
        Note over TaskExec: 聚合前序任务的输出

        alt ConditionalTask
            TaskExec->>Task: should_execute(previous_output)
            alt 条件不满足
                Task-->>TaskExec: get_skipped_task_output()
                TaskExec->>Storage: _store_execution_log()
                Note over TaskExec: 跳过任务,继续下一个
            end
        end

        alt async_execution == True
            TaskExec->>Task: execute_async(agent, context, tools)
            activate Task
            Task->>Task: 创建 Future
            Task->>Agent: 后台线程执行
            Task-->>TaskExec: 返回 Future
            deactivate Task
            Note over TaskExec: 不等待,继续下一个任务
        else 同步任务
            alt 存在未完成的异步任务
                TaskExec->>TaskExec: _process_async_tasks(futures)
                Note over TaskExec: 等待所有异步任务完成<br/>收集结果
            end

            TaskExec->>Task: execute_sync(agent, context, tools)
            activate Task

            Task->>Event: emit(TaskStartedEvent)
            Task->>Agent: execute_task(task, context, tools)
            activate Agent

            %% Agent 执行
            Agent->>Agent: 构建 task_prompt
            Note over Agent: 包含:role, goal, backstory,<br/>task description, context

            alt  memory 系统
                Agent->>Memory: search(task.description)
                Memory-->>Agent: 相关记忆列表
                Note over Agent: 将记忆添加到 context
            end

            alt  knowledge 系统
                Agent->>Knowledge: query(task.description)
                Knowledge-->>Agent: 相关知识片段
                Note over Agent: 将知识添加到 context
            end

            Agent->>AgentExec: invoke(task_prompt)
            activate AgentExec

            loop LLM 循环(直到完成)
                AgentExec->>LLM: generate(prompt, tools)
                activate LLM
                LLM-->>AgentExec: LLM Response (text or tool call)
                deactivate LLM

                alt 需要调用工具
                    AgentExec->>Cache: read(tool, input)
                    alt 缓存命中
                        Cache-->>AgentExec: 缓存结果
                    else 缓存未命中
                        AgentExec->>Tool: invoke(input)
                        activate Tool
                        Tool-->>AgentExec: Tool Output
                        deactivate Tool
                        AgentExec->>Cache: add(tool, input, output)
                    end

                    AgentExec->>Memory: save(tool_output)
                    Note over AgentExec: 继续 LLM 循环
                else 任务完成
                    Note over AgentExec: 退出循环
                end
            end

            AgentExec-->>Agent: 最终输出
            deactivate AgentExec

            Agent->>Memory: save(final_output)
            Agent-->>Task: 最终输出字符串
            deactivate Agent

            %% 输出处理
            Task->>Output: _export_output(result)
            activate Output
            Output->>Output: 提取 pydantic_output
            Output->>Output: 提取 json_output
            Output-->>Task: (pydantic, json)
            deactivate Output

            Task->>Output: 创建 TaskOutput
            Output->>Output: TaskOutput(raw, pydantic, json_dict)

            alt 配置了 Guardrails
                loop 遍历每个 guardrail
                    Output->>Output: _invoke_guardrail_function()
                    alt 验证失败
                        Output->>Agent: 重新执行任务
                        Note over Output: 使用 guardrail 反馈<br/>重试直到成功或达到 max_retries
                    end
                end
            end

            Task->>Event: emit(TaskCompletedEvent)
            Task-->>TaskExec: TaskOutput
            deactivate Task

            TaskExec->>TaskExec: _process_task_result(task, output)
            TaskExec->>Storage: _store_execution_log(task, output)
            Note over Storage: 持久化任务输出<br/>支持回放功能
        end
    end

    alt 仍有未完成的异步任务
        TaskExec->>TaskExec: _process_async_tasks(futures)
    end

    TaskExec->>TaskExec: _create_crew_output(task_outputs)
    Note over TaskExec: 聚合所有任务输出<br/>选择最后一个有效输出作为主输出

    TaskExec-->>Crew: CrewOutput
    deactivate TaskExec

    %% 完成阶段
    Crew->>Crew: calculate_usage_metrics()
    Note over Crew: 统计所有 Agent  token 使用

    Crew->>CB: 执行 after_kickoff_callbacks
    CB-->>Crew: 修改后的 CrewOutput

    Crew->>Event: emit(CrewKickoffCompletedEvent)
    Crew-->>User: CrewOutput
    deactivate Crew

时序图功能详解

阶段 1:启动和初始化(步骤 1-15)

1-3. 入口和事件发布

  • 用户调用 crew.kickoff(inputs)
  • 发送 CrewKickoffStartedEvent 到事件总线
  • 执行 before_kickoff_callbacks,允许用户修改 inputs

4-5. 输入插值

  • _interpolate_inputs():将 inputs 中的值插入到任务和 Agent 的模板变量中
  • 例如:"{topic}" 被替换为 inputs["topic"] 的值

6-10. 内存系统初始化

  • 创建四种类型的记忆:
    • ShortTermMemory:基于 ChromaDB 的向量存储
    • LongTermMemory:基于 SQLite 的持久化存储
    • EntityMemory:实体跟踪(人物、地点、概念)
    • ExternalMemory:可选,集成 Mem0

11-13. 缓存和知识库初始化

  • CacheHandler:初始化工具结果缓存
  • Knowledge:初始化 RAG 知识库

14-15. Agent 资源注入

  • 为每个 Agent 设置 crewknowledgememory
  • 调用 agent.create_agent_executor() 创建执行器

阶段 2:流程选择(步骤 16-20)

Sequential Process

  • 直接调用 _execute_tasks()
  • Agent 按任务分配直接执行

Hierarchical Process

  • 先创建 Manager Agent
  • Manager Agent 自动获得 DelegateTaskToolAskQuestionTool
  • Manager 负责任务委派和协调

阶段 3:任务执行循环(步骤 21-85)

21-26. 任务准备

  • _get_agent_to_use():确定执行 Agent
    • Sequential:使用 task.agent
    • Hierarchical:使用 manager_agent
  • _prepare_tools():合并工具
    • agent.tools + task.tools
    • 添加缓存包装(CachedTool
    • 添加委派工具(如果 allow_delegation=True
  • _get_context():聚合上下文
    • 如果 task.context is NOT_SPECIFIED:使用所有前序任务输出
    • 如果 task.context 是列表:仅使用指定任务的输出

27-32. 条件任务处理

  • 对于 ConditionalTask,调用 should_execute(previous_output)
  • 如果条件不满足,跳过任务,记录日志,继续下一个

33-38. 异步任务执行

  • 创建 Future 对象
  • 在后台线程中执行 Agent.execute_task()
  • 不阻塞,继续下一个任务
  • 异步任务不能依赖其他异步任务的输出

39-84. 同步任务执行(核心流程):

39-42. 等待异步任务

  • 如果存在未完成的异步任务,先等待所有异步任务完成
  • 调用 future.result() 阻塞获取结果

43-46. 任务开始

  • 发送 TaskStartedEvent
  • 调用 Agent.execute_task(task, context, tools)

47-49. 构建提示词

  • 合并 rolegoalbackstorytask.descriptioncontext
  • 构成完整的 task_prompt

50-55. 记忆查询(如果启用):

  • 调用 Memory.search(task.description)
  • 检索相关的历史记忆
  • 将记忆添加到上下文中

56-60. 知识库查询(如果启用):

  • 调用 Knowledge.query(task.description)
  • 检索相关的知识片段
  • 将知识添加到上下文中

61-78. Agent 执行循环(LLM 反馈循环):

  • 62-64. LLM 调用:调用 LLM.generate() 生成响应
  • 65-76. 工具调用(如果需要):
    • 检查缓存 Cache.read(tool, input)
    • 缓存命中:直接使用缓存结果
    • 缓存未命中:调用 Tool.invoke(input),结果写入缓存
    • 工具输出保存到 Memory
    • 继续 LLM 循环(将工具输出作为新的输入)
  • 77-78. 任务完成:LLM 决定任务完成,退出循环

79-81. 记忆保存

  • 将最终输出保存到 Memory
  • 返回最终输出字符串给 Task

82-88. 输出处理

  • _export_output():提取 Pydantic 和 JSON 输出
  • 创建 TaskOutput 对象(包含 rawpydanticjson_dict

89-94. Guardrails 验证(如果配置):

  • 遍历每个 guardrail 函数
  • 如果验证失败,重新执行任务
  • 使用 guardrail 反馈改进输出
  • 重试直到成功或达到 max_retries

95-99. 任务完成

  • 发送 TaskCompletedEvent
  • 返回 TaskOutput 给任务执行层
  • 调用 _process_task_result() 处理结果
  • 调用 _store_execution_log() 持久化输出(支持回放)

阶段 4:输出聚合(步骤 86-90)

86-88. 处理剩余异步任务

  • 如果仍有未完成的异步任务,等待所有完成
  • 收集所有异步任务的输出

89-90. 创建 CrewOutput

  • 聚合所有任务输出
  • 选择最后一个有效输出作为主输出(CrewOutput.raw
  • 所有任务输出保存在 CrewOutput.tasks_output 列表中

阶段 5:完成和清理(步骤 91-96)

91-92. 统计 Token 使用

  • 遍历所有 Agent,收集 LLM token 使用统计
  • 聚合到 CrewOutput.token_usage

93-94. 执行完成回调

  • 调用 after_kickoff_callbacks
  • 允许用户修改最终输出

95-96. 发送完成事件

  • 发送 CrewKickoffCompletedEvent 到事件总线
  • 返回 CrewOutput 给用户

边界与异常

异常情况处理

  1. 空 Agent 或 Task 列表

    • 异常类型:ValueError
    • 触发条件:agentstasks 为空
    • 处理方式:抛出异常,提示用户配置
    • 回退策略:无
  2. 任务执行失败

    • 异常类型:Task 执行中抛出的异常
    • 触发条件:Agent 执行失败、Guardrail 验证失败等
    • 处理方式:发送 CrewFailedEvent,抛出异常
    • 回退策略:用户可在 try-catch 中处理
  3. Manager Agent 创建失败(Hierarchical):

    • 异常类型:配置错误
    • 触发条件:manager_agent 配置无效
    • 处理方式:使用默认 Manager 配置
    • 回退策略:自动生成默认 Manager
  4. 内存初始化失败

    • 异常类型:存储不可用
    • 触发条件:CREWAI_STORAGE_DIR 无写入权限
    • 处理方式:警告日志,继续执行(不使用内存)
    • 回退策略:禁用内存功能
  5. RPM 限制超时

    • 异常类型:不抛出异常,阻塞等待
    • 触发条件:达到 max_rpm 限制
    • 处理方式:自动等待 60 秒后重试
    • 回退策略:无,强制等待

API 2: kickoff_for_each

基本信息

  • 方法签名Crew.kickoff_for_each(inputs: list[dict[str, Any]]) -> list[CrewOutput]
  • 调用方式:实例方法
  • 幂等性:否

功能说明

为多组输入参数批量执行 Crew,返回每次执行的结果列表。

请求结构体

字段 类型 必填 默认 约束 说明
inputs list[dict[str, Any]] - - 多组输入参数列表

核心代码

def kickoff_for_each(self, inputs: list[dict[str, Any]]) -> list[CrewOutput]:
    """批量执行 Crew"""
    results: list[CrewOutput] = []

    for input_data in inputs:
        # 重置执行状态
        self._execution_started = False

        # 执行单次 kickoff
        result = self.kickoff(inputs=input_data)
        results.append(result)

        # 清理内存(可选)
        if self.memory:
            self.reset_memories("long_term")

    return results

使用示例

# 定义多组输入
inputs_list = [
    {"topic": "AI Safety", "count": 5},
    {"topic": "Climate Change", "count": 10},
    {"topic": "Healthcare", "count": 7},
]

# 批量执行
results = crew.kickoff_for_each(inputs=inputs_list)

# 处理结果
for i, result in enumerate(results):
    print(f"Result {i+1}:")
    print(result.raw)
    print(f"Tokens used: {result.token_usage.total_tokens}\n")

API 3: train

基本信息

  • 方法签名Crew.train(n_iterations: int, filename: str, inputs: dict[str, Any] | None = None) -> None
  • 调用方式:实例方法
  • 幂等性:否

功能说明

训练 Crew,通过多次迭代收集人工反馈并优化 Agent 行为。

请求结构体

字段 类型 必填 默认 约束 说明
n_iterations int - >0 训练迭代次数
filename str - - 保存训练数据的文件名
inputs dict[str, Any] | None None - 输入参数

核心代码

def train(
    self, n_iterations: int, filename: str, inputs: dict[str, Any] | None = None
) -> None:
    """训练 Crew"""
    # 1. 启用训练模式
    self.training = True

    # 2. 禁用内存(训练时不需要)
    original_memory_config = self.memory
    self.memory = False

    # 3. 初始化训练数据
    training_data_file = f"{filename}.pkl"
    if os.path.exists(training_data_file):
        with open(training_data_file, "rb") as f:
            self.training_data = pickle.load(f)
    else:
        self.training_data = {}

    # 4. 执行多轮训练
    for iteration in range(n_iterations):
        print(f"\n=== Training Iteration {iteration + 1}/{n_iterations} ===\n")

        # 执行 Crew
        result = self.kickoff(inputs=inputs)

        # 收集人工反馈
        print("\nCrew execution completed. Please provide feedback:")
        feedback = input("Was the output satisfactory? (yes/no): ")

        if feedback.lower() == "yes":
            # 保存成功的执行记录
            for task in self.tasks:
                if task.output:
                    self.training_data[str(task.id)] = {
                        "description": task.description,
                        "expected_output": task.expected_output,
                        "actual_output": task.output.raw,
                        "feedback": "positive",
                    }
        else:
            # 收集改进建议
            improvement = input("What should be improved? ")
            for task in self.tasks:
                if task.output:
                    self.training_data[str(task.id)] = {
                        "description": task.description,
                        "expected_output": task.expected_output,
                        "actual_output": task.output.raw,
                        "feedback": "negative",
                        "improvement": improvement,
                    }

        # 保存训练数据
        with open(training_data_file, "wb") as f:
            pickle.dump(self.training_data, f)

    # 5. 恢复配置
    self.training = False
    self.memory = original_memory_config

    print(f"\nTraining completed. Data saved to {training_data_file}")

使用示例

# 训练 Crew
crew.train(
    n_iterations=5,
    filename="research_crew_training",
    inputs={"topic": "AI Trends"}
)

# 训练后,Crew 会利用训练数据优化 Agent 行为
# 训练数据保存在 research_crew_training.pkl

API 4: replay

基本信息

  • 方法签名Crew.replay(task_id: str, inputs: dict[str, Any] | None = None) -> CrewOutput
  • 调用方式:实例方法
  • 幂等性:否

功能说明

从指定任务开始重新执行 Crew,使用之前存储的执行记录作为前序任务的输出。

请求结构体

字段 类型 必填 默认 约束 说明
task_id str - - 要重新执行的任务 ID
inputs dict[str, Any] | None None - 输入参数

核心代码

def replay(self, task_id: str, inputs: dict[str, Any] | None = None) -> CrewOutput:
    """从指定任务回放执行"""
    # 1. 查找任务索引
    if task_id not in self._tasks_index_map:
        raise ValueError(f"Task with id {task_id} not found")

    start_index = self._tasks_index_map[task_id]

    # 2. 加载历史执行记录
    self._load_execution_logs()

    # 3. 恢复前序任务的输出
    task_outputs_to_replay: list[TaskOutput] = []
    for i in range(start_index):
        task = self.tasks[i]
        # 从存储中读取任务输出
        saved_output = self._read_task_output_from_log(str(task.id))
        if saved_output:
            task_outputs_to_replay.append(saved_output)

    self._task_outputs_replayed_from_memory = task_outputs_to_replay

    # 4. 从指定任务开始执行
    if self.process == Process.sequential:
        return self._run_sequential_process_from_index(start_index)
    else:
        # Hierarchical 不支持 replay
        raise ValueError("Replay is not supported for hierarchical process")

使用示例

# 首次执行
result = crew.kickoff(inputs={"topic": "AI Safety"})

# 获取某个任务的 ID
research_task_id = str(crew.tasks[0].id)

# 从该任务重新执行(例如修改了 Agent 配置后)
replay_result = crew.replay(
    task_id=research_task_id,
    inputs={"topic": "AI Safety"}
)

API 5: test

基本信息

  • 方法签名Crew.test(n_iterations: int, openai_model_name: str, inputs: dict[str, Any] | None = None) -> None
  • 调用方式:实例方法
  • 幂等性:否

功能说明

测试 Crew,评估不同配置下的性能。

核心代码

def test(
    self,
    n_iterations: int,
    openai_model_name: str,
    inputs: dict[str, Any] | None = None,
) -> None:
    """测试 Crew 性能"""
    results = []

    for i in range(n_iterations):
        print(f"\n=== Test Iteration {i + 1}/{n_iterations} ===\n")

        # 执行 Crew
        start_time = time.time()
        result = self.kickoff(inputs=inputs)
        execution_time = time.time() - start_time

        # 收集指标
        metrics = {
            "iteration": i + 1,
            "execution_time": execution_time,
            "total_tokens": result.token_usage.total_tokens,
            "successful_requests": result.token_usage.successful_requests,
            "output_length": len(result.raw),
        }
        results.append(metrics)

        print(f"Execution Time: {execution_time:.2f}s")
        print(f"Total Tokens: {metrics['total_tokens']}")

    # 计算统计数据
    avg_time = sum(r["execution_time"] for r in results) / len(results)
    avg_tokens = sum(r["total_tokens"] for r in results) / len(results)

    print(f"\n=== Test Summary ===")
    print(f"Average Execution Time: {avg_time:.2f}s")
    print(f"Average Total Tokens: {avg_tokens:.0f}")

调用链路深度分析

路径 1:用户代码 → kickoff() → Sequential Process 完整链路

1.1 入口层调用链

# 用户代码
result = crew.kickoff(inputs={"topic": "AI Safety"})

# ↓ 进入 Crew.kickoff()
def kickoff(self, inputs: dict[str, Any] | None = None) -> CrewOutput:
    # 1. 设置 OpenTelemetry baggage context
    ctx = baggage.set_baggage("crew_context", CrewContext(id=str(self.id), key=self.key))
    token = attach(ctx)

    try:
        # 2. 执行 before_kickoff 回调
        for before_callback in self.before_kickoff_callbacks:
            inputs = before_callback(inputs)

        # 3. 发送 CrewKickoffStartedEvent
        crewai_event_bus.emit(self, CrewKickoffStartedEvent(crew_name=self.name, inputs=inputs))

        # 4. 重置任务输出处理器
        self._task_output_handler.reset()

        # 5. 插值输入参数
        if inputs is not None:
            self._inputs = inputs
            self._interpolate_inputs(inputs)

        # 6. 设置任务回调
        self._set_tasks_callbacks()

        # 7. 初始化国际化
        i18n = I18N(prompt_file=self.prompt_file)

        # 8. 为每个 Agent 注入资源
        for agent in self.agents:
            agent.i18n = i18n
            agent.crew = self
            agent.set_knowledge(crew_embedder=self.embedder)
            agent.function_calling_llm = self.function_calling_llm
            agent.step_callback = self.step_callback
            agent.create_agent_executor()

        # 9. 处理规划(如果启用)
        if self.planning:
            self._handle_crew_planning()

        # 10. 选择执行流程
        if self.process == Process.sequential:
            result = self._run_sequential_process()
        elif self.process == Process.hierarchical:
            result = self._run_hierarchical_process()

        # 11. 执行 after_kickoff 回调
        for after_callback in self.after_kickoff_callbacks:
            result = after_callback(result)

        # 12. 计算 token 使用
        self.usage_metrics = self.calculate_usage_metrics()

        return result

    except Exception as e:
        crewai_event_bus.emit(self, CrewKickoffFailedEvent(error=str(e), crew_name=self.name))
        raise
    finally:
        detach(token)

1.2 插值层调用链

# Crew._interpolate_inputs()
def _interpolate_inputs(self, inputs: dict[str, Any]) -> None:
    """将 inputs 插入到任务和 Agent 的模板变量中"""
    # 插值所有任务
    for task in self.tasks:
        task.interpolate_inputs_and_add_conversation_history(inputs)
        # ↓ 进入 Task.interpolate_inputs_and_add_conversation_history()
        #   替换 task.description 中的 {placeholders}
        #   替换 task.expected_output 中的 {placeholders}
        #   处理会话历史(crew_chat_messages)

    # 插值所有 Agent
    for agent in self.agents:
        agent.interpolate_inputs(inputs)
        # ↓ 进入 Agent.interpolate_inputs()
        #   替换 agent.role 中的 {placeholders}
        #   替换 agent.goal 中的 {placeholders}
        #   替换 agent.backstory 中的 {placeholders}

1.3 Sequential Process 执行链

# Crew._run_sequential_process()
def _run_sequential_process(self) -> CrewOutput:
    """顺序执行流程"""
    return self._execute_tasks(self.tasks)
    # ↓ 进入核心任务执行循环

# Crew._execute_tasks()
def _execute_tasks(
    self,
    tasks: list[Task],
    start_index: int | None = 0,
    was_replayed: bool = False,
) -> CrewOutput:
    """任务执行循环"""
    task_outputs: list[TaskOutput] = []
    futures: list[tuple[Task, Future[TaskOutput], int]] = []

    for task_index, task in enumerate(tasks):
        # 跳过已执行的任务(回放场景)
        if start_index is not None and task_index < start_index:
            continue

        # 1. 获取执行 Agent
        agent_to_use = self._get_agent_to_use(task)
        # ↓ Sequential: 返回 task.agent
        # ↓ Hierarchical: 返回 self.manager_agent

        # 2. 准备工具
        tools_for_task = task.tools or agent_to_use.tools or []
        tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
        # ↓ 进入 _prepare_tools()

        # 3. 获取上下文
        context = self._get_context(task, task_outputs)
        # ↓ 进入 _get_context()

        # 4. 处理条件任务
        if isinstance(task, ConditionalTask):
            skipped_output = self._handle_conditional_task(...)
            if skipped_output:
                task_outputs.append(skipped_output)
                continue

        # 5. 执行任务
        if task.async_execution:
            # 异步执行
            future = task.execute_async(agent=agent_to_use, context=context, tools=tools_for_task)
            futures.append((task, future, task_index))
        else:
            # 同步执行
            if futures:
                # 先完成所有异步任务
                task_outputs = self._process_async_tasks(futures)
                futures.clear()

            # 执行当前同步任务
            task_output = task.execute_sync(agent=agent_to_use, context=context, tools=tools_for_task)
            # ↓ 进入 Task.execute_sync()

            task_outputs.append(task_output)
            self._process_task_result(task, task_output)
            self._store_execution_log(task, task_output, task_index, was_replayed)

    # 6. 等待剩余异步任务
    if futures:
        task_outputs = self._process_async_tasks(futures)

    # 7. 创建 CrewOutput
    return self._create_crew_output(task_outputs)

1.4 工具准备链

# Crew._prepare_tools()
def _prepare_tools(
    self, agent: BaseAgent, task: Task, tools: list[Tool] | list[BaseTool]
) -> list[BaseTool]:
    """准备工具列表"""
    # 1. 添加委派工具(如果允许)
    if hasattr(agent, "allow_delegation") and getattr(agent, "allow_delegation", False):
        if self.process == Process.hierarchical:
            tools = self._update_manager_tools(task, tools)
            # ↓ 进入 _update_manager_tools()
            #   添加 DelegateTaskTool
        elif agent:
            tools = self._add_delegation_tools(task, tools)
            # ↓ 进入 _add_delegation_tools()
            #   agent.get_delegation_tools(agents)

    # 2. 添加代码执行工具
    if hasattr(agent, "allow_code_execution") and getattr(agent, "allow_code_execution", False):
        tools = self._add_code_execution_tools(agent, tools)
        # ↓ agent.get_code_execution_tools()

    # 3. 添加多模态工具
    if hasattr(agent, "multimodal") and getattr(agent, "multimodal", False):
        tools = self._add_multimodal_tools(agent, tools)
        # ↓ agent.get_multimodal_tools()

    # 4. 添加平台工具
    if hasattr(agent, "apps") and getattr(agent, "apps", None):
        tools = self._add_platform_tools(task, tools)
        # ↓ agent.get_platform_tools(apps)

    # 5. 添加 MCP 工具
    if hasattr(agent, "mcps") and getattr(agent, "mcps", None):
        tools = self._add_mcp_tools(task, tools)
        # ↓ agent.get_mcp_tools(mcps)

    return tools

1.5 上下文聚合链

# Crew._get_context()
def _get_context(self, task: Task, task_outputs: list[TaskOutput]) -> str:
    """获取任务上下文"""
    if not task.context:
        return ""

    if task.context is NOT_SPECIFIED:
        # 默认:聚合所有前序任务的输出
        return aggregate_raw_outputs_from_task_outputs(task_outputs)
        # ↓ 进入 aggregate_raw_outputs_from_task_outputs()
        #   return "\n".join([output.raw for output in task_outputs])
    else:
        # 显式指定:仅聚合特定任务的输出
        return aggregate_raw_outputs_from_tasks(task.context)
        # ↓ 进入 aggregate_raw_outputs_from_tasks()
        #   查找对应的任务输出
        #   return "\n".join([task.output.raw for task in context_tasks if task.output])

1.6 Task 执行链

# Task.execute_sync()
def execute_sync(
    self,
    agent: BaseAgent | None = None,
    context: str | None = None,
    tools: list[Any] | None = None,
) -> TaskOutput:
    """同步执行任务"""
    return self._execute_core(agent, context, tools)
    # ↓ 进入 _execute_core()

# Task._execute_core()
def _execute_core(
    self,
    agent: BaseAgent | None,
    context: str | None,
    tools: list[Any] | None,
) -> TaskOutput:
    """任务执行核心逻辑"""
    agent = agent or self.agent
    self.agent = agent

    # 1. 记录开始时间
    self.start_time = datetime.datetime.now()

    # 2. 设置上下文
    self.prompt_context = context
    tools = tools or self.tools or []

    # 3. 记录处理 Agent
    self.processed_by_agents.add(agent.role)

    # 4. 发送 TaskStartedEvent
    crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))

    # 5. 执行 Agent 任务
    result = agent.execute_task(task=self, context=context, tools=tools)
    # ↓ 进入 Agent.execute_task()

    # 6. 导出输出
    pydantic_output, json_output = self._export_output(result)
    # ↓ 进入 _export_output()
    #   解析 JSON 或 Pydantic 模型

    # 7. 创建 TaskOutput
    task_output = TaskOutput(
        name=self.name or self.description,
        description=self.description,
        expected_output=self.expected_output,
        raw=result,
        pydantic=pydantic_output,
        json_dict=json_output,
        agent=agent.role,
        output_format=self._get_output_format(),
    )

    # 8. 执行 Guardrails 验证
    if self._guardrails:
        for idx, guardrail in enumerate(self._guardrails):
            task_output = self._invoke_guardrail_function(
                task_output=task_output,
                agent=agent,
                tools=tools,
                guardrail=guardrail,
                guardrail_index=idx,
            )
            # ↓ 如果验证失败,重新执行任务

    # 9. 调用回调
    if self.callback:
        self.callback(task_output)

    # 10. 发送 TaskCompletedEvent
    crewai_event_bus.emit(self, TaskCompletedEvent(task_output=task_output, task=self))

    return task_output

1.7 Agent 执行链

# Agent.execute_task()
def execute_task(
    self,
    task: Task,
    context: str | None = None,
    tools: list[BaseTool] | None = None,
) -> Any:
    """执行任务"""
    # 1. 设置任务引用
    self.task = task

    # 2. 构建任务提示词
    task_prompt = task.prompt()
    # ↓ 进入 Task.prompt()
    #   合并 task.description, expected_output, context

    # 3. 添加工具描述
    if tools:
        self.tools_description = self._render_text_description_and_args(tools)

    # 4. 查询记忆系统(如果启用)
    if self._is_any_available_memory():
        memory_context = self._build_memory_context(task.description)
        # ↓ 进入 _build_memory_context()
        #   查询 short_term_memory
        #   查询 long_term_memory
        #   查询 entity_memory
        task_prompt = f"{memory_context}\n\n{task_prompt}"

    # 5. 查询知识库(如果启用)
    if self.knowledge:
        knowledge_context = self.knowledge.query([task.description])
        # ↓ 进入 Knowledge.query()
        #   执行 RAG 检索
        task_prompt = f"{knowledge_context}\n\n{task_prompt}"

    # 6. 执行任务(带超时)
    if task.max_execution_time:
        result = self._execute_with_timeout(task_prompt, task, task.max_execution_time)
        # ↓ 进入 _execute_with_timeout()
    else:
        result = self._execute_without_timeout(task_prompt, task)
        # ↓ 进入 _execute_without_timeout()

    # 7. 保存到长期记忆
    if self.crew and self.crew._long_term_memory:
        self.crew._long_term_memory.save(result, metadata={"agent": self.role, "task": task.description})

    return result

# Agent._execute_without_timeout()
def _execute_without_timeout(self, task_prompt: str, task: Task) -> Any:
    """执行任务(无超时)"""
    if not self.agent_executor:
        raise RuntimeError("Agent executor is not initialized.")

    # 调用 AgentExecutor
    return self.agent_executor.invoke({
        "input": task_prompt,
        "tool_names": self.agent_executor.tools_names,
        "tools": self.agent_executor.tools_description,
        "ask_for_human_input": task.human_input,
    })["output"]
    # ↓ 进入 AgentExecutor.invoke()
    #   这是 LangChain 的 AgentExecutor
    #   会循环调用 LLM 和 Tool,直到任务完成

1.8 AgentExecutor 循环链(LangChain)

# AgentExecutor.invoke() (LangChain 内部)
def invoke(self, inputs: dict) -> dict:
    """AgentExecutor 执行循环"""
    # 1. 初始化
    iterations = 0
    intermediate_steps = []

    while iterations < self.max_iterations:
        # 2. 调用 LLM
        output = self.llm.predict(
            input=inputs["input"],
            intermediate_steps=intermediate_steps,
            ...
        )
        # ↓ 进入 LLM.predict()
        #   调用 OpenAI/Anthropic/etc API
        #   返回文本或工具调用

        # 3. 解析 LLM 输出
        action = self.output_parser.parse(output)

        if isinstance(action, AgentFinish):
            # 任务完成
            return {"output": action.return_values["output"]}

        # 4. 执行工具
        tool = self.tools_by_name[action.tool]

        # 5. 检查缓存(CrewAI 特性)
        if hasattr(tool, "cache_function"):
            cached_result = tool.cache_function(action.tool_input)
            if cached_result:
                observation = cached_result
            else:
                observation = tool.run(action.tool_input)
                tool.cache_function.cache_result(action.tool_input, observation)
        else:
            observation = tool.run(action.tool_input)
            # ↓ 进入 Tool.run()
            #   执行工具逻辑

        # 6. 保存中间步骤
        intermediate_steps.append((action, observation))

        # 7. 保存到短期记忆(CrewAI 特性)
        if self.agent.crew and self.agent.crew._short_term_memory:
            self.agent.crew._short_term_memory.save(
                observation,
                metadata={"tool": action.tool, "agent": self.agent.role}
            )

        iterations += 1

    raise ValueError("Agent exceeded max iterations")

1.9 输出聚合链

# Crew._create_crew_output()
def _create_crew_output(self, task_outputs: list[TaskOutput]) -> CrewOutput:
    """创建 Crew 输出"""
    if not task_outputs:
        raise ValueError("No task outputs available to create crew output.")

    # 1. 过滤空输出
    valid_outputs = [t for t in task_outputs if t.raw]
    if not valid_outputs:
        raise ValueError("No valid task outputs available to create crew output.")

    # 2. 获取最后一个有效输出
    final_task_output = valid_outputs[-1]

    # 3. 完成执行
    self._finish_execution(final_task_output.raw)
    # ↓ 停止 RPM controller

    # 4. 计算 token 使用
    self.token_usage = self.calculate_usage_metrics()
    # ↓ 进入 calculate_usage_metrics()
    #   遍历所有 Agent
    #   收集 LLM token 使用统计

    # 5. 发送完成事件
    crewai_event_bus.emit(
        self,
        CrewKickoffCompletedEvent(
            crew_name=self.name,
            output=final_task_output,
            total_tokens=self.token_usage.total_tokens,
        ),
    )

    # 6. 创建 CrewOutput
    return CrewOutput(
        raw=final_task_output.raw,
        pydantic=final_task_output.pydantic,
        json_dict=final_task_output.json_dict,
        tasks_output=task_outputs,
        token_usage=self.token_usage,
    )

路径 2:用户代码 → kickoff() → Hierarchical Process 完整链路

2.1 Hierarchical Process 执行链

# Crew._run_hierarchical_process()
def _run_hierarchical_process(self) -> CrewOutput:
    """层级执行流程"""
    # 1. 创建 Manager Agent
    self._create_manager_agent()
    # ↓ 进入 _create_manager_agent()

    # 2. 执行任务(Manager 会委派给 Worker Agents)
    return self._execute_tasks(self.tasks)

# Crew._create_manager_agent()
def _create_manager_agent(self):
    """创建 Manager Agent"""
    i18n = I18N(prompt_file=self.prompt_file)

    if self.manager_agent is not None:
        # 使用用户提供的 Manager Agent
        self.manager_agent.allow_delegation = True
        manager = self.manager_agent

        # 验证 Manager 不应有工具
        if manager.tools is not None and len(manager.tools) > 0:
            raise Exception("Manager agent should not have tools")
    else:
        # 自动创建 Manager Agent
        self.manager_llm = create_llm(self.manager_llm)

        manager = Agent(
            role=i18n.retrieve("hierarchical_manager_agent", "role"),
            goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
            backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
            tools=AgentTools(agents=self.agents).tools(),
            # ↓ AgentTools 提供委派工具
            #   - delegate_work: 委派任务给 Worker Agent
            #   - ask_question: 向 Worker Agent 提问
            allow_delegation=True,
            llm=self.manager_llm,
            verbose=self.verbose,
        )
        self.manager_agent = manager

    manager.crew = self

2.2 Manager Agent 执行任务的流程

在 Hierarchical Process 中,_execute_tasks() 的调用链与 Sequential 类似,但关键区别在于:

  1. Agent 选择

    # Crew._get_agent_to_use()
    def _get_agent_to_use(self, task: Task) -> BaseAgent | None:
        if self.process == Process.hierarchical:
            return self.manager_agent  # 总是返回 Manager
        return task.agent  # Sequential 返回 task.agent
    
  2. Manager Agent 的工具: Manager Agent 拥有特殊的委派工具:

    # AgentTools.tools() 返回的委派工具
    [
        DelegateTaskTool(agents=self.agents),
        AskQuestionTool(agents=self.agents),
    ]
    
  3. Manager 执行任务时的流程

    Manager Agent 收到任务
    Manager 分析任务,决定委派策略
    Manager 调用 delegate_work 工具
    DelegateTaskTool 选择合适的 Worker Agent
    Worker Agent 执行子任务
    Worker Agent 返回结果给 Manager
    Manager 聚合结果,决定下一步
    Manager 继续委派或完成任务
    
  4. DelegateTaskTool 的实现

    # DelegateTaskTool.run()
    def run(self, coworker: str, task: str, context: str) -> str:
        """委派任务给 Worker Agent"""
        # 1. 查找 Worker Agent
        agent = self._find_agent_by_role(coworker)
    
        # 2. 创建子任务
        sub_task = Task(
            description=task,
            expected_output="完成任务并返回结果",
            agent=agent,
        )
    
        # 3. 执行子任务
        result = agent.execute_task(sub_task, context=context)
        # ↓ 进入 Agent.execute_task()
        #   与 Sequential Process 相同
    
        # 4. 返回结果给 Manager
        return result
    

关键功能流程剖析

功能 1:Sequential Process 执行流程

功能描述

按照任务定义的顺序依次执行,每个任务使用前序任务的输出作为上下文。

核心代码

def _run_sequential_process(self) -> CrewOutput:
    """顺序执行流程"""
    # 包含规划任务
    tasks = self.planning_tasks + self.tasks if self.planning else self.tasks

    # 执行任务序列
    return self._execute_tasks(tasks)

def _execute_tasks(
    self,
    tasks: list[Task],
    start_index: int | None = 0,
    was_replayed: bool = False,
) -> CrewOutput:
    """执行任务序列"""
    task_outputs: list[TaskOutput] = []
    futures: list[tuple[Task, Future[TaskOutput], int]] = []

    # 添加回放的任务输出
    if was_replayed:
        task_outputs.extend(self._task_outputs_replayed_from_memory)

    # 遍历任务
    for task_index, task in enumerate(tasks):
        # 跳过已执行的任务(回放场景)
        if task_index < start_index:
            continue

        # 1. 处理条件任务
        if isinstance(task, ConditionalTask):
            skipped_output = self._handle_conditional_task(
                task, task_outputs, futures, task_index
            )
            if skipped_output:
                task_outputs.append(skipped_output)
                continue

        # 2. 确定执行 Agent
        agent_to_use = self._get_agent_to_use(task)

        # 3. 准备工具
        tools_for_task = self._prepare_tools(
            agent_to_use, task, task.tools or []
        )

        # 4. 获取上下文
        context = self._get_context(task, task_outputs)

        # 5. 执行任务
        if task.async_execution:
            # 异步执行
            future = task.execute_async(
                agent=agent_to_use,
                context=context,
                tools=tools_for_task,
            )
            futures.append((task, future, task_index))
        else:
            # 同步执行(先完成所有异步任务)
            if futures:
                task_outputs = self._process_async_tasks(futures)
                futures.clear()

            # 执行当前同步任务
            task_output = task.execute_sync(
                agent=agent_to_use,
                context=context,
                tools=tools_for_task,
            )

            task_outputs.append(task_output)
            self._process_task_result(task, task_output)
            self._store_execution_log(task, task_output, task_index)

    # 6. 等待剩余异步任务
    if futures:
        task_outputs = self._process_async_tasks(futures)

    # 7. 创建 CrewOutput
    return self._create_crew_output(task_outputs)

关键子函数

获取上下文

def _get_context(
    self, task: Task, task_outputs: list[TaskOutput]
) -> str:
    """获取任务上下文"""
    if task.context is NOT_SPECIFIED:
        # 默认:使用所有前序任务的输出
        return "\n".join([output.raw for output in task_outputs])
    elif isinstance(task.context, list):
        # 显式指定:仅使用特定任务的输出
        context_outputs = []
        for context_task in task.context:
            # 查找对应的输出
            for output in task_outputs:
                if output.description == context_task.description:
                    context_outputs.append(output.raw)
                    break
        return "\n".join(context_outputs)
    else:
        return ""

准备工具

def _prepare_tools(
    self, agent: BaseAgent, task: Task, tools: list[BaseTool]
) -> list[BaseTool]:
    """准备任务工具"""
    # 1. 合并 Agent 和 Task 的工具
    combined_tools = list(agent.tools) + list(tools)

    # 2. 添加 RPM 限制包装
    if self.max_rpm:
        wrapped_tools = []
        for tool in combined_tools:
            wrapped_tool = self._wrap_tool_with_rpm_control(tool)
            wrapped_tools.append(wrapped_tool)
        combined_tools = wrapped_tools

    # 3. 添加缓存包装
    if self.cache:
        cached_tools = []
        for tool in combined_tools:
            cached_tool = CacheTool(
                original_tool=tool,
                cache_handler=self.cache_handler,
            )
            cached_tools.append(cached_tool)
        combined_tools = cached_tools

    return combined_tools

功能 2:Hierarchical Process 执行流程

功能描述

自动创建 Manager Agent 负责任务分配和协调,Worker Agent 接受委派并执行任务。

核心代码

def _run_hierarchical_process(self) -> CrewOutput:
    """层级执行流程"""
    # 1. 创建 Manager Agent
    manager = self._create_manager_agent()

    # 2. 创建管理任务
    manager_task = Task(
        description=self._get_manager_task_description(),
        expected_output=self._get_manager_expected_output(),
        agent=manager,
    )

    # 3. Manager 执行管理任务
    # Manager 会根据需要委派任务给 Worker Agents
    result = manager_task.execute_sync(
        agent=manager,
        context=None,
        tools=manager.tools,
    )

    # 4. 收集所有任务输出
    task_outputs = self._collect_hierarchical_outputs()

    # 5. 创建 CrewOutput
    return self._create_crew_output(task_outputs)

def _create_manager_agent(self) -> Agent:
    """创建 Manager Agent"""
    # 1. 获取配置
    if self.manager_agent:
        manager_config = self.manager_agent
    else:
        manager_config = {
            "role": "Project Manager",
            "goal": "Coordinate team to accomplish project goals efficiently",
            "backstory": "Experienced project manager skilled at task delegation",
        }

    # 2. 创建委派工具
    delegate_tools = [
        DelegateTaskTool(agents=self.agents),
        AskQuestionTool(agents=self.agents),
    ]

    # 3. 创建 Manager
    manager = Agent(
        **manager_config,
        llm=self.manager_llm or self.agents[0].llm,
        tools=delegate_tools,
        allow_delegation=True,
        verbose=self.verbose,
    )

    return manager

Manager Agent 的工作流程

sequenceDiagram
    autonumber
    participant Manager as Manager Agent
    participant DelegateTool as DelegateTaskTool
    participant Worker1 as Worker Agent 1
    participant Worker2 as Worker Agent 2

    Manager->>Manager: 分析项目目标
    Manager->>Manager: 制定任务计划

    loop 任务分配循环
        Manager->>DelegateTool: delegate_work(coworker="Researcher", task="Research")
        DelegateTool->>Worker1: execute_task()
        Worker1-->>DelegateTool: TaskOutput
        DelegateTool-->>Manager: 任务结果

        Manager->>Manager: 分析结果
        Manager->>DelegateTool: delegate_work(coworker="Writer", task="Write report")
        DelegateTool->>Worker2: execute_task()
        Worker2-->>DelegateTool: TaskOutput
        DelegateTool-->>Manager: 任务结果
    end

    Manager->>Manager: 聚合所有结果
    Manager-->>Manager: 最终报告

功能 3:Memory 系统集成

功能描述

Crew 管理四种类型的记忆系统,在任务执行过程中自动保存和检索相关信息。

核心代码

def _initialize_memory(self) -> None:
    """初始化内存系统"""
    if not self.memory:
        return

    # 1. 设置存储目录
    storage_dir = os.getenv("CREWAI_STORAGE_DIR", appdirs.user_data_dir("crewai"))

    # 2. 初始化短期记忆
    if not self.short_term_memory:
        self.short_term_memory = ShortTermMemory(
            crew=self,
            embedder_config=self.embedder,
        )

    # 3. 初始化长期记忆
    if not self.long_term_memory:
        self.long_term_memory = LongTermMemory(
            crew=self,
            embedder_config=self.embedder,
        )

    # 4. 初始化实体记忆
    if not self.entity_memory:
        self.entity_memory = EntityMemory(
            crew=self,
            embedder_config=self.embedder,
        )

    # 5. 初始化外部记忆(如果配置)
    if self.external_memory and not self.external_memory:
        self.external_memory = ExternalMemory(
            crew=self,
            provider="mem0",
            config={"api_key": os.getenv("MEM0_API_KEY")},
        )

    # 6. 将记忆系统注入到 Agent
    for agent in self.agents:
        agent.short_term_memory = self.short_term_memory
        agent.long_term_memory = self.long_term_memory
        agent.entity_memory = self.entity_memory
        agent.external_memory = self.external_memory

def reset_memories(self, command_type: str) -> None:
    """重置记忆系统"""
    if command_type == "short_term":
        self.short_term_memory.reset()
    elif command_type == "long_term":
        self.long_term_memory.reset()
    elif command_type == "entity":
        self.entity_memory.reset()
    elif command_type == "external":
        if self.external_memory:
            self.external_memory.reset()
    elif command_type == "all":
        self.short_term_memory.reset()
        self.long_term_memory.reset()
        self.entity_memory.reset()
        if self.external_memory:
            self.external_memory.reset()

Memory 使用示例

# 启用内存系统
crew = Crew(
    agents=[researcher, analyst],
    tasks=[research_task, analysis_task],
    memory=True,  # 启用所有记忆类型
    embedder={"provider": "openai", "config": {"model": "text-embedding-3-small"}},
)

# Agent 自动使用记忆
result = crew.kickoff(inputs={"topic": "AI Trends"})

# 手动查询记忆
relevant_memories = crew.short_term_memory.search(
    query="What are the key AI trends?",
    limit=5,
    score_threshold=0.7,
)

# 清理记忆
crew.reset_memories("short_term")  # 仅清理短期记忆
crew.reset_memories("all")          # 清理所有记忆

功能 4:Planning 自动规划

功能描述

Crew 可以自动生成规划任务,分析项目目标并制定执行计划。

核心代码

def _create_planning_tasks(self) -> list[Task]:
    """创建规划任务"""
    # 1. 创建规划 Agent
    planning_agent = Agent(
        role="Task Planner",
        goal="Create detailed task breakdown and execution plan",
        backstory="Expert in project planning and task decomposition",
        llm=self.agents[0].llm,
        verbose=self.verbose,
    )

    # 2. 创建规划任务
    planning_task = Task(
        description=f"""
Analyze the following project goals and create a detailed execution plan:

Goals:
{self._format_goals_for_planning()}

Tasks to be executed:
{self._format_tasks_for_planning()}

Create a step-by-step plan including:
1. Task dependencies
2. Resource allocation
3. Risk assessment
4. Timeline estimates
""",
        expected_output="A detailed execution plan with task breakdown",
        agent=planning_agent,
    )

    return [planning_task]

def _format_goals_for_planning(self) -> str:
    """格式化 Crew 目标"""
    goals = []
    for agent in self.agents:
        goals.append(f"- {agent.role}: {agent.goal}")
    return "\n".join(goals)

def _format_tasks_for_planning(self) -> str:
    """格式化任务列表"""
    tasks_desc = []
    for task in self.tasks:
        tasks_desc.append(f"- {task.description}")
    return "\n".join(tasks_desc)

Planning 使用示例

# 启用自动规划
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, report_task],
    planning=True,  # 启用自动规划
)

# Crew 会先执行规划任务,然后按照规划执行实际任务
result = crew.kickoff(inputs={"project": "Market Analysis"})

# 规划任务的输出会作为后续任务的上下文
planning_output = result.tasks_output[0]  # 第一个输出是规划结果
print(planning_output.raw)

实战示例与最佳实践

示例 1:Sequential Process 基础用法

from crewai import Crew, Agent, Task, Process

# 定义 Agents
researcher = Agent(
    role="Research Analyst",
    goal="Find and analyze relevant information",
    backstory="Expert researcher with attention to detail",
    verbose=True,
)

writer = Agent(
    role="Content Writer",
    goal="Write engaging content based on research",
    backstory="Skilled writer with journalism background",
    verbose=True,
)

# 定义 Tasks
research_task = Task(
    description="Research the latest {topic} trends in 2024",
    expected_output="A comprehensive research report with 5 key findings",
    agent=researcher,
)

write_task = Task(
    description="Write a blog post based on the research",
    expected_output="A 500-word blog post in Markdown format",
    agent=writer,
    markdown=True,
)

# 创建 Crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    verbose=True,
)

# 执行
result = crew.kickoff(inputs={"topic": "AI Safety"})
print(result.raw)
print(f"Total tokens used: {result.token_usage.total_tokens}")

示例 2:Hierarchical Process 用法

# 定义 Worker Agents
data_collector = Agent(
    role="Data Collector",
    goal="Collect relevant data from various sources",
    backstory="Data specialist",
)

data_analyst = Agent(
    role="Data Analyst",
    goal="Analyze collected data and extract insights",
    backstory="Statistical analyst",
)

report_writer = Agent(
    role="Report Writer",
    goal="Create comprehensive reports",
    backstory="Business analyst",
)

# 定义任务(Manager 会动态分配)
tasks = [
    Task(
        description="Collect market data for {product}",
        expected_output="Raw market data",
        agent=data_collector,
    ),
    Task(
        description="Analyze market trends",
        expected_output="Analysis report with insights",
        agent=data_analyst,
    ),
    Task(
        description="Create final market analysis report",
        expected_output="Executive summary and recommendations",
        agent=report_writer,
    ),
]

# 创建 Hierarchical Crew
crew = Crew(
    agents=[data_collector, data_analyst, report_writer],
    tasks=tasks,
    process=Process.hierarchical,
    manager_llm="gpt-4",  # Manager 使用更强的模型
    verbose=True,
)

result = crew.kickoff(inputs={"product": "Electric Vehicles"})

示例 3:使用 Memory 系统

# 创建带记忆的 Crew
crew = Crew(
    agents=[researcher, analyst],
    tasks=[research_task, analysis_task],
    memory=True,
    embedder={
        "provider": "openai",
        "config": {"model": "text-embedding-3-small"}
    },
    verbose=True,
)

# 首次执行
result1 = crew.kickoff(inputs={"topic": "AI Trends"})

# 第二次执行(记忆会提供上次执行的上下文)
result2 = crew.kickoff(inputs={"topic": "AI Trends in Healthcare"})

# Agent 会自动检索相关记忆
# 例如:"根据之前关于 AI Trends 的研究..."

示例 4:异步任务执行

# 定义独立的异步任务
task1 = Task(
    description="Research {topic} from academic papers",
    expected_output="Academic research summary",
    agent=academic_researcher,
    async_execution=True,  # 异步执行
)

task2 = Task(
    description="Research {topic} from industry reports",
    expected_output="Industry insights",
    agent=industry_analyst,
    async_execution=True,  # 异步执行
)

task3 = Task(
    description="Research {topic} from social media",
    expected_output="Public sentiment analysis",
    agent=social_media_analyst,
    async_execution=True,  # 异步执行
)

# 同步任务,等待所有异步任务完成
synthesis_task = Task(
    description="Synthesize all research findings",
    expected_output="Comprehensive synthesis report",
    agent=synthesizer,
    async_execution=False,  # 同步执行
)

crew = Crew(
    agents=[academic_researcher, industry_analyst, social_media_analyst, synthesizer],
    tasks=[task1, task2, task3, synthesis_task],
)

# task1, task2, task3 并行执行
# synthesis_task 等待三者完成后执行
result = crew.kickoff(inputs={"topic": "Climate Change"})

示例 5:使用 @CrewBase 装饰器模式

from crewai.project import CrewBase, agent, task, crew, before_kickoff, after_kickoff

@CrewBase
class ResearchCrew:
    """Research crew with decorators"""

    # 配置文件路径
    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @before_kickoff
    def setup_environment(self):
        """启动前回调"""
        print("🚀 Setting up research environment...")
        # 验证 API keys
        # 创建必要的目录
        # 初始化外部服务

    @agent
    def researcher(self) -> Agent:
        return Agent(
            config=self.agents_config['researcher'],
            tools=[SerperDevTool()],
            verbose=True,
            memory=True,
        )

    @agent
    def analyst(self) -> Agent:
        return Agent(
            config=self.agents_config['analyst'],
            verbose=True,
            memory=True,
        )

    @task
    def research_task(self) -> Task:
        return Task(
            config=self.tasks_config['research_task'],
            agent=self.researcher(),
        )

    @task
    def analysis_task(self) -> Task:
        return Task(
            config=self.tasks_config['analysis_task'],
            agent=self.analyst(),
            context=[self.research_task()],
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,  # 自动收集所有 @agent
            tasks=self.tasks,    # 自动收集所有 @task
            process=Process.sequential,
            memory=True,
            verbose=True,
        )

    @after_kickoff
    def cleanup_and_report(self, output: CrewOutput):
        """完成后回调"""
        print(f"✅ Research completed!")
        print(f"📊 Token usage: {output.token_usage.total_tokens}")
        print(f"📝 Generated {len(output.tasks_output)} task outputs")

# 使用
research_crew = ResearchCrew()
result = research_crew.crew().kickoff(inputs={"topic": "Quantum Computing"})

最佳实践总结

1. Process 选择策略

使用 Sequential Process 当

  • 任务有明确的依赖链
  • 工作流是线性的
  • Agent 角色和职责清晰

使用 Hierarchical Process 当

  • 需要动态任务分配
  • 任务之间的依赖复杂
  • 需要一个协调者管理整体进度

2. Memory 配置建议

  • 短期记忆:始终启用,提供任务间的上下文
  • 长期记忆:用于跨会话的知识积累
  • 实体记忆:跟踪重要实体(人物、组织等)
  • 外部记忆:集成 Mem0 用于高级记忆功能
# 推荐配置
crew = Crew(
    agents=[...],
    tasks=[...],
    memory=True,  # 启用所有记忆类型
    embedder={
        "provider": "openai",
        "config": {
            "model": "text-embedding-3-small",  # 性价比高
            "dimensions": 1536,
        }
    },
)

3. RPM 限速策略

  • 开发环境:不设置 max_rpm(无限制)
  • 生产环境:根据 API 限额设置 max_rpm
  • 批量执行:设置合理的 max_rpm 避免 429 错误
# 生产环境推荐
crew = Crew(
    agents=[...],
    tasks=[...],
    max_rpm=60,  # 每分钟最多 60 次请求
)

4. 错误处理与重试

import time
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def safe_kickoff(crew, inputs):
    """带重试的 kickoff"""
    try:
        return crew.kickoff(inputs=inputs)
    except Exception as e:
        print(f"Crew execution failed: {e}")
        # 清理状态
        crew.reset_memories("short_term")
        raise

# 使用
result = safe_kickoff(crew, {"topic": "AI Safety"})

5. 性能优化技巧

使用异步任务

# 独立任务使用异步
task1 = Task(..., async_execution=True)
task2 = Task(..., async_execution=True)
# 同步任务作为分隔点
task3 = Task(..., async_execution=False)

启用缓存

crew = Crew(
    agents=[...],
    tasks=[...],
    cache=True,  # 默认启用
)

优化上下文传递

# 仅传递必要的上下文
task = Task(
    description="...",
    expected_output="...",
    context=[specific_task],  # 而非 NOT_SPECIFIED
)

文档更新总结

本次更新对 CrewAI-03-Crew-完整剖析.md 进行了全面的扩充和深化,主要更新内容包括:

1. 整体服务架构图扩展

新增内容

  • 添加了完整的分层架构图,包含 8 个主要层次:
    • 上游接口层(Entry Points)
    • Crew 核心协调层
    • 流程选择层(Process Layer)
    • 任务执行层(Task Execution)
    • Manager 管理层(Hierarchical Only)
    • Task 执行层
    • Agent 执行层
    • 资源管理层(Shared Resources)
    • 事件与日志层(Observability)
    • 输出处理层

架构分层说明

  • 为每个层次添加了详细的职责说明
  • 说明了各层之间的调用关系和数据流
  • 标注了关键组件和它们的功能

2. 模块交互矩阵

新增内容

  • 创建了调用方和被调方的交互矩阵表格
  • 说明了五种交互模式:
    • 同步调用(Synchronous)
    • 异步调用(Asynchronous)
    • 事件发布(Event-Driven)
    • 资源查询(Query)
    • 回调通知(Callback)

3. 完整执行时序图

新增内容

  • 添加了从用户代码到最终输出的完整时序图(96 个步骤)
  • 包含所有关键参与者:
    • 用户代码
    • Crew 编排器
    • 回调系统
    • Memory 系统
    • Cache 系统
    • Knowledge 系统
    • Process 层
    • 任务执行层
    • Task
    • Agent
    • AgentExecutor
    • LLM
    • Tool
    • Event Bus
    • Storage
    • Output 层

时序图功能详解

  • 分为 5 个主要阶段:

    • 阶段 1:启动和初始化(步骤 1-15)
    • 阶段 2:流程选择(步骤 16-20)
    • 阶段 3:任务执行循环(步骤 21-85)
    • 阶段 4:输出聚合(步骤 86-90)
    • 阶段 5:完成和清理(步骤 91-96)
  • 为每个阶段提供了详细的步骤说明

4. 调用链路深度分析

新增内容

  • 添加了两条完整的调用链路分析:
    • 路径 1:用户代码 → kickoff() → Sequential Process 完整链路
    • 路径 2:用户代码 → kickoff() → Hierarchical Process 完整链路

详细分析内容

路径 1 包含 9 个子链路:

  1. 入口层调用链:从 crew.kickoff() 的完整实现
  2. 插值层调用链_interpolate_inputs() 的工作流程
  3. Sequential Process 执行链_execute_tasks() 的循环逻辑
  4. 工具准备链_prepare_tools() 如何合并和增强工具
  5. 上下文聚合链_get_context() 如何聚合任务输出
  6. Task 执行链Task.execute_sync()_execute_core() 的流程
  7. Agent 执行链Agent.execute_task() 的详细步骤
  8. AgentExecutor 循环链:LangChain 的执行循环机制
  9. 输出聚合链_create_crew_output() 的输出处理

路径 2 包含 2 个子链路:

  1. Hierarchical Process 执行链_run_hierarchical_process() 的流程
  2. Manager Agent 执行任务的流程:Manager 如何委派任务给 Worker

每个子链路都包含

  • 完整的代码示例(带注释)
  • 调用流程箭头(↓)标注下一步调用
  • 关键决策点和条件分支
  • 数据传递和转换过程

5. 架构图与时序图的关联

改进内容

  • 时序图的每个步骤都可以映射回架构图的某个层次
  • 调用链路分析展示了架构图中各模块的具体实现
  • 通过三个维度(架构图、时序图、调用链)形成完整的理解体系

6. 文档组织优化

改进内容

  • 保留了原有的所有内容(数据结构、API 规格、实战示例)
  • 在原有基础上增加了架构分析、时序分析、调用链路分析
  • 形成了从宏观到微观的完整分析链条:
    • 宏观:整体服务架构图
    • 中观:完整执行时序图
    • 微观:调用链路深度分析
    • 实践:实战示例与最佳实践

更新价值

对读者的价值

  1. 全局视角:通过架构图快速了解 Crew 的整体结构
  2. 流程理解:通过时序图理解完整的执行流程
  3. 细节掌握:通过调用链路深入理解每个函数的实现
  4. 实践指导:结合实战示例可以快速上手开发

对技术团队的价值

  1. 新人培训:可作为新成员了解 CrewAI 的入门材料
  2. 问题排查:当遇到问题时可以快速定位到具体的调用链路
  3. 架构演进:为后续的架构优化提供参考
  4. 代码审查:作为代码审查的参考标准

总结

Crew 模块是 CrewAI 框架的核心编排器,通过以下机制实现强大的多 Agent 协作:

  1. 灵活的执行流程:Sequential 和 Hierarchical 两种模式
  2. 完善的记忆系统:四种类型的记忆机制
  3. 智能的上下文管理:自动或手动传递任务输出
  4. 丰富的生命周期钩子:before/after kickoff 回调
  5. 强大的训练与回放:支持 Crew 训练和任务回放
  6. 精细的资源控制:RPM 限速、缓存、知识库集成

架构设计原则

该模块的设计遵循以下原则:

  1. 编排者模式:专注于协调 Agent 和 Task 的协作,将实际执行委托给 Agent,将输出验证委托给 Task 的 Guardrail
  2. 职责分离:每个层次有明确的职责边界,降低耦合度
  3. 事件驱动:通过事件总线实现松耦合的组件通信
  4. 资源共享:Memory、Cache、Knowledge 等资源在所有 Agent 间共享
  5. 可观测性:完善的日志、事件和追踪系统支持调试和监控

关键技术特性

  1. 多进程模型

    • Sequential Process:线性任务流
    • Hierarchical Process:层级委派管理
  2. 异步执行

    • 支持任务异步执行
    • Future 管理异步任务结果
    • 同步点等待所有异步任务完成
  3. 上下文传递

    • 自动聚合前序任务输出
    • 支持显式指定上下文任务
    • 智能的上下文插值
  4. 工具系统

    • 动态工具准备和合并
    • 工具结果缓存
    • 委派工具、代码执行工具、多模态工具等
  5. 输出处理

    • 支持 Pydantic 模型、JSON、Raw 字符串
    • Guardrails 验证机制
    • 自动重试和反馈

性能和可扩展性

  1. 性能优化

    • 工具结果缓存减少重复调用
    • RPM 控制器避免 API 限额
    • 异步任务提高并发能力
  2. 可扩展性

    • 插件化的工具系统
    • 可定制的 Manager Agent
    • 灵活的回调机制
    • 外部记忆集成
  3. 可维护性

    • 清晰的分层架构
    • 完善的日志和事件
    • 任务回放功能
    • 执行记录持久化

该模块保持了清晰的职责边界和良好的可扩展性,是 CrewAI 框架的核心价值所在。