CrewAI-02-Task-完整剖析

模块概览

职责与定位

Task(任务)是 CrewAI 框架中定义工作单元的核心抽象,负责:

  1. 任务定义:通过 description 和 expected_output 明确任务目标和预期结果
  2. 执行管理:协调 Agent 执行任务并处理同步/异步执行模式
  3. 上下文传递:从前序任务获取输出作为当前任务的上下文
  4. 输出验证:通过 Guardrail 机制验证任务输出的质量
  5. 结果转换:将原始输出转换为结构化格式(JSON/Pydantic)
  6. 文件持久化:可选地将任务输出保存到文件
  7. 条件执行:支持基于前序任务输出的条件执行(ConditionalTask)

输入与输出

输入

  • description:任务描述,支持模板变量插值(如 {topic}
  • expected_output:预期输出的明确定义
  • agent:执行任务的 Agent 实例
  • context:来自前序任务的输出列表(或 NOT_SPECIFIED 表示自动获取)
  • tools:任务特定的工具列表(可选,覆盖 Agent 的工具)

输出

  • TaskOutput 对象:包含原始输出、结构化输出、执行元数据

上下游依赖

上游依赖(被调用)

  • Crew 编排器:通过 Task.execute_sync()Task.execute_async() 执行任务
  • Flow 引擎:在流程中调用任务

下游依赖(调用)

  • Agent:调用 Agent.execute_task() 实际执行任务
  • Converter:将原始输出转换为结构化格式
  • Guardrail:验证任务输出
  • Events:发送任务开始、完成、失败事件

生命周期

stateDiagram-v2
    [*] --> 创建: Task(...)  @task 装饰器
    创建 --> 配置: 设置 description/expected_output
    配置 --> 插值: interpolate_inputs(inputs)
    插值 --> 就绪: 等待执行
    就绪 --> 执行中: execute_sync()  execute_async()
    执行中 --> Agent执行: agent.execute_task()
    Agent执行 --> 输出转换: _export_output()
    输出转换 --> Guardrail验证: 如果配置了 guardrail
    Guardrail验证 --> Guardrail验证: 验证失败,重新执行
    Guardrail验证 --> 输出保存: 验证通过
    输出保存 --> 文件持久化: 如果配置了 output_file
    文件持久化 --> 完成: TaskOutput
    完成 --> [*]

整体服务架构图

1. 全局模块关系图

flowchart TB
    subgraph "上游编排层"
        Crew[Crew 编排器<br/>任务序列管理]
        Flow[Flow 引擎<br/>流程编排]
    end

    subgraph "Task 核心层"
        Task[Task 类<br/>任务定义和配置]
        ConditionalTask[ConditionalTask 类<br/>条件执行任务]
        TaskAPI{Task API}
        ExecuteSync[execute_sync<br/>同步执行]
        ExecuteAsync[execute_async<br/>异步执行]
        Prompt[prompt<br/>提示词生成]
        Interpolate[interpolate_inputs<br/>输入插值]
    end

    subgraph "执行引擎层"
        Agent[Agent 代理<br/>任务执行主体]
        AgentExecutor[AgentExecutor<br/>执行器]
        LLM[LLM 语言模型<br/>推理引擎]
        Tools[Tools 工具<br/>外部能力]
    end

    subgraph "输出处理层"
        TaskOutput[TaskOutput<br/>输出封装]
        Converter[Converter<br/>结构化转换]
        OutputFormat[OutputFormat<br/>格式枚举]
        FileHandler[FileHandler<br/>文件持久化]
    end

    subgraph "验证层"
        Guardrail[Guardrail<br/>输出验证]
        GuardrailResult[GuardrailResult<br/>验证结果]
    end

    subgraph "基础设施层"
        Events[EventBus<br/>事件总线]
        Memory[Memory<br/>记忆系统]
        RPM[RPMController<br/>速率限制]
        Security[SecurityConfig<br/>安全配置]
    end

    Crew -->|kickoff| Task
    Flow -->|调用| Task

    Task --> TaskAPI
    TaskAPI --> ExecuteSync
    TaskAPI --> ExecuteAsync
    TaskAPI --> Prompt
    TaskAPI --> Interpolate

    ExecuteSync --> Agent
    ExecuteAsync --> Agent

    Agent --> AgentExecutor
    AgentExecutor --> LLM
    AgentExecutor --> Tools

    Agent -->|返回结果| Task
    Task --> Converter
    Converter --> TaskOutput
    TaskOutput --> OutputFormat

    Task --> Guardrail
    Guardrail -->|验证失败| Agent
    Guardrail --> GuardrailResult

    TaskOutput --> FileHandler

    Task --> Events
    Agent --> Events
    Agent --> Memory
    Agent --> RPM
    Task --> Security

    ConditionalTask -.继承.-> Task

2. Task 内部模块架构图

flowchart TB
    subgraph "Task 核心层"
        Task[Task 类<br/>任务定义和配置]
        ConditionalTask[ConditionalTask 类<br/>条件执行任务]
    end

    subgraph "输出处理层"
        TaskOutput[TaskOutput 类<br/>任务输出封装]
        Converter[Converter 类<br/>结构化输出转换]
        OutputFormat[OutputFormat 枚举<br/>RAW/JSON/PYDANTIC]
    end

    subgraph "验证层"
        Guardrail[Guardrail 机制<br/>输出验证]
        LLMGuardrail[LLMGuardrail 类<br/>基于 LLM 的验证]
        HallucinationGuardrail[HallucinationGuardrail<br/>幻觉检测]
    end

    subgraph "执行层"
        ExecuteSync[execute_sync()<br/>同步执行]
        ExecuteAsync[execute_async()<br/>异步执行]
        Agent[Agent.execute_task()]
    end

    subgraph "上下文层"
        Context[Context 管理<br/>前序任务输出]
        Interpolation[输入插值<br/>模板变量替换]
    end

    subgraph "基础设施层"
        Events[事件系统<br/>TaskStarted/Completed/Failed]
        Security[SecurityConfig<br/>指纹验证]
        FileHandler[文件处理<br/>输出持久化]
    end

    Task --> ExecuteSync
    Task --> ExecuteAsync
    ConditionalTask -.继承.-> Task

    ExecuteSync --> Agent
    ExecuteAsync --> Agent

    Agent --> TaskOutput
    TaskOutput --> Converter
    TaskOutput --> OutputFormat

    Task --> Guardrail
    Guardrail --> LLMGuardrail
    Guardrail --> HallucinationGuardrail

    Task --> Context
    Task --> Interpolation

    Task --> Events
    Task --> Security
    Task --> FileHandler

架构要点说明

1) Task 与 ConditionalTask 的关系

  • Task:标准任务,始终执行
  • ConditionalTask:条件任务,根据前序任务输出决定是否执行
  • 设计目的:实现动态工作流(如"仅当前一任务成功时才执行")

2) 输出格式的三种模式

  • RAW:原始字符串输出(默认)
  • JSON:结构化 JSON 字典(通过 output_json 定义 schema)
  • PYDANTIC:Pydantic 模型实例(通过 output_pydantic 定义模型)

3) Guardrail 的多层验证

  • 函数式 Guardrail:用户提供的验证函数
  • LLM Guardrail:使用 LLM 判断输出是否符合要求
  • 幻觉检测 Guardrail:检测 LLM 输出中的幻觉内容

4) 同步与异步执行的区别

  • 同步执行:阻塞当前线程直到任务完成
  • 异步执行:在后台线程执行,返回 Future 对象
  • 适用场景:异步用于 I/O 密集型或长时间任务

5) 上下文传递机制

  • NOT_SPECIFIED(默认):自动使用所有前序任务的输出
  • 显式 context:手动指定哪些任务的输出作为上下文
  • context=[]:不使用任何上下文

6) 全局模块关系总结

数据流向

用户输入 → Crew.kickoff → Task.execute_sync → Agent.execute_task → LLM.call → 原始输出
→ Converter.convert → TaskOutput → Guardrail.validate → FileHandler.save → CrewOutput

事件流向

TaskStartedEvent → AgentExecutionStartedEvent → LLMGuardrailStartedEvent
→ LLMGuardrailCompletedEvent → AgentExecutionCompletedEvent → TaskCompletedEvent

控制流向

  • Crew 负责任务编排和顺序控制
  • Task 负责任务执行和输出处理
  • Agent 负责实际的推理和工具调用
  • Guardrail 负责输出质量控制

完整调用链路分析

1. 从 Crew.kickoff() 到 Task 执行的完整路径

flowchart TD
    Start([用户调用 Crew.kickoff]) --> KickoffInput[传入 inputs 参数]
    KickoffInput --> InterpolateInputs[Crew: 对所有任务插值<br/>interpolate_inputs_and_add_conversation_history]
    InterpolateInputs --> ExecuteTasks[Crew: _execute_tasks<br/>遍历任务列表]

    ExecuteTasks --> LoopStart{遍历每个任务}

    LoopStart --> GetAgent[Crew: _get_agent_to_use<br/>确定执行该任务的 Agent]
    GetAgent --> PrepareTools[Crew: _prepare_tools<br/>准备工具列表]
    PrepareTools --> GetContext[Crew: _get_context<br/>获取前序任务输出作为上下文]

    GetContext --> CheckConditional{是否是<br/>ConditionalTask?}
    CheckConditional -->|| CheckCondition[Crew: _handle_conditional_task<br/>检查执行条件]
    CheckCondition --> ShouldExecute{条件满足?}
    ShouldExecute -->|| SkipTask[跳过任务<br/>返回空输出]
    ShouldExecute -->|| CheckAsync

    CheckConditional -->|| CheckAsync{async_execution?}

    CheckAsync -->|| ExecuteAsync[Task: execute_async<br/>异步执行]
    ExecuteAsync --> AddToFutures[ Future 加入队列]
    AddToFutures --> LoopStart

    CheckAsync -->|| WaitForAsyncTasks{有待处理的<br/>异步任务?}
    WaitForAsyncTasks -->|| ProcessAsyncTasks[Crew: _process_async_tasks<br/>等待所有异步任务完成]
    ProcessAsyncTasks --> ExecuteSync
    WaitForAsyncTasks -->|| ExecuteSync[Task: execute_sync<br/>同步执行]

    ExecuteSync --> TaskCore[Task: _execute_core<br/>核心执行逻辑]
    TaskCore --> AgentExecute[Agent: execute_task<br/>Agent 执行任务]
    AgentExecute --> ExportOutput[Task: _export_output<br/>转换输出格式]
    ExportOutput --> CreateTaskOutput[创建 TaskOutput 对象]

    CreateTaskOutput --> HasGuardrail{配置了<br/>Guardrail?}
    HasGuardrail -->|| InvokeGuardrail[Task: _invoke_guardrail_function<br/>执行验证]
    InvokeGuardrail --> GuardrailPass{验证通过?}
    GuardrailPass -->|| RetryTask[重新执行任务<br/>最多重试 N ]
    RetryTask --> AgentExecute
    GuardrailPass -->|| SaveOutput
    HasGuardrail -->|| SaveOutput

    SaveOutput[保存任务输出] --> HasOutputFile{配置了<br/>output_file?}
    HasOutputFile -->|| SaveFile[Task: _save_file<br/>持久化到文件]
    HasOutputFile -->|| ExecuteCallback
    SaveFile --> ExecuteCallback[执行回调函数]

    ExecuteCallback --> EmitCompleted[发送 TaskCompletedEvent]
    EmitCompleted --> AddToOutputs[将输出加入 task_outputs 列表]
    AddToOutputs --> LoopStart

    SkipTask --> AddToOutputs

    LoopStart -->|所有任务完成| FinalAsyncCheck{还有未完成的<br/>异步任务?}
    FinalAsyncCheck -->|| FinalProcessAsync[Crew: _process_async_tasks<br/>等待最后的异步任务]
    FinalProcessAsync --> CreateCrewOutput
    FinalAsyncCheck -->|| CreateCrewOutput[Crew: _create_crew_output<br/>创建 CrewOutput]

    CreateCrewOutput --> End([返回 CrewOutput])

