CrewAI-01-Agent-完整剖析

模块概览

职责与定位

Agent(智能体)是 CrewAI 框架的核心执行单元,负责:

  1. 角色定义:通过 role、goal、backstory 定义智能体的身份和目标
  2. 任务执行:接收 Task 并通过 LLM 推理生成输出
  3. 工具管理:集成和调用各类工具(内置、自定义、MCP、平台工具等)
  4. 记忆访问:检索和保存短期、长期、实体和外部记忆
  5. 知识检索:查询知识库以增强上下文
  6. 委托协作:在分层流程中委托任务给其他 Agent
  7. 状态管理:维护执行状态、Token 统计、工具结果缓存

输入与输出

输入

  • Task 对象:包含任务描述、预期输出、上下文
  • context 字符串:来自前序任务的输出
  • tools 列表:任务特定的工具集合

输出

  • TaskOutput 对象:包含原始输出、结构化输出(Pydantic/JSON)、元数据

上下游依赖

上游依赖(被调用)

  • Crew 编排器:调用 Agent.execute_task()
  • Flow 引擎:在事件驱动流程中调用 Agent

下游依赖(调用)

  • LLM 层:调用大语言模型进行推理
  • Tools 层:执行工具调用
  • Memory 层:检索和保存记忆
  • Knowledge 层:查询知识库
  • Events 总线:发送执行事件

生命周期

stateDiagram-v2
    [*] --> 创建: Agent(...) 或 @agent 装饰器
    创建 --> 初始化: 配置 LLM、Tools、Memory
    初始化 --> 就绪: create_agent_executor()
    就绪 --> 执行: execute_task(task)
    执行 --> 推理循环: AgentExecutor.invoke()
    推理循环 --> 推理循环: LLM 调用 / 工具执行
    推理循环 --> 完成: 达到 Final Answer
    完成 --> 保存状态: 保存记忆、统计 Token
    保存状态 --> 就绪: 等待下一个任务
    就绪 --> [*]: Crew 执行结束

整体服务架构图

全局架构视图(从上游到 Agent)

flowchart TB
    subgraph "上游调用层"
        User[用户/客户端]
        Flow[Flow 引擎<br/>事件驱动流程]
    end

    subgraph "编排层 - Crew"
        CrewKickoff[Crew.kickoff<br/>启动执行]
        CrewExec[Crew._execute_tasks<br/>任务编排]
        ProcessMgr[Process Manager<br/>Sequential/Hierarchical]
        ManagerAgent[Manager Agent<br/>分层流程专用]
    end

    subgraph "任务层 - Task"
        TaskExec[Task.execute_sync/async<br/>任务执行入口]
        TaskCore[Task._execute_core<br/>核心执行逻辑]
        TaskOutput[TaskOutput<br/>结构化输出]
        Guardrail[Guardrail<br/>输出验证]
    end

    subgraph "智能体层 - Agent"
        AgentExec[Agent.execute_task<br/>任务执行]
        AgentExecutor[CrewAgentExecutor<br/>推理循环引擎]
        PromptBuilder[Prompts<br/>提示词构建]

        subgraph "上下文管理"
            MemoryRetrieval[Memory Retrieval<br/>记忆检索]
            KnowledgeRetrieval[Knowledge Retrieval<br/>知识检索]
            ContextBuilder[Context Builder<br/>上下文拼接]
        end
    end

    subgraph "执行引擎层"
        InvokeLoop[_invoke_loop<br/>推理循环]
        LLMCall[LLM.call<br/>模型调用]
        OutputParser[OutputParser<br/>输出解析]
        ActionHandler[Action Handler<br/>动作处理]

        subgraph "工具执行子系统"
            ToolUsage[ToolUsage<br/>工具使用协调]
            ToolCache[CacheHandler<br/>缓存检查]
            ToolInvoke[Tool.invoke<br/>工具执行]
            ToolResult[Tool Result<br/>结果处理]
        end
    end

    subgraph "存储层"
        ShortTermMem[(Short Term Memory<br/>ChromaDB)]
        LongTermMem[(Long Term Memory<br/>SQLite)]
        EntityMem[(Entity Memory<br/>ChromaDB)]
        ExternalMem[(External Memory<br/>Mem0)]
        KnowledgeDB[(Knowledge Base<br/>Vector DB)]
    end

    subgraph "基础设施层"
        RPM[RPM Controller<br/>速率限制]
        TokenCounter[Token Counter<br/>使用统计]
        EventBus[Event Bus<br/>事件总线]
        Security[Security Config<br/>指纹验证]
    end

    User --> CrewKickoff
    Flow --> AgentExec
    CrewKickoff --> CrewExec
    CrewExec --> ProcessMgr
    ProcessMgr --> ManagerAgent
    ProcessMgr --> TaskExec

    TaskExec --> TaskCore
    TaskCore --> AgentExec
    TaskCore --> Guardrail
    TaskCore --> TaskOutput

    AgentExec --> MemoryRetrieval
    AgentExec --> KnowledgeRetrieval
    AgentExec --> ContextBuilder
    AgentExec --> PromptBuilder
    AgentExec --> AgentExecutor

    AgentExecutor --> InvokeLoop
    InvokeLoop --> RPM
    InvokeLoop --> LLMCall
    InvokeLoop --> OutputParser
    OutputParser --> ActionHandler

    ActionHandler --> ToolUsage
    ToolUsage --> ToolCache
    ToolCache --> ToolInvoke
    ToolInvoke --> ToolResult
    ToolResult --> InvokeLoop

    MemoryRetrieval --> ShortTermMem
    MemoryRetrieval --> LongTermMem
    MemoryRetrieval --> EntityMem
    MemoryRetrieval --> ExternalMem
    KnowledgeRetrieval --> KnowledgeDB

    AgentExecutor --> TokenCounter
    AgentExecutor --> EventBus
    ToolUsage --> EventBus
    TaskCore --> EventBus

架构说明

该架构图展示了从用户请求到 Agent 执行的完整数据流和控制流:

  1. 上游调用层:用户通过 Crew 或 Flow 发起执行请求
  2. 编排层:Crew 负责任务编排和流程控制
  3. 任务层:Task 作为执行单元的封装
  4. 智能体层:Agent 负责具体任务执行
  5. 执行引擎层:推理循环和工具调用的核心逻辑
  6. 存储层:持久化的记忆和知识系统
  7. 基础设施层:支撑整个执行流程的横切关注点

模块架构图

Agent 模块内部架构

flowchart TB
    subgraph "Agent 核心层"
        Agent[Agent 类<br/>角色、目标、背景故事]
        BaseAgent[BaseAgent 抽象类<br/>通用属性和方法]
    end

    subgraph "执行引擎层"
        Executor[CrewAgentExecutor<br/>推理循环控制]
        Parser[OutputParser<br/>解析 LLM 输出]
        PromptMgr[Prompts<br/>提示词模板管理]
    end

    subgraph "工具管理层"
        ToolsHandler[ToolsHandler<br/>工具调用协调]
        AgentTools[AgentTools<br/>委托工具]
        CustomTools[自定义工具]
        MCPTools[MCP 工具包装器]
        PlatformTools[平台工具集成]
    end

    subgraph "LLM 层"
        LLM[BaseLLM 抽象层]
        FunctionCallingLLM[Function Calling LLM<br/>工具调用专用]
    end

    subgraph "存储与上下文层"
        Memory[Memory 记忆系统<br/>短期/长期/实体/外部]
        Knowledge[Knowledge 知识库<br/>向量检索]
        CacheHandler[CacheHandler<br/>工具结果缓存]
    end

    subgraph "基础设施层"
        RPMController[RPMController<br/>请求限流]
        TokenCounter[TokenCounterCallback<br/>Token 统计]
        EventBus[EventBus<br/>事件发布]
        Security[SecurityConfig<br/>指纹验证]
    end

    Agent -.继承.-> BaseAgent
    Agent --> Executor
    Agent --> ToolsHandler
    Agent --> Memory
    Agent --> Knowledge
    Agent --> RPMController
    Agent --> Security

    Executor --> Parser
    Executor --> PromptMgr
    Executor --> LLM
    Executor --> FunctionCallingLLM
    Executor --> ToolsHandler
    Executor --> TokenCounter

    ToolsHandler --> AgentTools
    ToolsHandler --> CustomTools
    ToolsHandler --> MCPTools
    ToolsHandler --> PlatformTools
    ToolsHandler --> CacheHandler

    Memory --> EventBus
    Executor --> EventBus

架构要点说明

1) Agent 与 BaseAgent 的关系

  • BaseAgent:抽象基类,定义所有 Agent 的通用接口和属性
  • Agent:具体实现类,继承自 BaseAgent,提供完整的 CrewAI Agent 功能
  • 设计目的:允许第三方实现自定义 Agent(如 LangGraph Agent、OpenAI Agent)并集成到 CrewAI

2) 执行引擎的职责划分

  • AgentExecutor:控制推理循环,管理消息历史,处理工具调用和重试逻辑
  • OutputParser:解析 LLM 的原始输出,识别 Action(工具调用)或 Final Answer
  • Prompts:根据 Agent 配置和任务描述生成系统提示词和用户提示词

3) 工具管理的多层结构

  • ToolsHandler:协调所有工具的调用,处理工具结果的最终性检查
  • AgentTools:为分层流程提供委托工具(Delegate Work、Ask Question)
  • 自定义工具:用户定义的 BaseTool 子类
  • MCP 工具:通过 MCP 协议集成的外部工具
  • 平台工具:CrewAI AMP 提供的企业应用集成工具

4) 记忆与知识的协同

  • Memory
    • 短期记忆:当前任务相关的上下文
    • 长期记忆:跨任务的经验总结
    • 实体记忆:关键实体的信息
    • 外部记忆:Mem0 集成
  • Knowledge
    • Agent 级知识:特定于某个 Agent 的知识源
    • Crew 级知识:整个团队共享的知识库
    • 检索策略:基于语义相似度的 RAG 检索

