CrewAI-00-总览

0. 摘要

项目目标与问题域

CrewAI 是一个独立构建的轻量级、高性能的 Python 多智能体编排框架,用于协调自主 AI 代理完成复杂任务。该框架专注于解决以下核心问题:

  • 多智能体协作:通过角色分配、任务委托和协作机制,实现多个 AI 代理的有效协同工作
  • 灵活的工作流控制:提供 Crews(自主协作)和 Flows(精确控制)两种互补的编排模式
  • 生产级可靠性:支持记忆系统、知识管理、任务重试、输出验证等企业级特性
  • 易用性与可扩展性:通过 YAML 配置、装饰器模式和工具集成,降低开发门槛

核心能力边界

包含的能力

  • Agent(智能体)的创建、配置和执行
  • Task(任务)的定义、调度和结果处理
  • Crew(团队)的编排和流程管理(Sequential、Hierarchical)
  • Flow(流程)的事件驱动编排和状态管理
  • Memory(记忆)系统:短期、长期、实体和外部记忆
  • Knowledge(知识)管理:多源知识检索和 RAG 集成
  • LLM(大语言模型)抽象层和多提供商支持
  • Tools(工具)集成:内置工具、自定义工具、MCP 工具
  • Events(事件)系统:全局事件总线和监听器
  • CLI(命令行)工具:项目创建、训练、测试、部署

不包含的能力

  • LLM 模型训练和微调(但支持集成已训练模型)
  • 数据持久化底层实现(依赖 ChromaDB、SQLite 等)
  • 分布式任务调度(单机多线程/协程模型)
  • 实时流式对话管理(仅支持批处理式任务执行)

运行环境

  • 语言版本:Python 3.10 - 3.13
  • 核心依赖
    • pydantic>=2.11.9:数据验证和序列化
    • openai>=1.13.3:LLM API 调用
    • chromadb~=1.1.0:向量数据库
    • opentelemetry-api:可观测性
  • 可选依赖
    • crewai-tools:扩展工具集
    • tiktoken:Token 计数
    • mem0ai:外部记忆集成
    • qdrant-client:替代向量库

部署形态

  • 单体应用:Python 进程内运行,适合小规模任务
  • 微服务:通过 REST API 或 gRPC 封装 Crew 执行
  • 异步任务队列:结合 Celery/RQ 处理长任务
  • Serverless:AWS Lambda、Azure Functions 等无服务器环境
  • 容器化:Docker/Kubernetes 部署

1. 整体架构图

flowchart TB
    subgraph "外部接口层"
        CLI[CLI 命令行工具]
        API[CrewAI API<br/>Agent/Task/Crew/Flow]
    end

    subgraph "编排层"
        Crew[Crew 编排器<br/>Sequential/Hierarchical]
        Flow[Flow 引擎<br/>事件驱动编排]
    end

    subgraph "执行层"
        Agent[Agent 智能体]
        Task[Task 任务管理器]
        Executor[AgentExecutor<br/>任务执行引擎]
    end

    subgraph "LLM 层"
        LLM[LLM 抽象层]
        Providers[多提供商支持<br/>OpenAI/Anthropic/...]
    end

    subgraph "工具层"
        Tools[工具管理器]
        BuiltinTools[内置工具]
        CustomTools[自定义工具]
        MCPTools[MCP 工具]
    end

    subgraph "存储层"
        Memory[Memory 记忆系统]
        Knowledge[Knowledge 知识库]
        RAG[RAG 检索引擎]
    end

    subgraph "基础设施层"
        Events[事件总线]
        Telemetry[遥测与追踪]
        Config[配置管理]
        Security[安全与指纹]
    end

    CLI --> API
    API --> Crew
    API --> Flow
    Crew --> Agent
    Crew --> Task
    Flow --> Agent
    Flow --> Task
    Agent --> Executor
    Task --> Executor
    Executor --> LLM
    Executor --> Tools
    LLM --> Providers
    Tools --> BuiltinTools
    Tools --> CustomTools
    Tools --> MCPTools
    Agent --> Memory
    Agent --> Knowledge
    Memory --> RAG
    Knowledge --> RAG
    Crew --> Events
    Flow --> Events
    Agent --> Events
    Task --> Events
    Events --> Telemetry
    Agent --> Config
    Agent --> Security
    Task --> Security