2. Task 内部执行的详细流程

flowchart TD
    Start([Task.execute_sync]) --> CheckAgent{Agent<br/>已分配?}
    CheckAgent -->|| ThrowError[抛出异常:<br/>No agent assigned]
    CheckAgent -->|| RecordStartTime[记录 start_time]

    RecordStartTime --> SetContext[设置 prompt_context]
    SetContext --> AddToProcessed[ Agent 角色加入<br/>processed_by_agents]
    AddToProcessed --> EmitStarted[发送 TaskStartedEvent]

    EmitStarted --> CallAgent[调用 Agent.execute_task<br/>传入 task/context/tools]
    CallAgent --> AgentInternalFlow[Agent 内部执行流程<br/>见下一节]
    AgentInternalFlow --> GetResult[获取 Agent 返回的<br/>字符串结果]

    GetResult --> ExportOutput[Task: _export_output<br/>转换输出格式]

    ExportOutput --> HasPydantic{配置了<br/>output_pydantic?}
    HasPydantic -->|| ConvertToPydantic[Converter: to_pydantic<br/>转换为 Pydantic 模型]
    ConvertToPydantic --> CreateOutput

    HasPydantic -->|| HasJSON{配置了<br/>output_json?}
    HasJSON -->|| ConvertToJSON[Converter: to_json<br/>转换为 JSON 字典]
    ConvertToJSON --> CreateOutput
    HasJSON -->|| CreateOutput[创建 TaskOutput 对象]

    CreateOutput --> HasGuardrails{配置了<br/>guardrails 列表?}
    HasGuardrails -->|| LoopGuardrails[遍历每个 Guardrail]
    LoopGuardrails --> InvokeGuardrail[Task: _invoke_guardrail_function<br/>执行单个 Guardrail]

    InvokeGuardrail --> ProcessGuardrail[process_guardrail<br/>调用验证函数]
    ProcessGuardrail --> EmitGuardrailStarted[发送 LLMGuardrailStartedEvent]
    EmitGuardrailStarted --> CallGuardrailFunc[调用用户定义的<br/>guardrail task_output]
    CallGuardrailFunc --> EmitGuardrailCompleted[发送 LLMGuardrailCompletedEvent]

    EmitGuardrailCompleted --> GuardrailSuccess{验证成功?}
    GuardrailSuccess -->|| UpdateOutput[更新 task_output]
    UpdateOutput --> NextGuardrail{还有其他<br/>Guardrail?}
    NextGuardrail -->|| LoopGuardrails
    NextGuardrail -->|| CheckSingleGuardrail

    GuardrailSuccess -->|| CheckRetry{重试次数<br/>< max_retries?}
    CheckRetry -->|| IncrementRetry[增加 retry_count]
    IncrementRetry --> PrintRetry[打印重试信息]
    PrintRetry --> BuildErrorContext[构建错误上下文]
    BuildErrorContext --> CallAgentAgain[重新调用 Agent.execute_task]
    CallAgentAgain --> AgentInternalFlow

    CheckRetry -->|| ThrowGuardrailError[抛出异常:<br/>Guardrail validation failed]

    HasGuardrails -->|| CheckSingleGuardrail{配置了<br/>单个 guardrail?}
    CheckSingleGuardrail -->|| InvokeGuardrail
    CheckSingleGuardrail -->|| SaveToSelf[self.output = task_output]

    SaveToSelf --> RecordEndTime[记录 end_time]
    RecordEndTime --> ExecuteCallback{配置了<br/>callback?}
    ExecuteCallback -->|| CallCallback[执行 self.callback]
    CallCallback --> CheckCrewCallback
    ExecuteCallback -->|| CheckCrewCallback{Crew 配置了<br/>task_callback?}

    CheckCrewCallback -->|| CallCrewCallback[执行 crew.task_callback]
    CheckCrewCallback -->|| CheckOutputFile
    CallCrewCallback --> CheckOutputFile{配置了<br/>output_file?}

    CheckOutputFile -->|| PrepareContent[准备文件内容:<br/>JSON/Pydantic/Raw]
    PrepareContent --> SaveFile[Task: _save_file<br/>保存到文件系统]
    CheckOutputFile -->|| EmitCompleted
    SaveFile --> EmitCompleted[发送 TaskCompletedEvent]

    EmitCompleted --> ReturnOutput[返回 TaskOutput]
    ReturnOutput --> End([结束])

    ThrowError --> End
    ThrowGuardrailError --> EmitFailed[发送 TaskFailedEvent]
    EmitFailed --> End

3. Agent 执行任务的内部流程

flowchart TD
    Start([Agent.execute_task]) --> HasReasoning{启用了<br/>Reasoning?}
    HasReasoning -->|| CreateReasoningHandler[创建 AgentReasoning<br/>推理处理器]
    CreateReasoningHandler --> GeneratePlan[生成推理计划<br/>handle_agent_reasoning]
    GeneratePlan --> AddPlanToTask[将计划添加到<br/>task.description]
    AddPlanToTask --> InjectDate

    HasReasoning -->|| InjectDate[注入当前日期到任务]

    InjectDate --> ResetToolsHandler[重置 tools_handler.last_used_tool]
    ResetToolsHandler --> GetTaskPrompt[获取任务提示词<br/>task.prompt]

    GetTaskPrompt --> HasOutputFormat{需要结构化<br/>输出?}
    HasOutputFormat -->|| AddFormatInstructions[添加输出格式指令<br/>JSON/Pydantic schema]
    AddFormatInstructions --> PrepareExecution
    HasOutputFormat -->|| PrepareExecution[准备执行环境]

    PrepareExecution --> CreateMessages[构建消息列表:<br/>system + user + history]
    CreateMessages --> SetupCallbacks[设置 TokenCalcHandler<br/>回调]

    SetupCallbacks --> EmitExecutionStarted[发送 AgentExecutionStartedEvent]
    EmitExecutionStarted --> StartIterationLoop[开始推理循环<br/>iteration = 0]

    StartIterationLoop --> CheckMaxIter{iteration >=<br/>max_iter?}
    CheckMaxIter -->|| HandleMaxIterations[处理最大迭代次数<br/>生成最终答案]
    HandleMaxIterations --> FormatFinalAnswer

    CheckMaxIter -->|| CheckRPM[RPMController:<br/>检查速率限制]
    CheckRPM --> WaitIfNeeded{需要等待?}
    WaitIfNeeded -->|| Sleep[等待一段时间]
    Sleep --> CallLLM
    WaitIfNeeded -->|| CallLLM[调用 LLM.call<br/>发送消息]

    CallLLM --> GetLLMResponse[get_llm_response<br/>获取 LLM 响应]
    GetLLMResponse --> ParseResponse[解析响应:<br/>AgentAction  AgentFinish?]

    ParseResponse --> IsAction{ AgentAction?}
    IsAction -->|| ExecuteTool[execute_tool_and_check_finality<br/>执行工具调用]

    ExecuteTool --> ToolSuccess{工具执行<br/>成功?}
    ToolSuccess -->|| AddToolResult[将工具结果添加到<br/>messages]
    AddToolResult --> IsToolFinal{工具返回<br/>final_answer?}
    IsToolFinal -->|| FormatFinalAnswer
    IsToolFinal -->|| IncrementIter[iteration++]
    IncrementIter --> StartIterationLoop

    ToolSuccess -->|| HandleToolError[处理工具错误]
    HandleToolError --> AddErrorMessage[将错误信息添加到<br/>messages]
    AddErrorMessage --> IncrementIter

    IsAction -->|| IsFinish{ AgentFinish?}
    IsFinish -->|| FormatFinalAnswer[格式化最终答案]

    IsFinish -->|| HandleParseError[处理解析错误<br/>OutputParserError]
    HandleParseError --> AddParseErrorMsg[添加解析错误提示]
    AddParseErrorMsg --> IncrementIter

    FormatFinalAnswer --> AddToMemory{启用了<br/>Memory?}
    AddToMemory -->|| SaveToMemory[保存到记忆系统:<br/>短期/长期/实体]
    SaveToMemory --> EmitExecutionCompleted

    AddToMemory -->|| EmitExecutionCompleted[发送 AgentExecutionCompletedEvent]
    EmitExecutionCompleted --> ReturnResult[返回字符串结果]
    ReturnResult --> End([结束])

调用链路详细说明

路径 1:Crew.kickoff → Task.execute_sync → Agent.execute_task → LLM.call

第一阶段:Crew.kickoff() 初始化

# 文件:lib/crewai/src/crewai/crew.py
def kickoff(self, inputs: dict = None) -> CrewOutput:
    # 1. 插值所有任务的输入变量
    for task in self.tasks:
        task.interpolate_inputs_and_add_conversation_history(inputs)

    # 2. 执行任务序列
    return self._execute_tasks(self.tasks)

