RAGFlow-03-Agent模块

模块概览

1.1 职责与定位

Agent 模块是 RAGFlow 的智能代理工作流引擎,提供可视化编排的 Agent 能力。主要职责包括:

  1. 工作流编排:通过有向无环图(DAG)定义复杂的 Agent 执行流程
  2. 组件化设计:提供可复用的组件(LLM、Tool、Retrieval、Switch 等)
  3. 工具调用:支持 LLM 自主调用工具(检索、代码执行、外部 API)
  4. 状态管理:维护全局变量、历史记录、检索缓存
  5. 流式执行:支持流式输出与实时进度反馈
  6. 异常处理:组件级异常捕获与回退策略

1.2 输入与输出

输入

  • DSL(Domain Specific Language):JSON 格式的工作流定义
  • 运行时参数:query(用户问题)、files(上传文件)、inputs(自定义变量)
  • 全局变量:sys.query、sys.user_id、sys.conversation_turns、sys.files

输出

  • 事件流:workflow_started、node_started、node_finished、message、workflow_finished
  • 最终结果:answer(文本)、reference(引用)、outputs(自定义输出)
  • 执行轨迹:path(执行路径)、history(对话历史)、retrieval(检索记录)

1.3 核心概念

Graph(图)

  • 由 nodes(节点)和 edges(边)组成的有向无环图
  • 节点表示组件(Component),边表示数据流
  • 支持条件分支(Switch)、循环(Iteration)、并行执行

Component(组件)

  • 最小执行单元,包含参数(Param)与逻辑(invoke)
  • 输入来自上游组件的输出,输出传递给下游组件
  • 支持异常处理(exception_handler)与重试(max_retries)

Canvas(画布)

  • Graph 的子类,专门用于 Agent 工作流
  • 维护全局状态(globals)、对话历史(history)、检索记录(retrieval)
  • 支持流式执行(yield 事件)

1. 完整服务架构图(包含上游 API)

flowchart TB
    subgraph "客户端层"
        WebUI[前端 Web UI]
        SDK[Python SDK]
        HTTP[HTTP Client]
        OpenAI[OpenAI 兼容客户端]
    end

    subgraph "API 接口层"
        SessionAPI["/agents/:id/sessions<br/>POST 创建会话"]
        CompletionAPI["/completion<br/>POST Agent 对话"]
        CanvasAPI["/canvas/completion<br/>POST Canvas 执行"]
        ChatAPI["/chats/:id/completions<br/>POST Chat 完成"]
    end

    subgraph "服务编排层"
        APIApp[api_app.py<br/>API 路由与认证]
        CanvasApp[canvas_app.py<br/>Canvas 管理]
        SessionApp[session.py<br/>会话管理]
        
        subgraph "核心服务"
            UserCanvasService[UserCanvasService<br/>Canvas CRUD]
            ConversationService[ConversationService<br/>会话持久化]
            APITokenService[APIToken<br/>认证管理]
        end
    end

    subgraph "Agent 执行引擎层"
        Canvas[Canvas 执行器<br/>agent/canvas.py]
        
        subgraph "状态管理"
            PathManager[Path 路径队列<br/>组件执行顺序]
            GlobalsManager[Globals 全局变量<br/>sys.query/user_id/files]
            HistoryManager[History 对话历史<br/>多轮上下文]
            RetrievalCache[Retrieval 检索缓存<br/>chunks + doc_aggs]
        end
        
        subgraph "事件流"
            EventEmitter[事件发射器 SSE]
            EventTypes["workflow_started<br/>node_started<br/>node_finished<br/>message<br/>workflow_finished"]
        end
    end

    subgraph "组件层(Component)"
        Begin[Begin<br/>入口组件]
        LLM[LLM<br/>基础 LLM 调用]
        Agent[Agent<br/>ReAct 工具代理]
        Message[Message<br/>流式输出]
        
        subgraph "工具组件"
            Retrieval[Retrieval<br/>知识检索]
            CodeExec[CodeExec<br/>代码执行]
            WebSearch[WebSearch<br/>联网搜索]
            APICall[API Call<br/>外部 API]
        end
        
        subgraph "控制流组件"
            Switch[Switch<br/>条件分支]
            Categorize[Categorize<br/>意图分类]
            Iteration[Iteration<br/>循环控制]
        end
    end

    subgraph "底层服务层"
        LLMBundle[LLM Bundle<br/>统一 LLM 调用]
        
        subgraph "外部服务"
            OpenAIAPI[OpenAI API]
            DeepSeek[DeepSeek API]
            LocalLLM[本地 LLM]
            Ollama[Ollama]
        end
        
        subgraph "存储服务"
            Redis[(Redis<br/>日志/缓存)]
            ES[(Elasticsearch<br/>向量检索)]
            MySQL[(MySQL<br/>元数据)]
        end
        
        subgraph "执行环境"
            Sandbox[Sandbox<br/>代码沙箱]
            MCP[MCP Server<br/>工具服务器]
        end
    end

    WebUI --> SessionAPI
    WebUI --> CompletionAPI
    SDK --> CompletionAPI
    HTTP --> CanvasAPI
    OpenAI --> ChatAPI

    SessionAPI --> SessionApp
    CompletionAPI --> APIApp
    CanvasAPI --> CanvasApp
    ChatAPI --> APIApp

    SessionApp --> UserCanvasService
    APIApp --> ConversationService
    CanvasApp --> UserCanvasService
    APIApp --> APITokenService

    UserCanvasService --> Canvas
    ConversationService --> Canvas

    Canvas --> PathManager
    Canvas --> GlobalsManager
    Canvas --> HistoryManager
    Canvas --> RetrievalCache
    Canvas --> EventEmitter
    EventEmitter --> EventTypes

    Canvas --> Begin
    Canvas --> LLM
    Canvas --> Agent
    Canvas --> Message

    Agent --> Retrieval
    Agent --> CodeExec
    Agent --> WebSearch
    Agent --> APICall

    Canvas --> Switch
    Canvas --> Categorize
    Canvas --> Iteration

    LLM --> LLMBundle
    Agent --> LLMBundle
    Retrieval --> LLMBundle

    LLMBundle --> OpenAIAPI
    LLMBundle --> DeepSeek
    LLMBundle --> LocalLLM
    LLMBundle --> Ollama

    Canvas --> Redis
    Retrieval --> ES
    UserCanvasService --> MySQL

    CodeExec --> Sandbox
    Agent --> MCP

架构要点说明

1. 分层架构设计

  • 客户端层:提供多种接入方式(Web UI、SDK、HTTP、OpenAI 兼容)
  • API 接口层:RESTful API,负责请求验证、路由分发、SSE 流式响应
  • 服务编排层:业务逻辑编排,管理 Canvas/Conversation 生命周期
  • 执行引擎层:Canvas 工作流引擎,组件调度、状态管理、事件发射
  • 组件层:可复用的原子能力(LLM、Tool、Control Flow)
  • 底层服务层:LLM 调用、存储访问、外部服务集成

2. DSL 驱动架构

  • 工作流通过 JSON DSL 定义,前端可视化编辑器生成
  • DSL 包含 components、graph、history、retrieval、globals 五部分
  • 运行时解析 DSL,动态实例化组件并构建执行路径
  • DSL 持久化到 MySQL,支持版本管理和会话恢复

3. 组件化设计

  • 每个组件继承 ComponentBase,实现 invoke() 方法
  • 参数通过 ComponentParamBase 定义,支持类型校验与默认值
  • 组件输出通过 set_output() 设置,下游通过 get_input() 获取
  • 组件间解耦,通过变量引用({component_id@output_key})传递数据

4. 执行模型

  • 顺序执行:按 path 列表顺序执行组件
  • 条件分支:Switch/Categorize 组件根据条件选择下游路径
  • 循环执行:Iteration 组件重复执行子图,直到满足退出条件
  • 并行执行:同一批次的组件(无依赖)并行执行(ThreadPoolExecutor,max_workers=5)

