CrewAI-03-Crew-完整剖析
模块概览
职责与定位
Crew(团队)是 CrewAI 框架中的核心编排器,负责:
- 多 Agent 协作:管理多个 Agent 的协同工作
- 任务编排:按照指定的流程(sequential/hierarchical)执行任务序列
- 上下文管理:在任务之间传递和聚合输出
- 资源控制:管理 RPM 限速、缓存、内存系统
- 训练与回放:支持 Crew 的训练和任务回放功能
- 生命周期钩子:提供 before/after kickoff 回调
- 输出聚合:将所有任务输出汇总为 CrewOutput
输入与输出
输入:
agents:Agent 列表tasks:Task 列表inputs:动态输入参数字典(用于模板插值)process:执行流程(sequential/hierarchical)
输出:
CrewOutput:包含所有任务输出、token 统计、执行元数据
上下游依赖
上游依赖(被调用):
- 用户代码:通过
Crew.kickoff()启动 - Flow 引擎:在流程中调用 Crew
- CLI 工具:
crewai run命令
下游依赖(调用):
Task:执行任务Agent:通过任务调用 AgentMemory:管理短期、长期、实体、外部记忆Cache:处理结果缓存Knowledge:向 Agent 提供知识库Events:发送 Crew 开始、完成、失败事件
生命周期
stateDiagram-v2
[*] --> 创建: Crew(...) 或 @crew 装饰器
创建 --> 配置: 设置 agents/tasks/process
配置 --> 就绪: 等待 kickoff
就绪 --> 回调前: @before_kickoff
回调前 --> 插值: 输入参数插值
插值 --> 进程选择: sequential 或 hierarchical
进程选择 --> Sequential: process=sequential
进程选择 --> Hierarchical: process=hierarchical
Sequential --> 任务执行: _execute_tasks()
Hierarchical --> 创建管理者: _create_manager_agent()
创建管理者 --> 任务执行
任务执行 --> 同步任务: 逐个执行
任务执行 --> 异步任务: 后台执行
同步任务 --> 条件任务: ConditionalTask 判断
异步任务 --> 等待完成: future.result()
条件任务 --> 聚合输出: CrewOutput
等待完成 --> 聚合输出
聚合输出 --> 回调后: @after_kickoff
回调后 --> 完成: return CrewOutput
完成 --> [*]
整体服务架构图
flowchart TB
subgraph "上游接口层 (Entry Points)"
UserCode[用户代码<br/>crew.kickoff()]
Flow[Flow 引擎<br/>调用 Crew]
CLI[CLI 工具<br/>crewai run]
API[API 接口<br/>HTTP/RPC]
end
subgraph "Crew 核心协调层"
Crew[Crew 编排器<br/>kickoff/kickoff_async]
CrewOutput[CrewOutput<br/>输出聚合]
Callbacks[生命周期回调<br/>before/after_kickoff]
end
subgraph "流程选择层 (Process Layer)"
ProcessRouter{Process 类型}
Sequential[Sequential Process<br/>顺序执行流程]
Hierarchical[Hierarchical Process<br/>层级管理流程]
end
subgraph "任务执行层 (Task Execution)"
ExecuteTasks[_execute_tasks<br/>任务循环执行器]
GetAgent[_get_agent_to_use<br/>Agent 选择]
PrepareTools[_prepare_tools<br/>工具准备]
GetContext[_get_context<br/>上下文聚合]
ConditionalCheck[_handle_conditional_task<br/>条件判断]
end
subgraph "Manager 管理层 (Hierarchical Only)"
ManagerAgent[Manager Agent<br/>自动生成协调者]
DelegateTools[委派工具<br/>DelegateTaskTool]
AskTools[询问工具<br/>AskQuestionTool]
end
subgraph "Task 执行层"
TaskSync[Task.execute_sync<br/>同步执行]
TaskAsync[Task.execute_async<br/>异步执行]
FutureManager[Future 管理<br/>异步任务协调]
end
subgraph "Agent 执行层"
AgentExecutor[Agent.execute_task<br/>任务执行]
AgentExecutorImpl[AgentExecutor.invoke<br/>LLM 调用]
ToolInvoke[Tool 调用<br/>工具执行]
LLMCall[LLM 调用<br/>生成响应]
end
subgraph "资源管理层 (Shared Resources)"
Memory[Memory 系统<br/>Short/Long/Entity/External]
Cache[Cache 系统<br/>工具结果缓存]
Knowledge[Knowledge 系统<br/>RAG 知识库]
RPMController[RPM Controller<br/>速率限制]
end
subgraph "事件与日志层 (Observability)"
EventBus[Event Bus<br/>事件总线]
Logger[Logger<br/>日志系统]
Storage[Task Output Storage<br/>执行记录持久化]
Tracing[Tracing<br/>OpenTelemetry]
end
subgraph "输出处理层"
TaskOutput[TaskOutput<br/>任务输出]
Guardrails[Guardrails<br/>输出验证]
OutputFormat[Output Format<br/>Pydantic/JSON]
end
%% 主流程连接
UserCode --> Crew
Flow --> Crew
CLI --> Crew
API --> Crew
Crew --> Callbacks
Callbacks --> ProcessRouter
ProcessRouter -->|sequential| Sequential
ProcessRouter -->|hierarchical| Hierarchical
Sequential --> ExecuteTasks
Hierarchical --> ManagerAgent
ManagerAgent --> DelegateTools
ManagerAgent --> AskTools
Hierarchical --> ExecuteTasks
ExecuteTasks --> GetAgent
ExecuteTasks --> PrepareTools
ExecuteTasks --> GetContext
ExecuteTasks --> ConditionalCheck
ExecuteTasks --> TaskSync
ExecuteTasks --> TaskAsync
TaskAsync --> FutureManager
TaskSync --> AgentExecutor
FutureManager --> AgentExecutor
AgentExecutor --> AgentExecutorImpl
AgentExecutorImpl --> ToolInvoke
AgentExecutorImpl --> LLMCall
AgentExecutor --> TaskOutput
TaskOutput --> Guardrails
Guardrails --> OutputFormat
OutputFormat --> CrewOutput
%% 资源管理连接
Crew -.->|初始化| Memory
Crew -.->|初始化| Cache
Crew -.->|初始化| Knowledge
Crew -.->|初始化| RPMController
AgentExecutor -.->|查询| Memory
AgentExecutor -.->|查询| Knowledge
ToolInvoke -.->|缓存| Cache
LLMCall -.->|限速| RPMController
%% 事件日志连接
Crew -->|发送| EventBus
ExecuteTasks -->|发送| EventBus
AgentExecutor -->|发送| EventBus
Crew -->|记录| Logger
ExecuteTasks -->|存储| Storage
Crew -.->|追踪| Tracing
style Crew fill:#e1bee7
style ProcessRouter fill:#fff9c4
style ManagerAgent fill:#c5e1a5
style AgentExecutor fill:#b3e5fc
style Memory fill:#ffccbc
style EventBus fill:#d1c4e9
架构分层说明
1) 上游接口层 (Entry Points)
职责:提供多种方式启动 Crew 执行
- 用户代码:最常见的入口,直接调用
crew.kickoff(inputs) - Flow 引擎:在 Flow 流程中调用 Crew
- CLI 工具:通过命令行
crewai run启动 - API 接口:通过 HTTP/RPC 接口触发(Enterprise 功能)
2) Crew 核心协调层
职责:统一入口,生命周期管理,流程协调
关键组件:
Crew.kickoff():同步执行入口Crew.kickoff_async():异步执行入口before_kickoff_callbacks:启动前回调列表after_kickoff_callbacks:完成后回调列表CrewOutput:输出聚合器
核心流程:
- 执行 before_kickoff 回调
- 插值输入参数(interpolate_inputs)
- 初始化资源(memory, cache, knowledge)
- 选择执行流程(sequential/hierarchical)
- 执行任务序列
- 聚合输出
- 执行 after_kickoff 回调
- 返回 CrewOutput
3) 流程选择层 (Process Layer)
职责:根据配置选择执行模式
Sequential Process(顺序流程):
- 任务按定义顺序依次执行
- Agent 直接执行分配的任务
- 适用场景:线性工作流、明确的任务依赖
Hierarchical Process(层级流程):
- 自动创建 Manager Agent
- Manager 负责任务委派和协调
- Worker Agent 接受指令并汇报
- 适用场景:复杂项目管理、动态任务分配
4) 任务执行层 (Task Execution)
职责:执行任务循环,协调 Agent 和 Task
关键函数:
_execute_tasks():核心任务执行循环_get_agent_to_use():确定执行 Agent(Sequential 用 task.agent,Hierarchical 用 manager)_prepare_tools():准备工具列表(合并 agent.tools + task.tools,添加缓存包装)_get_context():聚合上下文(从前序任务获取输出)_handle_conditional_task():处理条件任务(检查是否应该执行)
执行模式:
- 同步执行:
task.execute_sync(),阻塞等待结果 - 异步执行:
task.execute_async(),返回 Future,稍后获取结果
5) Agent 执行层
职责:执行具体的 LLM 调用和工具使用
调用链:
Agent.execute_task()
→ Agent.create_agent_executor() # 如果未初始化
→ AgentExecutor.invoke() # LangChain executor
→ LLM 调用(生成响应)
→ Tool 调用(如果需要)
→ 循环直到完成
→ 返回最终输出
6) 资源管理层 (Shared Resources)
职责:提供共享资源和基础设施
Memory 系统(四种类型):
- ShortTermMemory:当前会话的上下文(ChromaDB + RAG)
- LongTermMemory:跨会话的任务结果(SQLite)
- EntityMemory:实体跟踪(人物、地点、概念)
- ExternalMemory:外部记忆集成(Mem0)
Cache 系统:
- 缓存工具调用结果
- 避免重复的昂贵操作
- 基于工具名称和输入参数的哈希
Knowledge 系统:
- RAG 知识库
- 为 Agent 提供领域知识
- 支持多种知识源(文件、URL、字符串)
RPM Controller:
- 速率限制(每分钟请求数)
- 避免触发 API 限额
- 自动等待和重试
7) 事件与日志层 (Observability)
职责:提供可观测性和调试支持
事件系统:
CrewKickoffStartedEvent:Crew 启动CrewKickoffCompletedEvent:Crew 完成CrewKickoffFailedEvent:Crew 失败TaskStartedEvent:任务开始TaskCompletedEvent:任务完成AgentStepEvent:Agent 步骤
日志系统:
- 结构化日志
- 支持多种日志级别
- 可配置输出目标
执行记录存储:
- 持久化任务输出
- 支持回放(replay)功能
追踪系统:
- OpenTelemetry 集成
- 分布式追踪
- 性能分析
8) 输出处理层
职责:验证和格式化输出
Guardrails:
- 自定义验证规则
- 失败时自动重试
- 支持多轮验证
Output Format:
- Pydantic 模型:结构化输出
- JSON 字典:灵活输出
- Raw 字符串:原始输出
模块交互矩阵
| 调用方 → 被调方 | Crew | Task | Agent | Memory | Cache | Knowledge | Events | Storage |
|---|---|---|---|---|---|---|---|---|
| 用户代码 | kickoff() | - | - | - | - | - | - | - |
| Flow | kickoff() | - | - | - | - | - | - | - |
| Crew | - | execute_sync/async | - | 初始化 | 初始化 | 初始化 | emit() | update() |
| Task | - | - | execute_task() | - | - | - | emit() | - |
| Agent | - | - | - | search() | read/add() | query() | emit() | - |
| Callbacks | - | - | - | - | - | - | - | - |
交互模式说明
1) 同步调用(Synchronous)
Crew → Task.execute_sync():阻塞等待任务完成Task → Agent.execute_task():阻塞等待 Agent 输出Agent → LLM.invoke():阻塞等待 LLM 响应
2) 异步调用(Asynchronous)
Crew → Task.execute_async():返回 Future,不阻塞- 异步任务并行执行,同步任务等待所有前置异步任务完成
3) 事件发布(Event-Driven)
Crew → EventBus.emit(CrewKickoffStartedEvent):非阻塞Task → EventBus.emit(TaskStartedEvent):非阻塞Agent → EventBus.emit(AgentStepEvent):非阻塞
4) 资源查询(Query)
Agent → Memory.search(query):查询相关记忆Agent → Knowledge.query(query):查询知识库Tool → Cache.read(tool, input):查询缓存
5) 回调通知(Callback)
Crew → before_kickoff_callbacks:启动前通知Crew → after_kickoff_callbacks:完成后通知Task → task_callback:任务完成通知Agent → step_callback:步骤完成通知
架构要点说明
1) Sequential 与 Hierarchical 流程的区别
Sequential Process(顺序流程):
- 任务按定义顺序依次执行
- 每个任务使用前序任务的输出作为上下文
- Agent 直接执行分配给他们的任务
- 适用场景:线性工作流、明确的任务依赖链
Hierarchical Process(层级流程):
- 自动创建一个 Manager Agent 作为协调者
- Manager 负责任务分配、委派和监督
- Worker Agent 接受委派并汇报结果
- 适用场景:复杂的项目管理、需要动态任务分配
2) 内存系统的四种类型
- Short-term Memory:存储当前任务的上下文(ChromaDB + RAG)
- Long-term Memory:存储跨会话的任务结果(SQLite)
- Entity Memory:跟踪人物、地点、概念等实体(RAG)
- External Memory:集成 Mem0 的高级记忆功能
3) 任务上下文的三种来源
- NOT_SPECIFIED(默认):自动使用所有前序任务的输出
- 显式 context:任务明确指定
context=[task1, task2] - conversation_history:从输入参数注入会话历史
4) 异步任务的执行约束
- 多个异步任务可以并行执行
- 异步任务不能依赖其他异步任务的输出
- 必须有同步任务作为分隔点
- 同步任务会等待所有前置异步任务完成
5) Manager Agent 的自动生成
在 Hierarchical Process 中,Crew 会自动创建 Manager Agent:
- 角色:根据
manager_agent配置或默认角色 - 工具:包含任务委派工具(DelegateTaskTool)
- 职责:分析任务、分配给合适的 Agent、聚合结果
数据结构 UML 图
Crew 核心类图
classDiagram
class Crew {
+UUID id
+list[BaseAgent] agents
+list[Task] tasks
+Process process
+bool verbose
+bool memory
+ShortTermMemory short_term_memory
+LongTermMemory long_term_memory
+EntityMemory entity_memory
+ExternalMemory external_memory
+bool cache
+Cache cache_handler
+Knowledge knowledge
+int max_rpm
+RPMController rpm_controller
+str prompt_file
+I18N i18n
+BaseLLM manager_llm
+str manager_agent
+bool step_callback
+bool task_callback
+bool full_output
+bool share_crew
+dict inputs
+bool respect_context_window
+bool planning
+list[Task] planning_tasks
+EmbedderConfig embedder
+bool training
+dict training_data
+kickoff(inputs)
+kickoff_for_each(inputs)
+train(n_iterations, filename, inputs)
+replay(task_id, inputs)
+test(n_iterations, openai_model_name)
+copy()
+reset_memories(command_type)
-_execute_tasks(tasks, start_index)
-_run_sequential_process()
-_run_hierarchical_process()
-_get_context(task, task_outputs)
-_prepare_tools(agent, task, tools)
-_create_manager_agent()
-_handle_conditional_task()
}
class CrewOutput {
+str raw
+BaseModel pydantic
+dict json_dict
+list[TaskOutput] tasks_output
+UsageMetrics token_usage
+json()
+to_dict()
}
class Process {
<<enumeration>>
sequential
hierarchical
}
class Memory {
<<interface>>
+save(value, metadata)
+search(query, limit, score_threshold)
+set_crew(crew)
}
class ShortTermMemory {
+save(value, metadata)
+search(query, limit, score_threshold)
}
class LongTermMemory {
+save(value, metadata)
+search(query, limit, score_threshold)
}
class EntityMemory {
+save(value, metadata)
+search(query, limit, score_threshold)
}
class ExternalMemory {
+save(value, metadata)
+search(query, limit, score_threshold)
}
class Cache {
+read(tool, input)
+add(tool, input, output)
}
class RPMController {
+int max_rpm
+int _current_rpm
+check_or_wait()
+stop_rpm_counter()
}
class I18N {
+str prompt_file
+dict _prompts
+slice(slice)
+errors(error)
+tools(tool)
}
Crew --> CrewOutput : 生成
Crew --> Process : 使用
Crew --> Memory : 管理
Memory <|-- ShortTermMemory : 实现
Memory <|-- LongTermMemory : 实现
Memory <|-- EntityMemory : 实现
Memory <|-- ExternalMemory : 实现
Crew --> Cache : 使用
Crew --> RPMController : 使用
Crew --> I18N : 使用
关键字段说明
Crew 核心字段
| 字段 | 类型 | 约束 | 默认值 | 说明 |
|---|---|---|---|---|
| id | UUID | frozen=True | uuid4() | Crew 唯一标识符 |
| agents | list[BaseAgent] | required | - | 参与的 Agent 列表 |
| tasks | list[Task] | required | - | 任务列表 |
| process | Process | - | sequential | 执行流程(sequential/hierarchical) |
| verbose | bool | int | str | - | 0 | 日志详细程度(0-2) |
| memory | bool | - | False | 是否启用内存系统 |
| short_term_memory | ShortTermMemory | None | - | None | 短期记忆实例 |
| long_term_memory | LongTermMemory | None | - | None | 长期记忆实例 |
| entity_memory | EntityMemory | None | - | None | 实体记忆实例 |
| external_memory | ExternalMemory | None | - | None | 外部记忆实例 |
| contextual_memory | bool | - | False | 是否启用上下文记忆 |
| cache | bool | - | True | 是否启用缓存 |
| cache_handler | Cache | None | - | None | 缓存处理器实例 |
| knowledge | Knowledge | None | - | None | 知识库实例 |
| knowledge_sources | list[BaseKnowledgeSource] | - | [] | 知识源列表 |
| max_rpm | int | None | - | None | 最大 RPM 限制 |
| rpm_controller | RPMController | None | - | None | RPM 控制器实例 |
| prompt_file | str | None | - | None | 自定义提示词文件路径 |
| i18n | I18N | - | I18N() | 国际化支持 |
| manager_llm | BaseLLM | None | - | None | Manager Agent 的 LLM |
| manager_agent | str | None | - | None | Manager Agent 配置 |
| step_callback | Callable | None | - | None | Agent 步骤回调 |
| task_callback | Callable | None | - | None | 任务完成回调 |
| full_output | bool | - | False | 是否返回完整输出 |
| share_crew | bool | - | False | 是否在 Agent 间共享 Crew 信息 |
| inputs | dict | - | {} | 输入参数字典 |
| respect_context_window | bool | - | True | 是否尊重 LLM 上下文窗口限制 |
| planning | bool | - | False | 是否启用自动规划 |
| planning_tasks | list[Task] | - | [] | 规划任务列表 |
| embedder | EmbedderConfig | None | - | None | 嵌入模型配置 |
| training | bool | - | False | 是否处于训练模式 |
| training_data | dict | - | {} | 训练数据 |
CrewOutput 字段
| 字段 | 类型 | 约束 | 默认值 | 说明 |
|---|---|---|---|---|
| raw | str | - | "" | 最后一个任务的原始输出 |
| pydantic | BaseModel | None | - | None | 最后一个任务的 Pydantic 输出 |
| json_dict | dict[str, Any] | None | - | None | 最后一个任务的 JSON 输出 |
| tasks_output | list[TaskOutput] | - | [] | 所有任务的输出列表 |
| token_usage | UsageMetrics | - | UsageMetrics() | Token 使用统计 |
UsageMetrics 字段
class UsageMetrics(BaseModel):
total_tokens: int = 0
prompt_tokens: int = 0
completion_tokens: int = 0
successful_requests: int = 0
API 详细规格
API 1: kickoff
基本信息
- 方法签名:
Crew.kickoff(inputs: dict[str, Any] | None = None) -> CrewOutput - 调用方式:实例方法
- 幂等性:否(依赖 LLM 生成和当前状态)
功能说明
启动 Crew 执行,按照配置的流程执行所有任务并返回聚合结果。
请求结构体
class KickoffParams:
inputs: Optional[dict[str, Any]] = None # 输入参数字典
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| inputs | dict[str, Any] | None | 否 | None | - | 模板变量和上下文参数 |
输入参数示例:
inputs = {
"topic": "AI Safety", # 模板变量
"count": 5, # 模板变量
"crew_chat_messages": [ # 会话历史
{"role": "user", "content": "Find latest AI news"},
{"role": "assistant", "content": "Here are the findings..."}
],
"crewai_trigger_payload": {...} # Trigger payload(特殊用途)
}
响应结构体
class CrewOutput(BaseModel):
raw: str # 最后一个任务的原始输出
pydantic: Optional[BaseModel] # 最后一个任务的 Pydantic 输出
json_dict: Optional[dict] # 最后一个任务的 JSON 输出
tasks_output: list[TaskOutput] # 所有任务的输出列表
token_usage: UsageMetrics # Token 使用统计
核心代码
def kickoff(self, inputs: dict[str, Any] | None = None) -> CrewOutput:
"""启动 Crew 执行"""
self.inputs = inputs or {}
try:
# 1. 初始化(首次执行)
if not self._execution_started:
self._execution_started = True
self._execution_logs = []
self._task_outputs_replayed_from_memory = []
# 2. 重置内部状态
self._finished_tasks = set()
self._tasks_index_map = {str(task.id): i for i, task in enumerate(self.tasks)}
self._copy_agents()
# 3. 执行 @before_kickoff 回调
if hasattr(self.__class__, "before_kickoff"):
self.__class__.before_kickoff(self)
# 4. 发送 CrewStartedEvent
crewai_event_bus.emit(
self, CrewStartedEvent(inputs=self.inputs, crew=self)
)
# 5. 插值任务
self._interpolate_tasks(self.inputs)
# 6. 设置 token 处理
self._set_tokens_process()
# 7. 初始化内存和知识库
self._initialize_memory()
self._initialize_crew_knowledge()
# 8. 注入关系(crew → agents → tasks)
self._inject_relationships()
# 9. 生成规划任务(如果启用)
if self.planning:
self.planning_tasks = self._create_planning_tasks()
# 10. 选择执行流程
if self.process == Process.sequential:
result = self._run_sequential_process()
else:
result = self._run_hierarchical_process()
# 11. 聚合 token 使用
metrics = self._finish_execution(result.tasks_output)
result.token_usage = metrics
# 12. 执行 @after_kickoff 回调
if hasattr(self.__class__, "after_kickoff"):
self.__class__.after_kickoff(self, result)
# 13. 发送 CrewCompletedEvent
crewai_event_bus.emit(
self, CrewCompletedEvent(output=result, crew=self)
)
return result
except Exception as e:
# 发送 CrewFailedEvent
crewai_event_bus.emit(
self, CrewFailedEvent(error=str(e), crew=self)
)
raise e
调用链与上游函数
用户代码调用:
from crewai import Crew, Agent, Task
# 定义 Crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential,
memory=True,
)
# 启动执行
result = crew.kickoff(inputs={"topic": "AI Safety"})
# 访问结果
print(result.raw)
print(result.token_usage.total_tokens)
for task_output in result.tasks_output:
print(f"{task_output.description}: {task_output.raw}")
Flow 中调用:
class MyFlow(Flow):
@start()
def run_crew_step(self):
crew = MyCrew()
result = crew.crew().kickoff(inputs={"query": self.state.query})
self.state.crew_result = result.raw
return result
完整执行时序图(从上游接口到输出)
sequenceDiagram
autonumber
participant User as 用户代码
participant Crew as Crew 编排器
participant CB as 回调系统
participant Memory as Memory 系统
participant Cache as Cache 系统
participant Knowledge as Knowledge 系统
participant Process as Process 层
participant TaskExec as 任务执行层
participant Task as Task
participant Agent as Agent
participant AgentExec as AgentExecutor
participant LLM as LLM
participant Tool as Tool
participant Event as Event Bus
participant Storage as Storage
participant Output as Output 层
%% 启动阶段
User->>Crew: kickoff(inputs)
activate Crew
Crew->>Event: emit(CrewKickoffStartedEvent)
Crew->>CB: 执行 before_kickoff_callbacks
CB-->>Crew: 修改后的 inputs
%% 初始化阶段
Crew->>Crew: _interpolate_inputs(inputs)
Note over Crew: 插值任务和 Agent 的模板变量
Crew->>Memory: _initialize_memory()
activate Memory
Memory->>Memory: 创建 ShortTermMemory
Memory->>Memory: 创建 LongTermMemory
Memory->>Memory: 创建 EntityMemory
Memory->>Memory: 创建 ExternalMemory (如果配置)
Memory-->>Crew: Memory 系统就绪
deactivate Memory
Crew->>Cache: 初始化 CacheHandler
Cache-->>Crew: Cache 就绪
Crew->>Knowledge: _initialize_crew_knowledge()
Knowledge-->>Crew: Knowledge 就绪
Crew->>Crew: 为 Agent 注入资源
Note over Crew: agent.crew = self<br/>agent.set_knowledge()<br/>agent.create_agent_executor()
%% 流程选择
alt process == sequential
Crew->>Process: _run_sequential_process()
else process == hierarchical
Crew->>Process: _run_hierarchical_process()
Process->>Process: _create_manager_agent()
Note over Process: 创建 Manager Agent<br/>添加 DelegateTaskTool
end
%% 任务执行循环
Process->>TaskExec: _execute_tasks(tasks)
activate TaskExec
loop 遍历每个任务
TaskExec->>TaskExec: _get_agent_to_use(task)
Note over TaskExec: Sequential: task.agent<br/>Hierarchical: manager_agent
TaskExec->>TaskExec: _prepare_tools(agent, task)
Note over TaskExec: 合并 agent.tools + task.tools<br/>添加缓存包装<br/>添加委派工具(如果允许)
TaskExec->>TaskExec: _get_context(task, task_outputs)
Note over TaskExec: 聚合前序任务的输出
alt ConditionalTask
TaskExec->>Task: should_execute(previous_output)
alt 条件不满足
Task-->>TaskExec: get_skipped_task_output()
TaskExec->>Storage: _store_execution_log()
Note over TaskExec: 跳过任务,继续下一个
end
end
alt async_execution == True
TaskExec->>Task: execute_async(agent, context, tools)
activate Task
Task->>Task: 创建 Future
Task->>Agent: 后台线程执行
Task-->>TaskExec: 返回 Future
deactivate Task
Note over TaskExec: 不等待,继续下一个任务
else 同步任务
alt 存在未完成的异步任务
TaskExec->>TaskExec: _process_async_tasks(futures)
Note over TaskExec: 等待所有异步任务完成<br/>收集结果
end
TaskExec->>Task: execute_sync(agent, context, tools)
activate Task
Task->>Event: emit(TaskStartedEvent)
Task->>Agent: execute_task(task, context, tools)
activate Agent
%% Agent 执行
Agent->>Agent: 构建 task_prompt
Note over Agent: 包含:role, goal, backstory,<br/>task description, context
alt 有 memory 系统
Agent->>Memory: search(task.description)
Memory-->>Agent: 相关记忆列表
Note over Agent: 将记忆添加到 context
end
alt 有 knowledge 系统
Agent->>Knowledge: query(task.description)
Knowledge-->>Agent: 相关知识片段
Note over Agent: 将知识添加到 context
end
Agent->>AgentExec: invoke(task_prompt)
activate AgentExec
loop LLM 循环(直到完成)
AgentExec->>LLM: generate(prompt, tools)
activate LLM
LLM-->>AgentExec: LLM Response (text or tool call)
deactivate LLM
alt 需要调用工具
AgentExec->>Cache: read(tool, input)
alt 缓存命中
Cache-->>AgentExec: 缓存结果
else 缓存未命中
AgentExec->>Tool: invoke(input)
activate Tool
Tool-->>AgentExec: Tool Output
deactivate Tool
AgentExec->>Cache: add(tool, input, output)
end
AgentExec->>Memory: save(tool_output)
Note over AgentExec: 继续 LLM 循环
else 任务完成
Note over AgentExec: 退出循环
end
end
AgentExec-->>Agent: 最终输出
deactivate AgentExec
Agent->>Memory: save(final_output)
Agent-->>Task: 最终输出字符串
deactivate Agent
%% 输出处理
Task->>Output: _export_output(result)
activate Output
Output->>Output: 提取 pydantic_output
Output->>Output: 提取 json_output
Output-->>Task: (pydantic, json)
deactivate Output
Task->>Output: 创建 TaskOutput
Output->>Output: TaskOutput(raw, pydantic, json_dict)
alt 配置了 Guardrails
loop 遍历每个 guardrail
Output->>Output: _invoke_guardrail_function()
alt 验证失败
Output->>Agent: 重新执行任务
Note over Output: 使用 guardrail 反馈<br/>重试直到成功或达到 max_retries
end
end
end
Task->>Event: emit(TaskCompletedEvent)
Task-->>TaskExec: TaskOutput
deactivate Task
TaskExec->>TaskExec: _process_task_result(task, output)
TaskExec->>Storage: _store_execution_log(task, output)
Note over Storage: 持久化任务输出<br/>支持回放功能
end
end
alt 仍有未完成的异步任务
TaskExec->>TaskExec: _process_async_tasks(futures)
end
TaskExec->>TaskExec: _create_crew_output(task_outputs)
Note over TaskExec: 聚合所有任务输出<br/>选择最后一个有效输出作为主输出
TaskExec-->>Crew: CrewOutput
deactivate TaskExec
%% 完成阶段
Crew->>Crew: calculate_usage_metrics()
Note over Crew: 统计所有 Agent 的 token 使用
Crew->>CB: 执行 after_kickoff_callbacks
CB-->>Crew: 修改后的 CrewOutput
Crew->>Event: emit(CrewKickoffCompletedEvent)
Crew-->>User: CrewOutput
deactivate Crew
时序图功能详解
阶段 1:启动和初始化(步骤 1-15)
1-3. 入口和事件发布:
- 用户调用
crew.kickoff(inputs) - 发送
CrewKickoffStartedEvent到事件总线 - 执行
before_kickoff_callbacks,允许用户修改 inputs
4-5. 输入插值:
_interpolate_inputs():将 inputs 中的值插入到任务和 Agent 的模板变量中- 例如:
"{topic}"被替换为inputs["topic"]的值
6-10. 内存系统初始化:
- 创建四种类型的记忆:
ShortTermMemory:基于 ChromaDB 的向量存储LongTermMemory:基于 SQLite 的持久化存储EntityMemory:实体跟踪(人物、地点、概念)ExternalMemory:可选,集成 Mem0
11-13. 缓存和知识库初始化:
CacheHandler:初始化工具结果缓存Knowledge:初始化 RAG 知识库
14-15. Agent 资源注入:
- 为每个 Agent 设置
crew、knowledge、memory等 - 调用
agent.create_agent_executor()创建执行器
阶段 2:流程选择(步骤 16-20)
Sequential Process:
- 直接调用
_execute_tasks() - Agent 按任务分配直接执行
Hierarchical Process:
- 先创建 Manager Agent
- Manager Agent 自动获得
DelegateTaskTool和AskQuestionTool - Manager 负责任务委派和协调
阶段 3:任务执行循环(步骤 21-85)
21-26. 任务准备:
_get_agent_to_use():确定执行 Agent- Sequential:使用
task.agent - Hierarchical:使用
manager_agent
- Sequential:使用
_prepare_tools():合并工具agent.tools+task.tools- 添加缓存包装(
CachedTool) - 添加委派工具(如果
allow_delegation=True)
_get_context():聚合上下文- 如果
task.context is NOT_SPECIFIED:使用所有前序任务输出 - 如果
task.context是列表:仅使用指定任务的输出
- 如果
27-32. 条件任务处理:
- 对于
ConditionalTask,调用should_execute(previous_output) - 如果条件不满足,跳过任务,记录日志,继续下一个
33-38. 异步任务执行:
- 创建
Future对象 - 在后台线程中执行
Agent.execute_task() - 不阻塞,继续下一个任务
- 异步任务不能依赖其他异步任务的输出
39-84. 同步任务执行(核心流程):
39-42. 等待异步任务:
- 如果存在未完成的异步任务,先等待所有异步任务完成
- 调用
future.result()阻塞获取结果
43-46. 任务开始:
- 发送
TaskStartedEvent - 调用
Agent.execute_task(task, context, tools)
47-49. 构建提示词:
- 合并
role、goal、backstory、task.description、context - 构成完整的
task_prompt
50-55. 记忆查询(如果启用):
- 调用
Memory.search(task.description) - 检索相关的历史记忆
- 将记忆添加到上下文中
56-60. 知识库查询(如果启用):
- 调用
Knowledge.query(task.description) - 检索相关的知识片段
- 将知识添加到上下文中
61-78. Agent 执行循环(LLM 反馈循环):
- 62-64. LLM 调用:调用
LLM.generate()生成响应 - 65-76. 工具调用(如果需要):
- 检查缓存
Cache.read(tool, input) - 缓存命中:直接使用缓存结果
- 缓存未命中:调用
Tool.invoke(input),结果写入缓存 - 工具输出保存到 Memory
- 继续 LLM 循环(将工具输出作为新的输入)
- 检查缓存
- 77-78. 任务完成:LLM 决定任务完成,退出循环
79-81. 记忆保存:
- 将最终输出保存到 Memory
- 返回最终输出字符串给 Task
82-88. 输出处理:
_export_output():提取 Pydantic 和 JSON 输出- 创建
TaskOutput对象(包含raw、pydantic、json_dict)
89-94. Guardrails 验证(如果配置):
- 遍历每个 guardrail 函数
- 如果验证失败,重新执行任务
- 使用 guardrail 反馈改进输出
- 重试直到成功或达到
max_retries
95-99. 任务完成:
- 发送
TaskCompletedEvent - 返回
TaskOutput给任务执行层 - 调用
_process_task_result()处理结果 - 调用
_store_execution_log()持久化输出(支持回放)
阶段 4:输出聚合(步骤 86-90)
86-88. 处理剩余异步任务:
- 如果仍有未完成的异步任务,等待所有完成
- 收集所有异步任务的输出
89-90. 创建 CrewOutput:
- 聚合所有任务输出
- 选择最后一个有效输出作为主输出(
CrewOutput.raw) - 所有任务输出保存在
CrewOutput.tasks_output列表中
阶段 5:完成和清理(步骤 91-96)
91-92. 统计 Token 使用:
- 遍历所有 Agent,收集 LLM token 使用统计
- 聚合到
CrewOutput.token_usage
93-94. 执行完成回调:
- 调用
after_kickoff_callbacks - 允许用户修改最终输出
95-96. 发送完成事件:
- 发送
CrewKickoffCompletedEvent到事件总线 - 返回
CrewOutput给用户
边界与异常
异常情况处理:
-
空 Agent 或 Task 列表:
- 异常类型:
ValueError - 触发条件:
agents或tasks为空 - 处理方式:抛出异常,提示用户配置
- 回退策略:无
- 异常类型:
-
任务执行失败:
- 异常类型:Task 执行中抛出的异常
- 触发条件:Agent 执行失败、Guardrail 验证失败等
- 处理方式:发送 CrewFailedEvent,抛出异常
- 回退策略:用户可在 try-catch 中处理
-
Manager Agent 创建失败(Hierarchical):
- 异常类型:配置错误
- 触发条件:
manager_agent配置无效 - 处理方式:使用默认 Manager 配置
- 回退策略:自动生成默认 Manager
-
内存初始化失败:
- 异常类型:存储不可用
- 触发条件:
CREWAI_STORAGE_DIR无写入权限 - 处理方式:警告日志,继续执行(不使用内存)
- 回退策略:禁用内存功能
-
RPM 限制超时:
- 异常类型:不抛出异常,阻塞等待
- 触发条件:达到
max_rpm限制 - 处理方式:自动等待 60 秒后重试
- 回退策略:无,强制等待
API 2: kickoff_for_each
基本信息
- 方法签名:
Crew.kickoff_for_each(inputs: list[dict[str, Any]]) -> list[CrewOutput] - 调用方式:实例方法
- 幂等性:否
功能说明
为多组输入参数批量执行 Crew,返回每次执行的结果列表。
请求结构体
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| inputs | list[dict[str, Any]] | 是 | - | - | 多组输入参数列表 |
核心代码
def kickoff_for_each(self, inputs: list[dict[str, Any]]) -> list[CrewOutput]:
"""批量执行 Crew"""
results: list[CrewOutput] = []
for input_data in inputs:
# 重置执行状态
self._execution_started = False
# 执行单次 kickoff
result = self.kickoff(inputs=input_data)
results.append(result)
# 清理内存(可选)
if self.memory:
self.reset_memories("long_term")
return results
使用示例
# 定义多组输入
inputs_list = [
{"topic": "AI Safety", "count": 5},
{"topic": "Climate Change", "count": 10},
{"topic": "Healthcare", "count": 7},
]
# 批量执行
results = crew.kickoff_for_each(inputs=inputs_list)
# 处理结果
for i, result in enumerate(results):
print(f"Result {i+1}:")
print(result.raw)
print(f"Tokens used: {result.token_usage.total_tokens}\n")
API 3: train
基本信息
- 方法签名:
Crew.train(n_iterations: int, filename: str, inputs: dict[str, Any] | None = None) -> None - 调用方式:实例方法
- 幂等性:否
功能说明
训练 Crew,通过多次迭代收集人工反馈并优化 Agent 行为。
请求结构体
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| n_iterations | int | 是 | - | >0 | 训练迭代次数 |
| filename | str | 是 | - | - | 保存训练数据的文件名 |
| inputs | dict[str, Any] | None | 否 | None | - | 输入参数 |
核心代码
def train(
self, n_iterations: int, filename: str, inputs: dict[str, Any] | None = None
) -> None:
"""训练 Crew"""
# 1. 启用训练模式
self.training = True
# 2. 禁用内存(训练时不需要)
original_memory_config = self.memory
self.memory = False
# 3. 初始化训练数据
training_data_file = f"{filename}.pkl"
if os.path.exists(training_data_file):
with open(training_data_file, "rb") as f:
self.training_data = pickle.load(f)
else:
self.training_data = {}
# 4. 执行多轮训练
for iteration in range(n_iterations):
print(f"\n=== Training Iteration {iteration + 1}/{n_iterations} ===\n")
# 执行 Crew
result = self.kickoff(inputs=inputs)
# 收集人工反馈
print("\nCrew execution completed. Please provide feedback:")
feedback = input("Was the output satisfactory? (yes/no): ")
if feedback.lower() == "yes":
# 保存成功的执行记录
for task in self.tasks:
if task.output:
self.training_data[str(task.id)] = {
"description": task.description,
"expected_output": task.expected_output,
"actual_output": task.output.raw,
"feedback": "positive",
}
else:
# 收集改进建议
improvement = input("What should be improved? ")
for task in self.tasks:
if task.output:
self.training_data[str(task.id)] = {
"description": task.description,
"expected_output": task.expected_output,
"actual_output": task.output.raw,
"feedback": "negative",
"improvement": improvement,
}
# 保存训练数据
with open(training_data_file, "wb") as f:
pickle.dump(self.training_data, f)
# 5. 恢复配置
self.training = False
self.memory = original_memory_config
print(f"\nTraining completed. Data saved to {training_data_file}")
使用示例
# 训练 Crew
crew.train(
n_iterations=5,
filename="research_crew_training",
inputs={"topic": "AI Trends"}
)
# 训练后,Crew 会利用训练数据优化 Agent 行为
# 训练数据保存在 research_crew_training.pkl
API 4: replay
基本信息
- 方法签名:
Crew.replay(task_id: str, inputs: dict[str, Any] | None = None) -> CrewOutput - 调用方式:实例方法
- 幂等性:否
功能说明
从指定任务开始重新执行 Crew,使用之前存储的执行记录作为前序任务的输出。
请求结构体
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| task_id | str | 是 | - | - | 要重新执行的任务 ID |
| inputs | dict[str, Any] | None | 否 | None | - | 输入参数 |
核心代码
def replay(self, task_id: str, inputs: dict[str, Any] | None = None) -> CrewOutput:
"""从指定任务回放执行"""
# 1. 查找任务索引
if task_id not in self._tasks_index_map:
raise ValueError(f"Task with id {task_id} not found")
start_index = self._tasks_index_map[task_id]
# 2. 加载历史执行记录
self._load_execution_logs()
# 3. 恢复前序任务的输出
task_outputs_to_replay: list[TaskOutput] = []
for i in range(start_index):
task = self.tasks[i]
# 从存储中读取任务输出
saved_output = self._read_task_output_from_log(str(task.id))
if saved_output:
task_outputs_to_replay.append(saved_output)
self._task_outputs_replayed_from_memory = task_outputs_to_replay
# 4. 从指定任务开始执行
if self.process == Process.sequential:
return self._run_sequential_process_from_index(start_index)
else:
# Hierarchical 不支持 replay
raise ValueError("Replay is not supported for hierarchical process")
使用示例
# 首次执行
result = crew.kickoff(inputs={"topic": "AI Safety"})
# 获取某个任务的 ID
research_task_id = str(crew.tasks[0].id)
# 从该任务重新执行(例如修改了 Agent 配置后)
replay_result = crew.replay(
task_id=research_task_id,
inputs={"topic": "AI Safety"}
)
API 5: test
基本信息
- 方法签名:
Crew.test(n_iterations: int, openai_model_name: str, inputs: dict[str, Any] | None = None) -> None - 调用方式:实例方法
- 幂等性:否
功能说明
测试 Crew,评估不同配置下的性能。
核心代码
def test(
self,
n_iterations: int,
openai_model_name: str,
inputs: dict[str, Any] | None = None,
) -> None:
"""测试 Crew 性能"""
results = []
for i in range(n_iterations):
print(f"\n=== Test Iteration {i + 1}/{n_iterations} ===\n")
# 执行 Crew
start_time = time.time()
result = self.kickoff(inputs=inputs)
execution_time = time.time() - start_time
# 收集指标
metrics = {
"iteration": i + 1,
"execution_time": execution_time,
"total_tokens": result.token_usage.total_tokens,
"successful_requests": result.token_usage.successful_requests,
"output_length": len(result.raw),
}
results.append(metrics)
print(f"Execution Time: {execution_time:.2f}s")
print(f"Total Tokens: {metrics['total_tokens']}")
# 计算统计数据
avg_time = sum(r["execution_time"] for r in results) / len(results)
avg_tokens = sum(r["total_tokens"] for r in results) / len(results)
print(f"\n=== Test Summary ===")
print(f"Average Execution Time: {avg_time:.2f}s")
print(f"Average Total Tokens: {avg_tokens:.0f}")
调用链路深度分析
路径 1:用户代码 → kickoff() → Sequential Process 完整链路
1.1 入口层调用链
# 用户代码
result = crew.kickoff(inputs={"topic": "AI Safety"})
# ↓ 进入 Crew.kickoff()
def kickoff(self, inputs: dict[str, Any] | None = None) -> CrewOutput:
# 1. 设置 OpenTelemetry baggage context
ctx = baggage.set_baggage("crew_context", CrewContext(id=str(self.id), key=self.key))
token = attach(ctx)
try:
# 2. 执行 before_kickoff 回调
for before_callback in self.before_kickoff_callbacks:
inputs = before_callback(inputs)
# 3. 发送 CrewKickoffStartedEvent
crewai_event_bus.emit(self, CrewKickoffStartedEvent(crew_name=self.name, inputs=inputs))
# 4. 重置任务输出处理器
self._task_output_handler.reset()
# 5. 插值输入参数
if inputs is not None:
self._inputs = inputs
self._interpolate_inputs(inputs)
# 6. 设置任务回调
self._set_tasks_callbacks()
# 7. 初始化国际化
i18n = I18N(prompt_file=self.prompt_file)
# 8. 为每个 Agent 注入资源
for agent in self.agents:
agent.i18n = i18n
agent.crew = self
agent.set_knowledge(crew_embedder=self.embedder)
agent.function_calling_llm = self.function_calling_llm
agent.step_callback = self.step_callback
agent.create_agent_executor()
# 9. 处理规划(如果启用)
if self.planning:
self._handle_crew_planning()
# 10. 选择执行流程
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result = self._run_hierarchical_process()
# 11. 执行 after_kickoff 回调
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
# 12. 计算 token 使用
self.usage_metrics = self.calculate_usage_metrics()
return result
except Exception as e:
crewai_event_bus.emit(self, CrewKickoffFailedEvent(error=str(e), crew_name=self.name))
raise
finally:
detach(token)
1.2 插值层调用链
# Crew._interpolate_inputs()
def _interpolate_inputs(self, inputs: dict[str, Any]) -> None:
"""将 inputs 插入到任务和 Agent 的模板变量中"""
# 插值所有任务
for task in self.tasks:
task.interpolate_inputs_and_add_conversation_history(inputs)
# ↓ 进入 Task.interpolate_inputs_and_add_conversation_history()
# 替换 task.description 中的 {placeholders}
# 替换 task.expected_output 中的 {placeholders}
# 处理会话历史(crew_chat_messages)
# 插值所有 Agent
for agent in self.agents:
agent.interpolate_inputs(inputs)
# ↓ 进入 Agent.interpolate_inputs()
# 替换 agent.role 中的 {placeholders}
# 替换 agent.goal 中的 {placeholders}
# 替换 agent.backstory 中的 {placeholders}
1.3 Sequential Process 执行链
# Crew._run_sequential_process()
def _run_sequential_process(self) -> CrewOutput:
"""顺序执行流程"""
return self._execute_tasks(self.tasks)
# ↓ 进入核心任务执行循环
# Crew._execute_tasks()
def _execute_tasks(
self,
tasks: list[Task],
start_index: int | None = 0,
was_replayed: bool = False,
) -> CrewOutput:
"""任务执行循环"""
task_outputs: list[TaskOutput] = []
futures: list[tuple[Task, Future[TaskOutput], int]] = []
for task_index, task in enumerate(tasks):
# 跳过已执行的任务(回放场景)
if start_index is not None and task_index < start_index:
continue
# 1. 获取执行 Agent
agent_to_use = self._get_agent_to_use(task)
# ↓ Sequential: 返回 task.agent
# ↓ Hierarchical: 返回 self.manager_agent
# 2. 准备工具
tools_for_task = task.tools or agent_to_use.tools or []
tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
# ↓ 进入 _prepare_tools()
# 3. 获取上下文
context = self._get_context(task, task_outputs)
# ↓ 进入 _get_context()
# 4. 处理条件任务
if isinstance(task, ConditionalTask):
skipped_output = self._handle_conditional_task(...)
if skipped_output:
task_outputs.append(skipped_output)
continue
# 5. 执行任务
if task.async_execution:
# 异步执行
future = task.execute_async(agent=agent_to_use, context=context, tools=tools_for_task)
futures.append((task, future, task_index))
else:
# 同步执行
if futures:
# 先完成所有异步任务
task_outputs = self._process_async_tasks(futures)
futures.clear()
# 执行当前同步任务
task_output = task.execute_sync(agent=agent_to_use, context=context, tools=tools_for_task)
# ↓ 进入 Task.execute_sync()
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
# 6. 等待剩余异步任务
if futures:
task_outputs = self._process_async_tasks(futures)
# 7. 创建 CrewOutput
return self._create_crew_output(task_outputs)
1.4 工具准备链
# Crew._prepare_tools()
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: list[Tool] | list[BaseTool]
) -> list[BaseTool]:
"""准备工具列表"""
# 1. 添加委派工具(如果允许)
if hasattr(agent, "allow_delegation") and getattr(agent, "allow_delegation", False):
if self.process == Process.hierarchical:
tools = self._update_manager_tools(task, tools)
# ↓ 进入 _update_manager_tools()
# 添加 DelegateTaskTool
elif agent:
tools = self._add_delegation_tools(task, tools)
# ↓ 进入 _add_delegation_tools()
# agent.get_delegation_tools(agents)
# 2. 添加代码执行工具
if hasattr(agent, "allow_code_execution") and getattr(agent, "allow_code_execution", False):
tools = self._add_code_execution_tools(agent, tools)
# ↓ agent.get_code_execution_tools()
# 3. 添加多模态工具
if hasattr(agent, "multimodal") and getattr(agent, "multimodal", False):
tools = self._add_multimodal_tools(agent, tools)
# ↓ agent.get_multimodal_tools()
# 4. 添加平台工具
if hasattr(agent, "apps") and getattr(agent, "apps", None):
tools = self._add_platform_tools(task, tools)
# ↓ agent.get_platform_tools(apps)
# 5. 添加 MCP 工具
if hasattr(agent, "mcps") and getattr(agent, "mcps", None):
tools = self._add_mcp_tools(task, tools)
# ↓ agent.get_mcp_tools(mcps)
return tools
1.5 上下文聚合链
# Crew._get_context()
def _get_context(self, task: Task, task_outputs: list[TaskOutput]) -> str:
"""获取任务上下文"""
if not task.context:
return ""
if task.context is NOT_SPECIFIED:
# 默认:聚合所有前序任务的输出
return aggregate_raw_outputs_from_task_outputs(task_outputs)
# ↓ 进入 aggregate_raw_outputs_from_task_outputs()
# return "\n".join([output.raw for output in task_outputs])
else:
# 显式指定:仅聚合特定任务的输出
return aggregate_raw_outputs_from_tasks(task.context)
# ↓ 进入 aggregate_raw_outputs_from_tasks()
# 查找对应的任务输出
# return "\n".join([task.output.raw for task in context_tasks if task.output])
1.6 Task 执行链
# Task.execute_sync()
def execute_sync(
self,
agent: BaseAgent | None = None,
context: str | None = None,
tools: list[Any] | None = None,
) -> TaskOutput:
"""同步执行任务"""
return self._execute_core(agent, context, tools)
# ↓ 进入 _execute_core()
# Task._execute_core()
def _execute_core(
self,
agent: BaseAgent | None,
context: str | None,
tools: list[Any] | None,
) -> TaskOutput:
"""任务执行核心逻辑"""
agent = agent or self.agent
self.agent = agent
# 1. 记录开始时间
self.start_time = datetime.datetime.now()
# 2. 设置上下文
self.prompt_context = context
tools = tools or self.tools or []
# 3. 记录处理 Agent
self.processed_by_agents.add(agent.role)
# 4. 发送 TaskStartedEvent
crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))
# 5. 执行 Agent 任务
result = agent.execute_task(task=self, context=context, tools=tools)
# ↓ 进入 Agent.execute_task()
# 6. 导出输出
pydantic_output, json_output = self._export_output(result)
# ↓ 进入 _export_output()
# 解析 JSON 或 Pydantic 模型
# 7. 创建 TaskOutput
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
expected_output=self.expected_output,
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
# 8. 执行 Guardrails 验证
if self._guardrails:
for idx, guardrail in enumerate(self._guardrails):
task_output = self._invoke_guardrail_function(
task_output=task_output,
agent=agent,
tools=tools,
guardrail=guardrail,
guardrail_index=idx,
)
# ↓ 如果验证失败,重新执行任务
# 9. 调用回调
if self.callback:
self.callback(task_output)
# 10. 发送 TaskCompletedEvent
crewai_event_bus.emit(self, TaskCompletedEvent(task_output=task_output, task=self))
return task_output
1.7 Agent 执行链
# Agent.execute_task()
def execute_task(
self,
task: Task,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> Any:
"""执行任务"""
# 1. 设置任务引用
self.task = task
# 2. 构建任务提示词
task_prompt = task.prompt()
# ↓ 进入 Task.prompt()
# 合并 task.description, expected_output, context
# 3. 添加工具描述
if tools:
self.tools_description = self._render_text_description_and_args(tools)
# 4. 查询记忆系统(如果启用)
if self._is_any_available_memory():
memory_context = self._build_memory_context(task.description)
# ↓ 进入 _build_memory_context()
# 查询 short_term_memory
# 查询 long_term_memory
# 查询 entity_memory
task_prompt = f"{memory_context}\n\n{task_prompt}"
# 5. 查询知识库(如果启用)
if self.knowledge:
knowledge_context = self.knowledge.query([task.description])
# ↓ 进入 Knowledge.query()
# 执行 RAG 检索
task_prompt = f"{knowledge_context}\n\n{task_prompt}"
# 6. 执行任务(带超时)
if task.max_execution_time:
result = self._execute_with_timeout(task_prompt, task, task.max_execution_time)
# ↓ 进入 _execute_with_timeout()
else:
result = self._execute_without_timeout(task_prompt, task)
# ↓ 进入 _execute_without_timeout()
# 7. 保存到长期记忆
if self.crew and self.crew._long_term_memory:
self.crew._long_term_memory.save(result, metadata={"agent": self.role, "task": task.description})
return result
# Agent._execute_without_timeout()
def _execute_without_timeout(self, task_prompt: str, task: Task) -> Any:
"""执行任务(无超时)"""
if not self.agent_executor:
raise RuntimeError("Agent executor is not initialized.")
# 调用 AgentExecutor
return self.agent_executor.invoke({
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
"ask_for_human_input": task.human_input,
})["output"]
# ↓ 进入 AgentExecutor.invoke()
# 这是 LangChain 的 AgentExecutor
# 会循环调用 LLM 和 Tool,直到任务完成
1.8 AgentExecutor 循环链(LangChain)
# AgentExecutor.invoke() (LangChain 内部)
def invoke(self, inputs: dict) -> dict:
"""AgentExecutor 执行循环"""
# 1. 初始化
iterations = 0
intermediate_steps = []
while iterations < self.max_iterations:
# 2. 调用 LLM
output = self.llm.predict(
input=inputs["input"],
intermediate_steps=intermediate_steps,
...
)
# ↓ 进入 LLM.predict()
# 调用 OpenAI/Anthropic/etc API
# 返回文本或工具调用
# 3. 解析 LLM 输出
action = self.output_parser.parse(output)
if isinstance(action, AgentFinish):
# 任务完成
return {"output": action.return_values["output"]}
# 4. 执行工具
tool = self.tools_by_name[action.tool]
# 5. 检查缓存(CrewAI 特性)
if hasattr(tool, "cache_function"):
cached_result = tool.cache_function(action.tool_input)
if cached_result:
observation = cached_result
else:
observation = tool.run(action.tool_input)
tool.cache_function.cache_result(action.tool_input, observation)
else:
observation = tool.run(action.tool_input)
# ↓ 进入 Tool.run()
# 执行工具逻辑
# 6. 保存中间步骤
intermediate_steps.append((action, observation))
# 7. 保存到短期记忆(CrewAI 特性)
if self.agent.crew and self.agent.crew._short_term_memory:
self.agent.crew._short_term_memory.save(
observation,
metadata={"tool": action.tool, "agent": self.agent.role}
)
iterations += 1
raise ValueError("Agent exceeded max iterations")
1.9 输出聚合链
# Crew._create_crew_output()
def _create_crew_output(self, task_outputs: list[TaskOutput]) -> CrewOutput:
"""创建 Crew 输出"""
if not task_outputs:
raise ValueError("No task outputs available to create crew output.")
# 1. 过滤空输出
valid_outputs = [t for t in task_outputs if t.raw]
if not valid_outputs:
raise ValueError("No valid task outputs available to create crew output.")
# 2. 获取最后一个有效输出
final_task_output = valid_outputs[-1]
# 3. 完成执行
self._finish_execution(final_task_output.raw)
# ↓ 停止 RPM controller
# 4. 计算 token 使用
self.token_usage = self.calculate_usage_metrics()
# ↓ 进入 calculate_usage_metrics()
# 遍历所有 Agent
# 收集 LLM token 使用统计
# 5. 发送完成事件
crewai_event_bus.emit(
self,
CrewKickoffCompletedEvent(
crew_name=self.name,
output=final_task_output,
total_tokens=self.token_usage.total_tokens,
),
)
# 6. 创建 CrewOutput
return CrewOutput(
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=task_outputs,
token_usage=self.token_usage,
)
路径 2:用户代码 → kickoff() → Hierarchical Process 完整链路
2.1 Hierarchical Process 执行链
# Crew._run_hierarchical_process()
def _run_hierarchical_process(self) -> CrewOutput:
"""层级执行流程"""
# 1. 创建 Manager Agent
self._create_manager_agent()
# ↓ 进入 _create_manager_agent()
# 2. 执行任务(Manager 会委派给 Worker Agents)
return self._execute_tasks(self.tasks)
# Crew._create_manager_agent()
def _create_manager_agent(self):
"""创建 Manager Agent"""
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
# 使用用户提供的 Manager Agent
self.manager_agent.allow_delegation = True
manager = self.manager_agent
# 验证 Manager 不应有工具
if manager.tools is not None and len(manager.tools) > 0:
raise Exception("Manager agent should not have tools")
else:
# 自动创建 Manager Agent
self.manager_llm = create_llm(self.manager_llm)
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
# ↓ AgentTools 提供委派工具
# - delegate_work: 委派任务给 Worker Agent
# - ask_question: 向 Worker Agent 提问
allow_delegation=True,
llm=self.manager_llm,
verbose=self.verbose,
)
self.manager_agent = manager
manager.crew = self
2.2 Manager Agent 执行任务的流程
在 Hierarchical Process 中,_execute_tasks() 的调用链与 Sequential 类似,但关键区别在于:
-
Agent 选择:
# Crew._get_agent_to_use() def _get_agent_to_use(self, task: Task) -> BaseAgent | None: if self.process == Process.hierarchical: return self.manager_agent # 总是返回 Manager return task.agent # Sequential 返回 task.agent -
Manager Agent 的工具: Manager Agent 拥有特殊的委派工具:
# AgentTools.tools() 返回的委派工具 [ DelegateTaskTool(agents=self.agents), AskQuestionTool(agents=self.agents), ] -
Manager 执行任务时的流程:
Manager Agent 收到任务 ↓ Manager 分析任务,决定委派策略 ↓ Manager 调用 delegate_work 工具 ↓ DelegateTaskTool 选择合适的 Worker Agent ↓ Worker Agent 执行子任务 ↓ Worker Agent 返回结果给 Manager ↓ Manager 聚合结果,决定下一步 ↓ Manager 继续委派或完成任务 -
DelegateTaskTool 的实现:
# DelegateTaskTool.run() def run(self, coworker: str, task: str, context: str) -> str: """委派任务给 Worker Agent""" # 1. 查找 Worker Agent agent = self._find_agent_by_role(coworker) # 2. 创建子任务 sub_task = Task( description=task, expected_output="完成任务并返回结果", agent=agent, ) # 3. 执行子任务 result = agent.execute_task(sub_task, context=context) # ↓ 进入 Agent.execute_task() # 与 Sequential Process 相同 # 4. 返回结果给 Manager return result
关键功能流程剖析
功能 1:Sequential Process 执行流程
功能描述
按照任务定义的顺序依次执行,每个任务使用前序任务的输出作为上下文。
核心代码
def _run_sequential_process(self) -> CrewOutput:
"""顺序执行流程"""
# 包含规划任务
tasks = self.planning_tasks + self.tasks if self.planning else self.tasks
# 执行任务序列
return self._execute_tasks(tasks)
def _execute_tasks(
self,
tasks: list[Task],
start_index: int | None = 0,
was_replayed: bool = False,
) -> CrewOutput:
"""执行任务序列"""
task_outputs: list[TaskOutput] = []
futures: list[tuple[Task, Future[TaskOutput], int]] = []
# 添加回放的任务输出
if was_replayed:
task_outputs.extend(self._task_outputs_replayed_from_memory)
# 遍历任务
for task_index, task in enumerate(tasks):
# 跳过已执行的任务(回放场景)
if task_index < start_index:
continue
# 1. 处理条件任务
if isinstance(task, ConditionalTask):
skipped_output = self._handle_conditional_task(
task, task_outputs, futures, task_index
)
if skipped_output:
task_outputs.append(skipped_output)
continue
# 2. 确定执行 Agent
agent_to_use = self._get_agent_to_use(task)
# 3. 准备工具
tools_for_task = self._prepare_tools(
agent_to_use, task, task.tools or []
)
# 4. 获取上下文
context = self._get_context(task, task_outputs)
# 5. 执行任务
if task.async_execution:
# 异步执行
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=tools_for_task,
)
futures.append((task, future, task_index))
else:
# 同步执行(先完成所有异步任务)
if futures:
task_outputs = self._process_async_tasks(futures)
futures.clear()
# 执行当前同步任务
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=tools_for_task,
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index)
# 6. 等待剩余异步任务
if futures:
task_outputs = self._process_async_tasks(futures)
# 7. 创建 CrewOutput
return self._create_crew_output(task_outputs)
关键子函数
获取上下文:
def _get_context(
self, task: Task, task_outputs: list[TaskOutput]
) -> str:
"""获取任务上下文"""
if task.context is NOT_SPECIFIED:
# 默认:使用所有前序任务的输出
return "\n".join([output.raw for output in task_outputs])
elif isinstance(task.context, list):
# 显式指定:仅使用特定任务的输出
context_outputs = []
for context_task in task.context:
# 查找对应的输出
for output in task_outputs:
if output.description == context_task.description:
context_outputs.append(output.raw)
break
return "\n".join(context_outputs)
else:
return ""
准备工具:
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: list[BaseTool]
) -> list[BaseTool]:
"""准备任务工具"""
# 1. 合并 Agent 和 Task 的工具
combined_tools = list(agent.tools) + list(tools)
# 2. 添加 RPM 限制包装
if self.max_rpm:
wrapped_tools = []
for tool in combined_tools:
wrapped_tool = self._wrap_tool_with_rpm_control(tool)
wrapped_tools.append(wrapped_tool)
combined_tools = wrapped_tools
# 3. 添加缓存包装
if self.cache:
cached_tools = []
for tool in combined_tools:
cached_tool = CacheTool(
original_tool=tool,
cache_handler=self.cache_handler,
)
cached_tools.append(cached_tool)
combined_tools = cached_tools
return combined_tools
功能 2:Hierarchical Process 执行流程
功能描述
自动创建 Manager Agent 负责任务分配和协调,Worker Agent 接受委派并执行任务。
核心代码
def _run_hierarchical_process(self) -> CrewOutput:
"""层级执行流程"""
# 1. 创建 Manager Agent
manager = self._create_manager_agent()
# 2. 创建管理任务
manager_task = Task(
description=self._get_manager_task_description(),
expected_output=self._get_manager_expected_output(),
agent=manager,
)
# 3. Manager 执行管理任务
# Manager 会根据需要委派任务给 Worker Agents
result = manager_task.execute_sync(
agent=manager,
context=None,
tools=manager.tools,
)
# 4. 收集所有任务输出
task_outputs = self._collect_hierarchical_outputs()
# 5. 创建 CrewOutput
return self._create_crew_output(task_outputs)
def _create_manager_agent(self) -> Agent:
"""创建 Manager Agent"""
# 1. 获取配置
if self.manager_agent:
manager_config = self.manager_agent
else:
manager_config = {
"role": "Project Manager",
"goal": "Coordinate team to accomplish project goals efficiently",
"backstory": "Experienced project manager skilled at task delegation",
}
# 2. 创建委派工具
delegate_tools = [
DelegateTaskTool(agents=self.agents),
AskQuestionTool(agents=self.agents),
]
# 3. 创建 Manager
manager = Agent(
**manager_config,
llm=self.manager_llm or self.agents[0].llm,
tools=delegate_tools,
allow_delegation=True,
verbose=self.verbose,
)
return manager
Manager Agent 的工作流程
sequenceDiagram
autonumber
participant Manager as Manager Agent
participant DelegateTool as DelegateTaskTool
participant Worker1 as Worker Agent 1
participant Worker2 as Worker Agent 2
Manager->>Manager: 分析项目目标
Manager->>Manager: 制定任务计划
loop 任务分配循环
Manager->>DelegateTool: delegate_work(coworker="Researcher", task="Research")
DelegateTool->>Worker1: execute_task()
Worker1-->>DelegateTool: TaskOutput
DelegateTool-->>Manager: 任务结果
Manager->>Manager: 分析结果
Manager->>DelegateTool: delegate_work(coworker="Writer", task="Write report")
DelegateTool->>Worker2: execute_task()
Worker2-->>DelegateTool: TaskOutput
DelegateTool-->>Manager: 任务结果
end
Manager->>Manager: 聚合所有结果
Manager-->>Manager: 最终报告
功能 3:Memory 系统集成
功能描述
Crew 管理四种类型的记忆系统,在任务执行过程中自动保存和检索相关信息。
核心代码
def _initialize_memory(self) -> None:
"""初始化内存系统"""
if not self.memory:
return
# 1. 设置存储目录
storage_dir = os.getenv("CREWAI_STORAGE_DIR", appdirs.user_data_dir("crewai"))
# 2. 初始化短期记忆
if not self.short_term_memory:
self.short_term_memory = ShortTermMemory(
crew=self,
embedder_config=self.embedder,
)
# 3. 初始化长期记忆
if not self.long_term_memory:
self.long_term_memory = LongTermMemory(
crew=self,
embedder_config=self.embedder,
)
# 4. 初始化实体记忆
if not self.entity_memory:
self.entity_memory = EntityMemory(
crew=self,
embedder_config=self.embedder,
)
# 5. 初始化外部记忆(如果配置)
if self.external_memory and not self.external_memory:
self.external_memory = ExternalMemory(
crew=self,
provider="mem0",
config={"api_key": os.getenv("MEM0_API_KEY")},
)
# 6. 将记忆系统注入到 Agent
for agent in self.agents:
agent.short_term_memory = self.short_term_memory
agent.long_term_memory = self.long_term_memory
agent.entity_memory = self.entity_memory
agent.external_memory = self.external_memory
def reset_memories(self, command_type: str) -> None:
"""重置记忆系统"""
if command_type == "short_term":
self.short_term_memory.reset()
elif command_type == "long_term":
self.long_term_memory.reset()
elif command_type == "entity":
self.entity_memory.reset()
elif command_type == "external":
if self.external_memory:
self.external_memory.reset()
elif command_type == "all":
self.short_term_memory.reset()
self.long_term_memory.reset()
self.entity_memory.reset()
if self.external_memory:
self.external_memory.reset()
Memory 使用示例
# 启用内存系统
crew = Crew(
agents=[researcher, analyst],
tasks=[research_task, analysis_task],
memory=True, # 启用所有记忆类型
embedder={"provider": "openai", "config": {"model": "text-embedding-3-small"}},
)
# Agent 自动使用记忆
result = crew.kickoff(inputs={"topic": "AI Trends"})
# 手动查询记忆
relevant_memories = crew.short_term_memory.search(
query="What are the key AI trends?",
limit=5,
score_threshold=0.7,
)
# 清理记忆
crew.reset_memories("short_term") # 仅清理短期记忆
crew.reset_memories("all") # 清理所有记忆
功能 4:Planning 自动规划
功能描述
Crew 可以自动生成规划任务,分析项目目标并制定执行计划。
核心代码
def _create_planning_tasks(self) -> list[Task]:
"""创建规划任务"""
# 1. 创建规划 Agent
planning_agent = Agent(
role="Task Planner",
goal="Create detailed task breakdown and execution plan",
backstory="Expert in project planning and task decomposition",
llm=self.agents[0].llm,
verbose=self.verbose,
)
# 2. 创建规划任务
planning_task = Task(
description=f"""
Analyze the following project goals and create a detailed execution plan:
Goals:
{self._format_goals_for_planning()}
Tasks to be executed:
{self._format_tasks_for_planning()}
Create a step-by-step plan including:
1. Task dependencies
2. Resource allocation
3. Risk assessment
4. Timeline estimates
""",
expected_output="A detailed execution plan with task breakdown",
agent=planning_agent,
)
return [planning_task]
def _format_goals_for_planning(self) -> str:
"""格式化 Crew 目标"""
goals = []
for agent in self.agents:
goals.append(f"- {agent.role}: {agent.goal}")
return "\n".join(goals)
def _format_tasks_for_planning(self) -> str:
"""格式化任务列表"""
tasks_desc = []
for task in self.tasks:
tasks_desc.append(f"- {task.description}")
return "\n".join(tasks_desc)
Planning 使用示例
# 启用自动规划
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, report_task],
planning=True, # 启用自动规划
)
# Crew 会先执行规划任务,然后按照规划执行实际任务
result = crew.kickoff(inputs={"project": "Market Analysis"})
# 规划任务的输出会作为后续任务的上下文
planning_output = result.tasks_output[0] # 第一个输出是规划结果
print(planning_output.raw)
实战示例与最佳实践
示例 1:Sequential Process 基础用法
from crewai import Crew, Agent, Task, Process
# 定义 Agents
researcher = Agent(
role="Research Analyst",
goal="Find and analyze relevant information",
backstory="Expert researcher with attention to detail",
verbose=True,
)
writer = Agent(
role="Content Writer",
goal="Write engaging content based on research",
backstory="Skilled writer with journalism background",
verbose=True,
)
# 定义 Tasks
research_task = Task(
description="Research the latest {topic} trends in 2024",
expected_output="A comprehensive research report with 5 key findings",
agent=researcher,
)
write_task = Task(
description="Write a blog post based on the research",
expected_output="A 500-word blog post in Markdown format",
agent=writer,
markdown=True,
)
# 创建 Crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential,
verbose=True,
)
# 执行
result = crew.kickoff(inputs={"topic": "AI Safety"})
print(result.raw)
print(f"Total tokens used: {result.token_usage.total_tokens}")
示例 2:Hierarchical Process 用法
# 定义 Worker Agents
data_collector = Agent(
role="Data Collector",
goal="Collect relevant data from various sources",
backstory="Data specialist",
)
data_analyst = Agent(
role="Data Analyst",
goal="Analyze collected data and extract insights",
backstory="Statistical analyst",
)
report_writer = Agent(
role="Report Writer",
goal="Create comprehensive reports",
backstory="Business analyst",
)
# 定义任务(Manager 会动态分配)
tasks = [
Task(
description="Collect market data for {product}",
expected_output="Raw market data",
agent=data_collector,
),
Task(
description="Analyze market trends",
expected_output="Analysis report with insights",
agent=data_analyst,
),
Task(
description="Create final market analysis report",
expected_output="Executive summary and recommendations",
agent=report_writer,
),
]
# 创建 Hierarchical Crew
crew = Crew(
agents=[data_collector, data_analyst, report_writer],
tasks=tasks,
process=Process.hierarchical,
manager_llm="gpt-4", # Manager 使用更强的模型
verbose=True,
)
result = crew.kickoff(inputs={"product": "Electric Vehicles"})
示例 3:使用 Memory 系统
# 创建带记忆的 Crew
crew = Crew(
agents=[researcher, analyst],
tasks=[research_task, analysis_task],
memory=True,
embedder={
"provider": "openai",
"config": {"model": "text-embedding-3-small"}
},
verbose=True,
)
# 首次执行
result1 = crew.kickoff(inputs={"topic": "AI Trends"})
# 第二次执行(记忆会提供上次执行的上下文)
result2 = crew.kickoff(inputs={"topic": "AI Trends in Healthcare"})
# Agent 会自动检索相关记忆
# 例如:"根据之前关于 AI Trends 的研究..."
示例 4:异步任务执行
# 定义独立的异步任务
task1 = Task(
description="Research {topic} from academic papers",
expected_output="Academic research summary",
agent=academic_researcher,
async_execution=True, # 异步执行
)
task2 = Task(
description="Research {topic} from industry reports",
expected_output="Industry insights",
agent=industry_analyst,
async_execution=True, # 异步执行
)
task3 = Task(
description="Research {topic} from social media",
expected_output="Public sentiment analysis",
agent=social_media_analyst,
async_execution=True, # 异步执行
)
# 同步任务,等待所有异步任务完成
synthesis_task = Task(
description="Synthesize all research findings",
expected_output="Comprehensive synthesis report",
agent=synthesizer,
async_execution=False, # 同步执行
)
crew = Crew(
agents=[academic_researcher, industry_analyst, social_media_analyst, synthesizer],
tasks=[task1, task2, task3, synthesis_task],
)
# task1, task2, task3 并行执行
# synthesis_task 等待三者完成后执行
result = crew.kickoff(inputs={"topic": "Climate Change"})
示例 5:使用 @CrewBase 装饰器模式
from crewai.project import CrewBase, agent, task, crew, before_kickoff, after_kickoff
@CrewBase
class ResearchCrew:
"""Research crew with decorators"""
# 配置文件路径
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
@before_kickoff
def setup_environment(self):
"""启动前回调"""
print("🚀 Setting up research environment...")
# 验证 API keys
# 创建必要的目录
# 初始化外部服务
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
tools=[SerperDevTool()],
verbose=True,
memory=True,
)
@agent
def analyst(self) -> Agent:
return Agent(
config=self.agents_config['analyst'],
verbose=True,
memory=True,
)
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
agent=self.researcher(),
)
@task
def analysis_task(self) -> Task:
return Task(
config=self.tasks_config['analysis_task'],
agent=self.analyst(),
context=[self.research_task()],
)
@crew
def crew(self) -> Crew:
return Crew(
agents=self.agents, # 自动收集所有 @agent
tasks=self.tasks, # 自动收集所有 @task
process=Process.sequential,
memory=True,
verbose=True,
)
@after_kickoff
def cleanup_and_report(self, output: CrewOutput):
"""完成后回调"""
print(f"✅ Research completed!")
print(f"📊 Token usage: {output.token_usage.total_tokens}")
print(f"📝 Generated {len(output.tasks_output)} task outputs")
# 使用
research_crew = ResearchCrew()
result = research_crew.crew().kickoff(inputs={"topic": "Quantum Computing"})
最佳实践总结
1. Process 选择策略
使用 Sequential Process 当:
- 任务有明确的依赖链
- 工作流是线性的
- Agent 角色和职责清晰
使用 Hierarchical Process 当:
- 需要动态任务分配
- 任务之间的依赖复杂
- 需要一个协调者管理整体进度
2. Memory 配置建议
- 短期记忆:始终启用,提供任务间的上下文
- 长期记忆:用于跨会话的知识积累
- 实体记忆:跟踪重要实体(人物、组织等)
- 外部记忆:集成 Mem0 用于高级记忆功能
# 推荐配置
crew = Crew(
agents=[...],
tasks=[...],
memory=True, # 启用所有记忆类型
embedder={
"provider": "openai",
"config": {
"model": "text-embedding-3-small", # 性价比高
"dimensions": 1536,
}
},
)
3. RPM 限速策略
- 开发环境:不设置
max_rpm(无限制) - 生产环境:根据 API 限额设置
max_rpm - 批量执行:设置合理的
max_rpm避免 429 错误
# 生产环境推荐
crew = Crew(
agents=[...],
tasks=[...],
max_rpm=60, # 每分钟最多 60 次请求
)
4. 错误处理与重试
import time
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def safe_kickoff(crew, inputs):
"""带重试的 kickoff"""
try:
return crew.kickoff(inputs=inputs)
except Exception as e:
print(f"Crew execution failed: {e}")
# 清理状态
crew.reset_memories("short_term")
raise
# 使用
result = safe_kickoff(crew, {"topic": "AI Safety"})
5. 性能优化技巧
使用异步任务:
# 独立任务使用异步
task1 = Task(..., async_execution=True)
task2 = Task(..., async_execution=True)
# 同步任务作为分隔点
task3 = Task(..., async_execution=False)
启用缓存:
crew = Crew(
agents=[...],
tasks=[...],
cache=True, # 默认启用
)
优化上下文传递:
# 仅传递必要的上下文
task = Task(
description="...",
expected_output="...",
context=[specific_task], # 而非 NOT_SPECIFIED
)
文档更新总结
本次更新对 CrewAI-03-Crew-完整剖析.md 进行了全面的扩充和深化,主要更新内容包括:
1. 整体服务架构图扩展
新增内容:
- 添加了完整的分层架构图,包含 8 个主要层次:
- 上游接口层(Entry Points)
- Crew 核心协调层
- 流程选择层(Process Layer)
- 任务执行层(Task Execution)
- Manager 管理层(Hierarchical Only)
- Task 执行层
- Agent 执行层
- 资源管理层(Shared Resources)
- 事件与日志层(Observability)
- 输出处理层
架构分层说明:
- 为每个层次添加了详细的职责说明
- 说明了各层之间的调用关系和数据流
- 标注了关键组件和它们的功能
2. 模块交互矩阵
新增内容:
- 创建了调用方和被调方的交互矩阵表格
- 说明了五种交互模式:
- 同步调用(Synchronous)
- 异步调用(Asynchronous)
- 事件发布(Event-Driven)
- 资源查询(Query)
- 回调通知(Callback)
3. 完整执行时序图
新增内容:
- 添加了从用户代码到最终输出的完整时序图(96 个步骤)
- 包含所有关键参与者:
- 用户代码
- Crew 编排器
- 回调系统
- Memory 系统
- Cache 系统
- Knowledge 系统
- Process 层
- 任务执行层
- Task
- Agent
- AgentExecutor
- LLM
- Tool
- Event Bus
- Storage
- Output 层
时序图功能详解:
-
分为 5 个主要阶段:
- 阶段 1:启动和初始化(步骤 1-15)
- 阶段 2:流程选择(步骤 16-20)
- 阶段 3:任务执行循环(步骤 21-85)
- 阶段 4:输出聚合(步骤 86-90)
- 阶段 5:完成和清理(步骤 91-96)
-
为每个阶段提供了详细的步骤说明
4. 调用链路深度分析
新增内容:
- 添加了两条完整的调用链路分析:
- 路径 1:用户代码 → kickoff() → Sequential Process 完整链路
- 路径 2:用户代码 → kickoff() → Hierarchical Process 完整链路
详细分析内容:
路径 1 包含 9 个子链路:
- 入口层调用链:从
crew.kickoff()的完整实现 - 插值层调用链:
_interpolate_inputs()的工作流程 - Sequential Process 执行链:
_execute_tasks()的循环逻辑 - 工具准备链:
_prepare_tools()如何合并和增强工具 - 上下文聚合链:
_get_context()如何聚合任务输出 - Task 执行链:
Task.execute_sync()和_execute_core()的流程 - Agent 执行链:
Agent.execute_task()的详细步骤 - AgentExecutor 循环链:LangChain 的执行循环机制
- 输出聚合链:
_create_crew_output()的输出处理
路径 2 包含 2 个子链路:
- Hierarchical Process 执行链:
_run_hierarchical_process()的流程 - Manager Agent 执行任务的流程:Manager 如何委派任务给 Worker
每个子链路都包含:
- 完整的代码示例(带注释)
- 调用流程箭头(↓)标注下一步调用
- 关键决策点和条件分支
- 数据传递和转换过程
5. 架构图与时序图的关联
改进内容:
- 时序图的每个步骤都可以映射回架构图的某个层次
- 调用链路分析展示了架构图中各模块的具体实现
- 通过三个维度(架构图、时序图、调用链)形成完整的理解体系
6. 文档组织优化
改进内容:
- 保留了原有的所有内容(数据结构、API 规格、实战示例)
- 在原有基础上增加了架构分析、时序分析、调用链路分析
- 形成了从宏观到微观的完整分析链条:
- 宏观:整体服务架构图
- 中观:完整执行时序图
- 微观:调用链路深度分析
- 实践:实战示例与最佳实践
更新价值
对读者的价值:
- 全局视角:通过架构图快速了解 Crew 的整体结构
- 流程理解:通过时序图理解完整的执行流程
- 细节掌握:通过调用链路深入理解每个函数的实现
- 实践指导:结合实战示例可以快速上手开发
对技术团队的价值:
- 新人培训:可作为新成员了解 CrewAI 的入门材料
- 问题排查:当遇到问题时可以快速定位到具体的调用链路
- 架构演进:为后续的架构优化提供参考
- 代码审查:作为代码审查的参考标准
总结
Crew 模块是 CrewAI 框架的核心编排器,通过以下机制实现强大的多 Agent 协作:
- 灵活的执行流程:Sequential 和 Hierarchical 两种模式
- 完善的记忆系统:四种类型的记忆机制
- 智能的上下文管理:自动或手动传递任务输出
- 丰富的生命周期钩子:before/after kickoff 回调
- 强大的训练与回放:支持 Crew 训练和任务回放
- 精细的资源控制:RPM 限速、缓存、知识库集成
架构设计原则
该模块的设计遵循以下原则:
- 编排者模式:专注于协调 Agent 和 Task 的协作,将实际执行委托给 Agent,将输出验证委托给 Task 的 Guardrail
- 职责分离:每个层次有明确的职责边界,降低耦合度
- 事件驱动:通过事件总线实现松耦合的组件通信
- 资源共享:Memory、Cache、Knowledge 等资源在所有 Agent 间共享
- 可观测性:完善的日志、事件和追踪系统支持调试和监控
关键技术特性
-
多进程模型:
- Sequential Process:线性任务流
- Hierarchical Process:层级委派管理
-
异步执行:
- 支持任务异步执行
- Future 管理异步任务结果
- 同步点等待所有异步任务完成
-
上下文传递:
- 自动聚合前序任务输出
- 支持显式指定上下文任务
- 智能的上下文插值
-
工具系统:
- 动态工具准备和合并
- 工具结果缓存
- 委派工具、代码执行工具、多模态工具等
-
输出处理:
- 支持 Pydantic 模型、JSON、Raw 字符串
- Guardrails 验证机制
- 自动重试和反馈
性能和可扩展性
-
性能优化:
- 工具结果缓存减少重复调用
- RPM 控制器避免 API 限额
- 异步任务提高并发能力
-
可扩展性:
- 插件化的工具系统
- 可定制的 Manager Agent
- 灵活的回调机制
- 外部记忆集成
-
可维护性:
- 清晰的分层架构
- 完善的日志和事件
- 任务回放功能
- 执行记录持久化
该模块保持了清晰的职责边界和良好的可扩展性,是 CrewAI 框架的核心价值所在。