关键点

  • 在任务执行前,先对所有任务进行输入插值,替换模板变量
  • 支持在 description、expected_output、output_file 中使用 {variable} 占位符
  • 可选地注入会话历史(crew_chat_messages

第二阶段:Crew._execute_tasks() 任务编排

# 文件:lib/crewai/src/crewai/crew.py
def _execute_tasks(self, tasks: list[Task], start_index: int = 0) -> CrewOutput:
    task_outputs: list[TaskOutput] = []
    futures: list[(Task, Future, int)] = []

    for task_index, task in enumerate(tasks):
        # 1. 确定执行该任务的 Agent
        agent_to_use = self._get_agent_to_use(task)

        # 2. 准备工具列表
        tools_for_task = self._prepare_tools(agent_to_use, task, task.tools or [])

        # 3. 获取上下文(前序任务的输出)
        context = self._get_context(task, task_outputs)

        # 4. 处理条件任务
        if isinstance(task, ConditionalTask):
            skipped_output = self._handle_conditional_task(task, task_outputs, futures, task_index)
            if skipped_output:
                task_outputs.append(skipped_output)
                continue

        # 5. 异步执行
        if task.async_execution:
            future = task.execute_async(agent_to_use, context, tools_for_task)
            futures.append((task, future, task_index))
        # 6. 同步执行
        else:
            # 先等待所有异步任务完成
            if futures:
                task_outputs = self._process_async_tasks(futures)
                futures.clear()

            # 执行同步任务
            task_output = task.execute_sync(agent_to_use, context, tools_for_task)
            task_outputs.append(task_output)

    # 7. 等待剩余的异步任务
    if futures:
        task_outputs = self._process_async_tasks(futures)

    return self._create_crew_output(task_outputs)

关键点

  • _get_agent_to_use():如果任务没有指定 Agent,从 Crew 的 agents 列表中选择
  • _prepare_tools():合并任务特定工具和 Agent 的默认工具
  • _get_context():拼接前序任务的输出为字符串上下文
  • 异步任务立即返回 Future,同步任务阻塞等待完成
  • 同步任务执行前,先等待所有前序异步任务完成

第三阶段:Task.execute_sync() 同步执行

# 文件:lib/crewai/src/crewai/task.py
def execute_sync(self, agent: BaseAgent, context: str, tools: list[BaseTool]) -> TaskOutput:
    return self._execute_core(agent, context, tools)

def _execute_core(self, agent: BaseAgent, context: str, tools: list[Any]) -> TaskOutput:
    # 1. 验证 Agent
    if not agent:
        raise Exception(f"The task '{self.description}' has no agent assigned")

    # 2. 记录开始时间
    self.start_time = datetime.datetime.now()

    # 3. 发送任务开始事件
    crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))

    # 4. 调用 Agent 执行任务
    result = agent.execute_task(task=self, context=context, tools=tools)

    # 5. 转换输出格式
    pydantic_output, json_output = self._export_output(result)

    # 6. 创建 TaskOutput
    task_output = TaskOutput(
        name=self.name or self.description,
        description=self.description,
        expected_output=self.expected_output,
        raw=result,
        pydantic=pydantic_output,
        json_dict=json_output,
        agent=agent.role,
        output_format=self._get_output_format(),
    )

    # 7. 执行 Guardrail 验证
    if self._guardrails:
        for idx, guardrail in enumerate(self._guardrails):
            task_output = self._invoke_guardrail_function(
                task_output, agent, tools, guardrail, idx
            )

    # 8. 保存输出
    self.output = task_output
    self.end_time = datetime.datetime.now()

    # 9. 执行回调
    if self.callback:
        self.callback(self.output)

    # 10. 保存到文件
    if self.output_file:
        self._save_file(content)

    # 11. 发送任务完成事件
    crewai_event_bus.emit(self, TaskCompletedEvent(output=task_output, task=self))

    return task_output

关键点

  • agent.execute_task() 是核心执行步骤,返回字符串结果
  • _export_output() 负责将字符串转换为结构化输出
  • Guardrail 验证失败会重新执行任务,最多重试 guardrail_max_retries
  • 任务输出可选地持久化到文件系统
  • 事件系统记录任务的生命周期

第四阶段:Agent.execute_task() 推理执行

# 文件:lib/crewai/src/crewai/agent.py
def execute_task(self, task: Task, context: str, tools: list[BaseTool]) -> str:
    # 1. 处理推理计划(如果启用)
    if self.reasoning:
        reasoning_handler = AgentReasoning(task=task, agent=self)
        reasoning_output = reasoning_handler.handle_agent_reasoning()
        task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"

    # 2. 注入当前日期
    self._inject_date_to_task(task)

    # 3. 生成任务提示词
    task_prompt = task.prompt()

    # 4. 添加上下文
    if context:
        task_prompt = self.i18n.slice("task_with_context").format(
            task=task_prompt, context=context
        )

    # 5. 添加输出格式指令
    if task.output_pydantic or task.output_json:
        model = task.output_pydantic or task.output_json
        task_prompt += f"\n\nOutput schema:\n{model.model_json_schema()}"

    # 6. 进入推理循环
    return self._run_agent_executor(task_prompt, tools)

def _run_agent_executor(self, task_prompt: str, tools: list[BaseTool]) -> str:
    messages = self._build_messages(task_prompt)
    callbacks = [TokenCalcHandler()]
    iteration = 0

    while iteration < self.max_iter:
        # 检查速率限制
        self._enforce_rpm_limit()

        # 调用 LLM
        answer = self.llm.call(messages, callbacks=callbacks)

        # 解析响应
        formatted_answer = self._parse_llm_response(answer)

        # 处理 AgentAction(工具调用)
        if isinstance(formatted_answer, AgentAction):
            tool_result = self._execute_tool(formatted_answer, tools)
            messages.append({"role": "assistant", "content": answer})
            messages.append({"role": "user", "content": tool_result})

            # 检查工具是否返回最终答案
            if self._is_final_answer(tool_result):
                return tool_result

            iteration += 1
            continue

        # 处理 AgentFinish(任务完成)
        if isinstance(formatted_answer, AgentFinish):
            return formatted_answer.output

        iteration += 1

    # 达到最大迭代次数
    return self._handle_max_iterations_exceeded(messages)

关键点

  • Reasoning 模式下,先生成推理计划再执行任务
  • 推理循环采用 ReAct 模式:Reason(推理) → Act(行动) → Observe(观察)
  • 每次迭代调用 LLM,可能返回工具调用(AgentAction)或最终答案(AgentFinish)
  • 工具调用结果添加到消息历史,供下一次迭代使用
  • RPMController 控制 LLM 调用速率,避免超过限额

第五阶段:LLM.call() 语言模型推理

# 文件:lib/crewai/src/crewai/llm.py
def call(self, messages: list[dict], callbacks: list = None) -> str:
    # 1. 准备调用参数
    params = {
        "model": self.model,
        "messages": messages,
        "temperature": self.temperature,
        "max_tokens": self.max_tokens,
    }

    # 2. 添加工具定义(如果支持 function calling)
    if self.supports_function_calling():
        params["tools"] = self._format_tools()

    # 3. 调用 LLM API
    response = self.client.chat.completions.create(**params)

    # 4. 解析响应
    if response.choices[0].message.tool_calls:
        # 返回工具调用
        tool_call = response.choices[0].message.tool_calls[0]
        return f"Action: {tool_call.function.name}\nAction Input: {tool_call.function.arguments}"
    else:
        # 返回文本回复
        return response.choices[0].message.content

关键点

  • 支持多种 LLM 提供商(OpenAI、Anthropic、Azure、Ollama 等)
  • Function Calling 模式下,LLM 可以直接返回结构化的工具调用
  • 响应格式统一为字符串,便于后续解析

路径 2:异步任务的处理流程

异步任务执行

# 文件:lib/crewai/src/crewai/task.py
def execute_async(self, agent: BaseAgent, context: str, tools: list[BaseTool]) -> Future[TaskOutput]:
    future: Future[TaskOutput] = Future()

    # 在新线程中执行
    threading.Thread(
        daemon=True,
        target=self._execute_task_async,
        args=(agent, context, tools, future),
    ).start()

    return future

def _execute_task_async(self, agent, context, tools, future):
    # 调用核心执行逻辑
    result = self._execute_core(agent, context, tools)
    # 设置 Future 结果
    future.set_result(result)

Crew 处理异步任务

def _process_async_tasks(self, futures: list[(Task, Future, int)]) -> list[TaskOutput]:
    task_outputs = []

    for task, future, task_index in futures:
        # 阻塞等待 Future 完成
        task_output = future.result()
        task_outputs.append(task_output)

        # 处理任务结果
        self._process_task_result(task, task_output)
        self._store_execution_log(task, task_output, task_index)

    return task_outputs

关键点

  • 异步任务在独立线程中执行,不阻塞主线程
  • Crew 在遇到同步任务或任务列表结束时,等待所有异步任务完成
  • 异步任务不能依赖其他异步任务的输出(必须用同步任务分隔)
  • future.result() 会阻塞直到任务完成

路径 3:Guardrail 验证与重试流程

Guardrail 验证

# 文件:lib/crewai/src/crewai/task.py
def _invoke_guardrail_function(
    self,
    task_output: TaskOutput,
    agent: BaseAgent,
    tools: list[BaseTool],
    guardrail: GuardrailCallable,
    guardrail_index: int = None,
) -> TaskOutput:
    current_retry_count = self._guardrail_retry_counts.get(guardrail_index, 0)
    max_attempts = self.guardrail_max_retries + 1

    for attempt in range(max_attempts):
        # 执行验证
        guardrail_result = process_guardrail(
            output=task_output,
            guardrail=guardrail,
            retry_count=current_retry_count,
            event_source=self,
            from_task=self,
            from_agent=agent,
        )

        # 验证通过
        if guardrail_result.success:
            # 更新输出
            if isinstance(guardrail_result.result, str):
                task_output.raw = guardrail_result.result
                pydantic_output, json_output = self._export_output(guardrail_result.result)
                task_output.pydantic = pydantic_output
                task_output.json_dict = json_output
            elif isinstance(guardrail_result.result, TaskOutput):
                task_output = guardrail_result.result

            return task_output

        # 验证失败
        if attempt >= self.guardrail_max_retries:
            raise Exception(
                f"Task failed guardrail validation after {self.guardrail_max_retries} retries. "
                f"Last error: {guardrail_result.error}"
            )

        # 更新重试次数
        current_retry_count += 1
        self._guardrail_retry_counts[guardrail_index] = current_retry_count

        # 构建错误上下文
        context = self.i18n.errors("validation_error").format(
            guardrail_result_error=guardrail_result.error,
            task_output=task_output.raw,
        )

        # 打印重试信息
        printer.print(
            f"Guardrail blocked (attempt {attempt + 1}/{max_attempts}), "
            f"retrying due to: {guardrail_result.error}\n",
            color="yellow",
        )

        # 重新执行任务
        result = agent.execute_task(task=self, context=context, tools=tools)

        # 重新转换输出
        pydantic_output, json_output = self._export_output(result)
        task_output = TaskOutput(
            name=self.name or self.description,
            description=self.description,
            expected_output=self.expected_output,
            raw=result,
            pydantic=pydantic_output,
            json_dict=json_output,
            agent=agent.role,
            output_format=self._get_output_format(),
        )

    return task_output

