CrewAI-02-Task-完整剖析
模块概览
职责与定位
Task(任务)是 CrewAI 框架中定义工作单元的核心抽象,负责:
- 任务定义:通过 description 和 expected_output 明确任务目标和预期结果
- 执行管理:协调 Agent 执行任务并处理同步/异步执行模式
- 上下文传递:从前序任务获取输出作为当前任务的上下文
- 输出验证:通过 Guardrail 机制验证任务输出的质量
- 结果转换:将原始输出转换为结构化格式(JSON/Pydantic)
- 文件持久化:可选地将任务输出保存到文件
- 条件执行:支持基于前序任务输出的条件执行(ConditionalTask)
输入与输出
输入:
description:任务描述,支持模板变量插值(如{topic})expected_output:预期输出的明确定义agent:执行任务的 Agent 实例context:来自前序任务的输出列表(或 NOT_SPECIFIED 表示自动获取)tools:任务特定的工具列表(可选,覆盖 Agent 的工具)
输出:
TaskOutput对象:包含原始输出、结构化输出、执行元数据
上下游依赖
上游依赖(被调用):
Crew编排器:通过Task.execute_sync()或Task.execute_async()执行任务Flow引擎:在流程中调用任务
下游依赖(调用):
Agent:调用Agent.execute_task()实际执行任务Converter:将原始输出转换为结构化格式Guardrail:验证任务输出Events:发送任务开始、完成、失败事件
生命周期
stateDiagram-v2
[*] --> 创建: Task(...) 或 @task 装饰器
创建 --> 配置: 设置 description/expected_output
配置 --> 插值: interpolate_inputs(inputs)
插值 --> 就绪: 等待执行
就绪 --> 执行中: execute_sync() 或 execute_async()
执行中 --> Agent执行: agent.execute_task()
Agent执行 --> 输出转换: _export_output()
输出转换 --> Guardrail验证: 如果配置了 guardrail
Guardrail验证 --> Guardrail验证: 验证失败,重新执行
Guardrail验证 --> 输出保存: 验证通过
输出保存 --> 文件持久化: 如果配置了 output_file
文件持久化 --> 完成: TaskOutput
完成 --> [*]
整体服务架构图
1. 全局模块关系图
flowchart TB
subgraph "上游编排层"
Crew[Crew 编排器<br/>任务序列管理]
Flow[Flow 引擎<br/>流程编排]
end
subgraph "Task 核心层"
Task[Task 类<br/>任务定义和配置]
ConditionalTask[ConditionalTask 类<br/>条件执行任务]
TaskAPI{Task API}
ExecuteSync[execute_sync<br/>同步执行]
ExecuteAsync[execute_async<br/>异步执行]
Prompt[prompt<br/>提示词生成]
Interpolate[interpolate_inputs<br/>输入插值]
end
subgraph "执行引擎层"
Agent[Agent 代理<br/>任务执行主体]
AgentExecutor[AgentExecutor<br/>执行器]
LLM[LLM 语言模型<br/>推理引擎]
Tools[Tools 工具<br/>外部能力]
end
subgraph "输出处理层"
TaskOutput[TaskOutput<br/>输出封装]
Converter[Converter<br/>结构化转换]
OutputFormat[OutputFormat<br/>格式枚举]
FileHandler[FileHandler<br/>文件持久化]
end
subgraph "验证层"
Guardrail[Guardrail<br/>输出验证]
GuardrailResult[GuardrailResult<br/>验证结果]
end
subgraph "基础设施层"
Events[EventBus<br/>事件总线]
Memory[Memory<br/>记忆系统]
RPM[RPMController<br/>速率限制]
Security[SecurityConfig<br/>安全配置]
end
Crew -->|kickoff| Task
Flow -->|调用| Task
Task --> TaskAPI
TaskAPI --> ExecuteSync
TaskAPI --> ExecuteAsync
TaskAPI --> Prompt
TaskAPI --> Interpolate
ExecuteSync --> Agent
ExecuteAsync --> Agent
Agent --> AgentExecutor
AgentExecutor --> LLM
AgentExecutor --> Tools
Agent -->|返回结果| Task
Task --> Converter
Converter --> TaskOutput
TaskOutput --> OutputFormat
Task --> Guardrail
Guardrail -->|验证失败| Agent
Guardrail --> GuardrailResult
TaskOutput --> FileHandler
Task --> Events
Agent --> Events
Agent --> Memory
Agent --> RPM
Task --> Security
ConditionalTask -.继承.-> Task
2. Task 内部模块架构图
flowchart TB
subgraph "Task 核心层"
Task[Task 类<br/>任务定义和配置]
ConditionalTask[ConditionalTask 类<br/>条件执行任务]
end
subgraph "输出处理层"
TaskOutput[TaskOutput 类<br/>任务输出封装]
Converter[Converter 类<br/>结构化输出转换]
OutputFormat[OutputFormat 枚举<br/>RAW/JSON/PYDANTIC]
end
subgraph "验证层"
Guardrail[Guardrail 机制<br/>输出验证]
LLMGuardrail[LLMGuardrail 类<br/>基于 LLM 的验证]
HallucinationGuardrail[HallucinationGuardrail<br/>幻觉检测]
end
subgraph "执行层"
ExecuteSync[execute_sync()<br/>同步执行]
ExecuteAsync[execute_async()<br/>异步执行]
Agent[Agent.execute_task()]
end
subgraph "上下文层"
Context[Context 管理<br/>前序任务输出]
Interpolation[输入插值<br/>模板变量替换]
end
subgraph "基础设施层"
Events[事件系统<br/>TaskStarted/Completed/Failed]
Security[SecurityConfig<br/>指纹验证]
FileHandler[文件处理<br/>输出持久化]
end
Task --> ExecuteSync
Task --> ExecuteAsync
ConditionalTask -.继承.-> Task
ExecuteSync --> Agent
ExecuteAsync --> Agent
Agent --> TaskOutput
TaskOutput --> Converter
TaskOutput --> OutputFormat
Task --> Guardrail
Guardrail --> LLMGuardrail
Guardrail --> HallucinationGuardrail
Task --> Context
Task --> Interpolation
Task --> Events
Task --> Security
Task --> FileHandler
架构要点说明
1) Task 与 ConditionalTask 的关系
Task:标准任务,始终执行ConditionalTask:条件任务,根据前序任务输出决定是否执行- 设计目的:实现动态工作流(如"仅当前一任务成功时才执行")
2) 输出格式的三种模式
- RAW:原始字符串输出(默认)
- JSON:结构化 JSON 字典(通过
output_json定义 schema) - PYDANTIC:Pydantic 模型实例(通过
output_pydantic定义模型)
3) Guardrail 的多层验证
- 函数式 Guardrail:用户提供的验证函数
- LLM Guardrail:使用 LLM 判断输出是否符合要求
- 幻觉检测 Guardrail:检测 LLM 输出中的幻觉内容
4) 同步与异步执行的区别
- 同步执行:阻塞当前线程直到任务完成
- 异步执行:在后台线程执行,返回
Future对象 - 适用场景:异步用于 I/O 密集型或长时间任务
5) 上下文传递机制
- NOT_SPECIFIED(默认):自动使用所有前序任务的输出
- 显式 context:手动指定哪些任务的输出作为上下文
- context=[]:不使用任何上下文
6) 全局模块关系总结
数据流向:
用户输入 → Crew.kickoff → Task.execute_sync → Agent.execute_task → LLM.call → 原始输出
→ Converter.convert → TaskOutput → Guardrail.validate → FileHandler.save → CrewOutput
事件流向:
TaskStartedEvent → AgentExecutionStartedEvent → LLMGuardrailStartedEvent
→ LLMGuardrailCompletedEvent → AgentExecutionCompletedEvent → TaskCompletedEvent
控制流向:
- Crew 负责任务编排和顺序控制
- Task 负责任务执行和输出处理
- Agent 负责实际的推理和工具调用
- Guardrail 负责输出质量控制
完整调用链路分析
1. 从 Crew.kickoff() 到 Task 执行的完整路径
flowchart TD
Start([用户调用 Crew.kickoff]) --> KickoffInput[传入 inputs 参数]
KickoffInput --> InterpolateInputs[Crew: 对所有任务插值<br/>interpolate_inputs_and_add_conversation_history]
InterpolateInputs --> ExecuteTasks[Crew: _execute_tasks<br/>遍历任务列表]
ExecuteTasks --> LoopStart{遍历每个任务}
LoopStart --> GetAgent[Crew: _get_agent_to_use<br/>确定执行该任务的 Agent]
GetAgent --> PrepareTools[Crew: _prepare_tools<br/>准备工具列表]
PrepareTools --> GetContext[Crew: _get_context<br/>获取前序任务输出作为上下文]
GetContext --> CheckConditional{是否是<br/>ConditionalTask?}
CheckConditional -->|是| CheckCondition[Crew: _handle_conditional_task<br/>检查执行条件]
CheckCondition --> ShouldExecute{条件满足?}
ShouldExecute -->|否| SkipTask[跳过任务<br/>返回空输出]
ShouldExecute -->|是| CheckAsync
CheckConditional -->|否| CheckAsync{async_execution?}
CheckAsync -->|是| ExecuteAsync[Task: execute_async<br/>异步执行]
ExecuteAsync --> AddToFutures[将 Future 加入队列]
AddToFutures --> LoopStart
CheckAsync -->|否| WaitForAsyncTasks{有待处理的<br/>异步任务?}
WaitForAsyncTasks -->|是| ProcessAsyncTasks[Crew: _process_async_tasks<br/>等待所有异步任务完成]
ProcessAsyncTasks --> ExecuteSync
WaitForAsyncTasks -->|否| ExecuteSync[Task: execute_sync<br/>同步执行]
ExecuteSync --> TaskCore[Task: _execute_core<br/>核心执行逻辑]
TaskCore --> AgentExecute[Agent: execute_task<br/>Agent 执行任务]
AgentExecute --> ExportOutput[Task: _export_output<br/>转换输出格式]
ExportOutput --> CreateTaskOutput[创建 TaskOutput 对象]
CreateTaskOutput --> HasGuardrail{配置了<br/>Guardrail?}
HasGuardrail -->|是| InvokeGuardrail[Task: _invoke_guardrail_function<br/>执行验证]
InvokeGuardrail --> GuardrailPass{验证通过?}
GuardrailPass -->|否| RetryTask[重新执行任务<br/>最多重试 N 次]
RetryTask --> AgentExecute
GuardrailPass -->|是| SaveOutput
HasGuardrail -->|否| SaveOutput
SaveOutput[保存任务输出] --> HasOutputFile{配置了<br/>output_file?}
HasOutputFile -->|是| SaveFile[Task: _save_file<br/>持久化到文件]
HasOutputFile -->|否| ExecuteCallback
SaveFile --> ExecuteCallback[执行回调函数]
ExecuteCallback --> EmitCompleted[发送 TaskCompletedEvent]
EmitCompleted --> AddToOutputs[将输出加入 task_outputs 列表]
AddToOutputs --> LoopStart
SkipTask --> AddToOutputs
LoopStart -->|所有任务完成| FinalAsyncCheck{还有未完成的<br/>异步任务?}
FinalAsyncCheck -->|是| FinalProcessAsync[Crew: _process_async_tasks<br/>等待最后的异步任务]
FinalProcessAsync --> CreateCrewOutput
FinalAsyncCheck -->|否| CreateCrewOutput[Crew: _create_crew_output<br/>创建 CrewOutput]
CreateCrewOutput --> End([返回 CrewOutput])
2. Task 内部执行的详细流程
flowchart TD
Start([Task.execute_sync]) --> CheckAgent{Agent<br/>已分配?}
CheckAgent -->|否| ThrowError[抛出异常:<br/>No agent assigned]
CheckAgent -->|是| RecordStartTime[记录 start_time]
RecordStartTime --> SetContext[设置 prompt_context]
SetContext --> AddToProcessed[将 Agent 角色加入<br/>processed_by_agents]
AddToProcessed --> EmitStarted[发送 TaskStartedEvent]
EmitStarted --> CallAgent[调用 Agent.execute_task<br/>传入 task/context/tools]
CallAgent --> AgentInternalFlow[Agent 内部执行流程<br/>见下一节]
AgentInternalFlow --> GetResult[获取 Agent 返回的<br/>字符串结果]
GetResult --> ExportOutput[Task: _export_output<br/>转换输出格式]
ExportOutput --> HasPydantic{配置了<br/>output_pydantic?}
HasPydantic -->|是| ConvertToPydantic[Converter: to_pydantic<br/>转换为 Pydantic 模型]
ConvertToPydantic --> CreateOutput
HasPydantic -->|否| HasJSON{配置了<br/>output_json?}
HasJSON -->|是| ConvertToJSON[Converter: to_json<br/>转换为 JSON 字典]
ConvertToJSON --> CreateOutput
HasJSON -->|否| CreateOutput[创建 TaskOutput 对象]
CreateOutput --> HasGuardrails{配置了<br/>guardrails 列表?}
HasGuardrails -->|是| LoopGuardrails[遍历每个 Guardrail]
LoopGuardrails --> InvokeGuardrail[Task: _invoke_guardrail_function<br/>执行单个 Guardrail]
InvokeGuardrail --> ProcessGuardrail[process_guardrail<br/>调用验证函数]
ProcessGuardrail --> EmitGuardrailStarted[发送 LLMGuardrailStartedEvent]
EmitGuardrailStarted --> CallGuardrailFunc[调用用户定义的<br/>guardrail task_output]
CallGuardrailFunc --> EmitGuardrailCompleted[发送 LLMGuardrailCompletedEvent]
EmitGuardrailCompleted --> GuardrailSuccess{验证成功?}
GuardrailSuccess -->|是| UpdateOutput[更新 task_output]
UpdateOutput --> NextGuardrail{还有其他<br/>Guardrail?}
NextGuardrail -->|是| LoopGuardrails
NextGuardrail -->|否| CheckSingleGuardrail
GuardrailSuccess -->|否| CheckRetry{重试次数<br/>< max_retries?}
CheckRetry -->|是| IncrementRetry[增加 retry_count]
IncrementRetry --> PrintRetry[打印重试信息]
PrintRetry --> BuildErrorContext[构建错误上下文]
BuildErrorContext --> CallAgentAgain[重新调用 Agent.execute_task]
CallAgentAgain --> AgentInternalFlow
CheckRetry -->|否| ThrowGuardrailError[抛出异常:<br/>Guardrail validation failed]
HasGuardrails -->|否| CheckSingleGuardrail{配置了<br/>单个 guardrail?}
CheckSingleGuardrail -->|是| InvokeGuardrail
CheckSingleGuardrail -->|否| SaveToSelf[self.output = task_output]
SaveToSelf --> RecordEndTime[记录 end_time]
RecordEndTime --> ExecuteCallback{配置了<br/>callback?}
ExecuteCallback -->|是| CallCallback[执行 self.callback]
CallCallback --> CheckCrewCallback
ExecuteCallback -->|否| CheckCrewCallback{Crew 配置了<br/>task_callback?}
CheckCrewCallback -->|是| CallCrewCallback[执行 crew.task_callback]
CheckCrewCallback -->|否| CheckOutputFile
CallCrewCallback --> CheckOutputFile{配置了<br/>output_file?}
CheckOutputFile -->|是| PrepareContent[准备文件内容:<br/>JSON/Pydantic/Raw]
PrepareContent --> SaveFile[Task: _save_file<br/>保存到文件系统]
CheckOutputFile -->|否| EmitCompleted
SaveFile --> EmitCompleted[发送 TaskCompletedEvent]
EmitCompleted --> ReturnOutput[返回 TaskOutput]
ReturnOutput --> End([结束])
ThrowError --> End
ThrowGuardrailError --> EmitFailed[发送 TaskFailedEvent]
EmitFailed --> End
3. Agent 执行任务的内部流程
flowchart TD
Start([Agent.execute_task]) --> HasReasoning{启用了<br/>Reasoning?}
HasReasoning -->|是| CreateReasoningHandler[创建 AgentReasoning<br/>推理处理器]
CreateReasoningHandler --> GeneratePlan[生成推理计划<br/>handle_agent_reasoning]
GeneratePlan --> AddPlanToTask[将计划添加到<br/>task.description]
AddPlanToTask --> InjectDate
HasReasoning -->|否| InjectDate[注入当前日期到任务]
InjectDate --> ResetToolsHandler[重置 tools_handler.last_used_tool]
ResetToolsHandler --> GetTaskPrompt[获取任务提示词<br/>task.prompt]
GetTaskPrompt --> HasOutputFormat{需要结构化<br/>输出?}
HasOutputFormat -->|是| AddFormatInstructions[添加输出格式指令<br/>JSON/Pydantic schema]
AddFormatInstructions --> PrepareExecution
HasOutputFormat -->|否| PrepareExecution[准备执行环境]
PrepareExecution --> CreateMessages[构建消息列表:<br/>system + user + history]
CreateMessages --> SetupCallbacks[设置 TokenCalcHandler<br/>回调]
SetupCallbacks --> EmitExecutionStarted[发送 AgentExecutionStartedEvent]
EmitExecutionStarted --> StartIterationLoop[开始推理循环<br/>iteration = 0]
StartIterationLoop --> CheckMaxIter{iteration >=<br/>max_iter?}
CheckMaxIter -->|是| HandleMaxIterations[处理最大迭代次数<br/>生成最终答案]
HandleMaxIterations --> FormatFinalAnswer
CheckMaxIter -->|否| CheckRPM[RPMController:<br/>检查速率限制]
CheckRPM --> WaitIfNeeded{需要等待?}
WaitIfNeeded -->|是| Sleep[等待一段时间]
Sleep --> CallLLM
WaitIfNeeded -->|否| CallLLM[调用 LLM.call<br/>发送消息]
CallLLM --> GetLLMResponse[get_llm_response<br/>获取 LLM 响应]
GetLLMResponse --> ParseResponse[解析响应:<br/>AgentAction 或 AgentFinish?]
ParseResponse --> IsAction{是 AgentAction?}
IsAction -->|是| ExecuteTool[execute_tool_and_check_finality<br/>执行工具调用]
ExecuteTool --> ToolSuccess{工具执行<br/>成功?}
ToolSuccess -->|是| AddToolResult[将工具结果添加到<br/>messages]
AddToolResult --> IsToolFinal{工具返回<br/>final_answer?}
IsToolFinal -->|是| FormatFinalAnswer
IsToolFinal -->|否| IncrementIter[iteration++]
IncrementIter --> StartIterationLoop
ToolSuccess -->|否| HandleToolError[处理工具错误]
HandleToolError --> AddErrorMessage[将错误信息添加到<br/>messages]
AddErrorMessage --> IncrementIter
IsAction -->|否| IsFinish{是 AgentFinish?}
IsFinish -->|是| FormatFinalAnswer[格式化最终答案]
IsFinish -->|否| HandleParseError[处理解析错误<br/>OutputParserError]
HandleParseError --> AddParseErrorMsg[添加解析错误提示]
AddParseErrorMsg --> IncrementIter
FormatFinalAnswer --> AddToMemory{启用了<br/>Memory?}
AddToMemory -->|是| SaveToMemory[保存到记忆系统:<br/>短期/长期/实体]
SaveToMemory --> EmitExecutionCompleted
AddToMemory -->|否| EmitExecutionCompleted[发送 AgentExecutionCompletedEvent]
EmitExecutionCompleted --> ReturnResult[返回字符串结果]
ReturnResult --> End([结束])
调用链路详细说明
路径 1:Crew.kickoff → Task.execute_sync → Agent.execute_task → LLM.call
第一阶段:Crew.kickoff() 初始化
# 文件:lib/crewai/src/crewai/crew.py
def kickoff(self, inputs: dict = None) -> CrewOutput:
# 1. 插值所有任务的输入变量
for task in self.tasks:
task.interpolate_inputs_and_add_conversation_history(inputs)
# 2. 执行任务序列
return self._execute_tasks(self.tasks)
关键点:
- 在任务执行前,先对所有任务进行输入插值,替换模板变量
- 支持在 description、expected_output、output_file 中使用
{variable}占位符 - 可选地注入会话历史(
crew_chat_messages)
第二阶段:Crew._execute_tasks() 任务编排
# 文件:lib/crewai/src/crewai/crew.py
def _execute_tasks(self, tasks: list[Task], start_index: int = 0) -> CrewOutput:
task_outputs: list[TaskOutput] = []
futures: list[(Task, Future, int)] = []
for task_index, task in enumerate(tasks):
# 1. 确定执行该任务的 Agent
agent_to_use = self._get_agent_to_use(task)
# 2. 准备工具列表
tools_for_task = self._prepare_tools(agent_to_use, task, task.tools or [])
# 3. 获取上下文(前序任务的输出)
context = self._get_context(task, task_outputs)
# 4. 处理条件任务
if isinstance(task, ConditionalTask):
skipped_output = self._handle_conditional_task(task, task_outputs, futures, task_index)
if skipped_output:
task_outputs.append(skipped_output)
continue
# 5. 异步执行
if task.async_execution:
future = task.execute_async(agent_to_use, context, tools_for_task)
futures.append((task, future, task_index))
# 6. 同步执行
else:
# 先等待所有异步任务完成
if futures:
task_outputs = self._process_async_tasks(futures)
futures.clear()
# 执行同步任务
task_output = task.execute_sync(agent_to_use, context, tools_for_task)
task_outputs.append(task_output)
# 7. 等待剩余的异步任务
if futures:
task_outputs = self._process_async_tasks(futures)
return self._create_crew_output(task_outputs)
关键点:
_get_agent_to_use():如果任务没有指定 Agent,从 Crew 的 agents 列表中选择_prepare_tools():合并任务特定工具和 Agent 的默认工具_get_context():拼接前序任务的输出为字符串上下文- 异步任务立即返回 Future,同步任务阻塞等待完成
- 同步任务执行前,先等待所有前序异步任务完成
第三阶段:Task.execute_sync() 同步执行
# 文件:lib/crewai/src/crewai/task.py
def execute_sync(self, agent: BaseAgent, context: str, tools: list[BaseTool]) -> TaskOutput:
return self._execute_core(agent, context, tools)
def _execute_core(self, agent: BaseAgent, context: str, tools: list[Any]) -> TaskOutput:
# 1. 验证 Agent
if not agent:
raise Exception(f"The task '{self.description}' has no agent assigned")
# 2. 记录开始时间
self.start_time = datetime.datetime.now()
# 3. 发送任务开始事件
crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))
# 4. 调用 Agent 执行任务
result = agent.execute_task(task=self, context=context, tools=tools)
# 5. 转换输出格式
pydantic_output, json_output = self._export_output(result)
# 6. 创建 TaskOutput
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
expected_output=self.expected_output,
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
# 7. 执行 Guardrail 验证
if self._guardrails:
for idx, guardrail in enumerate(self._guardrails):
task_output = self._invoke_guardrail_function(
task_output, agent, tools, guardrail, idx
)
# 8. 保存输出
self.output = task_output
self.end_time = datetime.datetime.now()
# 9. 执行回调
if self.callback:
self.callback(self.output)
# 10. 保存到文件
if self.output_file:
self._save_file(content)
# 11. 发送任务完成事件
crewai_event_bus.emit(self, TaskCompletedEvent(output=task_output, task=self))
return task_output
关键点:
agent.execute_task()是核心执行步骤,返回字符串结果_export_output()负责将字符串转换为结构化输出- Guardrail 验证失败会重新执行任务,最多重试
guardrail_max_retries次 - 任务输出可选地持久化到文件系统
- 事件系统记录任务的生命周期
第四阶段:Agent.execute_task() 推理执行
# 文件:lib/crewai/src/crewai/agent.py
def execute_task(self, task: Task, context: str, tools: list[BaseTool]) -> str:
# 1. 处理推理计划(如果启用)
if self.reasoning:
reasoning_handler = AgentReasoning(task=task, agent=self)
reasoning_output = reasoning_handler.handle_agent_reasoning()
task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"
# 2. 注入当前日期
self._inject_date_to_task(task)
# 3. 生成任务提示词
task_prompt = task.prompt()
# 4. 添加上下文
if context:
task_prompt = self.i18n.slice("task_with_context").format(
task=task_prompt, context=context
)
# 5. 添加输出格式指令
if task.output_pydantic or task.output_json:
model = task.output_pydantic or task.output_json
task_prompt += f"\n\nOutput schema:\n{model.model_json_schema()}"
# 6. 进入推理循环
return self._run_agent_executor(task_prompt, tools)
def _run_agent_executor(self, task_prompt: str, tools: list[BaseTool]) -> str:
messages = self._build_messages(task_prompt)
callbacks = [TokenCalcHandler()]
iteration = 0
while iteration < self.max_iter:
# 检查速率限制
self._enforce_rpm_limit()
# 调用 LLM
answer = self.llm.call(messages, callbacks=callbacks)
# 解析响应
formatted_answer = self._parse_llm_response(answer)
# 处理 AgentAction(工具调用)
if isinstance(formatted_answer, AgentAction):
tool_result = self._execute_tool(formatted_answer, tools)
messages.append({"role": "assistant", "content": answer})
messages.append({"role": "user", "content": tool_result})
# 检查工具是否返回最终答案
if self._is_final_answer(tool_result):
return tool_result
iteration += 1
continue
# 处理 AgentFinish(任务完成)
if isinstance(formatted_answer, AgentFinish):
return formatted_answer.output
iteration += 1
# 达到最大迭代次数
return self._handle_max_iterations_exceeded(messages)
关键点:
- Reasoning 模式下,先生成推理计划再执行任务
- 推理循环采用 ReAct 模式:Reason(推理) → Act(行动) → Observe(观察)
- 每次迭代调用 LLM,可能返回工具调用(AgentAction)或最终答案(AgentFinish)
- 工具调用结果添加到消息历史,供下一次迭代使用
- RPMController 控制 LLM 调用速率,避免超过限额
第五阶段:LLM.call() 语言模型推理
# 文件:lib/crewai/src/crewai/llm.py
def call(self, messages: list[dict], callbacks: list = None) -> str:
# 1. 准备调用参数
params = {
"model": self.model,
"messages": messages,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
}
# 2. 添加工具定义(如果支持 function calling)
if self.supports_function_calling():
params["tools"] = self._format_tools()
# 3. 调用 LLM API
response = self.client.chat.completions.create(**params)
# 4. 解析响应
if response.choices[0].message.tool_calls:
# 返回工具调用
tool_call = response.choices[0].message.tool_calls[0]
return f"Action: {tool_call.function.name}\nAction Input: {tool_call.function.arguments}"
else:
# 返回文本回复
return response.choices[0].message.content
关键点:
- 支持多种 LLM 提供商(OpenAI、Anthropic、Azure、Ollama 等)
- Function Calling 模式下,LLM 可以直接返回结构化的工具调用
- 响应格式统一为字符串,便于后续解析
路径 2:异步任务的处理流程
异步任务执行:
# 文件:lib/crewai/src/crewai/task.py
def execute_async(self, agent: BaseAgent, context: str, tools: list[BaseTool]) -> Future[TaskOutput]:
future: Future[TaskOutput] = Future()
# 在新线程中执行
threading.Thread(
daemon=True,
target=self._execute_task_async,
args=(agent, context, tools, future),
).start()
return future
def _execute_task_async(self, agent, context, tools, future):
# 调用核心执行逻辑
result = self._execute_core(agent, context, tools)
# 设置 Future 结果
future.set_result(result)
Crew 处理异步任务:
def _process_async_tasks(self, futures: list[(Task, Future, int)]) -> list[TaskOutput]:
task_outputs = []
for task, future, task_index in futures:
# 阻塞等待 Future 完成
task_output = future.result()
task_outputs.append(task_output)
# 处理任务结果
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index)
return task_outputs
关键点:
- 异步任务在独立线程中执行,不阻塞主线程
- Crew 在遇到同步任务或任务列表结束时,等待所有异步任务完成
- 异步任务不能依赖其他异步任务的输出(必须用同步任务分隔)
future.result()会阻塞直到任务完成
路径 3:Guardrail 验证与重试流程
Guardrail 验证:
# 文件:lib/crewai/src/crewai/task.py
def _invoke_guardrail_function(
self,
task_output: TaskOutput,
agent: BaseAgent,
tools: list[BaseTool],
guardrail: GuardrailCallable,
guardrail_index: int = None,
) -> TaskOutput:
current_retry_count = self._guardrail_retry_counts.get(guardrail_index, 0)
max_attempts = self.guardrail_max_retries + 1
for attempt in range(max_attempts):
# 执行验证
guardrail_result = process_guardrail(
output=task_output,
guardrail=guardrail,
retry_count=current_retry_count,
event_source=self,
from_task=self,
from_agent=agent,
)
# 验证通过
if guardrail_result.success:
# 更新输出
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(guardrail_result.result)
task_output.pydantic = pydantic_output
task_output.json_dict = json_output
elif isinstance(guardrail_result.result, TaskOutput):
task_output = guardrail_result.result
return task_output
# 验证失败
if attempt >= self.guardrail_max_retries:
raise Exception(
f"Task failed guardrail validation after {self.guardrail_max_retries} retries. "
f"Last error: {guardrail_result.error}"
)
# 更新重试次数
current_retry_count += 1
self._guardrail_retry_counts[guardrail_index] = current_retry_count
# 构建错误上下文
context = self.i18n.errors("validation_error").format(
guardrail_result_error=guardrail_result.error,
task_output=task_output.raw,
)
# 打印重试信息
printer.print(
f"Guardrail blocked (attempt {attempt + 1}/{max_attempts}), "
f"retrying due to: {guardrail_result.error}\n",
color="yellow",
)
# 重新执行任务
result = agent.execute_task(task=self, context=context, tools=tools)
# 重新转换输出
pydantic_output, json_output = self._export_output(result)
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
expected_output=self.expected_output,
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
return task_output
Guardrail 处理函数:
# 文件:lib/crewai/src/crewai/utilities/guardrail.py
def process_guardrail(
output: TaskOutput,
guardrail: GuardrailCallable,
retry_count: int,
event_source: Any,
from_agent: BaseAgent,
from_task: Task,
) -> GuardrailResult:
# 发送开始事件
crewai_event_bus.emit(
event_source,
LLMGuardrailStartedEvent(
guardrail=guardrail,
retry_count=retry_count,
from_agent=from_agent,
from_task=from_task,
),
)
# 调用验证函数
result = guardrail(output) # 返回 (bool, Any)
guardrail_result = GuardrailResult.from_tuple(result)
# 发送完成事件
crewai_event_bus.emit(
event_source,
LLMGuardrailCompletedEvent(
success=guardrail_result.success,
result=guardrail_result.result,
error=guardrail_result.error,
retry_count=retry_count,
from_agent=from_agent,
from_task=from_task,
),
)
return guardrail_result
关键点:
- Guardrail 函数签名:
(TaskOutput) -> (bool, Any) - 验证失败时,将错误信息作为上下文传递给 Agent,让 Agent 知道如何改进
- 每个 Guardrail 有独立的重试计数器
- 超过最大重试次数后抛出异常,任务执行失败
- 验证成功后,可以修改输出内容(返回新的字符串或 TaskOutput)
路径 4:输出转换流程
输出转换入口:
# 文件:lib/crewai/src/crewai/task.py
def _export_output(self, result: str) -> tuple[BaseModel | None, dict | None]:
pydantic_output = None
json_output = None
if self.output_pydantic or self.output_json:
model_output = convert_to_model(
result,
self.output_pydantic,
self.output_json,
self.agent,
self.converter_cls,
)
if isinstance(model_output, BaseModel):
pydantic_output = model_output
elif isinstance(model_output, dict):
json_output = model_output
elif isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
return pydantic_output, json_output
转换实现:
# 文件:lib/crewai/src/crewai/utilities/converter.py
def convert_to_model(
result: str,
output_pydantic: type[BaseModel],
output_json: type[BaseModel],
agent: Agent,
converter_cls: type[Converter] = None,
) -> BaseModel | dict | str:
model = output_pydantic or output_json
# 尝试直接解析 JSON
try:
escaped_result = json.dumps(json.loads(result, strict=False))
return validate_model(escaped_result, model, bool(output_json))
except json.JSONDecodeError:
# JSON 解析失败,使用 Converter
return handle_partial_json(result, model, bool(output_json), agent, converter_cls)
def handle_partial_json(
result: str,
model: type[BaseModel],
is_json_output: bool,
agent: Agent,
converter_cls: type[Converter] = None,
) -> BaseModel | dict | str:
# 尝试提取部分 JSON
try:
json_text = extract_json_from_text(result)
return validate_model(json_text, model, is_json_output)
except Exception:
# 提取失败,使用 LLM 转换
return convert_with_instructions(result, model, is_json_output, agent, converter_cls)
def convert_with_instructions(
result: str,
model: type[BaseModel],
is_json_output: bool,
agent: Agent,
converter_cls: type[Converter] = None,
) -> BaseModel | dict | str:
llm = agent.function_calling_llm or agent.llm
instructions = get_conversion_instructions(model, llm)
converter = Converter(llm=llm, text=result, model=model, instructions=instructions)
if is_json_output:
return converter.to_json()
else:
return converter.to_pydantic()
class Converter:
def to_pydantic(self, current_attempt: int = 1) -> BaseModel:
if self.llm.supports_function_calling():
# 使用 function calling 直接获取结构化输出
return self._create_instructor().to_pydantic()
else:
# 使用提示词让 LLM 输出 JSON
response = self.llm.call([
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
])
# 验证并解析
return self.model.model_validate_json(response)
关键点:
- 三层转换策略:
- 直接解析 JSON(最快)
- 提取部分 JSON(处理混合输出)
- 使用 LLM 重新格式化(兜底方案)
- Function Calling 模式下,LLM 直接返回结构化输出,无需二次解析
- Pydantic 模型提供强类型验证,确保输出符合 schema
- 转换失败时返回原始字符串,不阻塞任务执行
数据结构 UML 图
Task 核心类图
classDiagram
class Task {
+UUID id
+str name
+str description
+str expected_output
+BaseAgent agent
+list[Task] | _NotSpecified context
+bool async_execution
+type[BaseModel] output_json
+type[BaseModel] output_pydantic
+str output_file
+bool create_directory
+TaskOutput output
+list[BaseTool] tools
+SecurityConfig security_config
+bool human_input
+bool markdown
+type[Converter] converter_cls
+set[str] processed_by_agents
+GuardrailType guardrail
+GuardrailsType guardrails
+int guardrail_max_retries
+int retry_count
+datetime start_time
+datetime end_time
+bool allow_crewai_trigger_context
+execute_sync(agent, context, tools)
+execute_async(agent, context, tools)
+prompt()
+interpolate_inputs_and_add_conversation_history(inputs)
+copy(agents, task_mapping)
-_execute_core(agent, context, tools)
-_export_output(result)
-_save_file(result)
-_invoke_guardrail_function(task_output, agent, tools, guardrail)
}
class ConditionalTask {
+Callable[[TaskOutput], bool] condition
+should_execute(context)
+get_skipped_task_output()
}
class TaskOutput {
+str description
+str name
+str expected_output
+str summary
+str raw
+BaseModel pydantic
+dict json_dict
+str agent
+OutputFormat output_format
+json()
+to_dict()
}
class OutputFormat {
<<enumeration>>
RAW
JSON
PYDANTIC
}
class Converter {
+BaseLLM llm
+str text
+type[BaseModel] model
+str instructions
+convert()
}
Task <|-- ConditionalTask : 继承
Task --> TaskOutput : 生成
TaskOutput --> OutputFormat : 使用
Task --> Converter : 使用
关键字段说明
Task 核心字段
| 字段 | 类型 | 约束 | 默认值 | 说明 |
|---|---|---|---|---|
| id | UUID | frozen=True | uuid4() | 任务唯一标识符,不可修改 |
| name | str | None | - | None | 任务名称(可选) |
| description | str | required | - | 任务描述,支持模板变量 |
| expected_output | str | required | - | 预期输出的明确定义 |
| agent | BaseAgent | None | - | None | 执行任务的 Agent |
| context | list[Task] | _NotSpecified | - | NOT_SPECIFIED | 前序任务列表(作为上下文) |
| async_execution | bool | - | False | 是否异步执行任务 |
| output_json | type[BaseModel] | None | - | None | JSON 输出的 Pydantic schema |
| output_pydantic | type[BaseModel] | None | - | None | Pydantic 输出模型 |
| output_file | str | None | - | None | 输出文件路径 |
| create_directory | bool | - | True | 是否自动创建输出目录 |
| output | TaskOutput | None | - | None | 任务执行结果 |
| tools | list[BaseTool] | - | [] | 任务特定的工具列表 |
| security_config | SecurityConfig | - | default() | 安全配置(指纹验证) |
| human_input | bool | - | False | 是否需要人工审核最终输出 |
| markdown | bool | - | False | 是否要求输出 Markdown 格式 |
| converter_cls | type[Converter] | None | - | None | 自定义转换器类 |
| processed_by_agents | set[str] | - | set() | 已处理该任务的 Agent 角色集合 |
| guardrail | GuardrailType | None | - | None | 单个 Guardrail 验证函数 |
| guardrails | GuardrailsType | None | - | None | 多个 Guardrail 验证函数列表 |
| guardrail_max_retries | int | - | 3 | Guardrail 验证失败最大重试次数 |
| retry_count | int | - | 0 | 当前重试次数 |
| start_time | datetime | None | - | None | 任务开始时间 |
| end_time | datetime | None | - | None | 任务结束时间 |
| allow_crewai_trigger_context | bool | None | - | None | 是否允许注入 trigger payload |
TaskOutput 字段
| 字段 | 类型 | 约束 | 默认值 | 说明 |
|---|---|---|---|---|
| description | str | required | - | 任务描述 |
| name | str | None | - | None | 任务名称 |
| expected_output | str | None | - | None | 预期输出 |
| summary | str | None | - | auto | 任务摘要(自动生成) |
| raw | str | - | "" | 原始输出字符串 |
| pydantic | BaseModel | None | - | None | Pydantic 模型实例 |
| json_dict | dict[str, Any] | None | - | None | JSON 字典 |
| agent | str | required | - | 执行任务的 Agent 角色 |
| output_format | OutputFormat | - | RAW | 输出格式(RAW/JSON/PYDANTIC) |
API 详细规格
API 1: execute_sync
基本信息
- 方法签名:
Task.execute_sync(agent: BaseAgent | None = None, context: str | None = None, tools: list[BaseTool] | None = None) -> TaskOutput - 调用方式:实例方法
- 幂等性:否(依赖 LLM 生成)
功能说明
同步执行任务,阻塞当前线程直到任务完成。
请求结构体
class ExecuteSyncParams:
agent: Optional[BaseAgent] = None # 执行任务的 Agent(可覆盖默认)
context: Optional[str] = None # 上下文字符串
tools: Optional[list[BaseTool]] = None # 工具列表
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| agent | BaseAgent | None | 否 | None | - | 覆盖任务默认的 Agent |
| context | str | None | 否 | None | - | 前序任务的输出文本 |
| tools | list[BaseTool] | None | 否 | None | - | 覆盖 Agent 默认的工具列表 |
响应结构体
class TaskOutput(BaseModel):
description: str # 任务描述
name: Optional[str] # 任务名称
expected_output: Optional[str] # 预期输出
summary: Optional[str] # 任务摘要
raw: str # 原始输出
pydantic: Optional[BaseModel] # Pydantic 模型输出
json_dict: Optional[dict] # JSON 字典输出
agent: str # 执行的 Agent 角色
output_format: OutputFormat # 输出格式
核心代码
def execute_sync(
self,
agent: BaseAgent | None = None,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> TaskOutput:
"""同步执行任务"""
return self._execute_core(agent, context, tools)
def _execute_core(
self,
agent: BaseAgent | None,
context: str | None,
tools: list[Any] | None,
) -> TaskOutput:
"""核心执行逻辑"""
try:
# 1. 确定执行 Agent
agent = agent or self.agent
self.agent = agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned"
)
# 2. 记录开始时间
self.start_time = datetime.datetime.now()
# 3. 设置提示词上下文
self.prompt_context = context
tools = tools or self.tools or []
# 4. 记录处理该任务的 Agent
self.processed_by_agents.add(agent.role)
# 5. 发送任务开始事件
crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))
# 6. 调用 Agent 执行任务
result = agent.execute_task(
task=self,
context=context,
tools=tools,
)
# 7. 导出结构化输出
pydantic_output, json_output = self._export_output(result)
# 8. 创建 TaskOutput
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
expected_output=self.expected_output,
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
# 9. 执行 Guardrail 验证(如果配置)
if self._guardrails:
for idx, guardrail in enumerate(self._guardrails):
task_output = self._invoke_guardrail_function(
task_output=task_output,
agent=agent,
tools=tools,
guardrail=guardrail,
guardrail_index=idx,
)
# 10. 执行单个 Guardrail(向后兼容)
if self._guardrail:
task_output = self._invoke_guardrail_function(
task_output=task_output,
agent=agent,
tools=tools,
guardrail=self._guardrail,
)
# 11. 保存输出
self.output = task_output
self.end_time = datetime.datetime.now()
# 12. 执行回调函数
if self.callback:
self.callback(self.output)
crew = self.agent.crew
if crew and crew.task_callback and crew.task_callback != self.callback:
crew.task_callback(self.output)
# 13. 保存到文件(如果配置)
if self.output_file:
content = (
json_output
if json_output
else (
pydantic_output.model_dump_json() if pydantic_output else result
)
)
self._save_file(content)
# 14. 发送任务完成事件
crewai_event_bus.emit(
self, TaskCompletedEvent(output=task_output, task=self)
)
return task_output
except Exception as e:
self.end_time = datetime.datetime.now()
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self))
raise e
调用链与上游函数
调用路径:Crew.kickoff() → Crew._execute_tasks() → Task.execute_sync()
# 上游调用者:Crew._execute_tasks()
def _execute_tasks(self, tasks: list[Task], start_index: int = 0) -> CrewOutput:
"""Crew 执行任务序列"""
task_outputs: list[TaskOutput] = []
for task_index, task in enumerate(tasks):
agent_to_use = self._get_agent_to_use(task)
tools_for_task = self._prepare_tools(agent_to_use, task, task.tools or [])
# 获取上下文
context = self._get_context(task, task_outputs)
# 执行任务
if task.async_execution:
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=tools_for_task,
)
futures.append((task, future, task_index))
else:
# 同步执行
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=tools_for_task,
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
return self._create_crew_output(task_outputs)
时序图
sequenceDiagram
autonumber
participant Crew as Crew
participant Task as Task
participant Agent as Agent
participant Converter as Converter
participant Guardrail as Guardrail
participant FileHandler as FileHandler
Crew->>Task: execute_sync(agent, context, tools)
Task->>Task: 记录 start_time
Task->>Task: 发送 TaskStartedEvent
Task->>Agent: execute_task(task, context, tools)
Agent->>Agent: 推理循环
Agent-->>Task: 返回 result(字符串)
Task->>Converter: _export_output(result)
alt 配置了 output_pydantic
Converter->>Converter: convert_to_model(result, output_pydantic)
Converter-->>Task: pydantic_output
else 配置了 output_json
Converter->>Converter: convert_to_model(result, output_json)
Converter-->>Task: json_output
else 默认
Converter-->>Task: None
end
Task->>Task: 创建 TaskOutput
alt 配置了 guardrail
Task->>Guardrail: _invoke_guardrail_function(task_output)
Guardrail->>Guardrail: process_guardrail(output)
alt 验证失败
Guardrail->>Agent: 重新执行任务
Agent-->>Guardrail: 新的 result
Guardrail->>Converter: 重新转换
end
Guardrail-->>Task: 验证通过的 task_output
end
Task->>Task: 执行回调函数
alt 配置了 output_file
Task->>FileHandler: _save_file(content)
FileHandler-->>Task: 保存成功
end
Task->>Task: 记录 end_time
Task->>Task: 发送 TaskCompletedEvent
Task-->>Crew: TaskOutput
时序图详细功能说明
步骤 1-3:任务初始化阶段
Crew调用Task.execute_sync(),传入执行参数- Task 记录任务开始时间(
start_time),用于后续计算执行时长 - 发送
TaskStartedEvent事件,监听器可以记录日志或触发回调
步骤 4-5:Agent 执行阶段
- 调用
Agent.execute_task(),传入任务、上下文和工具 - Agent 进入推理循环(ReAct 模式),可能多次调用 LLM 和工具
- 返回最终的字符串结果
步骤 6-9:输出转换阶段
- 调用
_export_output()处理原始字符串 - 根据配置选择转换策略:
- 如果配置了
output_pydantic:转换为 Pydantic 模型实例 - 如果配置了
output_json:转换为 JSON 字典 - 默认:保持原始字符串
- 如果配置了
- 转换过程可能使用 Converter 调用 LLM 进行格式化
步骤 10:创建输出对象
- 创建
TaskOutput对象,封装所有输出数据 - 包含:原始字符串、结构化输出、任务元数据等
步骤 11-15:Guardrail 验证阶段(可选)
- 如果配置了 Guardrail,依次执行验证
- 调用
process_guardrail()执行用户定义的验证函数 - 验证失败:
- 构建错误上下文,包含原始输出和错误信息
- 重新调用
Agent.execute_task(),让 Agent 根据反馈改进 - 重新转换输出,再次验证
- 最多重试
guardrail_max_retries次
- 验证成功:继续后续流程
步骤 16:执行回调函数
- 执行任务级别的
callback(如果配置) - 执行 Crew 级别的
task_callback(如果配置) - 用于自定义逻辑,如日志记录、指标收集等
步骤 17-19:文件持久化阶段(可选)
- 如果配置了
output_file,调用_save_file() - 根据输出格式准备文件内容:
- JSON/Pydantic:序列化为 JSON
- Raw:直接保存字符串
- 如果
create_directory=True,自动创建目录
步骤 20-21:任务完成阶段
- 记录任务结束时间(
end_time) - 发送
TaskCompletedEvent事件 - 返回
TaskOutput给调用者
异常处理流程:
- 任何阶段抛出异常都会被
_execute_core()捕获 - 记录结束时间
- 发送
TaskFailedEvent事件 - 重新抛出异常,由上层处理
性能要点:
- 推理循环是最耗时的部分(LLM 调用)
- Guardrail 验证失败会导致重新执行,显著增加时间
- 输出转换通常很快,除非需要调用 LLM 格式化
- 文件 I/O 操作相对较快,但大文件可能有影响
边界与异常
异常情况处理:
-
没有 Agent 分配:
- 异常类型:
Exception - 触发条件:
task.agent为 None 且未传入agent参数 - 处理方式:立即抛出异常
- 回退策略:无,需要用户修复配置
- 异常类型:
-
输出转换失败:
- 异常类型:
ValidationError(Pydantic) - 触发条件:LLM 输出与 schema 不匹配
- 处理方式:使用 Converter 重试转换
- 回退策略:返回原始字符串
- 异常类型:
-
Guardrail 验证失败:
- 异常类型:自定义或 LLM 返回的错误信息
- 触发条件:
guardrail(output)返回(False, error_message) - 处理方式:重新执行任务(最多
guardrail_max_retries次) - 回退策略:超过重试次数后抛出异常
-
文件保存失败:
- 异常类型:
IOError、OSError - 触发条件:目录不存在、权限不足、磁盘满
- 处理方式:抛出
RuntimeError并提示使用 FileWriterTool - 回退策略:任务输出仍然返回,但文件未保存
- 异常类型:
执行时长统计:
@property
def execution_duration(self) -> float | None:
"""计算任务执行时长(秒)"""
if not self.start_time or not self.end_time:
return None
return (self.end_time - self.start_time).total_seconds()
API 2: execute_async
基本信息
- 方法签名:
Task.execute_async(agent: BaseAgent | None = None, context: str | None = None, tools: list[BaseTool] | None = None) -> Future[TaskOutput] - 调用方式:实例方法
- 幂等性:否
功能说明
异步执行任务,立即返回 Future 对象,任务在后台线程执行。
核心代码
def execute_async(
self,
agent: BaseAgent | None = None,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> Future[TaskOutput]:
"""异步执行任务"""
future: Future[TaskOutput] = Future()
threading.Thread(
daemon=True,
target=self._execute_task_async,
args=(agent, context, tools, future),
).start()
return future
def _execute_task_async(
self,
agent: BaseAgent | None,
context: str | None,
tools: list[Any] | None,
future: Future[TaskOutput],
) -> None:
"""异步执行的内部方法"""
result = self._execute_core(agent, context, tools)
future.set_result(result)
使用场景
- I/O 密集型任务:需要调用外部 API、读取大文件等
- 长时间任务:执行时间超过 30 秒的任务
- 并行任务:多个任务可以同时执行
注意事项
- 异步任务的上下文限制:异步任务不能依赖其他异步任务的输出(必须有同步任务作为分隔)
- 结果获取:通过
future.result()阻塞等待结果 - 异常处理:异常在
future.result()调用时抛出
API 3: prompt
基本信息
- 方法签名:
Task.prompt() -> str - 调用方式:实例方法
- 幂等性:是(相同配置返回相同提示词)
功能说明
生成任务提示词,包含任务描述、预期输出和可选的 Markdown 格式指令。
核心代码
def prompt(self) -> str:
"""生成任务提示词"""
description = self.description
# 注入 Trigger Payload(如果配置)
should_inject = self.allow_crewai_trigger_context
if should_inject and self.agent:
crew = getattr(self.agent, "crew", None)
if crew and hasattr(crew, "_inputs") and crew._inputs:
trigger_payload = crew._inputs.get("crewai_trigger_payload")
if trigger_payload is not None:
description += f"\n\nTrigger Payload: {trigger_payload}"
# 组装任务切片
output = self.i18n.slice("expected_output").format(
expected_output=self.expected_output
)
tasks_slices = [description, output]
# 添加 Markdown 指令(如果配置)
if self.markdown:
markdown_instruction = """Your final answer MUST be formatted in Markdown syntax.
Follow these guidelines:
- Use # for headers
- Use ** for bold text
- Use * for italic text
- Use - or * for bullet points
- Use `code` for inline code
- Use ```language for code blocks"""
tasks_slices.append(markdown_instruction)
return "\n".join(tasks_slices)
提示词示例
基础提示词:
Research the top 5 AI trends in 2024
Expected Output:
A list of 5 AI trends with brief descriptions
带 Markdown 的提示词:
Write a blog post about AI agents
Expected Output:
A 500-word blog post
Your final answer MUST be formatted in Markdown syntax.
Follow these guidelines:
- Use # for headers
- Use ** for bold text
...
API 4: interpolate_inputs_and_add_conversation_history
基本信息
- 方法签名:
Task.interpolate_inputs_and_add_conversation_history(inputs: dict[str, str | int | float | dict | list]) -> None - 调用方式:实例方法
- 幂等性:否(修改 Task 状态)
功能说明
将输入参数插值到任务描述和预期输出中,支持模板变量替换和会话历史注入。
请求结构体
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| inputs | dict[str, str | int | float | dict | list] | 是 | - | - | 输入参数字典 |
核心代码
def interpolate_inputs_and_add_conversation_history(
self, inputs: dict[str, str | int | float | dict[str, Any] | list[Any]]
) -> None:
"""插值输入参数并添加会话历史"""
# 保存原始值(首次插值时)
if self._original_description is None:
self._original_description = self.description
if self._original_expected_output is None:
self._original_expected_output = self.expected_output
if self.output_file is not None and self._original_output_file is None:
self._original_output_file = self.output_file
if not inputs:
return
# 插值 description
try:
self.description = interpolate_only(
input_string=self._original_description, inputs=inputs
)
except KeyError as e:
raise ValueError(
f"Missing required template variable '{e.args[0]}' in description"
) from e
# 插值 expected_output
try:
self.expected_output = interpolate_only(
input_string=self._original_expected_output, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(f"Error interpolating expected_output: {e!s}") from e
# 插值 output_file
if self.output_file is not None:
try:
self.output_file = interpolate_only(
input_string=self._original_output_file, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(f"Error interpolating output_file path: {e!s}") from e
# 添加会话历史(如果有)
if inputs.get("crew_chat_messages"):
conversation_instruction = self.i18n.slice(
"conversation_history_instruction"
)
crew_chat_messages_json = str(inputs["crew_chat_messages"])
crew_chat_messages = json.loads(crew_chat_messages_json)
conversation_history = "\n".join(
f"{msg['role'].capitalize()}: {msg['content']}"
for msg in crew_chat_messages
if isinstance(msg, dict) and "role" in msg and "content" in msg
)
self.description += (
f"\n\n{conversation_instruction}\n\n{conversation_history}"
)
模板变量示例
定义任务:
task = Task(
description="Research {topic} and find {count} key insights",
expected_output="A list of {count} insights about {topic}",
output_file="reports/{topic}_insights.md",
)
插值:
inputs = {
"topic": "AI Safety",
"count": 5
}
task.interpolate_inputs_and_add_conversation_history(inputs)
插值后:
description: "Research AI Safety and find 5 key insights"
expected_output: "A list of 5 insights about AI Safety"
output_file: "reports/AI_Safety_insights.md"
API 5: copy
基本信息
- 方法签名:
Task.copy(agents: list[BaseAgent], task_mapping: dict[str, Task]) -> Task - 调用方式:实例方法
- 幂等性:是
功能说明
创建任务的深拷贝,用于 Crew.copy() 时复制任务实例。
核心代码
def copy(
self, agents: list[BaseAgent], task_mapping: dict[str, Task]
) -> Task:
"""创建任务的深拷贝"""
exclude = {
"id",
"agent",
"context",
"tools",
}
# 序列化任务数据
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
# 克隆 context
cloned_context = (
self.context
if self.context is NOT_SPECIFIED
else [task_mapping[context_task.key] for context_task in self.context]
if isinstance(self.context, list)
else None
)
# 查找对应的 Agent
def get_agent_by_role(role: str) -> BaseAgent | None:
return next((agent for agent in agents if agent.role == role), None)
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = shallow_copy(self.tools) if self.tools else []
# 创建新实例(保留原始类型)
return self.__class__(
**copied_data,
context=cloned_context,
agent=cloned_agent,
tools=cloned_tools,
)
模块内部时序图
模块时序图 1:Converter 结构化输出转换流程
sequenceDiagram
autonumber
participant Task as Task
participant ConvertToModel as convert_to_model
participant ValidateModel as validate_model
participant HandlePartial as handle_partial_json
participant ExtractJSON as extract_json_from_text
participant ConvertWithInstr as convert_with_instructions
participant Converter as Converter
participant LLM as LLM
participant Pydantic as Pydantic Validator
Task->>ConvertToModel: convert_to_model(result, output_pydantic, agent)
Note over ConvertToModel: 策略 1:直接解析 JSON
ConvertToModel->>ConvertToModel: json.loads(result)
alt JSON 解析成功
ConvertToModel->>ValidateModel: validate_model(json_str, model)
ValidateModel->>Pydantic: model.model_validate_json(json_str)
alt Pydantic 验证成功
Pydantic-->>ValidateModel: BaseModel 实例
ValidateModel-->>ConvertToModel: 返回模型
ConvertToModel-->>Task: BaseModel 实例
else Pydantic 验证失败
Pydantic-->>ValidateModel: ValidationError
ValidateModel-->>ConvertToModel: 抛出异常
end
else JSON 解析失败
ConvertToModel->>HandlePartial: handle_partial_json(result, model, agent)
Note over HandlePartial: 策略 2:提取部分 JSON
HandlePartial->>ExtractJSON: extract_json_from_text(result)
alt 提取成功
ExtractJSON-->>HandlePartial: json_string
HandlePartial->>ValidateModel: validate_model(json_string, model)
ValidateModel->>Pydantic: model.model_validate_json()
Pydantic-->>ValidateModel: BaseModel 实例
ValidateModel-->>HandlePartial: 返回模型
HandlePartial-->>ConvertToModel: BaseModel 实例
ConvertToModel-->>Task: BaseModel 实例
else 提取失败
ExtractJSON-->>HandlePartial: 异常
Note over HandlePartial: 策略 3:LLM 重新格式化
HandlePartial->>ConvertWithInstr: convert_with_instructions(result, model, agent)
ConvertWithInstr->>ConvertWithInstr: 获取转换指令
ConvertWithInstr->>Converter: 创建 Converter 实例
Converter->>Converter: 检查 LLM 是否支持 Function Calling
alt 支持 Function Calling
Converter->>LLM: call() with function definition
LLM-->>Converter: 结构化输出
Converter->>Pydantic: 直接验证
Pydantic-->>Converter: BaseModel 实例
else 不支持 Function Calling
Converter->>Converter: 构建转换提示词
Converter->>LLM: call([system, user])
LLM-->>Converter: JSON 字符串
Converter->>Pydantic: model.model_validate_json()
alt 验证成功
Pydantic-->>Converter: BaseModel 实例
else 验证失败且未达最大重试次数
Pydantic-->>Converter: ValidationError
Converter->>Converter: 递归重试 (attempt++)
Converter->>LLM: 再次调用
end
end
Converter-->>ConvertWithInstr: BaseModel 实例
ConvertWithInstr-->>HandlePartial: BaseModel 实例
HandlePartial-->>ConvertToModel: BaseModel 实例
ConvertToModel-->>Task: BaseModel 实例
end
end
Converter 时序图功能说明
策略 1:直接解析 JSON(最快路径)
- 步骤 1-3:尝试直接解析原始字符串为 JSON
- 步骤 4-5:调用 Pydantic 的
model_validate_json()进行验证 - 适用场景:LLM 直接返回完整的 JSON 格式
- 性能:最快,无需额外 LLM 调用
- 失败情况:JSON 格式不完整、包含注释、混合文本
策略 2:提取部分 JSON(中等路径)
- 步骤 6-8:使用正则表达式从混合文本中提取 JSON
- 提取逻辑:
- 查找
{...}或[...]模式 - 处理嵌套结构
- 忽略 JSON 前后的文本
- 查找
- 适用场景:LLM 返回 “这是结果:{…}” 格式
- 性能:较快,仅字符串处理
- 失败情况:JSON 不完整、语法错误
策略 3:LLM 重新格式化(兜底路径)
- 步骤 9-11:创建 Converter 实例,使用 LLM 转换
- Function Calling 模式(步骤 12-14):
- LLM 直接返回符合 schema 的结构化数据
- 无需解析,直接验证
- 推荐用于支持 FC 的模型(GPT-4、Claude 等)
- 提示词模式(步骤 15-19):
- 构建转换提示词,包含原始文本和目标 schema
- LLM 返回 JSON 字符串
- Pydantic 验证并可能重试(最多 3 次)
- 适用场景:所有其他策略失败
- 性能:最慢,需要额外 LLM 调用(1-3 次)
- 优势:容错性最强,几乎总能成功
关键设计点:
- 三层降级策略:从快到慢,逐层尝试
- 智能重试:Converter 内部实现重试机制
- 指令优化:根据 LLM 类型生成不同的转换指令
- 错误恢复:最终失败时返回原始字符串,不阻塞任务
模块时序图 2:Guardrail 验证与重试流程
sequenceDiagram
autonumber
participant Task as Task
participant InvokeGuardrail as _invoke_guardrail_function
participant ProcessGuardrail as process_guardrail
participant EventBus as EventBus
participant UserGuardrail as 用户验证函数
participant Agent as Agent
participant Converter as Converter
Task->>InvokeGuardrail: _invoke_guardrail_function(task_output, agent, tools, guardrail)
InvokeGuardrail->>InvokeGuardrail: 获取当前重试次数
loop 最多 guardrail_max_retries + 1 次
InvokeGuardrail->>ProcessGuardrail: process_guardrail(output, guardrail, retry_count)
ProcessGuardrail->>EventBus: emit(LLMGuardrailStartedEvent)
EventBus-->>ProcessGuardrail: 事件已发送
ProcessGuardrail->>UserGuardrail: guardrail(task_output)
Note over UserGuardrail: 用户定义的验证逻辑<br/>检查长度、格式、内容等
UserGuardrail-->>ProcessGuardrail: (success: bool, result: Any)
ProcessGuardrail->>ProcessGuardrail: GuardrailResult.from_tuple()
ProcessGuardrail->>EventBus: emit(LLMGuardrailCompletedEvent)
EventBus-->>ProcessGuardrail: 事件已发送
ProcessGuardrail-->>InvokeGuardrail: GuardrailResult
alt success = True
InvokeGuardrail->>InvokeGuardrail: 更新 task_output
alt result 是字符串
InvokeGuardrail->>Converter: _export_output(result)
Converter-->>InvokeGuardrail: (pydantic, json_dict)
InvokeGuardrail->>InvokeGuardrail: 更新 task_output 的结构化输出
else result 是 TaskOutput
InvokeGuardrail->>InvokeGuardrail: 直接替换 task_output
end
InvokeGuardrail-->>Task: 验证通过的 task_output
else success = False AND attempt < max_retries
InvokeGuardrail->>InvokeGuardrail: retry_count++
InvokeGuardrail->>InvokeGuardrail: 打印重试警告
InvokeGuardrail->>InvokeGuardrail: 构建错误上下文
Note over InvokeGuardrail: 错误上下文包含:<br/>- 原始输出<br/>- 错误信息<br/>- 改进建议
InvokeGuardrail->>Agent: execute_task(task, error_context, tools)
Note over Agent: Agent 看到之前的错误<br/>尝试修正输出
Agent-->>InvokeGuardrail: 新的 result
InvokeGuardrail->>Converter: _export_output(result)
Converter-->>InvokeGuardrail: (pydantic, json_dict)
InvokeGuardrail->>InvokeGuardrail: 创建新的 task_output
Note over InvokeGuardrail: 继续下一次验证循环
else success = False AND attempt >= max_retries
InvokeGuardrail->>InvokeGuardrail: 抛出异常
InvokeGuardrail-->>Task: Exception: Guardrail validation failed
end
end
Guardrail 时序图功能说明
步骤 1-3:初始化验证
- 获取当前 Guardrail 的重试次数(支持多个 Guardrail 独立计数)
- 计算最大尝试次数 =
guardrail_max_retries + 1(默认 4 次)
步骤 4-10:执行验证
- 发送
LLMGuardrailStartedEvent事件(步骤 4-5) - 调用用户定义的验证函数(步骤 6-7)
- 接收
TaskOutput对象 - 执行自定义验证逻辑
- 返回
(success: bool, result: Any)元组
- 接收
- 封装为
GuardrailResult对象(步骤 8) - 发送
LLMGuardrailCompletedEvent事件(步骤 9-10)
步骤 11-16:验证成功分支
- 检查
result的类型:- 字符串:调用 Converter 重新转换为结构化输出
- TaskOutput:直接替换原输出
- 返回验证通过的
task_output
步骤 17-24:验证失败且未达重试上限
- 增加重试计数器(步骤 17)
- 打印黄色警告信息,显示重试进度(步骤 18)
- 构建错误上下文(步骤 19):
context = f""" Previous output validation failed with error: {error_message} Previous output: {task_output.raw} Please improve your response to address the validation error. """ - 重新调用
Agent.execute_task(),传入错误上下文(步骤 20-21)- Agent 会看到之前的错误信息
- Agent 可以根据反馈调整输出
- 重新转换输出(步骤 22-23)
- 创建新的
task_output(步骤 24) - 继续下一次验证循环
步骤 25-26:达到最大重试次数
- 抛出异常:
Task failed guardrail validation after N retries - 包含最后一次的错误信息
关键设计点:
- 多 Guardrail 支持:每个 Guardrail 独立计数,按顺序验证
- 智能重试:将错误信息作为上下文,引导 Agent 改进
- 灵活的返回值:
- 返回原字符串:保持结构化输出不变
- 返回新字符串:重新转换结构化输出
- 返回新 TaskOutput:完全替换输出对象
- 事件驱动:每次验证都发送事件,便于监控和调试
- 性能权衡:每次重试都需要完整的 Agent 执行(LLM 调用 + 工具调用)
常见应用场景:
- 长度检查:确保输出至少 N 个字符
- 格式验证:检查是否包含必要的标题、章节
- 内容验证:检查是否包含特定关键词或主题
- 质量评估:使用 LLM 判断输出质量是否达标
- 安全检查:检测有害内容、PII 泄露等
关键功能流程剖析
功能 1:Guardrail 验证机制
功能描述
Guardrail 是任务输出的验证机制,确保 LLM 生成的内容符合要求。如果验证失败,会重新执行任务直到通过或达到最大重试次数。
核心代码
def _invoke_guardrail_function(
self,
task_output: TaskOutput,
agent: BaseAgent,
tools: list[BaseTool],
guardrail: GuardrailCallable | None,
guardrail_index: int | None = None,
) -> TaskOutput:
"""执行 Guardrail 验证"""
if not guardrail:
return task_output
# 获取当前重试次数
if guardrail_index is not None:
current_retry_count = self._guardrail_retry_counts.get(guardrail_index, 0)
else:
current_retry_count = self.retry_count
max_attempts = self.guardrail_max_retries + 1
for attempt in range(max_attempts):
# 执行验证
guardrail_result = process_guardrail(
output=task_output,
guardrail=guardrail,
retry_count=current_retry_count,
event_source=self,
from_task=self,
from_agent=agent,
)
if guardrail_result.success:
# 验证通过
if isinstance(guardrail_result.result, str):
# 更新原始输出
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
task_output.json_dict = json_output
elif isinstance(guardrail_result.result, TaskOutput):
task_output = guardrail_result.result
return task_output
# 验证失败
if attempt >= self.guardrail_max_retries:
# 达到最大重试次数
guardrail_name = (
f"guardrail {guardrail_index}"
if guardrail_index is not None
else "guardrail"
)
raise Exception(
f"Task failed {guardrail_name} validation after "
f"{self.guardrail_max_retries} retries. "
f"Last error: {guardrail_result.error}"
)
# 更新重试次数
if guardrail_index is not None:
current_retry_count += 1
self._guardrail_retry_counts[guardrail_index] = current_retry_count
else:
self.retry_count += 1
current_retry_count = self.retry_count
# 构建错误上下文
context = self.i18n.errors("validation_error").format(
guardrail_result_error=guardrail_result.error,
task_output=task_output.raw,
)
# 打印重试信息
printer = Printer()
printer.print(
content=f"Guardrail blocked (attempt {attempt + 1}/{max_attempts}), "
f"retrying due to: {guardrail_result.error}\n",
color="yellow",
)
# 重新执行任务
result = agent.execute_task(
task=self,
context=context,
tools=tools,
)
# 重新转换输出
pydantic_output, json_output = self._export_output(result)
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
expected_output=self.expected_output,
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
return task_output
Guardrail 类型
1. 函数式 Guardrail:
def validate_length(output: TaskOutput) -> tuple[bool, str]:
"""验证输出长度"""
if len(output.raw) < 100:
return False, "Output too short, needs at least 100 characters"
return True, output.raw
task = Task(
description="Write a summary",
expected_output="A detailed summary",
guardrail=validate_length,
guardrail_max_retries=3,
)
2. LLM Guardrail:
task = Task(
description="Write a product review",
expected_output="A fair and balanced review",
guardrail="The output must be objective and mention both pros and cons",
guardrail_max_retries=2,
)
3. 多 Guardrail 组合:
def check_format(output: TaskOutput) -> tuple[bool, str]:
if not output.raw.startswith("#"):
return False, "Must start with a Markdown header"
return True, output.raw
def check_length(output: TaskOutput) -> tuple[bool, str]:
if len(output.raw) < 200:
return False, "Too short"
return True, output.raw
task = Task(
description="Write a blog post",
expected_output="A well-formatted blog post",
guardrails=[check_format, check_length],
)
功能 2:结构化输出转换
功能描述
将 LLM 的原始字符串输出转换为结构化格式(JSON 或 Pydantic 模型)。
核心代码
def _export_output(
self, result: str
) -> tuple[BaseModel | None, dict[str, Any] | None]:
"""导出结构化输出"""
pydantic_output: BaseModel | None = None
json_output: dict[str, Any] | None = None
if self.output_pydantic or self.output_json:
model_output = convert_to_model(
result,
self.output_pydantic,
self.output_json,
self.agent,
self.converter_cls,
)
if isinstance(model_output, BaseModel):
pydantic_output = model_output
elif isinstance(model_output, dict):
json_output = model_output
elif isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
return pydantic_output, json_output
Converter 的实现:
class Converter:
"""结构化输出转换器"""
def __init__(
self,
llm: BaseLLM,
text: str,
model: type[BaseModel],
instructions: str,
):
self.llm = llm
self.text = text
self.model = model
self.instructions = instructions
def convert(self) -> BaseModel:
"""转换文本为 Pydantic 模型"""
# 使用 LLM 提取结构化数据
prompt = f"""
{self.instructions}
Original text:
{self.text}
Output must match this schema:
{self.model.model_json_schema()}
"""
response = self.llm.call([{"role": "user", "content": prompt}])
# 解析 JSON 并验证
try:
data = json.loads(response)
return self.model.model_validate(data)
except (json.JSONDecodeError, ValidationError) as e:
# 重试或返回默认值
raise ValueError(f"Failed to convert output: {e}")
使用示例
定义 Pydantic 模型:
from pydantic import BaseModel, Field
class ArticleSummary(BaseModel):
title: str = Field(description="Article title")
author: str = Field(description="Article author")
summary: str = Field(description="Brief summary")
key_points: list[str] = Field(description="Key takeaways")
sentiment: str = Field(description="Overall sentiment")
task = Task(
description="Summarize the article at {url}",
expected_output="A structured summary",
output_pydantic=ArticleSummary,
agent=researcher,
)
result = task.execute_sync()
# result.output.pydantic 是 ArticleSummary 实例
print(result.pydantic.title)
print(result.pydantic.key_points)
功能 3:ConditionalTask 条件执行
功能描述
ConditionalTask 根据前序任务的输出决定是否执行,实现动态工作流。
核心代码
class ConditionalTask(Task):
"""条件执行任务"""
condition: Callable[[TaskOutput], bool] | None = None
def should_execute(self, context: TaskOutput) -> bool:
"""判断是否执行"""
if self.condition is None:
raise ValueError("No condition function set")
return self.condition(context)
def get_skipped_task_output(self) -> TaskOutput:
"""生成跳过任务的输出"""
return TaskOutput(
description=self.description,
raw="",
agent=self.agent.role if self.agent else "",
output_format=OutputFormat.RAW,
)
Crew 中的处理:
def _handle_conditional_task(
self,
task: ConditionalTask,
task_outputs: list[TaskOutput],
futures: list[tuple[Task, Future[TaskOutput], int]],
task_index: int,
) -> TaskOutput | None:
"""处理条件任务"""
# 等待所有异步任务完成
if futures:
task_outputs = self._process_async_tasks(futures)
futures.clear()
# 获取前一个任务的输出
previous_output = task_outputs[-1] if task_outputs else None
# 判断是否执行
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
f"Skipping conditional task: {task.description}",
color="yellow",
)
skipped_task_output = task.get_skipped_task_output()
self._store_execution_log(task, skipped_task_output, task_index)
return skipped_task_output
return None
使用示例
from crewai import ConditionalTask
# 定义条件函数
def check_success(output: TaskOutput) -> bool:
"""检查前序任务是否成功"""
return "success" in output.raw.lower()
# 创建条件任务
analyze_task = Task(
description="Analyze the data",
expected_output="Analysis result",
agent=analyst,
)
report_task = ConditionalTask(
description="Generate report based on analysis",
expected_output="Final report",
agent=writer,
condition=check_success, # 仅当分析成功时执行
)
crew = Crew(
agents=[analyst, writer],
tasks=[analyze_task, report_task],
)
实战示例与最佳实践
示例 1:基础任务定义
from crewai import Agent, Task, Crew
# 创建 Agent
researcher = Agent(
role="Research Analyst",
goal="Find relevant information",
backstory="Expert researcher",
)
# 创建 Task
research_task = Task(
description="Research the latest AI trends in 2024",
expected_output="A list of 5 AI trends with descriptions",
agent=researcher,
)
# 执行任务
crew = Crew(agents=[researcher], tasks=[research_task])
result = crew.kickoff()
print(result.raw)
示例 2:结构化输出
from pydantic import BaseModel, Field
class TrendReport(BaseModel):
trends: list[str] = Field(description="List of trends")
analysis: str = Field(description="Overall analysis")
recommendations: list[str] = Field(description="Recommendations")
task = Task(
description="Analyze AI trends and provide recommendations",
expected_output="Structured trend report",
agent=analyst,
output_pydantic=TrendReport,
)
result = task.execute_sync()
# 访问结构化输出
print(result.pydantic.trends)
print(result.pydantic.analysis)
示例 3:任务链与上下文传递
# 任务 1:数据收集
collect_task = Task(
description="Collect data about {topic}",
expected_output="Raw data",
agent=collector,
)
# 任务 2:数据分析(使用任务 1 的输出)
analyze_task = Task(
description="Analyze the collected data",
expected_output="Analysis report",
agent=analyst,
context=[collect_task], # 显式指定上下文
)
# 任务 3:报告生成(自动使用所有前序任务的输出)
report_task = Task(
description="Generate final report",
expected_output="Comprehensive report",
agent=writer,
# context 默认为 NOT_SPECIFIED,自动使用所有前序输出
)
crew = Crew(
agents=[collector, analyst, writer],
tasks=[collect_task, analyze_task, report_task],
)
crew.kickoff(inputs={"topic": "Climate Change"})
示例 4:Guardrail 验证
def validate_report_quality(output: TaskOutput) -> tuple[bool, str]:
"""验证报告质量"""
content = output.raw
# 检查长度
if len(content) < 500:
return False, "Report too short, needs at least 500 characters"
# 检查关键词
required_keywords = ["analysis", "conclusion", "recommendation"]
missing_keywords = [kw for kw in required_keywords if kw not in content.lower()]
if missing_keywords:
return False, f"Missing required sections: {', '.join(missing_keywords)}"
return True, content
task = Task(
description="Write a comprehensive report on {topic}",
expected_output="A well-structured report",
agent=writer,
guardrail=validate_report_quality,
guardrail_max_retries=3,
)
示例 5:异步任务执行
# 定义三个独立的任务
task1 = Task(
description="Research technology trends",
expected_output="Tech trends",
agent=tech_researcher,
async_execution=True, # 异步执行
)
task2 = Task(
description="Research market trends",
expected_output="Market trends",
agent=market_researcher,
async_execution=True, # 异步执行
)
# 同步任务,等待前两个异步任务完成
task3 = Task(
description="Combine research findings",
expected_output="Combined report",
agent=writer,
async_execution=False, # 同步执行
context=[task1, task2], # 依赖前两个异步任务
)
crew = Crew(
agents=[tech_researcher, market_researcher, writer],
tasks=[task1, task2, task3],
)
# task1 和 task2 并行执行,task3 等待两者完成
result = crew.kickoff()
最佳实践总结
1. 任务描述的编写原则
- 明确具体:避免模糊的描述,提供清晰的指令
- 上下文完整:提供足够的背景信息
- 可操作性:描述应该是可执行的动作
- 避免歧义:使用准确的术语
好的描述:
description = """
Analyze the sales data from Q1 2024 and identify:
1. Top 3 performing products
2. Geographic regions with highest growth
3. Key customer segments
4. Trends compared to Q4 2023
"""
不好的描述:
description = "Look at the sales data and tell me something"
2. 预期输出的定义技巧
- 格式明确:指定输出格式(文本、列表、JSON 等)
- 质量标准:定义输出的质量要求
- 长度范围:给出合理的长度预期
- 示例说明:提供输出示例
好的预期输出:
expected_output = """
A structured analysis report in Markdown format containing:
- Executive summary (100-150 words)
- Detailed findings (500-800 words)
- 3-5 key recommendations
- Supporting data visualizations (descriptions)
"""
3. 工具选择策略
- 任务级工具:为特定任务分配专用工具
- Agent 级工具:为 Agent 配置通用工具
- 避免工具过载:每个任务 3-5 个工具为宜
- 工具互补性:选择功能互补的工具
4. 输出格式选择
- 简单文本:使用默认 RAW 格式
- 结构化数据:使用
output_pydantic或output_json - 文件输出:配置
output_file用于持久化 - 混合输出:原始文本 + 结构化数据
5. 性能优化建议
- 异步执行:独立任务使用
async_execution=True - 上下文优化:仅传递必要的上下文
- Guardrail 节制:避免过于严格的验证
- 输出缓存:相同任务考虑缓存结果
总结
Task 模块是 CrewAI 框架中定义工作单元的核心抽象,通过以下机制实现灵活的任务管理:
- 清晰的任务定义:description + expected_output 明确任务目标
- 灵活的执行模式:同步/异步执行,支持并行任务
- 强大的上下文管理:自动或手动传递前序任务输出
- 完善的输出处理:原始、JSON、Pydantic 三种格式
- 可靠的验证机制:Guardrail 确保输出质量
- 条件执行支持:ConditionalTask 实现动态工作流
该模块的设计遵循 单一职责原则,专注于任务的定义和执行协调,将实际执行委托给 Agent,将输出验证委托给 Guardrail,保持了良好的模块边界和可扩展性。