5. 状态管理

  • globals:全局变量字典,支持跨组件访问(如 sys.querysys.user_idsys.files
  • history:对话历史列表,用于多轮对话上下文
  • retrieval:检索结果缓存,避免重复检索(chunks + doc_aggs)
  • path:动态路径队列,记录执行轨迹与待执行组件

6. 流式架构

  • 基于 SSE(Server-Sent Events)推送实时事件
  • 支持 partial 延迟执行,实现流式 Token 生成
  • 事件类型:workflow_started、node_started、node_finished、message、message_end、workflow_finished
  • 客户端可实时展示执行进度与中间结果

2. 核心数据结构

2.1 Canvas DSL 结构

{
    "components": {
        "begin": {
            "obj": {
                "component_name": "Begin",
                "params": {}
            },
            "downstream": ["llm_0"],
            "upstream": []
        },
        "llm_0": {
            "obj": {
                "component_name": "LLM",
                "params": {
                    "llm_id": "gpt-4",
                    "sys_prompt": "你是一个智能助手...",
                    "prompts": [{"role": "user", "content": "{sys.query}"}],
                    "temperature": 0.7,
                    "cite": True
                }
            },
            "downstream": ["message_0"],
            "upstream": ["begin"]
        },
        "message_0": {
            "obj": {
                "component_name": "Message",
                "params": {
                    "content": "{llm_0@content}"
                }
            },
            "downstream": [],
            "upstream": ["llm_0"]
        }
    },
    "history": [],
    "path": ["begin"],
    "retrieval": [],
    "globals": {
        "sys.query": "",
        "sys.user_id": "",
        "sys.conversation_turns": 0,
        "sys.files": []
    }
}

2.2 组件数据结构 UML

classDiagram
    class ComponentBase {
        +Canvas _canvas
        +String _id
        +ComponentParamBase _param
        +Dict inputs
        +Dict outputs
        +invoke(**kwargs)
        +get_input()
        +set_output(key, value)
        +error()
        +reset()
    }

    class ComponentParamBase {
        +String description
        +Int max_retries
        +Float delay_after_error
        +String exception_method
        +Any exception_default_value
        +List exception_goto
        +Dict inputs
        +Dict outputs
        +check()
        +as_dict()
    }

    class LLM {
        +String llm_id
        +String sys_prompt
        +List prompts
        +Float temperature
        +Int max_tokens
        +Bool cite
        +_invoke(**kwargs)
    }

    class Agent {
        +List tools
        +List mcp
        +Int max_rounds
        +LLMBundle chat_mdl
        +ToolCallSession toolcall_session
        +_react_with_tools_streamly()
    }

    class Tool {
        +String function_name
        +String function_description
        +Dict function_parameters
        +get_meta()
        +_invoke(**kwargs)
    }

    class Retrieval {
        +List kb_ids
        +Float similarity_threshold
        +Int top_n
        +String rerank_id
        +_invoke(**kwargs)
    }

    ComponentBase <|-- LLM
    ComponentBase <|-- Agent
    ComponentBase <|-- Tool
    Tool <|-- Retrieval
    LLM <|-- Agent
    ComponentBase *-- ComponentParamBase

3. 核心功能详细剖析

3.1 Canvas 执行引擎

3.1.1 run 方法(核心流程)

功能:执行工作流,逐个调用组件,返回事件流

核心代码

# agent/canvas.py
class Canvas(Graph):
    def run(self, **kwargs):
        """执行工作流"""
        # 1. 初始化
        self.message_id = get_uuid()
        created_at = int(time.time())
        
        # 2. 重置组件状态
        for k, cpn in self.components.items():
            self.components[k]["obj"].reset(True)
        
        # 3. 设置全局变量
        for k in ["query", "user_id", "files"]:
            if kwargs.get(k):
                if k == "files":
                    self.globals[f"sys.{k}"] = self.get_files(kwargs[k])
                else:
                    self.globals[f"sys.{k}"] = kwargs[k]
        
        self.globals["sys.conversation_turns"] += 1
        
        # 4. 事件装饰器
        def decorate(event, dt):
            return {
                "event": event,
                "message_id": self.message_id,
                "created_at": created_at,
                "task_id": self.task_id,
                "data": dt
            }
        
        # 5. 初始化路径
        if not self.path:
            self.path.append("begin")
        
        yield decorate("workflow_started", {"inputs": kwargs.get("inputs")})
        
        # 6. 执行循环
        idx = len(self.path) - 1
        while idx < len(self.path):
            to = len(self.path)
            
            # 6.1 发送 node_started 事件
            for i in range(idx, to):
                yield decorate("node_started", {
                    "component_id": self.path[i],
                    "component_name": self.get_component_name(self.path[i])
                })
            
            # 6.2 批量执行组件(并行)
            _run_batch(idx, to)
            
            # 6.3 后处理
            for i in range(idx, to):
                cpn_obj = self.get_component_obj(self.path[i])
                
                # Message 组件流式输出
                if cpn_obj.component_name.lower() == "message":
                    if isinstance(cpn_obj.output("content"), partial):
                        for m in cpn_obj.output("content")():
                            yield decorate("message", {"content": m})
                    else:
                        yield decorate("message", {"content": cpn_obj.output("content")})
                    yield decorate("message_end", {"reference": self.get_reference()})
                
                # 异常处理
                if cpn_obj.error():
                    ex = cpn_obj.exception_handler()
                    if ex and ex["goto"]:
                        self.path.extend(ex["goto"])  # 跳转到异常处理分支
                    elif ex and ex["default_value"]:
                        yield decorate("message", {"content": ex["default_value"]})
                    else:
                        self.error = cpn_obj.error()
                
                yield decorate("node_finished", {
                    "component_id": cpn_obj._id,
                    "outputs": cpn_obj.output(),
                    "error": cpn_obj.error(),
                    "elapsed_time": cpn_obj.output("_elapsed_time")
                })
                
                # 路径扩展(根据组件类型)
                if cpn_obj.component_name.lower() in ["categorize", "switch"]:
                    self.path.extend(cpn_obj.output("_next"))  # 条件分支
                elif cpn_obj.component_name.lower() == "iteration":
                    self.path.append(cpn_obj.get_start())  # 循环开始
                elif not cpn["downstream"] and cpn_obj.get_parent():
                    self.path.append(cpn_obj.get_parent().get_start())  # 返回父级
                else:
                    self.path.extend(cpn["downstream"])  # 正常流转
            
            if self.error:
                break
            idx = to
        
        # 7. 结束
        yield decorate("workflow_finished", {
            "outputs": self.get_component_obj(self.path[-1]).output(),
            "elapsed_time": time.perf_counter() - st
        })

关键点

  1. 批量执行_run_batch 使用 ThreadPoolExecutor 并行执行组件
  2. 流式输出:Message 组件使用 partial 延迟执行,支持流式生成
  3. 路径动态扩展:根据组件输出动态添加下游节点到 path
  4. 异常捕获:每个组件执行失败时触发 exception_handler

3.2 Agent 工具调用

3.2.1 Agent 组件(LLM + Tools)

功能:LLM 自主调用工具,多轮交互完成任务

核心代码

# agent/component/agent_with_tools.py
class Agent(LLM, ToolBase):
    def __init__(self, canvas, id, param):
        # 1. 初始化 LLM
        LLM.__init__(self, canvas, id, param)
        
        # 2. 加载工具
        self.tools = {}
        for cpn in self._param.tools:
            cpn_obj = self._load_tool_obj(cpn)
            self.tools[cpn_obj.get_meta()["function"]["name"]] = cpn_obj
        
        # 3. 初始化 LLM Bundle(支持工具调用)
        self.chat_mdl = LLMBundle(
            self._canvas.get_tenant_id(),
            TenantLLMService.llm_id2llm_type(self._param.llm_id),
            self._param.llm_id,
            max_retries=self._param.max_retries,
            max_rounds=self._param.max_rounds,  # 最多 N 轮工具调用
            verbose_tool_use=True
        )
        
        # 4. 工具元数据
        self.tool_meta = [v.get_meta() for _, v in self.tools.items()]
        self.toolcall_session = LLMToolPluginCallSession(self.tools, self.callback)
    
    def _invoke(self, **kwargs):
        """执行 Agent"""
        # 1. 准备 Prompt 变量
        self._prepare_prompt_variables()
        
        # 2. 构建消息历史
        msg, sys_prompt = self._sys_prompt_and_msg(self._canvas.history, kwargs)
        
        # 3. 工具调用循环(ReAct)
        use_tools = []
        for answer in self._react_with_tools_streamly(sys_prompt, msg, use_tools):
            if isinstance(answer, partial):
                self.set_output("content", answer)
                return
        
        # 4. 记录历史
        self._canvas.add_user_input(msg[-1]["content"])
        self._canvas.add_assistant_output(answer)
    
    def _react_with_tools_streamly(self, prompt, history, use_tools):
        """ReAct 工具调用"""
        hist = deepcopy(history)
        token_count = 0
        last_calling = None
        
        # 工具调用回调
        def use_tool(name, args):
            nonlocal use_tools
            tool_response = self.toolcall_session.tool_call(name, args)
            use_tools.append({
                "name": name,
                "arguments": args,
                "results": tool_response
            })
            return name, tool_response
        
        # 完成回调
        def complete():
            # 生成最终答案
            pass
        
        # 调用 LLM(支持工具)
        for delta in self.chat_mdl.chat_streamly_with_tools(
            prompt,
            hist,
            self._param.gen_conf(),
            tools=self.tool_meta,
            tool_choice="auto",
            use_tool_callback=use_tool,
            complete_callback=complete
        ):
            if isinstance(delta, str):
                yield delta
            else:
                token_count = delta

工具调用流程

  1. LLM 判断是否需要工具(返回 function_call)
  2. Agent 调用对应工具(通过 use_tool 回调)
  3. 工具返回结果,注入 history
  4. LLM 继续生成(可能再次调用工具)
  5. 达到 max_rounds 或 LLM 决定停止,生成最终答案

3.3 核心组件详解

3.3.1 LLM 组件

功能:调用 LLM 生成答案,支持流式输出与引用插入

参数

参数 类型 必填 说明
llm_id String LLM 模型 ID(如 gpt-4)
sys_prompt String System Prompt
prompts List 用户 Prompt 列表(支持变量替换)
temperature Float 温度参数(默认 0.7)
max_tokens Int 最大 Token 数
cite Bool 是否插入引用(默认 True)

核心代码

# agent/component/llm.py
class LLM(ComponentBase):
    def _invoke(self, **kwargs):
        # 1. 准备变量
        self._prepare_prompt_variables()
        
        # 2. 构建消息
        msg, sys_prompt = self._sys_prompt_and_msg(self._canvas.history, kwargs)
        
        # 3. 调用 LLM(流式)
        def gen():
            ans = ""
            for delta in self.chat_mdl.chat_streamly(sys_prompt, msg, self._param.gen_conf(), images=self.imgs):
                if not isinstance(delta, str):
                    continue
                ans += delta
                yield delta
            
            # 4. 插入引用(可选)
            if self._param.cite and self._canvas.retrieval:
                chunks = self._canvas.retrieval[-1].get("chunks", [])
                if chunks:
                    ans = self._gen_citations(ans)
            
            # 5. 记录历史
            self._canvas.add_user_input(msg[-1]["content"])
            self._canvas.add_assistant_output(ans)
        
        self.set_output("content", partial(gen))

3.3.2 Retrieval 组件

功能:从知识库检索相关 chunks

参数

参数 类型 必填 说明
kb_ids List[String] 知识库 ID 列表
similarity_threshold Float 相似度阈值(默认 0.1)
top_n Int 返回 Top N(默认 6)
rerank_id String Rerank 模型 ID

核心代码

# agent/tools/retrieval.py
class Retrieval(ToolBase):
    def _invoke(self, **kwargs):
        # 1. 获取查询词
        query = kwargs.get("query", self._param.query)
        
        # 2. 检索
        from rag.nlp.search import Dealer
        dealer = Dealer(settings.docStoreConn)
        emb_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.EMBEDDING)
        
        req = {
            "question": query,
            "kb_ids": self._param.kb_ids,
            "similarity": self._param.similarity_threshold,
            "topk": 1024
        }
        sres = dealer.search(req, search.index_name(tenant_id), self._param.kb_ids, emb_mdl)
        
        # 3. Rerank(可选)
        chunks = [sres.field[id] for id in sres.ids]
        if self._param.rerank_id:
            rerank_mdl = LLMBundle(tenant_id, LLMType.RERANK, rerank_id=self._param.rerank_id)
            chunks = rerank_mdl.rerank(query, chunks, top_n=self._param.top_n)
        
        # 4. 缓存结果
        self._canvas.retrieval.append({"chunks": chunks, "doc_aggs": sres.aggregation})
        
        # 5. 格式化输出
        content = chunks_format(chunks)
        self.set_output("content", content)