Guardrail 处理函数

# 文件:lib/crewai/src/crewai/utilities/guardrail.py
def process_guardrail(
    output: TaskOutput,
    guardrail: GuardrailCallable,
    retry_count: int,
    event_source: Any,
    from_agent: BaseAgent,
    from_task: Task,
) -> GuardrailResult:
    # 发送开始事件
    crewai_event_bus.emit(
        event_source,
        LLMGuardrailStartedEvent(
            guardrail=guardrail,
            retry_count=retry_count,
            from_agent=from_agent,
            from_task=from_task,
        ),
    )

    # 调用验证函数
    result = guardrail(output)  # 返回 (bool, Any)
    guardrail_result = GuardrailResult.from_tuple(result)

    # 发送完成事件
    crewai_event_bus.emit(
        event_source,
        LLMGuardrailCompletedEvent(
            success=guardrail_result.success,
            result=guardrail_result.result,
            error=guardrail_result.error,
            retry_count=retry_count,
            from_agent=from_agent,
            from_task=from_task,
        ),
    )

    return guardrail_result

关键点

  • Guardrail 函数签名:(TaskOutput) -> (bool, Any)
  • 验证失败时,将错误信息作为上下文传递给 Agent,让 Agent 知道如何改进
  • 每个 Guardrail 有独立的重试计数器
  • 超过最大重试次数后抛出异常,任务执行失败
  • 验证成功后,可以修改输出内容(返回新的字符串或 TaskOutput)

路径 4:输出转换流程

输出转换入口

# 文件:lib/crewai/src/crewai/task.py
def _export_output(self, result: str) -> tuple[BaseModel | None, dict | None]:
    pydantic_output = None
    json_output = None

    if self.output_pydantic or self.output_json:
        model_output = convert_to_model(
            result,
            self.output_pydantic,
            self.output_json,
            self.agent,
            self.converter_cls,
        )

        if isinstance(model_output, BaseModel):
            pydantic_output = model_output
        elif isinstance(model_output, dict):
            json_output = model_output
        elif isinstance(model_output, str):
            try:
                json_output = json.loads(model_output)
            except json.JSONDecodeError:
                json_output = None

    return pydantic_output, json_output

转换实现

# 文件:lib/crewai/src/crewai/utilities/converter.py
def convert_to_model(
    result: str,
    output_pydantic: type[BaseModel],
    output_json: type[BaseModel],
    agent: Agent,
    converter_cls: type[Converter] = None,
) -> BaseModel | dict | str:
    model = output_pydantic or output_json

    # 尝试直接解析 JSON
    try:
        escaped_result = json.dumps(json.loads(result, strict=False))
        return validate_model(escaped_result, model, bool(output_json))
    except json.JSONDecodeError:
        # JSON 解析失败,使用 Converter
        return handle_partial_json(result, model, bool(output_json), agent, converter_cls)

def handle_partial_json(
    result: str,
    model: type[BaseModel],
    is_json_output: bool,
    agent: Agent,
    converter_cls: type[Converter] = None,
) -> BaseModel | dict | str:
    # 尝试提取部分 JSON
    try:
        json_text = extract_json_from_text(result)
        return validate_model(json_text, model, is_json_output)
    except Exception:
        # 提取失败,使用 LLM 转换
        return convert_with_instructions(result, model, is_json_output, agent, converter_cls)

def convert_with_instructions(
    result: str,
    model: type[BaseModel],
    is_json_output: bool,
    agent: Agent,
    converter_cls: type[Converter] = None,
) -> BaseModel | dict | str:
    llm = agent.function_calling_llm or agent.llm
    instructions = get_conversion_instructions(model, llm)

    converter = Converter(llm=llm, text=result, model=model, instructions=instructions)

    if is_json_output:
        return converter.to_json()
    else:
        return converter.to_pydantic()

class Converter:
    def to_pydantic(self, current_attempt: int = 1) -> BaseModel:
        if self.llm.supports_function_calling():
            # 使用 function calling 直接获取结构化输出
            return self._create_instructor().to_pydantic()
        else:
            # 使用提示词让 LLM 输出 JSON
            response = self.llm.call([
                {"role": "system", "content": self.instructions},
                {"role": "user", "content": self.text},
            ])

            # 验证并解析
            return self.model.model_validate_json(response)

关键点

  • 三层转换策略:
    1. 直接解析 JSON(最快)
    2. 提取部分 JSON(处理混合输出)
    3. 使用 LLM 重新格式化(兜底方案)
  • Function Calling 模式下,LLM 直接返回结构化输出,无需二次解析
  • Pydantic 模型提供强类型验证,确保输出符合 schema
  • 转换失败时返回原始字符串,不阻塞任务执行

数据结构 UML 图

Task 核心类图

classDiagram
    class Task {
        +UUID id
        +str name
        +str description
        +str expected_output
        +BaseAgent agent
        +list[Task] | _NotSpecified context
        +bool async_execution
        +type[BaseModel] output_json
        +type[BaseModel] output_pydantic
        +str output_file
        +bool create_directory
        +TaskOutput output
        +list[BaseTool] tools
        +SecurityConfig security_config
        +bool human_input
        +bool markdown
        +type[Converter] converter_cls
        +set[str] processed_by_agents
        +GuardrailType guardrail
        +GuardrailsType guardrails
        +int guardrail_max_retries
        +int retry_count
        +datetime start_time
        +datetime end_time
        +bool allow_crewai_trigger_context
        +execute_sync(agent, context, tools)
        +execute_async(agent, context, tools)
        +prompt()
        +interpolate_inputs_and_add_conversation_history(inputs)
        +copy(agents, task_mapping)
        -_execute_core(agent, context, tools)
        -_export_output(result)
        -_save_file(result)
        -_invoke_guardrail_function(task_output, agent, tools, guardrail)
    }

    class ConditionalTask {
        +Callable[[TaskOutput], bool] condition
        +should_execute(context)
        +get_skipped_task_output()
    }

    class TaskOutput {
        +str description
        +str name
        +str expected_output
        +str summary
        +str raw
        +BaseModel pydantic
        +dict json_dict
        +str agent
        +OutputFormat output_format
        +json()
        +to_dict()
    }

    class OutputFormat {
        <<enumeration>>
        RAW
        JSON
        PYDANTIC
    }

    class Converter {
        +BaseLLM llm
        +str text
        +type[BaseModel] model
        +str instructions
        +convert()
    }

    Task <|-- ConditionalTask : 继承
    Task --> TaskOutput : 生成
    TaskOutput --> OutputFormat : 使用
    Task --> Converter : 使用

关键字段说明

Task 核心字段

字段 类型 约束 默认值 说明
id UUID frozen=True uuid4() 任务唯一标识符,不可修改
name str | None - None 任务名称(可选)
description str required - 任务描述,支持模板变量
expected_output str required - 预期输出的明确定义
agent BaseAgent | None - None 执行任务的 Agent
context list[Task] | _NotSpecified - NOT_SPECIFIED 前序任务列表(作为上下文)
async_execution bool - False 是否异步执行任务
output_json type[BaseModel] | None - None JSON 输出的 Pydantic schema
output_pydantic type[BaseModel] | None - None Pydantic 输出模型
output_file str | None - None 输出文件路径
create_directory bool - True 是否自动创建输出目录
output TaskOutput | None - None 任务执行结果
tools list[BaseTool] - [] 任务特定的工具列表
security_config SecurityConfig - default() 安全配置(指纹验证)
human_input bool - False 是否需要人工审核最终输出
markdown bool - False 是否要求输出 Markdown 格式
converter_cls type[Converter] | None - None 自定义转换器类
processed_by_agents set[str] - set() 已处理该任务的 Agent 角色集合
guardrail GuardrailType | None - None 单个 Guardrail 验证函数
guardrails GuardrailsType | None - None 多个 Guardrail 验证函数列表
guardrail_max_retries int - 3 Guardrail 验证失败最大重试次数
retry_count int - 0 当前重试次数
start_time datetime | None - None 任务开始时间
end_time datetime | None - None 任务结束时间
allow_crewai_trigger_context bool | None - None 是否允许注入 trigger payload

TaskOutput 字段

字段 类型 约束 默认值 说明
description str required - 任务描述
name str | None - None 任务名称
expected_output str | None - None 预期输出
summary str | None - auto 任务摘要(自动生成)
raw str - "" 原始输出字符串
pydantic BaseModel | None - None Pydantic 模型实例
json_dict dict[str, Any] | None - None JSON 字典
agent str required - 执行任务的 Agent 角色
output_format OutputFormat - RAW 输出格式(RAW/JSON/PYDANTIC)

API 详细规格

API 1: execute_sync

基本信息

  • 方法签名Task.execute_sync(agent: BaseAgent | None = None, context: str | None = None, tools: list[BaseTool] | None = None) -> TaskOutput
  • 调用方式:实例方法
  • 幂等性:否(依赖 LLM 生成)

功能说明

同步执行任务,阻塞当前线程直到任务完成。

请求结构体

class ExecuteSyncParams:
    agent: Optional[BaseAgent] = None     # 执行任务的 Agent(可覆盖默认)
    context: Optional[str] = None         # 上下文字符串
    tools: Optional[list[BaseTool]] = None  # 工具列表
字段 类型 必填 默认 约束 说明
agent BaseAgent | None None - 覆盖任务默认的 Agent
context str | None None - 前序任务的输出文本
tools list[BaseTool] | None None - 覆盖 Agent 默认的工具列表

响应结构体

class TaskOutput(BaseModel):
    description: str                # 任务描述
    name: Optional[str]             # 任务名称
    expected_output: Optional[str]  # 预期输出
    summary: Optional[str]          # 任务摘要
    raw: str                        # 原始输出
    pydantic: Optional[BaseModel]   # Pydantic 模型输出
    json_dict: Optional[dict]       # JSON 字典输出
    agent: str                      # 执行的 Agent 角色
    output_format: OutputFormat     # 输出格式

