CrewAI-01-Agent-完整剖析
模块概览
职责与定位
Agent(智能体)是 CrewAI 框架的核心执行单元,负责:
- 角色定义:通过 role、goal、backstory 定义智能体的身份和目标
- 任务执行:接收 Task 并通过 LLM 推理生成输出
- 工具管理:集成和调用各类工具(内置、自定义、MCP、平台工具等)
- 记忆访问:检索和保存短期、长期、实体和外部记忆
- 知识检索:查询知识库以增强上下文
- 委托协作:在分层流程中委托任务给其他 Agent
- 状态管理:维护执行状态、Token 统计、工具结果缓存
输入与输出
输入:
Task对象:包含任务描述、预期输出、上下文context字符串:来自前序任务的输出tools列表:任务特定的工具集合
输出:
TaskOutput对象:包含原始输出、结构化输出(Pydantic/JSON)、元数据
上下游依赖
上游依赖(被调用):
Crew编排器:调用Agent.execute_task()Flow引擎:在事件驱动流程中调用 Agent
下游依赖(调用):
LLM层:调用大语言模型进行推理Tools层:执行工具调用Memory层:检索和保存记忆Knowledge层:查询知识库Events总线:发送执行事件
生命周期
stateDiagram-v2
[*] --> 创建: Agent(...) 或 @agent 装饰器
创建 --> 初始化: 配置 LLM、Tools、Memory
初始化 --> 就绪: create_agent_executor()
就绪 --> 执行: execute_task(task)
执行 --> 推理循环: AgentExecutor.invoke()
推理循环 --> 推理循环: LLM 调用 / 工具执行
推理循环 --> 完成: 达到 Final Answer
完成 --> 保存状态: 保存记忆、统计 Token
保存状态 --> 就绪: 等待下一个任务
就绪 --> [*]: Crew 执行结束
整体服务架构图
全局架构视图(从上游到 Agent)
flowchart TB
subgraph "上游调用层"
User[用户/客户端]
Flow[Flow 引擎<br/>事件驱动流程]
end
subgraph "编排层 - Crew"
CrewKickoff[Crew.kickoff<br/>启动执行]
CrewExec[Crew._execute_tasks<br/>任务编排]
ProcessMgr[Process Manager<br/>Sequential/Hierarchical]
ManagerAgent[Manager Agent<br/>分层流程专用]
end
subgraph "任务层 - Task"
TaskExec[Task.execute_sync/async<br/>任务执行入口]
TaskCore[Task._execute_core<br/>核心执行逻辑]
TaskOutput[TaskOutput<br/>结构化输出]
Guardrail[Guardrail<br/>输出验证]
end
subgraph "智能体层 - Agent"
AgentExec[Agent.execute_task<br/>任务执行]
AgentExecutor[CrewAgentExecutor<br/>推理循环引擎]
PromptBuilder[Prompts<br/>提示词构建]
subgraph "上下文管理"
MemoryRetrieval[Memory Retrieval<br/>记忆检索]
KnowledgeRetrieval[Knowledge Retrieval<br/>知识检索]
ContextBuilder[Context Builder<br/>上下文拼接]
end
end
subgraph "执行引擎层"
InvokeLoop[_invoke_loop<br/>推理循环]
LLMCall[LLM.call<br/>模型调用]
OutputParser[OutputParser<br/>输出解析]
ActionHandler[Action Handler<br/>动作处理]
subgraph "工具执行子系统"
ToolUsage[ToolUsage<br/>工具使用协调]
ToolCache[CacheHandler<br/>缓存检查]
ToolInvoke[Tool.invoke<br/>工具执行]
ToolResult[Tool Result<br/>结果处理]
end
end
subgraph "存储层"
ShortTermMem[(Short Term Memory<br/>ChromaDB)]
LongTermMem[(Long Term Memory<br/>SQLite)]
EntityMem[(Entity Memory<br/>ChromaDB)]
ExternalMem[(External Memory<br/>Mem0)]
KnowledgeDB[(Knowledge Base<br/>Vector DB)]
end
subgraph "基础设施层"
RPM[RPM Controller<br/>速率限制]
TokenCounter[Token Counter<br/>使用统计]
EventBus[Event Bus<br/>事件总线]
Security[Security Config<br/>指纹验证]
end
User --> CrewKickoff
Flow --> AgentExec
CrewKickoff --> CrewExec
CrewExec --> ProcessMgr
ProcessMgr --> ManagerAgent
ProcessMgr --> TaskExec
TaskExec --> TaskCore
TaskCore --> AgentExec
TaskCore --> Guardrail
TaskCore --> TaskOutput
AgentExec --> MemoryRetrieval
AgentExec --> KnowledgeRetrieval
AgentExec --> ContextBuilder
AgentExec --> PromptBuilder
AgentExec --> AgentExecutor
AgentExecutor --> InvokeLoop
InvokeLoop --> RPM
InvokeLoop --> LLMCall
InvokeLoop --> OutputParser
OutputParser --> ActionHandler
ActionHandler --> ToolUsage
ToolUsage --> ToolCache
ToolCache --> ToolInvoke
ToolInvoke --> ToolResult
ToolResult --> InvokeLoop
MemoryRetrieval --> ShortTermMem
MemoryRetrieval --> LongTermMem
MemoryRetrieval --> EntityMem
MemoryRetrieval --> ExternalMem
KnowledgeRetrieval --> KnowledgeDB
AgentExecutor --> TokenCounter
AgentExecutor --> EventBus
ToolUsage --> EventBus
TaskCore --> EventBus
架构说明
该架构图展示了从用户请求到 Agent 执行的完整数据流和控制流:
- 上游调用层:用户通过 Crew 或 Flow 发起执行请求
- 编排层:Crew 负责任务编排和流程控制
- 任务层:Task 作为执行单元的封装
- 智能体层:Agent 负责具体任务执行
- 执行引擎层:推理循环和工具调用的核心逻辑
- 存储层:持久化的记忆和知识系统
- 基础设施层:支撑整个执行流程的横切关注点
模块架构图
Agent 模块内部架构
flowchart TB
subgraph "Agent 核心层"
Agent[Agent 类<br/>角色、目标、背景故事]
BaseAgent[BaseAgent 抽象类<br/>通用属性和方法]
end
subgraph "执行引擎层"
Executor[CrewAgentExecutor<br/>推理循环控制]
Parser[OutputParser<br/>解析 LLM 输出]
PromptMgr[Prompts<br/>提示词模板管理]
end
subgraph "工具管理层"
ToolsHandler[ToolsHandler<br/>工具调用协调]
AgentTools[AgentTools<br/>委托工具]
CustomTools[自定义工具]
MCPTools[MCP 工具包装器]
PlatformTools[平台工具集成]
end
subgraph "LLM 层"
LLM[BaseLLM 抽象层]
FunctionCallingLLM[Function Calling LLM<br/>工具调用专用]
end
subgraph "存储与上下文层"
Memory[Memory 记忆系统<br/>短期/长期/实体/外部]
Knowledge[Knowledge 知识库<br/>向量检索]
CacheHandler[CacheHandler<br/>工具结果缓存]
end
subgraph "基础设施层"
RPMController[RPMController<br/>请求限流]
TokenCounter[TokenCounterCallback<br/>Token 统计]
EventBus[EventBus<br/>事件发布]
Security[SecurityConfig<br/>指纹验证]
end
Agent -.继承.-> BaseAgent
Agent --> Executor
Agent --> ToolsHandler
Agent --> Memory
Agent --> Knowledge
Agent --> RPMController
Agent --> Security
Executor --> Parser
Executor --> PromptMgr
Executor --> LLM
Executor --> FunctionCallingLLM
Executor --> ToolsHandler
Executor --> TokenCounter
ToolsHandler --> AgentTools
ToolsHandler --> CustomTools
ToolsHandler --> MCPTools
ToolsHandler --> PlatformTools
ToolsHandler --> CacheHandler
Memory --> EventBus
Executor --> EventBus
架构要点说明
1) Agent 与 BaseAgent 的关系
BaseAgent:抽象基类,定义所有 Agent 的通用接口和属性Agent:具体实现类,继承自BaseAgent,提供完整的 CrewAI Agent 功能- 设计目的:允许第三方实现自定义 Agent(如 LangGraph Agent、OpenAI Agent)并集成到 CrewAI
2) 执行引擎的职责划分
- AgentExecutor:控制推理循环,管理消息历史,处理工具调用和重试逻辑
- OutputParser:解析 LLM 的原始输出,识别 Action(工具调用)或 Final Answer
- Prompts:根据 Agent 配置和任务描述生成系统提示词和用户提示词
3) 工具管理的多层结构
- ToolsHandler:协调所有工具的调用,处理工具结果的最终性检查
- AgentTools:为分层流程提供委托工具(Delegate Work、Ask Question)
- 自定义工具:用户定义的
BaseTool子类 - MCP 工具:通过 MCP 协议集成的外部工具
- 平台工具:CrewAI AMP 提供的企业应用集成工具
4) 记忆与知识的协同
- Memory:
- 短期记忆:当前任务相关的上下文
- 长期记忆:跨任务的经验总结
- 实体记忆:关键实体的信息
- 外部记忆:Mem0 集成
- Knowledge:
- Agent 级知识:特定于某个 Agent 的知识源
- Crew 级知识:整个团队共享的知识库
- 检索策略:基于语义相似度的 RAG 检索
5) 并发控制与限流
- RPMController:控制每分钟请求数,避免触发 LLM 提供商的速率限制
- CacheHandler:缓存工具结果,减少重复调用
- 锁机制:Memory 使用读写锁(RWLock)保证并发安全
模块间交互图
1. Crew → Task → Agent 调用链路图
sequenceDiagram
autonumber
participant User as 用户
participant Crew as Crew
participant Process as Process Manager
participant Task as Task
participant Agent as Agent
participant Executor as AgentExecutor
participant Event as EventBus
User->>Crew: kickoff(inputs)
activate Crew
Crew->>Event: emit(CrewKickoffStartedEvent)
Crew->>Crew: 初始化 Agents(设置 i18n、crew 引用)
Crew->>Agent: set_knowledge(crew_embedder)
Crew->>Agent: create_agent_executor()
alt Planning 启用
Crew->>Crew: _handle_crew_planning()
end
Crew->>Process: 选择执行流程
alt Sequential Process
Process->>Crew: _run_sequential_process()
else Hierarchical Process
Process->>Crew: _run_hierarchical_process()
end
Crew->>Crew: _execute_tasks(tasks)
loop 遍历每个 Task
Crew->>Crew: _get_agent_to_use(task)
Crew->>Crew: _prepare_tools(agent, task)
Crew->>Crew: _get_context(task, task_outputs)
alt 异步执行
Crew->>Task: execute_async(agent, context, tools)
else 同步执行
Crew->>Task: execute_sync(agent, context, tools)
activate Task
Task->>Event: emit(TaskStartedEvent)
Task->>Agent: execute_task(task, context, tools)
activate Agent
Note over Agent: 详细执行流程见下一张图
Agent-->>Task: return result
deactivate Agent
Task->>Task: _export_output(result)
Task->>Task: 构造 TaskOutput
alt 有 Guardrail
loop 每个 Guardrail
Task->>Task: _invoke_guardrail_function()
end
end
Task->>Event: emit(TaskCompletedEvent)
Task-->>Crew: return TaskOutput
deactivate Task
end
end
Crew->>Crew: _create_crew_output(task_outputs)
Crew->>Crew: calculate_usage_metrics()
Crew->>Event: emit(CrewKickoffCompletedEvent)
Crew-->>User: return CrewOutput
deactivate Crew
图说明:
该时序图展示了从用户调用 Crew.kickoff() 到返回最终结果的完整流程:
-
初始化阶段(步骤 1-5):
- Crew 发送启动事件
- 为所有 Agent 设置国际化配置、Crew 引用
- 配置知识库嵌入器
- 预创建 AgentExecutor
-
规划阶段(步骤 6):
- 如果启用了 Planning,生成任务执行计划
-
流程选择阶段(步骤 7-9):
- 根据
Process.sequential或Process.hierarchical选择执行策略 - Sequential:按顺序执行任务
- Hierarchical:Manager Agent 动态委托任务
- 根据
-
任务执行循环(步骤 10-23):
- 确定任务的执行 Agent
- 准备工具列表(Task 工具优先于 Agent 工具)
- 构建任务上下文(来自前序任务的输出)
- 选择同步或异步执行
- Task 发送开始事件并调用
Agent.execute_task() - Agent 执行完成后导出结构化输出
- 应用 Guardrail 验证(如果配置)
- 发送任务完成事件
-
汇总阶段(步骤 24-27):
- 创建 CrewOutput 对象
- 计算 Token 使用量和成本
- 发送完成事件
- 返回最终结果
关键设计点:
- 事件驱动:整个流程通过 EventBus 发送关键事件,便于监控和调试
- 上下文传递:通过
context参数将前序任务输出传递给后续任务 - 工具优先级:Task 级工具覆盖 Agent 级工具
- 输出验证:支持多个 Guardrail 串联验证
2. Agent 执行任务的完整流程图
sequenceDiagram
autonumber
participant Task as Task
participant Agent as Agent
participant Reasoning as Reasoning Handler
participant Memory as Memory System
participant Knowledge as Knowledge System
participant Prompts as Prompts Builder
participant Executor as AgentExecutor
participant Event as EventBus
Task->>Agent: execute_task(task, context, tools)
activate Agent
alt 启用推理规划
Agent->>Reasoning: AgentReasoning.handle_agent_reasoning()
activate Reasoning
Reasoning->>Reasoning: 生成初始计划
Reasoning->>Reasoning: 细化计划步骤
Reasoning-->>Agent: return ReasoningOutput(plan)
deactivate Reasoning
Agent->>Agent: 将 plan 添加到 task.description
end
Agent->>Agent: _inject_date_to_task(task)
Agent->>Agent: task.prompt() → task_prompt
alt 使用结构化输出
Agent->>Agent: 生成 output schema
Agent->>Agent: 添加格式化指令到 task_prompt
end
alt 有前序上下文
Agent->>Agent: 将 context 添加到 task_prompt
end
alt 有记忆系统
Agent->>Event: emit(MemoryRetrievalStartedEvent)
Agent->>Memory: ContextualMemory.build_context_for_task()
activate Memory
Memory->>Memory: 构建检索查询
par 并行检索多种记忆
Memory->>Memory: search ShortTermMemory
and
Memory->>Memory: search LongTermMemory
and
Memory->>Memory: search EntityMemory
and
Memory->>Memory: search ExternalMemory
end
Memory->>Memory: 合并检索结果
Memory-->>Agent: return memory_context
deactivate Memory
Agent->>Agent: 将 memory_context 添加到 task_prompt
Agent->>Event: emit(MemoryRetrievalCompletedEvent)
end
alt 有知识库
Agent->>Event: emit(KnowledgeRetrievalStartedEvent)
Agent->>Agent: _get_knowledge_search_query(task_prompt)
Agent->>Knowledge: LLM 重写检索查询
alt Agent 级知识库
Agent->>Knowledge: agent.knowledge.query([search_query])
Knowledge-->>Agent: return agent_knowledge_snippets
Agent->>Agent: 提取知识上下文
Agent->>Agent: 添加到 task_prompt
end
alt Crew 级知识库
Agent->>Knowledge: crew.query_knowledge([search_query])
Knowledge-->>Agent: return crew_knowledge_snippets
Agent->>Agent: 提取知识上下文
Agent->>Agent: 添加到 task_prompt
end
Agent->>Event: emit(KnowledgeRetrievalCompletedEvent)
end
Agent->>Agent: create_agent_executor(tools, task)
Agent->>Prompts: 生成提示词模板
activate Prompts
Prompts->>Prompts: 构建系统提示词(角色、工具描述)
Prompts->>Prompts: 构建用户提示词(任务描述)
Prompts-->>Agent: return prompt_result
deactivate Prompts
alt 有训练数据
Agent->>Agent: _training_handler(task_prompt)
else 使用已训练数据
Agent->>Agent: _use_trained_data(task_prompt)
end
alt 配置超时
Agent->>Agent: _execute_with_timeout(task_prompt, task, timeout)
Agent->>Executor: 在线程池中执行
else 无超时
Agent->>Executor: _execute_without_timeout(task_prompt, task)
end
activate Executor
Note over Executor: 详细推理循环见下一张图
Executor-->>Agent: return result
deactivate Executor
Agent->>Agent: 检查工具最终性标记
Agent->>Event: emit(AgentExecutionCompletedEvent)
Agent-->>Task: return result
deactivate Agent
图说明:
该时序图详细展示了 Agent.execute_task() 的内部执行流程,包含以下关键阶段:
1. 推理规划阶段(可选,步骤 2-6):
- 如果启用
reasoning=True,Agent 会先生成任务执行计划 - Reasoning Handler 调用 LLM 生成初始计划和细化步骤
- 计划被追加到任务描述中,指导后续执行
2. 任务提示词构建阶段(步骤 7-12):
- 注入当前日期(如果配置)
- 调用
task.prompt()获取基础任务描述 - 如果使用结构化输出(
output_pydantic/output_json),生成 Schema 并添加格式化指令 - 如果有前序任务上下文,追加到提示词
3. 记忆检索阶段(可选,步骤 13-24):
- 发送记忆检索开始事件
- 创建
ContextualMemory实例 - 并行检索四种记忆类型:
- ShortTermMemory:当前会话的任务描述和输出(ChromaDB)
- LongTermMemory:跨会话的经验总结(SQLite)
- EntityMemory:提取的关键实体(ChromaDB)
- ExternalMemory:用户偏好和历史(Mem0)
- 合并检索结果并追加到提示词
- 发送记忆检索完成事件(含检索耗时)
4. 知识检索阶段(可选,步骤 25-38):
- 发送知识检索开始事件
- 使用 LLM 重写检索查询(提高检索质量)
- 检索 Agent 级知识库(如果配置)
- 检索 Crew 级知识库(如果配置)
- 提取知识片段并追加到提示词
- 发送知识检索完成事件
5. 执行器创建阶段(步骤 39-45):
- 创建
CrewAgentExecutor实例 - 使用
Prompts类生成系统提示词和用户提示词 - 系统提示词包含:角色定义、目标、背景故事、工具描述
- 用户提示词包含:任务描述和所有上下文
6. 训练数据处理阶段(可选,步骤 46-48):
- 如果在训练模式(
crew._train=True),记录训练数据 - 否则,使用已训练的数据优化提示词
7. 执行阶段(步骤 49-54):
- 如果配置了
max_execution_time,在线程池中执行(可超时) - 否则直接执行
- 调用
AgentExecutor.invoke()进入推理循环 - 推理循环返回最终答案
8. 后处理阶段(步骤 55-57):
- 检查是否有工具标记了
result_as_answer=True(工具结果直接作为最终答案) - 发送 Agent 执行完成事件
- 返回结果给 Task
关键设计点:
- 上下文增强:通过记忆和知识检索,显著增强任务执行的准确性
- 并行检索:多种记忆类型并行检索,减少延迟
- 查询重写:使用 LLM 重写知识检索查询,提高相关性
- 超时保护:防止长时间运行的任务阻塞整个系统
- 事件可观测:关键步骤发送事件,便于监控和调试
性能考虑:
- 记忆检索:通常 50-200ms(取决于数据量和向量维度)
- 知识检索:通常 100-500ms(包含 LLM 查询重写)
- 推理循环:主要耗时部分,取决于任务复杂度和迭代次数
3. 推理循环(Reasoning Loop)详细时序图
sequenceDiagram
autonumber
participant Executor as AgentExecutor
participant RPM as RPM Controller
participant LLM as LLM
participant Parser as OutputParser
participant ToolHandler as ToolsHandler
participant Tool as Tool
participant Cache as CacheHandler
participant Memory as Memory
participant Event as EventBus
participant Callback as StepCallback
Executor->>Executor: invoke({"input": task_prompt, ...})
activate Executor
Executor->>Executor: 格式化提示词(注入变量)
alt 使用系统提示词
Executor->>Executor: 添加 system message
Executor->>Executor: 添加 user message
else 单一提示词
Executor->>Executor: 添加 user message
end
Executor->>Executor: _invoke_loop()
Note over Executor: 进入推理循环
loop 直到达到 Final Answer 或 max_iter
Executor->>Executor: iterations += 1
alt 达到最大迭代次数
Executor->>Executor: handle_max_iterations_exceeded()
Executor->>Executor: 返回当前最佳答案
end
alt 超过上下文长度
Executor->>Executor: is_context_length_exceeded()
Executor->>Executor: handle_context_length()
Executor->>Executor: 截断历史消息
end
Executor->>RPM: enforce_rpm_limit()
alt 触发限流
RPM->>RPM: 等待至下一时间窗口
end
Executor->>LLM: call(messages, stop=stop_words)
activate LLM
Note over LLM: 调用 LLM API<br/>(OpenAI/Anthropic/...)
LLM-->>Executor: return llm_response
deactivate LLM
Executor->>Parser: process_llm_response(llm_response)
activate Parser
alt 解析出 Action(工具调用)
Parser->>Parser: 提取 tool_name, tool_input
Parser-->>Executor: return AgentAction
else 解析出 Final Answer
Parser->>Parser: 提取最终答案
Parser-->>Executor: return AgentFinish
else 解析错误
Parser-->>Executor: raise OutputParserError
end
deactivate Parser
alt 解析出 AgentAction
Executor->>Executor: execute_tool_and_check_finality()
Executor->>Event: emit(ToolUsageStartedEvent)
Executor->>Cache: read(tool_name, tool_input)
activate Cache
alt 缓存命中
Cache-->>Executor: return cached_result
Note over Executor: 跳过工具执行
else 缓存未命中
Cache-->>Executor: return None
Executor->>Tool: invoke(tool_input)
activate Tool
alt 工具执行成功
Tool->>Tool: _run(**parsed_args)
Tool-->>Executor: return tool_result
else 工具执行失败
Tool-->>Executor: raise ToolException
Executor->>Executor: 将错误信息作为 observation
end
deactivate Tool
Executor->>ToolHandler: on_tool_use(calling, output)
activate ToolHandler
ToolHandler->>Cache: add(tool_name, tool_input, output)
ToolHandler->>ToolHandler: last_used_tool = calling
deactivate ToolHandler
end
deactivate Cache
Executor->>Event: emit(ToolUsageCompletedEvent)
alt 工具标记 result_as_answer=True
Executor->>Executor: 直接返回工具结果
else 继续推理
Executor->>Executor: 格式化 observation
Executor->>Executor: 添加 observation 到 messages
end
alt 有 step_callback
Executor->>Callback: step_callback(formatted_answer)
end
else 解析出 AgentFinish
Executor->>Executor: 推理循环结束
else OutputParserError
Executor->>Executor: handle_output_parser_exception()
Executor->>Executor: 添加错误提示到 messages
Executor->>Executor: 继续循环(重试)
end
end
Note over Executor: 退出推理循环
alt 需要人工反馈
Executor->>Executor: _handle_human_feedback()
end
Executor->>Memory: _create_short_term_memory(formatted_answer)
Executor->>Memory: _create_long_term_memory(formatted_answer)
Executor->>Memory: _create_external_memory(formatted_answer)
Executor-->>Executor: return {"output": formatted_answer.output}
deactivate Executor
图说明:
该时序图展示了 CrewAgentExecutor._invoke_loop() 的核心推理循环逻辑,这是 Agent 执行的心脏:
1. 初始化阶段(步骤 1-8):
- 接收任务提示词和相关参数
- 格式化提示词,注入变量(
{role}、{goal}、{backstory}) - 根据配置选择系统提示词模式或单一提示词模式
- 构建初始消息列表
- 进入
_invoke_loop()循环
2. 循环前置检查(步骤 9-18):
- 迭代计数:记录当前迭代次数
- 最大迭代检查:如果达到
max_iter(默认 25),返回当前最佳答案 - 上下文长度检查:如果消息历史超过 LLM 上下文窗口,自动截断
- 保留系统提示词和最后 N 条消息
- 截断策略:优先保留最近的消息
- RPM 限流:检查是否超过每分钟请求数限制
- 如果触发限流,等待至下一时间窗口
3. LLM 调用阶段(步骤 19-22):
- 调用 LLM API(通过 LiteLLM 统一接口)
- 传递消息历史和停止词(
stop_words) - 停止词通常包含:
Observation:(用于解析 Action) - LLM 返回原始响应文本
4. 响应解析阶段(步骤 23-32):
- 使用
OutputParser解析 LLM 响应 - 可能的解析结果:
- AgentAction:包含
tool_name、tool_input、log - AgentFinish:包含最终答案
- OutputParserError:解析失败
- AgentAction:包含
5. 工具执行分支(步骤 33-60):
如果解析出 AgentAction,进入工具执行流程:
-
缓存检查(步骤 36-40):
- 查询 CacheHandler,检查是否有缓存结果
- 缓存键:
{tool_name}-{tool_input} - 如果命中,直接返回缓存结果,跳过工具执行
- 缓存可显著提高性能(减少重复调用)
-
工具调用(步骤 41-50):
- 调用
Tool.invoke(tool_input) - 工具内部解析参数并执行
_run()方法 - 如果执行成功,返回结果
- 如果执行失败,捕获异常并将错误信息作为 observation
- 调用
-
缓存更新(步骤 51-55):
- 调用
ToolsHandler.on_tool_use()回调 - 将工具结果写入 CacheHandler
- 更新
last_used_tool引用
- 调用
-
最终性检查(步骤 58-61):
- 如果工具标记了
result_as_answer=True,直接返回工具结果 - 否则,格式化 observation 并添加到消息历史
- Observation 格式:
Observation: {tool_result}
- 如果工具标记了
-
回调通知(步骤 62-64):
- 如果配置了
step_callback,执行回调 - 回调可用于日志记录、进度更新等
- 如果配置了
6. 循环结束分支(步骤 65-69):
如果解析出 AgentFinish:
- 推理循环结束,跳出循环
- 提取最终答案
如果解析出 OutputParserError:
- 捕获异常并记录
- 添加错误提示到消息历史(引导 LLM 重试)
- 继续下一次循环
7. 后处理阶段(步骤 70-77):
- 人工反馈(可选):如果
human_input=True,请求用户反馈 - 保存记忆:
- 短期记忆:保存任务描述和输出(当前会话)
- 长期记忆:保存任务摘要和经验(持久化)
- 外部记忆:同步到 Mem0(如果配置)
- 返回最终结果
关键设计点:
- 自适应上下文管理:自动截断超长历史,避免超过 LLM 限制
- 智能缓存:避免重复工具调用,提高响应速度
- 错误恢复:解析错误时自动重试,增强鲁棒性
- 限流保护:避免触发 API 速率限制
- 记忆持久化:自动保存执行结果,支持跨会话学习
性能指标:
- 平均迭代次数:简单任务 2-5 次,复杂任务 10-20 次
- 单次 LLM 调用:200-2000ms(取决于模型和提示词长度)
- 工具执行:10-5000ms(取决于工具类型)
- 缓存命中率:通常 30-60%(减少 30-60% 的工具调用)
异常处理:
- LiteLLM 错误:不重试,直接抛出(通常是配置或 API 问题)
- 其他错误:重试(最多
max_retry_limit次,默认 2 次) - 超时:由外层
_execute_with_timeout()处理
4. 工具执行详细流程图
sequenceDiagram
autonumber
participant Executor as AgentExecutor
participant ToolUsage as ToolUsage
participant Cache as CacheHandler
participant Tool as CrewStructuredTool
participant ToolFunc as Tool Function
participant Event as EventBus
participant ToolHandler as ToolsHandler
Executor->>ToolUsage: 执行工具调用
activate ToolUsage
ToolUsage->>ToolUsage: 解析 tool_name 和 tool_input
alt 使用 Function Calling LLM
ToolUsage->>ToolUsage: 使用 function_calling_llm 结构化解析
else 使用标准解析
ToolUsage->>ToolUsage: 正则表达式解析
end
ToolUsage->>Event: emit(ToolUsageStartedEvent)
ToolUsage->>Cache: read(tool_name, tool_input)
activate Cache
alt 缓存命中
Cache-->>ToolUsage: return cached_result
ToolUsage->>ToolUsage: from_cache = True
else 缓存未命中
Cache-->>ToolUsage: return None
end
deactivate Cache
alt 缓存未命中
ToolUsage->>Tool: 查找工具实例
alt 工具不存在
ToolUsage-->>Executor: return "Tool not found"
end
ToolUsage->>Tool: 检查 max_usage_count
alt 达到使用限制
Tool-->>ToolUsage: raise ToolUsageLimitExceededError
ToolUsage-->>Executor: return error message
end
ToolUsage->>Tool: invoke(tool_input)
activate Tool
Tool->>Tool: _parse_args(tool_input)
alt 参数为字符串
Tool->>Tool: 使用 function_calling_llm 解析
else 参数为字典
Tool->>Tool: 直接验证
end
alt 参数验证失败
Tool-->>ToolUsage: raise ValidationError
ToolUsage-->>Executor: return error message
end
Tool->>Tool: _increment_usage_count()
Tool->>ToolFunc: func(**parsed_args)
activate ToolFunc
alt 工具执行成功
ToolFunc-->>Tool: return result
else 工具执行失败
ToolFunc-->>Tool: raise Exception
Tool->>Tool: 捕获异常
Tool-->>ToolUsage: return error message
end
deactivate ToolFunc
deactivate Tool
end
ToolUsage->>ToolUsage: 格式化工具结果
alt 工具返回字典
ToolUsage->>ToolUsage: json.dumps(result)
else 工具返回字符串
ToolUsage->>ToolUsage: 直接使用
end
ToolUsage->>ToolHandler: on_tool_use(calling, output, should_cache)
activate ToolHandler
alt should_cache=True 且非 CacheTools
ToolHandler->>Cache: add(tool_name, tool_input, output)
end
ToolHandler->>ToolHandler: last_used_tool = calling
deactivate ToolHandler
ToolUsage->>Event: emit(ToolUsageCompletedEvent)
alt 工具标记 result_as_answer=True
ToolUsage->>ToolUsage: 设置最终性标记
end
ToolUsage-->>Executor: return tool_result
deactivate ToolUsage
图说明:
该时序图详细展示了工具执行的完整流程,从工具调用到结果返回:
1. 工具调用解析阶段(步骤 1-6):
- Executor 将 AgentAction 传递给 ToolUsage
- 解析工具名称和输入参数
- 支持两种解析模式:
- Function Calling LLM:使用专门的 LLM 结构化解析(更准确)
- 标准解析:使用正则表达式解析 LLM 输出(兼容性更好)
- 发送工具使用开始事件
2. 缓存检查阶段(步骤 7-12):
- 查询 CacheHandler 检查是否有缓存结果
- 缓存键格式:
{tool_name}-{tool_input}(JSON 序列化) - 如果命中缓存:
- 直接返回缓存结果
- 设置
from_cache=True标记 - 跳过后续工具执行步骤
- 如果未命中缓存:
- 继续工具执行流程
3. 工具查找与验证阶段(步骤 13-21):
- 根据
tool_name查找工具实例 - 如果工具不存在,返回错误信息
- 检查工具使用次数限制(
max_usage_count) - 如果达到限制,抛出
ToolUsageLimitExceededError - 这个机制防止工具被过度使用(例如:昂贵的 API 调用)
4. 参数解析与验证阶段(步骤 22-30):
- 调用
Tool._parse_args()解析输入参数 - 支持两种输入格式:
- 字符串:使用 function_calling_llm 解析为结构化参数
- 字典:直接使用 Pydantic Schema 验证
- 使用 Pydantic 验证参数类型和约束
- 如果验证失败,返回详细的错误信息
- 增加工具使用计数
5. 工具执行阶段(步骤 31-40):
- 调用工具的
func(**parsed_args)方法 - 工具函数执行实际逻辑(可能是 API 调用、文件操作、数据查询等)
- 如果执行成功,返回结果
- 如果执行失败:
- 捕获异常
- 记录错误日志
- 返回友好的错误信息(而非抛出异常)
- 这确保推理循环不会因工具错误而中断
6. 结果格式化阶段(步骤 41-47):
- 格式化工具结果为字符串
- 如果结果是字典或列表,使用
json.dumps()序列化 - 如果结果是字符串,直接使用
- 格式化后的结果作为 observation 添加到消息历史
7. 缓存更新阶段(步骤 48-53):
- 调用
ToolsHandler.on_tool_use()回调 - 如果
should_cache=True且工具不是 CacheTools:- 将结果写入 CacheHandler
- 缓存键:
{tool_name}-{tool_input}
- 更新
last_used_tool引用(用于检查工具最终性)
8. 事件发送与返回阶段(步骤 54-59):
- 发送工具使用完成事件(含耗时、缓存命中等信息)
- 检查工具是否标记了
result_as_answer=True - 如果标记,设置最终性标记(推理循环直接返回工具结果)
- 返回工具结果给 Executor
关键设计点:
- 缓存优先:优先使用缓存,避免重复调用(提高性能)
- 参数验证:严格的参数验证,确保工具调用的正确性
- 使用限制:防止工具被过度使用(成本控制)
- 错误容错:工具错误不中断推理循环,返回错误信息供 LLM 参考
- 最终性标记:允许工具直接返回最终答案(跳过后续推理)
工具类型示例:
-
搜索工具:
SerperDevTool、DuckDuckGoSearchTool- 执行时间:200-1000ms
- 缓存收益:高(相同查询重复率高)
-
文件工具:
FileReadTool、FileWriteTool- 执行时间:10-100ms
- 缓存收益:中
-
API 工具:
RestAPITool、GraphQLTool- 执行时间:100-5000ms
- 缓存收益:高(相同请求重复率高)
-
代码执行工具:
CodeInterpreterTool- 执行时间:500-10000ms
- 缓存收益:低(代码通常不重复)
-
委托工具:
DelegateWorkTool、AskQuestionTool- 执行时间:取决于被委托 Agent 的执行时间
- 缓存收益:低(任务描述通常不重复)
性能优化建议:
- 缓存策略:对幂等工具启用缓存,对非幂等工具禁用
- 并发调用:对独立的工具调用使用异步执行
- 超时设置:为长时间工具设置超时
- 结果压缩:对大结果使用压缩存储
数据结构 UML 图
Agent 核心类图
classDiagram
class BaseAgent {
<<abstract>>
+UUID4 id
+str role
+str goal
+str backstory
+bool cache
+bool verbose
+int max_rpm
+bool allow_delegation
+list[BaseTool] tools
+int max_iter
+Any agent_executor
+Any llm
+Any crew
+I18N i18n
+CacheHandler cache_handler
+ToolsHandler tools_handler
+list[dict] tools_results
+int max_tokens
+Knowledge knowledge
+list[BaseKnowledgeSource] knowledge_sources
+SecurityConfig security_config
+list[PlatformAppOrAction] apps
+list[str] mcps
+execute_task(task, context, tools)*
+create_agent_executor(tools)*
+get_delegation_tools(agents)*
+interpolate_inputs(inputs)
+set_cache_handler(cache_handler)
+copy()
+set_rpm_controller(rpm_controller)
}
class Agent {
+int max_execution_time
+Any step_callback
+bool use_system_prompt
+str system_template
+str prompt_template
+str response_template
+bool allow_code_execution
+bool respect_context_window
+int max_retry_limit
+bool multimodal
+bool inject_date
+str date_format
+Literal["safe", "unsafe"] code_execution_mode
+bool reasoning
+int max_reasoning_attempts
+EmbedderConfig embedder
+str agent_knowledge_context
+str crew_knowledge_context
+str knowledge_search_query
+str from_repository
+GuardrailType guardrail
+int guardrail_max_retries
+execute_task(task, context, tools)
+create_agent_executor(tools, task)
+get_delegation_tools(agents)
+get_platform_tools(apps)
+get_mcp_tools(mcps)
+get_multimodal_tools()
+get_code_execution_tools()
+kickoff(messages, response_format)
+kickoff_async(messages, response_format)
-_execute_with_timeout(task_prompt, task, timeout)
-_execute_without_timeout(task_prompt, task)
-_get_knowledge_search_query(task_prompt, task)
-_inject_date_to_task(task)
-_validate_docker_installation()
}
class CrewAgentExecutor {
+BaseLLM llm
+Task task
+Crew crew
+Agent agent
+SystemPromptResult | StandardPromptResult prompt
+int max_iter
+list[CrewStructuredTool] tools
+str tools_names
+list[str] stop
+str tools_description
+ToolsHandler tools_handler
+Any step_callback
+list[BaseTool] original_tools
+BaseLLM function_calling_llm
+bool respect_context_window
+Callable request_within_rpm_limit
+list[LLMMessage] messages
+int iterations
+invoke(inputs)
-_invoke_loop()
-_format_prompt(prompt, inputs)
-_show_start_logs()
-_handle_human_feedback(formatted_answer)
-_create_short_term_memory(formatted_answer)
-_create_long_term_memory(formatted_answer)
-_create_external_memory(formatted_answer)
}
class ToolsHandler {
+BaseTool last_used_tool
+CacheHandler cache
+handle_tool_call(tool_call, agent, task)
+on_tool_use(tool_usage)
}
class CacheHandler {
+dict _cache
+hit(key)
+read(tool, input)
+add(tool, input, output)
}
BaseAgent <|-- Agent : 继承
Agent --> CrewAgentExecutor : 使用
Agent --> ToolsHandler : 使用
CrewAgentExecutor --> ToolsHandler : 使用
ToolsHandler --> CacheHandler : 使用
关键字段说明
BaseAgent 核心字段
| 字段 | 类型 | 约束 | 默认值 | 说明 |
|---|---|---|---|---|
| id | UUID4 | frozen=True | uuid4() | Agent 唯一标识符,不可修改 |
| role | str | required | - | Agent 的角色定义 |
| goal | str | required | - | Agent 的目标 |
| backstory | str | required | - | Agent 的背景故事,用于增强 LLM 理解 |
| cache | bool | - | True | 是否启用工具结果缓存 |
| verbose | bool | - | False | 是否打印详细日志 |
| max_rpm | int | None | - | None | 每分钟最大请求数限制 |
| allow_delegation | bool | - | False | 是否允许委托任务给其他 Agent |
| tools | list[BaseTool] | - | [] | Agent 可用的工具列表 |
| max_iter | int | - | 25 | 推理循环最大迭代次数 |
| agent_executor | Any | - | None | AgentExecutor 实例 |
| llm | Any | - | None | LLM 实例 |
| crew | Any | - | None | 所属的 Crew 实例 |
| tools_results | list[dict] | - | [] | 工具执行结果缓存 |
| knowledge | Knowledge | None | - | None | Agent 级知识库 |
| knowledge_sources | list[BaseKnowledgeSource] | - | None | 知识源列表 |
| security_config | SecurityConfig | - | default() | 安全配置(指纹验证) |
| apps | list[PlatformAppOrAction] | - | None | 平台应用集成列表 |
| mcps | list[str] | - | None | MCP 服务器引用列表 |
Agent 扩展字段
| 字段 | 类型 | 约束 | 默认值 | 说明 |
|---|---|---|---|---|
| max_execution_time | int | None | - | None | 任务超时时间(秒) |
| step_callback | Any | None | - | None | 每步执行后的回调函数 |
| use_system_prompt | bool | - | True | 是否使用系统提示词 |
| system_template | str | None | - | None | 自定义系统提示词模板 |
| prompt_template | str | None | - | None | 自定义用户提示词模板 |
| response_template | str | None | - | None | 自定义响应模板 |
| allow_code_execution | bool | - | False | 是否允许执行代码(需 Docker) |
| respect_context_window | bool | - | True | 是否自动截断超长上下文 |
| max_retry_limit | int | - | 2 | 执行失败最大重试次数 |
| multimodal | bool | - | False | 是否支持多模态输入(图片等) |
| inject_date | bool | - | False | 是否自动注入当前日期 |
| date_format | str | - | %Y-%m-%d | 日期格式字符串 |
| code_execution_mode | Literal[“safe”, “unsafe”] | - | “safe” | 代码执行模式(Docker 或直接执行) |
| reasoning | bool | - | False | 是否启用推理规划(执行前生成计划) |
| max_reasoning_attempts | int | None | - | None | 推理规划最大尝试次数 |
| embedder | EmbedderConfig | None | - | None | 嵌入模型配置 |
| from_repository | str | None | - | None | 从 Agent 仓库加载配置 |
| guardrail | GuardrailType | None | - | None | Agent 级输出验证函数 |
| guardrail_max_retries | int | - | 3 | Guardrail 验证失败最大重试次数 |
API 详细规格
API 1: execute_task
基本信息
- 方法签名:
Agent.execute_task(task: Task, context: str | None = None, tools: list[BaseTool] | None = None) -> Any - 调用方式:实例方法
- 幂等性:否(每次调用可能产生不同结果)
请求结构体
# 方法参数
class ExecuteTaskParams:
task: Task # 要执行的任务
context: Optional[str] = None # 来自前序任务的上下文
tools: Optional[list[BaseTool]] = None # 任务特定的工具列表
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| task | Task | 是 | - | 必须是有效的 Task 对象 | 要执行的任务实例 |
| context | str | None | 否 | None | - | 来自前序任务的输出文本 |
| tools | list[BaseTool] | None | 否 | None | - | 覆盖 Agent 默认工具的工具列表 |
响应结构体
# 返回值为 LLM 生成的原始字符串
# 经过后续处理后封装为 TaskOutput
return_value: Any # 实际为 str,但类型标注为 Any
入口函数与核心代码
def execute_task(
self,
task: Task,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> Any:
"""执行任务的核心方法"""
# 1. 启用推理规划(如果配置)
if self.reasoning:
reasoning_handler = AgentReasoning(task=task, agent=self)
reasoning_output = reasoning_handler.handle_agent_reasoning()
task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"
# 2. 注入日期到任务描述(如果配置)
self._inject_date_to_task(task)
# 3. 构建任务提示词
task_prompt = task.prompt()
# 4. 添加输出格式指令(如果使用结构化输出)
if task.output_json or task.output_pydantic:
schema = generate_model_description(task.output_json or task.output_pydantic)
task_prompt += self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
# 5. 添加上下文(如果有)
if context:
task_prompt = self.i18n.slice("task_with_context").format(
task=task_prompt, context=context
)
# 6. 检索记忆并添加到提示词
if self._is_any_available_memory():
memory = contextual_memory.build_context_for_task(task, context or "")
task_prompt += self.i18n.slice("memory").format(memory=memory)
# 7. 检索知识并添加到提示词
if self.knowledge or (self.crew and self.crew.knowledge):
self.knowledge_search_query = self._get_knowledge_search_query(task_prompt, task)
if self.knowledge_search_query:
# 检索 Agent 级知识
if self.knowledge:
agent_knowledge_snippets = self.knowledge.query([self.knowledge_search_query])
self.agent_knowledge_context = extract_knowledge_context(agent_knowledge_snippets)
task_prompt += self.agent_knowledge_context
# 检索 Crew 级知识
knowledge_snippets = self.crew.query_knowledge([self.knowledge_search_query])
self.crew_knowledge_context = extract_knowledge_context(knowledge_snippets)
task_prompt += self.crew_knowledge_context
# 8. 创建 AgentExecutor 实例
tools = tools or self.tools or []
self.create_agent_executor(tools=tools, task=task)
# 9. 处理训练数据(如果在训练模式)
if self.crew and self.crew._train:
task_prompt = self._training_handler(task_prompt=task_prompt)
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
# 10. 执行任务(带超时或不带超时)
if self.max_execution_time is not None:
result = self._execute_with_timeout(task_prompt, task, self.max_execution_time)
else:
result = self._execute_without_timeout(task_prompt, task)
# 11. 检查工具最终性(如果工具标记了 result_as_answer)
for tool_result in self.tools_results:
if tool_result.get("result_as_answer", False):
result = tool_result["result"]
# 12. 发送执行完成事件
crewai_event_bus.emit(
self,
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
)
return result
调用链与上游函数
调用路径:Crew.kickoff() → Crew._execute_tasks() → Agent.execute_task()
# 上游调用者:Crew._execute_tasks()
def _execute_tasks(self, tasks: list[Task], start_index: int = 0) -> CrewOutput:
"""Crew 执行任务的核心流程"""
task_outputs: list[TaskOutput] = []
for task_index, task in enumerate(tasks):
agent_to_use = self._get_agent_to_use(task)
tools_for_task = task.tools or agent_to_use.tools or []
context = self._get_context(task, task_outputs)
# 调用 Agent.execute_task
if task.async_execution:
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=tools_for_task,
)
futures.append((task, future, task_index))
else:
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=tools_for_task,
)
task_outputs.append(task_output)
return self._create_crew_output(task_outputs)
Task.execute_sync() 的实现:
# Task.execute_sync() 内部调用 Agent.execute_task()
def execute_sync(
self,
agent: BaseAgent | None = None,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> TaskOutput:
"""同步执行任务"""
agent = agent or self.agent
if not agent:
raise Exception(f"The task '{self.description}' has no agent assigned")
# 调用 Agent.execute_task
result = agent.execute_task(
task=self,
context=context,
tools=tools,
)
# 导出结构化输出
pydantic_output, json_output = self._export_output(result)
# 创建 TaskOutput
task_output = TaskOutput(
description=self.description,
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
return task_output
时序图(请求→响应完整路径)
sequenceDiagram
autonumber
participant Crew as Crew
participant Task as Task
participant Agent as Agent
participant Executor as AgentExecutor
participant LLM as LLM
participant Memory as Memory
participant Knowledge as Knowledge
participant Tool as Tool
Crew->>Task: execute_sync(agent, context, tools)
Task->>Agent: execute_task(task, context, tools)
alt 启用推理规划
Agent->>Agent: AgentReasoning.handle_agent_reasoning()
end
Agent->>Agent: 构建 task_prompt
alt 有记忆系统
Agent->>Memory: build_context_for_task(task, context)
Memory-->>Agent: memory_snippets
Agent->>Agent: 添加 memory 到 task_prompt
end
alt 有知识库
Agent->>Agent: _get_knowledge_search_query(task_prompt)
Agent->>Knowledge: query([search_query])
Knowledge-->>Agent: knowledge_snippets
Agent->>Agent: 添加 knowledge 到 task_prompt
end
Agent->>Agent: create_agent_executor(tools, task)
Agent->>Executor: invoke({"input": task_prompt})
loop 推理循环(最多 max_iter 次)
Executor->>LLM: call(messages)
LLM-->>Executor: response
alt 解析出 Action(工具调用)
Executor->>Tool: execute(tool_input)
Tool-->>Executor: tool_output
Executor->>Executor: 添加 tool_output 到 messages
else 解析出 Final Answer
Executor->>Executor: 返回 Final Answer
end
end
Executor-->>Agent: output
Agent->>Memory: save(output)
Agent-->>Task: return result
Task->>Task: _export_output(result)
Task-->>Crew: TaskOutput
边界与异常
异常情况处理:
-
任务超时:
- 异常类型:
TimeoutError - 触发条件:执行时间超过
max_execution_time - 处理方式:不重试,直接抛出异常
- 回退策略:无,由调用方处理
- 异常类型:
-
LLM 调用失败:
- 异常类型:
LiteLLM相关异常 - 触发条件:LLM API 返回错误
- 处理方式:LiteLLM 错误不重试,其他错误重试(最多
max_retry_limit次) - 回退策略:重试失败后抛出
AgentExecutionError
- 异常类型:
-
工具执行失败:
- 异常类型:工具抛出的异常
- 触发条件:工具内部执行错误
- 处理方式:记录错误,继续推理循环
- 回退策略:LLM 可以选择重试工具或换用其他工具
-
记忆/知识检索失败:
- 异常类型:
ChromaDBError、SQLiteError等 - 触发条件:存储服务不可用
- 处理方式:记录警告,继续执行(不阻塞任务)
- 回退策略:降级为无记忆/知识模式
- 异常类型:
幂等性与重复请求:
- 任务级幂等:同一 Task 对象多次执行,每次可能产生不同结果(因为 LLM 是非确定性的)
- 工具幂等:取决于工具本身的实现,CrewAI 不强制工具幂等
- 缓存机制:如果启用
cache=True,相同的工具调用会返回缓存结果
资源限制:
- 最大迭代次数:
max_iter(默认 25),超过后返回当前最佳答案 - 执行超时:
max_execution_time(秒),超时后强制终止 - RPM 限制:
max_rpm控制每分钟请求数
实践与最佳实践
1. 客户端重试建议:
- 对于超时错误,增加
max_execution_time - 对于 LLM 错误,检查 API Key 和配额
- 对于工具错误,验证工具参数和权限
2. 批量任务处理:
- 使用
async_execution=True并行执行多个任务 - 在 Crew 级别使用
kickoff_for_each(inputs)批量处理
3. 性能优化:
- 启用
cache=True缓存工具结果 - 使用
respect_context_window=True自动截断历史 - 配置
max_iter避免过多迭代
4. 记忆与知识的权衡:
- 记忆检索适合动态上下文(会话历史)
- 知识检索适合静态文档(产品手册、FAQ)
- 避免同时使用大量记忆和知识(会超长上下文)
API 2: create_agent_executor
基本信息
- 方法签名:
Agent.create_agent_executor(tools: list[BaseTool] | None = None, task: Task | None = None) -> None - 调用方式:实例方法(内部调用)
- 幂等性:是(重复调用会覆盖之前的 executor)
功能说明
创建并配置 CrewAgentExecutor 实例,该实例负责管理推理循环和工具调用。
核心代码
def create_agent_executor(
self, tools: list[BaseTool] | None = None, task: Task | None = None
) -> None:
"""创建 AgentExecutor"""
raw_tools: list[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
# 生成提示词模板
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
i18n=self.i18n,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
prompt_template=self.prompt_template,
response_template=self.response_template,
).task_execution()
# 生成停止词列表
stop_words = [self.i18n.slice("observation")]
if self.response_template:
stop_words.append(
self.response_template.split("{{ .Response }}")[1].strip()
)
# 创建 Executor 实例
self.agent_executor = CrewAgentExecutor(
llm=self.llm,
task=task,
agent=self,
crew=self.crew,
tools=parsed_tools,
prompt=prompt,
original_tools=raw_tools,
stop_words=stop_words,
max_iter=self.max_iter,
tools_handler=self.tools_handler,
tools_names=get_tool_names(parsed_tools),
tools_description=render_text_description_and_args(parsed_tools),
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=(
self._rpm_controller.check_or_wait if self._rpm_controller else None
),
callbacks=[TokenCalcHandler(self._token_process)],
)
关键设计点
- 提示词生成:根据 Agent 配置动态生成系统提示词和用户提示词
- 工具解析:将
BaseTool转换为CrewStructuredTool,提取参数 schema - 停止词配置:根据响应模板动态生成停止词
- RPM 限流集成:将 RPMController 的回调函数传递给 Executor
- Token 统计:注册 TokenCalcHandler 回调以统计 Token 使用
API 3: get_delegation_tools
基本信息
- 方法签名:
Agent.get_delegation_tools(agents: list[BaseAgent]) -> list[BaseTool] - 调用方式:实例方法
- 幂等性:是(相同输入返回相同结果)
功能说明
为分层流程生成委托工具,允许 Manager Agent 委托任务给 Worker Agents。
请求结构体
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| agents | list[BaseAgent] | 是 | - | - | 可委托的 Worker Agents |
响应结构体
返回两个工具的列表:
-
Delegate Work To Coworker:
- 描述:委托具体任务给指定的 Worker Agent
- 参数:
coworker(Agent 角色)、task(任务描述)、context(上下文信息)
-
Ask Question To Coworker:
- 描述:向指定的 Worker Agent 提问
- 参数:
coworker(Agent 角色)、question(问题)、context(上下文信息)
核心代码
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
"""生成委托工具"""
agent_tools = AgentTools(agents=agents)
return agent_tools.tools()
AgentTools 的实现:
class AgentTools(BaseTool):
"""提供委托和提问工具"""
agents: list[BaseAgent]
def tools(self) -> list[BaseTool]:
tools = [
self._delegate_work(),
self._ask_question(),
]
return tools
def _delegate_work(self) -> BaseTool:
"""创建委托工作工具"""
class DelegateWorkInput(BaseModel):
coworker: str = Field(description="Agent 角色")
task: str = Field(description="任务描述")
context: str = Field(description="上下文信息")
return Tool(
name="Delegate work to coworker",
description="Delegate a task to a specific coworker",
args_schema=DelegateWorkInput,
func=self._execute_delegation,
)
def _execute_delegation(self, coworker: str, task: str, context: str) -> str:
"""执行委托"""
agent = self._get_agent_by_role(coworker)
if not agent:
return f"Error: Agent '{coworker}' not found"
# 创建临时 Task 并执行
sub_task = Task(description=task, expected_output="Result", agent=agent)
result = agent.execute_task(sub_task, context=context)
return result
时序图
sequenceDiagram
autonumber
participant Manager as Manager Agent
participant DelegateTool as Delegate Tool
participant Worker as Worker Agent
participant Executor as AgentExecutor
Manager->>Manager: 分析任务需求
Manager->>DelegateTool: Delegate work to Worker1
DelegateTool->>Worker: execute_task(sub_task, context)
Worker->>Executor: invoke(task_prompt)
Executor->>Executor: 执行推理循环
Executor-->>Worker: 返回结果
Worker-->>DelegateTool: 返回结果
DelegateTool-->>Manager: 返回结果
Manager->>Manager: 整合结果
API 4: kickoff
基本信息
- 方法签名:
Agent.kickoff(messages: str | list[LLMMessage], response_format: type[Any] | None = None) -> LiteAgentOutput - 调用方式:实例方法
- 幂等性:否(基于 LLM 生成)
功能说明
使用 LiteAgent 模式执行简单的 Agent 交互,适用于单轮对话或快速原型。
请求结构体
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| messages | str | list[LLMMessage] | 是 | - | - | 用户消息或消息列表 |
| response_format | type[Any] | None | 否 | None | - | Pydantic 模型,用于结构化输出 |
响应结构体
class LiteAgentOutput(BaseModel):
output: str # 原始输出
pydantic: BaseModel | None # 结构化输出(如果指定 response_format)
json_dict: dict[str, Any] | None # JSON 输出
token_usage: dict[str, int] # Token 使用统计
核心代码
def kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""使用 LiteAgent 执行"""
lite_agent = LiteAgent(
id=self.id,
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,
verbose=self.verbose,
response_format=response_format,
i18n=self.i18n,
original_agent=self,
guardrail=self.guardrail,
guardrail_max_retries=self.guardrail_max_retries,
)
return lite_agent.kickoff(messages)
使用场景
- 快速原型:无需定义 Task 即可测试 Agent 行为
- 单轮对话:简单的问答场景
- 流式输出:配合
kickoff_async()实现流式响应
关键功能流程剖析
功能 1:提示词生成(Prompt Generation)
功能描述
提示词生成是 Agent 执行的第一步,负责将任务描述、Agent 角色、工具描述、记忆和知识等信息组装成结构化的提示词,指导 LLM 的行为。
核心代码实现
Prompts 类的实现(lib/crewai/src/crewai/utilities/prompts.py):
class Prompts(BaseModel):
"""管理和生成 Agent 的提示词"""
i18n: I18N = Field(default_factory=I18N)
has_tools: bool = Field(default=False, description="是否有工具")
system_template: str | None = Field(default=None, description="自定义系统提示词模板")
prompt_template: str | None = Field(default=None, description="自定义用户提示词模板")
response_template: str | None = Field(default=None, description="自定义响应模板")
use_system_prompt: bool | None = Field(default=False, description="是否使用系统提示词")
agent: Any = Field(description="Agent 引用")
def task_execution(self) -> SystemPromptResult | StandardPromptResult:
"""生成任务执行的提示词"""
# 1. 构建提示词组件
slices: list[str] = ["role_playing"] # 角色扮演部分
if self.has_tools:
slices.append("tools") # 工具使用说明
else:
slices.append("no_tools") # 无工具说明
# 2. 构建系统提示词
system: str = self._build_prompt(slices)
# 3. 添加任务组件
slices.append("task")
# 4. 根据配置选择提示词模式
if not self.system_template and not self.prompt_template and self.use_system_prompt:
# 分离系统提示词和用户提示词
return SystemPromptResult(
system=system,
user=self._build_prompt(["task"]),
prompt=self._build_prompt(slices),
)
else:
# 单一提示词模式
return StandardPromptResult(
prompt=self._build_prompt(
slices,
self.system_template,
self.prompt_template,
self.response_template,
)
)
def _build_prompt(
self,
components: list[str],
system_template: str | None = None,
prompt_template: str | None = None,
response_template: str | None = None,
) -> str:
"""构建提示词字符串"""
prompt: str
if not system_template or not prompt_template:
# 使用默认格式
prompt_parts: list[str] = [
self.i18n.slice(component) for component in components
]
prompt = "".join(prompt_parts)
else:
# 使用自定义模板
template_parts: list[str] = [
self.i18n.slice(component)
for component in components
if component != "task"
]
system: str = system_template.replace(
"{{ .System }}", "".join(template_parts)
)
prompt = prompt_template.replace(
"{{ .Prompt }}", "".join(self.i18n.slice("task"))
)
if response_template:
response: str = response_template.split("{{ .Response }}")[0]
prompt = f"{system}\n{prompt}\n{response}"
else:
prompt = f"{system}\n{prompt}"
# 5. 注入 Agent 的角色信息
return (
prompt.replace("{goal}", self.agent.goal)
.replace("{role}", self.agent.role)
.replace("{backstory}", self.agent.backstory)
)
提示词结构分析
1. 角色扮演部分(role_playing):
CrewAI 使用国际化的提示词模板(位于 i18n 目录),标准的角色扮演提示词包含:
You are {role}.
{backstory}
Your personal goal is: {goal}
2. 工具使用说明(tools):
如果 Agent 有工具,添加工具使用指南:
To use a tool, please use the following format:
Thought: Do I need to use a tool? Yes
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action, always a simple python dictionary, enclosed in curly braces, using \" to wrap keys and values.
Observation: the result of the action
When you have a response to say to the Human, or if you do not need to use a tool, you MUST use the format:
Thought: Do I need to use a tool? No
Final Answer: [your response here]
3. 任务描述(task):
Begin! This is VERY important to you, your job depends on it!
Current Task: {task_description}
Expected Output: {expected_output}
提示词组装流程
flowchart TB
Start[开始] --> LoadI18N[加载国际化模板]
LoadI18N --> SelectMode{选择模式}
SelectMode -->|SystemPrompt| SplitMode[分离系统/用户提示词]
SelectMode -->|Standard| SingleMode[单一提示词]
SplitMode --> BuildSystem[构建系统提示词]
BuildSystem --> AddRole[角色扮演]
AddRole --> AddToolGuide[工具使用指南]
AddToolGuide --> BuildUser[构建用户提示词]
BuildUser --> AddTask[任务描述]
SingleMode --> CombineAll[合并所有组件]
CombineAll --> AddRole2[角色扮演]
AddRole2 --> AddToolGuide2[工具使用指南]
AddToolGuide2 --> AddTask2[任务描述]
AddTask --> InjectVars[注入变量]
AddTask2 --> InjectVars
InjectVars --> ReplaceRole[替换 {role}]
ReplaceRole --> ReplaceGoal[替换 {goal}]
ReplaceGoal --> ReplaceBackstory[替换 {backstory}]
ReplaceBackstory --> AddContext{有上下文?}
AddContext -->|是| AppendContext[追加 context]
AddContext -->|否| AddMemory{有记忆?}
AppendContext --> AddMemory
AddMemory -->|是| AppendMemory[追加 memory snippets]
AddMemory -->|否| AddKnowledge{有知识?}
AppendMemory --> AddKnowledge
AddKnowledge -->|是| AppendKnowledge[追加 knowledge snippets]
AddKnowledge -->|否| AddFormat{结构化输出?}
AppendKnowledge --> AddFormat
AddFormat -->|是| AppendSchema[追加 output schema]
AddFormat -->|否| End[完成]
AppendSchema --> End
图说明:
该流程图展示了提示词从空模板到最终完整提示词的组装过程:
- 模板加载:从国际化文件加载基础模板
- 模式选择:
- SystemPrompt 模式:分离系统提示词和用户提示词(推荐用于支持 system role 的模型)
- Standard 模式:单一提示词(兼容所有模型)
- 组件添加:按顺序添加角色、工具、任务等组件
- 变量注入:替换占位符(
{role}、{goal}、{backstory}) - 上下文增强:追加前序任务输出、记忆片段、知识片段
- 格式化指令:如果使用结构化输出,追加 Schema 说明
示例:完整提示词结构
假设一个带工具的研究 Agent,最终提示词可能如下:
系统提示词:
You are Senior Research Analyst.
You are a world-class researcher with 10 years of experience in AI and machine learning.
Your personal goal is: Find the latest trends in AI technology
To use a tool, please use the following format:
Thought: Do I need to use a tool? Yes
Action: the action to take, should be one of [search_tool, file_read_tool]
Action Input: the input to the action, always a simple python dictionary
Observation: the result of the action
When you have a response, you MUST use the format:
Thought: Do I need to use a tool? No
Final Answer: [your response here]
用户提示词:
Begin! This is VERY important to you, your job depends on it!
Current Task: Research the top 5 AI trends in 2024 and provide detailed analysis
Expected Output: A comprehensive report with at least 1000 words covering each trend
Context from previous task:
The company is interested in implementing AI solutions in healthcare.
Relevant memories:
- Previous research indicated strong interest in generative AI
- Healthcare regulations require HIPAA compliance
Relevant knowledge:
- AI trends 2024: Generative AI, AI safety, multimodal models...
- Healthcare AI applications: Diagnostics, drug discovery...
Please provide the output in the following JSON format:
{
"trends": [
{"name": "string", "description": "string", "impact": "string"}
],
"recommendations": ["string"]
}
功能 2:推理循环(Reasoning Loop)
功能描述
推理循环是 Agent 执行任务的核心机制,通过多次调用 LLM 并执行工具,最终生成满足要求的输出。
核心代码
def _invoke_loop(self) -> AgentFinish:
"""执行推理循环直到完成"""
formatted_answer = None
while not isinstance(formatted_answer, AgentFinish):
# 检查是否达到最大迭代次数
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
formatted_answer,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
)
break
# 检查上下文长度
if is_context_length_exceeded(
self.messages, self.llm.model_name, self.respect_context_window
):
self.messages = handle_context_length(
self.messages, self.llm.model_name
)
# RPM 限流
enforce_rpm_limit(self.request_within_rpm_limit, self._printer)
# 调用 LLM
llm_response = get_llm_response(
llm=self.llm,
messages=self.messages,
printer=self._printer,
stop=self.stop if self.use_stop_words else None,
)
# 解析响应
try:
formatted_answer = process_llm_response(
llm_response, self.messages, self._printer
)
except OutputParserError as e:
formatted_answer = handle_output_parser_exception(
e, self._printer, self.i18n
)
# 处理 Action(工具调用)
if isinstance(formatted_answer, AgentAction):
tool_result = handle_agent_action_core(
agent_action=formatted_answer,
agent=self.agent,
task=self.task,
crew=self.crew,
tools_handler=self.tools_handler,
tools=self.tools,
original_tools=self.original_tools,
function_calling_llm=self.function_calling_llm,
step_callback=self.step_callback,
messages=self.messages,
printer=self._printer,
)
# 添加工具结果到消息历史
observation_message = self._i18n.slice("observation").format(
observation=tool_result
)
self.messages.append(format_message_for_llm(observation_message))
self.iterations += 1
return formatted_answer
流程说明
- 迭代控制:最多迭代
max_iter次 - 上下文管理:超长上下文自动截断(如果启用
respect_context_window) - 限流控制:调用前检查 RPM 限制
- LLM 调用:发送消息历史并获取响应
- 响应解析:识别 Action 或 Final Answer
- 工具执行:如果是 Action,执行工具并将结果添加到消息
- 循环终止:识别到 Final Answer 或达到最大迭代次数
复杂度分析
- 时间复杂度:O(N * T),N 为迭代次数,T 为单次 LLM 调用时间
- 空间复杂度:O(M),M 为消息历史长度
- 最坏情况:达到
max_iter但未生成 Final Answer
功能 2:记忆管理(Memory Management)
功能描述
Agent 在执行任务时检索相关记忆,并在完成后保存新记忆,实现跨任务的上下文累积。
核心代码
def execute_task(self, task: Task, context: str | None = None, tools: list[BaseTool] | None = None) -> Any:
# (省略其他代码)
# 检索记忆
if self._is_any_available_memory():
# 发送记忆检索开始事件
crewai_event_bus.emit(
self,
event=MemoryRetrievalStartedEvent(
task_id=str(task.id),
source_type="agent",
from_agent=self,
from_task=task,
),
)
start_time = time.time()
# 构建上下文记忆
contextual_memory = ContextualMemory(
self.crew._short_term_memory,
self.crew._long_term_memory,
self.crew._entity_memory,
self.crew._external_memory,
agent=self,
task=task,
)
memory = contextual_memory.build_context_for_task(task, context or "")
if memory.strip() != "":
task_prompt += self.i18n.slice("memory").format(memory=memory)
# 发送记忆检索完成事件
crewai_event_bus.emit(
self,
event=MemoryRetrievalCompletedEvent(
task_id=str(task.id),
memory_content=memory,
retrieval_time_ms=(time.time() - start_time) * 1000,
source_type="agent",
from_agent=self,
from_task=task,
),
)
# (执行任务)
result = self._execute_without_timeout(task_prompt, task)
# 保存记忆(在 AgentExecutor 内部)
# _create_short_term_memory(formatted_answer)
# _create_long_term_memory(formatted_answer)
# _create_external_memory(formatted_answer)
ContextualMemory 的实现:
class ContextualMemory:
def __init__(self, short_term, long_term, entity, external, agent, task):
self.short_term = short_term
self.long_term = long_term
self.entity = entity
self.external = external
self.agent = agent
self.task = task
def build_context_for_task(self, task: Task, context: str) -> str:
"""构建任务上下文"""
query = f"{task.description} {context}"
# 检索短期记忆
short_term_results = self.short_term.search(query, limit=5) if self.short_term else []
# 检索长期记忆
long_term_results = self.long_term.search(query, limit=3) if self.long_term else []
# 检索实体记忆
entity_results = self.entity.search(query, limit=3) if self.entity else []
# 检索外部记忆
external_results = self.external.search(query, limit=3) if self.external else []
# 合并并格式化
all_results = short_term_results + long_term_results + entity_results + external_results
memory_text = "\n\n".join([result.content for result in all_results])
return memory_text
记忆类型说明
-
短期记忆(Short-Term Memory):
- 存储:ChromaDB 向量数据库
- 内容:当前会话的任务描述和输出
- 生命周期:单次 Crew 执行
- 检索:基于语义相似度
-
长期记忆(Long-Term Memory):
- 存储:SQLite 数据库
- 内容:任务执行的总结和经验
- 生命周期:跨 Crew 执行持久化
- 检索:基于任务描述和元数据
-
实体记忆(Entity Memory):
- 存储:ChromaDB 向量数据库
- 内容:提取的关键实体(人名、地名、产品名等)
- 生命周期:持久化
- 检索:基于实体名称和关系
-
外部记忆(External Memory - Mem0):
- 存储:Mem0 云服务
- 内容:用户偏好、历史交互、个性化数据
- 生命周期:全局持久化
- 检索:基于 Mem0 的智能检索
功能 3:知识检索(Knowledge Retrieval)
功能描述
Agent 从知识库中检索相关文档片段,增强任务执行的准确性。
核心代码
def execute_task(self, task: Task, context: str | None = None, tools: list[BaseTool] | None = None) -> Any:
# (省略其他代码)
# 知识检索
knowledge_config = self.knowledge_config.model_dump() if self.knowledge_config else {}
if self.knowledge or (self.crew and self.crew.knowledge):
crewai_event_bus.emit(
self,
event=KnowledgeRetrievalStartedEvent(
from_task=task,
from_agent=self,
),
)
try:
# 生成检索查询
self.knowledge_search_query = self._get_knowledge_search_query(task_prompt, task)
if self.knowledge_search_query:
# 检索 Agent 级知识
if self.knowledge:
agent_knowledge_snippets = self.knowledge.query(
[self.knowledge_search_query], **knowledge_config
)
if agent_knowledge_snippets:
self.agent_knowledge_context = extract_knowledge_context(
agent_knowledge_snippets
)
task_prompt += self.agent_knowledge_context
# 检索 Crew 级知识
knowledge_snippets = self.crew.query_knowledge(
[self.knowledge_search_query], **knowledge_config
)
if knowledge_snippets:
self.crew_knowledge_context = extract_knowledge_context(
knowledge_snippets
)
task_prompt += self.crew_knowledge_context
crewai_event_bus.emit(
self,
event=KnowledgeRetrievalCompletedEvent(
query=self.knowledge_search_query,
from_task=task,
from_agent=self,
retrieved_knowledge=(
(self.agent_knowledge_context or "")
+ (self.crew_knowledge_context or "")
),
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=KnowledgeSearchQueryFailedEvent(
query=self.knowledge_search_query or "",
error=str(e),
from_task=task,
from_agent=self,
),
)
知识查询生成:
def _get_knowledge_search_query(self, task_prompt: str, task: Task) -> str | None:
"""生成知识检索查询"""
query = self.i18n.slice("knowledge_search_query").format(
task_prompt=task_prompt
)
rewriter_prompt = self.i18n.slice("knowledge_search_query_system_prompt")
try:
# 使用 LLM 重写查询
rewritten_query = self.llm.call(
[
{"role": "system", "content": rewriter_prompt},
{"role": "user", "content": query},
]
)
return rewritten_query
except Exception as e:
return None
知识源类型
- 文本文件:TXT、MD、PDF
- 结构化文档:JSON、YAML、CSV
- 网页内容:HTML、URL
- 代码仓库:GitHub、GitLab
- API 响应:动态获取的数据
实战示例与最佳实践
示例 1:创建基础 Agent
from crewai import Agent
from crewai_tools import SerperDevTool
# 创建 Agent
researcher = Agent(
role="Research Analyst",
goal="Find the latest trends in AI",
backstory="You are an expert researcher with 10 years of experience in AI.",
tools=[SerperDevTool()],
verbose=True,
max_iter=15,
max_execution_time=300, # 5 分钟超时
)
# 直接使用 Agent(无需 Task)
result = researcher.kickoff("What are the top 5 AI trends in 2024?")
print(result.output)
示例 2:启用记忆和知识
from crewai import Agent, Crew
from crewai.knowledge import Knowledge
from crewai.knowledge.source import TextFileKnowledgeSource
# 配置知识源
knowledge = Knowledge(
sources=[
TextFileKnowledgeSource(file_path="./docs/ai_trends.txt"),
],
collection_name="ai_trends",
)
# 创建 Agent(带知识)
analyst = Agent(
role="AI Analyst",
goal="Analyze AI trends",
backstory="Expert analyst",
knowledge_sources=[knowledge],
verbose=True,
)
# 创建 Crew(启用记忆)
crew = Crew(
agents=[analyst],
tasks=[...],
memory=True, # 启用记忆系统
)
crew.kickoff()
示例 3:自定义工具集成
from crewai import Agent
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class CalculatorInput(BaseModel):
expression: str = Field(description="数学表达式")
class CalculatorTool(BaseTool):
name: str = "Calculator"
description: str = "执行数学计算"
args_schema: type[BaseModel] = CalculatorInput
def _run(self, expression: str) -> str:
try:
result = eval(expression)
return f"Result: {result}"
except Exception as e:
return f"Error: {str(e)}"
# 创建 Agent(带自定义工具)
math_agent = Agent(
role="Math Expert",
goal="Solve math problems",
backstory="You are a mathematics professor",
tools=[CalculatorTool()],
verbose=True,
)
示例 4:分层委托
from crewai import Agent, Task, Crew, Process
# 定义 Worker Agents
researcher = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
)
writer = Agent(
role="Writer",
goal="Write articles",
backstory="Professional writer",
)
# 定义任务(无需指定 agent)
task1 = Task(
description="Research AI trends",
expected_output="Research report",
)
task2 = Task(
description="Write an article",
expected_output="Article",
)
# 创建 Hierarchical Crew
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2],
process=Process.hierarchical,
manager_llm="gpt-4", # Manager 使用 GPT-4
)
result = crew.kickoff()
最佳实践总结
-
合理配置超时:
- 简单任务:60-120 秒
- 复杂任务:300-600 秒
- 长时间任务:考虑拆分
-
优化记忆和知识:
- 避免同时检索大量记忆和知识
- 使用
score_threshold过滤低相关性结果 - 定期清理过期记忆
-
工具设计原则:
- 工具描述要清晰详细
- 参数 schema 要完整
- 实现幂等性(如果可能)
- 处理错误并返回友好提示
-
性能优化:
- 启用
cache=True - 使用
respect_context_window=True - 配置
max_rpm避免速率限制 - 使用
async_execution=True并行任务
- 启用
-
错误处理:
- 捕获工具异常并优雅处理
- 使用
guardrail验证输出 - 配置合理的重试次数
- 记录详细日志(
verbose=True)
总结
Agent 模块是 CrewAI 框架的核心,通过以下机制实现强大的任务执行能力:
- 灵活的角色定义:role、goal、backstory 构建清晰的身份
- 强大的执行引擎:推理循环、工具调用、超时控制
- 丰富的上下文管理:记忆检索、知识查询、任务上下文
- 多样的工具集成:内置、自定义、MCP、平台工具
- 完善的可观测性:事件总线、Token 统计、日志记录
该模块的设计遵循 单一职责原则 和 开放封闭原则,允许用户通过继承 BaseAgent 或扩展工具系统来定制行为,同时保持核心功能的稳定性。