3.3.3 CodeExec 组件

功能:执行 Python/JavaScript 代码(在 Sandbox 中)

参数

参数 类型 必填 说明
lang String 语言(python/javascript)
script String 代码(支持变量)
arguments Dict 传递给代码的参数

核心代码

# agent/tools/code_exec.py
class CodeExec(ToolBase):
    def _execute_code(self, language, code, arguments):
        # 1. Base64 编码代码
        code_b64 = base64.b64encode(code.encode()).decode()
        
        # 2. 构建请求
        req = CodeExecutionRequest(
            code_b64=code_b64,
            language=language,
            arguments=arguments
        )
        
        # 3. 调用 Sandbox API
        resp = requests.post(
            f"http://sandbox-executor-manager:8080/execute",
            json=req.model_dump(),
            timeout=60
        )
        
        # 4. 解析结果
        result = CodeExecutionResult(**resp.json())
        if result.status != "success":
            self.set_output("_ERROR", result.error)
        else:
            self.set_output("content", result.stdout)
            self.set_output("result", result.result)

4. 完整调用链路分析

本章节从上游 API 接口开始,自上而下剖析 Agent 模块的完整调用链路。

4.1 调用入口:API 接口层

4.1.1 Agent 会话创建接口

路径POST /api/v1/agents/<agent_id>/sessions

文件api/apps/sdk/session.py:75-96

核心代码

@manager.route("/agents/<agent_id>/sessions", methods=["POST"])
@token_required
def create_agent_session(tenant_id, agent_id):
    # 1. 鉴权与查询 Canvas
    user_id = request.args.get("user_id", tenant_id)
    e, cvs = UserCanvasService.get_by_id(agent_id)
    if not e:
        return get_error_data_result("Agent not found.")
    
    # 2. 初始化 Canvas(解析 DSL)
    if not isinstance(cvs.dsl, str):
        cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False)
    
    session_id = get_uuid()
    canvas = Canvas(cvs.dsl, tenant_id, agent_id)
    canvas.reset()  # 重置状态:清空 path、history、retrieval
    
    # 3. 生成初始会话(包含 prologue 欢迎语)
    cvs.dsl = json.loads(str(canvas))
    conv = {
        "id": session_id, 
        "dialog_id": cvs.id, 
        "user_id": user_id,
        "message": [{"role": "assistant", "content": canvas.get_prologue()}], 
        "source": "agent", 
        "dsl": cvs.dsl
    }
    
    # 4. 持久化到 MySQL
    API4ConversationService.save(**conv)
    
    conv["agent_id"] = conv.pop("dialog_id")
    return get_result(data=conv)

关键点

  1. 从 MySQL 加载 Canvas DSL
  2. 实例化 Canvas 对象,解析 DSL 中的组件定义
  3. reset() 清空执行状态,初始化 globals
  4. 返回会话 ID,客户端后续使用此 ID 发起对话

4.1.2 Agent 对话接口(流式)

路径POST /api/v1/completion

文件api/apps/api_app.py:181-296

核心代码

@manager.route('/completion', methods=['POST'])
@validate_request("conversation_id", "messages")
def completion():
    # 1. Token 鉴权
    token = request.headers.get('Authorization').split()[1]
    objs = APIToken.query(token=token)
    
    # 2. 获取会话与 Canvas
    req = request.json
    e, conv = API4ConversationService.get_by_id(req["conversation_id"])
    msg = req["messages"]  # 历史消息列表
    
    if conv.source == "agent":
        stream = req.get("stream", True)
        
        # 3. 加载 Canvas DSL
        e, cvs = UserCanvasService.get_by_id(conv.dialog_id)
        if not isinstance(cvs.dsl, str):
            cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False)
        
        # 4. 实例化 Canvas(包含历史状态)
        canvas = Canvas(cvs.dsl, objs[0].tenant_id)
        canvas.messages.append(msg[-1])
        canvas.add_user_input(msg[-1]["content"])
        
        # 5. 执行工作流(返回生成器)
        answer = canvas.run(stream=stream)
        
        if stream:
            # 6. 流式 SSE 响应
            def sse():
                nonlocal answer, cvs, conv
                try:
                    final_ans = {"reference": [], "content": ""}
                    
                    # 7. 迭代事件流
                    for ans in answer():
                        # 事件格式:{"event": "message", "data": {"content": "..."}}
                        for k in ans.keys():
                            final_ans[k] = ans[k]
                        
                        # 格式化为 API 响应
                        ans = {"answer": ans["content"], "reference": ans.get("reference", [])}
                        yield "data:" + json.dumps({"code": 0, "data": ans}, ensure_ascii=False) + "\n\n"
                    
                    # 8. 更新历史
                    canvas.messages.append({"role": "assistant", "content": final_ans["content"]})
                    canvas.history.append(("assistant", final_ans["content"]))
                    
                    # 9. 持久化 Canvas 状态
                    cvs.dsl = json.loads(str(canvas))
                    API4ConversationService.append_message(conv.id, conv.to_dict())
                except Exception as e:
                    yield "data:" + json.dumps({"code": 500, "message": str(e)}, ensure_ascii=False) + "\n\n"
            
            # 10. 返回 SSE 流
            resp = Response(sse(), mimetype="text/event-stream")
            resp.headers.add_header("Cache-control", "no-cache")
            resp.headers.add_header("Connection", "keep-alive")
            return resp

关键点

  1. 从持久化的会话中恢复 Canvas 状态(DSL + history)
  2. canvas.run() 返回一个 partial 对象(延迟执行)
  3. 调用 answer() 触发执行,返回事件流生成器
  4. 逐个 yield SSE 事件到客户端
  5. 完成后更新 DSL 和会话到 MySQL

4.2 Canvas 执行引擎层

4.2.1 Canvas.run() 核心流程

文件agent/canvas.py:220-403

完整执行流程