核心代码

def execute_sync(
    self,
    agent: BaseAgent | None = None,
    context: str | None = None,
    tools: list[BaseTool] | None = None,
) -> TaskOutput:
    """同步执行任务"""
    return self._execute_core(agent, context, tools)

def _execute_core(
    self,
    agent: BaseAgent | None,
    context: str | None,
    tools: list[Any] | None,
) -> TaskOutput:
    """核心执行逻辑"""
    try:
        # 1. 确定执行 Agent
        agent = agent or self.agent
        self.agent = agent
        if not agent:
            raise Exception(
                f"The task '{self.description}' has no agent assigned"
            )

        # 2. 记录开始时间
        self.start_time = datetime.datetime.now()

        # 3. 设置提示词上下文
        self.prompt_context = context
        tools = tools or self.tools or []

        # 4. 记录处理该任务的 Agent
        self.processed_by_agents.add(agent.role)

        # 5. 发送任务开始事件
        crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))

        # 6. 调用 Agent 执行任务
        result = agent.execute_task(
            task=self,
            context=context,
            tools=tools,
        )

        # 7. 导出结构化输出
        pydantic_output, json_output = self._export_output(result)

        # 8. 创建 TaskOutput
        task_output = TaskOutput(
            name=self.name or self.description,
            description=self.description,
            expected_output=self.expected_output,
            raw=result,
            pydantic=pydantic_output,
            json_dict=json_output,
            agent=agent.role,
            output_format=self._get_output_format(),
        )

        # 9. 执行 Guardrail 验证(如果配置)
        if self._guardrails:
            for idx, guardrail in enumerate(self._guardrails):
                task_output = self._invoke_guardrail_function(
                    task_output=task_output,
                    agent=agent,
                    tools=tools,
                    guardrail=guardrail,
                    guardrail_index=idx,
                )

        # 10. 执行单个 Guardrail(向后兼容)
        if self._guardrail:
            task_output = self._invoke_guardrail_function(
                task_output=task_output,
                agent=agent,
                tools=tools,
                guardrail=self._guardrail,
            )

        # 11. 保存输出
        self.output = task_output
        self.end_time = datetime.datetime.now()

        # 12. 执行回调函数
        if self.callback:
            self.callback(self.output)

        crew = self.agent.crew
        if crew and crew.task_callback and crew.task_callback != self.callback:
            crew.task_callback(self.output)

        # 13. 保存到文件(如果配置)
        if self.output_file:
            content = (
                json_output
                if json_output
                else (
                    pydantic_output.model_dump_json() if pydantic_output else result
                )
            )
            self._save_file(content)

        # 14. 发送任务完成事件
        crewai_event_bus.emit(
            self, TaskCompletedEvent(output=task_output, task=self)
        )

        return task_output

    except Exception as e:
        self.end_time = datetime.datetime.now()
        crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self))
        raise e

调用链与上游函数

调用路径Crew.kickoff()Crew._execute_tasks()Task.execute_sync()

# 上游调用者:Crew._execute_tasks()
def _execute_tasks(self, tasks: list[Task], start_index: int = 0) -> CrewOutput:
    """Crew 执行任务序列"""
    task_outputs: list[TaskOutput] = []

    for task_index, task in enumerate(tasks):
        agent_to_use = self._get_agent_to_use(task)
        tools_for_task = self._prepare_tools(agent_to_use, task, task.tools or [])

        # 获取上下文
        context = self._get_context(task, task_outputs)

        # 执行任务
        if task.async_execution:
            future = task.execute_async(
                agent=agent_to_use,
                context=context,
                tools=tools_for_task,
            )
            futures.append((task, future, task_index))
        else:
            # 同步执行
            task_output = task.execute_sync(
                agent=agent_to_use,
                context=context,
                tools=tools_for_task,
            )
            task_outputs.append(task_output)
            self._process_task_result(task, task_output)

    return self._create_crew_output(task_outputs)

时序图

sequenceDiagram
    autonumber
    participant Crew as Crew
    participant Task as Task
    participant Agent as Agent
    participant Converter as Converter
    participant Guardrail as Guardrail
    participant FileHandler as FileHandler

    Crew->>Task: execute_sync(agent, context, tools)
    Task->>Task: 记录 start_time
    Task->>Task: 发送 TaskStartedEvent

    Task->>Agent: execute_task(task, context, tools)
    Agent->>Agent: 推理循环
    Agent-->>Task: 返回 result(字符串)

    Task->>Converter: _export_output(result)
    alt 配置了 output_pydantic
        Converter->>Converter: convert_to_model(result, output_pydantic)
        Converter-->>Task: pydantic_output
    else 配置了 output_json
        Converter->>Converter: convert_to_model(result, output_json)
        Converter-->>Task: json_output
    else 默认
        Converter-->>Task: None
    end

    Task->>Task: 创建 TaskOutput

    alt 配置了 guardrail
        Task->>Guardrail: _invoke_guardrail_function(task_output)
        Guardrail->>Guardrail: process_guardrail(output)
        alt 验证失败
            Guardrail->>Agent: 重新执行任务
            Agent-->>Guardrail: 新的 result
            Guardrail->>Converter: 重新转换
        end
        Guardrail-->>Task: 验证通过的 task_output
    end

    Task->>Task: 执行回调函数

    alt 配置了 output_file
        Task->>FileHandler: _save_file(content)
        FileHandler-->>Task: 保存成功
    end

    Task->>Task: 记录 end_time
    Task->>Task: 发送 TaskCompletedEvent
    Task-->>Crew: TaskOutput

时序图详细功能说明

步骤 1-3:任务初始化阶段

  • Crew 调用 Task.execute_sync(),传入执行参数
  • Task 记录任务开始时间(start_time),用于后续计算执行时长
  • 发送 TaskStartedEvent 事件,监听器可以记录日志或触发回调

步骤 4-5:Agent 执行阶段

  • 调用 Agent.execute_task(),传入任务、上下文和工具
  • Agent 进入推理循环(ReAct 模式),可能多次调用 LLM 和工具
  • 返回最终的字符串结果

步骤 6-9:输出转换阶段

  • 调用 _export_output() 处理原始字符串
  • 根据配置选择转换策略:
    • 如果配置了 output_pydantic:转换为 Pydantic 模型实例
    • 如果配置了 output_json:转换为 JSON 字典
    • 默认:保持原始字符串
  • 转换过程可能使用 Converter 调用 LLM 进行格式化

步骤 10:创建输出对象

  • 创建 TaskOutput 对象,封装所有输出数据
  • 包含:原始字符串、结构化输出、任务元数据等

步骤 11-15:Guardrail 验证阶段(可选)

  • 如果配置了 Guardrail,依次执行验证
  • 调用 process_guardrail() 执行用户定义的验证函数
  • 验证失败
    • 构建错误上下文,包含原始输出和错误信息
    • 重新调用 Agent.execute_task(),让 Agent 根据反馈改进
    • 重新转换输出,再次验证
    • 最多重试 guardrail_max_retries
  • 验证成功:继续后续流程

步骤 16:执行回调函数

  • 执行任务级别的 callback(如果配置)
  • 执行 Crew 级别的 task_callback(如果配置)
  • 用于自定义逻辑,如日志记录、指标收集等

步骤 17-19:文件持久化阶段(可选)

  • 如果配置了 output_file,调用 _save_file()
  • 根据输出格式准备文件内容:
    • JSON/Pydantic:序列化为 JSON
    • Raw:直接保存字符串
  • 如果 create_directory=True,自动创建目录