5) 并发控制与限流

  • RPMController:控制每分钟请求数,避免触发 LLM 提供商的速率限制
  • CacheHandler:缓存工具结果,减少重复调用
  • 锁机制:Memory 使用读写锁(RWLock)保证并发安全

模块间交互图

1. Crew → Task → Agent 调用链路图

sequenceDiagram
    autonumber
    participant User as 用户
    participant Crew as Crew
    participant Process as Process Manager
    participant Task as Task
    participant Agent as Agent
    participant Executor as AgentExecutor
    participant Event as EventBus

    User->>Crew: kickoff(inputs)
    activate Crew

    Crew->>Event: emit(CrewKickoffStartedEvent)
    Crew->>Crew: 初始化 Agents(设置 i18ncrew 引用)
    Crew->>Agent: set_knowledge(crew_embedder)
    Crew->>Agent: create_agent_executor()

    alt Planning 启用
        Crew->>Crew: _handle_crew_planning()
    end

    Crew->>Process: 选择执行流程

    alt Sequential Process
        Process->>Crew: _run_sequential_process()
    else Hierarchical Process
        Process->>Crew: _run_hierarchical_process()
    end

    Crew->>Crew: _execute_tasks(tasks)

    loop 遍历每个 Task
        Crew->>Crew: _get_agent_to_use(task)
        Crew->>Crew: _prepare_tools(agent, task)
        Crew->>Crew: _get_context(task, task_outputs)

        alt 异步执行
            Crew->>Task: execute_async(agent, context, tools)
        else 同步执行
            Crew->>Task: execute_sync(agent, context, tools)
            activate Task

            Task->>Event: emit(TaskStartedEvent)
            Task->>Agent: execute_task(task, context, tools)
            activate Agent

            Note over Agent: 详细执行流程见下一张图

            Agent-->>Task: return result
            deactivate Agent

            Task->>Task: _export_output(result)
            Task->>Task: 构造 TaskOutput

            alt  Guardrail
                loop 每个 Guardrail
                    Task->>Task: _invoke_guardrail_function()
                end
            end

            Task->>Event: emit(TaskCompletedEvent)
            Task-->>Crew: return TaskOutput
            deactivate Task
        end
    end

    Crew->>Crew: _create_crew_output(task_outputs)
    Crew->>Crew: calculate_usage_metrics()
    Crew->>Event: emit(CrewKickoffCompletedEvent)
    Crew-->>User: return CrewOutput
    deactivate Crew

图说明

该时序图展示了从用户调用 Crew.kickoff() 到返回最终结果的完整流程:

  1. 初始化阶段(步骤 1-5)

    • Crew 发送启动事件
    • 为所有 Agent 设置国际化配置、Crew 引用
    • 配置知识库嵌入器
    • 预创建 AgentExecutor
  2. 规划阶段(步骤 6)

    • 如果启用了 Planning,生成任务执行计划
  3. 流程选择阶段(步骤 7-9)

    • 根据 Process.sequentialProcess.hierarchical 选择执行策略
    • Sequential:按顺序执行任务
    • Hierarchical:Manager Agent 动态委托任务
  4. 任务执行循环(步骤 10-23)

    • 确定任务的执行 Agent
    • 准备工具列表(Task 工具优先于 Agent 工具)
    • 构建任务上下文(来自前序任务的输出)
    • 选择同步或异步执行
    • Task 发送开始事件并调用 Agent.execute_task()
    • Agent 执行完成后导出结构化输出
    • 应用 Guardrail 验证(如果配置)
    • 发送任务完成事件
  5. 汇总阶段(步骤 24-27)

    • 创建 CrewOutput 对象
    • 计算 Token 使用量和成本
    • 发送完成事件
    • 返回最终结果

关键设计点

  • 事件驱动:整个流程通过 EventBus 发送关键事件,便于监控和调试
  • 上下文传递:通过 context 参数将前序任务输出传递给后续任务
  • 工具优先级:Task 级工具覆盖 Agent 级工具
  • 输出验证:支持多个 Guardrail 串联验证

2. Agent 执行任务的完整流程图

sequenceDiagram
    autonumber
    participant Task as Task
    participant Agent as Agent
    participant Reasoning as Reasoning Handler
    participant Memory as Memory System
    participant Knowledge as Knowledge System
    participant Prompts as Prompts Builder
    participant Executor as AgentExecutor
    participant Event as EventBus

    Task->>Agent: execute_task(task, context, tools)
    activate Agent

    alt 启用推理规划
        Agent->>Reasoning: AgentReasoning.handle_agent_reasoning()
        activate Reasoning
        Reasoning->>Reasoning: 生成初始计划
        Reasoning->>Reasoning: 细化计划步骤
        Reasoning-->>Agent: return ReasoningOutput(plan)
        deactivate Reasoning
        Agent->>Agent:  plan 添加到 task.description
    end

    Agent->>Agent: _inject_date_to_task(task)
    Agent->>Agent: task.prompt()  task_prompt

    alt 使用结构化输出
        Agent->>Agent: 生成 output schema
        Agent->>Agent: 添加格式化指令到 task_prompt
    end

    alt 有前序上下文
        Agent->>Agent:  context 添加到 task_prompt
    end

    alt 有记忆系统
        Agent->>Event: emit(MemoryRetrievalStartedEvent)
        Agent->>Memory: ContextualMemory.build_context_for_task()
        activate Memory

        Memory->>Memory: 构建检索查询

        par 并行检索多种记忆
            Memory->>Memory: search ShortTermMemory
        and
            Memory->>Memory: search LongTermMemory
        and
            Memory->>Memory: search EntityMemory
        and
            Memory->>Memory: search ExternalMemory
        end

        Memory->>Memory: 合并检索结果
        Memory-->>Agent: return memory_context
        deactivate Memory

        Agent->>Agent:  memory_context 添加到 task_prompt
        Agent->>Event: emit(MemoryRetrievalCompletedEvent)
    end

    alt 有知识库
        Agent->>Event: emit(KnowledgeRetrievalStartedEvent)
        Agent->>Agent: _get_knowledge_search_query(task_prompt)
        Agent->>Knowledge: LLM 重写检索查询

        alt Agent 级知识库
            Agent->>Knowledge: agent.knowledge.query([search_query])
            Knowledge-->>Agent: return agent_knowledge_snippets
            Agent->>Agent: 提取知识上下文
            Agent->>Agent: 添加到 task_prompt
        end

        alt Crew 级知识库
            Agent->>Knowledge: crew.query_knowledge([search_query])
            Knowledge-->>Agent: return crew_knowledge_snippets
            Agent->>Agent: 提取知识上下文
            Agent->>Agent: 添加到 task_prompt
        end

        Agent->>Event: emit(KnowledgeRetrievalCompletedEvent)
    end

    Agent->>Agent: create_agent_executor(tools, task)
    Agent->>Prompts: 生成提示词模板
    activate Prompts
    Prompts->>Prompts: 构建系统提示词(角色、工具描述)
    Prompts->>Prompts: 构建用户提示词(任务描述)
    Prompts-->>Agent: return prompt_result
    deactivate Prompts

    alt 有训练数据
        Agent->>Agent: _training_handler(task_prompt)
    else 使用已训练数据
        Agent->>Agent: _use_trained_data(task_prompt)
    end

    alt 配置超时
        Agent->>Agent: _execute_with_timeout(task_prompt, task, timeout)
        Agent->>Executor: 在线程池中执行
    else 无超时
        Agent->>Executor: _execute_without_timeout(task_prompt, task)
    end

    activate Executor
    Note over Executor: 详细推理循环见下一张图
    Executor-->>Agent: return result
    deactivate Executor

    Agent->>Agent: 检查工具最终性标记
    Agent->>Event: emit(AgentExecutionCompletedEvent)
    Agent-->>Task: return result
    deactivate Agent

图说明

该时序图详细展示了 Agent.execute_task() 的内部执行流程,包含以下关键阶段:

1. 推理规划阶段(可选,步骤 2-6)

  • 如果启用 reasoning=True,Agent 会先生成任务执行计划
  • Reasoning Handler 调用 LLM 生成初始计划和细化步骤
  • 计划被追加到任务描述中,指导后续执行

2. 任务提示词构建阶段(步骤 7-12)

  • 注入当前日期(如果配置)
  • 调用 task.prompt() 获取基础任务描述
  • 如果使用结构化输出(output_pydantic/output_json),生成 Schema 并添加格式化指令
  • 如果有前序任务上下文,追加到提示词

3. 记忆检索阶段(可选,步骤 13-24)

  • 发送记忆检索开始事件
  • 创建 ContextualMemory 实例
  • 并行检索四种记忆类型:
    • ShortTermMemory:当前会话的任务描述和输出(ChromaDB)
    • LongTermMemory:跨会话的经验总结(SQLite)
    • EntityMemory:提取的关键实体(ChromaDB)
    • ExternalMemory:用户偏好和历史(Mem0)
  • 合并检索结果并追加到提示词
  • 发送记忆检索完成事件(含检索耗时)

4. 知识检索阶段(可选,步骤 25-38)

  • 发送知识检索开始事件
  • 使用 LLM 重写检索查询(提高检索质量)
  • 检索 Agent 级知识库(如果配置)
  • 检索 Crew 级知识库(如果配置)
  • 提取知识片段并追加到提示词
  • 发送知识检索完成事件

5. 执行器创建阶段(步骤 39-45)

  • 创建 CrewAgentExecutor 实例
  • 使用 Prompts 类生成系统提示词和用户提示词
  • 系统提示词包含:角色定义、目标、背景故事、工具描述
  • 用户提示词包含:任务描述和所有上下文

6. 训练数据处理阶段(可选,步骤 46-48)

  • 如果在训练模式(crew._train=True),记录训练数据
  • 否则,使用已训练的数据优化提示词

7. 执行阶段(步骤 49-54)

  • 如果配置了 max_execution_time,在线程池中执行(可超时)
  • 否则直接执行
  • 调用 AgentExecutor.invoke() 进入推理循环
  • 推理循环返回最终答案