class Canvas(Graph):
    def run(self, **kwargs):
        st = time.perf_counter()
        self.message_id = get_uuid()
        created_at = int(time.time())
        
        # ========== 阶段 1:初始化 ==========
        
        # 1.1 重置所有组件状态
        for k, cpn in self.components.items():
            self.components[k]["obj"].reset(True)
        
        # 1.2 设置全局变量
        for k in ["query", "user_id", "files"]:
            if kwargs.get(k):
                if k == "files":
                    self.globals[f"sys.{k}"] = self.get_files(kwargs[k])
                else:
                    self.globals[f"sys.{k}"] = kwargs[k]
        
        self.globals["sys.conversation_turns"] += 1
        
        # 1.3 初始化执行路径(从 begin 开始)
        if not self.path or self.path[-1].lower().find("userfillup") < 0:
            self.path.append("begin")
            self.retrieval.append({"chunks": [], "doc_aggs": []})
        
        # 1.4 定义事件装饰器
        def decorate(event, dt):
            return {
                "event": event,
                "message_id": self.message_id,
                "created_at": created_at,
                "task_id": self.task_id,
                "data": dt
            }
        
        yield decorate("workflow_started", {"inputs": kwargs.get("inputs")})
        
        # ========== 阶段 2:组件批量执行循环 ==========
        
        def _run_batch(f, t):
            """并行执行组件批次"""
            with ThreadPoolExecutor(max_workers=5) as executor:
                thr = []
                for i in range(f, t):
                    cpn_obj = self.get_component_obj(self.path[i])
                    
                    # Begin 组件传递 inputs
                    if cpn_obj.component_name.lower() in ["begin", "userfillup"]:
                        thr.append(executor.submit(cpn_obj.invoke, inputs=kwargs.get("inputs", {})))
                    # 其他组件从上游获取输入
                    else:
                        thr.append(executor.submit(cpn_obj.invoke, **cpn_obj.get_input()))
                
                # 等待所有组件完成
                for t in thr:
                    t.result()
        
        def _node_finished(cpn_obj):
            """生成 node_finished 事件"""
            return decorate("node_finished", {
                "inputs": cpn_obj.get_input_values(),
                "outputs": cpn_obj.output(),
                "component_id": cpn_obj._id,
                "component_name": self.get_component_name(cpn_obj._id),
                "error": cpn_obj.error(),
                "elapsed_time": time.perf_counter() - cpn_obj.output("_created_time")
            })
        
        self.error = ""
        idx = len(self.path) - 1
        partials = []  # 存储延迟执行的组件(如流式 LLM)
        
        # 主循环:处理 path 队列
        while idx < len(self.path):
            to = len(self.path)
            
            # 2.1 发送 node_started 事件
            for i in range(idx, to):
                yield decorate("node_started", {
                    "component_id": self.path[i],
                    "component_name": self.get_component_name(self.path[i])
                })
            
            # 2.2 批量并行执行组件
            _run_batch(idx, to)
            
            # ========== 阶段 3:后处理与路径扩展 ==========
            
            for i in range(idx, to):
                cpn = self.get_component(self.path[i])
                cpn_obj = self.get_component_obj(self.path[i])
                
                # 3.1 处理 Message 组件(流式输出)
                if cpn_obj.component_name.lower() == "message":
                    if isinstance(cpn_obj.output("content"), partial):
                        # 流式生成
                        _m = ""
                        for m in cpn_obj.output("content")():
                            if not m:
                                continue
                            yield decorate("message", {"content": m})
                            _m += m
                        cpn_obj.set_output("content", _m)
                    else:
                        # 非流式
                        yield decorate("message", {"content": cpn_obj.output("content")})
                    
                    cite = re.search(r"\[ID:[ 0-9]+\]", cpn_obj.output("content"))
                    yield decorate("message_end", {"reference": self.get_reference() if cite else None})
                    
                    # 3.1.1 处理 partials 队列(之前未发送 node_finished 的组件)
                    while partials:
                        _cpn_obj = self.get_component_obj(partials[0])
                        if isinstance(_cpn_obj.output("content"), partial):
                            break
                        yield _node_finished(_cpn_obj)
                        partials.pop(0)
                
                # 3.2 异常处理
                other_branch = False
                if cpn_obj.error():
                    ex = cpn_obj.exception_handler()
                    if ex and ex["goto"]:
                        # 跳转到异常处理分支
                        self.path.extend(ex["goto"])
                        other_branch = True
                    elif ex and ex["default_value"]:
                        # 使用默认值
                        yield decorate("message", {"content": ex["default_value"]})
                        yield decorate("message_end", {})
                    else:
                        self.error = cpn_obj.error()
                
                # 3.3 发送 node_finished 事件(或延迟)
                if cpn_obj.component_name.lower() != "iteration":
                    if isinstance(cpn_obj.output("content"), partial):
                        if self.error:
                            cpn_obj.set_output("content", None)
                            yield _node_finished(cpn_obj)
                        else:
                            partials.append(self.path[i])  # 延迟发送
                    else:
                        yield _node_finished(cpn_obj)
                
                # 3.4 路径动态扩展
                def _extend_path(cpn_ids):
                    if other_branch:
                        return
                    for cpn_id in cpn_ids:
                        if self.path[-1] != cpn_id:
                            self.path.append(cpn_id)
                
                # 根据组件类型扩展路径
                if cpn_obj.component_name.lower() == "iterationitem" and cpn_obj.end():
                    # 循环结束,跳出到父级
                    iter_obj = cpn_obj.get_parent()
                    yield _node_finished(iter_obj)
                    _extend_path(self.get_component(cpn["parent_id"])["downstream"])
                
                elif cpn_obj.component_name.lower() in ["categorize", "switch"]:
                    # 条件分支:根据输出选择下游
                    _extend_path(cpn_obj.output("_next"))
                
                elif cpn_obj.component_name.lower() == "iteration":
                    # 循环开始:添加第一个子组件
                    _extend_path([cpn_obj.get_start()])
                
                elif not cpn["downstream"] and cpn_obj.get_parent():
                    # 无下游且有父级:返回父级循环起点
                    _extend_path([cpn_obj.get_parent().get_start()])
                
                else:
                    # 正常流转:添加下游组件
                    _extend_path(cpn["downstream"])
            
            # 3.5 检查错误,终止执行
            if self.error:
                logging.error(f"Runtime Error: {self.error}")
                break
            
            idx = to
        
        # ========== 阶段 4:完成 ==========
        
        self.path = self.path[:idx]
        if not self.error:
            yield decorate("workflow_finished", {
                "inputs": kwargs.get("inputs"),
                "outputs": self.get_component_obj(self.path[-1]).output(),
                "elapsed_time": time.perf_counter() - st
            })
            self.history.append(("assistant", self.get_component_obj(self.path[-1]).output()))

关键技术点

  1. 动态路径管理

    • path 是一个队列,初始只有 ["begin"]
    • 每个组件执行后,根据类型(switch/iteration/normal)动态添加下游组件
    • 支持条件跳转、循环、异常跳转
  2. 并行执行

    • 使用 ThreadPoolExecutor 并行执行同批次组件(max_workers=5)
    • 同批次指 path[idx:to] 区间内的组件(无依赖)
  3. 流式处理

    • 组件可返回 partial 对象(延迟执行)
    • Message 组件调用 partial() 触发生成器,逐 Token yield
    • 流式组件的 node_finished 事件延迟到内容生成完成后发送
  4. 异常处理

    • 组件执行失败时,检查 exception_handler()
    • 支持三种策略:goto(跳转分支)、default_value(使用默认值)、中断执行

4.3 组件层执行细节

4.3.1 ComponentBase.invoke() 统一入口

文件agent/component/base.py:420-432

class ComponentBase(ABC):
    def invoke(self, **kwargs) -> dict[str, Any]:
        self.set_output("_created_time", time.perf_counter())
        
        try:
            # 调用子类实现的 _invoke()
            self._invoke(**kwargs)
        except Exception as e:
            # 异常捕获
            if self.get_exception_default_value():
                self.set_exception_default_value()
            else:
                self.set_output("_ERROR", str(e))
            logging.exception(e)
        
        # 记录执行时间
        self.set_output("_elapsed_time", time.perf_counter() - self.output("_created_time"))
        return self.output()
    
    @timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10*60)))
    def _invoke(self, **kwargs):
        """子类必须实现此方法"""
        raise NotImplementedError()

关键点

  • 统一入口:所有组件通过 invoke() 调用
  • 超时保护:默认 10 分钟,通过 @timeout 装饰器实现
  • 异常捕获:设置 _ERROR 输出,触发 exception_handler
  • 性能追踪:记录 _created_time_elapsed_time

4.3.2 LLM 组件执行流程

文件agent/component/llm.py:200-240

class LLM(ComponentBase):
    def _invoke(self, **kwargs):
        # 1. 准备 Prompt 变量(替换 {component_id@key} 和 {sys.xxx})
        sys_prompt, msg, user_defined_prompt = self._prepare_prompt_variables()
        
        # 2. 检查是否需要流式输出
        downstreams = self._canvas.get_component(self._id)["downstream"]
        has_message_downstream = any([
            self._canvas.get_component_obj(cid).component_name.lower() == "message"
            for cid in downstreams
        ])
        
        if has_message_downstream and not self._param.output_structure:
            # 流式:返回 partial 对象
            self.set_output("content", partial(self._stream_and_cite, sys_prompt, msg))
            return
        
        # 3. 非流式:直接调用 LLM
        ans = self._generate([{"role": "system", "content": sys_prompt}, *msg])
        
        # 4. 插入引用(可选)
        if self._param.cite and self._canvas.get_reference()["chunks"]:
            ans = self._gen_citations(ans)
        
        # 5. 设置输出与记录历史
        self.set_output("content", ans)
        self._canvas.add_user_input(msg[-1]["content"])
        self._canvas.add_assistant_output(ans)
    
    def _stream_and_cite(self, sys_prompt, msg):
        """流式生成器(延迟执行)"""
        ans = ""
        for delta in self._generate_streamly([{"role": "system", "content": sys_prompt}, *msg]):
            ans += delta
            yield delta
        
        # 插入引用
        if self._param.cite and self._canvas.get_reference()["chunks"]:
            ans = self._gen_citations(ans)
        
        # 记录历史
        self._canvas.add_user_input(msg[-1]["content"])
        self._canvas.add_assistant_output(ans)

关键点

  1. 变量替换_prepare_prompt_variables() 解析 Prompt 中的 {component_id@key}
  2. 流式判断:检查下游是否有 Message 组件,决定是否流式
  3. 延迟执行partial 对象封装生成器,由 Canvas 在 Message 组件时调用
  4. 引用插入:流式完成后,通过 LLM 生成带引用标记的文本

4.3.3 Agent 组件 ReAct 流程

文件agent/component/agent_with_tools.py:211-334