架构要点说明

1) 组件职责与耦合关系

  • CLI 与 API:CLI 提供命令行入口,API 提供编程接口,均依赖编排层
  • 编排层(Crew/Flow)
    • Crew:自主协作模式,支持顺序和分层流程
    • Flow:精确控制模式,基于事件驱动和状态机
    • 两者可嵌套组合使用
  • 执行层(Agent/Task/Executor)
    • Agent:角色定义、工具管理、记忆访问
    • Task:任务配置、输出验证、上下文传递
    • Executor:循环推理、工具调用、结果生成
  • LLM 层:统一的 LLM 接口,屏蔽不同提供商差异
  • 工具层:工具注册、参数解析、执行委托
  • 存储层:记忆与知识的持久化和检索
  • 基础设施层:横切关注点(事件、监控、配置、安全)

2) 数据流与控制流

数据流

用户输入 → Crew.kickoff(inputs) → Task.description/expected_output
→ Agent.execute_task → Executor.invoke → LLM.call → 输出
→ TaskOutput → Memory.save → 下一个 Task 的 context

控制流

  • Sequential:任务按定义顺序串行执行
  • Hierarchical:Manager Agent 动态委托任务给其他 Agent
  • Flow:基于装饰器(@start/@listen/@router)的事件驱动流转

3) 跨进程/跨线程路径

  • 同步任务:单线程顺序执行
  • 异步任务(async_execution=True)
    • 使用 concurrent.futures.ThreadPoolExecutor
    • 任务在后台线程执行,主线程继续处理
    • 通过 Future 对象获取结果
  • Flow 异步
    • 使用 Python asyncio
    • 支持 kickoff_async() 和并发 Flow 执行

4) 高可用与扩展性

  • 无状态设计:Agent/Task/Crew 实例可随时创建/销毁
  • 记忆持久化:存储在本地文件系统或外部数据库
  • 水平扩展:多进程/多实例部署,共享存储
  • 容错机制
    • 任务重试(max_retry_limit)
    • Guardrail 输出验证
    • 超时控制(max_execution_time)

5) 状态管理位置

  • Crew 状态
    • _inputs:当前执行的输入参数
    • _task_output_handler:任务输出存储
    • usage_metrics:Token 使用统计
  • Flow 状态
    • self.state:结构化状态(Pydantic 模型或 dict)
    • 持久化到文件系统(可选)
  • Agent 状态
    • tools_results:工具调用结果缓存
    • _times_executed:重试计数
  • Memory 状态
    • 短期记忆:ChromaDB 向量存储
    • 长期记忆:SQLite 数据库

2. 全局时序图(主要业务闭环)

2.1 Sequential Process 执行流程

sequenceDiagram
    autonumber
    participant User as 用户
    participant Crew as Crew 编排器
    participant Agent1 as Agent1
    participant Agent2 as Agent2
    participant Executor as AgentExecutor
    participant LLM as LLM 
    participant Memory as Memory
    participant Tool as 工具

    User->>Crew: kickoff(inputs)
    Note over Crew: 插值 inputs  Task/Agent
    Crew->>Crew: _set_tasks_callbacks()
    Crew->>Agent1: set_knowledge(embedder)
    Crew->>Agent1: create_agent_executor()

    Crew->>Agent1: execute_task(task1, context=None)
    Agent1->>Memory: 检索相关记忆
    Memory-->>Agent1: 返回记忆片段
    Agent1->>Executor: invoke(task_prompt, tools)

    loop 推理循环(最多 max_iter 次)
        Executor->>LLM: call(messages)
        LLM-->>Executor: 返回 action/tool_call
        alt 需要调用工具
            Executor->>Tool: execute(tool_input)
            Tool-->>Executor: tool_output
            Executor->>LLM: call(messages + tool_output)
        else 输出 Final Answer
            LLM-->>Executor: final_answer
        end
    end

    Executor-->>Agent1: 返回结果
    Agent1->>Agent1: _export_output(result)
    Agent1->>Memory: save(task_output)
    Agent1-->>Crew: TaskOutput1

    Note over Crew: 处理 Task2context=TaskOutput1
    Crew->>Agent2: execute_task(task2, context=output1)
    Agent2->>Executor: invoke(task_prompt_with_context)
    Executor->>LLM: call(messages)
    LLM-->>Executor: final_answer
    Executor-->>Agent2: 返回结果
    Agent2-->>Crew: TaskOutput2

    Crew->>Crew: _create_crew_output([output1, output2])
    Crew-->>User: CrewOutput