步骤 20-21:任务完成阶段

  • 记录任务结束时间(end_time
  • 发送 TaskCompletedEvent 事件
  • 返回 TaskOutput 给调用者

异常处理流程

  • 任何阶段抛出异常都会被 _execute_core() 捕获
  • 记录结束时间
  • 发送 TaskFailedEvent 事件
  • 重新抛出异常,由上层处理

性能要点

  • 推理循环是最耗时的部分(LLM 调用)
  • Guardrail 验证失败会导致重新执行,显著增加时间
  • 输出转换通常很快,除非需要调用 LLM 格式化
  • 文件 I/O 操作相对较快,但大文件可能有影响

边界与异常

异常情况处理

  1. 没有 Agent 分配

    • 异常类型:Exception
    • 触发条件:task.agent 为 None 且未传入 agent 参数
    • 处理方式:立即抛出异常
    • 回退策略:无,需要用户修复配置
  2. 输出转换失败

    • 异常类型:ValidationError(Pydantic)
    • 触发条件:LLM 输出与 schema 不匹配
    • 处理方式:使用 Converter 重试转换
    • 回退策略:返回原始字符串
  3. Guardrail 验证失败

    • 异常类型:自定义或 LLM 返回的错误信息
    • 触发条件:guardrail(output) 返回 (False, error_message)
    • 处理方式:重新执行任务(最多 guardrail_max_retries 次)
    • 回退策略:超过重试次数后抛出异常
  4. 文件保存失败

    • 异常类型:IOErrorOSError
    • 触发条件:目录不存在、权限不足、磁盘满
    • 处理方式:抛出 RuntimeError 并提示使用 FileWriterTool
    • 回退策略:任务输出仍然返回,但文件未保存

执行时长统计

@property
def execution_duration(self) -> float | None:
    """计算任务执行时长(秒)"""
    if not self.start_time or not self.end_time:
        return None
    return (self.end_time - self.start_time).total_seconds()

API 2: execute_async

基本信息

  • 方法签名Task.execute_async(agent: BaseAgent | None = None, context: str | None = None, tools: list[BaseTool] | None = None) -> Future[TaskOutput]
  • 调用方式:实例方法
  • 幂等性:否

功能说明

异步执行任务,立即返回 Future 对象,任务在后台线程执行。

核心代码

def execute_async(
    self,
    agent: BaseAgent | None = None,
    context: str | None = None,
    tools: list[BaseTool] | None = None,
) -> Future[TaskOutput]:
    """异步执行任务"""
    future: Future[TaskOutput] = Future()
    threading.Thread(
        daemon=True,
        target=self._execute_task_async,
        args=(agent, context, tools, future),
    ).start()
    return future

def _execute_task_async(
    self,
    agent: BaseAgent | None,
    context: str | None,
    tools: list[Any] | None,
    future: Future[TaskOutput],
) -> None:
    """异步执行的内部方法"""
    result = self._execute_core(agent, context, tools)
    future.set_result(result)

使用场景

  • I/O 密集型任务:需要调用外部 API、读取大文件等
  • 长时间任务:执行时间超过 30 秒的任务
  • 并行任务:多个任务可以同时执行

注意事项

  1. 异步任务的上下文限制:异步任务不能依赖其他异步任务的输出(必须有同步任务作为分隔)
  2. 结果获取:通过 future.result() 阻塞等待结果
  3. 异常处理:异常在 future.result() 调用时抛出

API 3: prompt

基本信息

  • 方法签名Task.prompt() -> str
  • 调用方式:实例方法
  • 幂等性:是(相同配置返回相同提示词)

功能说明

生成任务提示词,包含任务描述、预期输出和可选的 Markdown 格式指令。

核心代码

def prompt(self) -> str:
    """生成任务提示词"""
    description = self.description

    # 注入 Trigger Payload(如果配置)
    should_inject = self.allow_crewai_trigger_context
    if should_inject and self.agent:
        crew = getattr(self.agent, "crew", None)
        if crew and hasattr(crew, "_inputs") and crew._inputs:
            trigger_payload = crew._inputs.get("crewai_trigger_payload")
            if trigger_payload is not None:
                description += f"\n\nTrigger Payload: {trigger_payload}"

    # 组装任务切片
    output = self.i18n.slice("expected_output").format(
        expected_output=self.expected_output
    )
    tasks_slices = [description, output]

    # 添加 Markdown 指令(如果配置)
    if self.markdown:
        markdown_instruction = """Your final answer MUST be formatted in Markdown syntax.
Follow these guidelines:
- Use # for headers
- Use ** for bold text
- Use * for italic text
- Use - or * for bullet points
- Use `code` for inline code
- Use ```language for code blocks"""
        tasks_slices.append(markdown_instruction)

    return "\n".join(tasks_slices)

提示词示例

基础提示词

Research the top 5 AI trends in 2024

Expected Output:
A list of 5 AI trends with brief descriptions

带 Markdown 的提示词

Write a blog post about AI agents

Expected Output:
A 500-word blog post

Your final answer MUST be formatted in Markdown syntax.
Follow these guidelines:
- Use # for headers
- Use ** for bold text
...

API 4: interpolate_inputs_and_add_conversation_history

基本信息

  • 方法签名Task.interpolate_inputs_and_add_conversation_history(inputs: dict[str, str | int | float | dict | list]) -> None
  • 调用方式:实例方法
  • 幂等性:否(修改 Task 状态)

功能说明

将输入参数插值到任务描述和预期输出中,支持模板变量替换和会话历史注入。

请求结构体

字段 类型 必填 默认 约束 说明
inputs dict[str, str | int | float | dict | list] - - 输入参数字典

核心代码

def interpolate_inputs_and_add_conversation_history(
    self, inputs: dict[str, str | int | float | dict[str, Any] | list[Any]]
) -> None:
    """插值输入参数并添加会话历史"""

    # 保存原始值(首次插值时)
    if self._original_description is None:
        self._original_description = self.description
    if self._original_expected_output is None:
        self._original_expected_output = self.expected_output
    if self.output_file is not None and self._original_output_file is None:
        self._original_output_file = self.output_file

    if not inputs:
        return

    # 插值 description
    try:
        self.description = interpolate_only(
            input_string=self._original_description, inputs=inputs
        )
    except KeyError as e:
        raise ValueError(
            f"Missing required template variable '{e.args[0]}' in description"
        ) from e

    # 插值 expected_output
    try:
        self.expected_output = interpolate_only(
            input_string=self._original_expected_output, inputs=inputs
        )
    except (KeyError, ValueError) as e:
        raise ValueError(f"Error interpolating expected_output: {e!s}") from e

    # 插值 output_file
    if self.output_file is not None:
        try:
            self.output_file = interpolate_only(
                input_string=self._original_output_file, inputs=inputs
            )
        except (KeyError, ValueError) as e:
            raise ValueError(f"Error interpolating output_file path: {e!s}") from e

    # 添加会话历史(如果有)
    if inputs.get("crew_chat_messages"):
        conversation_instruction = self.i18n.slice(
            "conversation_history_instruction"
        )

        crew_chat_messages_json = str(inputs["crew_chat_messages"])
        crew_chat_messages = json.loads(crew_chat_messages_json)

        conversation_history = "\n".join(
            f"{msg['role'].capitalize()}: {msg['content']}"
            for msg in crew_chat_messages
            if isinstance(msg, dict) and "role" in msg and "content" in msg
        )

        self.description += (
            f"\n\n{conversation_instruction}\n\n{conversation_history}"
        )

模板变量示例

定义任务

task = Task(
    description="Research {topic} and find {count} key insights",
    expected_output="A list of {count} insights about {topic}",
    output_file="reports/{topic}_insights.md",
)

插值

inputs = {
    "topic": "AI Safety",
    "count": 5
}
task.interpolate_inputs_and_add_conversation_history(inputs)

插值后

description: "Research AI Safety and find 5 key insights"
expected_output: "A list of 5 insights about AI Safety"
output_file: "reports/AI_Safety_insights.md"

API 5: copy

基本信息

  • 方法签名Task.copy(agents: list[BaseAgent], task_mapping: dict[str, Task]) -> Task
  • 调用方式:实例方法
  • 幂等性:是

功能说明

创建任务的深拷贝,用于 Crew.copy() 时复制任务实例。

核心代码

def copy(
    self, agents: list[BaseAgent], task_mapping: dict[str, Task]
) -> Task:
    """创建任务的深拷贝"""
    exclude = {
        "id",
        "agent",
        "context",
        "tools",
    }

    # 序列化任务数据
    copied_data = self.model_dump(exclude=exclude)
    copied_data = {k: v for k, v in copied_data.items() if v is not None}

    # 克隆 context
    cloned_context = (
        self.context
        if self.context is NOT_SPECIFIED
        else [task_mapping[context_task.key] for context_task in self.context]
        if isinstance(self.context, list)
        else None
    )

    # 查找对应的 Agent
    def get_agent_by_role(role: str) -> BaseAgent | None:
        return next((agent for agent in agents if agent.role == role), None)

    cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
    cloned_tools = shallow_copy(self.tools) if self.tools else []

    # 创建新实例(保留原始类型)
    return self.__class__(
        **copied_data,
        context=cloned_context,
        agent=cloned_agent,
        tools=cloned_tools,
    )

模块内部时序图

模块时序图 1:Converter 结构化输出转换流程

sequenceDiagram
    autonumber
    participant Task as Task
    participant ConvertToModel as convert_to_model
    participant ValidateModel as validate_model
    participant HandlePartial as handle_partial_json
    participant ExtractJSON as extract_json_from_text
    participant ConvertWithInstr as convert_with_instructions
    participant Converter as Converter
    participant LLM as LLM
    participant Pydantic as Pydantic Validator

    Task->>ConvertToModel: convert_to_model(result, output_pydantic, agent)

    Note over ConvertToModel: 策略 1:直接解析 JSON
    ConvertToModel->>ConvertToModel: json.loads(result)
    alt JSON 解析成功
        ConvertToModel->>ValidateModel: validate_model(json_str, model)
        ValidateModel->>Pydantic: model.model_validate_json(json_str)
        alt Pydantic 验证成功
            Pydantic-->>ValidateModel: BaseModel 实例
            ValidateModel-->>ConvertToModel: 返回模型
            ConvertToModel-->>Task: BaseModel 实例
        else Pydantic 验证失败
            Pydantic-->>ValidateModel: ValidationError
            ValidateModel-->>ConvertToModel: 抛出异常
        end
    else JSON 解析失败
        ConvertToModel->>HandlePartial: handle_partial_json(result, model, agent)

        Note over HandlePartial: 策略 2:提取部分 JSON
        HandlePartial->>ExtractJSON: extract_json_from_text(result)
        alt 提取成功
            ExtractJSON-->>HandlePartial: json_string
            HandlePartial->>ValidateModel: validate_model(json_string, model)
            ValidateModel->>Pydantic: model.model_validate_json()
            Pydantic-->>ValidateModel: BaseModel 实例
            ValidateModel-->>HandlePartial: 返回模型
            HandlePartial-->>ConvertToModel: BaseModel 实例
            ConvertToModel-->>Task: BaseModel 实例
        else 提取失败
            ExtractJSON-->>HandlePartial: 异常

            Note over HandlePartial: 策略 3LLM 重新格式化
            HandlePartial->>ConvertWithInstr: convert_with_instructions(result, model, agent)
            ConvertWithInstr->>ConvertWithInstr: 获取转换指令
            ConvertWithInstr->>Converter: 创建 Converter 实例

            Converter->>Converter: 检查 LLM 是否支持 Function Calling
            alt 支持 Function Calling
                Converter->>LLM: call() with function definition
                LLM-->>Converter: 结构化输出
                Converter->>Pydantic: 直接验证
                Pydantic-->>Converter: BaseModel 实例
            else 不支持 Function Calling
                Converter->>Converter: 构建转换提示词
                Converter->>LLM: call([system, user])
                LLM-->>Converter: JSON 字符串
                Converter->>Pydantic: model.model_validate_json()
                alt 验证成功
                    Pydantic-->>Converter: BaseModel 实例
                else 验证失败且未达最大重试次数
                    Pydantic-->>Converter: ValidationError
                    Converter->>Converter: 递归重试 (attempt++)
                    Converter->>LLM: 再次调用
                end
            end

            Converter-->>ConvertWithInstr: BaseModel 实例
            ConvertWithInstr-->>HandlePartial: BaseModel 实例
            HandlePartial-->>ConvertToModel: BaseModel 实例
            ConvertToModel-->>Task: BaseModel 实例
        end
    end

Converter 时序图功能说明

策略 1:直接解析 JSON(最快路径)

  • 步骤 1-3:尝试直接解析原始字符串为 JSON
  • 步骤 4-5:调用 Pydantic 的 model_validate_json() 进行验证
  • 适用场景:LLM 直接返回完整的 JSON 格式
  • 性能:最快,无需额外 LLM 调用
  • 失败情况:JSON 格式不完整、包含注释、混合文本

策略 2:提取部分 JSON(中等路径)

  • 步骤 6-8:使用正则表达式从混合文本中提取 JSON
  • 提取逻辑
    • 查找 {...}[...] 模式
    • 处理嵌套结构
    • 忽略 JSON 前后的文本
  • 适用场景:LLM 返回 “这是结果:{…}” 格式
  • 性能:较快,仅字符串处理
  • 失败情况:JSON 不完整、语法错误

策略 3:LLM 重新格式化(兜底路径)

  • 步骤 9-11:创建 Converter 实例,使用 LLM 转换
  • Function Calling 模式(步骤 12-14):
    • LLM 直接返回符合 schema 的结构化数据
    • 无需解析,直接验证
    • 推荐用于支持 FC 的模型(GPT-4、Claude 等)
  • 提示词模式(步骤 15-19):
    • 构建转换提示词,包含原始文本和目标 schema
    • LLM 返回 JSON 字符串
    • Pydantic 验证并可能重试(最多 3 次)
  • 适用场景:所有其他策略失败
  • 性能:最慢,需要额外 LLM 调用(1-3 次)
  • 优势:容错性最强,几乎总能成功

关键设计点

  1. 三层降级策略:从快到慢,逐层尝试
  2. 智能重试:Converter 内部实现重试机制
  3. 指令优化:根据 LLM 类型生成不同的转换指令
  4. 错误恢复:最终失败时返回原始字符串,不阻塞任务

模块时序图 2:Guardrail 验证与重试流程

sequenceDiagram
    autonumber
    participant Task as Task
    participant InvokeGuardrail as _invoke_guardrail_function
    participant ProcessGuardrail as process_guardrail
    participant EventBus as EventBus
    participant UserGuardrail as 用户验证函数
    participant Agent as Agent
    participant Converter as Converter

    Task->>InvokeGuardrail: _invoke_guardrail_function(task_output, agent, tools, guardrail)
    InvokeGuardrail->>InvokeGuardrail: 获取当前重试次数

    loop 最多 guardrail_max_retries + 1 
        InvokeGuardrail->>ProcessGuardrail: process_guardrail(output, guardrail, retry_count)

        ProcessGuardrail->>EventBus: emit(LLMGuardrailStartedEvent)
        EventBus-->>ProcessGuardrail: 事件已发送

        ProcessGuardrail->>UserGuardrail: guardrail(task_output)
        Note over UserGuardrail: 用户定义的验证逻辑<br/>检查长度、格式、内容等
        UserGuardrail-->>ProcessGuardrail: (success: bool, result: Any)

        ProcessGuardrail->>ProcessGuardrail: GuardrailResult.from_tuple()
        ProcessGuardrail->>EventBus: emit(LLMGuardrailCompletedEvent)
        EventBus-->>ProcessGuardrail: 事件已发送

        ProcessGuardrail-->>InvokeGuardrail: GuardrailResult

        alt success = True
            InvokeGuardrail->>InvokeGuardrail: 更新 task_output
            alt result 是字符串
                InvokeGuardrail->>Converter: _export_output(result)
                Converter-->>InvokeGuardrail: (pydantic, json_dict)
                InvokeGuardrail->>InvokeGuardrail: 更新 task_output 的结构化输出
            else result  TaskOutput
                InvokeGuardrail->>InvokeGuardrail: 直接替换 task_output
            end
            InvokeGuardrail-->>Task: 验证通过的 task_output
        else success = False AND attempt < max_retries
            InvokeGuardrail->>InvokeGuardrail: retry_count++
            InvokeGuardrail->>InvokeGuardrail: 打印重试警告
            InvokeGuardrail->>InvokeGuardrail: 构建错误上下文
            Note over InvokeGuardrail: 错误上下文包含:<br/>- 原始输出<br/>- 错误信息<br/>- 改进建议

            InvokeGuardrail->>Agent: execute_task(task, error_context, tools)
            Note over Agent: Agent 看到之前的错误<br/>尝试修正输出
            Agent-->>InvokeGuardrail: 新的 result

            InvokeGuardrail->>Converter: _export_output(result)
            Converter-->>InvokeGuardrail: (pydantic, json_dict)
            InvokeGuardrail->>InvokeGuardrail: 创建新的 task_output
            Note over InvokeGuardrail: 继续下一次验证循环
        else success = False AND attempt >= max_retries
            InvokeGuardrail->>InvokeGuardrail: 抛出异常
            InvokeGuardrail-->>Task: Exception: Guardrail validation failed
        end
    end

Guardrail 时序图功能说明

步骤 1-3:初始化验证

  • 获取当前 Guardrail 的重试次数(支持多个 Guardrail 独立计数)
  • 计算最大尝试次数 = guardrail_max_retries + 1(默认 4 次)

步骤 4-10:执行验证

  • 发送 LLMGuardrailStartedEvent 事件(步骤 4-5)
  • 调用用户定义的验证函数(步骤 6-7)
    • 接收 TaskOutput 对象
    • 执行自定义验证逻辑
    • 返回 (success: bool, result: Any) 元组
  • 封装为 GuardrailResult 对象(步骤 8)
  • 发送 LLMGuardrailCompletedEvent 事件(步骤 9-10)

步骤 11-16:验证成功分支

  • 检查 result 的类型:
    • 字符串:调用 Converter 重新转换为结构化输出
    • TaskOutput:直接替换原输出
  • 返回验证通过的 task_output

步骤 17-24:验证失败且未达重试上限

  • 增加重试计数器(步骤 17)
  • 打印黄色警告信息,显示重试进度(步骤 18)
  • 构建错误上下文(步骤 19):
    context = f"""
    Previous output validation failed with error: {error_message}
    
    Previous output:
    {task_output.raw}
    
    Please improve your response to address the validation error.
    """
    
  • 重新调用 Agent.execute_task(),传入错误上下文(步骤 20-21)
    • Agent 会看到之前的错误信息
    • Agent 可以根据反馈调整输出
  • 重新转换输出(步骤 22-23)
  • 创建新的 task_output(步骤 24)
  • 继续下一次验证循环

步骤 25-26:达到最大重试次数

  • 抛出异常:Task failed guardrail validation after N retries
  • 包含最后一次的错误信息

关键设计点

  1. 多 Guardrail 支持:每个 Guardrail 独立计数,按顺序验证
  2. 智能重试:将错误信息作为上下文,引导 Agent 改进
  3. 灵活的返回值
    • 返回原字符串:保持结构化输出不变
    • 返回新字符串:重新转换结构化输出
    • 返回新 TaskOutput:完全替换输出对象
  4. 事件驱动:每次验证都发送事件,便于监控和调试
  5. 性能权衡:每次重试都需要完整的 Agent 执行(LLM 调用 + 工具调用)

常见应用场景

  • 长度检查:确保输出至少 N 个字符
  • 格式验证:检查是否包含必要的标题、章节
  • 内容验证:检查是否包含特定关键词或主题
  • 质量评估:使用 LLM 判断输出质量是否达标
  • 安全检查:检测有害内容、PII 泄露等

关键功能流程剖析

功能 1:Guardrail 验证机制

功能描述

Guardrail 是任务输出的验证机制,确保 LLM 生成的内容符合要求。如果验证失败,会重新执行任务直到通过或达到最大重试次数。

核心代码

def _invoke_guardrail_function(
    self,
    task_output: TaskOutput,
    agent: BaseAgent,
    tools: list[BaseTool],
    guardrail: GuardrailCallable | None,
    guardrail_index: int | None = None,
) -> TaskOutput:
    """执行 Guardrail 验证"""
    if not guardrail:
        return task_output

    # 获取当前重试次数
    if guardrail_index is not None:
        current_retry_count = self._guardrail_retry_counts.get(guardrail_index, 0)
    else:
        current_retry_count = self.retry_count

    max_attempts = self.guardrail_max_retries + 1

    for attempt in range(max_attempts):
        # 执行验证
        guardrail_result = process_guardrail(
            output=task_output,
            guardrail=guardrail,
            retry_count=current_retry_count,
            event_source=self,
            from_task=self,
            from_agent=agent,
        )

        if guardrail_result.success:
            # 验证通过
            if isinstance(guardrail_result.result, str):
                # 更新原始输出
                task_output.raw = guardrail_result.result
                pydantic_output, json_output = self._export_output(
                    guardrail_result.result
                )
                task_output.pydantic = pydantic_output
                task_output.json_dict = json_output
            elif isinstance(guardrail_result.result, TaskOutput):
                task_output = guardrail_result.result

            return task_output

        # 验证失败
        if attempt >= self.guardrail_max_retries:
            # 达到最大重试次数
            guardrail_name = (
                f"guardrail {guardrail_index}"
                if guardrail_index is not None
                else "guardrail"
            )
            raise Exception(
                f"Task failed {guardrail_name} validation after "
                f"{self.guardrail_max_retries} retries. "
                f"Last error: {guardrail_result.error}"
            )

        # 更新重试次数
        if guardrail_index is not None:
            current_retry_count += 1
            self._guardrail_retry_counts[guardrail_index] = current_retry_count
        else:
            self.retry_count += 1
            current_retry_count = self.retry_count

        # 构建错误上下文
        context = self.i18n.errors("validation_error").format(
            guardrail_result_error=guardrail_result.error,
            task_output=task_output.raw,
        )

        # 打印重试信息
        printer = Printer()
        printer.print(
            content=f"Guardrail blocked (attempt {attempt + 1}/{max_attempts}), "
                   f"retrying due to: {guardrail_result.error}\n",
            color="yellow",
        )

        # 重新执行任务
        result = agent.execute_task(
            task=self,
            context=context,
            tools=tools,
        )

        # 重新转换输出
        pydantic_output, json_output = self._export_output(result)
        task_output = TaskOutput(
            name=self.name or self.description,
            description=self.description,
            expected_output=self.expected_output,
            raw=result,
            pydantic=pydantic_output,
            json_dict=json_output,
            agent=agent.role,
            output_format=self._get_output_format(),
        )

    return task_output

Guardrail 类型

1. 函数式 Guardrail

def validate_length(output: TaskOutput) -> tuple[bool, str]:
    """验证输出长度"""
    if len(output.raw) < 100:
        return False, "Output too short, needs at least 100 characters"
    return True, output.raw

task = Task(
    description="Write a summary",
    expected_output="A detailed summary",
    guardrail=validate_length,
    guardrail_max_retries=3,
)

2. LLM Guardrail

task = Task(
    description="Write a product review",
    expected_output="A fair and balanced review",
    guardrail="The output must be objective and mention both pros and cons",
    guardrail_max_retries=2,
)

3. 多 Guardrail 组合

def check_format(output: TaskOutput) -> tuple[bool, str]:
    if not output.raw.startswith("#"):
        return False, "Must start with a Markdown header"
    return True, output.raw

def check_length(output: TaskOutput) -> tuple[bool, str]:
    if len(output.raw) < 200:
        return False, "Too short"
    return True, output.raw

task = Task(
    description="Write a blog post",
    expected_output="A well-formatted blog post",
    guardrails=[check_format, check_length],
)

功能 2:结构化输出转换

功能描述

将 LLM 的原始字符串输出转换为结构化格式(JSON 或 Pydantic 模型)。

核心代码

def _export_output(
    self, result: str
) -> tuple[BaseModel | None, dict[str, Any] | None]:
    """导出结构化输出"""
    pydantic_output: BaseModel | None = None
    json_output: dict[str, Any] | None = None

    if self.output_pydantic or self.output_json:
        model_output = convert_to_model(
            result,
            self.output_pydantic,
            self.output_json,
            self.agent,
            self.converter_cls,
        )

        if isinstance(model_output, BaseModel):
            pydantic_output = model_output
        elif isinstance(model_output, dict):
            json_output = model_output
        elif isinstance(model_output, str):
            try:
                json_output = json.loads(model_output)
            except json.JSONDecodeError:
                json_output = None

    return pydantic_output, json_output

Converter 的实现

class Converter:
    """结构化输出转换器"""

    def __init__(
        self,
        llm: BaseLLM,
        text: str,
        model: type[BaseModel],
        instructions: str,
    ):
        self.llm = llm
        self.text = text
        self.model = model
        self.instructions = instructions

    def convert(self) -> BaseModel:
        """转换文本为 Pydantic 模型"""
        # 使用 LLM 提取结构化数据
        prompt = f"""
{self.instructions}

Original text:
{self.text}

Output must match this schema:
{self.model.model_json_schema()}
"""

        response = self.llm.call([{"role": "user", "content": prompt}])

        # 解析 JSON 并验证
        try:
            data = json.loads(response)
            return self.model.model_validate(data)
        except (json.JSONDecodeError, ValidationError) as e:
            # 重试或返回默认值
            raise ValueError(f"Failed to convert output: {e}")

使用示例

定义 Pydantic 模型

from pydantic import BaseModel, Field

class ArticleSummary(BaseModel):
    title: str = Field(description="Article title")
    author: str = Field(description="Article author")
    summary: str = Field(description="Brief summary")
    key_points: list[str] = Field(description="Key takeaways")
    sentiment: str = Field(description="Overall sentiment")

task = Task(
    description="Summarize the article at {url}",
    expected_output="A structured summary",
    output_pydantic=ArticleSummary,
    agent=researcher,
)

result = task.execute_sync()
# result.output.pydantic 是 ArticleSummary 实例
print(result.pydantic.title)
print(result.pydantic.key_points)

功能 3:ConditionalTask 条件执行

功能描述

ConditionalTask 根据前序任务的输出决定是否执行,实现动态工作流。

核心代码

class ConditionalTask(Task):
    """条件执行任务"""
    condition: Callable[[TaskOutput], bool] | None = None

    def should_execute(self, context: TaskOutput) -> bool:
        """判断是否执行"""
        if self.condition is None:
            raise ValueError("No condition function set")
        return self.condition(context)

    def get_skipped_task_output(self) -> TaskOutput:
        """生成跳过任务的输出"""
        return TaskOutput(
            description=self.description,
            raw="",
            agent=self.agent.role if self.agent else "",
            output_format=OutputFormat.RAW,
        )

Crew 中的处理

def _handle_conditional_task(
    self,
    task: ConditionalTask,
    task_outputs: list[TaskOutput],
    futures: list[tuple[Task, Future[TaskOutput], int]],
    task_index: int,
) -> TaskOutput | None:
    """处理条件任务"""
    # 等待所有异步任务完成
    if futures:
        task_outputs = self._process_async_tasks(futures)
        futures.clear()

    # 获取前一个任务的输出
    previous_output = task_outputs[-1] if task_outputs else None

    # 判断是否执行
    if previous_output is not None and not task.should_execute(previous_output):
        self._logger.log(
            "debug",
            f"Skipping conditional task: {task.description}",
            color="yellow",
        )
        skipped_task_output = task.get_skipped_task_output()
        self._store_execution_log(task, skipped_task_output, task_index)
        return skipped_task_output

    return None

使用示例

from crewai import ConditionalTask

# 定义条件函数
def check_success(output: TaskOutput) -> bool:
    """检查前序任务是否成功"""
    return "success" in output.raw.lower()

# 创建条件任务
analyze_task = Task(
    description="Analyze the data",
    expected_output="Analysis result",
    agent=analyst,
)

report_task = ConditionalTask(
    description="Generate report based on analysis",
    expected_output="Final report",
    agent=writer,
    condition=check_success,  # 仅当分析成功时执行
)

crew = Crew(
    agents=[analyst, writer],
    tasks=[analyze_task, report_task],
)

实战示例与最佳实践

示例 1:基础任务定义

from crewai import Agent, Task, Crew

# 创建 Agent
researcher = Agent(
    role="Research Analyst",
    goal="Find relevant information",
    backstory="Expert researcher",
)

# 创建 Task
research_task = Task(
    description="Research the latest AI trends in 2024",
    expected_output="A list of 5 AI trends with descriptions",
    agent=researcher,
)

# 执行任务
crew = Crew(agents=[researcher], tasks=[research_task])
result = crew.kickoff()
print(result.raw)

示例 2:结构化输出

from pydantic import BaseModel, Field

class TrendReport(BaseModel):
    trends: list[str] = Field(description="List of trends")
    analysis: str = Field(description="Overall analysis")
    recommendations: list[str] = Field(description="Recommendations")

task = Task(
    description="Analyze AI trends and provide recommendations",
    expected_output="Structured trend report",
    agent=analyst,
    output_pydantic=TrendReport,
)

result = task.execute_sync()
# 访问结构化输出
print(result.pydantic.trends)
print(result.pydantic.analysis)

示例 3:任务链与上下文传递

# 任务 1:数据收集
collect_task = Task(
    description="Collect data about {topic}",
    expected_output="Raw data",
    agent=collector,
)

# 任务 2:数据分析(使用任务 1 的输出)
analyze_task = Task(
    description="Analyze the collected data",
    expected_output="Analysis report",
    agent=analyst,
    context=[collect_task],  # 显式指定上下文
)

# 任务 3:报告生成(自动使用所有前序任务的输出)
report_task = Task(
    description="Generate final report",
    expected_output="Comprehensive report",
    agent=writer,
    # context 默认为 NOT_SPECIFIED,自动使用所有前序输出
)

crew = Crew(
    agents=[collector, analyst, writer],
    tasks=[collect_task, analyze_task, report_task],
)

crew.kickoff(inputs={"topic": "Climate Change"})

示例 4:Guardrail 验证

def validate_report_quality(output: TaskOutput) -> tuple[bool, str]:
    """验证报告质量"""
    content = output.raw

    # 检查长度
    if len(content) < 500:
        return False, "Report too short, needs at least 500 characters"

    # 检查关键词
    required_keywords = ["analysis", "conclusion", "recommendation"]
    missing_keywords = [kw for kw in required_keywords if kw not in content.lower()]
    if missing_keywords:
        return False, f"Missing required sections: {', '.join(missing_keywords)}"

    return True, content

task = Task(
    description="Write a comprehensive report on {topic}",
    expected_output="A well-structured report",
    agent=writer,
    guardrail=validate_report_quality,
    guardrail_max_retries=3,
)

示例 5:异步任务执行

# 定义三个独立的任务
task1 = Task(
    description="Research technology trends",
    expected_output="Tech trends",
    agent=tech_researcher,
    async_execution=True,  # 异步执行
)

task2 = Task(
    description="Research market trends",
    expected_output="Market trends",
    agent=market_researcher,
    async_execution=True,  # 异步执行
)

# 同步任务,等待前两个异步任务完成
task3 = Task(
    description="Combine research findings",
    expected_output="Combined report",
    agent=writer,
    async_execution=False,  # 同步执行
    context=[task1, task2],  # 依赖前两个异步任务
)

crew = Crew(
    agents=[tech_researcher, market_researcher, writer],
    tasks=[task1, task2, task3],
)

# task1 和 task2 并行执行,task3 等待两者完成
result = crew.kickoff()

最佳实践总结

1. 任务描述的编写原则

  • 明确具体:避免模糊的描述,提供清晰的指令
  • 上下文完整:提供足够的背景信息
  • 可操作性:描述应该是可执行的动作
  • 避免歧义:使用准确的术语

好的描述

description = """
Analyze the sales data from Q1 2024 and identify:
1. Top 3 performing products
2. Geographic regions with highest growth
3. Key customer segments
4. Trends compared to Q4 2023
"""

不好的描述

description = "Look at the sales data and tell me something"

2. 预期输出的定义技巧

  • 格式明确:指定输出格式(文本、列表、JSON 等)
  • 质量标准:定义输出的质量要求
  • 长度范围:给出合理的长度预期
  • 示例说明:提供输出示例

好的预期输出

expected_output = """
A structured analysis report in Markdown format containing:
- Executive summary (100-150 words)
- Detailed findings (500-800 words)
- 3-5 key recommendations
- Supporting data visualizations (descriptions)
"""

3. 工具选择策略

  • 任务级工具:为特定任务分配专用工具
  • Agent 级工具:为 Agent 配置通用工具
  • 避免工具过载:每个任务 3-5 个工具为宜
  • 工具互补性:选择功能互补的工具

4. 输出格式选择

  • 简单文本:使用默认 RAW 格式
  • 结构化数据:使用 output_pydanticoutput_json
  • 文件输出:配置 output_file 用于持久化
  • 混合输出:原始文本 + 结构化数据

5. 性能优化建议

  • 异步执行:独立任务使用 async_execution=True
  • 上下文优化:仅传递必要的上下文
  • Guardrail 节制:避免过于严格的验证
  • 输出缓存:相同任务考虑缓存结果

总结

Task 模块是 CrewAI 框架中定义工作单元的核心抽象,通过以下机制实现灵活的任务管理:

  1. 清晰的任务定义:description + expected_output 明确任务目标
  2. 灵活的执行模式:同步/异步执行,支持并行任务
  3. 强大的上下文管理:自动或手动传递前序任务输出
  4. 完善的输出处理:原始、JSON、Pydantic 三种格式
  5. 可靠的验证机制:Guardrail 确保输出质量
  6. 条件执行支持:ConditionalTask 实现动态工作流

该模块的设计遵循 单一职责原则,专注于任务的定义和执行协调,将实际执行委托给 Agent,将输出验证委托给 Guardrail,保持了良好的模块边界和可扩展性。