class Agent(LLM, ToolBase):
    def _react_with_tools_streamly(self, prompt, history, use_tools, user_defined_prompt={}):
        """ReAct 工具调用循环"""
        token_count = 0
        hist = deepcopy(history)
        
        # 1. 任务分析(生成任务描述)
        st = timer()
        task_desc = analyze_task(self.chat_mdl, prompt, history[-1]["content"], self.tool_meta, user_defined_prompt)
        self.callback("analyze_task", {}, task_desc, elapsed_time=timer()-st)
        
        # 2. 工具调用回调
        def use_tool(name, args):
            nonlocal hist, use_tools
            tool_response = self.toolcall_session.tool_call(name, args)
            use_tools.append({
                "name": name,
                "arguments": args,
                "results": tool_response
            })
            return name, tool_response
        
        # 3. 完成回调(生成最终答案)
        def complete():
            nonlocal hist
            # 插入引用提示
            need2cite = self._param.cite and self._canvas.get_reference()["chunks"]
            if need2cite:
                hist[0]["content"] += citation_prompt()
            
            # 流式生成答案
            for delta_ans in self._generate_streamly(hist):
                yield delta_ans, 0
        
        # 4. ReAct 循环(最多 max_rounds 轮)
        for _ in range(self._param.max_rounds + 1):
            # 4.1 调用 LLM 决策下一步动作
            response, tk = next_step(self.chat_mdl, hist, self.tool_meta, task_desc, user_defined_prompt)
            token_count += tk
            hist.append({"role": "assistant", "content": response})
            
            try:
                # 4.2 解析 LLM 返回的 JSON(工具调用列表)
                functions = json_repair.loads(re.sub(r"```.*", "", response))
                
                # 4.3 并行执行工具调用
                with ThreadPoolExecutor(max_workers=5) as executor:
                    thr = []
                    for func in functions:
                        name = func["name"]
                        args = func["arguments"]
                        
                        # 检查是否完成任务
                        if name == COMPLETE_TASK:
                            # 生成最终答案
                            for txt, tkcnt in complete():
                                yield txt, tkcnt
                            return
                        
                        # 提交工具调用任务
                        thr.append(executor.submit(use_tool, name, args))
                    
                    # 4.4 反思工具结果,注入 history
                    st = timer()
                    reflection = reflect(self.chat_mdl, hist, [th.result() for th in thr], user_defined_prompt)
                    hist.append({"role": "user", "content": reflection})
                    self.callback("reflection", {}, reflection, elapsed_time=timer()-st)
            
            except Exception as e:
                # 4.5 工具调用失败,提示 LLM 修正
                logging.exception(f"Wrong JSON argument format: {e}")
                hist.append({"role": "user", "content": f"Tool call error: {e}"})
        
        # 5. 达到最大轮次,强制生成答案
        logging.warning(f"Exceed max rounds: {self._param.max_rounds}")
        hist.append({"role": "user", "content": "Provide a FINAL answer based on existing data."})
        for txt, tkcnt in complete():
            yield txt, tkcnt

关键技术点

  1. 任务分析

    • analyze_task() 使用 LLM 分析用户请求,生成任务描述
    • 帮助后续 LLM 更好地理解需要完成的目标
  2. ReAct 循环

    • LLM 返回 JSON 格式的工具调用列表:[{"name": "tool1", "arguments": {...}}, ...]
    • 并行执行所有工具调用(max_workers=5)
    • 将工具结果通过 reflect() 总结后注入 history
  3. 工具调用会话

    • LLMToolPluginCallSession 管理工具对象字典
    • 支持 Component 工具(Retrieval、CodeExec)和 MCP 工具
  4. 终止条件

    • LLM 返回 COMPLETE_TASK 工具调用
    • 达到 max_rounds 限制(默认 5 轮)

4.3.4 Retrieval 组件执行流程

文件agent/tools/retrieval.py:82-206

class Retrieval(ToolBase, ABC):
    @timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 12)))
    def _invoke(self, **kwargs):
        # 1. 解析知识库 ID(支持变量引用)
        kb_ids = []
        for id in self._param.kb_ids:
            if id.find("@") < 0:
                kb_ids.append(id)
                continue
            # 从 Canvas globals 获取变量值
            kb_nm = self._canvas.get_variable_value(id)
            kb_nm_list = kb_nm if isinstance(kb_nm, list) else [kb_nm]
            for nm_or_id in kb_nm_list:
                e, kb = KnowledgebaseService.get_by_name(nm_or_id, self._canvas._tenant_id)
                if not e:
                    e, kb = KnowledgebaseService.get_by_id(nm_or_id)
                kb_ids.append(kb.id)
        
        # 2. 检查知识库有效性
        kbs = KnowledgebaseService.get_by_ids(kb_ids)
        if not kbs:
            raise Exception("No dataset is selected.")
        
        embd_nms = list(set([kb.embd_id for kb in kbs]))
        assert len(embd_nms) == 1, "Knowledge bases use different embedding models."
        
        # 3. 初始化 Embedding 和 Rerank 模型
        embd_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.EMBEDDING, embd_nms[0])
        
        rerank_mdl = None
        if self._param.rerank_id:
            rerank_mdl = LLMBundle(kbs[0].tenant_id, LLMType.RERANK, self._param.rerank_id)
        
        # 4. 变量替换查询词
        query = kwargs["query"]
        vars = self.get_input_elements_from_text(query)
        vars = {k: o["value"] for k, o in vars.items()}
        query = self.string_format(query, vars)
        
        # 5. 元数据过滤(可选)
        doc_ids = None
        if self._param.meta_data_filter:
            metas = DocumentService.get_meta_by_kbs(kb_ids)
            if self._param.meta_data_filter.get("method") == "auto":
                # LLM 自动生成过滤条件
                chat_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.CHAT)
                filters = gen_meta_filter(chat_mdl, metas, query)
                doc_ids = meta_filter(metas, filters)
        
        # 6. 跨语言检索(可选)
        if self._param.cross_languages:
            query = cross_languages(kbs[0].tenant_id, None, query, self._param.cross_languages)
        
        # 7. 向量检索
        kbinfos = settings.retriever.retrieval(
            query,
            embd_mdl,
            [kb.tenant_id for kb in kbs],
            kb_ids,
            1,  # page
            self._param.top_n,
            self._param.similarity_threshold,
            self._param.similarity_threshold,
            self._param.keywords_similarity_weight,
            self._param.top_k,
            doc_ids=doc_ids,
            use_knowledge_graph=self._param.use_kg
        )
        
        # 8. Rerank(可选)
        if rerank_mdl:
            kbinfos["chunks"] = rerank_mdl.rerank(query, kbinfos["chunks"], self._param.top_n)
        
        # 9. 缓存到 Canvas.retrieval
        self._canvas.retrieval[-1] = {"chunks": kbinfos["chunks"], "doc_aggs": kbinfos["doc_aggs"]}
        
        # 10. 格式化输出(用于 LLM Prompt)
        formated_content = kb_prompt(kbinfos, self.chat_mdl.max_length if hasattr(self, "chat_mdl") else 8192)
        
        self.set_output("formalized_content", formated_content)
        self.set_output("chunks", kbinfos["chunks"])
        self.set_output("doc_aggs", kbinfos["doc_aggs"])
        
        return formated_content

关键技术点

  1. 动态知识库选择

    • 支持从 Canvas globals 动态获取知识库 ID
    • 适用于多租户、多知识库场景
  2. 检索流程

    • Embedding:将 query 向量化
    • 向量检索:Elasticsearch 混合检索(向量 + 关键词)
    • Rerank(可选):使用 Rerank 模型重新排序
  3. 元数据过滤

    • 支持手动和自动两种模式
    • 自动模式使用 LLM 根据 query 生成过滤条件
  4. 结果缓存

    • 存储到 Canvas.retrieval[-1]
    • LLM 组件可通过 get_reference() 获取用于引用插入

4.4 数据流与状态传递

4.4.1 组件间数据传递机制

变量引用语法{component_id@output_key}{sys.variable_name}

解析流程

  1. 定义阶段(DSL)
{
    "llm_0": {
        "obj": {
            "params": {
                "sys_prompt": "根据以下知识回答:\n{retrieval_0@formalized_content}",
                "prompts": [{"role": "user", "content": "{sys.query}"}]
            }
        }
    }
}
  1. 执行阶段(Runtime)
# agent/component/base.py
class ComponentBase:
    def get_input(self) -> dict[str, Any]:
        """获取组件输入"""
        ans = {}
        for k, v in self.get_input_elements().items():
            # 从变量引用获取实际值
            ans[k] = v.get("value")
        return ans
    
    def get_input_elements(self) -> dict[str, Any]:
        """解析变量引用"""
        res = {}
        text = self._param.sys_prompt  # 示例:包含 {retrieval_0@formalized_content}
        
        # 正则匹配:{component_id@key} 或 {sys.xxx}
        for m in re.finditer(self.variable_ref_patt, text):
            ref = m.group(1)  # 例如:retrieval_0@formalized_content
            
            if ref.find("@") > 0:
                # 组件输出引用
                arr = ref.split("@")
                cpn_id, output_key = arr[0], arr[1]
                cpn_obj = self._canvas.get_component_obj(cpn_id)
                res[ref] = {"value": cpn_obj.output(output_key)}
            else:
                # 全局变量引用
                res[ref] = {"value": self._canvas.globals.get(ref)}
        
        return res
    
    def string_format(self, text: str, args: dict) -> str:
        """替换变量"""
        for k, v in args.items():
            text = text.replace("{" + k + "}", str(v))
        return text