2.2 Hierarchical Process 执行流程

sequenceDiagram
    autonumber
    participant User as 用户
    participant Crew as Crew 编排器
    participant Manager as Manager Agent
    participant Worker1 as Worker Agent1
    participant Worker2 as Worker Agent2
    participant Tool as Delegation Tool

    User->>Crew: kickoff(inputs)
    Crew->>Crew: _create_manager_agent()
    Note over Crew: 创建 Manager Agent<br/>注入 delegation_tools

    Crew->>Manager: execute_task(task, tools=[delegation_tools])
    Manager->>Manager: 分析任务需求

    alt 委托给 Worker1
        Manager->>Tool: Delegate work to Worker1
        Tool->>Worker1: execute_sub_task(description)
        Worker1->>Worker1: 执行工作
        Worker1-->>Tool: 返回结果
        Tool-->>Manager: Worker1 结果
    end

    alt 委托给 Worker2
        Manager->>Tool: Ask question to Worker2
        Tool->>Worker2: execute_sub_task(question)
        Worker2->>Worker2: 回答问题
        Worker2-->>Tool: 返回答案
        Tool-->>Manager: Worker2 答案
    end

    Manager->>Manager: 整合所有结果
    Manager-->>Crew: 综合 TaskOutput
    Crew-->>User: CrewOutput

图解与要点

1) 入口与鉴权

  • 入口Crew.kickoff(inputs)
  • 鉴权:通过 security_config.fingerprint 验证 Agent/Task 来源
  • 输入验证:插值检查({placeholder} 是否有对应值)

2) 幂等性

  • 任务级幂等:通过 task.idtask.key(基于描述和预期输出的哈希)唯一标识
  • 工具幂等:工具自身需保证幂等性(CrewAI 不强制)
  • 记忆幂等:同一内容多次保存不会重复(向量相似度去重)

3) 回退策略

  • Agent 执行失败
    • 重试机制(max_retry_limit,默认 2 次)
    • 超时后抛出 TimeoutError,不重试
  • Guardrail 验证失败
    • 重新执行任务(guardrail_max_retries,默认 3 次)
    • 超过次数后抛出异常
  • LLM 调用失败
    • LiteLLM 错误不重试,直接抛出
    • 其他错误触发 Agent 级重试

4) 重试点

  • Agent.execute_task:LLM 调用失败、工具执行失败
  • Task.guardrail:输出验证失败
  • Tool 调用:部分工具支持内部重试(如网络工具)

5) 超时与资源上界

  • 任务超时max_execution_time(秒),使用 ThreadPoolExecutor 强制终止
  • 推理循环上界max_iter(默认 25),防止无限循环
  • RPM 限制max_rpm 控制每分钟请求数
  • Token 限制
    • respect_context_window=True:自动截断历史消息
    • 未强制 Token 总数上限(依赖 LLM 提供商)

3. 模块边界与交互图

3.1 模块清单

序号 模块名 路径 职责 对外 API 提供 依赖模块
01 Agent crewai/agent.py 智能体定义、工具管理、记忆访问 Agent 类 LLM、Tools、Memory
02 Task crewai/task.py 任务配置、输出验证、上下文传递 Task 类 Agent、Guardrail
03 Crew crewai/crew.py 团队编排、流程控制 Crew 类 Agent、Task、Memory
04 Flow crewai/flow/flow.py 事件驱动编排、状态管理 Flow 类、装饰器 Events
05 Memory crewai/memory/ 短期、长期、实体、外部记忆 Memory 类 RAG、Storage
06 Knowledge crewai/knowledge/ 知识源管理、检索 Knowledge 类 RAG、Storage
07 LLM crewai/llm.py LLM 抽象层、提供商适配 LLM 类、BaseLLM
08 Tools crewai/tools/ 工具注册、参数解析、执行 BaseTool 类 Agent
09 Events crewai/events/ 事件总线、监听器 EventBus 类
10 RAG crewai/rag/ 向量检索、嵌入模型 RAGPipeline 类 ChromaDB/Qdrant
11 CLI crewai/cli/ 命令行工具 crewai 命令 Crew、Flow、Project
12 Project crewai/project/ 装饰器模式(@agent/@task/@crew) CrewBase 类 Agent、Task、Crew