8. 后处理阶段(步骤 55-57)

  • 检查是否有工具标记了 result_as_answer=True(工具结果直接作为最终答案)
  • 发送 Agent 执行完成事件
  • 返回结果给 Task

关键设计点

  • 上下文增强:通过记忆和知识检索,显著增强任务执行的准确性
  • 并行检索:多种记忆类型并行检索,减少延迟
  • 查询重写:使用 LLM 重写知识检索查询,提高相关性
  • 超时保护:防止长时间运行的任务阻塞整个系统
  • 事件可观测:关键步骤发送事件,便于监控和调试

性能考虑

  • 记忆检索:通常 50-200ms(取决于数据量和向量维度)
  • 知识检索:通常 100-500ms(包含 LLM 查询重写)
  • 推理循环:主要耗时部分,取决于任务复杂度和迭代次数

3. 推理循环(Reasoning Loop)详细时序图

sequenceDiagram
    autonumber
    participant Executor as AgentExecutor
    participant RPM as RPM Controller
    participant LLM as LLM
    participant Parser as OutputParser
    participant ToolHandler as ToolsHandler
    participant Tool as Tool
    participant Cache as CacheHandler
    participant Memory as Memory
    participant Event as EventBus
    participant Callback as StepCallback

    Executor->>Executor: invoke({"input": task_prompt, ...})
    activate Executor

    Executor->>Executor: 格式化提示词(注入变量)

    alt 使用系统提示词
        Executor->>Executor: 添加 system message
        Executor->>Executor: 添加 user message
    else 单一提示词
        Executor->>Executor: 添加 user message
    end

    Executor->>Executor: _invoke_loop()

    Note over Executor: 进入推理循环

    loop 直到达到 Final Answer  max_iter
        Executor->>Executor: iterations += 1

        alt 达到最大迭代次数
            Executor->>Executor: handle_max_iterations_exceeded()
            Executor->>Executor: 返回当前最佳答案
        end

        alt 超过上下文长度
            Executor->>Executor: is_context_length_exceeded()
            Executor->>Executor: handle_context_length()
            Executor->>Executor: 截断历史消息
        end

        Executor->>RPM: enforce_rpm_limit()
        alt 触发限流
            RPM->>RPM: 等待至下一时间窗口
        end

        Executor->>LLM: call(messages, stop=stop_words)
        activate LLM
        Note over LLM: 调用 LLM API<br/>OpenAI/Anthropic/...
        LLM-->>Executor: return llm_response
        deactivate LLM

        Executor->>Parser: process_llm_response(llm_response)
        activate Parser

        alt 解析出 Action(工具调用)
            Parser->>Parser: 提取 tool_name, tool_input
            Parser-->>Executor: return AgentAction
        else 解析出 Final Answer
            Parser->>Parser: 提取最终答案
            Parser-->>Executor: return AgentFinish
        else 解析错误
            Parser-->>Executor: raise OutputParserError
        end
        deactivate Parser

        alt 解析出 AgentAction
            Executor->>Executor: execute_tool_and_check_finality()

            Executor->>Event: emit(ToolUsageStartedEvent)

            Executor->>Cache: read(tool_name, tool_input)
            activate Cache
            alt 缓存命中
                Cache-->>Executor: return cached_result
                Note over Executor: 跳过工具执行
            else 缓存未命中
                Cache-->>Executor: return None

                Executor->>Tool: invoke(tool_input)
                activate Tool

                alt 工具执行成功
                    Tool->>Tool: _run(**parsed_args)
                    Tool-->>Executor: return tool_result
                else 工具执行失败
                    Tool-->>Executor: raise ToolException
                    Executor->>Executor: 将错误信息作为 observation
                end
                deactivate Tool

                Executor->>ToolHandler: on_tool_use(calling, output)
                activate ToolHandler
                ToolHandler->>Cache: add(tool_name, tool_input, output)
                ToolHandler->>ToolHandler: last_used_tool = calling
                deactivate ToolHandler
            end
            deactivate Cache

            Executor->>Event: emit(ToolUsageCompletedEvent)

            alt 工具标记 result_as_answer=True
                Executor->>Executor: 直接返回工具结果
            else 继续推理
                Executor->>Executor: 格式化 observation
                Executor->>Executor: 添加 observation  messages
            end

            alt  step_callback
                Executor->>Callback: step_callback(formatted_answer)
            end

        else 解析出 AgentFinish
            Executor->>Executor: 推理循环结束
        else OutputParserError
            Executor->>Executor: handle_output_parser_exception()
            Executor->>Executor: 添加错误提示到 messages
            Executor->>Executor: 继续循环(重试)
        end

    end

    Note over Executor: 退出推理循环

    alt 需要人工反馈
        Executor->>Executor: _handle_human_feedback()
    end

    Executor->>Memory: _create_short_term_memory(formatted_answer)
    Executor->>Memory: _create_long_term_memory(formatted_answer)
    Executor->>Memory: _create_external_memory(formatted_answer)

    Executor-->>Executor: return {"output": formatted_answer.output}
    deactivate Executor

图说明

该时序图展示了 CrewAgentExecutor._invoke_loop() 的核心推理循环逻辑,这是 Agent 执行的心脏:

1. 初始化阶段(步骤 1-8)

  • 接收任务提示词和相关参数
  • 格式化提示词,注入变量({role}{goal}{backstory}
  • 根据配置选择系统提示词模式或单一提示词模式
  • 构建初始消息列表
  • 进入 _invoke_loop() 循环

2. 循环前置检查(步骤 9-18)

  • 迭代计数:记录当前迭代次数
  • 最大迭代检查:如果达到 max_iter(默认 25),返回当前最佳答案
  • 上下文长度检查:如果消息历史超过 LLM 上下文窗口,自动截断
    • 保留系统提示词和最后 N 条消息
    • 截断策略:优先保留最近的消息
  • RPM 限流:检查是否超过每分钟请求数限制
    • 如果触发限流,等待至下一时间窗口

3. LLM 调用阶段(步骤 19-22)

  • 调用 LLM API(通过 LiteLLM 统一接口)
  • 传递消息历史和停止词(stop_words
  • 停止词通常包含:Observation:(用于解析 Action)
  • LLM 返回原始响应文本

4. 响应解析阶段(步骤 23-32)

  • 使用 OutputParser 解析 LLM 响应
  • 可能的解析结果:
    • AgentAction:包含 tool_nametool_inputlog
    • AgentFinish:包含最终答案
    • OutputParserError:解析失败

5. 工具执行分支(步骤 33-60)

如果解析出 AgentAction,进入工具执行流程:

  • 缓存检查(步骤 36-40)

    • 查询 CacheHandler,检查是否有缓存结果
    • 缓存键:{tool_name}-{tool_input}
    • 如果命中,直接返回缓存结果,跳过工具执行
    • 缓存可显著提高性能(减少重复调用)
  • 工具调用(步骤 41-50)

    • 调用 Tool.invoke(tool_input)
    • 工具内部解析参数并执行 _run() 方法
    • 如果执行成功,返回结果
    • 如果执行失败,捕获异常并将错误信息作为 observation
  • 缓存更新(步骤 51-55)

    • 调用 ToolsHandler.on_tool_use() 回调
    • 将工具结果写入 CacheHandler
    • 更新 last_used_tool 引用
  • 最终性检查(步骤 58-61)

    • 如果工具标记了 result_as_answer=True,直接返回工具结果
    • 否则,格式化 observation 并添加到消息历史
    • Observation 格式:Observation: {tool_result}
  • 回调通知(步骤 62-64)

    • 如果配置了 step_callback,执行回调
    • 回调可用于日志记录、进度更新等

6. 循环结束分支(步骤 65-69)

如果解析出 AgentFinish

  • 推理循环结束,跳出循环
  • 提取最终答案

如果解析出 OutputParserError

  • 捕获异常并记录
  • 添加错误提示到消息历史(引导 LLM 重试)
  • 继续下一次循环

7. 后处理阶段(步骤 70-77)

  • 人工反馈(可选):如果 human_input=True,请求用户反馈
  • 保存记忆
    • 短期记忆:保存任务描述和输出(当前会话)
    • 长期记忆:保存任务摘要和经验(持久化)
    • 外部记忆:同步到 Mem0(如果配置)
  • 返回最终结果

关键设计点

  • 自适应上下文管理:自动截断超长历史,避免超过 LLM 限制
  • 智能缓存:避免重复工具调用,提高响应速度
  • 错误恢复:解析错误时自动重试,增强鲁棒性
  • 限流保护:避免触发 API 速率限制
  • 记忆持久化:自动保存执行结果,支持跨会话学习

性能指标

  • 平均迭代次数:简单任务 2-5 次,复杂任务 10-20 次
  • 单次 LLM 调用:200-2000ms(取决于模型和提示词长度)
  • 工具执行:10-5000ms(取决于工具类型)
  • 缓存命中率:通常 30-60%(减少 30-60% 的工具调用)

异常处理

  • LiteLLM 错误:不重试,直接抛出(通常是配置或 API 问题)
  • 其他错误:重试(最多 max_retry_limit 次,默认 2 次)
  • 超时:由外层 _execute_with_timeout() 处理

4. 工具执行详细流程图

sequenceDiagram
    autonumber
    participant Executor as AgentExecutor
    participant ToolUsage as ToolUsage
    participant Cache as CacheHandler
    participant Tool as CrewStructuredTool
    participant ToolFunc as Tool Function
    participant Event as EventBus
    participant ToolHandler as ToolsHandler

    Executor->>ToolUsage: 执行工具调用
    activate ToolUsage

    ToolUsage->>ToolUsage: 解析 tool_name  tool_input

    alt 使用 Function Calling LLM
        ToolUsage->>ToolUsage: 使用 function_calling_llm 结构化解析
    else 使用标准解析
        ToolUsage->>ToolUsage: 正则表达式解析
    end

    ToolUsage->>Event: emit(ToolUsageStartedEvent)

    ToolUsage->>Cache: read(tool_name, tool_input)
    activate Cache
    alt 缓存命中
        Cache-->>ToolUsage: return cached_result
        ToolUsage->>ToolUsage: from_cache = True
    else 缓存未命中
        Cache-->>ToolUsage: return None
    end
    deactivate Cache

    alt 缓存未命中
        ToolUsage->>Tool: 查找工具实例
        alt 工具不存在
            ToolUsage-->>Executor: return "Tool not found"
        end

        ToolUsage->>Tool: 检查 max_usage_count
        alt 达到使用限制
            Tool-->>ToolUsage: raise ToolUsageLimitExceededError
            ToolUsage-->>Executor: return error message
        end

        ToolUsage->>Tool: invoke(tool_input)
        activate Tool

        Tool->>Tool: _parse_args(tool_input)
        alt 参数为字符串
            Tool->>Tool: 使用 function_calling_llm 解析
        else 参数为字典
            Tool->>Tool: 直接验证
        end

        alt 参数验证失败
            Tool-->>ToolUsage: raise ValidationError
            ToolUsage-->>Executor: return error message
        end

        Tool->>Tool: _increment_usage_count()

        Tool->>ToolFunc: func(**parsed_args)
        activate ToolFunc

        alt 工具执行成功
            ToolFunc-->>Tool: return result
        else 工具执行失败
            ToolFunc-->>Tool: raise Exception
            Tool->>Tool: 捕获异常
            Tool-->>ToolUsage: return error message
        end
        deactivate ToolFunc

        deactivate Tool
    end

    ToolUsage->>ToolUsage: 格式化工具结果

    alt 工具返回字典
        ToolUsage->>ToolUsage: json.dumps(result)
    else 工具返回字符串
        ToolUsage->>ToolUsage: 直接使用
    end

    ToolUsage->>ToolHandler: on_tool_use(calling, output, should_cache)
    activate ToolHandler

    alt should_cache=True 且非 CacheTools
        ToolHandler->>Cache: add(tool_name, tool_input, output)
    end

    ToolHandler->>ToolHandler: last_used_tool = calling
    deactivate ToolHandler

    ToolUsage->>Event: emit(ToolUsageCompletedEvent)

    alt 工具标记 result_as_answer=True
        ToolUsage->>ToolUsage: 设置最终性标记
    end

    ToolUsage-->>Executor: return tool_result
    deactivate ToolUsage

图说明

该时序图详细展示了工具执行的完整流程,从工具调用到结果返回:

1. 工具调用解析阶段(步骤 1-6)

  • Executor 将 AgentAction 传递给 ToolUsage
  • 解析工具名称和输入参数
  • 支持两种解析模式:
    • Function Calling LLM:使用专门的 LLM 结构化解析(更准确)
    • 标准解析:使用正则表达式解析 LLM 输出(兼容性更好)
  • 发送工具使用开始事件

2. 缓存检查阶段(步骤 7-12)

  • 查询 CacheHandler 检查是否有缓存结果
  • 缓存键格式:{tool_name}-{tool_input}(JSON 序列化)
  • 如果命中缓存:
    • 直接返回缓存结果
    • 设置 from_cache=True 标记
    • 跳过后续工具执行步骤
  • 如果未命中缓存:
    • 继续工具执行流程

3. 工具查找与验证阶段(步骤 13-21)

  • 根据 tool_name 查找工具实例
  • 如果工具不存在,返回错误信息
  • 检查工具使用次数限制(max_usage_count
  • 如果达到限制,抛出 ToolUsageLimitExceededError
  • 这个机制防止工具被过度使用(例如:昂贵的 API 调用)

4. 参数解析与验证阶段(步骤 22-30)

  • 调用 Tool._parse_args() 解析输入参数
  • 支持两种输入格式:
    • 字符串:使用 function_calling_llm 解析为结构化参数
    • 字典:直接使用 Pydantic Schema 验证
  • 使用 Pydantic 验证参数类型和约束
  • 如果验证失败,返回详细的错误信息
  • 增加工具使用计数

5. 工具执行阶段(步骤 31-40)

  • 调用工具的 func(**parsed_args) 方法
  • 工具函数执行实际逻辑(可能是 API 调用、文件操作、数据查询等)
  • 如果执行成功,返回结果
  • 如果执行失败:
    • 捕获异常
    • 记录错误日志
    • 返回友好的错误信息(而非抛出异常)
    • 这确保推理循环不会因工具错误而中断

6. 结果格式化阶段(步骤 41-47)

  • 格式化工具结果为字符串
  • 如果结果是字典或列表,使用 json.dumps() 序列化
  • 如果结果是字符串,直接使用
  • 格式化后的结果作为 observation 添加到消息历史

7. 缓存更新阶段(步骤 48-53)

  • 调用 ToolsHandler.on_tool_use() 回调
  • 如果 should_cache=True 且工具不是 CacheTools:
    • 将结果写入 CacheHandler
    • 缓存键:{tool_name}-{tool_input}
  • 更新 last_used_tool 引用(用于检查工具最终性)

8. 事件发送与返回阶段(步骤 54-59)

  • 发送工具使用完成事件(含耗时、缓存命中等信息)
  • 检查工具是否标记了 result_as_answer=True
  • 如果标记,设置最终性标记(推理循环直接返回工具结果)
  • 返回工具结果给 Executor

关键设计点

  • 缓存优先:优先使用缓存,避免重复调用(提高性能)
  • 参数验证:严格的参数验证,确保工具调用的正确性
  • 使用限制:防止工具被过度使用(成本控制)
  • 错误容错:工具错误不中断推理循环,返回错误信息供 LLM 参考
  • 最终性标记:允许工具直接返回最终答案(跳过后续推理)

工具类型示例

  1. 搜索工具SerperDevToolDuckDuckGoSearchTool

    • 执行时间:200-1000ms
    • 缓存收益:高(相同查询重复率高)
  2. 文件工具FileReadToolFileWriteTool

    • 执行时间:10-100ms
    • 缓存收益:中
  3. API 工具RestAPIToolGraphQLTool

    • 执行时间:100-5000ms
    • 缓存收益:高(相同请求重复率高)
  4. 代码执行工具CodeInterpreterTool

    • 执行时间:500-10000ms
    • 缓存收益:低(代码通常不重复)
  5. 委托工具DelegateWorkToolAskQuestionTool

    • 执行时间:取决于被委托 Agent 的执行时间
    • 缓存收益:低(任务描述通常不重复)

性能优化建议

  • 缓存策略:对幂等工具启用缓存,对非幂等工具禁用
  • 并发调用:对独立的工具调用使用异步执行
  • 超时设置:为长时间工具设置超时
  • 结果压缩:对大结果使用压缩存储

数据结构 UML 图

Agent 核心类图

classDiagram
    class BaseAgent {
        <<abstract>>
        +UUID4 id
        +str role
        +str goal
        +str backstory
        +bool cache
        +bool verbose
        +int max_rpm
        +bool allow_delegation
        +list[BaseTool] tools
        +int max_iter
        +Any agent_executor
        +Any llm
        +Any crew
        +I18N i18n
        +CacheHandler cache_handler
        +ToolsHandler tools_handler
        +list[dict] tools_results
        +int max_tokens
        +Knowledge knowledge
        +list[BaseKnowledgeSource] knowledge_sources
        +SecurityConfig security_config
        +list[PlatformAppOrAction] apps
        +list[str] mcps
        +execute_task(task, context, tools)*
        +create_agent_executor(tools)*
        +get_delegation_tools(agents)*
        +interpolate_inputs(inputs)
        +set_cache_handler(cache_handler)
        +copy()
        +set_rpm_controller(rpm_controller)
    }

    class Agent {
        +int max_execution_time
        +Any step_callback
        +bool use_system_prompt
        +str system_template
        +str prompt_template
        +str response_template
        +bool allow_code_execution
        +bool respect_context_window
        +int max_retry_limit
        +bool multimodal
        +bool inject_date
        +str date_format
        +Literal["safe", "unsafe"] code_execution_mode
        +bool reasoning
        +int max_reasoning_attempts
        +EmbedderConfig embedder
        +str agent_knowledge_context
        +str crew_knowledge_context
        +str knowledge_search_query
        +str from_repository
        +GuardrailType guardrail
        +int guardrail_max_retries
        +execute_task(task, context, tools)
        +create_agent_executor(tools, task)
        +get_delegation_tools(agents)
        +get_platform_tools(apps)
        +get_mcp_tools(mcps)
        +get_multimodal_tools()
        +get_code_execution_tools()
        +kickoff(messages, response_format)
        +kickoff_async(messages, response_format)
        -_execute_with_timeout(task_prompt, task, timeout)
        -_execute_without_timeout(task_prompt, task)
        -_get_knowledge_search_query(task_prompt, task)
        -_inject_date_to_task(task)
        -_validate_docker_installation()
    }

    class CrewAgentExecutor {
        +BaseLLM llm
        +Task task
        +Crew crew
        +Agent agent
        +SystemPromptResult | StandardPromptResult prompt
        +int max_iter
        +list[CrewStructuredTool] tools
        +str tools_names
        +list[str] stop
        +str tools_description
        +ToolsHandler tools_handler
        +Any step_callback
        +list[BaseTool] original_tools
        +BaseLLM function_calling_llm
        +bool respect_context_window
        +Callable request_within_rpm_limit
        +list[LLMMessage] messages
        +int iterations
        +invoke(inputs)
        -_invoke_loop()
        -_format_prompt(prompt, inputs)
        -_show_start_logs()
        -_handle_human_feedback(formatted_answer)
        -_create_short_term_memory(formatted_answer)
        -_create_long_term_memory(formatted_answer)
        -_create_external_memory(formatted_answer)
    }

    class ToolsHandler {
        +BaseTool last_used_tool
        +CacheHandler cache
        +handle_tool_call(tool_call, agent, task)
        +on_tool_use(tool_usage)
    }

    class CacheHandler {
        +dict _cache
        +hit(key)
        +read(tool, input)
        +add(tool, input, output)
    }

    BaseAgent <|-- Agent : 继承
    Agent --> CrewAgentExecutor : 使用
    Agent --> ToolsHandler : 使用
    CrewAgentExecutor --> ToolsHandler : 使用
    ToolsHandler --> CacheHandler : 使用

关键字段说明

BaseAgent 核心字段

字段 类型 约束 默认值 说明
id UUID4 frozen=True uuid4() Agent 唯一标识符,不可修改
role str required - Agent 的角色定义
goal str required - Agent 的目标
backstory str required - Agent 的背景故事,用于增强 LLM 理解
cache bool - True 是否启用工具结果缓存
verbose bool - False 是否打印详细日志
max_rpm int | None - None 每分钟最大请求数限制
allow_delegation bool - False 是否允许委托任务给其他 Agent
tools list[BaseTool] - [] Agent 可用的工具列表
max_iter int - 25 推理循环最大迭代次数
agent_executor Any - None AgentExecutor 实例
llm Any - None LLM 实例
crew Any - None 所属的 Crew 实例
tools_results list[dict] - [] 工具执行结果缓存
knowledge Knowledge | None - None Agent 级知识库
knowledge_sources list[BaseKnowledgeSource] - None 知识源列表
security_config SecurityConfig - default() 安全配置(指纹验证)
apps list[PlatformAppOrAction] - None 平台应用集成列表
mcps list[str] - None MCP 服务器引用列表

Agent 扩展字段

字段 类型 约束 默认值 说明
max_execution_time int | None - None 任务超时时间(秒)
step_callback Any | None - None 每步执行后的回调函数
use_system_prompt bool - True 是否使用系统提示词
system_template str | None - None 自定义系统提示词模板
prompt_template str | None - None 自定义用户提示词模板
response_template str | None - None 自定义响应模板
allow_code_execution bool - False 是否允许执行代码(需 Docker)
respect_context_window bool - True 是否自动截断超长上下文
max_retry_limit int - 2 执行失败最大重试次数
multimodal bool - False 是否支持多模态输入(图片等)
inject_date bool - False 是否自动注入当前日期
date_format str - %Y-%m-%d 日期格式字符串
code_execution_mode Literal[“safe”, “unsafe”] - “safe” 代码执行模式(Docker 或直接执行)
reasoning bool - False 是否启用推理规划(执行前生成计划)
max_reasoning_attempts int | None - None 推理规划最大尝试次数
embedder EmbedderConfig | None - None 嵌入模型配置
from_repository str | None - None 从 Agent 仓库加载配置
guardrail GuardrailType | None - None Agent 级输出验证函数
guardrail_max_retries int - 3 Guardrail 验证失败最大重试次数

API 详细规格

API 1: execute_task

基本信息

  • 方法签名Agent.execute_task(task: Task, context: str | None = None, tools: list[BaseTool] | None = None) -> Any
  • 调用方式:实例方法
  • 幂等性:否(每次调用可能产生不同结果)

请求结构体

# 方法参数
class ExecuteTaskParams:
    task: Task                           # 要执行的任务
    context: Optional[str] = None        # 来自前序任务的上下文
    tools: Optional[list[BaseTool]] = None  # 任务特定的工具列表
字段 类型 必填 默认 约束 说明
task Task - 必须是有效的 Task 对象 要执行的任务实例
context str | None None - 来自前序任务的输出文本
tools list[BaseTool] | None None - 覆盖 Agent 默认工具的工具列表

响应结构体

# 返回值为 LLM 生成的原始字符串
# 经过后续处理后封装为 TaskOutput
return_value: Any  # 实际为 str,但类型标注为 Any

入口函数与核心代码

def execute_task(
    self,
    task: Task,
    context: str | None = None,
    tools: list[BaseTool] | None = None,
) -> Any:
    """执行任务的核心方法"""

    # 1. 启用推理规划(如果配置)
    if self.reasoning:
        reasoning_handler = AgentReasoning(task=task, agent=self)
        reasoning_output = reasoning_handler.handle_agent_reasoning()
        task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"

    # 2. 注入日期到任务描述(如果配置)
    self._inject_date_to_task(task)

    # 3. 构建任务提示词
    task_prompt = task.prompt()

    # 4. 添加输出格式指令(如果使用结构化输出)
    if task.output_json or task.output_pydantic:
        schema = generate_model_description(task.output_json or task.output_pydantic)
        task_prompt += self.i18n.slice("formatted_task_instructions").format(
            output_format=schema
        )

    # 5. 添加上下文(如果有)
    if context:
        task_prompt = self.i18n.slice("task_with_context").format(
            task=task_prompt, context=context
        )

    # 6. 检索记忆并添加到提示词
    if self._is_any_available_memory():
        memory = contextual_memory.build_context_for_task(task, context or "")
        task_prompt += self.i18n.slice("memory").format(memory=memory)

    # 7. 检索知识并添加到提示词
    if self.knowledge or (self.crew and self.crew.knowledge):
        self.knowledge_search_query = self._get_knowledge_search_query(task_prompt, task)
        if self.knowledge_search_query:
            # 检索 Agent 级知识
            if self.knowledge:
                agent_knowledge_snippets = self.knowledge.query([self.knowledge_search_query])
                self.agent_knowledge_context = extract_knowledge_context(agent_knowledge_snippets)
                task_prompt += self.agent_knowledge_context

            # 检索 Crew 级知识
            knowledge_snippets = self.crew.query_knowledge([self.knowledge_search_query])
            self.crew_knowledge_context = extract_knowledge_context(knowledge_snippets)
            task_prompt += self.crew_knowledge_context

    # 8. 创建 AgentExecutor 实例
    tools = tools or self.tools or []
    self.create_agent_executor(tools=tools, task=task)

    # 9. 处理训练数据(如果在训练模式)
    if self.crew and self.crew._train:
        task_prompt = self._training_handler(task_prompt=task_prompt)
    else:
        task_prompt = self._use_trained_data(task_prompt=task_prompt)

    # 10. 执行任务(带超时或不带超时)
    if self.max_execution_time is not None:
        result = self._execute_with_timeout(task_prompt, task, self.max_execution_time)
    else:
        result = self._execute_without_timeout(task_prompt, task)

    # 11. 检查工具最终性(如果工具标记了 result_as_answer)
    for tool_result in self.tools_results:
        if tool_result.get("result_as_answer", False):
            result = tool_result["result"]

    # 12. 发送执行完成事件
    crewai_event_bus.emit(
        self,
        event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
    )

    return result

调用链与上游函数

调用路径Crew.kickoff()Crew._execute_tasks()Agent.execute_task()

# 上游调用者:Crew._execute_tasks()
def _execute_tasks(self, tasks: list[Task], start_index: int = 0) -> CrewOutput:
    """Crew 执行任务的核心流程"""
    task_outputs: list[TaskOutput] = []

    for task_index, task in enumerate(tasks):
        agent_to_use = self._get_agent_to_use(task)
        tools_for_task = task.tools or agent_to_use.tools or []
        context = self._get_context(task, task_outputs)

        # 调用 Agent.execute_task
        if task.async_execution:
            future = task.execute_async(
                agent=agent_to_use,
                context=context,
                tools=tools_for_task,
            )
            futures.append((task, future, task_index))
        else:
            task_output = task.execute_sync(
                agent=agent_to_use,
                context=context,
                tools=tools_for_task,
            )
            task_outputs.append(task_output)

    return self._create_crew_output(task_outputs)

Task.execute_sync() 的实现

# Task.execute_sync() 内部调用 Agent.execute_task()
def execute_sync(
    self,
    agent: BaseAgent | None = None,
    context: str | None = None,
    tools: list[BaseTool] | None = None,
) -> TaskOutput:
    """同步执行任务"""
    agent = agent or self.agent
    if not agent:
        raise Exception(f"The task '{self.description}' has no agent assigned")

    # 调用 Agent.execute_task
    result = agent.execute_task(
        task=self,
        context=context,
        tools=tools,
    )

    # 导出结构化输出
    pydantic_output, json_output = self._export_output(result)

    # 创建 TaskOutput
    task_output = TaskOutput(
        description=self.description,
        raw=result,
        pydantic=pydantic_output,
        json_dict=json_output,
        agent=agent.role,
        output_format=self._get_output_format(),
    )

    return task_output

时序图(请求→响应完整路径)

sequenceDiagram
    autonumber
    participant Crew as Crew
    participant Task as Task
    participant Agent as Agent
    participant Executor as AgentExecutor
    participant LLM as LLM
    participant Memory as Memory
    participant Knowledge as Knowledge
    participant Tool as Tool

    Crew->>Task: execute_sync(agent, context, tools)
    Task->>Agent: execute_task(task, context, tools)

    alt 启用推理规划
        Agent->>Agent: AgentReasoning.handle_agent_reasoning()
    end

    Agent->>Agent: 构建 task_prompt

    alt 有记忆系统
        Agent->>Memory: build_context_for_task(task, context)
        Memory-->>Agent: memory_snippets
        Agent->>Agent: 添加 memory  task_prompt
    end

    alt 有知识库
        Agent->>Agent: _get_knowledge_search_query(task_prompt)
        Agent->>Knowledge: query([search_query])
        Knowledge-->>Agent: knowledge_snippets
        Agent->>Agent: 添加 knowledge  task_prompt
    end

    Agent->>Agent: create_agent_executor(tools, task)
    Agent->>Executor: invoke({"input": task_prompt})

    loop 推理循环(最多 max_iter 次)
        Executor->>LLM: call(messages)
        LLM-->>Executor: response

        alt 解析出 Action(工具调用)
            Executor->>Tool: execute(tool_input)
            Tool-->>Executor: tool_output
            Executor->>Executor: 添加 tool_output  messages
        else 解析出 Final Answer
            Executor->>Executor: 返回 Final Answer
        end
    end

    Executor-->>Agent: output
    Agent->>Memory: save(output)
    Agent-->>Task: return result
    Task->>Task: _export_output(result)
    Task-->>Crew: TaskOutput

边界与异常

异常情况处理

  1. 任务超时

    • 异常类型:TimeoutError
    • 触发条件:执行时间超过 max_execution_time
    • 处理方式:不重试,直接抛出异常
    • 回退策略:无,由调用方处理
  2. LLM 调用失败

    • 异常类型:LiteLLM 相关异常
    • 触发条件:LLM API 返回错误
    • 处理方式:LiteLLM 错误不重试,其他错误重试(最多 max_retry_limit 次)
    • 回退策略:重试失败后抛出 AgentExecutionError
  3. 工具执行失败

    • 异常类型:工具抛出的异常
    • 触发条件:工具内部执行错误
    • 处理方式:记录错误,继续推理循环
    • 回退策略:LLM 可以选择重试工具或换用其他工具
  4. 记忆/知识检索失败

    • 异常类型:ChromaDBErrorSQLiteError
    • 触发条件:存储服务不可用
    • 处理方式:记录警告,继续执行(不阻塞任务)
    • 回退策略:降级为无记忆/知识模式

幂等性与重复请求

  • 任务级幂等:同一 Task 对象多次执行,每次可能产生不同结果(因为 LLM 是非确定性的)
  • 工具幂等:取决于工具本身的实现,CrewAI 不强制工具幂等
  • 缓存机制:如果启用 cache=True,相同的工具调用会返回缓存结果

资源限制

  • 最大迭代次数max_iter(默认 25),超过后返回当前最佳答案
  • 执行超时max_execution_time(秒),超时后强制终止
  • RPM 限制max_rpm 控制每分钟请求数

实践与最佳实践

1. 客户端重试建议

  • 对于超时错误,增加 max_execution_time
  • 对于 LLM 错误,检查 API Key 和配额
  • 对于工具错误,验证工具参数和权限

2. 批量任务处理

  • 使用 async_execution=True 并行执行多个任务
  • 在 Crew 级别使用 kickoff_for_each(inputs) 批量处理

3. 性能优化

  • 启用 cache=True 缓存工具结果
  • 使用 respect_context_window=True 自动截断历史
  • 配置 max_iter 避免过多迭代

4. 记忆与知识的权衡

  • 记忆检索适合动态上下文(会话历史)
  • 知识检索适合静态文档(产品手册、FAQ)
  • 避免同时使用大量记忆和知识(会超长上下文)

API 2: create_agent_executor

基本信息

  • 方法签名Agent.create_agent_executor(tools: list[BaseTool] | None = None, task: Task | None = None) -> None
  • 调用方式:实例方法(内部调用)
  • 幂等性:是(重复调用会覆盖之前的 executor)

功能说明

创建并配置 CrewAgentExecutor 实例,该实例负责管理推理循环和工具调用。

核心代码

def create_agent_executor(
    self, tools: list[BaseTool] | None = None, task: Task | None = None
) -> None:
    """创建 AgentExecutor"""
    raw_tools: list[BaseTool] = tools or self.tools or []
    parsed_tools = parse_tools(raw_tools)

    # 生成提示词模板
    prompt = Prompts(
        agent=self,
        has_tools=len(raw_tools) > 0,
        i18n=self.i18n,
        use_system_prompt=self.use_system_prompt,
        system_template=self.system_template,
        prompt_template=self.prompt_template,
        response_template=self.response_template,
    ).task_execution()

    # 生成停止词列表
    stop_words = [self.i18n.slice("observation")]
    if self.response_template:
        stop_words.append(
            self.response_template.split("{{ .Response }}")[1].strip()
        )

    # 创建 Executor 实例
    self.agent_executor = CrewAgentExecutor(
        llm=self.llm,
        task=task,
        agent=self,
        crew=self.crew,
        tools=parsed_tools,
        prompt=prompt,
        original_tools=raw_tools,
        stop_words=stop_words,
        max_iter=self.max_iter,
        tools_handler=self.tools_handler,
        tools_names=get_tool_names(parsed_tools),
        tools_description=render_text_description_and_args(parsed_tools),
        step_callback=self.step_callback,
        function_calling_llm=self.function_calling_llm,
        respect_context_window=self.respect_context_window,
        request_within_rpm_limit=(
            self._rpm_controller.check_or_wait if self._rpm_controller else None
        ),
        callbacks=[TokenCalcHandler(self._token_process)],
    )

关键设计点

  1. 提示词生成:根据 Agent 配置动态生成系统提示词和用户提示词
  2. 工具解析:将 BaseTool 转换为 CrewStructuredTool,提取参数 schema
  3. 停止词配置:根据响应模板动态生成停止词
  4. RPM 限流集成:将 RPMController 的回调函数传递给 Executor
  5. Token 统计:注册 TokenCalcHandler 回调以统计 Token 使用

API 3: get_delegation_tools

基本信息

  • 方法签名Agent.get_delegation_tools(agents: list[BaseAgent]) -> list[BaseTool]
  • 调用方式:实例方法
  • 幂等性:是(相同输入返回相同结果)

功能说明

为分层流程生成委托工具,允许 Manager Agent 委托任务给 Worker Agents。

请求结构体

字段 类型 必填 默认 约束 说明
agents list[BaseAgent] - - 可委托的 Worker Agents

响应结构体

返回两个工具的列表:

  1. Delegate Work To Coworker

    • 描述:委托具体任务给指定的 Worker Agent
    • 参数:coworker(Agent 角色)、task(任务描述)、context(上下文信息)
  2. Ask Question To Coworker

    • 描述:向指定的 Worker Agent 提问
    • 参数:coworker(Agent 角色)、question(问题)、context(上下文信息)

核心代码

def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
    """生成委托工具"""
    agent_tools = AgentTools(agents=agents)
    return agent_tools.tools()

AgentTools 的实现

class AgentTools(BaseTool):
    """提供委托和提问工具"""
    agents: list[BaseAgent]

    def tools(self) -> list[BaseTool]:
        tools = [
            self._delegate_work(),
            self._ask_question(),
        ]
        return tools

    def _delegate_work(self) -> BaseTool:
        """创建委托工作工具"""
        class DelegateWorkInput(BaseModel):
            coworker: str = Field(description="Agent 角色")
            task: str = Field(description="任务描述")
            context: str = Field(description="上下文信息")

        return Tool(
            name="Delegate work to coworker",
            description="Delegate a task to a specific coworker",
            args_schema=DelegateWorkInput,
            func=self._execute_delegation,
        )

    def _execute_delegation(self, coworker: str, task: str, context: str) -> str:
        """执行委托"""
        agent = self._get_agent_by_role(coworker)
        if not agent:
            return f"Error: Agent '{coworker}' not found"

        # 创建临时 Task 并执行
        sub_task = Task(description=task, expected_output="Result", agent=agent)
        result = agent.execute_task(sub_task, context=context)
        return result

时序图

sequenceDiagram
    autonumber
    participant Manager as Manager Agent
    participant DelegateTool as Delegate Tool
    participant Worker as Worker Agent
    participant Executor as AgentExecutor

    Manager->>Manager: 分析任务需求
    Manager->>DelegateTool: Delegate work to Worker1
    DelegateTool->>Worker: execute_task(sub_task, context)
    Worker->>Executor: invoke(task_prompt)
    Executor->>Executor: 执行推理循环
    Executor-->>Worker: 返回结果
    Worker-->>DelegateTool: 返回结果
    DelegateTool-->>Manager: 返回结果
    Manager->>Manager: 整合结果

API 4: kickoff

基本信息

  • 方法签名Agent.kickoff(messages: str | list[LLMMessage], response_format: type[Any] | None = None) -> LiteAgentOutput
  • 调用方式:实例方法
  • 幂等性:否(基于 LLM 生成)

功能说明

使用 LiteAgent 模式执行简单的 Agent 交互,适用于单轮对话或快速原型。

请求结构体

字段 类型 必填 默认 约束 说明
messages str | list[LLMMessage] - - 用户消息或消息列表
response_format type[Any] | None None - Pydantic 模型,用于结构化输出

响应结构体

class LiteAgentOutput(BaseModel):
    output: str                       # 原始输出
    pydantic: BaseModel | None        # 结构化输出(如果指定 response_format)
    json_dict: dict[str, Any] | None  # JSON 输出
    token_usage: dict[str, int]       # Token 使用统计

核心代码

def kickoff(
    self,
    messages: str | list[LLMMessage],
    response_format: type[Any] | None = None,
) -> LiteAgentOutput:
    """使用 LiteAgent 执行"""
    lite_agent = LiteAgent(
        id=self.id,
        role=self.role,
        goal=self.goal,
        backstory=self.backstory,
        llm=self.llm,
        tools=self.tools or [],
        max_iterations=self.max_iter,
        max_execution_time=self.max_execution_time,
        respect_context_window=self.respect_context_window,
        verbose=self.verbose,
        response_format=response_format,
        i18n=self.i18n,
        original_agent=self,
        guardrail=self.guardrail,
        guardrail_max_retries=self.guardrail_max_retries,
    )

    return lite_agent.kickoff(messages)

使用场景

  • 快速原型:无需定义 Task 即可测试 Agent 行为
  • 单轮对话:简单的问答场景
  • 流式输出:配合 kickoff_async() 实现流式响应


关键功能流程剖析

功能 1:提示词生成(Prompt Generation)

功能描述

提示词生成是 Agent 执行的第一步,负责将任务描述、Agent 角色、工具描述、记忆和知识等信息组装成结构化的提示词,指导 LLM 的行为。

核心代码实现

Prompts 类的实现lib/crewai/src/crewai/utilities/prompts.py):

class Prompts(BaseModel):
    """管理和生成 Agent 的提示词"""

    i18n: I18N = Field(default_factory=I18N)
    has_tools: bool = Field(default=False, description="是否有工具")
    system_template: str | None = Field(default=None, description="自定义系统提示词模板")
    prompt_template: str | None = Field(default=None, description="自定义用户提示词模板")
    response_template: str | None = Field(default=None, description="自定义响应模板")
    use_system_prompt: bool | None = Field(default=False, description="是否使用系统提示词")
    agent: Any = Field(description="Agent 引用")

    def task_execution(self) -> SystemPromptResult | StandardPromptResult:
        """生成任务执行的提示词"""
        # 1. 构建提示词组件
        slices: list[str] = ["role_playing"]  # 角色扮演部分

        if self.has_tools:
            slices.append("tools")  # 工具使用说明
        else:
            slices.append("no_tools")  # 无工具说明

        # 2. 构建系统提示词
        system: str = self._build_prompt(slices)

        # 3. 添加任务组件
        slices.append("task")

        # 4. 根据配置选择提示词模式
        if not self.system_template and not self.prompt_template and self.use_system_prompt:
            # 分离系统提示词和用户提示词
            return SystemPromptResult(
                system=system,
                user=self._build_prompt(["task"]),
                prompt=self._build_prompt(slices),
            )
        else:
            # 单一提示词模式
            return StandardPromptResult(
                prompt=self._build_prompt(
                    slices,
                    self.system_template,
                    self.prompt_template,
                    self.response_template,
                )
            )

    def _build_prompt(
        self,
        components: list[str],
        system_template: str | None = None,
        prompt_template: str | None = None,
        response_template: str | None = None,
    ) -> str:
        """构建提示词字符串"""
        prompt: str

        if not system_template or not prompt_template:
            # 使用默认格式
            prompt_parts: list[str] = [
                self.i18n.slice(component) for component in components
            ]
            prompt = "".join(prompt_parts)
        else:
            # 使用自定义模板
            template_parts: list[str] = [
                self.i18n.slice(component)
                for component in components
                if component != "task"
            ]
            system: str = system_template.replace(
                "{{ .System }}", "".join(template_parts)
            )
            prompt = prompt_template.replace(
                "{{ .Prompt }}", "".join(self.i18n.slice("task"))
            )

            if response_template:
                response: str = response_template.split("{{ .Response }}")[0]
                prompt = f"{system}\n{prompt}\n{response}"
            else:
                prompt = f"{system}\n{prompt}"

        # 5. 注入 Agent 的角色信息
        return (
            prompt.replace("{goal}", self.agent.goal)
            .replace("{role}", self.agent.role)
            .replace("{backstory}", self.agent.backstory)
        )

提示词结构分析

1. 角色扮演部分(role_playing

CrewAI 使用国际化的提示词模板(位于 i18n 目录),标准的角色扮演提示词包含:

You are {role}.
{backstory}

Your personal goal is: {goal}

2. 工具使用说明(tools

如果 Agent 有工具,添加工具使用指南:

To use a tool, please use the following format:

Thought: Do I need to use a tool? Yes
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action, always a simple python dictionary, enclosed in curly braces, using \" to wrap keys and values.
Observation: the result of the action

When you have a response to say to the Human, or if you do not need to use a tool, you MUST use the format:

Thought: Do I need to use a tool? No
Final Answer: [your response here]

3. 任务描述(task

Begin! This is VERY important to you, your job depends on it!

Current Task: {task_description}

Expected Output: {expected_output}

提示词组装流程

flowchart TB
    Start[开始] --> LoadI18N[加载国际化模板]
    LoadI18N --> SelectMode{选择模式}

    SelectMode -->|SystemPrompt| SplitMode[分离系统/用户提示词]
    SelectMode -->|Standard| SingleMode[单一提示词]

    SplitMode --> BuildSystem[构建系统提示词]
    BuildSystem --> AddRole[角色扮演]
    AddRole --> AddToolGuide[工具使用指南]
    AddToolGuide --> BuildUser[构建用户提示词]
    BuildUser --> AddTask[任务描述]

    SingleMode --> CombineAll[合并所有组件]
    CombineAll --> AddRole2[角色扮演]
    AddRole2 --> AddToolGuide2[工具使用指南]
    AddToolGuide2 --> AddTask2[任务描述]

    AddTask --> InjectVars[注入变量]
    AddTask2 --> InjectVars

    InjectVars --> ReplaceRole[替换 {role}]
    ReplaceRole --> ReplaceGoal[替换 {goal}]
    ReplaceGoal --> ReplaceBackstory[替换 {backstory}]
    ReplaceBackstory --> AddContext{有上下文?}

    AddContext -->|是| AppendContext[追加 context]
    AddContext -->|否| AddMemory{有记忆?}
    AppendContext --> AddMemory

    AddMemory -->|是| AppendMemory[追加 memory snippets]
    AddMemory -->|否| AddKnowledge{有知识?}
    AppendMemory --> AddKnowledge

    AddKnowledge -->|是| AppendKnowledge[追加 knowledge snippets]
    AddKnowledge -->|否| AddFormat{结构化输出?}
    AppendKnowledge --> AddFormat

    AddFormat -->|是| AppendSchema[追加 output schema]
    AddFormat -->|否| End[完成]
    AppendSchema --> End

图说明

该流程图展示了提示词从空模板到最终完整提示词的组装过程:

  1. 模板加载:从国际化文件加载基础模板
  2. 模式选择
    • SystemPrompt 模式:分离系统提示词和用户提示词(推荐用于支持 system role 的模型)
    • Standard 模式:单一提示词(兼容所有模型)
  3. 组件添加:按顺序添加角色、工具、任务等组件
  4. 变量注入:替换占位符({role}{goal}{backstory}
  5. 上下文增强:追加前序任务输出、记忆片段、知识片段
  6. 格式化指令:如果使用结构化输出,追加 Schema 说明

示例:完整提示词结构

假设一个带工具的研究 Agent,最终提示词可能如下:

系统提示词

You are Senior Research Analyst.
You are a world-class researcher with 10 years of experience in AI and machine learning.

Your personal goal is: Find the latest trends in AI technology

To use a tool, please use the following format:

Thought: Do I need to use a tool? Yes
Action: the action to take, should be one of [search_tool, file_read_tool]
Action Input: the input to the action, always a simple python dictionary
Observation: the result of the action

When you have a response, you MUST use the format:

Thought: Do I need to use a tool? No
Final Answer: [your response here]

用户提示词

Begin! This is VERY important to you, your job depends on it!

Current Task: Research the top 5 AI trends in 2024 and provide detailed analysis

Expected Output: A comprehensive report with at least 1000 words covering each trend

Context from previous task:
The company is interested in implementing AI solutions in healthcare.

Relevant memories:
- Previous research indicated strong interest in generative AI
- Healthcare regulations require HIPAA compliance

Relevant knowledge:
- AI trends 2024: Generative AI, AI safety, multimodal models...
- Healthcare AI applications: Diagnostics, drug discovery...

Please provide the output in the following JSON format:
{
  "trends": [
    {"name": "string", "description": "string", "impact": "string"}
  ],
  "recommendations": ["string"]
}

功能 2:推理循环(Reasoning Loop)

功能描述

推理循环是 Agent 执行任务的核心机制,通过多次调用 LLM 并执行工具,最终生成满足要求的输出。

核心代码

def _invoke_loop(self) -> AgentFinish:
    """执行推理循环直到完成"""
    formatted_answer = None

    while not isinstance(formatted_answer, AgentFinish):
        # 检查是否达到最大迭代次数
        if has_reached_max_iterations(self.iterations, self.max_iter):
            formatted_answer = handle_max_iterations_exceeded(
                formatted_answer,
                printer=self._printer,
                i18n=self._i18n,
                messages=self.messages,
            )
            break

        # 检查上下文长度
        if is_context_length_exceeded(
            self.messages, self.llm.model_name, self.respect_context_window
        ):
            self.messages = handle_context_length(
                self.messages, self.llm.model_name
            )

        # RPM 限流
        enforce_rpm_limit(self.request_within_rpm_limit, self._printer)

        # 调用 LLM
        llm_response = get_llm_response(
            llm=self.llm,
            messages=self.messages,
            printer=self._printer,
            stop=self.stop if self.use_stop_words else None,
        )

        # 解析响应
        try:
            formatted_answer = process_llm_response(
                llm_response, self.messages, self._printer
            )
        except OutputParserError as e:
            formatted_answer = handle_output_parser_exception(
                e, self._printer, self.i18n
            )

        # 处理 Action(工具调用)
        if isinstance(formatted_answer, AgentAction):
            tool_result = handle_agent_action_core(
                agent_action=formatted_answer,
                agent=self.agent,
                task=self.task,
                crew=self.crew,
                tools_handler=self.tools_handler,
                tools=self.tools,
                original_tools=self.original_tools,
                function_calling_llm=self.function_calling_llm,
                step_callback=self.step_callback,
                messages=self.messages,
                printer=self._printer,
            )

            # 添加工具结果到消息历史
            observation_message = self._i18n.slice("observation").format(
                observation=tool_result
            )
            self.messages.append(format_message_for_llm(observation_message))

        self.iterations += 1

    return formatted_answer

流程说明

  1. 迭代控制:最多迭代 max_iter
  2. 上下文管理:超长上下文自动截断(如果启用 respect_context_window
  3. 限流控制:调用前检查 RPM 限制
  4. LLM 调用:发送消息历史并获取响应
  5. 响应解析:识别 Action 或 Final Answer
  6. 工具执行:如果是 Action,执行工具并将结果添加到消息
  7. 循环终止:识别到 Final Answer 或达到最大迭代次数

复杂度分析

  • 时间复杂度:O(N * T),N 为迭代次数,T 为单次 LLM 调用时间
  • 空间复杂度:O(M),M 为消息历史长度
  • 最坏情况:达到 max_iter 但未生成 Final Answer

功能 2:记忆管理(Memory Management)

功能描述

Agent 在执行任务时检索相关记忆,并在完成后保存新记忆,实现跨任务的上下文累积。

核心代码

def execute_task(self, task: Task, context: str | None = None, tools: list[BaseTool] | None = None) -> Any:
    # (省略其他代码)

    # 检索记忆
    if self._is_any_available_memory():
        # 发送记忆检索开始事件
        crewai_event_bus.emit(
            self,
            event=MemoryRetrievalStartedEvent(
                task_id=str(task.id),
                source_type="agent",
                from_agent=self,
                from_task=task,
            ),
        )

        start_time = time.time()

        # 构建上下文记忆
        contextual_memory = ContextualMemory(
            self.crew._short_term_memory,
            self.crew._long_term_memory,
            self.crew._entity_memory,
            self.crew._external_memory,
            agent=self,
            task=task,
        )
        memory = contextual_memory.build_context_for_task(task, context or "")

        if memory.strip() != "":
            task_prompt += self.i18n.slice("memory").format(memory=memory)

        # 发送记忆检索完成事件
        crewai_event_bus.emit(
            self,
            event=MemoryRetrievalCompletedEvent(
                task_id=str(task.id),
                memory_content=memory,
                retrieval_time_ms=(time.time() - start_time) * 1000,
                source_type="agent",
                from_agent=self,
                from_task=task,
            ),
        )

    # (执行任务)
    result = self._execute_without_timeout(task_prompt, task)

    # 保存记忆(在 AgentExecutor 内部)
    # _create_short_term_memory(formatted_answer)
    # _create_long_term_memory(formatted_answer)
    # _create_external_memory(formatted_answer)

ContextualMemory 的实现

class ContextualMemory:
    def __init__(self, short_term, long_term, entity, external, agent, task):
        self.short_term = short_term
        self.long_term = long_term
        self.entity = entity
        self.external = external
        self.agent = agent
        self.task = task

    def build_context_for_task(self, task: Task, context: str) -> str:
        """构建任务上下文"""
        query = f"{task.description} {context}"

        # 检索短期记忆
        short_term_results = self.short_term.search(query, limit=5) if self.short_term else []

        # 检索长期记忆
        long_term_results = self.long_term.search(query, limit=3) if self.long_term else []

        # 检索实体记忆
        entity_results = self.entity.search(query, limit=3) if self.entity else []

        # 检索外部记忆
        external_results = self.external.search(query, limit=3) if self.external else []

        # 合并并格式化
        all_results = short_term_results + long_term_results + entity_results + external_results
        memory_text = "\n\n".join([result.content for result in all_results])

        return memory_text

记忆类型说明

  1. 短期记忆(Short-Term Memory)

    • 存储:ChromaDB 向量数据库
    • 内容:当前会话的任务描述和输出
    • 生命周期:单次 Crew 执行
    • 检索:基于语义相似度
  2. 长期记忆(Long-Term Memory)

    • 存储:SQLite 数据库
    • 内容:任务执行的总结和经验
    • 生命周期:跨 Crew 执行持久化
    • 检索:基于任务描述和元数据
  3. 实体记忆(Entity Memory)

    • 存储:ChromaDB 向量数据库
    • 内容:提取的关键实体(人名、地名、产品名等)
    • 生命周期:持久化
    • 检索:基于实体名称和关系
  4. 外部记忆(External Memory - Mem0)

    • 存储:Mem0 云服务
    • 内容:用户偏好、历史交互、个性化数据
    • 生命周期:全局持久化
    • 检索:基于 Mem0 的智能检索

功能 3:知识检索(Knowledge Retrieval)

功能描述

Agent 从知识库中检索相关文档片段,增强任务执行的准确性。

核心代码

def execute_task(self, task: Task, context: str | None = None, tools: list[BaseTool] | None = None) -> Any:
    # (省略其他代码)

    # 知识检索
    knowledge_config = self.knowledge_config.model_dump() if self.knowledge_config else {}

    if self.knowledge or (self.crew and self.crew.knowledge):
        crewai_event_bus.emit(
            self,
            event=KnowledgeRetrievalStartedEvent(
                from_task=task,
                from_agent=self,
            ),
        )

        try:
            # 生成检索查询
            self.knowledge_search_query = self._get_knowledge_search_query(task_prompt, task)

            if self.knowledge_search_query:
                # 检索 Agent 级知识
                if self.knowledge:
                    agent_knowledge_snippets = self.knowledge.query(
                        [self.knowledge_search_query], **knowledge_config
                    )
                    if agent_knowledge_snippets:
                        self.agent_knowledge_context = extract_knowledge_context(
                            agent_knowledge_snippets
                        )
                        task_prompt += self.agent_knowledge_context

                # 检索 Crew 级知识
                knowledge_snippets = self.crew.query_knowledge(
                    [self.knowledge_search_query], **knowledge_config
                )
                if knowledge_snippets:
                    self.crew_knowledge_context = extract_knowledge_context(
                        knowledge_snippets
                    )
                    task_prompt += self.crew_knowledge_context

                crewai_event_bus.emit(
                    self,
                    event=KnowledgeRetrievalCompletedEvent(
                        query=self.knowledge_search_query,
                        from_task=task,
                        from_agent=self,
                        retrieved_knowledge=(
                            (self.agent_knowledge_context or "")
                            + (self.crew_knowledge_context or "")
                        ),
                    ),
                )
        except Exception as e:
            crewai_event_bus.emit(
                self,
                event=KnowledgeSearchQueryFailedEvent(
                    query=self.knowledge_search_query or "",
                    error=str(e),
                    from_task=task,
                    from_agent=self,
                ),
            )

知识查询生成

def _get_knowledge_search_query(self, task_prompt: str, task: Task) -> str | None:
    """生成知识检索查询"""
    query = self.i18n.slice("knowledge_search_query").format(
        task_prompt=task_prompt
    )
    rewriter_prompt = self.i18n.slice("knowledge_search_query_system_prompt")

    try:
        # 使用 LLM 重写查询
        rewritten_query = self.llm.call(
            [
                {"role": "system", "content": rewriter_prompt},
                {"role": "user", "content": query},
            ]
        )
        return rewritten_query
    except Exception as e:
        return None

知识源类型

  1. 文本文件:TXT、MD、PDF
  2. 结构化文档:JSON、YAML、CSV
  3. 网页内容:HTML、URL
  4. 代码仓库:GitHub、GitLab
  5. API 响应:动态获取的数据

实战示例与最佳实践

示例 1:创建基础 Agent

from crewai import Agent
from crewai_tools import SerperDevTool

# 创建 Agent
researcher = Agent(
    role="Research Analyst",
    goal="Find the latest trends in AI",
    backstory="You are an expert researcher with 10 years of experience in AI.",
    tools=[SerperDevTool()],
    verbose=True,
    max_iter=15,
    max_execution_time=300,  # 5 分钟超时
)

# 直接使用 Agent(无需 Task)
result = researcher.kickoff("What are the top 5 AI trends in 2024?")
print(result.output)

示例 2:启用记忆和知识

from crewai import Agent, Crew
from crewai.knowledge import Knowledge
from crewai.knowledge.source import TextFileKnowledgeSource

# 配置知识源
knowledge = Knowledge(
    sources=[
        TextFileKnowledgeSource(file_path="./docs/ai_trends.txt"),
    ],
    collection_name="ai_trends",
)

# 创建 Agent(带知识)
analyst = Agent(
    role="AI Analyst",
    goal="Analyze AI trends",
    backstory="Expert analyst",
    knowledge_sources=[knowledge],
    verbose=True,
)

# 创建 Crew(启用记忆)
crew = Crew(
    agents=[analyst],
    tasks=[...],
    memory=True,  # 启用记忆系统
)

crew.kickoff()

示例 3:自定义工具集成

from crewai import Agent
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

class CalculatorInput(BaseModel):
    expression: str = Field(description="数学表达式")

class CalculatorTool(BaseTool):
    name: str = "Calculator"
    description: str = "执行数学计算"
    args_schema: type[BaseModel] = CalculatorInput

    def _run(self, expression: str) -> str:
        try:
            result = eval(expression)
            return f"Result: {result}"
        except Exception as e:
            return f"Error: {str(e)}"

# 创建 Agent(带自定义工具)
math_agent = Agent(
    role="Math Expert",
    goal="Solve math problems",
    backstory="You are a mathematics professor",
    tools=[CalculatorTool()],
    verbose=True,
)

示例 4:分层委托

from crewai import Agent, Task, Crew, Process

# 定义 Worker Agents
researcher = Agent(
    role="Researcher",
    goal="Research topics",
    backstory="Expert researcher",
)

writer = Agent(
    role="Writer",
    goal="Write articles",
    backstory="Professional writer",
)

# 定义任务(无需指定 agent)
task1 = Task(
    description="Research AI trends",
    expected_output="Research report",
)

task2 = Task(
    description="Write an article",
    expected_output="Article",
)

# 创建 Hierarchical Crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[task1, task2],
    process=Process.hierarchical,
    manager_llm="gpt-4",  # Manager 使用 GPT-4
)

result = crew.kickoff()

最佳实践总结

  1. 合理配置超时

    • 简单任务:60-120 秒
    • 复杂任务:300-600 秒
    • 长时间任务:考虑拆分
  2. 优化记忆和知识

    • 避免同时检索大量记忆和知识
    • 使用 score_threshold 过滤低相关性结果
    • 定期清理过期记忆
  3. 工具设计原则

    • 工具描述要清晰详细
    • 参数 schema 要完整
    • 实现幂等性(如果可能)
    • 处理错误并返回友好提示
  4. 性能优化

    • 启用 cache=True
    • 使用 respect_context_window=True
    • 配置 max_rpm 避免速率限制
    • 使用 async_execution=True 并行任务
  5. 错误处理

    • 捕获工具异常并优雅处理
    • 使用 guardrail 验证输出
    • 配置合理的重试次数
    • 记录详细日志(verbose=True

总结

Agent 模块是 CrewAI 框架的核心,通过以下机制实现强大的任务执行能力:

  1. 灵活的角色定义:role、goal、backstory 构建清晰的身份
  2. 强大的执行引擎:推理循环、工具调用、超时控制
  3. 丰富的上下文管理:记忆检索、知识查询、任务上下文
  4. 多样的工具集成:内置、自定义、MCP、平台工具
  5. 完善的可观测性:事件总线、Token 统计、日志记录

该模块的设计遵循 单一职责原则开放封闭原则,允许用户通过继承 BaseAgent 或扩展工具系统来定制行为,同时保持核心功能的稳定性。