关键点

  • 组件执行时动态解析变量引用
  • 支持跨组件数据传递(通过 component_id@key
  • 支持全局变量访问(通过 sys.xxx

4.4.2 全局状态管理

Canvas 状态字典

class Canvas(Graph):
    def __init__(self, dsl: str, tenant_id=None, task_id=None):
        # 全局变量
        self.globals = {
            "sys.query": "",           # 用户当前问题
            "sys.user_id": "",         # 用户 ID
            "sys.conversation_turns": 0,  # 对话轮次
            "sys.files": []            # 上传文件列表
        }
        
        # 对话历史(多轮上下文)
        self.history = []  # [("user", "问题1"), ("assistant", "回答1"), ...]
        
        # 检索结果缓存
        self.retrieval = []  # [{"chunks": [...], "doc_aggs": {...}}, ...]
        
        # 执行路径
        self.path = []  # ["begin", "retrieval_0", "llm_0", "message_0"]
        
        # 消息列表(用于持久化)
        self.messages = []  # [{"role": "user", "content": "..."}, ...]

状态更新时机

  1. globals

    • Canvas.run() 初始化时设置 sys.querysys.user_idsys.files
    • 每次对话 sys.conversation_turns 自增
  2. history

    • LLM/Agent 组件完成后,调用 Canvas.add_assistant_output()
    • 用于多轮对话上下文管理
  3. retrieval

    • Retrieval 组件执行后,更新 Canvas.retrieval[-1]
    • LLM 组件可获取用于引用插入
  4. path

    • Canvas 执行过程中动态扩展
    • 记录完整执行轨迹

4.5 关键代码路径总结

完整调用链(自上而下)

API 请求
  
api/apps/api_app.py:completion()
  ├─ APIToken 鉴权
  ├─ ConversationService.get_by_id() 获取会话
  └─ UserCanvasService.get_by_id() 加载 Canvas DSL
      
agent/canvas.py:Canvas.__init__()
  ├─ json.loads(dsl) 解析 DSL
  └─ Canvas.load() 实例化组件
      
agent/canvas.py:Canvas.run()
  ├─ 初始化 globalspathretrieval
  ├─ yield "workflow_started"
  └─ while idx < len(path): 主循环
      ├─ yield "node_started"
      ├─ _run_batch() 并行执行组件
         ├─ ThreadPoolExecutor.submit(cpn.invoke)
         └─ 等待所有 Future 完成
             
         agent/component/base.py:ComponentBase.invoke()
           ├─ self._invoke(**kwargs) 调用子类实现
           └─ 捕获异常,记录执行时间
               
           根据组件类型分发:
           
           ┌─ agent/component/llm.py:LLM._invoke()
              ├─ _prepare_prompt_variables() 变量替换
              ├─ 检查是否流式(下游有 Message
              ├─ 流式:set_output("content", partial(...))
              └─ 非流式:self._generate() 直接调用 LLM
                  
              api/db/services/llm_service.py:LLMBundle.chat()
                └─ 调用 OpenAI/DeepSeek/Ollama API
           
           ├─ agent/component/agent_with_tools.py:Agent._invoke()
              ├─ _prepare_prompt_variables()
              └─ _react_with_tools_streamly() ReAct 循环
                  ├─ analyze_task() 任务分析
                  └─ for _ in range(max_rounds):
                      ├─ next_step() LLM 决策
                      ├─ json_repair.loads() 解析工具调用
                      ├─ ThreadPoolExecutor.submit(use_tool)
                         
                         LLMToolPluginCallSession.tool_call()
                           └─ 调用 Retrieval/CodeExec/Web 等工具
                               
                           agent/tools/retrieval.py:Retrieval._invoke()
                             ├─ Elasticsearch 向量检索
                             ├─ Rerank(可选)
                             └─ Canvas.retrieval[-1] = chunks
                      
                      └─ reflect() 反思工具结果
           
           └─ agent/tools/code_exec.py:CodeExec._invoke()
               └─ requests.post(Sandbox API)
      
      ├─ 后处理:
         ├─ Message 组件:调用 partial() 触发流式生成
            └─ yield "message"  Token 推送
         ├─ 异常处理:检查 cpn_obj.error()
         └─ 路径扩展:根据组件类型添加下游到 path
      
      ├─ yield "node_finished"
      └─ idx = to 进入下一批次
  
  ├─ yield "workflow_finished"
  └─ 返回最终结果
      
api/apps/api_app.py:sse()
  ├─ 迭代 Canvas.run() 生成器
  ├─ 格式化为 SSE 事件
  └─ yield "data: {...}\n\n"
      
HTTP Response (SSE )
  └─ 客户端接收实时事件

5. 详细时序图

5.1 完整请求处理时序(从 API 到组件)

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant API as API <br/>(api_app.py)
    participant APIToken as APIToken<br/>鉴权服务
    participant ConvService as ConversationService<br/>会话服务
    participant CanvasService as UserCanvasService<br/>Canvas 服务
    participant MySQL as MySQL<br/>数据库
    participant Canvas as Canvas<br/>执行引擎
    participant Begin as Begin<br/>组件
    participant LLM as LLM<br/>组件
    participant LLMBundle as LLMBundle<br/>LLM 服务
    participant OpenAI as OpenAI API

    Client->>+API: POST /api/v1/completion
    Note over Client,API: Headers: Authorization: Bearer xxx<br/>Body: {conversation_id, messages: [...]}
    
    API->>+APIToken: query(token)
    APIToken->>MySQL: SELECT * FROM api_token WHERE token=?
    MySQL-->>APIToken: token_record
    APIToken-->>-API: tenant_id
    
    API->>+ConvService: get_by_id(conversation_id)
    ConvService->>MySQL: SELECT * FROM conversation WHERE id=?
    MySQL-->>ConvService: conversation record
    ConvService-->>-API: conv (包含 dialog_id, dsl, message)
    
    API->>+CanvasService: get_by_id(conv.dialog_id)
    CanvasService->>MySQL: SELECT * FROM user_canvas WHERE id=?
    MySQL-->>CanvasService: canvas record (包含 DSL)
    CanvasService-->>-API: cvs (DSL JSON)
    
    API->>+Canvas: __init__(cvs.dsl, tenant_id)
    Canvas->>Canvas: json.loads(dsl)
    Canvas->>Canvas: load() - 实例化所有组件
    Note over Canvas: 解析 componentsgraphhistoryglobals
    Canvas-->>-API: canvas 实例
    
    API->>Canvas: add_user_input(msg[-1].content)
    API->>+Canvas: run(query=msg.content, user_id=tenant_id)
    
    rect rgb(240, 248, 255)
        Note over Canvas: ========== Canvas 执行流程 ==========
        
        Canvas->>Canvas: 初始化 globals<br/>sys.query, sys.user_id, sys.conversation_turns++
        Canvas->>Canvas: path.append("begin")
        Canvas-->>API: yield {event: "workflow_started", data: {...}}
        
        Canvas->>+Begin: invoke(inputs={})
        Begin->>Begin: 设置初始变量
        Begin-->>-Canvas: outputs = {}
        Canvas-->>API: yield {event: "node_finished", component_id: "begin"}
        
        Canvas->>Canvas: path.extend(["llm_0"])
        Canvas-->>API: yield {event: "node_started", component_id: "llm_0"}
        
        Canvas->>+LLM: invoke()
        LLM->>LLM: _prepare_prompt_variables()<br/>替换 {sys.query}  实际问题
        LLM->>LLM: 检查下游是否有 Message  
        LLM->>LLM: set_output("content", partial(stream_gen))
        Note over LLM: 返回 partial 对象(延迟执行)
        LLM-->>-Canvas: outputs = {content: <partial>}
        
        Canvas->>Canvas: 检测到 partial,暂不发送 node_finished
        Canvas->>Canvas: path.extend(["message_0"])
        Canvas-->>API: yield {event: "node_started", component_id: "message_0"}
        
        Canvas->>Canvas: 发现 Message 组件,调用 partial()
        Canvas->>+LLM: content.partial() - 触发流式生成
        LLM->>+LLMBundle: chat_streamly(sys_prompt, msg, gen_conf)
        LLMBundle->>+OpenAI: POST /v1/chat/completions (stream=true)
        
        loop 流式 Token 生成
            OpenAI-->>LLMBundle: {"delta": {"content": "tok"}}
            LLMBundle-->>LLM: yield "tok"
            LLM-->>Canvas: yield "tok"
            Canvas-->>API: yield {event: "message", data: {content: "tok"}}
            API-->>Client: data: {code: 0, data: {answer: "tok"}}\n\n
        end
        
        OpenAI-->>-LLMBundle: {"finish_reason": "stop"}
        LLMBundle-->>-LLM: 完成
        LLM->>Canvas: add_assistant_output(full_answer)
        LLM-->>-Canvas: 生成完毕
        
        Canvas->>Canvas: 检查是否有引用
        Canvas-->>API: yield {event: "message_end", data: {reference: [...]}}
        Canvas-->>API: yield {event: "node_finished", component_id: "llm_0"}
        Canvas-->>API: yield {event: "workflow_finished", outputs: {...}}
    end
    
    Canvas-->>-API: 返回最终结果
    
    API->>Canvas: str(canvas)  DSL JSON
    API->>+CanvasService: update_by_id(dialog_id, cvs.to_dict())
    CanvasService->>MySQL: UPDATE user_canvas SET dsl=?
    MySQL-->>CanvasService: OK
    CanvasService-->>-API: 完成
    
    API->>+ConvService: append_message(conv_id, conv.to_dict())
    ConvService->>MySQL: UPDATE conversation SET message=?, reference=?
    MySQL-->>ConvService: OK
    ConvService-->>-API: 完成
    
    API-->>Client: data: {code: 0, data: true}\n\n
    API-->>-Client: SSE 连接关闭

时序图说明

  1. 鉴权阶段(步骤 1-6)

    • 客户端携带 Bearer Token 发起请求
    • API 层通过 APIToken 服务验证 Token,获取 tenant_id
    • 加载会话记录(Conversation)和 Canvas DSL
  2. Canvas 初始化(步骤 7-10)

    • 解析 DSL JSON,实例化所有组件对象
    • 恢复会话状态(history、globals、retrieval)
  3. 组件执行流程(步骤 11-35)

    • Begin 组件:初始化入口变量
    • LLM 组件
      • 变量替换({sys.query} → 实际问题)
      • 检测到下游有 Message,返回 partial 对象
    • Message 组件
      • 触发 partial() 执行
      • LLM 流式生成,逐 Token yield 到客户端
  4. 流式响应(步骤 25-30)

    • OpenAI API 返回流式 Token
    • 通过 LLMBundle → LLM → Canvas → API → Client 逐层传递
    • 客户端实时接收 SSE 事件
  5. 状态持久化(步骤 37-45)

    • 更新 Canvas DSL 到 MySQL
    • 更新会话消息和引用到 MySQL

5.2 Agent ReAct 工具调用时序

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant Canvas as Canvas<br/>执行引擎
    participant Agent as Agent<br/>组件
    participant LLMBundle as LLMBundle<br/>LLM 服务
    participant PromptGen as Prompt<br/>生成器
    participant ThreadPool as ThreadPool<br/>Executor
    participant Retrieval as Retrieval<br/>工具
    participant CodeExec as CodeExec<br/>工具
    participant Sandbox as Sandbox<br/>沙箱
    participant ES as Elasticsearch<br/>检索引擎

    Client->>+Canvas: run(query="分析用户增长趋势并生成图表")
    Canvas->>+Agent: invoke()
    
    Agent->>Agent: _prepare_prompt_variables()
    Agent->>+PromptGen: analyze_task(chat_mdl, prompt, query, tool_meta)
    PromptGen->>LLMBundle: chat("分析以下任务需要哪些工具...")
    LLMBundle-->>PromptGen: task_desc = "需要检索用户数据,分析趋势,生成图表"
    PromptGen-->>-Agent: task_desc
    Agent->>Canvas: callback("analyze_task", task_desc)
    
    rect rgb(255, 250, 240)
        Note over Agent: ========== ReAct 循环开始 ==========
        
        Agent->>Agent: for round in range(max_rounds):
        
        Agent->>+PromptGen: next_step(chat_mdl, hist, tool_meta, task_desc)
        PromptGen->>LLMBundle: chat("根据任务描述,决定下一步操作...")
        LLMBundle-->>PromptGen: response = [{"name": "search_my_dataset", "arguments": {"query": "用户增长数据"}}, ...]
        PromptGen-->>-Agent: response (JSON 工具调用列表)
        
        Agent->>Agent: hist.append({role: "assistant", content: response})
        Agent->>Agent: json_repair.loads(response)  functions
        
        rect rgb(240, 255, 240)
            Note over Agent,CodeExec: ========== 并行工具调用 ==========
            
            Agent->>+ThreadPool: submit(use_tool, "search_my_dataset", args)
            Agent->>+ThreadPool: submit(use_tool, "code_exec", args)
            
            par 并行执行工具
                ThreadPool->>+Retrieval: tool_call("search_my_dataset", args)
                Retrieval->>Retrieval: 解析 kb_ids, query
                Retrieval->>+ES: 向量检索 + 关键词检索
                ES-->>-Retrieval: chunks = [{content: "...", doc_id: "..."}]
                Retrieval->>Retrieval: Rerank(可选)
                Retrieval->>Canvas: retrieval[-1] = {chunks, doc_aggs}
                Retrieval->>Retrieval: kb_prompt(chunks)  formated_content
                Retrieval-->>-ThreadPool: formated_content
                
            and
                ThreadPool->>+CodeExec: tool_call("code_exec", args)
                CodeExec->>CodeExec: base64.encode(code)
                CodeExec->>+Sandbox: POST /execute {code_b64, language: "python"}
                Sandbox->>Sandbox: 创建隔离容器
                Sandbox->>Sandbox: 执行 Python 代码
                Sandbox-->>-CodeExec: {status: "success", stdout: "...", result: {...}}
                CodeExec->>CodeExec: set_output("content", stdout)
                CodeExec-->>-ThreadPool: stdout
            end
            
            ThreadPool-->>Agent: [("search_my_dataset", result1), ("code_exec", result2)]
            ThreadPool-->>-Agent: 所有工具调用完成
        end
        
        Agent->>Agent: use_tools.append({name, arguments, results})
        
        Agent->>+PromptGen: reflect(chat_mdl, hist, tool_results)
        PromptGen->>LLMBundle: chat("总结以下工具调用结果...")
        LLMBundle-->>PromptGen: reflection = "检索到用户增长数据,代码执行成功生成图表"
        PromptGen-->>-Agent: reflection
        
        Agent->>Agent: hist.append({role: "user", content: reflection})
        Agent->>Canvas: callback("reflection", reflection)
        
        Agent->>+PromptGen: next_step(chat_mdl, hist, tool_meta, task_desc)
        PromptGen->>LLMBundle: chat("根据工具结果,决定下一步...")
        LLMBundle-->>PromptGen: response = [{"name": "COMPLETE_TASK", "arguments": {}}]
        PromptGen-->>-Agent: response
        
        Agent->>Agent: 检测到 COMPLETE_TASK,退出循环
    end
    
    rect rgb(255, 240, 240)
        Note over Agent,LLMBundle: ========== 生成最终答案 ==========
        
        Agent->>Agent: complete() 回调
        Agent->>Agent: hist[0].content += citation_prompt()
        Agent->>+LLMBundle: chat_streamly(hist)
        
        loop 流式生成
            LLMBundle-->>Agent: yield "根据检索到的数据分析..."
            Agent-->>Canvas: yield "根据检索到的数据分析..."
            Canvas-->>Client: SSE event: {event: "message", content: "..."}
        end
        
        LLMBundle-->>-Agent: 完成
    end
    
    Agent->>Canvas: set_output("content", final_answer)
    Agent->>Canvas: set_output("use_tools", use_tools)
    Agent-->>-Canvas: 返回
    
    Canvas->>Canvas: add_assistant_output(final_answer)
    Canvas-->>Client: yield {event: "workflow_finished"}
    Canvas-->>-Client: 完成

时序图说明

  1. 任务分析阶段(步骤 1-6)

    • analyze_task() 使用 LLM 分析用户请求
    • 生成任务描述,帮助后续决策
  2. ReAct 循环(步骤 7-28)

    • 决策阶段next_step() 调用 LLM 决定需要哪些工具
    • 执行阶段:使用 ThreadPoolExecutor 并行调用多个工具
    • 反思阶段reflect() 总结工具结果,注入到 history
  3. 并行工具调用(步骤 13-23)

    • Retrieval 工具
      • Elasticsearch 向量检索 + 关键词检索
      • Rerank 重新排序
      • 格式化为 Prompt 可用文本
    • CodeExec 工具
      • Base64 编码代码
      • 发送到 Sandbox 执行
      • 返回 stdout 和 result
  4. 终止判断(步骤 29-32)

    • LLM 返回 COMPLETE_TASK 工具调用
    • 退出 ReAct 循环
  5. 最终答案生成(步骤 33-40)

    • 插入引用提示(citation_prompt)
    • 流式生成综合答案
    • 返回给客户端

5.3 Canvas 组件批量执行时序

sequenceDiagram
    autonumber
    participant Canvas as Canvas<br/>执行引擎
    participant ThreadPool as ThreadPool<br/>Executor
    participant Begin as Begin<br/>组件
    participant Retrieval as Retrieval<br/>组件
    participant LLM as LLM<br/>组件
    participant Message as Message<br/>组件
    participant ES as Elasticsearch
    participant OpenAI as OpenAI API

    Canvas->>Canvas: run(query="RAGFlow 如何部署?")
    Canvas->>Canvas: path = ["begin"]
    Canvas->>Canvas: globals["sys.query"] = "RAGFlow 如何部署?"
    
    rect rgb(240, 248, 255)
        Note over Canvas: ========== 批次 1Begin ==========
        
        Canvas->>Canvas: idx = 0, to = 1
        Canvas->>Canvas: yield {event: "node_started", component_id: "begin"}
        
        Canvas->>+ThreadPool: _run_batch(0, 1)
        ThreadPool->>+Begin: invoke(inputs={})
        Begin->>Begin: 设置初始变量
        Begin->>Begin: set_output("outputs", {})
        Begin-->>-ThreadPool: outputs = {}
        ThreadPool-->>-Canvas: 批次完成
        
        Canvas->>Begin: get_component("begin")["downstream"]
        Canvas->>Canvas: path.extend(["retrieval_0"])
        Canvas->>Canvas: yield {event: "node_finished", component_id: "begin"}
    end
    
    rect rgb(240, 255, 240)
        Note over Canvas: ========== 批次 2Retrieval ==========
        
        Canvas->>Canvas: idx = 1, to = 2
        Canvas->>Canvas: yield {event: "node_started", component_id: "retrieval_0"}
        
        Canvas->>+ThreadPool: _run_batch(1, 2)
        ThreadPool->>+Retrieval: invoke(query="{sys.query}")
        Retrieval->>Retrieval: get_input()  {query: "RAGFlow 如何部署?"}
        Retrieval->>Retrieval: string_format(query, vars)
        Retrieval->>+ES: 向量检索(query, kb_ids, top_n=8)
        ES-->>-Retrieval: chunks = [{content: "...", score: 0.89}, ...]
        Retrieval->>Canvas: retrieval[-1] = {chunks, doc_aggs}
        Retrieval->>Retrieval: kb_prompt(chunks)  formated_content
        Retrieval->>Retrieval: set_output("formalized_content", formated_content)
        Retrieval-->>-ThreadPool: outputs = {formalized_content: "..."}
        ThreadPool-->>-Canvas: 批次完成
        
        Canvas->>Retrieval: get_component("retrieval_0")["downstream"]
        Canvas->>Canvas: path.extend(["llm_0"])
        Canvas->>Canvas: yield {event: "node_finished", component_id: "retrieval_0"}
    end
    
    rect rgb(255, 250, 240)
        Note over Canvas: ========== 批次 3LLM ==========
        
        Canvas->>Canvas: idx = 2, to = 3
        Canvas->>Canvas: yield {event: "node_started", component_id: "llm_0"}
        
        Canvas->>+ThreadPool: _run_batch(2, 3)
        ThreadPool->>+LLM: invoke()
        LLM->>LLM: get_input()  {knowledge: formalized_content, query: "..."}
        LLM->>LLM: sys_prompt = "根据以下知识回答:\n{retrieval_0@formalized_content}"
        LLM->>LLM: string_format(sys_prompt, {知识内容})
        LLM->>LLM: 检查下游是否有 Message  
        LLM->>LLM: set_output("content", partial(_stream_and_cite))
        Note over LLM: 返回 partial 对象,延迟执行
        LLM-->>-ThreadPool: outputs = {content: <partial>}
        ThreadPool-->>-Canvas: 批次完成
        
        Canvas->>Canvas: 检测到 partial,加入 partials 队列
        Canvas->>LLM: get_component("llm_0")["downstream"]
        Canvas->>Canvas: path.extend(["message_0"])
        Note over Canvas: 暂不发送 node_finished
    end
    
    rect rgb(255, 240, 255)
        Note over Canvas: ========== 批次 4Message(触发流式) ==========
        
        Canvas->>Canvas: idx = 3, to = 4
        Canvas->>Canvas: yield {event: "node_started", component_id: "message_0"}
        
        Canvas->>+ThreadPool: _run_batch(3, 4)
        ThreadPool->>+Message: invoke(content="{llm_0@content}")
        Message->>Message: get_input()  {content: <partial>}
        Message->>Message: set_output("content", <partial>)
        Message-->>-ThreadPool: outputs = {content: <partial>}
        ThreadPool-->>-Canvas: 批次完成
        
        Note over Canvas: 后处理:发现 Message 组件
        Canvas->>Canvas: 检测到 Message.output("content")  partial
        Canvas->>+Message: output("content")()  调用 partial
        Message->>+LLM: _stream_and_cite() 生成器
        LLM->>+OpenAI: POST /v1/chat/completions (stream=true)
        
        loop 流式 Token 生成
            OpenAI-->>LLM: {"delta": {"content": "tok"}}
            LLM-->>Message: yield "tok"
            Message-->>Canvas: yield "tok"
            Canvas->>Canvas: yield {event: "message", data: {content: "tok"}}
        end
        
        OpenAI-->>-LLM: {"finish_reason": "stop"}
        LLM->>Canvas: add_assistant_output(full_answer)
        LLM-->>-Message: 生成完毕
        Message-->>-Canvas: 完成
        
        Canvas->>Canvas: set_output("content", full_answer)
        Canvas->>Canvas: 检查引用:re.search(r"\[ID:[ 0-9]+\]", content)
        Canvas->>Canvas: yield {event: "message_end", data: {reference: [...]}}
        
        Note over Canvas: 处理 partials 队列
        Canvas->>Canvas: partials = ["llm_0"]
        Canvas->>Canvas: yield {event: "node_finished", component_id: "llm_0"}
        Canvas->>Canvas: yield {event: "node_finished", component_id: "message_0"}
    end
    
    Canvas->>Canvas: path = ["begin", "retrieval_0", "llm_0", "message_0"]
    Canvas->>Canvas: idx = 4, len(path) = 4  退出循环
    Canvas->>Canvas: yield {event: "workflow_finished", outputs: {...}}

时序图说明

  1. 批次划分

    • Canvas 将 path 分为多个批次
    • 每个批次包含当前 path[idx:to] 区间的组件
    • 批次内组件并行执行
  2. 并行执行

    • 使用 ThreadPoolExecutor(max_workers=5)
    • 同时提交批次内所有组件的 invoke() 调用
    • 等待所有 Future 完成后继续
  3. 路径扩展

    • 每个组件执行后,根据 downstream 扩展 path
    • 动态添加下游组件到执行队列
  4. 流式处理

    • LLM 返回 partial 对象
    • Message 组件调用 partial() 触发生成器
    • 逐 Token yield 到客户端
  5. 事件顺序

    • node_started → 批量执行 → node_finished
    • message 事件在流式生成时逐个发送
    • message_end 包含引用信息

5.4 模块间交互总结

关键交互模式

  1. API → Canvas

    • API 层负责鉴权、会话管理、SSE 流式响应
    • Canvas 负责工作流编排、组件调度、状态管理
  2. Canvas → Component

    • Canvas 通过 invoke() 统一调用组件
    • 组件通过变量引用 {component_id@key} 获取输入
    • 组件通过 set_output() 设置输出
  3. Component → LLMBundle

    • LLM/Agent 组件通过 LLMBundle 调用 LLM
    • LLMBundle 统一管理 OpenAI/DeepSeek/Ollama/本地模型
  4. Component → External Service

    • Retrieval → Elasticsearch(向量检索)
    • CodeExec → Sandbox(代码执行)
    • WebSearch → Tavily/DuckDuckGo(联网搜索)
  5. 流式传递链

    OpenAI API → LLMBundle → LLM Component → Canvas → API Layer → SSE → Client
    

6. 最佳实践与案例

6.1 简单问答 Agent

DSL 示例

{
    "components": {
        "begin": {
            "obj": {"component_name": "Begin", "params": {}},
            "downstream": ["retrieval_0"]
        },
        "retrieval_0": {
            "obj": {
                "component_name": "Retrieval",
                "params": {
                    "kb_ids": ["kb_uuid_1"],
                    "query": "{sys.query}",
                    "top_n": 6
                }
            },
            "downstream": ["llm_0"]
        },
        "llm_0": {
            "obj": {
                "component_name": "LLM",
                "params": {
                    "llm_id": "gpt-4",
                    "sys_prompt": "根据以下知识回答问题:\n{retrieval_0@content}",
                    "prompts": [{"role": "user", "content": "{sys.query}"}]
                }
            },
            "downstream": ["message_0"]
        },
        "message_0": {
            "obj": {
                "component_name": "Message",
                "params": {"content": "{llm_0@content}"}
            },
            "downstream": []
        }
    }
}

6.2 ReAct Agent(多工具调用)

DSL 示例

{
    "components": {
        "begin": {"...": "..."},
        "agent_0": {
            "obj": {
                "component_name": "Agent",
                "params": {
                    "llm_id": "gpt-4",
                    "sys_prompt": "你是一个智能助手,可以使用以下工具完成任务。",
                    "tools": [
                        {
                            "component_name": "Retrieval",
                            "name": "knowledge_search",
                            "params": {"kb_ids": ["kb_uuid_1"]}
                        },
                        {
                            "component_name": "CodeExec",
                            "name": "code_interpreter",
                            "params": {"lang": "python"}
                        },
                        {
                            "component_name": "WebSearch",
                            "name": "web_search",
                            "params": {"engine": "tavily"}
                        }
                    ],
                    "max_rounds": 5
                }
            },
            "downstream": ["message_0"]
        }
    }
}

6.3 条件分支 Agent

DSL 示例

{
    "components": {
        "categorize_0": {
            "obj": {
                "component_name": "Categorize",
                "params": {
                    "llm_id": "gpt-4",
                    "items": [
                        {"key": "technical", "content": "{sys.query}", "examples": "代码错误、部署问题"},
                        {"key": "business", "content": "{sys.query}", "examples": "产品功能、价格咨询"}
                    ]
                }
            },
            "downstream": {
                "technical": ["agent_tech"],
                "business": ["agent_biz"]
            }
        },
        "agent_tech": {"...": "技术支持 Agent"},
        "agent_biz": {"...": "业务咨询 Agent"}
    }
}

7. 性能优化建议

1. 并行执行

  • 无依赖的组件并行执行(ThreadPoolExecutor)
  • 设置合理的 max_workers(默认 5)

2. 缓存策略

  • 检索结果缓存到 Canvas.retrieval
  • LLM 响应缓存(相同 Prompt)

3. 超时控制

  • 组件级超时(通过 @timeout 装饰器)
  • 默认超时:LLM 10 分钟,CodeExec 5 分钟

4. 异常处理

  • 组件失败时使用 exception_handler 回退
  • 设置 max_retries(默认 0)