3.2 模块交互矩阵

调用方\被调方 Agent Task Crew Flow Memory Knowledge LLM Tools Events RAG CLI
CLI - - - - - - - - -
Crew - - - - - -
Flow - - - - - - -
Agent - - - - - -
Task - - - - - - - -
Memory - - - - - - - - - -
Knowledge - - - - - - - - - -
Tools - - - - - - - - -

交互说明

  • Crew → Agent/Task:同步调用(Sequential)或委托调用(Hierarchical)
  • Agent → Memory/Knowledge:同步调用,检索相关上下文
  • Agent → LLM:同步调用,获取推理结果
  • Agent → Tools:同步调用,执行工具操作
  • 所有模块 → Events:异步发送事件(通过 EventBus)
  • Memory/Knowledge → RAG:同步调用,向量检索

3.3 数据一致性要求

  • TaskOutput 一致性
    • 强一致性:单个 Task 的输出立即可见
    • 最终一致性:Memory 的保存是异步的(后台写入)
  • Memory 一致性
    • 短期记忆(ChromaDB):无事务保证,可能丢失
    • 长期记忆(SQLite):ACID 事务保证
    • 外部记忆(Mem0):取决于外部服务

4. 关键设计与权衡

4.1 数据一致性与事务

设计决策

  • 无分布式事务:单机部署,依赖本地文件锁
  • Memory 写入策略
    • 短期记忆:立即写入 ChromaDB(无事务)
    • 长期记忆:批量提交到 SQLite(有事务)
  • 任务输出存储
    • 默认不持久化
    • 可选 output_file 持久化到文件

权衡

  • ✅ 优点:简化架构,降低运维复杂度
  • ❌ 缺点:无法保证跨存储的 ACID 特性

4.2 并发与锁策略

锁机制

  • RW 锁(Reader-Writer Lock)
    • 用于 Memory 的并发读写
    • 位置:crewai/utilities/rw_lock.py
  • 文件锁(portalocker)
    • 用于 Flow 持久化状态的并发访问
    • 防止多进程同时修改状态文件
  • 线程池限制
    • 异步任务使用 ThreadPoolExecutor
    • 默认线程数由 Python 自动管理

并发模型

  • Sequential Process:单线程执行
  • Async Tasks:多线程并发
  • Hierarchical Process:Manager 单线程,Worker 可并发

4.3 性能关键路径与可观测性

性能热点

  1. LLM 调用延迟
    • 缓存工具结果(CacheHandler)
    • 减少上下文长度(respect_context_window)
  2. Memory 检索延迟
    • 向量检索优化(score_threshold 过滤)
    • 批量查询(单次检索多个记忆)
  3. Task 上下文传递
    • 避免传递大量文本(仅传递关键片段)

可观测性指标

  • Token 使用统计
    • UsageMetrics:total_tokens、prompt_tokens、completion_tokens
    • 每个 Agent 和 Crew 级别统计
  • 任务执行时间
    • Task.start_timeTask.end_time
    • Task.execution_duration 计算
  • 事件追踪
    • OpenTelemetry 集成
    • 自动追踪 AgentExecutionStartedEvent、TaskCompletedEvent 等
  • 日志记录
    • 结构化日志(verbose 模式)
    • 文件输出(output_log_file)

4.4 配置项与行为影响

Crew 级配置

配置项 默认值 影响
process sequential 控制任务执行顺序(sequential/hierarchical)
memory False 是否启用记忆系统
max_rpm None 每分钟最大请求数(限流)
verbose False 是否打印详细日志
cache True 是否缓存工具结果
planning False 是否启用任务规划(AI 自动生成执行计划)

