概述

Dify的智能体模块(core/agent/)是平台的核心推理引擎,实现了具备工具调用能力的AI Agent。该模块支持多种推理策略,能够根据用户查询自动选择合适的工具,执行复杂的多步骤推理任务。本文结合源码,梳理该模块的整体架构、推理策略与关键实现细节。

1. Agent模块整体架构

1.1 模块核心组件

graph TB
    subgraph "Agent策略层"
        FC[Function Calling 函数调用]
        COT[Chain of Thought 思维链]
        ReACT[ReACT 推理行动]
    end
    
    subgraph "Agent运行器层"
        BaseRunner[BaseAgentRunner 基础运行器]
        FCRunner[FunctionCallAgentRunner]
        CotRunner[CotAgentRunner 抽象基类]
        CotChatRunner[CotChatAgentRunner]
        CotCompletionRunner[CotCompletionAgentRunner]
    end
    
    subgraph "工具系统"
        ToolManager[ToolManager 工具管理器]
        BuiltinTools[内置工具]
        APITools[API工具]
        DatasetTools[数据集工具]
        WorkflowTools[工作流工具]
    end
    
    subgraph "提示系统"
        AgentPrompt[Agent提示模板]
        ReACTTemplate[ReACT模板]
        FCTemplate[函数调用模板]
        HistoryManager[历史管理器]
    end
    
    subgraph "执行引擎"
        Executor[推理执行器]
        IterationControl[迭代控制]
        StateManager[状态管理器]
        CallbackHandler[回调处理器]
    end
    
    FC --> FCRunner
    COT --> CotRunner
    ReACT --> CotChatRunner
    
    BaseRunner --> FCRunner
    BaseRunner --> CotRunner
    CotRunner --> CotChatRunner
    CotRunner --> CotCompletionRunner
    
    FCRunner --> ToolManager
    CotChatRunner --> BuiltinTools
    
    AgentPrompt --> ReACTTemplate
    AgentPrompt --> FCTemplate
    
    FCRunner --> Executor
    Executor --> IterationControl
    IterationControl --> StateManager
    
    style FC fill:#e3f2fd
    style BaseRunner fill:#e8f5e8
    style ToolManager fill:#fff3e0
    style Executor fill:#fce4ec

1.2 Agent推理流程

sequenceDiagram
    participant User as 用户
    participant AgentRunner as Agent运行器
    participant ModelRuntime as 模型运行时
    participant ToolEngine as 工具引擎
    participant CallbackHandler as 回调处理器
    
    Note over User,CallbackHandler: Agent推理执行流程
    
    User->>AgentRunner: 发送查询请求
    AgentRunner->>AgentRunner: 初始化Agent配置
    AgentRunner->>AgentRunner: 加载工具列表
    
    loop 推理迭代循环
        AgentRunner->>ModelRuntime: 发送提示消息
        ModelRuntime-->>AgentRunner: 返回推理结果
        
        alt 需要调用工具
            AgentRunner->>AgentRunner: 解析工具调用
            AgentRunner->>ToolEngine: 执行工具调用
            ToolEngine-->>AgentRunner: 返回工具结果
            AgentRunner->>CallbackHandler: 发布工具执行事件
        else 生成最终答案
            AgentRunner->>CallbackHandler: 发布答案事件
            Note over AgentRunner: 推理完成,退出循环
        end
        
        AgentRunner->>AgentRunner: 更新推理历史
        AgentRunner->>AgentRunner: 检查迭代次数限制
    end
    
    AgentRunner-->>User: 返回最终答案

1.3 工具调用详细时序(Function Calling)

sequenceDiagram
    participant User as 用户
    participant AgentRunner as Agent运行器
    participant ModelRuntime as 模型运行时
    participant ToolManager as 工具管理器
    participant Tool as 工具实例
    participant Callback as 回调处理器/队列
    
    Note over User,Callback: 函数调用策略下的工具调用细化流程

    User->>AgentRunner: 发送查询
    AgentRunner->>AgentRunner: _init_prompt_tools()
    AgentRunner->>ModelRuntime: invoke_llm(prompt, tools)
    ModelRuntime-->>AgentRunner: 返回message与tool_calls

    alt 存在tool_calls
        loop 遍历每个tool_call
            AgentRunner->>Callback: on_tool_start(tool_name, input)
            AgentRunner->>ToolManager: 获取工具运行时
            ToolManager-->>AgentRunner: 返回Tool实例
            AgentRunner->>Tool: invoke(user_id, tool_parameters)
            Tool-->>AgentRunner: ToolInvokeMessage结果
            AgentRunner->>Callback: on_tool_end(tool_name, result)
        end
        AgentRunner->>AgentRunner: 保存思维/统计并组织下一轮提示
        AgentRunner->>ModelRuntime: 追加工具观察后再次invoke_llm
    else 不存在tool_calls
        AgentRunner->>Callback: 发布最终答案事件
        AgentRunner-->>User: 返回最终答案(流式/阻塞)
    end

2. Agent推理策略详解

2.1 策略类型定义

Dify支持两种主要的Agent推理策略,每种策略适合不同的模型能力和应用场景。

策略选择的智能决策机制

# 生产环境中的策略选择优化
class AgentStrategySelector:
    """
    智能Agent策略选择器
    根据模型能力、任务复杂度和性能要求自动选择最优策略
    """
    
    def select_optimal_strategy(
        self, 
        model_instance: ModelInstance,
        task_complexity: str,
        performance_requirements: dict
    ) -> AgentEntity.Strategy:
        """
        智能选择Agent策略
        
        Args:
            model_instance: 模型实例
            task_complexity: 任务复杂度(simple, medium, complex)
            performance_requirements: 性能要求
            
        Returns:
            AgentEntity.Strategy: 最优策略
        """
        # 1. 检查模型原生能力
        model_features = self._get_model_features(model_instance)
        
        # 2. 评估任务复杂度
        complexity_score = self._calculate_complexity_score(task_complexity)
        
        # 3. 考虑性能要求
        latency_requirement = performance_requirements.get("max_latency", 30)
        accuracy_requirement = performance_requirements.get("min_accuracy", 0.8)
        
        # 4. 智能决策:模型原生支持工具调用,且任务复杂度与性能要求允许时,优先函数调用策略(阈值仅为示例)
        supports_tool_call = {ModelFeature.TOOL_CALL, ModelFeature.MULTI_TOOL_CALL}.intersection(model_features)
        if (supports_tool_call and complexity_score <= 0.8 and
            latency_requirement < 10 and accuracy_requirement > 0.9):
            return AgentEntity.Strategy.FUNCTION_CALLING
        else:
            return AgentEntity.Strategy.CHAIN_OF_THOUGHT

实际部署中的策略性能对比

  • Function Calling策略:平均响应时间2-5秒,准确率95%+,适合生产环境
  • Chain of Thought策略:平均响应时间5-15秒,准确率85%+,适合模型兼容性要求高的场景

Agent的策略与核心配置由AgentEntity实体定义:

class AgentEntity(BaseModel):
    """
    Agent实体定义
    包含Agent的核心配置和策略信息
    """
    
    class Strategy(StrEnum):
        """
        Agent策略枚举
        定义了支持的推理策略类型
        """
        
        # 思维链策略 - 适用于不支持函数调用的模型
        CHAIN_OF_THOUGHT = "chain-of-thought"
        
        # 函数调用策略 - 适用于支持函数调用的模型
        FUNCTION_CALLING = "function-calling"

    # 模型提供者(如openai、anthropic等)
    provider: str
    
    # 模型名称(如gpt-4、claude-3等)
    model: str
    
    # 推理策略
    strategy: Strategy
    
    # 自定义提示模板
    prompt: Optional[AgentPromptEntity] = None
    
    # 可用工具列表
    tools: Optional[List[AgentToolEntity]] = None
    
    # 最大推理迭代次数
    max_iteration: int = 10

# Agent策略选择逻辑
def select_agent_strategy(model_instance: ModelInstance) -> AgentEntity.Strategy:
    """
    根据模型能力自动选择最适合的Agent策略
    
    Args:
        model_instance: 模型实例
        
    Returns:
        AgentEntity.Strategy: 选定的策略
    """
    # 获取模型架构信息
    llm_model = cast(LargeLanguageModel, model_instance.model_type_instance)
    model_schema = llm_model.get_model_schema(
        model_instance.model, 
        model_instance.credentials
    )
    
    if not model_schema or not model_schema.features:
        return AgentEntity.Strategy.CHAIN_OF_THOUGHT
    
    # 检查是否支持工具调用功能
    tool_call_features = {
        ModelFeature.MULTI_TOOL_CALL,  # 支持多工具并行调用
        ModelFeature.TOOL_CALL,        # 支持单工具调用
    }
    
    if tool_call_features.intersection(model_schema.features):
        # 模型原生支持函数调用,使用函数调用策略
        return AgentEntity.Strategy.FUNCTION_CALLING
    else:
        # 模型不支持函数调用,使用思维链策略
        return AgentEntity.Strategy.CHAIN_OF_THOUGHT

# 策略特性对比
agent_strategy_features = {
    "function-calling": {
        "model_requirement": "支持函数调用的模型(GPT-4、Claude-3等)",
        "execution_efficiency": "高效",  # 直接调用,无需解析
        "accuracy": "高",               # 结构化调用,准确性高
        "flexibility": "中等",          # 依赖模型函数调用能力
        "debugging": "容易",            # 清晰的调用日志
        "supported_models": ["gpt-4", "gpt-3.5-turbo", "claude-3"],
        "best_for": "生产环境、高精度要求"
    },
    "chain-of-thought": {
        "model_requirement": "任何文本生成模型",
        "execution_efficiency": "中等", # 需要解析文本输出
        "accuracy": "中等",             # 依赖文本解析准确性
        "flexibility": "高",            # 适用于所有模型
        "debugging": "复杂",            # 需要解析推理过程
        "supported_models": ["通用模型", "本地部署模型", "较老的模型"],
        "best_for": "通用性、模型兼容性要求"
    }
}
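
为便于独立验证上述策略选择逻辑,下面给出一个可直接运行的简化示例。注意其中的ModelFeature枚举是为演示而简化的版本,并非Dify源码中的定义:

# 简化演示:按模型特性集合选择策略,判断逻辑与上文select_agent_strategy一致
from enum import Enum

class ModelFeature(Enum):
    TOOL_CALL = "tool-call"
    MULTI_TOOL_CALL = "multi-tool-call"
    VISION = "vision"

def choose_strategy(features: set) -> str:
    tool_call_features = {ModelFeature.TOOL_CALL, ModelFeature.MULTI_TOOL_CALL}
    return "function-calling" if tool_call_features & features else "chain-of-thought"

print(choose_strategy({ModelFeature.MULTI_TOOL_CALL, ModelFeature.VISION}))  # function-calling
print(choose_strategy({ModelFeature.VISION}))                                # chain-of-thought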

2.2 函数调用策略(Function Calling Strategy)

函数调用策略利用现代LLM原生的函数调用(Function Calling)能力来完成工具调用,由FunctionCallAgentRunner实现:

class FunctionCallAgentRunner(BaseAgentRunner):
    """
    函数调用Agent运行器
    利用模型的原生函数调用能力执行工具调用
    """

    def run(self, message: Message, query: str, **kwargs: Any) -> Generator[LLMResultChunk, None, None]:
        """
        执行函数调用Agent推理流程
        
        Args:
            message: 消息对象
            query: 用户查询
            **kwargs: 其他参数
            
        Yields:
            LLMResultChunk: 流式推理结果
        """
        self.query = query
        app_generate_entity = self.application_generate_entity
        app_config = self.app_config

        # 1. 初始化工具系统
        tool_instances, prompt_messages_tools = self._init_prompt_tools()

        # 2. 设置推理参数
        iteration_step = 1
        max_iteration_steps = min(app_config.agent.max_iteration, 99) + 1
        
        # 推理状态控制
        function_call_state = True  # 是否需要继续工具调用
        llm_usage: Dict[str, Optional[LLMUsage]] = {"usage": None}
        final_answer = ""

        # 3. 主推理循环
        while function_call_state and iteration_step <= max_iteration_steps:
            function_call_state = False
            
            # 在最后一次迭代时移除工具,强制生成最终答案
            if iteration_step == max_iteration_steps:
                prompt_messages_tools = []

            # 创建思维记录
            agent_thought_id = self.create_agent_thought(
                message_id=message.id,
                message="",
                tool_name="",
                tool_input="",
                messages_ids=[]
            )

            # 4. 准备提示消息
            prompt_messages = self._organize_prompt_messages()
            self.recalc_llm_max_tokens(self.model_config, prompt_messages)

            # 5. 调用模型推理
            chunks = self.model_instance.invoke_llm(
                prompt_messages=prompt_messages,
                model_parameters=app_generate_entity.model_conf.parameters,
                tools=prompt_messages_tools,  # 传递工具定义
                stop=app_generate_entity.model_conf.stop,
                stream=self.stream_tool_call,
                user=self.user_id,
                callbacks=[],
            )

            # 6. 处理推理结果
            tool_calls: List[Tuple[str, str, Dict[str, Any]]] = []
            response = ""
            current_llm_usage = None

            if isinstance(chunks, Generator):
                # 流式处理
                for chunk in chunks:
                    # 处理工具调用
                    if chunk.delta.message.tool_calls:
                        function_call_state = True
                        self._handle_tool_calls_stream(chunk, tool_calls)
                    
                    # 处理文本响应
                    if chunk.delta.message.content:
                        response += chunk.delta.message.content
                        yield chunk

                    # 记录使用统计
                    if chunk.delta.usage:
                        current_llm_usage = chunk.delta.usage
            else:
                # 非流式处理
                if chunks.message.tool_calls:
                    function_call_state = True
                    tool_calls = self._extract_tool_calls(chunks.message.tool_calls)
                
                response = chunks.message.content or ""
                current_llm_usage = chunks.usage

            # 若本轮没有产生工具调用,模型输出即为最终答案
            if not function_call_state:
                final_answer = response

            # 7. 执行工具调用
            if function_call_state and tool_calls:
                yield from self._execute_tool_calls(
                    tool_calls=tool_calls,
                    tool_instances=tool_instances,
                    agent_thought_id=agent_thought_id
                )
            
            # 8. 更新推理记录
            self.save_agent_thought(
                agent_thought_id=agent_thought_id,
                tool_name=";".join([call[0] for call in tool_calls]) if tool_calls else "",
                tool_input={call[0]: call[2] for call in tool_calls} if tool_calls else {},
                thought=response,
                observation="", # 工具执行结果会在_execute_tool_calls中更新
                answer=response if not function_call_state else "",
                messages_ids=[],
                llm_usage=current_llm_usage,
            )

            # 9. 更新使用统计
            if current_llm_usage:
                self._increase_usage(llm_usage, current_llm_usage)

            iteration_step += 1

        # 10. 返回最终结果
        if final_answer:
            yield self._create_final_answer_chunk(final_answer, llm_usage["usage"])

    def _handle_tool_calls_stream(self, chunk: LLMResultChunk, tool_calls: List):
        """
        处理流式工具调用
        
        Args:
            chunk: LLM结果块
            tool_calls: 工具调用列表
        """
        for tool_call in chunk.delta.message.tool_calls:
            if tool_call.function:
                # 解析函数调用参数
                function_name = tool_call.function.name
                function_arguments = tool_call.function.arguments
                
                try:
                    # 将JSON字符串参数解析为字典
                    arguments_dict = json.loads(function_arguments) if function_arguments else {}
                    tool_calls.append((function_name, tool_call.id, arguments_dict))
                except json.JSONDecodeError:
                    logger.warning(f"无法解析工具调用参数: {function_arguments}")
                    continue

    def _execute_tool_calls(
        self, 
        tool_calls: List[Tuple[str, str, Dict[str, Any]]],
        tool_instances: Dict[str, Tool],
        agent_thought_id: str
    ) -> Generator[LLMResultChunk, None, None]:
        """
        执行工具调用列表
        
        Args:
            tool_calls: 工具调用列表 [(工具名, 调用ID, 参数)]
            tool_instances: 工具实例字典
            agent_thought_id: 思维记录ID
            
        Yields:
            LLMResultChunk: 工具执行结果流
        """
        tool_responses = {}
        
        for tool_name, call_id, arguments in tool_calls:
            if tool_name not in tool_instances:
                error_msg = f"工具 '{tool_name}' 不存在"
                tool_responses[tool_name] = error_msg
                logger.error(error_msg)
                continue

            try:
                # 获取工具实例
                tool_instance = tool_instances[tool_name]
                
                # 发布工具调用开始事件
                yield self._create_tool_start_chunk(tool_name, arguments)
                
                # 执行工具调用
                tool_result = tool_instance.invoke(
                    user_id=self.user_id,
                    tool_parameters=arguments
                )
                
                # 处理工具结果
                tool_response = self._format_tool_response(tool_result)
                tool_responses[tool_name] = tool_response
                
                # 发布工具执行结果事件
                yield self._create_tool_result_chunk(tool_name, tool_response)
                
            except Exception as e:
                error_msg = f"工具 '{tool_name}' 执行失败: {str(e)}"
                tool_responses[tool_name] = error_msg
                logger.exception(error_msg)
                
                # 发布工具执行错误事件
                yield self._create_tool_error_chunk(tool_name, error_msg)

        # 更新Agent思维记录中的工具执行结果
        self._update_agent_thought_with_tool_results(agent_thought_id, tool_responses)

    def _organize_prompt_messages(self) -> List[PromptMessage]:
        """
        组织提示消息
        构建包含系统提示、历史对话和当前查询的完整提示
        
        Returns:
            List[PromptMessage]: 组织好的提示消息列表
        """
        messages: List[PromptMessage] = []
        
        # 1. 添加系统提示
        system_prompt = self._build_system_prompt()
        if system_prompt:
            messages.append(SystemPromptMessage(content=system_prompt))
        
        # 2. 添加历史对话
        messages.extend(self.history_prompt_messages)
        
        # 3. 添加当前用户查询
        user_message = self._build_user_message()
        messages.append(user_message)
        
        return messages

    def _build_system_prompt(self) -> str:
        """
        构建系统提示
        包含Agent的角色定义、能力说明和行为指南
        
        Returns:
            str: 系统提示内容
        """
        prompt_parts = []
        
        # 基础角色定义
        prompt_parts.append(
            "你是一个有用的AI助手,能够使用各种工具来帮助用户解决问题。"
        )
        
        # 工具使用指导
        if self.app_config.agent and self.app_config.agent.tools:
            prompt_parts.append(
                "你可以使用以下工具来获取信息或执行操作:"
            )
            
            for tool in self.app_config.agent.tools:
                tool_description = f"- {tool.tool_name}: {tool.description or '执行相关操作'}"
                prompt_parts.append(tool_description)
        
        # 行为准则
        prompt_parts.extend([
            "请遵循以下准则:",
            "1. 仔细分析用户的问题,选择最适合的工具",
            "2. 如果需要多个步骤,请逐步执行",
            "3. 基于工具结果提供准确的答案",
            "4. 如果无法解决问题,请诚实说明原因"
        ])
        
        return "\n\n".join(prompt_parts)

    def _build_user_message(self) -> UserPromptMessage:
        """
        构建用户消息
        包含用户查询和可能的文件附件
        
        Returns:
            UserPromptMessage: 用户消息对象
        """
        if not self.files:
            # 纯文本查询
            return UserPromptMessage(content=self.query)
        
        # 包含文件的多模态查询
        content_parts: List[PromptMessageContentUnionTypes] = []
        
        # 添加文件内容
        for file in self.files:
            file_content = file_manager.to_prompt_message_content(file)
            content_parts.append(file_content)
        
        # 添加文本查询
        content_parts.append(TextPromptMessageContent(data=self.query))
        
        return UserPromptMessage(content=content_parts)
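
函数调用策略能够工作的前提,是在调用invoke_llm时把工具定义作为tools参数传给模型。下面是一个工具定义的示意(采用OpenAI风格的JSON Schema描述参数,字段名称仅作演示,实际以PromptMessageTool的定义为准):

import json

# 示意:tools列表中单个工具的典型结构
weather_tool = {
    "name": "get_weather",
    "description": "查询指定城市的实时天气",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {"type": "string", "description": "城市名称,如'北京'"},
            "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
        },
        "required": ["city"],
    },
}

# 模型返回的tool_call参数是JSON字符串,需先解析再传给工具的invoke方法
arguments = json.loads('{"city": "北京", "unit": "celsius"}')
print(arguments["city"])  # 北京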

2.3 思维链策略(Chain of Thought Strategy)

思维链策略通过特定的提示模板引导模型进行逐步推理,适用于不支持函数调用的模型:

class CotAgentRunner(BaseAgentRunner, ABC):
    """
    思维链Agent运行器抽象基类
    通过ReACT模式引导模型进行推理和行动
    """
    
    # ReACT模式的关键词
    _react_keywords = {
        "thought": "Thought:",      # 思考标记
        "action": "Action:",        # 行动标记  
        "observation": "Observation:",  # 观察标记
        "final_answer": "Final Answer:"  # 最终答案标记
    }
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._is_first_iteration = True
        self._ignore_observation_providers = ["wenxin"]  # 忽略观察的提供者
        self._historic_prompt_messages: List[PromptMessage] = []
        self._agent_scratchpad: List[AgentScratchpadUnit] = []
        self._instruction: str = ""
        self._query: str = ""
        self._prompt_messages_tools: Sequence[PromptMessageTool] = []

    def run(
        self,
        message: Message,
        query: str,
        inputs: Mapping[str, str],
    ) -> Generator:
        """
        执行思维链Agent推理流程
        
        Args:
            message: 消息对象
            query: 用户查询
            inputs: 输入变量
            
        Yields:
            推理过程中的各种事件
        """
        app_generate_entity = self.application_generate_entity
        self._repack_app_generate_entity(app_generate_entity)
        self._init_react_state(query)

        # 确保模型停止词包含"Observation"
        if "Observation" not in app_generate_entity.model_conf.stop:
            if app_generate_entity.model_conf.provider not in self._ignore_observation_providers:
                app_generate_entity.model_conf.stop.append("Observation")

        app_config = self.app_config
        
        # 初始化指令模板
        instruction = app_config.prompt_template.simple_prompt_template or ""
        self._instruction = self._fill_in_inputs_from_external_data_tools(instruction, inputs or {})

        # 设置推理参数
        iteration_step = 1
        max_iteration_steps = min(app_config.agent.max_iteration, 99) + 1

        # 初始化工具系统
        tool_instances, prompt_messages_tools = self._init_prompt_tools()
        self._prompt_messages_tools = prompt_messages_tools

        function_call_state = True
        llm_usage: Dict[str, Optional[LLMUsage]] = {"usage": None}
        final_answer = ""

        # 主推理循环
        while function_call_state and iteration_step <= max_iteration_steps:
            function_call_state = False

            # 创建思维记录
            agent_thought_id = self.create_agent_thought(
                message_id=message.id,
                message="",
                tool_name="",
                tool_input="",
                messages_ids=[]
            )

            # 构建ReACT提示
            prompt_messages = self._organize_react_prompt_messages()
            self.recalc_llm_max_tokens(self.model_config, prompt_messages)

            # 调用模型推理
            chunks = self.model_instance.invoke_llm(
                prompt_messages=prompt_messages,
                model_parameters=app_generate_entity.model_conf.parameters,
                stop=app_generate_entity.model_conf.stop,
                stream=self.stream_tool_call,
                user=self.user_id,
                callbacks=[],
            )

            # 处理推理结果
            response_text = ""
            current_llm_usage = None

            if isinstance(chunks, Generator):
                # 流式处理推理过程
                for chunk in chunks:
                    if chunk.delta.message.content:
                        response_text += chunk.delta.message.content
                        yield chunk
                    
                    if chunk.delta.usage:
                        current_llm_usage = chunk.delta.usage
            else:
                response_text = chunks.message.content or ""
                current_llm_usage = chunks.usage

            # 解析ReACT响应
            react_result = self._parse_react_response(response_text)
            
            if react_result.get("action") and react_result.get("action_input"):
                # 需要执行工具调用
                function_call_state = True
                yield from self._execute_react_tool_call(
                    action=react_result["action"],
                    action_input=react_result["action_input"],
                    tool_instances=tool_instances,
                    agent_thought_id=agent_thought_id
                )
            elif react_result.get("final_answer"):
                # 获得最终答案
                final_answer = react_result["final_answer"]
                function_call_state = False

            # 更新推理历史
            self._update_agent_scratchpad(react_result)
            
            # 保存思维记录
            self.save_agent_thought(
                agent_thought_id=agent_thought_id,
                tool_name=react_result.get("action", ""),
                tool_input=react_result.get("action_input", {}),
                thought=react_result.get("thought", ""),
                observation=react_result.get("observation", ""),
                answer=react_result.get("final_answer", ""),
                messages_ids=[],
                llm_usage=current_llm_usage,
            )

            iteration_step += 1

        # 返回最终答案
        if final_answer:
            yield self._create_final_answer_chunk(final_answer, llm_usage["usage"])

    def _init_react_state(self, query: str):
        """
        初始化ReACT推理状态
        
        Args:
            query: 用户查询
        """
        self._query = query
        self._agent_scratchpad = []
        self._is_first_iteration = True

    def _organize_react_prompt_messages(self) -> List[PromptMessage]:
        """
        组织ReACT推理提示
        构建符合ReACT模式的提示模板
        
        Returns:
            List[PromptMessage]: ReACT提示消息列表
        """
        messages: List[PromptMessage] = []

        # 1. 构建系统提示(包含ReACT模板)
        system_prompt = self._build_react_system_prompt()
        messages.append(SystemPromptMessage(content=system_prompt))

        # 2. 添加历史对话
        messages.extend(self.history_prompt_messages)

        # 3. 添加当前推理会话
        if self._agent_scratchpad:
            scratchpad_content = self._format_agent_scratchpad()
            messages.append(UserPromptMessage(content=scratchpad_content))
        else:
            # 第一次推理
            user_prompt = f"用户问题: {self._query}\n\n请开始推理:"
            messages.append(UserPromptMessage(content=user_prompt))

        return messages

    def _build_react_system_prompt(self) -> str:
        """
        构建ReACT系统提示模板
        
        Returns:
            str: ReACT系统提示
        """
        # 基础指令
        base_instruction = self._instruction or "你是一个有用的AI助手。"
        
        # 工具描述
        tools_description = self._build_tools_description()
        
        # ReACT模板
        react_template = """
你必须使用以下格式进行推理:

Thought: 你应该总是思考你要做什么
Action: 要采取的行动,应该是以下之一:[{tool_names}]
Action Input: 行动的输入
Observation: 行动的结果
... (这个 Thought/Action/Action Input/Observation 可以重复N次)
Thought: 我现在知道最终答案
Final Answer: 原始输入问题的最终答案

开始!

{base_instruction}

{tools_description}
""".strip()

        tool_names = ", ".join([tool.name for tool in self._prompt_messages_tools])
        
        return react_template.format(
            tool_names=tool_names,
            base_instruction=base_instruction,
            tools_description=tools_description
        )

    def _build_tools_description(self) -> str:
        """
        构建工具描述
        
        Returns:
            str: 格式化的工具描述
        """
        if not self._prompt_messages_tools:
            return "当前没有可用的工具。"
        
        tool_descriptions = []
        for tool in self._prompt_messages_tools:
            description = f"- {tool.name}: {tool.description}"
            
            # 添加参数信息
            if tool.parameters and "properties" in tool.parameters:
                params = []
                for param_name, param_info in tool.parameters["properties"].items():
                    param_desc = f"{param_name}({param_info.get('type', 'any')})"
                    if param_name in tool.parameters.get("required", []):
                        param_desc += "*"  # 标记必填参数
                    params.append(param_desc)
                
                if params:
                    description += f" - 参数: {', '.join(params)}"
            
            tool_descriptions.append(description)
        
        return "可用工具:\n" + "\n".join(tool_descriptions)

    def _parse_react_response(self, response: str) -> Dict[str, Any]:
        """
        解析ReACT格式的模型响应
        
        Args:
            response: 模型生成的文本响应
            
        Returns:
            Dict[str, Any]: 解析结果
        """
        result = {}
        
        # 使用正则表达式提取各个部分
        patterns = {
            "thought": r"Thought:\s*(.*?)(?=\nAction:|\nFinal Answer:|$)",
            "action": r"Action:\s*(.*?)(?=\nAction Input:|\n|$)",
            "action_input": r"Action Input:\s*(.*?)(?=\nObservation:|\n|$)",
            "observation": r"Observation:\s*(.*?)(?=\nThought:|\n|$)",
            "final_answer": r"Final Answer:\s*(.*?)$"
        }
        
        for key, pattern in patterns.items():
            match = re.search(pattern, response, re.DOTALL | re.IGNORECASE)
            if match:
                result[key] = match.group(1).strip()
        
        # 处理action_input,尝试解析为JSON
        if "action_input" in result:
            try:
                # 尝试解析为JSON对象
                result["action_input"] = json.loads(result["action_input"])
            except json.JSONDecodeError:
                # 如果不是JSON,保持原始字符串
                pass
        
        return result

    def _execute_react_tool_call(
        self,
        action: str,
        action_input: Union[str, Dict[str, Any]],
        tool_instances: Dict[str, Tool],
        agent_thought_id: str
    ) -> Generator:
        """
        执行ReACT工具调用
        
        Args:
            action: 工具名称
            action_input: 工具输入参数
            tool_instances: 工具实例字典
            agent_thought_id: 思维记录ID
            
        Yields:
            工具执行过程事件
        """
        if action not in tool_instances:
            error_msg = f"未找到工具: {action}"
            yield self._create_tool_error_chunk(action, error_msg)
            self._add_scratchpad_observation(error_msg)
            return

        try:
            # 准备工具参数
            if isinstance(action_input, str):
                # 尝试解析字符串参数为字典
                try:
                    tool_params = json.loads(action_input)
                except json.JSONDecodeError:
                    tool_params = {"input": action_input}
            else:
                tool_params = action_input

            # 执行工具调用
            tool_instance = tool_instances[action]
            yield self._create_tool_start_chunk(action, tool_params)
            
            tool_result = tool_instance.invoke(
                user_id=self.user_id,
                tool_parameters=tool_params
            )
            
            # 格式化工具结果
            observation = self._format_tool_response(tool_result)
            yield self._create_tool_result_chunk(action, observation)
            
            # 添加到推理记录
            self._add_scratchpad_observation(observation)
            
        except Exception as e:
            error_msg = f"工具执行错误: {str(e)}"
            logger.exception(f"ReACT工具调用失败 - {action}: {e}")
            yield self._create_tool_error_chunk(action, error_msg)
            self._add_scratchpad_observation(error_msg)

    def _format_agent_scratchpad(self) -> str:
        """
        格式化Agent推理历史
        
        Returns:
            str: 格式化的推理历史
        """
        if not self._agent_scratchpad:
            return f"用户问题: {self._query}\n\nThought:"
        
        scratchpad_parts = [f"用户问题: {self._query}\n"]
        
        for unit in self._agent_scratchpad:
            if unit.thought:
                scratchpad_parts.append(f"Thought: {unit.thought}")
            if unit.action:
                scratchpad_parts.append(f"Action: {unit.action}")
            if unit.action_input is not None:
                if isinstance(unit.action_input, dict):
                    action_input_str = json.dumps(unit.action_input, ensure_ascii=False)
                else:
                    action_input_str = str(unit.action_input)
                scratchpad_parts.append(f"Action Input: {action_input_str}")
            if unit.observation:
                scratchpad_parts.append(f"Observation: {unit.observation}")
        
        # 添加下一步提示
        scratchpad_parts.append("Thought:")
        
        return "\n".join(scratchpad_parts)

    def _update_agent_scratchpad(self, react_result: Dict[str, Any]):
        """
        更新Agent推理历史
        
        Args:
            react_result: ReACT解析结果
        """
        unit = AgentScratchpadUnit(
            thought=react_result.get("thought", ""),
            action=react_result.get("action", ""),
            action_input=react_result.get("action_input"),
            observation=react_result.get("observation", ""),
            final_answer=react_result.get("final_answer", "")
        )
        
        self._agent_scratchpad.append(unit)

    def _add_scratchpad_observation(self, observation: str):
        """
        向最新的推理单元添加观察结果
        
        Args:
            observation: 观察结果
        """
        if self._agent_scratchpad:
            self._agent_scratchpad[-1].observation = observation

class AgentScratchpadUnit(BaseModel):
    """
    Agent推理单元
    记录一次完整的Thought-Action-Observation循环
    """
    
    # 思考过程
    thought: str = ""
    
    # 采取的行动(工具名称)
    action: str = ""
    
    # 行动输入参数
    action_input: Union[str, Dict[str, Any], None] = None
    
    # 观察结果(工具执行结果)
    observation: str = ""
    
    # 最终答案(如果有)
    final_answer: str = ""
    
    @property
    def is_final(self) -> bool:
        """
        判断是否为最终答案单元
        
        Returns:
            bool: 是否包含最终答案
        """
        return bool(self.final_answer)
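
AgentScratchpadUnit的一个最小使用示例如下(数据为虚构,仅演示字段含义与is_final的判断):

# 记录一次Thought-Action-Observation循环
unit = AgentScratchpadUnit(
    thought="需要先查询天气",
    action="get_weather",
    action_input={"city": "北京"},
    observation="北京今天晴,26摄氏度",
)
print(unit.is_final)  # False:尚未产生最终答案

final_unit = AgentScratchpadUnit(final_answer="北京今天晴,26摄氏度。")
print(final_unit.is_final)  # True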

class CotChatAgentRunner(CotAgentRunner):
    """
    基于Chat模型的思维链Agent运行器
    专门处理对话模型的ReACT推理
    """
    
    def _organize_react_prompt_messages(self) -> List[PromptMessage]:
        """
        为Chat模型组织ReACT提示消息
        Chat模型需要明确的对话角色分离
        
        Returns:
            List[PromptMessage]: 适合Chat模型的提示消息
        """
        messages: List[PromptMessage] = []

        # 1. 系统角色提示
        system_prompt = self._build_react_system_prompt()
        messages.append(SystemPromptMessage(content=system_prompt))

        # 2. 历史对话
        messages.extend(self.history_prompt_messages)

        # 3. 当前推理会话
        if self._agent_scratchpad and not self._is_first_iteration:
            # 构建包含推理历史的用户消息
            scratchpad_content = self._format_agent_scratchpad()
            messages.append(UserPromptMessage(content=scratchpad_content))
        else:
            # 第一次推理
            messages.append(UserPromptMessage(content=f"用户问题: {self._query}"))
            
        self._is_first_iteration = False
        
        return messages

class CotCompletionAgentRunner(CotAgentRunner):
    """
    基于Completion模型的思维链Agent运行器
    专门处理文本完成模型的ReACT推理
    """
    
    def _organize_react_prompt_messages(self) -> List[PromptMessage]:
        """
        为Completion模型组织ReACT提示消息
        Completion模型使用单一的连续文本提示
        
        Returns:
            List[PromptMessage]: 适合Completion模型的提示消息
        """
        # Completion模型使用单一的UserPromptMessage
        prompt_parts = []
        
        # 1. 系统指令和工具描述
        system_prompt = self._build_react_system_prompt()
        prompt_parts.append(system_prompt)
        
        # 2. 历史对话(如果有)
        if self.history_prompt_messages:
            for msg in self.history_prompt_messages:
                if isinstance(msg, UserPromptMessage):
                    prompt_parts.append(f"Human: {msg.content}")
                elif isinstance(msg, AssistantPromptMessage):
                    prompt_parts.append(f"Assistant: {msg.content}")
        
        # 3. 当前推理会话
        if self._agent_scratchpad:
            scratchpad_content = self._format_agent_scratchpad()
            prompt_parts.append(scratchpad_content)
        else:
            # 第一次推理
            prompt_parts.append(f"用户问题: {self._query}\n\nThought:")
        
        # 合并所有部分为单一提示
        full_prompt = "\n\n".join(prompt_parts)
        
        return [UserPromptMessage(content=full_prompt)]
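
为直观展示ReACT文本解析的效果,下面给出一个可独立运行的小例子,对一段典型的模型输出套用与_parse_react_response相同思路的正则提取(正则为简化示意):

import json
import re

sample = """Thought: 我需要先查询天气
Action: get_weather
Action Input: {"city": "北京"}"""

patterns = {
    "thought": r"Thought:\s*(.*?)(?=\nAction:|\nFinal Answer:|$)",
    "action": r"Action:\s*(.*?)(?=\nAction Input:|$)",
    "action_input": r"Action Input:\s*(.*?)$",
}

result = {}
for key, pattern in patterns.items():
    match = re.search(pattern, sample, re.DOTALL)
    if match:
        result[key] = match.group(1).strip()

result["action_input"] = json.loads(result["action_input"])
print(result)
# {'thought': '我需要先查询天气', 'action': 'get_weather', 'action_input': {'city': '北京'}}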

3. Agent工具系统

3.1 工具管理架构

class ToolManager:
    """
    工具管理器
    负责工具的注册、加载和执行管理
    """
    
    @staticmethod
    def get_agent_tool_runtime(
        tenant_id: str,
        app_id: str,
        agent_tool: AgentToolEntity,
        invoke_from: InvokeFrom
    ) -> Tool:
        """
        获取Agent工具运行时实例
        
        Args:
            tenant_id: 租户ID
            app_id: 应用ID
            agent_tool: Agent工具实体
            invoke_from: 调用来源
            
        Returns:
            Tool: 工具运行时实例
        """
        tool_type = agent_tool.tool_type
        
        if tool_type == AgentToolType.BUILTIN:
            # 内置工具
            return BuiltinToolManager.get_tool_runtime(
                provider=agent_tool.provider,
                tool_name=agent_tool.tool_name,
                tenant_id=tenant_id
            )
        elif tool_type == AgentToolType.API:
            # API工具
            return ApiToolManager.get_tool_runtime(
                tenant_id=tenant_id,
                tool_id=agent_tool.tool_id,
                invoke_from=invoke_from
            )
        elif tool_type == AgentToolType.WORKFLOW:
            # 工作流工具
            return WorkflowToolManager.get_tool_runtime(
                tenant_id=tenant_id,
                app_id=app_id,
                workflow_id=agent_tool.workflow_id,
                invoke_from=invoke_from
            )
        else:
            # 数据集工具不经由此方法创建,而是通过DatasetRetrieverTool单独构建(见3.2节)
            raise ValueError(f"不支持的工具类型: {tool_type}")

class AgentToolEntity(BaseModel):
    """
    Agent工具实体
    定义Agent可使用的工具配置
    """
    
    # 工具类型
    tool_type: AgentToolType
    
    # 工具提供者(用于内置工具)
    provider: Optional[str] = None
    
    # 工具名称
    tool_name: str
    
    # 工具ID(用于API工具)
    tool_id: Optional[str] = None
    
    # 工作流ID(用于工作流工具)
    workflow_id: Optional[str] = None
    
    # 工具配置参数
    tool_configuration: Dict[str, Any] = {}
    
    # 工具描述
    description: Optional[str] = None
    
    # 是否启用
    enabled: bool = True

class AgentToolType(Enum):
    """Agent工具类型枚举"""
    
    # 内置工具 - 平台提供的预定义工具
    BUILTIN = "builtin"
    
    # API工具 - 通过API调用的外部工具
    API = "api"
    
    # 工作流工具 - Dify工作流作为工具使用
    WORKFLOW = "workflow"
    
    # 数据集工具 - 知识库检索工具
    DATASET = "dataset"

# 工具类型特性对比
tool_type_features = {
    "builtin": {
        "description": "平台内置的预定义工具",
        "examples": ["web_search", "calculator", "weather", "translator"],
        "advantages": ["稳定可靠", "性能优化", "安全性高"],
        "limitations": ["功能固定", "定制能力有限"],
        "use_cases": ["通用功能", "基础工具需求"]
    },
    "api": {
        "description": "通过API接口调用的外部服务",
        "examples": ["第三方API", "企业内部API", "微服务接口"],
        "advantages": ["功能丰富", "高度定制", "集成灵活"],
        "limitations": ["依赖外部服务", "网络延迟", "可靠性问题"],
        "use_cases": ["专业服务", "企业集成", "定制功能"]
    },
    "workflow": {
        "description": "Dify工作流作为工具使用",
        "examples": ["复杂业务流程", "多步骤处理", "条件分支逻辑"],
        "advantages": ["逻辑复杂", "可视化配置", "复用性高"],
        "limitations": ["执行耗时", "资源消耗大"],
        "use_cases": ["复杂处理逻辑", "业务流程自动化"]
    },
    "dataset": {
        "description": "知识库检索和RAG功能",
        "examples": ["文档检索", "知识问答", "上下文增强"],
        "advantages": ["知识丰富", "上下文相关", "准确度高"],
        "limitations": ["依赖数据质量", "检索效果限制"],
        "use_cases": ["知识问答", "文档查询", "专业咨询"]
    }
}
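
结合上文的实体定义,下面的示意代码展示了如何声明Agent工具并通过ToolManager解析为运行时实例(tenant_id、workflow_id等取值仅为示例):

# 示意:声明一个内置工具和一个工作流工具,并解析为可调用的Tool实例
agent_tools = [
    AgentToolEntity(
        tool_type=AgentToolType.BUILTIN,
        provider="duckduckgo",
        tool_name="web_search",
        description="联网搜索",
    ),
    AgentToolEntity(
        tool_type=AgentToolType.WORKFLOW,
        tool_name="order_refund_flow",
        workflow_id="wf_123",
        description="退款处理流程",
    ),
]

tool_instances = {
    t.tool_name: ToolManager.get_agent_tool_runtime(
        tenant_id="tenant_1",
        app_id="app_1",
        agent_tool=t,
        invoke_from=InvokeFrom.WEB_APP,
    )
    for t in agent_tools
    if t.enabled
}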

3.2 数据集检索工具

class DatasetRetrieverTool:
    """
    数据集检索工具
    为Agent提供知识库检索能力
    """
    
    @staticmethod
    def get_dataset_tools(
        tenant_id: str,
        dataset_ids: List[str],
        retrieve_config: Optional[DatasetRetrievalConfig] = None,
        return_resource: bool = True,
        invoke_from: InvokeFrom = InvokeFrom.WEB_APP,
        hit_callback: Optional[DatasetIndexToolCallbackHandler] = None,
        user_id: str = "",
        inputs: Optional[Dict[str, Any]] = None,
    ) -> List['DatasetRetrieverTool']:
        """
        创建数据集检索工具列表
        
        Args:
            tenant_id: 租户ID
            dataset_ids: 数据集ID列表
            retrieve_config: 检索配置
            return_resource: 是否返回资源信息
            invoke_from: 调用来源
            hit_callback: 检索回调处理器
            user_id: 用户ID
            inputs: 输入参数
            
        Returns:
            List[DatasetRetrieverTool]: 数据集工具列表
        """
        if not dataset_ids:
            return []
        
        tools = []
        for dataset_id in dataset_ids:
            try:
                # 验证数据集访问权限
                dataset = DatasetService.get_dataset(tenant_id, dataset_id)
                if not dataset:
                    logger.warning(f"数据集不存在: {dataset_id}")
                    continue
                
                # 创建检索工具实例
                tool = DatasetRetrieverTool(
                    tenant_id=tenant_id,
                    dataset_id=dataset_id,
                    retrieve_config=retrieve_config or DatasetRetrievalConfig(),
                    return_resource=return_resource,
                    invoke_from=invoke_from,
                    hit_callback=hit_callback,
                    user_id=user_id,
                    inputs=inputs or {}
                )
                
                tools.append(tool)
                
            except Exception as e:
                logger.exception(f"创建数据集工具失败 - {dataset_id}: {e}")
                continue
        
        return tools

    def __init__(
        self,
        tenant_id: str,
        dataset_id: str,
        retrieve_config: DatasetRetrievalConfig,
        return_resource: bool = True,
        invoke_from: InvokeFrom = InvokeFrom.WEB_APP,
        hit_callback: Optional[DatasetIndexToolCallbackHandler] = None,
        user_id: str = "",
        inputs: Optional[Dict[str, Any]] = None,
    ):
        """
        初始化数据集检索工具
        
        Args:
            tenant_id: 租户ID
            dataset_id: 数据集ID
            retrieve_config: 检索配置
            return_resource: 是否返回资源信息
            invoke_from: 调用来源
            hit_callback: 检索回调处理器
            user_id: 用户ID
            inputs: 输入参数
        """
        self.tenant_id = tenant_id
        self.dataset_id = dataset_id
        self.retrieve_config = retrieve_config
        self.return_resource = return_resource
        self.invoke_from = invoke_from
        self.hit_callback = hit_callback
        self.user_id = user_id
        self.inputs = inputs or {}
        
        # 加载数据集信息
        self.dataset = DatasetService.get_dataset(tenant_id, dataset_id)
        if not self.dataset:
            raise ValueError(f"数据集不存在: {dataset_id}")
        
        # 初始化工具实体
        self.entity = ToolEntity(
            identity=ToolIdentity(
                name=f"dataset_{dataset_id}",
                label=I18nObject(
                    en_US=f"Query {self.dataset.name}",
                    zh_Hans=f"查询{self.dataset.name}"
                )
            ),
            description=ToolDescription(
                human=I18nObject(
                    en_US=f"Search and retrieve information from {self.dataset.name} dataset",
                    zh_Hans=f"从{self.dataset.name}数据集搜索和检索信息"
                ),
                llm=f"Use this tool to search for information in the {self.dataset.name} knowledge base. "
                    f"Provide a search query as input and get relevant documents and answers."
            )
        )

    def get_runtime_parameters(self) -> List[ToolParameter]:
        """
        获取运行时参数定义
        
        Returns:
            List[ToolParameter]: 参数列表
        """
        return [
            ToolParameter(
                name="query",
                label=I18nObject(
                    en_US="Search Query",
                    zh_Hans="搜索查询"
                ),
                human_description=I18nObject(
                    en_US="The search query to find relevant information",
                    zh_Hans="用于查找相关信息的搜索查询"
                ),
                llm_description="The search query string to find relevant documents and information",
                form=ToolParameter.ToolParameterForm.LLM,
                type=ToolParameter.ToolParameterType.STRING,
                required=True
            )
        ]

    def invoke(
        self,
        user_id: str,
        tool_parameters: Dict[str, Any]
    ) -> ToolInvokeMessage:
        """
        执行数据集检索
        
        Args:
            user_id: 用户ID
            tool_parameters: 工具参数
            
        Returns:
            ToolInvokeMessage: 检索结果
        """
        query = tool_parameters.get("query", "")
        if not query or not query.strip():
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message="搜索查询不能为空"
            )

        try:
            # 执行数据集检索
            retrieval_service = RetrievalService()
            retrieve_result = retrieval_service.retrieve(
                tenant_id=self.tenant_id,
                dataset_id=self.dataset_id,
                query=query,
                top_k=self.retrieve_config.top_k,
                score_threshold=self.retrieve_config.score_threshold,
                reranking_config=self.retrieve_config.reranking_config,
                user_id=user_id
            )

            # 处理检索结果
            if not retrieve_result.documents:
                return ToolInvokeMessage(
                    type=ToolInvokeMessage.MessageType.TEXT,
                    message="未找到相关信息"
                )

            # 格式化检索结果
            result_parts = []
            
            # 添加文档内容
            for i, doc in enumerate(retrieve_result.documents[:self.retrieve_config.top_k]):
                doc_content = f"[文档 {i+1}]\n{doc.page_content}"
                if self.return_resource and doc.metadata:
                    # 添加资源信息
                    metadata_parts = []
                    if doc.metadata.get("source"):
                        metadata_parts.append(f"来源: {doc.metadata['source']}")
                    if doc.metadata.get("page"):
                        metadata_parts.append(f"页码: {doc.metadata['page']}")
                    if doc.metadata.get("score"):
                        metadata_parts.append(f"相关度: {doc.metadata['score']:.3f}")
                    
                    if metadata_parts:
                        doc_content += f"\n({', '.join(metadata_parts)})"
                
                result_parts.append(doc_content)

            # 合并结果
            final_result = "\n\n".join(result_parts)

            # 触发检索回调
            if self.hit_callback:
                self.hit_callback.on_retrieval_success(
                    query=query,
                    documents=retrieve_result.documents
                )

            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message=final_result
            )

        except Exception as e:
            error_message = f"数据集检索失败: {str(e)}"
            logger.exception(error_message)
            
            # 触发错误回调
            if self.hit_callback:
                self.hit_callback.on_retrieval_error(query=query, error=str(e))
            
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message=error_message
            )

class DatasetRetrievalConfig(BaseModel):
    """数据集检索配置"""
    
    # 返回文档数量
    top_k: int = 3
    
    # 相关度阈值
    score_threshold: float = 0.5
    
    # 重排序配置
    reranking_config: Optional[RerankingConfig] = None
    
    # 检索模式
    retrieval_mode: RetrievalMode = RetrievalMode.SINGLE
    
    # 是否启用多轮检索
    multi_round_enabled: bool = False
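
数据集检索工具的典型用法示意如下(dataset_ids等取值为示例,实际调用需在Dify应用上下文中进行):

# 示意:为Agent挂载两个知识库检索工具,并手动调用一次验证检索效果
dataset_tools = DatasetRetrieverTool.get_dataset_tools(
    tenant_id="tenant_1",
    dataset_ids=["ds_faq", "ds_manual"],
    retrieve_config=DatasetRetrievalConfig(top_k=5, score_threshold=0.6),
    user_id="user_1",
)

for tool in dataset_tools:
    result = tool.invoke(user_id="user_1", tool_parameters={"query": "如何申请退款?"})
    print(tool.entity.identity.name, result.message)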

class DatasetIndexToolCallbackHandler:
    """
    数据集检索工具回调处理器
    处理检索过程中的各种事件
    """
    
    def __init__(
        self,
        queue_manager: AppQueueManager,
        app_id: str,
        message_id: str,
        user_id: str,
        invoke_from: InvokeFrom
    ):
        """
        初始化回调处理器
        
        Args:
            queue_manager: 队列管理器
            app_id: 应用ID
            message_id: 消息ID
            user_id: 用户ID
            invoke_from: 调用来源
        """
        self.queue_manager = queue_manager
        self.app_id = app_id
        self.message_id = message_id
        self.user_id = user_id
        self.invoke_from = invoke_from

    def on_retrieval_success(self, query: str, documents: List[Document]):
        """
        检索成功回调
        
        Args:
            query: 搜索查询
            documents: 检索到的文档列表
        """
        # 发布检索资源事件
        retrieval_resources = [
            RetrievalResource(
                content=doc.page_content,
                metadata=doc.metadata,
                score=doc.metadata.get("score", 0.0) if doc.metadata else 0.0
            )
            for doc in documents
        ]
        
        self.queue_manager.publish(
            QueueRetrieverResourcesEvent(
                retrieval_resources=retrieval_resources
            ),
            PublishFrom.TOOL_ENGINE
        )

    def on_retrieval_error(self, query: str, error: str):
        """
        检索失败回调
        
        Args:
            query: 搜索查询
            error: 错误信息
        """
        logger.error(f"数据集检索失败 - 查询: {query}, 错误: {error}")
        
        # 可以发布错误事件或记录到监控系统
        # self.queue_manager.publish(
        #     QueueErrorEvent(error_message=f"数据集检索失败: {error}"),
        #     PublishFrom.TOOL_ENGINE
        # )

4. Agent执行引擎

4.1 执行控制流程

class AgentExecutionController:
    """
    Agent执行控制器
    管理Agent的执行流程、状态和资源
    """
    
    def __init__(
        self,
        agent_config: AgentEntity,
        model_instance: ModelInstance,
        tool_manager: ToolManager,
        queue_manager: AppQueueManager
    ):
        """
        初始化执行控制器
        
        Args:
            agent_config: Agent配置
            model_instance: 模型实例
            tool_manager: 工具管理器
            queue_manager: 队列管理器
        """
        self.agent_config = agent_config
        self.model_instance = model_instance
        self.tool_manager = tool_manager
        self.queue_manager = queue_manager
        
        # 执行状态
        self.execution_state = AgentExecutionState.INITIALIZED
        self.current_iteration = 0
        self.max_iterations = agent_config.max_iteration
        
        # 推理历史
        self.reasoning_history: List[AgentReasoningStep] = []
        
        # 性能统计
        self.performance_metrics = AgentPerformanceMetrics()

    def execute(
        self,
        query: str,
        context: Optional[AgentExecutionContext] = None
    ) -> Generator[AgentExecutionEvent, None, None]:
        """
        执行Agent推理
        
        Args:
            query: 用户查询
            context: 执行上下文
            
        Yields:
            AgentExecutionEvent: 执行过程事件
        """
        self.execution_state = AgentExecutionState.RUNNING
        self.context = context  # 保存执行上下文,供后续工具调用读取user_id等信息
        start_time = time.time()
        
        try:
            # 发布开始事件
            yield AgentExecutionEvent(
                type=AgentExecutionEventType.STARTED,
                data={"query": query, "config": self.agent_config.dict()}
            )

            # 选择执行策略
            if self.agent_config.strategy == AgentEntity.Strategy.FUNCTION_CALLING:
                yield from self._execute_function_calling(query, context)
            else:
                yield from self._execute_chain_of_thought(query, context)

            # 执行完成
            self.execution_state = AgentExecutionState.COMPLETED
            execution_time = time.time() - start_time
            
            yield AgentExecutionEvent(
                type=AgentExecutionEventType.COMPLETED,
                data={
                    "execution_time": execution_time,
                    "iterations": self.current_iteration,
                    "metrics": self.performance_metrics.dict()
                }
            )

        except Exception as e:
            # 执行失败
            self.execution_state = AgentExecutionState.FAILED
            execution_time = time.time() - start_time
            
            yield AgentExecutionEvent(
                type=AgentExecutionEventType.FAILED,
                data={
                    "error": str(e),
                    "execution_time": execution_time,
                    "iterations": self.current_iteration
                }
            )
            
            raise e

    def _execute_function_calling(
        self,
        query: str,
        context: Optional[AgentExecutionContext]
    ) -> Generator[AgentExecutionEvent, None, None]:
        """
        执行函数调用策略
        
        Args:
            query: 用户查询
            context: 执行上下文
            
        Yields:
            AgentExecutionEvent: 执行事件
        """
        while (self.current_iteration < self.max_iterations and 
               self.execution_state == AgentExecutionState.RUNNING):
            
            self.current_iteration += 1
            iteration_start = time.time()
            
            # 发布迭代开始事件
            yield AgentExecutionEvent(
                type=AgentExecutionEventType.ITERATION_STARTED,
                data={"iteration": self.current_iteration}
            )

            # 准备提示消息
            prompt_messages = self._build_function_calling_prompt(query, context)
            
            # 调用模型
            llm_start = time.time()
            model_response = self.model_instance.invoke_llm(
                prompt_messages=prompt_messages,
                tools=self._get_available_tools(),
                stream=True,
                user=context.user_id if context else ""
            )
            
            # 处理模型响应
            tool_calls = []
            response_content = ""
            
            for chunk in model_response:
                # 发布LLM块事件
                yield AgentExecutionEvent(
                    type=AgentExecutionEventType.LLM_CHUNK,
                    data={"chunk": chunk.dict()}
                )
                
                # 收集工具调用
                if chunk.delta.message.tool_calls:
                    tool_calls.extend(chunk.delta.message.tool_calls)
                
                # 收集响应内容
                if chunk.delta.message.content:
                    response_content += chunk.delta.message.content

            llm_time = time.time() - llm_start
            
            # 记录推理步骤
            reasoning_step = AgentReasoningStep(
                iteration=self.current_iteration,
                thought=response_content,
                llm_time=llm_time,
                tool_calls=tool_calls
            )
            
            # 执行工具调用
            if tool_calls:
                tools_start = time.time()
                yield from self._execute_tools(tool_calls, reasoning_step)
                tools_time = time.time() - tools_start
                reasoning_step.tools_time = tools_time
            else:
                # 没有工具调用,推理完成
                self.execution_state = AgentExecutionState.COMPLETED
                reasoning_step.final_answer = response_content
            
            # 添加到推理历史
            self.reasoning_history.append(reasoning_step)
            
            # 更新性能指标
            iteration_time = time.time() - iteration_start
            self.performance_metrics.add_iteration(
                iteration_time=iteration_time,
                llm_time=llm_time,
                tools_time=getattr(reasoning_step, 'tools_time', 0),
                tool_count=len(tool_calls)
            )
            
            # 发布迭代完成事件
            yield AgentExecutionEvent(
                type=AgentExecutionEventType.ITERATION_COMPLETED,
                data={
                    "iteration": self.current_iteration,
                    "reasoning_step": reasoning_step.dict(),
                    "iteration_time": iteration_time
                }
            )

    def _execute_tools(
        self,
        tool_calls: List[ToolCall],
        reasoning_step: AgentReasoningStep
    ) -> Generator[AgentExecutionEvent, None, None]:
        """
        执行工具调用列表
        
        Args:
            tool_calls: 工具调用列表
            reasoning_step: 当前推理步骤
            
        Yields:
            AgentExecutionEvent: 工具执行事件
        """
        tool_results = []
        
        for tool_call in tool_calls:
            tool_start = time.time()
            
            # 发布工具调用开始事件
            yield AgentExecutionEvent(
                type=AgentExecutionEventType.TOOL_CALL_STARTED,
                data={
                    "tool_name": tool_call.function.name,
                    "tool_input": tool_call.function.arguments
                }
            )
            
            try:
                # 执行工具
                tool_result = self.tool_manager.invoke_tool(
                    tool_name=tool_call.function.name,
                    tool_input=json.loads(tool_call.function.arguments),
                    user_id=self.context.user_id if hasattr(self, 'context') else ""
                )
                
                tool_time = time.time() - tool_start
                
                # 记录工具结果
                tool_execution_result = ToolExecutionResult(
                    tool_name=tool_call.function.name,
                    tool_input=tool_call.function.arguments,
                    tool_output=tool_result.message,
                    execution_time=tool_time,
                    success=True
                )
                
                tool_results.append(tool_execution_result)
                
                # 发布工具调用成功事件
                yield AgentExecutionEvent(
                    type=AgentExecutionEventType.TOOL_CALL_COMPLETED,
                    data={
                        "tool_name": tool_call.function.name,
                        "tool_result": tool_result.message,
                        "execution_time": tool_time
                    }
                )
                
            except Exception as e:
                tool_time = time.time() - tool_start
                error_message = f"工具执行失败: {str(e)}"
                
                # 记录工具错误
                tool_execution_result = ToolExecutionResult(
                    tool_name=tool_call.function.name,
                    tool_input=tool_call.function.arguments,
                    tool_output=error_message,
                    execution_time=tool_time,
                    success=False,
                    error=str(e)
                )
                
                tool_results.append(tool_execution_result)
                
                # 发布工具调用失败事件
                yield AgentExecutionEvent(
                    type=AgentExecutionEventType.TOOL_CALL_FAILED,
                    data={
                        "tool_name": tool_call.function.name,
                        "error": error_message,
                        "execution_time": tool_time
                    }
                )
        
        # 更新推理步骤的工具结果
        reasoning_step.tool_results = tool_results

class AgentExecutionState(Enum):
    """Agent执行状态枚举"""
    
    INITIALIZED = "initialized"    # 已初始化
    RUNNING = "running"           # 运行中
    COMPLETED = "completed"       # 已完成
    FAILED = "failed"            # 执行失败
    TIMEOUT = "timeout"          # 执行超时
    CANCELLED = "cancelled"      # 已取消

class AgentExecutionEventType(Enum):
    """Agent执行事件类型"""
    
    STARTED = "started"                    # 开始执行
    ITERATION_STARTED = "iteration_started" # 迭代开始
    LLM_CHUNK = "llm_chunk"               # LLM流式输出
    TOOL_CALL_STARTED = "tool_call_started" # 工具调用开始
    TOOL_CALL_COMPLETED = "tool_call_completed" # 工具调用完成
    TOOL_CALL_FAILED = "tool_call_failed"   # 工具调用失败
    ITERATION_COMPLETED = "iteration_completed" # 迭代完成
    COMPLETED = "completed"                # 执行完成
    FAILED = "failed"                     # 执行失败

class AgentReasoningStep(BaseModel):
    """Agent推理步骤"""
    
    # 迭代次数
    iteration: int
    
    # 思考过程
    thought: str = ""
    
    # 工具调用列表
    tool_calls: List[ToolCall] = []
    
    # 工具执行结果
    tool_results: List[ToolExecutionResult] = []
    
    # 最终答案
    final_answer: str = ""
    
    # 时间统计
    llm_time: float = 0.0
    tools_time: float = 0.0
    
    # 是否为最终步骤
    @property
    def is_final(self) -> bool:
        return bool(self.final_answer)

class AgentPerformanceMetrics(BaseModel):
    """Agent性能指标"""
    
    # 总执行时间
    total_execution_time: float = 0.0
    
    # 总LLM时间
    total_llm_time: float = 0.0
    
    # 总工具时间
    total_tools_time: float = 0.0
    
    # 迭代次数
    total_iterations: int = 0
    
    # 工具调用次数
    total_tool_calls: int = 0
    
    # 平均迭代时间
    average_iteration_time: float = 0.0
    
    # 成功的工具调用次数
    successful_tool_calls: int = 0
    
    # 失败的工具调用次数
    failed_tool_calls: int = 0
    
    def add_iteration(
        self,
        iteration_time: float,
        llm_time: float,
        tools_time: float,
        tool_count: int
    ):
        """
        添加迭代统计
        
        Args:
            iteration_time: 迭代时间
            llm_time: LLM时间
            tools_time: 工具时间
            tool_count: 工具调用数量
        """
        self.total_execution_time += iteration_time
        self.total_llm_time += llm_time
        self.total_tools_time += tools_time
        self.total_iterations += 1
        self.total_tool_calls += tool_count
        
        # 计算平均值
        if self.total_iterations > 0:
            self.average_iteration_time = self.total_execution_time / self.total_iterations
    
    @property
    def llm_time_ratio(self) -> float:
        """LLM时间占比"""
        if self.total_execution_time > 0:
            return self.total_llm_time / self.total_execution_time
        return 0.0
    
    @property
    def tools_time_ratio(self) -> float:
        """工具时间占比"""
        if self.total_execution_time > 0:
            return self.total_tools_time / self.total_execution_time
        return 0.0
    
    @property
    def tool_success_rate(self) -> float:
        """工具调用成功率"""
        if self.total_tool_calls > 0:
            return self.successful_tool_calls / self.total_tool_calls
        return 0.0
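
以下为一个假设性的使用示意,展示如何在每轮迭代结束后累加统计并读取占比指标(AgentPerformanceMetrics 即上文定义的模型,数值仅为示例):

# 假设性示例:在推理循环外累计性能指标
metrics = AgentPerformanceMetrics()

# 第1轮:LLM耗时1.2s,工具耗时0.8s,调用了2个工具且全部成功
metrics.add_iteration(iteration_time=2.0, llm_time=1.2, tools_time=0.8, tool_count=2)
metrics.successful_tool_calls += 2

# 第2轮:无工具调用,直接产出最终答案
metrics.add_iteration(iteration_time=0.9, llm_time=0.9, tools_time=0.0, tool_count=0)

print(metrics.average_iteration_time)        # (2.0 + 0.9) / 2 = 1.45
print(f"{metrics.llm_time_ratio:.2%}")       # 2.1 / 2.9 ≈ 72.41%
print(f"{metrics.tool_success_rate:.2%}")    # 2 / 2 = 100.00%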

5. Agent回调处理系统

5.1 回调处理器架构

class DifyAgentCallbackHandler:
    """
    Dify Agent回调处理器
    处理Agent执行过程中的各种回调事件
    """
    
    def __init__(self):
        """初始化回调处理器"""
        self.callbacks: List[AgentCallback] = []
        self.event_handlers: Dict[str, List[Callable]] = {}
        
    def register_callback(self, callback: 'AgentCallback'):
        """
        注册回调处理器
        
        Args:
            callback: 回调处理器实例
        """
        self.callbacks.append(callback)
    
    def register_event_handler(self, event_type: str, handler: Callable):
        """
        注册事件处理器
        
        Args:
            event_type: 事件类型
            handler: 处理器函数
        """
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []
        self.event_handlers[event_type].append(handler)
    
    def on_agent_start(self, agent_config: AgentEntity, query: str):
        """
        Agent开始执行回调
        
        Args:
            agent_config: Agent配置
            query: 用户查询
        """
        for callback in self.callbacks:
            try:
                callback.on_agent_start(agent_config, query)
            except Exception as e:
                logger.exception(f"回调处理器执行失败 - on_agent_start: {e}")
    
    def on_llm_start(self, prompt_messages: List[PromptMessage], tools: List[PromptMessageTool]):
        """
        LLM开始调用回调
        
        Args:
            prompt_messages: 提示消息列表
            tools: 可用工具列表
        """
        for callback in self.callbacks:
            try:
                callback.on_llm_start(prompt_messages, tools)
            except Exception as e:
                logger.exception(f"回调处理器执行失败 - on_llm_start: {e}")
    
    def on_llm_chunk(self, chunk: LLMResultChunk):
        """
        LLM流式输出块回调
        
        Args:
            chunk: LLM结果块
        """
        for callback in self.callbacks:
            try:
                callback.on_llm_chunk(chunk)
            except Exception as e:
                logger.exception(f"回调处理器执行失败 - on_llm_chunk: {e}")
    
    def on_llm_end(self, llm_result: LLMResult):
        """
        LLM调用结束回调
        
        Args:
            llm_result: LLM调用结果
        """
        for callback in self.callbacks:
            try:
                callback.on_llm_end(llm_result)
            except Exception as e:
                logger.exception(f"回调处理器执行失败 - on_llm_end: {e}")
    
    def on_tool_start(self, tool_name: str, tool_input: Dict[str, Any]):
        """
        工具开始调用回调
        
        Args:
            tool_name: 工具名称
            tool_input: 工具输入参数
        """
        for callback in self.callbacks:
            try:
                callback.on_tool_start(tool_name, tool_input)
            except Exception as e:
                logger.exception(f"回调处理器执行失败 - on_tool_start: {e}")
        
        # 触发事件处理器
        self._trigger_event_handlers("tool_start", {
            "tool_name": tool_name,
            "tool_input": tool_input
        })
    
    def on_tool_end(self, tool_name: str, tool_result: ToolInvokeMessage):
        """
        工具调用结束回调
        
        Args:
            tool_name: 工具名称
            tool_result: 工具调用结果
        """
        for callback in self.callbacks:
            try:
                callback.on_tool_end(tool_name, tool_result)
            except Exception as e:
                logger.exception(f"回调处理器执行失败 - on_tool_end: {e}")
        
        # 触发事件处理器
        self._trigger_event_handlers("tool_end", {
            "tool_name": tool_name,
            "tool_result": tool_result.dict() if hasattr(tool_result, 'dict') else str(tool_result)
        })
    
    def on_tool_error(self, tool_name: str, error: Exception):
        """
        工具调用错误回调
        
        Args:
            tool_name: 工具名称
            error: 错误信息
        """
        for callback in self.callbacks:
            try:
                callback.on_tool_error(tool_name, error)
            except Exception as e:
                logger.exception(f"回调处理器执行失败 - on_tool_error: {e}")
        
        # 触发事件处理器
        self._trigger_event_handlers("tool_error", {
            "tool_name": tool_name,
            "error": str(error)
        })
    
    def on_agent_finish(self, final_answer: str, reasoning_history: List[AgentReasoningStep]):
        """
        Agent执行完成回调
        
        Args:
            final_answer: 最终答案
            reasoning_history: 推理历史
        """
        for callback in self.callbacks:
            try:
                callback.on_agent_finish(final_answer, reasoning_history)
            except Exception as e:
                logger.exception(f"回调处理器执行失败 - on_agent_finish: {e}")
    
    def on_agent_error(self, error: Exception, reasoning_history: List[AgentReasoningStep]):
        """
        Agent执行错误回调
        
        Args:
            error: 错误信息
            reasoning_history: 推理历史
        """
        for callback in self.callbacks:
            try:
                callback.on_agent_error(error, reasoning_history)
            except Exception as e:
                logger.exception(f"回调处理器执行失败 - on_agent_error: {e}")
    
    def _trigger_event_handlers(self, event_type: str, event_data: Dict[str, Any]):
        """
        触发事件处理器
        
        Args:
            event_type: 事件类型
            event_data: 事件数据
        """
        handlers = self.event_handlers.get(event_type, [])
        for handler in handlers:
            try:
                handler(event_data)
            except Exception as e:
                logger.exception(f"事件处理器执行失败 - {event_type}: {e}")

class AgentCallback(ABC):
    """Agent回调抽象基类"""
    
    @abstractmethod
    def on_agent_start(self, agent_config: AgentEntity, query: str):
        """Agent开始执行"""
        pass
    
    @abstractmethod
    def on_llm_start(self, prompt_messages: List[PromptMessage], tools: List[PromptMessageTool]):
        """LLM开始调用"""
        pass
    
    @abstractmethod
    def on_llm_chunk(self, chunk: LLMResultChunk):
        """LLM流式输出块"""
        pass
    
    @abstractmethod
    def on_llm_end(self, llm_result: LLMResult):
        """LLM调用结束"""
        pass
    
    @abstractmethod
    def on_tool_start(self, tool_name: str, tool_input: Dict[str, Any]):
        """工具开始调用"""
        pass
    
    @abstractmethod
    def on_tool_end(self, tool_name: str, tool_result: ToolInvokeMessage):
        """工具调用结束"""
        pass
    
    @abstractmethod
    def on_tool_error(self, tool_name: str, error: Exception):
        """工具调用错误"""
        pass
    
    @abstractmethod
    def on_agent_finish(self, final_answer: str, reasoning_history: List[AgentReasoningStep]):
        """Agent执行完成"""
        pass
    
    @abstractmethod
    def on_agent_error(self, error: Exception, reasoning_history: List[AgentReasoningStep]):
        """Agent执行错误"""
        pass

class LoggingAgentCallback(AgentCallback):
    """日志记录回调处理器"""
    
    def __init__(self, logger_name: str = "agent"):
        self.logger = logging.getLogger(logger_name)
    
    def on_agent_start(self, agent_config: AgentEntity, query: str):
        self.logger.info(f"Agent开始执行 - 策略: {agent_config.strategy}, 查询: {query}")
    
    def on_llm_start(self, prompt_messages: List[PromptMessage], tools: List[PromptMessageTool]):
        tool_names = [tool.name for tool in tools]
        self.logger.info(f"LLM调用开始 - 工具: {tool_names}")
    
    def on_llm_chunk(self, chunk: LLMResultChunk):
        # 通常不记录每个chunk,避免日志过多
        pass
    
    def on_llm_end(self, llm_result: LLMResult):
        usage = llm_result.usage
        self.logger.info(f"LLM调用完成 - 令牌: {usage.total_tokens}, 成本: ${usage.total_price:.4f}")
    
    def on_tool_start(self, tool_name: str, tool_input: Dict[str, Any]):
        self.logger.info(f"工具调用开始 - {tool_name}: {tool_input}")
    
    def on_tool_end(self, tool_name: str, tool_result: ToolInvokeMessage):
        message_text = str(tool_result.message)
        result_preview = message_text[:100] + "..." if len(message_text) > 100 else message_text
        self.logger.info(f"工具调用完成 - {tool_name}: {result_preview}")
    
    def on_tool_error(self, tool_name: str, error: Exception):
        self.logger.error(f"工具调用失败 - {tool_name}: {error}")
    
    def on_agent_finish(self, final_answer: str, reasoning_history: List[AgentReasoningStep]):
        self.logger.info(f"Agent执行完成 - 迭代次数: {len(reasoning_history)}")
    
    def on_agent_error(self, error: Exception, reasoning_history: List[AgentReasoningStep]):
        self.logger.error(f"Agent执行失败 - 迭代次数: {len(reasoning_history)}, 错误: {error}")

class MetricsAgentCallback(AgentCallback):
    """性能指标回调处理器"""
    
    def __init__(self, metrics_collector: 'AgentMetricsCollector'):
        self.metrics_collector = metrics_collector
    
    def on_agent_start(self, agent_config: AgentEntity, query: str):
        self.metrics_collector.record_agent_start(agent_config.strategy)
    
    def on_llm_start(self, prompt_messages: List[PromptMessage], tools: List[PromptMessageTool]):
        self.metrics_collector.record_llm_call()
    
    def on_llm_chunk(self, chunk: LLMResultChunk):
        pass  # 流式输出不单独记录
    
    def on_llm_end(self, llm_result: LLMResult):
        self.metrics_collector.record_token_usage(llm_result.usage)
    
    def on_tool_start(self, tool_name: str, tool_input: Dict[str, Any]):
        self.metrics_collector.record_tool_call(tool_name)
    
    def on_tool_end(self, tool_name: str, tool_result: ToolInvokeMessage):
        self.metrics_collector.record_tool_success(tool_name)
    
    def on_tool_error(self, tool_name: str, error: Exception):
        self.metrics_collector.record_tool_error(tool_name, error)
    
    def on_agent_finish(self, final_answer: str, reasoning_history: List[AgentReasoningStep]):
        self.metrics_collector.record_agent_success(len(reasoning_history))
    
    def on_agent_error(self, error: Exception, reasoning_history: List[AgentReasoningStep]):
        self.metrics_collector.record_agent_error(error, len(reasoning_history))
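
以下为一个假设性的组装示意,展示如何将上述回调处理器注册到 DifyAgentCallbackHandler,并附加自定义事件处理器(AgentMetricsCollector 的无参构造仅作占位):

# 假设性示例:组装回调链路
handler = DifyAgentCallbackHandler()
handler.register_callback(LoggingAgentCallback(logger_name="agent.demo"))
handler.register_callback(MetricsAgentCallback(metrics_collector=AgentMetricsCollector()))

# 额外注册一个轻量事件处理器,仅关注工具失败事件
def alert_on_tool_error(event_data: dict):
    print(f"[ALERT] 工具失败: {event_data['tool_name']} - {event_data['error']}")

handler.register_event_handler("tool_error", alert_on_tool_error)

# Agent运行器在执行过程中按时机触发,例如:
# handler.on_tool_start("web_search", {"query": "Dify Agent"})
# handler.on_tool_error("web_search", TimeoutError("request timed out"))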

6. 生产环境Agent优化实践

6.1 Agent性能调优策略

根据生产环境实践,Agent模块的性能优化需要关注以下几个关键维度:

模型选择与配置优化

# 生产环境Agent模型配置最佳实践
PRODUCTION_AGENT_CONFIG = {
    "模型选择策略": {
        "高精度场景": {
            "推荐模型": ["gpt-4", "claude-3-opus", "qwen-max"],
            "配置参数": {
                "temperature": 0.1,        # 低温度保证稳定性
                "max_tokens": 2048,        # 充足的输出空间
                "top_p": 0.9              # 适度的多样性
            }
        },
        "高性能场景": {
            "推荐模型": ["gpt-3.5-turbo", "claude-3-haiku", "qwen-plus"],
            "配置参数": {
                "temperature": 0.3,        # 平衡性能和质量
                "max_tokens": 1024,        # 控制输出长度
                "top_p": 0.95
            }
        },
        "成本敏感场景": {
            "推荐模型": ["gpt-3.5-turbo", "moonshot-v1-8k", "qwen-turbo"],
            "配置参数": {
                "temperature": 0.5,
                "max_tokens": 512,         # 减少token消耗
                "top_p": 0.9
            }
        }
    },
    
    "工具调用优化": {
        "并行工具调用": {
            "enabled": True,
            "max_parallel_tools": 3,      # 最大并行工具数
            "timeout_per_tool": 30,       # 单工具超时
            "fallback_strategy": "sequential"  # 失败时回退到串行
        },
        "工具选择策略": {
            "smart_routing": True,        # 智能工具路由
            "tool_preference": "builtin_first",  # 优先使用内置工具
            "fallback_tools": ["web_search", "calculator"]  # 备选工具
        },
        "缓存策略": {
            "tool_result_cache": True,    # 工具结果缓存
            "cache_ttl": 600,            # 缓存10分钟
            "cache_key_strategy": "content_hash"  # 基于内容哈希的缓存键
        }
    }
}
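
下面是一个假设性的辅助函数示意,说明如何按场景名从上述配置中取出推荐模型与参数(函数名与用法均为示例,需与上面字典的键保持一致):

def select_model_profile(scenario: str) -> tuple[list[str], dict]:
    """假设性示例:按场景名读取推荐模型列表与配置参数"""
    strategy = PRODUCTION_AGENT_CONFIG["模型选择策略"]
    if scenario not in strategy:
        raise ValueError(f"未知场景: {scenario}")
    profile = strategy[scenario]
    return profile["推荐模型"], profile["配置参数"]

# 使用示例
models, params = select_model_profile("高性能场景")
# models -> ["gpt-3.5-turbo", "claude-3-haiku", "qwen-plus"], params["temperature"] -> 0.3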

class ProductionAgentOptimizer:
    """
    生产环境Agent优化器
    基于实际部署经验的性能优化策略
    """
    
    def __init__(self):
        self.performance_metrics = AgentPerformanceMetrics()
        self.optimization_rules = self._load_optimization_rules()
    
    def optimize_agent_config(
        self, 
        agent_config: AgentEntity,
        usage_pattern: dict,
        performance_requirements: dict
    ) -> AgentEntity:
        """
        根据使用模式和性能要求优化Agent配置
        
        Args:
            agent_config: 原始Agent配置
            usage_pattern: 使用模式统计
            performance_requirements: 性能要求
            
        Returns:
            AgentEntity: 优化后的Agent配置
        """
        optimized_config = agent_config.copy()
        
        # 1. 基于延迟要求优化策略选择
        if performance_requirements.get("max_latency", 30) < 10:
            if self._supports_function_calling(agent_config):
                optimized_config.strategy = AgentEntity.Strategy.FUNCTION_CALLING
            optimized_config.max_iteration = min(agent_config.max_iteration, 3)
        
        # 2. 基于成本要求优化模型选择
        if performance_requirements.get("cost_sensitive", False):
            optimized_config = self._optimize_for_cost(optimized_config)
        
        # 3. 基于准确性要求优化参数
        if performance_requirements.get("min_accuracy", 0.8) > 0.9:
            optimized_config = self._optimize_for_accuracy(optimized_config)
        
        # 4. 基于使用模式优化工具配置
        if usage_pattern.get("tool_usage_rate", 0) > 0.7:
            optimized_config.tools = self._optimize_tool_selection(
                optimized_config.tools,
                usage_pattern.get("popular_tools", [])
            )
        
        return optimized_config
    
    def _optimize_for_cost(self, config: AgentEntity) -> AgentEntity:
        """成本优化策略"""
        # 选择成本效益更高的模型
        cost_effective_models = {
            "gpt-4": "gpt-3.5-turbo",
            "claude-3-opus": "claude-3-haiku", 
            "qwen-max": "qwen-plus"
        }
        
        if config.model in cost_effective_models:
            config.model = cost_effective_models[config.model]
        
        # 减少最大迭代次数
        config.max_iteration = min(config.max_iteration, 5)
        
        return config
    
    def _optimize_for_accuracy(self, config: AgentEntity) -> AgentEntity:
        """准确性优化策略"""
        # 选择高精度模型
        high_accuracy_models = {
            "gpt-3.5-turbo": "gpt-4",
            "claude-3-haiku": "claude-3-opus",
            "qwen-plus": "qwen-max"
        }
        
        if config.model in high_accuracy_models:
            config.model = high_accuracy_models[config.model]
        
        # 增加迭代次数以提高准确性
        config.max_iteration = min(config.max_iteration + 2, 10)
        
        return config
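
以下为一个假设性的调用示意(usage_pattern 与 performance_requirements 的字段即上面代码读取的键,agent_config 为已有的 AgentEntity 实例,此处省略构造):

# 假设性示例:对一个延迟敏感、工具使用率高的Agent做配置优化
optimizer = ProductionAgentOptimizer()
optimized = optimizer.optimize_agent_config(
    agent_config=agent_config,  # 现有的 AgentEntity 实例
    usage_pattern={
        "tool_usage_rate": 0.85,
        "popular_tools": ["web_search", "calculator"],
    },
    performance_requirements={
        "max_latency": 8,        # 秒,触发函数调用策略与迭代上限收紧
        "cost_sensitive": True,  # 触发低成本模型替换
    },
)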

Agent监控与故障诊断

class AgentMonitoringSystem:
    """
    Agent监控系统
    基于生产环境实践的全方位监控
    """
    
    def __init__(self):
        self.metrics_collector = AgentMetricsCollector()
        self.alert_manager = AgentAlertManager()
    
    def collect_realtime_metrics(self, agent_id: str) -> dict:
        """收集实时性能指标"""
        return {
            "response_time_p95": self._get_response_time_percentile(agent_id, 0.95),
            "success_rate": self._get_success_rate(agent_id),
            "tool_call_latency": self._get_tool_call_latency(agent_id),
            "token_consumption_rate": self._get_token_consumption_rate(agent_id),
            "concurrent_sessions": self._get_concurrent_sessions(agent_id),
            "error_distribution": self._get_error_distribution(agent_id)
        }
    
    def diagnose_performance_issues(self, agent_id: str) -> list[str]:
        """性能问题诊断"""
        issues = []
        metrics = self.collect_realtime_metrics(agent_id)
        
        # 响应时间检查
        if metrics["response_time_p95"] > 30:
            issues.append("响应时间过长:建议优化模型选择或减少工具调用")
        
        # 成功率检查
        if metrics["success_rate"] < 0.95:
            issues.append("成功率偏低:检查工具配置和网络连接")
        
        # 并发处理检查
        if metrics["concurrent_sessions"] > 100:
            issues.append("并发量过高:建议启用负载均衡或增加实例")
        
        # 成本效率检查
        if metrics["token_consumption_rate"] > 1000000:  # 每小时100万token
            issues.append("Token消耗过快:建议优化提示长度或调整模型")
        
        return issues
    
    def get_optimization_suggestions(self, agent_id: str) -> dict:
        """获取优化建议"""
        issues = self.diagnose_performance_issues(agent_id)
        metrics = self.collect_realtime_metrics(agent_id)
        
        suggestions = {
            "immediate_actions": [],      # 立即可执行的优化
            "configuration_changes": [],  # 配置调整建议
            "infrastructure_changes": [], # 基础设施调整
            "long_term_optimizations": [] # 长期优化方向
        }
        
        # 基于问题生成建议
        for issue in issues:
            if "响应时间" in issue:
                suggestions["immediate_actions"].append("启用函数调用策略")
                suggestions["configuration_changes"].append("切换到更快的模型")
            
            elif "成功率" in issue:
                suggestions["immediate_actions"].append("检查工具可用性")
                suggestions["configuration_changes"].append("增加重试次数")
            
            elif "并发量" in issue:
                suggestions["infrastructure_changes"].append("水平扩展Agent实例")
                suggestions["configuration_changes"].append("启用连接池")
        
        return suggestions
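
以下为一个假设性的巡检示意,展示定期拉取指标、诊断问题并输出建议的调用方式:

# 假设性示例:定期巡检某个Agent实例
monitor = AgentMonitoringSystem()
agent_id = "agent-demo-001"

issues = monitor.diagnose_performance_issues(agent_id)
suggestions = monitor.get_optimization_suggestions(agent_id)

for issue in issues:
    print(f"[诊断] {issue}")
for action in suggestions["immediate_actions"]:
    print(f"[建议] {action}")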

6.2 Agent模块核心特点

Dify的Agent模块设计体现了以下特点:

  1. 多策略支持:支持函数调用和思维链两种推理策略,适应不同模型能力
  2. 工具生态:完整的工具管理体系,支持内置、API、工作流、数据集等多种工具类型
  3. 执行控制:精细的执行流程控制和状态管理
  4. 性能监控:完善的回调机制和性能指标收集
  5. 扩展性强:模块化设计,便于添加新策略和工具类型
  6. 生产就绪:经过大规模生产验证的优化策略和监控体系

6.3 安全与可信Agent

  • 工具白名单:仅允许注册且通过审核的工具
  • 输出约束:要求结构化JSON Schema,超时回退纯文本
  • 防幻觉:
    • 优先RAG证据引用(含来源、分数)
    • 高风险场景要求二次确认(反问/确认)
    • 规则库:禁止金融承诺/医疗诊断等

以下为最小示意(假设基于标准库 json 与第三方库 jsonschema 做结构化校验,解析或校验失败由调用方回退为纯文本):

class AgentSafety:
    def enforce_schema(self, output: str, schema: dict) -> dict:
        # 假设实现:先将模型输出解析为JSON,再按JSON Schema校验,失败时抛出异常
        data = json.loads(output)
        jsonschema.validate(instance=data, schema=schema)
        return data

    def require_evidence(self, answer: str, citations: list[dict]) -> bool:
        # 至少存在一条引用,且每条引用均带有来源字段
        return bool(citations) and all(c.get("source") for c in citations)

6.4 未来发展方向

  1. 增强推理策略:支持更多推理模式,如规划-执行、多Agent协作等
  2. 智能工具选择:基于上下文和历史的智能工具推荐
  3. 自适应优化:基于执行效果的自动参数调优
  4. 多模态支持:增强对图像、音频等多模态输入的处理能力
  5. 安全增强:加强工具调用的安全检查和风险控制

通过深入理解Dify Agent模块的架构和实现,开发者可以更好地利用其强大的智能推理能力,构建出高效可靠的AI Agent应用。


7. 关键函数调用路径速查

  • 推理(函数调用策略): FunctionCallAgentRunner.run() -> model_instance.invoke_llm() -> FunctionCallAgentRunner._handle_tool_calls_stream() -> FunctionCallAgentRunner._execute_tool_calls() -> Tool.invoke() -> queue_manager.publish(...)

  • 推理(思维链策略): CotAgentRunner.run() -> model_instance.invoke_llm() -> CotAgentRunner._parse_react_response() -> CotAgentRunner._execute_react_tool_call() -> Tool.invoke() -> queue_manager.publish(...)

  • 策略选择: select_agent_strategy(model_instance) -> LargeLanguageModel.get_model_schema(model, credentials) -> 特性判定 -> AgentEntity.Strategy.{FUNCTION_CALLING|CHAIN_OF_THOUGHT}

  • 工具运行时获取: ToolManager.get_agent_tool_runtime(...) -> BuiltinToolManager.get_tool_runtime(...) / ApiToolManager.get_tool_runtime(...) / WorkflowToolManager.get_tool_runtime(...)

  • 数据集检索工具(RAG): DatasetRetrieverTool.get_dataset_tools() -> DatasetService.get_dataset() -> DatasetRetrieverTool(...) -> DatasetRetrieverTool.invoke() -> RetrievalService.retrieve() -> DatasetIndexToolCallbackHandler.on_retrieval_success()

8. 关键函数核心代码与说明(精简摘录)

以下为与智能体执行密切相关的关键函数精简代码摘录,聚焦核心控制流与异常处理,便于交叉对照源码。

class FunctionCallAgentRunner(BaseAgentRunner):
    def run(self, message: Message, query: str, **kwargs: Any) -> Generator[LLMResultChunk, None, None]:
        """
        作用:在支持函数调用的模型上执行推理主循环;解析 tool_calls、执行工具、聚合观察并在终止条件满足时产出最终答案。
        输入:message(消息对象)、query(用户查询)
        输出:LLMResultChunk(流式结果)
        关键:
        - 在迭代上限前,优先消费模型返回的工具调用;
        - 工具执行失败不阻断主循环,记录错误并继续;
        - 最后一轮移除 tools 促使模型收敛输出最终答案。
        """
        tool_instances, prompt_messages_tools = self._init_prompt_tools()
        iteration_step = 1
        max_steps = min(self.app_config.agent.max_iteration, 99) + 1
        final_answer = ""
        llm_usage: Dict[str, Optional[LLMUsage]] = {"usage": None}

        while iteration_step <= max_steps:
            # 最后一轮强制出答案
            if iteration_step == max_steps:
                prompt_messages_tools = []

            thought_id = self.create_agent_thought(
                message_id=message.id, message="", tool_name="", tool_input="", messages_ids=[]
            )

            prompt_messages = self._organize_prompt_messages()
            self.recalc_llm_max_tokens(self.model_config, prompt_messages)

            chunks = self.model_instance.invoke_llm(
                prompt_messages=prompt_messages,
                model_parameters=self.application_generate_entity.model_conf.parameters,
                tools=prompt_messages_tools,
                stop=self.application_generate_entity.model_conf.stop,
                stream=self.stream_tool_call,
                user=self.user_id,
                callbacks=[],
            )

            tool_calls: List[Tuple[str, str, Dict[str, Any]]] = []
            response_text = ""
            current_usage = None

            if isinstance(chunks, Generator):
                for chunk in chunks:
                    if chunk.delta.message.tool_calls:
                        self._handle_tool_calls_stream(chunk, tool_calls)
                    if chunk.delta.message.content:
                        response_text += chunk.delta.message.content
                        yield chunk
                    if chunk.delta.usage:
                        current_usage = chunk.delta.usage
            else:
                if chunks.message.tool_calls:
                    tool_calls = self._extract_tool_calls(chunks.message.tool_calls)
                response_text = chunks.message.content or ""
                current_usage = chunks.usage

            if tool_calls:
                # 工具执行为可中断操作,错误以事件上报并继续
                yield from self._execute_tool_calls(tool_calls, tool_instances, thought_id)
            else:
                final_answer = response_text
                self._increase_usage(llm_usage, current_usage)
                self.save_agent_thought(
                    agent_thought_id=thought_id,
                    tool_name="",
                    tool_input={},
                    thought=response_text,
                    observation="",
                    answer=response_text,
                    messages_ids=[],
                    llm_usage=current_usage,
                )
                break

            # 记录当前轮
            self._increase_usage(llm_usage, current_usage)
            self.save_agent_thought(
                agent_thought_id=thought_id,
                tool_name=";".join([c[0] for c in tool_calls]) if tool_calls else "",
                tool_input={c[0]: c[2] for c in tool_calls} if tool_calls else {},
                thought=response_text,
                observation="",
                answer="",
                messages_ids=[],
                llm_usage=current_usage,
            )
            iteration_step += 1

        if final_answer:
            yield self._create_final_answer_chunk(final_answer, llm_usage["usage"])

class FunctionCallAgentRunner(BaseAgentRunner):
    def _execute_tool_calls(
        self,
        tool_calls: List[Tuple[str, str, Dict[str, Any]]],
        tool_instances: Dict[str, Tool],
        agent_thought_id: str,
    ) -> Generator[LLMResultChunk, None, None]:
        """
        作用:按序执行模型返回的工具请求;为每次调用发布 start/result/error 事件,聚合观察并写回思维记录。
        异常:单个工具失败不抛出到顶层;记录错误并继续。
        """
        observations: Dict[str, Any] = {}
        for tool_name, call_id, arguments in tool_calls:
            if tool_name not in tool_instances:
                err = f"tool '{tool_name}' not found"
                yield self._create_tool_error_chunk(tool_name, err)
                observations[tool_name] = err
                continue

            try:
                yield self._create_tool_start_chunk(tool_name, arguments)
                result = tool_instances[tool_name].invoke(user_id=self.user_id, tool_parameters=arguments)
                msg = self._format_tool_response(result)
                observations[tool_name] = msg
                yield self._create_tool_result_chunk(tool_name, msg)
            except Exception as e:
                err = f"tool '{tool_name}' failed: {e}"
                observations[tool_name] = err
                yield self._create_tool_error_chunk(tool_name, err)

        self._update_agent_thought_with_tool_results(agent_thought_id, observations)

class CotAgentRunner(BaseAgentRunner):
    def _parse_react_response(self, response: str) -> Dict[str, Any]:
        """
        作用:从 ReACT 文本响应中提取 thought/action/action_input/observation/final_answer。
        关键:正则具备容错;action_input 尝试 JSON 解析,失败保留原文。
        """
        patterns = {
            "thought": r"Thought:\s*(.*?)(?=\nAction:|\nFinal Answer:|$)",
            "action": r"Action:\s*(.*?)(?=\nAction Input:|\n|$)",
            "action_input": r"Action Input:\s*(.*?)(?=\nObservation:|\n|$)",
            "observation": r"Observation:\s*(.*?)(?=\nThought:|\n|$)",
            "final_answer": r"Final Answer:\s*(.*?)$",
        }
        out: Dict[str, Any] = {}
        for k, p in patterns.items():
            m = re.search(p, response, re.DOTALL | re.IGNORECASE)
            if m:
                out[k] = m.group(1).strip()
        if "action_input" in out:
            try:
                out["action_input"] = json.loads(out["action_input"])  # best-effort
            except json.JSONDecodeError:
                pass
        return out
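
以下为一个假设性的解析示意,说明该方法对一段典型 ReACT 文本的输出结构(仅用于理解上面正则的行为,runner 为 CotAgentRunner 实例):

# 假设性示例:解析一段典型的ReACT响应
sample = (
    "Thought: 需要先查询天气\n"
    "Action: weather_search\n"
    "Action Input: {\"city\": \"Hangzhou\"}\n"
)
parsed = runner._parse_react_response(sample)
# parsed == {
#     "thought": "需要先查询天气",
#     "action": "weather_search",
#     "action_input": {"city": "Hangzhou"},   # JSON解析成功后为dict
# }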

class ToolManager:
    @staticmethod
    def get_agent_tool_runtime(tenant_id: str, app_id: str, agent_tool: AgentToolEntity, invoke_from: InvokeFrom) -> Tool:
        """
        作用:依据 AgentToolType 分派到对应运行时管理器;抽象屏蔽工具来源差异。
        """
        t = agent_tool.tool_type
        if t == AgentToolType.BUILTIN:
            return BuiltinToolManager.get_tool_runtime(
                provider=agent_tool.provider, tool_name=agent_tool.tool_name, tenant_id=tenant_id
            )
        if t == AgentToolType.API:
            return ApiToolManager.get_tool_runtime(tenant_id=tenant_id, tool_id=agent_tool.tool_id, invoke_from=invoke_from)
        if t == AgentToolType.WORKFLOW:
            return WorkflowToolManager.get_tool_runtime(
                tenant_id=tenant_id, app_id=app_id, workflow_id=agent_tool.workflow_id, invoke_from=invoke_from
            )
        raise ValueError(f"unsupported tool type: {t}")

class DatasetRetrieverTool:
    def invoke(self, user_id: str, tool_parameters: Dict[str, Any]) -> ToolInvokeMessage:
        """
        作用:按检索配置对指定 dataset 执行查询;将检索结果压缩为文本响应(可选包含来源/分数)。
        错误:捕获并以文本消息返回,避免中断上游推理。
        """
        q = (tool_parameters or {}).get("query", "").strip()
        if not q:
            return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.TEXT, message="搜索查询不能为空")
        try:
            svc = RetrievalService()
            r = svc.retrieve(
                tenant_id=self.tenant_id,
                dataset_id=self.dataset_id,
                query=q,
                top_k=self.retrieve_config.top_k,
                score_threshold=self.retrieve_config.score_threshold,
                reranking_config=self.retrieve_config.reranking_config,
                user_id=user_id,
            )
            if not r.documents:
                return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.TEXT, message="未找到相关信息")
            lines = []
            for i, doc in enumerate(r.documents[: self.retrieve_config.top_k]):
                meta = []
                if doc.metadata.get("source"):
                    meta.append(f"来源: {doc.metadata['source']}")
                if doc.metadata.get("page"):
                    meta.append(f"页码: {doc.metadata['page']}")
                if doc.metadata.get("score") is not None:
                    meta.append(f"相关度: {doc.metadata['score']:.3f}")
                head = f"[文档 {i+1}]" + (f" ({', '.join(meta)})" if meta else "")
                lines.append(head + "\n" + (doc.page_content or ""))
            return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.TEXT, message="\n\n".join(lines))
        except Exception as e:
            return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.TEXT, message=f"数据集检索失败: {e}")
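
以下为一个假设性的调用示意(工具实例通常由 DatasetRetrieverTool.get_dataset_tools() 创建,此处省略构造):

# 假设性示例:以用户身份执行一次数据集检索
msg = dataset_tool.invoke(user_id="user-001", tool_parameters={"query": "Dify Agent 架构"})
print(msg.message)  # 命中时为拼接的文档片段文本;空查询或异常时为用户级提示文本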

class AgentExecutionController:
    def execute(self, query: str, context: Optional[AgentExecutionContext] = None) -> Generator[AgentExecutionEvent, None, None]:
        """
        作用:提供统一的执行生命周期事件(started/llm_chunk/tool_call/completed/failed);
        关键:每轮统计 LLM/工具耗时与次数;失败路径发出 failed 事件并抛出以便调用方处理。
        """
        self.execution_state = AgentExecutionState.RUNNING
        started_at = time.time()
        yield AgentExecutionEvent(type=AgentExecutionEventType.STARTED, data={"query": query})
        try:
            if self.agent_config.strategy == AgentEntity.Strategy.FUNCTION_CALLING:
                yield from self._execute_function_calling(query, context)
            else:
                yield from self._execute_chain_of_thought(query, context)
            self.execution_state = AgentExecutionState.COMPLETED
            yield AgentExecutionEvent(
                type=AgentExecutionEventType.COMPLETED,
                data={"execution_time": time.time() - started_at, "iterations": self.current_iteration},
            )
        except Exception as e:
            self.execution_state = AgentExecutionState.FAILED
            yield AgentExecutionEvent(
                type=AgentExecutionEventType.FAILED,
                data={"error": str(e), "execution_time": time.time() - started_at, "iterations": self.current_iteration},
            )
            raise
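
以下为一个假设性的事件消费示意,展示调用方如何流式处理执行事件并在失败时兜底(controller 的构造依赖实际配置,事件 data 中的字段名仅作示意):

# 假设性示例:消费 AgentExecutionController 产生的生命周期事件
try:
    for event in controller.execute(query="帮我查一下明天杭州的天气"):
        if event.type == AgentExecutionEventType.LLM_CHUNK:
            print(event.data.get("content", ""), end="", flush=True)  # 字段名为示意
        elif event.type == AgentExecutionEventType.TOOL_CALL_STARTED:
            print(f"\n[工具] {event.data['tool_name']} 开始执行")
        elif event.type == AgentExecutionEventType.COMPLETED:
            print(f"\n[完成] 耗时 {event.data['execution_time']:.2f}s")
except Exception as e:
    # execute() 在失败路径上发出 FAILED 事件后会重新抛出异常
    print(f"[失败] {e}")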

8.1 功能说明(要点式)

  • FunctionCallAgentRunner.run:
    • 作用:工具优先的推理主循环;在终轮移除工具促收敛。
    • 边界:迭代上限;工具失败不致命;最终以 chunk 形式返回。
  • _execute_tool_calls:
    • 作用:逐个工具执行与事件发布;聚合观察并回写。
    • 边界:缺失/失败工具产生错误事件但不中断。
  • CotAgentRunner._parse_react_response:
    • 作用:宽容解析 ReACT 段落;尝试结构化 action_input。
    • 边界:未命中的段落不写入返回字典;action_input 解析失败时保留原始字符串。
  • ToolManager.get_agent_tool_runtime:
    • 作用:多来源工具统一分派;屏蔽运行时差异。
    • 边界:未知类型抛出 ValueError。
  • DatasetRetrieverTool.invoke:
    • 作用:以配置执行检索并格式化文本;错误转文本返回。
    • 边界:空查询与无结果的用户级提示。
  • AgentExecutionController.execute:
    • 作用:生命周期事件与指标聚合;失败上抛。
    • 边界:策略分流,异常路径发出 failed 事件。

9. 关键函数调用链(按职责)

FunctionCallAgentRunner.run
  -> _init_prompt_tools
  -> _organize_prompt_messages
  -> model_instance.invoke_llm (tools=...)
    -> {有 tool_calls?}
       -> _handle_tool_calls_stream/_extract_tool_calls
       -> _execute_tool_calls
          -> Tool.invoke (Builtin/API/Workflow)
          -> _update_agent_thought_with_tool_results
    -> {无 tool_calls}
       -> _create_final_answer_chunk

CotAgentRunner.run
  -> _organize_react_prompt_messages
  -> model_instance.invoke_llm
  -> _parse_react_response
    -> {action+action_input}
       -> _execute_react_tool_call
          -> Tool.invoke
          -> _add_scratchpad_observation
    -> {final_answer}
       -> _create_final_answer_chunk

ToolManager.get_agent_tool_runtime
  -> when BUILTIN  -> BuiltinToolManager.get_tool_runtime
  -> when API      -> ApiToolManager.get_tool_runtime
  -> when WORKFLOW -> WorkflowToolManager.get_tool_runtime

DatasetRetrieverTool.invoke
  -> RetrievalService.retrieve
  -> format documents(+meta)
  -> ToolInvokeMessage(TEXT)

AgentExecutionController.execute
  -> {FUNCTION_CALLING}
     -> _execute_function_calling
        -> model.invoke_llm(stream)
        -> collect tool_calls/response
        -> _execute_tools -> tool_manager.invoke_tool
  -> {CHAIN_OF_THOUGHT}
     -> _execute_chain_of_thought

10. 统一时序图(精简版)

sequenceDiagram
  participant U as 用户
  participant R as AgentRunner
  participant M as 模型
  participant TM as 工具管理器
  participant T as 工具

  U->>R: query
  R->>M: invoke_llm(prompt, tools)
  M-->>R: chunk(content/tool_calls)
  alt 存在 tool_calls
    R->>TM: 解析与分派
    loop 工具逐个
      TM->>T: invoke(params)
      T-->>TM: ToolInvokeMessage
      TM-->>R: 观察(observation)
    end
    R->>M: 追加观察后继续推理
  else 无 tool_calls
    R-->>U: 最终答案
  end

sequenceDiagram
  participant R as CotAgentRunner
  participant M as 模型
  participant T as 工具

  R->>M: prompt(ReACT)
  M-->>R: text(Thought/Action/...)
  R->>R: _parse_react_response
  alt Action 存在
    R->>T: invoke(action_input)
    T-->>R: observation
    R->>M: 追加Observation继续
  else Final Answer
    R-->>R: 结束并返回
  end

sequenceDiagram
  participant C as AgentExecutionController
  participant M as 模型
  participant TM as 工具管理器

  C->>C: STARTED
  alt FunctionCalling
    C->>M: invoke_llm(stream)
    M-->>C: chunks(tool_calls/contents)
    C->>TM: _execute_tools(tool_calls)
    TM-->>C: tool_results
  else ChainOfThought
    C->>M: invoke_llm(ReACT)
    M-->>C: response
  end
  C->>C: COMPLETED/FAILED

11. 关键结构与继承关系(类图)

classDiagram
  class BaseAgentRunner {
    +tenant_id: str
    +application_generate_entity
    +queue_manager
    +conversation
    +message
    +run(...)
  }
  class FunctionCallAgentRunner
  class CotAgentRunner {
    +_parse_react_response(str) Dict
    +_execute_react_tool_call(...)
  }
  class CotChatAgentRunner
  class CotCompletionAgentRunner

  BaseAgentRunner <|-- FunctionCallAgentRunner
  BaseAgentRunner <|-- CotAgentRunner
  CotAgentRunner <|-- CotChatAgentRunner
  CotAgentRunner <|-- CotCompletionAgentRunner

  class AgentEntity {
    +provider: str
    +model: str
    +strategy: Strategy
    +tools: List~AgentToolEntity~
    +max_iteration: int
  }
  class AgentToolEntity {
    +tool_type: AgentToolType
    +provider: str
    +tool_name: str
    +tool_id: str
    +workflow_id: str
  }
  class ToolManager {
    +get_agent_tool_runtime(...): Tool
  }
  class BuiltinToolManager
  class ApiToolManager
  class WorkflowToolManager

  ToolManager ..> BuiltinToolManager : dispatch
  ToolManager ..> ApiToolManager : dispatch
  ToolManager ..> WorkflowToolManager : dispatch

  AgentEntity "1" o-- "*" AgentToolEntity