CrewAI-00-总览
0. 摘要
项目目标与问题域
CrewAI 是一个独立构建的轻量级、高性能的 Python 多智能体编排框架,用于协调自主 AI 代理完成复杂任务。该框架专注于解决以下核心问题:
- 多智能体协作:通过角色分配、任务委托和协作机制,实现多个 AI 代理的有效协同工作
- 灵活的工作流控制:提供 Crews(自主协作)和 Flows(精确控制)两种互补的编排模式
- 生产级可靠性:支持记忆系统、知识管理、任务重试、输出验证等企业级特性
- 易用性与可扩展性:通过 YAML 配置、装饰器模式和工具集成,降低开发门槛
核心能力边界
包含的能力:
- Agent(智能体)的创建、配置和执行
- Task(任务)的定义、调度和结果处理
- Crew(团队)的编排和流程管理(Sequential、Hierarchical)
- Flow(流程)的事件驱动编排和状态管理
- Memory(记忆)系统:短期、长期、实体和外部记忆
- Knowledge(知识)管理:多源知识检索和 RAG 集成
- LLM(大语言模型)抽象层和多提供商支持
- Tools(工具)集成:内置工具、自定义工具、MCP 工具
- Events(事件)系统:全局事件总线和监听器
- CLI(命令行)工具:项目创建、训练、测试、部署
不包含的能力:
- LLM 模型训练和微调(但支持集成已训练模型)
- 数据持久化底层实现(依赖 ChromaDB、SQLite 等)
- 分布式任务调度(单机多线程/协程模型)
- 实时流式对话管理(仅支持批处理式任务执行)
运行环境
- 语言版本:Python 3.10 - 3.13
- 核心依赖:
pydantic>=2.11.9:数据验证和序列化openai>=1.13.3:LLM API 调用chromadb~=1.1.0:向量数据库opentelemetry-api:可观测性
- 可选依赖:
crewai-tools:扩展工具集tiktoken:Token 计数mem0ai:外部记忆集成qdrant-client:替代向量库
部署形态
- 单体应用:Python 进程内运行,适合小规模任务
- 微服务:通过 REST API 或 gRPC 封装 Crew 执行
- 异步任务队列:结合 Celery/RQ 处理长任务
- Serverless:AWS Lambda、Azure Functions 等无服务器环境
- 容器化:Docker/Kubernetes 部署
1. 整体架构图
flowchart TB
subgraph "外部接口层"
CLI[CLI 命令行工具]
API[CrewAI API<br/>Agent/Task/Crew/Flow]
end
subgraph "编排层"
Crew[Crew 编排器<br/>Sequential/Hierarchical]
Flow[Flow 引擎<br/>事件驱动编排]
end
subgraph "执行层"
Agent[Agent 智能体]
Task[Task 任务管理器]
Executor[AgentExecutor<br/>任务执行引擎]
end
subgraph "LLM 层"
LLM[LLM 抽象层]
Providers[多提供商支持<br/>OpenAI/Anthropic/...]
end
subgraph "工具层"
Tools[工具管理器]
BuiltinTools[内置工具]
CustomTools[自定义工具]
MCPTools[MCP 工具]
end
subgraph "存储层"
Memory[Memory 记忆系统]
Knowledge[Knowledge 知识库]
RAG[RAG 检索引擎]
end
subgraph "基础设施层"
Events[事件总线]
Telemetry[遥测与追踪]
Config[配置管理]
Security[安全与指纹]
end
CLI --> API
API --> Crew
API --> Flow
Crew --> Agent
Crew --> Task
Flow --> Agent
Flow --> Task
Agent --> Executor
Task --> Executor
Executor --> LLM
Executor --> Tools
LLM --> Providers
Tools --> BuiltinTools
Tools --> CustomTools
Tools --> MCPTools
Agent --> Memory
Agent --> Knowledge
Memory --> RAG
Knowledge --> RAG
Crew --> Events
Flow --> Events
Agent --> Events
Task --> Events
Events --> Telemetry
Agent --> Config
Agent --> Security
Task --> Security
架构要点说明
1) 组件职责与耦合关系
- CLI 与 API:CLI 提供命令行入口,API 提供编程接口,均依赖编排层
- 编排层(Crew/Flow):
- Crew:自主协作模式,支持顺序和分层流程
- Flow:精确控制模式,基于事件驱动和状态机
- 两者可嵌套组合使用
- 执行层(Agent/Task/Executor):
- Agent:角色定义、工具管理、记忆访问
- Task:任务配置、输出验证、上下文传递
- Executor:循环推理、工具调用、结果生成
- LLM 层:统一的 LLM 接口,屏蔽不同提供商差异
- 工具层:工具注册、参数解析、执行委托
- 存储层:记忆与知识的持久化和检索
- 基础设施层:横切关注点(事件、监控、配置、安全)
2) 数据流与控制流
数据流:
用户输入 → Crew.kickoff(inputs) → Task.description/expected_output
→ Agent.execute_task → Executor.invoke → LLM.call → 输出
→ TaskOutput → Memory.save → 下一个 Task 的 context
控制流:
- Sequential:任务按定义顺序串行执行
- Hierarchical:Manager Agent 动态委托任务给其他 Agent
- Flow:基于装饰器(@start/@listen/@router)的事件驱动流转
3) 跨进程/跨线程路径
- 同步任务:单线程顺序执行
- 异步任务(async_execution=True):
- 使用
concurrent.futures.ThreadPoolExecutor - 任务在后台线程执行,主线程继续处理
- 通过
Future对象获取结果
- 使用
- Flow 异步:
- 使用 Python
asyncio - 支持
kickoff_async()和并发 Flow 执行
- 使用 Python
4) 高可用与扩展性
- 无状态设计:Agent/Task/Crew 实例可随时创建/销毁
- 记忆持久化:存储在本地文件系统或外部数据库
- 水平扩展:多进程/多实例部署,共享存储
- 容错机制:
- 任务重试(max_retry_limit)
- Guardrail 输出验证
- 超时控制(max_execution_time)
5) 状态管理位置
- Crew 状态:
_inputs:当前执行的输入参数_task_output_handler:任务输出存储usage_metrics:Token 使用统计
- Flow 状态:
self.state:结构化状态(Pydantic 模型或 dict)- 持久化到文件系统(可选)
- Agent 状态:
tools_results:工具调用结果缓存_times_executed:重试计数
- Memory 状态:
- 短期记忆:ChromaDB 向量存储
- 长期记忆:SQLite 数据库
2. 全局时序图(主要业务闭环)
2.1 Sequential Process 执行流程
sequenceDiagram
autonumber
participant User as 用户
participant Crew as Crew 编排器
participant Agent1 as Agent1
participant Agent2 as Agent2
participant Executor as AgentExecutor
participant LLM as LLM 层
participant Memory as Memory
participant Tool as 工具
User->>Crew: kickoff(inputs)
Note over Crew: 插值 inputs 到 Task/Agent
Crew->>Crew: _set_tasks_callbacks()
Crew->>Agent1: set_knowledge(embedder)
Crew->>Agent1: create_agent_executor()
Crew->>Agent1: execute_task(task1, context=None)
Agent1->>Memory: 检索相关记忆
Memory-->>Agent1: 返回记忆片段
Agent1->>Executor: invoke(task_prompt, tools)
loop 推理循环(最多 max_iter 次)
Executor->>LLM: call(messages)
LLM-->>Executor: 返回 action/tool_call
alt 需要调用工具
Executor->>Tool: execute(tool_input)
Tool-->>Executor: tool_output
Executor->>LLM: call(messages + tool_output)
else 输出 Final Answer
LLM-->>Executor: final_answer
end
end
Executor-->>Agent1: 返回结果
Agent1->>Agent1: _export_output(result)
Agent1->>Memory: save(task_output)
Agent1-->>Crew: TaskOutput1
Note over Crew: 处理 Task2,context=TaskOutput1
Crew->>Agent2: execute_task(task2, context=output1)
Agent2->>Executor: invoke(task_prompt_with_context)
Executor->>LLM: call(messages)
LLM-->>Executor: final_answer
Executor-->>Agent2: 返回结果
Agent2-->>Crew: TaskOutput2
Crew->>Crew: _create_crew_output([output1, output2])
Crew-->>User: CrewOutput
2.2 Hierarchical Process 执行流程
sequenceDiagram
autonumber
participant User as 用户
participant Crew as Crew 编排器
participant Manager as Manager Agent
participant Worker1 as Worker Agent1
participant Worker2 as Worker Agent2
participant Tool as Delegation Tool
User->>Crew: kickoff(inputs)
Crew->>Crew: _create_manager_agent()
Note over Crew: 创建 Manager Agent<br/>注入 delegation_tools
Crew->>Manager: execute_task(task, tools=[delegation_tools])
Manager->>Manager: 分析任务需求
alt 委托给 Worker1
Manager->>Tool: Delegate work to Worker1
Tool->>Worker1: execute_sub_task(description)
Worker1->>Worker1: 执行工作
Worker1-->>Tool: 返回结果
Tool-->>Manager: Worker1 结果
end
alt 委托给 Worker2
Manager->>Tool: Ask question to Worker2
Tool->>Worker2: execute_sub_task(question)
Worker2->>Worker2: 回答问题
Worker2-->>Tool: 返回答案
Tool-->>Manager: Worker2 答案
end
Manager->>Manager: 整合所有结果
Manager-->>Crew: 综合 TaskOutput
Crew-->>User: CrewOutput
图解与要点
1) 入口与鉴权
- 入口:
Crew.kickoff(inputs) - 鉴权:通过
security_config.fingerprint验证 Agent/Task 来源 - 输入验证:插值检查({placeholder} 是否有对应值)
2) 幂等性
- 任务级幂等:通过
task.id和task.key(基于描述和预期输出的哈希)唯一标识 - 工具幂等:工具自身需保证幂等性(CrewAI 不强制)
- 记忆幂等:同一内容多次保存不会重复(向量相似度去重)
3) 回退策略
- Agent 执行失败:
- 重试机制(max_retry_limit,默认 2 次)
- 超时后抛出
TimeoutError,不重试
- Guardrail 验证失败:
- 重新执行任务(guardrail_max_retries,默认 3 次)
- 超过次数后抛出异常
- LLM 调用失败:
- LiteLLM 错误不重试,直接抛出
- 其他错误触发 Agent 级重试
4) 重试点
- Agent.execute_task:LLM 调用失败、工具执行失败
- Task.guardrail:输出验证失败
- Tool 调用:部分工具支持内部重试(如网络工具)
5) 超时与资源上界
- 任务超时:
max_execution_time(秒),使用ThreadPoolExecutor强制终止 - 推理循环上界:
max_iter(默认 25),防止无限循环 - RPM 限制:
max_rpm控制每分钟请求数 - Token 限制:
respect_context_window=True:自动截断历史消息- 未强制 Token 总数上限(依赖 LLM 提供商)
3. 模块边界与交互图
3.1 模块清单
| 序号 | 模块名 | 路径 | 职责 | 对外 API 提供 | 依赖模块 |
|---|---|---|---|---|---|
| 01 | Agent | crewai/agent.py |
智能体定义、工具管理、记忆访问 | Agent 类 | LLM、Tools、Memory |
| 02 | Task | crewai/task.py |
任务配置、输出验证、上下文传递 | Task 类 | Agent、Guardrail |
| 03 | Crew | crewai/crew.py |
团队编排、流程控制 | Crew 类 | Agent、Task、Memory |
| 04 | Flow | crewai/flow/flow.py |
事件驱动编排、状态管理 | Flow 类、装饰器 | Events |
| 05 | Memory | crewai/memory/ |
短期、长期、实体、外部记忆 | Memory 类 | RAG、Storage |
| 06 | Knowledge | crewai/knowledge/ |
知识源管理、检索 | Knowledge 类 | RAG、Storage |
| 07 | LLM | crewai/llm.py |
LLM 抽象层、提供商适配 | LLM 类、BaseLLM | 无 |
| 08 | Tools | crewai/tools/ |
工具注册、参数解析、执行 | BaseTool 类 | Agent |
| 09 | Events | crewai/events/ |
事件总线、监听器 | EventBus 类 | 无 |
| 10 | RAG | crewai/rag/ |
向量检索、嵌入模型 | RAGPipeline 类 | ChromaDB/Qdrant |
| 11 | CLI | crewai/cli/ |
命令行工具 | crewai 命令 | Crew、Flow、Project |
| 12 | Project | crewai/project/ |
装饰器模式(@agent/@task/@crew) | CrewBase 类 | Agent、Task、Crew |
3.2 模块交互矩阵
| 调用方\被调方 | Agent | Task | Crew | Flow | Memory | Knowledge | LLM | Tools | Events | RAG | CLI |
|---|---|---|---|---|---|---|---|---|---|---|---|
| CLI | - | - | ✓ | ✓ | - | - | - | - | - | - | - |
| Crew | ✓ | ✓ | - | - | ✓ | ✓ | - | - | ✓ | - | - |
| Flow | ✓ | ✓ | ✓ | - | - | - | - | - | ✓ | - | - |
| Agent | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | - |
| Task | ✓ | - | - | - | - | - | - | ✓ | ✓ | - | - |
| Memory | - | - | - | - | - | - | - | - | - | ✓ | - |
| Knowledge | - | - | - | - | - | - | - | - | - | ✓ | - |
| Tools | - | - | - | - | - | - | ✓ | - | ✓ | - | - |
交互说明:
- Crew → Agent/Task:同步调用(Sequential)或委托调用(Hierarchical)
- Agent → Memory/Knowledge:同步调用,检索相关上下文
- Agent → LLM:同步调用,获取推理结果
- Agent → Tools:同步调用,执行工具操作
- 所有模块 → Events:异步发送事件(通过 EventBus)
- Memory/Knowledge → RAG:同步调用,向量检索
3.3 数据一致性要求
- TaskOutput 一致性:
- 强一致性:单个 Task 的输出立即可见
- 最终一致性:Memory 的保存是异步的(后台写入)
- Memory 一致性:
- 短期记忆(ChromaDB):无事务保证,可能丢失
- 长期记忆(SQLite):ACID 事务保证
- 外部记忆(Mem0):取决于外部服务
4. 关键设计与权衡
4.1 数据一致性与事务
设计决策:
- 无分布式事务:单机部署,依赖本地文件锁
- Memory 写入策略:
- 短期记忆:立即写入 ChromaDB(无事务)
- 长期记忆:批量提交到 SQLite(有事务)
- 任务输出存储:
- 默认不持久化
- 可选
output_file持久化到文件
权衡:
- ✅ 优点:简化架构,降低运维复杂度
- ❌ 缺点:无法保证跨存储的 ACID 特性
4.2 并发与锁策略
锁机制:
- RW 锁(Reader-Writer Lock):
- 用于 Memory 的并发读写
- 位置:
crewai/utilities/rw_lock.py
- 文件锁(portalocker):
- 用于 Flow 持久化状态的并发访问
- 防止多进程同时修改状态文件
- 线程池限制:
- 异步任务使用
ThreadPoolExecutor - 默认线程数由 Python 自动管理
- 异步任务使用
并发模型:
- Sequential Process:单线程执行
- Async Tasks:多线程并发
- Hierarchical Process:Manager 单线程,Worker 可并发
4.3 性能关键路径与可观测性
性能热点:
- LLM 调用延迟:
- 缓存工具结果(CacheHandler)
- 减少上下文长度(respect_context_window)
- Memory 检索延迟:
- 向量检索优化(score_threshold 过滤)
- 批量查询(单次检索多个记忆)
- Task 上下文传递:
- 避免传递大量文本(仅传递关键片段)
可观测性指标:
- Token 使用统计:
UsageMetrics:total_tokens、prompt_tokens、completion_tokens- 每个 Agent 和 Crew 级别统计
- 任务执行时间:
Task.start_time、Task.end_timeTask.execution_duration计算
- 事件追踪:
- OpenTelemetry 集成
- 自动追踪 AgentExecutionStartedEvent、TaskCompletedEvent 等
- 日志记录:
- 结构化日志(verbose 模式)
- 文件输出(output_log_file)
4.4 配置项与行为影响
Crew 级配置:
| 配置项 | 默认值 | 影响 |
|---|---|---|
| process | sequential | 控制任务执行顺序(sequential/hierarchical) |
| memory | False | 是否启用记忆系统 |
| max_rpm | None | 每分钟最大请求数(限流) |
| verbose | False | 是否打印详细日志 |
| cache | True | 是否缓存工具结果 |
| planning | False | 是否启用任务规划(AI 自动生成执行计划) |
Agent 级配置:
| 配置项 | 默认值 | 影响 |
|---|---|---|
| max_iter | 25 | 推理循环最大迭代次数 |
| max_execution_time | None | 任务超时时间(秒) |
| allow_delegation | True | 是否允许委托任务给其他 Agent |
| reasoning | False | 是否在执行前生成推理计划 |
| max_retry_limit | 2 | 执行失败最大重试次数 |
Task 级配置:
| 配置项 | 默认值 | 影响 |
|---|---|---|
| async_execution | False | 是否异步执行任务 |
| output_pydantic | None | 输出结构化验证(Pydantic 模型) |
| guardrail | None | 输出验证函数 |
| guardrail_max_retries | 3 | Guardrail 验证失败最大重试次数 |
5. 典型使用场景与流程
5.1 场景一:简单顺序任务
from crewai import Agent, Task, Crew, Process
# 1. 定义 Agent
researcher = Agent(
role="Research Analyst",
goal="Find the latest AI trends",
backstory="Expert in AI research",
verbose=True
)
# 2. 定义 Task
research_task = Task(
description="Research the top 5 AI trends in 2024",
expected_output="A list of 5 AI trends with brief descriptions",
agent=researcher
)
# 3. 创建 Crew 并执行
crew = Crew(
agents=[researcher],
tasks=[research_task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff()
print(result.raw)
执行流程:
crew.kickoff()→ 初始化 Agent 和 Taskresearcher.execute_task(research_task)→ 调用 LLM- LLM 返回结果 → 封装为
TaskOutput - 返回
CrewOutput(包含所有 TaskOutput)
5.2 场景二:分层管理
from crewai import Agent, Task, Crew, Process
# 定义 Worker Agents
researcher = Agent(role="Researcher", goal="Research topics")
writer = Agent(role="Writer", goal="Write reports")
# 定义任务(无需指定 agent,由 Manager 分配)
task1 = Task(description="Research AI in healthcare", expected_output="Report")
task2 = Task(description="Write a summary", expected_output="Summary")
# 创建 Hierarchical Crew
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2],
process=Process.hierarchical,
manager_llm="gpt-4" # Manager 使用 GPT-4
)
result = crew.kickoff()
执行流程:
- Crew 自动创建 Manager Agent
- Manager 分析任务,决定委托给 researcher 或 writer
- Worker 执行子任务并返回结果
- Manager 整合结果
5.3 场景三:Flow 事件驱动
from crewai.flow.flow import Flow, start, listen, router
class DataPipeline(Flow):
@start()
def load_data(self):
# 加载数据
self.state["data"] = [1, 2, 3]
return "loaded"
@listen("load_data")
def process_data(self, data):
# 处理数据
self.state["processed"] = sum(self.state["data"])
return "processed"
@router("process_data")
def decide_next(self):
if self.state["processed"] > 5:
return "high_value"
return "low_value"
@listen("high_value")
def handle_high_value(self):
print("High value detected!")
flow = DataPipeline()
flow.kickoff()
执行流程:
load_data()自动启动(@start)process_data()监听load_data完成事件decide_next()根据状态路由到high_value或low_valuehandle_high_value()监听high_value事件
6. 系统级关键场景时序图
6.1 冷启动场景
sequenceDiagram
autonumber
participant User as 用户
participant Crew as Crew
participant Memory as Memory
participant Knowledge as Knowledge
participant ChromaDB as ChromaDB
User->>Crew: 首次 kickoff()
Crew->>Memory: 初始化 Memory
alt Memory 未启用
Memory-->>Crew: 跳过初始化
else Memory 启用
Memory->>ChromaDB: 检查 Collection 是否存在
alt Collection 不存在
ChromaDB->>ChromaDB: 创建新 Collection
end
ChromaDB-->>Memory: 初始化完成
end
Crew->>Knowledge: 初始化 Knowledge
alt Knowledge 未配置
Knowledge-->>Crew: 跳过初始化
else Knowledge 已配置
Knowledge->>Knowledge: add_sources()
Knowledge->>ChromaDB: 批量插入文档
ChromaDB-->>Knowledge: 完成
end
Crew->>Crew: 执行任务流程
Crew-->>User: 返回结果
6.2 峰值压测路径
sequenceDiagram
autonumber
participant User as 用户(并发 100)
participant LoadBalancer as 负载均衡器
participant Crew1 as Crew 实例1
participant Crew2 as Crew 实例2
participant LLM as LLM API
participant RPMController as RPM 限流器
par 并发请求
User->>LoadBalancer: kickoff() * 100
LoadBalancer->>Crew1: 分发请求 1-50
LoadBalancer->>Crew2: 分发请求 51-100
end
Crew1->>RPMController: check_or_wait()
RPMController->>RPMController: 检查当前 RPM
alt 超过限制
RPMController->>Crew1: sleep(delay)
else 未超过限制
RPMController-->>Crew1: 允许执行
end
Crew1->>LLM: call(messages)
LLM-->>Crew1: 返回结果
Crew1-->>User: 返回结果
Note over Crew2: 同样的限流和执行流程
6.3 异常恢复场景
sequenceDiagram
autonumber
participant User as 用户
participant Crew as Crew
participant Agent as Agent
participant LLM as LLM
participant Memory as Memory
User->>Crew: kickoff(inputs)
Crew->>Agent: execute_task(task1)
Agent->>LLM: call(messages)
alt LLM 超时
LLM-->>Agent: TimeoutError
Agent->>Agent: 不重试,直接抛出
Agent-->>Crew: 抛出 TimeoutError
Crew-->>User: 抛出异常
else LLM 返回错误格式
LLM-->>Agent: 返回无效 JSON
Agent->>Agent: 验证失败,retry_count++
alt retry_count < max_retry_limit
Agent->>LLM: 重试 call(messages)
LLM-->>Agent: 返回正确结果
else retry_count >= max_retry_limit
Agent-->>Crew: 抛出 AgentExecutionError
end
else Guardrail 验证失败
Agent->>Agent: 调用 guardrail(output)
Agent->>Agent: 验证失败,重新生成
Agent->>LLM: call(messages + error_context)
LLM-->>Agent: 返回修正结果
end
Agent->>Memory: save(task_output)
Agent-->>Crew: TaskOutput
Crew-->>User: CrewOutput
说明:
- 冷启动:首次启动时初始化 Memory 和 Knowledge 存储
- 峰值压测:RPM 限流器自动控制请求速率
- 异常恢复:区分可重试(格式错误、Guardrail 失败)和不可重试(超时、LiteLLM 错误)
7. 附录:模块导出与版本信息
7.1 对外导出的核心 API
# crewai/__init__.py
__all__ = [
"LLM",
"Agent",
"BaseLLM",
"Crew",
"CrewOutput",
"Flow",
"Knowledge",
"LLMGuardrail",
"Process",
"Task",
"TaskOutput",
"__version__",
]
7.2 版本信息
- 当前版本:
v1.0.0 - Python 兼容:
>=3.10, <3.14 - 依赖管理:UV(基于 pyproject.toml)
7.3 项目仓库与文档
- GitHub:https://github.com/crewAIInc/crewAI
- 官方文档:https://docs.crewai.com
- 社区论坛:https://community.crewai.com
8. 总结
CrewAI 通过 Agent-Task-Crew-Flow 四层架构,实现了从简单顺序任务到复杂事件驱动流程的全覆盖:
- Agent:智能体核心,负责推理、工具调用和记忆访问
- Task:任务单元,定义输入输出和验证规则
- Crew:团队编排,支持顺序和分层流程
- Flow:事件驱动,实现精确控制和状态管理
该框架的设计哲学是 简单易用 + 深度可定制:
- 通过 YAML 配置和装饰器模式降低门槛
- 通过 LLM 抽象层和工具系统提供扩展性
- 通过记忆和知识管理支持长期上下文
- 通过事件系统和可观测性保证生产级可靠性