Agent 级配置

配置项 默认值 影响
max_iter 25 推理循环最大迭代次数
max_execution_time None 任务超时时间(秒)
allow_delegation True 是否允许委托任务给其他 Agent
reasoning False 是否在执行前生成推理计划
max_retry_limit 2 执行失败最大重试次数

Task 级配置

配置项 默认值 影响
async_execution False 是否异步执行任务
output_pydantic None 输出结构化验证(Pydantic 模型)
guardrail None 输出验证函数
guardrail_max_retries 3 Guardrail 验证失败最大重试次数

5. 典型使用场景与流程

5.1 场景一:简单顺序任务

from crewai import Agent, Task, Crew, Process

# 1. 定义 Agent
researcher = Agent(
    role="Research Analyst",
    goal="Find the latest AI trends",
    backstory="Expert in AI research",
    verbose=True
)

# 2. 定义 Task
research_task = Task(
    description="Research the top 5 AI trends in 2024",
    expected_output="A list of 5 AI trends with brief descriptions",
    agent=researcher
)

# 3. 创建 Crew 并执行
crew = Crew(
    agents=[researcher],
    tasks=[research_task],
    process=Process.sequential,
    verbose=True
)

result = crew.kickoff()
print(result.raw)

执行流程

  1. crew.kickoff() → 初始化 Agent 和 Task
  2. researcher.execute_task(research_task) → 调用 LLM
  3. LLM 返回结果 → 封装为 TaskOutput
  4. 返回 CrewOutput(包含所有 TaskOutput)

5.2 场景二:分层管理

from crewai import Agent, Task, Crew, Process

# 定义 Worker Agents
researcher = Agent(role="Researcher", goal="Research topics")
writer = Agent(role="Writer", goal="Write reports")

# 定义任务(无需指定 agent,由 Manager 分配)
task1 = Task(description="Research AI in healthcare", expected_output="Report")
task2 = Task(description="Write a summary", expected_output="Summary")

# 创建 Hierarchical Crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[task1, task2],
    process=Process.hierarchical,
    manager_llm="gpt-4"  # Manager 使用 GPT-4
)

result = crew.kickoff()

执行流程

  1. Crew 自动创建 Manager Agent
  2. Manager 分析任务,决定委托给 researcher 或 writer
  3. Worker 执行子任务并返回结果
  4. Manager 整合结果

5.3 场景三:Flow 事件驱动

from crewai.flow.flow import Flow, start, listen, router

class DataPipeline(Flow):
    @start()
    def load_data(self):
        # 加载数据
        self.state["data"] = [1, 2, 3]
        return "loaded"

    @listen("load_data")
    def process_data(self, data):
        # 处理数据
        self.state["processed"] = sum(self.state["data"])
        return "processed"

    @router("process_data")
    def decide_next(self):
        if self.state["processed"] > 5:
            return "high_value"
        return "low_value"

    @listen("high_value")
    def handle_high_value(self):
        print("High value detected!")

flow = DataPipeline()
flow.kickoff()

执行流程

  1. load_data() 自动启动(@start)
  2. process_data() 监听 load_data 完成事件
  3. decide_next() 根据状态路由到 high_valuelow_value
  4. handle_high_value() 监听 high_value 事件

6. 系统级关键场景时序图

6.1 冷启动场景

sequenceDiagram
    autonumber
    participant User as 用户
    participant Crew as Crew
    participant Memory as Memory
    participant Knowledge as Knowledge
    participant ChromaDB as ChromaDB

    User->>Crew: 首次 kickoff()
    Crew->>Memory: 初始化 Memory
    alt Memory 未启用
        Memory-->>Crew: 跳过初始化
    else Memory 启用
        Memory->>ChromaDB: 检查 Collection 是否存在
        alt Collection 不存在
            ChromaDB->>ChromaDB: 创建新 Collection
        end
        ChromaDB-->>Memory: 初始化完成
    end

    Crew->>Knowledge: 初始化 Knowledge
    alt Knowledge 未配置
        Knowledge-->>Crew: 跳过初始化
    else Knowledge 已配置
        Knowledge->>Knowledge: add_sources()
        Knowledge->>ChromaDB: 批量插入文档
        ChromaDB-->>Knowledge: 完成
    end

    Crew->>Crew: 执行任务流程
    Crew-->>User: 返回结果

6.2 峰值压测路径

sequenceDiagram
    autonumber
    participant User as 用户(并发 100)
    participant LoadBalancer as 负载均衡器
    participant Crew1 as Crew 实例1
    participant Crew2 as Crew 实例2
    participant LLM as LLM API
    participant RPMController as RPM 限流器

    par 并发请求
        User->>LoadBalancer: kickoff() * 100
        LoadBalancer->>Crew1: 分发请求 1-50
        LoadBalancer->>Crew2: 分发请求 51-100
    end

    Crew1->>RPMController: check_or_wait()
    RPMController->>RPMController: 检查当前 RPM
    alt 超过限制
        RPMController->>Crew1: sleep(delay)
    else 未超过限制
        RPMController-->>Crew1: 允许执行
    end

    Crew1->>LLM: call(messages)
    LLM-->>Crew1: 返回结果
    Crew1-->>User: 返回结果

    Note over Crew2: 同样的限流和执行流程

6.3 异常恢复场景

sequenceDiagram
    autonumber
    participant User as 用户
    participant Crew as Crew
    participant Agent as Agent
    participant LLM as LLM
    participant Memory as Memory

    User->>Crew: kickoff(inputs)
    Crew->>Agent: execute_task(task1)
    Agent->>LLM: call(messages)

    alt LLM 超时
        LLM-->>Agent: TimeoutError
        Agent->>Agent: 不重试,直接抛出
        Agent-->>Crew: 抛出 TimeoutError
        Crew-->>User: 抛出异常
    else LLM 返回错误格式
        LLM-->>Agent: 返回无效 JSON
        Agent->>Agent: 验证失败,retry_count++
        alt retry_count < max_retry_limit
            Agent->>LLM: 重试 call(messages)
            LLM-->>Agent: 返回正确结果
        else retry_count >= max_retry_limit
            Agent-->>Crew: 抛出 AgentExecutionError
        end
    else Guardrail 验证失败
        Agent->>Agent: 调用 guardrail(output)
        Agent->>Agent: 验证失败,重新生成
        Agent->>LLM: call(messages + error_context)
        LLM-->>Agent: 返回修正结果
    end

    Agent->>Memory: save(task_output)
    Agent-->>Crew: TaskOutput
    Crew-->>User: CrewOutput

说明

  • 冷启动:首次启动时初始化 Memory 和 Knowledge 存储
  • 峰值压测:RPM 限流器自动控制请求速率
  • 异常恢复:区分可重试(格式错误、Guardrail 失败)和不可重试(超时、LiteLLM 错误)

7. 附录:模块导出与版本信息

7.1 对外导出的核心 API

# crewai/__init__.py
__all__ = [
    "LLM",
    "Agent",
    "BaseLLM",
    "Crew",
    "CrewOutput",
    "Flow",
    "Knowledge",
    "LLMGuardrail",
    "Process",
    "Task",
    "TaskOutput",
    "__version__",
]

7.2 版本信息

  • 当前版本v1.0.0
  • Python 兼容>=3.10, <3.14
  • 依赖管理:UV(基于 pyproject.toml)

7.3 项目仓库与文档

  • GitHub:https://github.com/crewAIInc/crewAI
  • 官方文档:https://docs.crewai.com
  • 社区论坛:https://community.crewai.com

8. 总结

CrewAI 通过 Agent-Task-Crew-Flow 四层架构,实现了从简单顺序任务到复杂事件驱动流程的全覆盖:

  1. Agent:智能体核心,负责推理、工具调用和记忆访问
  2. Task:任务单元,定义输入输出和验证规则
  3. Crew:团队编排,支持顺序和分层流程
  4. Flow:事件驱动,实现精确控制和状态管理

该框架的设计哲学是 简单易用 + 深度可定制

  • 通过 YAML 配置和装饰器模式降低门槛
  • 通过 LLM 抽象层和工具系统提供扩展性
  • 通过记忆和知识管理支持长期上下文
  • 通过事件系统和可观测性保证生产级可靠性