RAGFlow-03-Agent模块
模块概览
1.1 职责与定位
Agent 模块是 RAGFlow 的智能代理工作流引擎,提供可视化编排的 Agent 能力。主要职责包括:
- 工作流编排:通过有向无环图(DAG)定义复杂的 Agent 执行流程
- 组件化设计:提供可复用的组件(LLM、Tool、Retrieval、Switch 等)
- 工具调用:支持 LLM 自主调用工具(检索、代码执行、外部 API)
- 状态管理:维护全局变量、历史记录、检索缓存
- 流式执行:支持流式输出与实时进度反馈
- 异常处理:组件级异常捕获与回退策略
1.2 输入与输出
输入:
- DSL(Domain Specific Language):JSON 格式的工作流定义
- 运行时参数:query(用户问题)、files(上传文件)、inputs(自定义变量)
- 全局变量:sys.query、sys.user_id、sys.conversation_turns、sys.files
输出:
- 事件流:workflow_started、node_started、node_finished、message、workflow_finished
- 最终结果:answer(文本)、reference(引用)、outputs(自定义输出)
- 执行轨迹:path(执行路径)、history(对话历史)、retrieval(检索记录)
1.3 核心概念
Graph(图):
- 由 nodes(节点)和 edges(边)组成的有向无环图
- 节点表示组件(Component),边表示数据流
- 支持条件分支(Switch)、循环(Iteration)、并行执行
Component(组件):
- 最小执行单元,包含参数(Param)与逻辑(invoke)
- 输入来自上游组件的输出,输出传递给下游组件
- 支持异常处理(exception_handler)与重试(max_retries)
Canvas(画布):
- Graph 的子类,专门用于 Agent 工作流
- 维护全局状态(globals)、对话历史(history)、检索记录(retrieval)
- 支持流式执行(yield 事件)
1. 完整服务架构图(包含上游 API)
flowchart TB
subgraph "客户端层"
WebUI[前端 Web UI]
SDK[Python SDK]
HTTP[HTTP Client]
OpenAI[OpenAI 兼容客户端]
end
subgraph "API 接口层"
SessionAPI["/agents/:id/sessions<br/>POST 创建会话"]
CompletionAPI["/completion<br/>POST Agent 对话"]
CanvasAPI["/canvas/completion<br/>POST Canvas 执行"]
ChatAPI["/chats/:id/completions<br/>POST Chat 完成"]
end
subgraph "服务编排层"
APIApp[api_app.py<br/>API 路由与认证]
CanvasApp[canvas_app.py<br/>Canvas 管理]
SessionApp[session.py<br/>会话管理]
subgraph "核心服务"
UserCanvasService[UserCanvasService<br/>Canvas CRUD]
ConversationService[ConversationService<br/>会话持久化]
APITokenService[APIToken<br/>认证管理]
end
end
subgraph "Agent 执行引擎层"
Canvas[Canvas 执行器<br/>agent/canvas.py]
subgraph "状态管理"
PathManager[Path 路径队列<br/>组件执行顺序]
GlobalsManager[Globals 全局变量<br/>sys.query/user_id/files]
HistoryManager[History 对话历史<br/>多轮上下文]
RetrievalCache[Retrieval 检索缓存<br/>chunks + doc_aggs]
end
subgraph "事件流"
EventEmitter[事件发射器 SSE]
EventTypes["workflow_started<br/>node_started<br/>node_finished<br/>message<br/>workflow_finished"]
end
end
subgraph "组件层(Component)"
Begin[Begin<br/>入口组件]
LLM[LLM<br/>基础 LLM 调用]
Agent[Agent<br/>ReAct 工具代理]
Message[Message<br/>流式输出]
subgraph "工具组件"
Retrieval[Retrieval<br/>知识检索]
CodeExec[CodeExec<br/>代码执行]
WebSearch[WebSearch<br/>联网搜索]
APICall[API Call<br/>外部 API]
end
subgraph "控制流组件"
Switch[Switch<br/>条件分支]
Categorize[Categorize<br/>意图分类]
Iteration[Iteration<br/>循环控制]
end
end
subgraph "底层服务层"
LLMBundle[LLM Bundle<br/>统一 LLM 调用]
subgraph "外部服务"
OpenAIAPI[OpenAI API]
DeepSeek[DeepSeek API]
LocalLLM[本地 LLM]
Ollama[Ollama]
end
subgraph "存储服务"
Redis[(Redis<br/>日志/缓存)]
ES[(Elasticsearch<br/>向量检索)]
MySQL[(MySQL<br/>元数据)]
end
subgraph "执行环境"
Sandbox[Sandbox<br/>代码沙箱]
MCP[MCP Server<br/>工具服务器]
end
end
WebUI --> SessionAPI
WebUI --> CompletionAPI
SDK --> CompletionAPI
HTTP --> CanvasAPI
OpenAI --> ChatAPI
SessionAPI --> SessionApp
CompletionAPI --> APIApp
CanvasAPI --> CanvasApp
ChatAPI --> APIApp
SessionApp --> UserCanvasService
APIApp --> ConversationService
CanvasApp --> UserCanvasService
APIApp --> APITokenService
UserCanvasService --> Canvas
ConversationService --> Canvas
Canvas --> PathManager
Canvas --> GlobalsManager
Canvas --> HistoryManager
Canvas --> RetrievalCache
Canvas --> EventEmitter
EventEmitter --> EventTypes
Canvas --> Begin
Canvas --> LLM
Canvas --> Agent
Canvas --> Message
Agent --> Retrieval
Agent --> CodeExec
Agent --> WebSearch
Agent --> APICall
Canvas --> Switch
Canvas --> Categorize
Canvas --> Iteration
LLM --> LLMBundle
Agent --> LLMBundle
Retrieval --> LLMBundle
LLMBundle --> OpenAIAPI
LLMBundle --> DeepSeek
LLMBundle --> LocalLLM
LLMBundle --> Ollama
Canvas --> Redis
Retrieval --> ES
UserCanvasService --> MySQL
CodeExec --> Sandbox
Agent --> MCP
架构要点说明
1. 分层架构设计
- 客户端层:提供多种接入方式(Web UI、SDK、HTTP、OpenAI 兼容)
- API 接口层:RESTful API,负责请求验证、路由分发、SSE 流式响应
- 服务编排层:业务逻辑编排,管理 Canvas/Conversation 生命周期
- 执行引擎层:Canvas 工作流引擎,组件调度、状态管理、事件发射
- 组件层:可复用的原子能力(LLM、Tool、Control Flow)
- 底层服务层:LLM 调用、存储访问、外部服务集成
2. DSL 驱动架构
- 工作流通过 JSON DSL 定义,前端可视化编辑器生成
- DSL 包含 components、graph、history、retrieval、globals 五部分
- 运行时解析 DSL,动态实例化组件并构建执行路径
- DSL 持久化到 MySQL,支持版本管理和会话恢复
3. 组件化设计
- 每个组件继承
ComponentBase,实现invoke()方法 - 参数通过
ComponentParamBase定义,支持类型校验与默认值 - 组件输出通过
set_output()设置,下游通过get_input()获取 - 组件间解耦,通过变量引用(
{component_id@output_key})传递数据
4. 执行模型
- 顺序执行:按 path 列表顺序执行组件
- 条件分支:Switch/Categorize 组件根据条件选择下游路径
- 循环执行:Iteration 组件重复执行子图,直到满足退出条件
- 并行执行:同一批次的组件(无依赖)并行执行(ThreadPoolExecutor,max_workers=5)
5. 状态管理
- globals:全局变量字典,支持跨组件访问(如
sys.query、sys.user_id、sys.files) - history:对话历史列表,用于多轮对话上下文
- retrieval:检索结果缓存,避免重复检索(chunks + doc_aggs)
- path:动态路径队列,记录执行轨迹与待执行组件
6. 流式架构
- 基于 SSE(Server-Sent Events)推送实时事件
- 支持
partial延迟执行,实现流式 Token 生成 - 事件类型:workflow_started、node_started、node_finished、message、message_end、workflow_finished
- 客户端可实时展示执行进度与中间结果
2. 核心数据结构
2.1 Canvas DSL 结构
{
"components": {
"begin": {
"obj": {
"component_name": "Begin",
"params": {}
},
"downstream": ["llm_0"],
"upstream": []
},
"llm_0": {
"obj": {
"component_name": "LLM",
"params": {
"llm_id": "gpt-4",
"sys_prompt": "你是一个智能助手...",
"prompts": [{"role": "user", "content": "{sys.query}"}],
"temperature": 0.7,
"cite": True
}
},
"downstream": ["message_0"],
"upstream": ["begin"]
},
"message_0": {
"obj": {
"component_name": "Message",
"params": {
"content": "{llm_0@content}"
}
},
"downstream": [],
"upstream": ["llm_0"]
}
},
"history": [],
"path": ["begin"],
"retrieval": [],
"globals": {
"sys.query": "",
"sys.user_id": "",
"sys.conversation_turns": 0,
"sys.files": []
}
}
2.2 组件数据结构 UML
classDiagram
class ComponentBase {
+Canvas _canvas
+String _id
+ComponentParamBase _param
+Dict inputs
+Dict outputs
+invoke(**kwargs)
+get_input()
+set_output(key, value)
+error()
+reset()
}
class ComponentParamBase {
+String description
+Int max_retries
+Float delay_after_error
+String exception_method
+Any exception_default_value
+List exception_goto
+Dict inputs
+Dict outputs
+check()
+as_dict()
}
class LLM {
+String llm_id
+String sys_prompt
+List prompts
+Float temperature
+Int max_tokens
+Bool cite
+_invoke(**kwargs)
}
class Agent {
+List tools
+List mcp
+Int max_rounds
+LLMBundle chat_mdl
+ToolCallSession toolcall_session
+_react_with_tools_streamly()
}
class Tool {
+String function_name
+String function_description
+Dict function_parameters
+get_meta()
+_invoke(**kwargs)
}
class Retrieval {
+List kb_ids
+Float similarity_threshold
+Int top_n
+String rerank_id
+_invoke(**kwargs)
}
ComponentBase <|-- LLM
ComponentBase <|-- Agent
ComponentBase <|-- Tool
Tool <|-- Retrieval
LLM <|-- Agent
ComponentBase *-- ComponentParamBase
3. 核心功能详细剖析
3.1 Canvas 执行引擎
3.1.1 run 方法(核心流程)
功能:执行工作流,逐个调用组件,返回事件流
核心代码:
# agent/canvas.py
class Canvas(Graph):
def run(self, **kwargs):
"""执行工作流"""
# 1. 初始化
self.message_id = get_uuid()
created_at = int(time.time())
# 2. 重置组件状态
for k, cpn in self.components.items():
self.components[k]["obj"].reset(True)
# 3. 设置全局变量
for k in ["query", "user_id", "files"]:
if kwargs.get(k):
if k == "files":
self.globals[f"sys.{k}"] = self.get_files(kwargs[k])
else:
self.globals[f"sys.{k}"] = kwargs[k]
self.globals["sys.conversation_turns"] += 1
# 4. 事件装饰器
def decorate(event, dt):
return {
"event": event,
"message_id": self.message_id,
"created_at": created_at,
"task_id": self.task_id,
"data": dt
}
# 5. 初始化路径
if not self.path:
self.path.append("begin")
yield decorate("workflow_started", {"inputs": kwargs.get("inputs")})
# 6. 执行循环
idx = len(self.path) - 1
while idx < len(self.path):
to = len(self.path)
# 6.1 发送 node_started 事件
for i in range(idx, to):
yield decorate("node_started", {
"component_id": self.path[i],
"component_name": self.get_component_name(self.path[i])
})
# 6.2 批量执行组件(并行)
_run_batch(idx, to)
# 6.3 后处理
for i in range(idx, to):
cpn_obj = self.get_component_obj(self.path[i])
# Message 组件流式输出
if cpn_obj.component_name.lower() == "message":
if isinstance(cpn_obj.output("content"), partial):
for m in cpn_obj.output("content")():
yield decorate("message", {"content": m})
else:
yield decorate("message", {"content": cpn_obj.output("content")})
yield decorate("message_end", {"reference": self.get_reference()})
# 异常处理
if cpn_obj.error():
ex = cpn_obj.exception_handler()
if ex and ex["goto"]:
self.path.extend(ex["goto"]) # 跳转到异常处理分支
elif ex and ex["default_value"]:
yield decorate("message", {"content": ex["default_value"]})
else:
self.error = cpn_obj.error()
yield decorate("node_finished", {
"component_id": cpn_obj._id,
"outputs": cpn_obj.output(),
"error": cpn_obj.error(),
"elapsed_time": cpn_obj.output("_elapsed_time")
})
# 路径扩展(根据组件类型)
if cpn_obj.component_name.lower() in ["categorize", "switch"]:
self.path.extend(cpn_obj.output("_next")) # 条件分支
elif cpn_obj.component_name.lower() == "iteration":
self.path.append(cpn_obj.get_start()) # 循环开始
elif not cpn["downstream"] and cpn_obj.get_parent():
self.path.append(cpn_obj.get_parent().get_start()) # 返回父级
else:
self.path.extend(cpn["downstream"]) # 正常流转
if self.error:
break
idx = to
# 7. 结束
yield decorate("workflow_finished", {
"outputs": self.get_component_obj(self.path[-1]).output(),
"elapsed_time": time.perf_counter() - st
})
关键点:
- 批量执行:
_run_batch使用 ThreadPoolExecutor 并行执行组件 - 流式输出:Message 组件使用
partial延迟执行,支持流式生成 - 路径动态扩展:根据组件输出动态添加下游节点到 path
- 异常捕获:每个组件执行失败时触发 exception_handler
3.2 Agent 工具调用
3.2.1 Agent 组件(LLM + Tools)
功能:LLM 自主调用工具,多轮交互完成任务
核心代码:
# agent/component/agent_with_tools.py
class Agent(LLM, ToolBase):
def __init__(self, canvas, id, param):
# 1. 初始化 LLM
LLM.__init__(self, canvas, id, param)
# 2. 加载工具
self.tools = {}
for cpn in self._param.tools:
cpn_obj = self._load_tool_obj(cpn)
self.tools[cpn_obj.get_meta()["function"]["name"]] = cpn_obj
# 3. 初始化 LLM Bundle(支持工具调用)
self.chat_mdl = LLMBundle(
self._canvas.get_tenant_id(),
TenantLLMService.llm_id2llm_type(self._param.llm_id),
self._param.llm_id,
max_retries=self._param.max_retries,
max_rounds=self._param.max_rounds, # 最多 N 轮工具调用
verbose_tool_use=True
)
# 4. 工具元数据
self.tool_meta = [v.get_meta() for _, v in self.tools.items()]
self.toolcall_session = LLMToolPluginCallSession(self.tools, self.callback)
def _invoke(self, **kwargs):
"""执行 Agent"""
# 1. 准备 Prompt 变量
self._prepare_prompt_variables()
# 2. 构建消息历史
msg, sys_prompt = self._sys_prompt_and_msg(self._canvas.history, kwargs)
# 3. 工具调用循环(ReAct)
use_tools = []
for answer in self._react_with_tools_streamly(sys_prompt, msg, use_tools):
if isinstance(answer, partial):
self.set_output("content", answer)
return
# 4. 记录历史
self._canvas.add_user_input(msg[-1]["content"])
self._canvas.add_assistant_output(answer)
def _react_with_tools_streamly(self, prompt, history, use_tools):
"""ReAct 工具调用"""
hist = deepcopy(history)
token_count = 0
last_calling = None
# 工具调用回调
def use_tool(name, args):
nonlocal use_tools
tool_response = self.toolcall_session.tool_call(name, args)
use_tools.append({
"name": name,
"arguments": args,
"results": tool_response
})
return name, tool_response
# 完成回调
def complete():
# 生成最终答案
pass
# 调用 LLM(支持工具)
for delta in self.chat_mdl.chat_streamly_with_tools(
prompt,
hist,
self._param.gen_conf(),
tools=self.tool_meta,
tool_choice="auto",
use_tool_callback=use_tool,
complete_callback=complete
):
if isinstance(delta, str):
yield delta
else:
token_count = delta
工具调用流程:
- LLM 判断是否需要工具(返回 function_call)
- Agent 调用对应工具(通过
use_tool回调) - 工具返回结果,注入 history
- LLM 继续生成(可能再次调用工具)
- 达到 max_rounds 或 LLM 决定停止,生成最终答案
3.3 核心组件详解
3.3.1 LLM 组件
功能:调用 LLM 生成答案,支持流式输出与引用插入
参数:
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
| llm_id | String | 是 | LLM 模型 ID(如 gpt-4) |
| sys_prompt | String | 是 | System Prompt |
| prompts | List | 是 | 用户 Prompt 列表(支持变量替换) |
| temperature | Float | 否 | 温度参数(默认 0.7) |
| max_tokens | Int | 否 | 最大 Token 数 |
| cite | Bool | 否 | 是否插入引用(默认 True) |
核心代码:
# agent/component/llm.py
class LLM(ComponentBase):
def _invoke(self, **kwargs):
# 1. 准备变量
self._prepare_prompt_variables()
# 2. 构建消息
msg, sys_prompt = self._sys_prompt_and_msg(self._canvas.history, kwargs)
# 3. 调用 LLM(流式)
def gen():
ans = ""
for delta in self.chat_mdl.chat_streamly(sys_prompt, msg, self._param.gen_conf(), images=self.imgs):
if not isinstance(delta, str):
continue
ans += delta
yield delta
# 4. 插入引用(可选)
if self._param.cite and self._canvas.retrieval:
chunks = self._canvas.retrieval[-1].get("chunks", [])
if chunks:
ans = self._gen_citations(ans)
# 5. 记录历史
self._canvas.add_user_input(msg[-1]["content"])
self._canvas.add_assistant_output(ans)
self.set_output("content", partial(gen))
3.3.2 Retrieval 组件
功能:从知识库检索相关 chunks
参数:
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
| kb_ids | List[String] | 是 | 知识库 ID 列表 |
| similarity_threshold | Float | 否 | 相似度阈值(默认 0.1) |
| top_n | Int | 否 | 返回 Top N(默认 6) |
| rerank_id | String | 否 | Rerank 模型 ID |
核心代码:
# agent/tools/retrieval.py
class Retrieval(ToolBase):
def _invoke(self, **kwargs):
# 1. 获取查询词
query = kwargs.get("query", self._param.query)
# 2. 检索
from rag.nlp.search import Dealer
dealer = Dealer(settings.docStoreConn)
emb_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.EMBEDDING)
req = {
"question": query,
"kb_ids": self._param.kb_ids,
"similarity": self._param.similarity_threshold,
"topk": 1024
}
sres = dealer.search(req, search.index_name(tenant_id), self._param.kb_ids, emb_mdl)
# 3. Rerank(可选)
chunks = [sres.field[id] for id in sres.ids]
if self._param.rerank_id:
rerank_mdl = LLMBundle(tenant_id, LLMType.RERANK, rerank_id=self._param.rerank_id)
chunks = rerank_mdl.rerank(query, chunks, top_n=self._param.top_n)
# 4. 缓存结果
self._canvas.retrieval.append({"chunks": chunks, "doc_aggs": sres.aggregation})
# 5. 格式化输出
content = chunks_format(chunks)
self.set_output("content", content)
3.3.3 CodeExec 组件
功能:执行 Python/JavaScript 代码(在 Sandbox 中)
参数:
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
| lang | String | 是 | 语言(python/javascript) |
| script | String | 是 | 代码(支持变量) |
| arguments | Dict | 否 | 传递给代码的参数 |
核心代码:
# agent/tools/code_exec.py
class CodeExec(ToolBase):
def _execute_code(self, language, code, arguments):
# 1. Base64 编码代码
code_b64 = base64.b64encode(code.encode()).decode()
# 2. 构建请求
req = CodeExecutionRequest(
code_b64=code_b64,
language=language,
arguments=arguments
)
# 3. 调用 Sandbox API
resp = requests.post(
f"http://sandbox-executor-manager:8080/execute",
json=req.model_dump(),
timeout=60
)
# 4. 解析结果
result = CodeExecutionResult(**resp.json())
if result.status != "success":
self.set_output("_ERROR", result.error)
else:
self.set_output("content", result.stdout)
self.set_output("result", result.result)
4. 完整调用链路分析
本章节从上游 API 接口开始,自上而下剖析 Agent 模块的完整调用链路。
4.1 调用入口:API 接口层
4.1.1 Agent 会话创建接口
路径:POST /api/v1/agents/<agent_id>/sessions
文件:api/apps/sdk/session.py:75-96
核心代码:
@manager.route("/agents/<agent_id>/sessions", methods=["POST"])
@token_required
def create_agent_session(tenant_id, agent_id):
# 1. 鉴权与查询 Canvas
user_id = request.args.get("user_id", tenant_id)
e, cvs = UserCanvasService.get_by_id(agent_id)
if not e:
return get_error_data_result("Agent not found.")
# 2. 初始化 Canvas(解析 DSL)
if not isinstance(cvs.dsl, str):
cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False)
session_id = get_uuid()
canvas = Canvas(cvs.dsl, tenant_id, agent_id)
canvas.reset() # 重置状态:清空 path、history、retrieval
# 3. 生成初始会话(包含 prologue 欢迎语)
cvs.dsl = json.loads(str(canvas))
conv = {
"id": session_id,
"dialog_id": cvs.id,
"user_id": user_id,
"message": [{"role": "assistant", "content": canvas.get_prologue()}],
"source": "agent",
"dsl": cvs.dsl
}
# 4. 持久化到 MySQL
API4ConversationService.save(**conv)
conv["agent_id"] = conv.pop("dialog_id")
return get_result(data=conv)
关键点:
- 从 MySQL 加载 Canvas DSL
- 实例化 Canvas 对象,解析 DSL 中的组件定义
- reset() 清空执行状态,初始化 globals
- 返回会话 ID,客户端后续使用此 ID 发起对话
4.1.2 Agent 对话接口(流式)
路径:POST /api/v1/completion
文件:api/apps/api_app.py:181-296
核心代码:
@manager.route('/completion', methods=['POST'])
@validate_request("conversation_id", "messages")
def completion():
# 1. Token 鉴权
token = request.headers.get('Authorization').split()[1]
objs = APIToken.query(token=token)
# 2. 获取会话与 Canvas
req = request.json
e, conv = API4ConversationService.get_by_id(req["conversation_id"])
msg = req["messages"] # 历史消息列表
if conv.source == "agent":
stream = req.get("stream", True)
# 3. 加载 Canvas DSL
e, cvs = UserCanvasService.get_by_id(conv.dialog_id)
if not isinstance(cvs.dsl, str):
cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False)
# 4. 实例化 Canvas(包含历史状态)
canvas = Canvas(cvs.dsl, objs[0].tenant_id)
canvas.messages.append(msg[-1])
canvas.add_user_input(msg[-1]["content"])
# 5. 执行工作流(返回生成器)
answer = canvas.run(stream=stream)
if stream:
# 6. 流式 SSE 响应
def sse():
nonlocal answer, cvs, conv
try:
final_ans = {"reference": [], "content": ""}
# 7. 迭代事件流
for ans in answer():
# 事件格式:{"event": "message", "data": {"content": "..."}}
for k in ans.keys():
final_ans[k] = ans[k]
# 格式化为 API 响应
ans = {"answer": ans["content"], "reference": ans.get("reference", [])}
yield "data:" + json.dumps({"code": 0, "data": ans}, ensure_ascii=False) + "\n\n"
# 8. 更新历史
canvas.messages.append({"role": "assistant", "content": final_ans["content"]})
canvas.history.append(("assistant", final_ans["content"]))
# 9. 持久化 Canvas 状态
cvs.dsl = json.loads(str(canvas))
API4ConversationService.append_message(conv.id, conv.to_dict())
except Exception as e:
yield "data:" + json.dumps({"code": 500, "message": str(e)}, ensure_ascii=False) + "\n\n"
# 10. 返回 SSE 流
resp = Response(sse(), mimetype="text/event-stream")
resp.headers.add_header("Cache-control", "no-cache")
resp.headers.add_header("Connection", "keep-alive")
return resp
关键点:
- 从持久化的会话中恢复 Canvas 状态(DSL + history)
canvas.run()返回一个partial对象(延迟执行)- 调用
answer()触发执行,返回事件流生成器 - 逐个 yield SSE 事件到客户端
- 完成后更新 DSL 和会话到 MySQL
4.2 Canvas 执行引擎层
4.2.1 Canvas.run() 核心流程
文件:agent/canvas.py:220-403
完整执行流程:
class Canvas(Graph):
def run(self, **kwargs):
st = time.perf_counter()
self.message_id = get_uuid()
created_at = int(time.time())
# ========== 阶段 1:初始化 ==========
# 1.1 重置所有组件状态
for k, cpn in self.components.items():
self.components[k]["obj"].reset(True)
# 1.2 设置全局变量
for k in ["query", "user_id", "files"]:
if kwargs.get(k):
if k == "files":
self.globals[f"sys.{k}"] = self.get_files(kwargs[k])
else:
self.globals[f"sys.{k}"] = kwargs[k]
self.globals["sys.conversation_turns"] += 1
# 1.3 初始化执行路径(从 begin 开始)
if not self.path or self.path[-1].lower().find("userfillup") < 0:
self.path.append("begin")
self.retrieval.append({"chunks": [], "doc_aggs": []})
# 1.4 定义事件装饰器
def decorate(event, dt):
return {
"event": event,
"message_id": self.message_id,
"created_at": created_at,
"task_id": self.task_id,
"data": dt
}
yield decorate("workflow_started", {"inputs": kwargs.get("inputs")})
# ========== 阶段 2:组件批量执行循环 ==========
def _run_batch(f, t):
"""并行执行组件批次"""
with ThreadPoolExecutor(max_workers=5) as executor:
thr = []
for i in range(f, t):
cpn_obj = self.get_component_obj(self.path[i])
# Begin 组件传递 inputs
if cpn_obj.component_name.lower() in ["begin", "userfillup"]:
thr.append(executor.submit(cpn_obj.invoke, inputs=kwargs.get("inputs", {})))
# 其他组件从上游获取输入
else:
thr.append(executor.submit(cpn_obj.invoke, **cpn_obj.get_input()))
# 等待所有组件完成
for t in thr:
t.result()
def _node_finished(cpn_obj):
"""生成 node_finished 事件"""
return decorate("node_finished", {
"inputs": cpn_obj.get_input_values(),
"outputs": cpn_obj.output(),
"component_id": cpn_obj._id,
"component_name": self.get_component_name(cpn_obj._id),
"error": cpn_obj.error(),
"elapsed_time": time.perf_counter() - cpn_obj.output("_created_time")
})
self.error = ""
idx = len(self.path) - 1
partials = [] # 存储延迟执行的组件(如流式 LLM)
# 主循环:处理 path 队列
while idx < len(self.path):
to = len(self.path)
# 2.1 发送 node_started 事件
for i in range(idx, to):
yield decorate("node_started", {
"component_id": self.path[i],
"component_name": self.get_component_name(self.path[i])
})
# 2.2 批量并行执行组件
_run_batch(idx, to)
# ========== 阶段 3:后处理与路径扩展 ==========
for i in range(idx, to):
cpn = self.get_component(self.path[i])
cpn_obj = self.get_component_obj(self.path[i])
# 3.1 处理 Message 组件(流式输出)
if cpn_obj.component_name.lower() == "message":
if isinstance(cpn_obj.output("content"), partial):
# 流式生成
_m = ""
for m in cpn_obj.output("content")():
if not m:
continue
yield decorate("message", {"content": m})
_m += m
cpn_obj.set_output("content", _m)
else:
# 非流式
yield decorate("message", {"content": cpn_obj.output("content")})
cite = re.search(r"\[ID:[ 0-9]+\]", cpn_obj.output("content"))
yield decorate("message_end", {"reference": self.get_reference() if cite else None})
# 3.1.1 处理 partials 队列(之前未发送 node_finished 的组件)
while partials:
_cpn_obj = self.get_component_obj(partials[0])
if isinstance(_cpn_obj.output("content"), partial):
break
yield _node_finished(_cpn_obj)
partials.pop(0)
# 3.2 异常处理
other_branch = False
if cpn_obj.error():
ex = cpn_obj.exception_handler()
if ex and ex["goto"]:
# 跳转到异常处理分支
self.path.extend(ex["goto"])
other_branch = True
elif ex and ex["default_value"]:
# 使用默认值
yield decorate("message", {"content": ex["default_value"]})
yield decorate("message_end", {})
else:
self.error = cpn_obj.error()
# 3.3 发送 node_finished 事件(或延迟)
if cpn_obj.component_name.lower() != "iteration":
if isinstance(cpn_obj.output("content"), partial):
if self.error:
cpn_obj.set_output("content", None)
yield _node_finished(cpn_obj)
else:
partials.append(self.path[i]) # 延迟发送
else:
yield _node_finished(cpn_obj)
# 3.4 路径动态扩展
def _extend_path(cpn_ids):
if other_branch:
return
for cpn_id in cpn_ids:
if self.path[-1] != cpn_id:
self.path.append(cpn_id)
# 根据组件类型扩展路径
if cpn_obj.component_name.lower() == "iterationitem" and cpn_obj.end():
# 循环结束,跳出到父级
iter_obj = cpn_obj.get_parent()
yield _node_finished(iter_obj)
_extend_path(self.get_component(cpn["parent_id"])["downstream"])
elif cpn_obj.component_name.lower() in ["categorize", "switch"]:
# 条件分支:根据输出选择下游
_extend_path(cpn_obj.output("_next"))
elif cpn_obj.component_name.lower() == "iteration":
# 循环开始:添加第一个子组件
_extend_path([cpn_obj.get_start()])
elif not cpn["downstream"] and cpn_obj.get_parent():
# 无下游且有父级:返回父级循环起点
_extend_path([cpn_obj.get_parent().get_start()])
else:
# 正常流转:添加下游组件
_extend_path(cpn["downstream"])
# 3.5 检查错误,终止执行
if self.error:
logging.error(f"Runtime Error: {self.error}")
break
idx = to
# ========== 阶段 4:完成 ==========
self.path = self.path[:idx]
if not self.error:
yield decorate("workflow_finished", {
"inputs": kwargs.get("inputs"),
"outputs": self.get_component_obj(self.path[-1]).output(),
"elapsed_time": time.perf_counter() - st
})
self.history.append(("assistant", self.get_component_obj(self.path[-1]).output()))
关键技术点:
-
动态路径管理:
path是一个队列,初始只有["begin"]- 每个组件执行后,根据类型(switch/iteration/normal)动态添加下游组件
- 支持条件跳转、循环、异常跳转
-
并行执行:
- 使用
ThreadPoolExecutor并行执行同批次组件(max_workers=5) - 同批次指
path[idx:to]区间内的组件(无依赖)
- 使用
-
流式处理:
- 组件可返回
partial对象(延迟执行) - Message 组件调用
partial()触发生成器,逐 Token yield - 流式组件的
node_finished事件延迟到内容生成完成后发送
- 组件可返回
-
异常处理:
- 组件执行失败时,检查
exception_handler() - 支持三种策略:goto(跳转分支)、default_value(使用默认值)、中断执行
- 组件执行失败时,检查
4.3 组件层执行细节
4.3.1 ComponentBase.invoke() 统一入口
文件:agent/component/base.py:420-432
class ComponentBase(ABC):
def invoke(self, **kwargs) -> dict[str, Any]:
self.set_output("_created_time", time.perf_counter())
try:
# 调用子类实现的 _invoke()
self._invoke(**kwargs)
except Exception as e:
# 异常捕获
if self.get_exception_default_value():
self.set_exception_default_value()
else:
self.set_output("_ERROR", str(e))
logging.exception(e)
# 记录执行时间
self.set_output("_elapsed_time", time.perf_counter() - self.output("_created_time"))
return self.output()
@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10*60)))
def _invoke(self, **kwargs):
"""子类必须实现此方法"""
raise NotImplementedError()
关键点:
- 统一入口:所有组件通过
invoke()调用 - 超时保护:默认 10 分钟,通过
@timeout装饰器实现 - 异常捕获:设置
_ERROR输出,触发exception_handler - 性能追踪:记录
_created_time和_elapsed_time
4.3.2 LLM 组件执行流程
文件:agent/component/llm.py:200-240
class LLM(ComponentBase):
def _invoke(self, **kwargs):
# 1. 准备 Prompt 变量(替换 {component_id@key} 和 {sys.xxx})
sys_prompt, msg, user_defined_prompt = self._prepare_prompt_variables()
# 2. 检查是否需要流式输出
downstreams = self._canvas.get_component(self._id)["downstream"]
has_message_downstream = any([
self._canvas.get_component_obj(cid).component_name.lower() == "message"
for cid in downstreams
])
if has_message_downstream and not self._param.output_structure:
# 流式:返回 partial 对象
self.set_output("content", partial(self._stream_and_cite, sys_prompt, msg))
return
# 3. 非流式:直接调用 LLM
ans = self._generate([{"role": "system", "content": sys_prompt}, *msg])
# 4. 插入引用(可选)
if self._param.cite and self._canvas.get_reference()["chunks"]:
ans = self._gen_citations(ans)
# 5. 设置输出与记录历史
self.set_output("content", ans)
self._canvas.add_user_input(msg[-1]["content"])
self._canvas.add_assistant_output(ans)
def _stream_and_cite(self, sys_prompt, msg):
"""流式生成器(延迟执行)"""
ans = ""
for delta in self._generate_streamly([{"role": "system", "content": sys_prompt}, *msg]):
ans += delta
yield delta
# 插入引用
if self._param.cite and self._canvas.get_reference()["chunks"]:
ans = self._gen_citations(ans)
# 记录历史
self._canvas.add_user_input(msg[-1]["content"])
self._canvas.add_assistant_output(ans)
关键点:
- 变量替换:
_prepare_prompt_variables()解析 Prompt 中的{component_id@key} - 流式判断:检查下游是否有 Message 组件,决定是否流式
- 延迟执行:
partial对象封装生成器,由 Canvas 在 Message 组件时调用 - 引用插入:流式完成后,通过 LLM 生成带引用标记的文本
4.3.3 Agent 组件 ReAct 流程
文件:agent/component/agent_with_tools.py:211-334
class Agent(LLM, ToolBase):
def _react_with_tools_streamly(self, prompt, history, use_tools, user_defined_prompt={}):
"""ReAct 工具调用循环"""
token_count = 0
hist = deepcopy(history)
# 1. 任务分析(生成任务描述)
st = timer()
task_desc = analyze_task(self.chat_mdl, prompt, history[-1]["content"], self.tool_meta, user_defined_prompt)
self.callback("analyze_task", {}, task_desc, elapsed_time=timer()-st)
# 2. 工具调用回调
def use_tool(name, args):
nonlocal hist, use_tools
tool_response = self.toolcall_session.tool_call(name, args)
use_tools.append({
"name": name,
"arguments": args,
"results": tool_response
})
return name, tool_response
# 3. 完成回调(生成最终答案)
def complete():
nonlocal hist
# 插入引用提示
need2cite = self._param.cite and self._canvas.get_reference()["chunks"]
if need2cite:
hist[0]["content"] += citation_prompt()
# 流式生成答案
for delta_ans in self._generate_streamly(hist):
yield delta_ans, 0
# 4. ReAct 循环(最多 max_rounds 轮)
for _ in range(self._param.max_rounds + 1):
# 4.1 调用 LLM 决策下一步动作
response, tk = next_step(self.chat_mdl, hist, self.tool_meta, task_desc, user_defined_prompt)
token_count += tk
hist.append({"role": "assistant", "content": response})
try:
# 4.2 解析 LLM 返回的 JSON(工具调用列表)
functions = json_repair.loads(re.sub(r"```.*", "", response))
# 4.3 并行执行工具调用
with ThreadPoolExecutor(max_workers=5) as executor:
thr = []
for func in functions:
name = func["name"]
args = func["arguments"]
# 检查是否完成任务
if name == COMPLETE_TASK:
# 生成最终答案
for txt, tkcnt in complete():
yield txt, tkcnt
return
# 提交工具调用任务
thr.append(executor.submit(use_tool, name, args))
# 4.4 反思工具结果,注入 history
st = timer()
reflection = reflect(self.chat_mdl, hist, [th.result() for th in thr], user_defined_prompt)
hist.append({"role": "user", "content": reflection})
self.callback("reflection", {}, reflection, elapsed_time=timer()-st)
except Exception as e:
# 4.5 工具调用失败,提示 LLM 修正
logging.exception(f"Wrong JSON argument format: {e}")
hist.append({"role": "user", "content": f"Tool call error: {e}"})
# 5. 达到最大轮次,强制生成答案
logging.warning(f"Exceed max rounds: {self._param.max_rounds}")
hist.append({"role": "user", "content": "Provide a FINAL answer based on existing data."})
for txt, tkcnt in complete():
yield txt, tkcnt
关键技术点:
-
任务分析:
analyze_task()使用 LLM 分析用户请求,生成任务描述- 帮助后续 LLM 更好地理解需要完成的目标
-
ReAct 循环:
- LLM 返回 JSON 格式的工具调用列表:
[{"name": "tool1", "arguments": {...}}, ...] - 并行执行所有工具调用(max_workers=5)
- 将工具结果通过
reflect()总结后注入 history
- LLM 返回 JSON 格式的工具调用列表:
-
工具调用会话:
LLMToolPluginCallSession管理工具对象字典- 支持 Component 工具(Retrieval、CodeExec)和 MCP 工具
-
终止条件:
- LLM 返回
COMPLETE_TASK工具调用 - 达到
max_rounds限制(默认 5 轮)
- LLM 返回
4.3.4 Retrieval 组件执行流程
文件:agent/tools/retrieval.py:82-206
class Retrieval(ToolBase, ABC):
@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 12)))
def _invoke(self, **kwargs):
# 1. 解析知识库 ID(支持变量引用)
kb_ids = []
for id in self._param.kb_ids:
if id.find("@") < 0:
kb_ids.append(id)
continue
# 从 Canvas globals 获取变量值
kb_nm = self._canvas.get_variable_value(id)
kb_nm_list = kb_nm if isinstance(kb_nm, list) else [kb_nm]
for nm_or_id in kb_nm_list:
e, kb = KnowledgebaseService.get_by_name(nm_or_id, self._canvas._tenant_id)
if not e:
e, kb = KnowledgebaseService.get_by_id(nm_or_id)
kb_ids.append(kb.id)
# 2. 检查知识库有效性
kbs = KnowledgebaseService.get_by_ids(kb_ids)
if not kbs:
raise Exception("No dataset is selected.")
embd_nms = list(set([kb.embd_id for kb in kbs]))
assert len(embd_nms) == 1, "Knowledge bases use different embedding models."
# 3. 初始化 Embedding 和 Rerank 模型
embd_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.EMBEDDING, embd_nms[0])
rerank_mdl = None
if self._param.rerank_id:
rerank_mdl = LLMBundle(kbs[0].tenant_id, LLMType.RERANK, self._param.rerank_id)
# 4. 变量替换查询词
query = kwargs["query"]
vars = self.get_input_elements_from_text(query)
vars = {k: o["value"] for k, o in vars.items()}
query = self.string_format(query, vars)
# 5. 元数据过滤(可选)
doc_ids = None
if self._param.meta_data_filter:
metas = DocumentService.get_meta_by_kbs(kb_ids)
if self._param.meta_data_filter.get("method") == "auto":
# LLM 自动生成过滤条件
chat_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.CHAT)
filters = gen_meta_filter(chat_mdl, metas, query)
doc_ids = meta_filter(metas, filters)
# 6. 跨语言检索(可选)
if self._param.cross_languages:
query = cross_languages(kbs[0].tenant_id, None, query, self._param.cross_languages)
# 7. 向量检索
kbinfos = settings.retriever.retrieval(
query,
embd_mdl,
[kb.tenant_id for kb in kbs],
kb_ids,
1, # page
self._param.top_n,
self._param.similarity_threshold,
self._param.similarity_threshold,
self._param.keywords_similarity_weight,
self._param.top_k,
doc_ids=doc_ids,
use_knowledge_graph=self._param.use_kg
)
# 8. Rerank(可选)
if rerank_mdl:
kbinfos["chunks"] = rerank_mdl.rerank(query, kbinfos["chunks"], self._param.top_n)
# 9. 缓存到 Canvas.retrieval
self._canvas.retrieval[-1] = {"chunks": kbinfos["chunks"], "doc_aggs": kbinfos["doc_aggs"]}
# 10. 格式化输出(用于 LLM Prompt)
formated_content = kb_prompt(kbinfos, self.chat_mdl.max_length if hasattr(self, "chat_mdl") else 8192)
self.set_output("formalized_content", formated_content)
self.set_output("chunks", kbinfos["chunks"])
self.set_output("doc_aggs", kbinfos["doc_aggs"])
return formated_content
关键技术点:
-
动态知识库选择:
- 支持从 Canvas globals 动态获取知识库 ID
- 适用于多租户、多知识库场景
-
检索流程:
- Embedding:将 query 向量化
- 向量检索:Elasticsearch 混合检索(向量 + 关键词)
- Rerank(可选):使用 Rerank 模型重新排序
-
元数据过滤:
- 支持手动和自动两种模式
- 自动模式使用 LLM 根据 query 生成过滤条件
-
结果缓存:
- 存储到
Canvas.retrieval[-1] - LLM 组件可通过
get_reference()获取用于引用插入
- 存储到
4.4 数据流与状态传递
4.4.1 组件间数据传递机制
变量引用语法:{component_id@output_key} 或 {sys.variable_name}
解析流程:
- 定义阶段(DSL):
{
"llm_0": {
"obj": {
"params": {
"sys_prompt": "根据以下知识回答:\n{retrieval_0@formalized_content}",
"prompts": [{"role": "user", "content": "{sys.query}"}]
}
}
}
}
- 执行阶段(Runtime):
# agent/component/base.py
class ComponentBase:
def get_input(self) -> dict[str, Any]:
"""获取组件输入"""
ans = {}
for k, v in self.get_input_elements().items():
# 从变量引用获取实际值
ans[k] = v.get("value")
return ans
def get_input_elements(self) -> dict[str, Any]:
"""解析变量引用"""
res = {}
text = self._param.sys_prompt # 示例:包含 {retrieval_0@formalized_content}
# 正则匹配:{component_id@key} 或 {sys.xxx}
for m in re.finditer(self.variable_ref_patt, text):
ref = m.group(1) # 例如:retrieval_0@formalized_content
if ref.find("@") > 0:
# 组件输出引用
arr = ref.split("@")
cpn_id, output_key = arr[0], arr[1]
cpn_obj = self._canvas.get_component_obj(cpn_id)
res[ref] = {"value": cpn_obj.output(output_key)}
else:
# 全局变量引用
res[ref] = {"value": self._canvas.globals.get(ref)}
return res
def string_format(self, text: str, args: dict) -> str:
"""替换变量"""
for k, v in args.items():
text = text.replace("{" + k + "}", str(v))
return text
关键点:
- 组件执行时动态解析变量引用
- 支持跨组件数据传递(通过
component_id@key) - 支持全局变量访问(通过
sys.xxx)
4.4.2 全局状态管理
Canvas 状态字典:
class Canvas(Graph):
def __init__(self, dsl: str, tenant_id=None, task_id=None):
# 全局变量
self.globals = {
"sys.query": "", # 用户当前问题
"sys.user_id": "", # 用户 ID
"sys.conversation_turns": 0, # 对话轮次
"sys.files": [] # 上传文件列表
}
# 对话历史(多轮上下文)
self.history = [] # [("user", "问题1"), ("assistant", "回答1"), ...]
# 检索结果缓存
self.retrieval = [] # [{"chunks": [...], "doc_aggs": {...}}, ...]
# 执行路径
self.path = [] # ["begin", "retrieval_0", "llm_0", "message_0"]
# 消息列表(用于持久化)
self.messages = [] # [{"role": "user", "content": "..."}, ...]
状态更新时机:
-
globals:
Canvas.run()初始化时设置sys.query、sys.user_id、sys.files- 每次对话
sys.conversation_turns自增
-
history:
- LLM/Agent 组件完成后,调用
Canvas.add_assistant_output() - 用于多轮对话上下文管理
- LLM/Agent 组件完成后,调用
-
retrieval:
- Retrieval 组件执行后,更新
Canvas.retrieval[-1] - LLM 组件可获取用于引用插入
- Retrieval 组件执行后,更新
-
path:
- Canvas 执行过程中动态扩展
- 记录完整执行轨迹
4.5 关键代码路径总结
完整调用链(自上而下):
API 请求
↓
api/apps/api_app.py:completion()
├─ APIToken 鉴权
├─ ConversationService.get_by_id() 获取会话
└─ UserCanvasService.get_by_id() 加载 Canvas DSL
↓
agent/canvas.py:Canvas.__init__()
├─ json.loads(dsl) 解析 DSL
└─ Canvas.load() 实例化组件
↓
agent/canvas.py:Canvas.run()
├─ 初始化 globals、path、retrieval
├─ yield "workflow_started"
└─ while idx < len(path): 主循环
├─ yield "node_started"
├─ _run_batch() 并行执行组件
│ ├─ ThreadPoolExecutor.submit(cpn.invoke)
│ └─ 等待所有 Future 完成
│ ↓
│ agent/component/base.py:ComponentBase.invoke()
│ ├─ self._invoke(**kwargs) 调用子类实现
│ └─ 捕获异常,记录执行时间
│ ↓
│ 根据组件类型分发:
│
│ ┌─ agent/component/llm.py:LLM._invoke()
│ │ ├─ _prepare_prompt_variables() 变量替换
│ │ ├─ 检查是否流式(下游有 Message)
│ │ ├─ 流式:set_output("content", partial(...))
│ │ └─ 非流式:self._generate() 直接调用 LLM
│ │ ↓
│ │ api/db/services/llm_service.py:LLMBundle.chat()
│ │ └─ 调用 OpenAI/DeepSeek/Ollama API
│ │
│ ├─ agent/component/agent_with_tools.py:Agent._invoke()
│ │ ├─ _prepare_prompt_variables()
│ │ └─ _react_with_tools_streamly() ReAct 循环
│ │ ├─ analyze_task() 任务分析
│ │ └─ for _ in range(max_rounds):
│ │ ├─ next_step() LLM 决策
│ │ ├─ json_repair.loads() 解析工具调用
│ │ ├─ ThreadPoolExecutor.submit(use_tool)
│ │ │ ↓
│ │ │ LLMToolPluginCallSession.tool_call()
│ │ │ └─ 调用 Retrieval/CodeExec/Web 等工具
│ │ │ ↓
│ │ │ agent/tools/retrieval.py:Retrieval._invoke()
│ │ │ ├─ Elasticsearch 向量检索
│ │ │ ├─ Rerank(可选)
│ │ │ └─ Canvas.retrieval[-1] = chunks
│ │ │
│ │ └─ reflect() 反思工具结果
│ │
│ └─ agent/tools/code_exec.py:CodeExec._invoke()
│ └─ requests.post(Sandbox API)
│
├─ 后处理:
│ ├─ Message 组件:调用 partial() 触发流式生成
│ │ └─ yield "message" 逐 Token 推送
│ ├─ 异常处理:检查 cpn_obj.error()
│ └─ 路径扩展:根据组件类型添加下游到 path
│
├─ yield "node_finished"
└─ idx = to 进入下一批次
├─ yield "workflow_finished"
└─ 返回最终结果
↓
api/apps/api_app.py:sse()
├─ 迭代 Canvas.run() 生成器
├─ 格式化为 SSE 事件
└─ yield "data: {...}\n\n"
↓
HTTP Response (SSE 流)
└─ 客户端接收实时事件
5. 详细时序图
5.1 完整请求处理时序(从 API 到组件)
sequenceDiagram
autonumber
participant Client as 客户端
participant API as API 层<br/>(api_app.py)
participant APIToken as APIToken<br/>鉴权服务
participant ConvService as ConversationService<br/>会话服务
participant CanvasService as UserCanvasService<br/>Canvas 服务
participant MySQL as MySQL<br/>数据库
participant Canvas as Canvas<br/>执行引擎
participant Begin as Begin<br/>组件
participant LLM as LLM<br/>组件
participant LLMBundle as LLMBundle<br/>LLM 服务
participant OpenAI as OpenAI API
Client->>+API: POST /api/v1/completion
Note over Client,API: Headers: Authorization: Bearer xxx<br/>Body: {conversation_id, messages: [...]}
API->>+APIToken: query(token)
APIToken->>MySQL: SELECT * FROM api_token WHERE token=?
MySQL-->>APIToken: token_record
APIToken-->>-API: tenant_id
API->>+ConvService: get_by_id(conversation_id)
ConvService->>MySQL: SELECT * FROM conversation WHERE id=?
MySQL-->>ConvService: conversation record
ConvService-->>-API: conv (包含 dialog_id, dsl, message)
API->>+CanvasService: get_by_id(conv.dialog_id)
CanvasService->>MySQL: SELECT * FROM user_canvas WHERE id=?
MySQL-->>CanvasService: canvas record (包含 DSL)
CanvasService-->>-API: cvs (DSL JSON)
API->>+Canvas: __init__(cvs.dsl, tenant_id)
Canvas->>Canvas: json.loads(dsl)
Canvas->>Canvas: load() - 实例化所有组件
Note over Canvas: 解析 components、graph、history、globals
Canvas-->>-API: canvas 实例
API->>Canvas: add_user_input(msg[-1].content)
API->>+Canvas: run(query=msg.content, user_id=tenant_id)
rect rgb(240, 248, 255)
Note over Canvas: ========== Canvas 执行流程 ==========
Canvas->>Canvas: 初始化 globals<br/>sys.query, sys.user_id, sys.conversation_turns++
Canvas->>Canvas: path.append("begin")
Canvas-->>API: yield {event: "workflow_started", data: {...}}
Canvas->>+Begin: invoke(inputs={})
Begin->>Begin: 设置初始变量
Begin-->>-Canvas: outputs = {}
Canvas-->>API: yield {event: "node_finished", component_id: "begin"}
Canvas->>Canvas: path.extend(["llm_0"])
Canvas-->>API: yield {event: "node_started", component_id: "llm_0"}
Canvas->>+LLM: invoke()
LLM->>LLM: _prepare_prompt_variables()<br/>替换 {sys.query} → 实际问题
LLM->>LLM: 检查下游是否有 Message → 是
LLM->>LLM: set_output("content", partial(stream_gen))
Note over LLM: 返回 partial 对象(延迟执行)
LLM-->>-Canvas: outputs = {content: <partial>}
Canvas->>Canvas: 检测到 partial,暂不发送 node_finished
Canvas->>Canvas: path.extend(["message_0"])
Canvas-->>API: yield {event: "node_started", component_id: "message_0"}
Canvas->>Canvas: 发现 Message 组件,调用 partial()
Canvas->>+LLM: content.partial() - 触发流式生成
LLM->>+LLMBundle: chat_streamly(sys_prompt, msg, gen_conf)
LLMBundle->>+OpenAI: POST /v1/chat/completions (stream=true)
loop 流式 Token 生成
OpenAI-->>LLMBundle: {"delta": {"content": "tok"}}
LLMBundle-->>LLM: yield "tok"
LLM-->>Canvas: yield "tok"
Canvas-->>API: yield {event: "message", data: {content: "tok"}}
API-->>Client: data: {code: 0, data: {answer: "tok"}}\n\n
end
OpenAI-->>-LLMBundle: {"finish_reason": "stop"}
LLMBundle-->>-LLM: 完成
LLM->>Canvas: add_assistant_output(full_answer)
LLM-->>-Canvas: 生成完毕
Canvas->>Canvas: 检查是否有引用
Canvas-->>API: yield {event: "message_end", data: {reference: [...]}}
Canvas-->>API: yield {event: "node_finished", component_id: "llm_0"}
Canvas-->>API: yield {event: "workflow_finished", outputs: {...}}
end
Canvas-->>-API: 返回最终结果
API->>Canvas: str(canvas) → DSL JSON
API->>+CanvasService: update_by_id(dialog_id, cvs.to_dict())
CanvasService->>MySQL: UPDATE user_canvas SET dsl=?
MySQL-->>CanvasService: OK
CanvasService-->>-API: 完成
API->>+ConvService: append_message(conv_id, conv.to_dict())
ConvService->>MySQL: UPDATE conversation SET message=?, reference=?
MySQL-->>ConvService: OK
ConvService-->>-API: 完成
API-->>Client: data: {code: 0, data: true}\n\n
API-->>-Client: SSE 连接关闭
时序图说明:
-
鉴权阶段(步骤 1-6):
- 客户端携带 Bearer Token 发起请求
- API 层通过 APIToken 服务验证 Token,获取 tenant_id
- 加载会话记录(Conversation)和 Canvas DSL
-
Canvas 初始化(步骤 7-10):
- 解析 DSL JSON,实例化所有组件对象
- 恢复会话状态(history、globals、retrieval)
-
组件执行流程(步骤 11-35):
- Begin 组件:初始化入口变量
- LLM 组件:
- 变量替换(
{sys.query}→ 实际问题) - 检测到下游有 Message,返回
partial对象
- 变量替换(
- Message 组件:
- 触发
partial()执行 - LLM 流式生成,逐 Token yield 到客户端
- 触发
-
流式响应(步骤 25-30):
- OpenAI API 返回流式 Token
- 通过 LLMBundle → LLM → Canvas → API → Client 逐层传递
- 客户端实时接收 SSE 事件
-
状态持久化(步骤 37-45):
- 更新 Canvas DSL 到 MySQL
- 更新会话消息和引用到 MySQL
5.2 Agent ReAct 工具调用时序
sequenceDiagram
autonumber
participant Client as 客户端
participant Canvas as Canvas<br/>执行引擎
participant Agent as Agent<br/>组件
participant LLMBundle as LLMBundle<br/>LLM 服务
participant PromptGen as Prompt<br/>生成器
participant ThreadPool as ThreadPool<br/>Executor
participant Retrieval as Retrieval<br/>工具
participant CodeExec as CodeExec<br/>工具
participant Sandbox as Sandbox<br/>沙箱
participant ES as Elasticsearch<br/>检索引擎
Client->>+Canvas: run(query="分析用户增长趋势并生成图表")
Canvas->>+Agent: invoke()
Agent->>Agent: _prepare_prompt_variables()
Agent->>+PromptGen: analyze_task(chat_mdl, prompt, query, tool_meta)
PromptGen->>LLMBundle: chat("分析以下任务需要哪些工具...")
LLMBundle-->>PromptGen: task_desc = "需要检索用户数据,分析趋势,生成图表"
PromptGen-->>-Agent: task_desc
Agent->>Canvas: callback("analyze_task", task_desc)
rect rgb(255, 250, 240)
Note over Agent: ========== ReAct 循环开始 ==========
Agent->>Agent: for round in range(max_rounds):
Agent->>+PromptGen: next_step(chat_mdl, hist, tool_meta, task_desc)
PromptGen->>LLMBundle: chat("根据任务描述,决定下一步操作...")
LLMBundle-->>PromptGen: response = [{"name": "search_my_dataset", "arguments": {"query": "用户增长数据"}}, ...]
PromptGen-->>-Agent: response (JSON 工具调用列表)
Agent->>Agent: hist.append({role: "assistant", content: response})
Agent->>Agent: json_repair.loads(response) → functions
rect rgb(240, 255, 240)
Note over Agent,CodeExec: ========== 并行工具调用 ==========
Agent->>+ThreadPool: submit(use_tool, "search_my_dataset", args)
Agent->>+ThreadPool: submit(use_tool, "code_exec", args)
par 并行执行工具
ThreadPool->>+Retrieval: tool_call("search_my_dataset", args)
Retrieval->>Retrieval: 解析 kb_ids, query
Retrieval->>+ES: 向量检索 + 关键词检索
ES-->>-Retrieval: chunks = [{content: "...", doc_id: "..."}]
Retrieval->>Retrieval: Rerank(可选)
Retrieval->>Canvas: retrieval[-1] = {chunks, doc_aggs}
Retrieval->>Retrieval: kb_prompt(chunks) → formated_content
Retrieval-->>-ThreadPool: formated_content
and
ThreadPool->>+CodeExec: tool_call("code_exec", args)
CodeExec->>CodeExec: base64.encode(code)
CodeExec->>+Sandbox: POST /execute {code_b64, language: "python"}
Sandbox->>Sandbox: 创建隔离容器
Sandbox->>Sandbox: 执行 Python 代码
Sandbox-->>-CodeExec: {status: "success", stdout: "...", result: {...}}
CodeExec->>CodeExec: set_output("content", stdout)
CodeExec-->>-ThreadPool: stdout
end
ThreadPool-->>Agent: [("search_my_dataset", result1), ("code_exec", result2)]
ThreadPool-->>-Agent: 所有工具调用完成
end
Agent->>Agent: use_tools.append({name, arguments, results})
Agent->>+PromptGen: reflect(chat_mdl, hist, tool_results)
PromptGen->>LLMBundle: chat("总结以下工具调用结果...")
LLMBundle-->>PromptGen: reflection = "检索到用户增长数据,代码执行成功生成图表"
PromptGen-->>-Agent: reflection
Agent->>Agent: hist.append({role: "user", content: reflection})
Agent->>Canvas: callback("reflection", reflection)
Agent->>+PromptGen: next_step(chat_mdl, hist, tool_meta, task_desc)
PromptGen->>LLMBundle: chat("根据工具结果,决定下一步...")
LLMBundle-->>PromptGen: response = [{"name": "COMPLETE_TASK", "arguments": {}}]
PromptGen-->>-Agent: response
Agent->>Agent: 检测到 COMPLETE_TASK,退出循环
end
rect rgb(255, 240, 240)
Note over Agent,LLMBundle: ========== 生成最终答案 ==========
Agent->>Agent: complete() 回调
Agent->>Agent: hist[0].content += citation_prompt()
Agent->>+LLMBundle: chat_streamly(hist)
loop 流式生成
LLMBundle-->>Agent: yield "根据检索到的数据分析..."
Agent-->>Canvas: yield "根据检索到的数据分析..."
Canvas-->>Client: SSE event: {event: "message", content: "..."}
end
LLMBundle-->>-Agent: 完成
end
Agent->>Canvas: set_output("content", final_answer)
Agent->>Canvas: set_output("use_tools", use_tools)
Agent-->>-Canvas: 返回
Canvas->>Canvas: add_assistant_output(final_answer)
Canvas-->>Client: yield {event: "workflow_finished"}
Canvas-->>-Client: 完成
时序图说明:
-
任务分析阶段(步骤 1-6):
analyze_task()使用 LLM 分析用户请求- 生成任务描述,帮助后续决策
-
ReAct 循环(步骤 7-28):
- 决策阶段:
next_step()调用 LLM 决定需要哪些工具 - 执行阶段:使用 ThreadPoolExecutor 并行调用多个工具
- 反思阶段:
reflect()总结工具结果,注入到 history
- 决策阶段:
-
并行工具调用(步骤 13-23):
- Retrieval 工具:
- Elasticsearch 向量检索 + 关键词检索
- Rerank 重新排序
- 格式化为 Prompt 可用文本
- CodeExec 工具:
- Base64 编码代码
- 发送到 Sandbox 执行
- 返回 stdout 和 result
- Retrieval 工具:
-
终止判断(步骤 29-32):
- LLM 返回
COMPLETE_TASK工具调用 - 退出 ReAct 循环
- LLM 返回
-
最终答案生成(步骤 33-40):
- 插入引用提示(citation_prompt)
- 流式生成综合答案
- 返回给客户端
5.3 Canvas 组件批量执行时序
sequenceDiagram
autonumber
participant Canvas as Canvas<br/>执行引擎
participant ThreadPool as ThreadPool<br/>Executor
participant Begin as Begin<br/>组件
participant Retrieval as Retrieval<br/>组件
participant LLM as LLM<br/>组件
participant Message as Message<br/>组件
participant ES as Elasticsearch
participant OpenAI as OpenAI API
Canvas->>Canvas: run(query="RAGFlow 如何部署?")
Canvas->>Canvas: path = ["begin"]
Canvas->>Canvas: globals["sys.query"] = "RAGFlow 如何部署?"
rect rgb(240, 248, 255)
Note over Canvas: ========== 批次 1:Begin ==========
Canvas->>Canvas: idx = 0, to = 1
Canvas->>Canvas: yield {event: "node_started", component_id: "begin"}
Canvas->>+ThreadPool: _run_batch(0, 1)
ThreadPool->>+Begin: invoke(inputs={})
Begin->>Begin: 设置初始变量
Begin->>Begin: set_output("outputs", {})
Begin-->>-ThreadPool: outputs = {}
ThreadPool-->>-Canvas: 批次完成
Canvas->>Begin: get_component("begin")["downstream"]
Canvas->>Canvas: path.extend(["retrieval_0"])
Canvas->>Canvas: yield {event: "node_finished", component_id: "begin"}
end
rect rgb(240, 255, 240)
Note over Canvas: ========== 批次 2:Retrieval ==========
Canvas->>Canvas: idx = 1, to = 2
Canvas->>Canvas: yield {event: "node_started", component_id: "retrieval_0"}
Canvas->>+ThreadPool: _run_batch(1, 2)
ThreadPool->>+Retrieval: invoke(query="{sys.query}")
Retrieval->>Retrieval: get_input() → {query: "RAGFlow 如何部署?"}
Retrieval->>Retrieval: string_format(query, vars)
Retrieval->>+ES: 向量检索(query, kb_ids, top_n=8)
ES-->>-Retrieval: chunks = [{content: "...", score: 0.89}, ...]
Retrieval->>Canvas: retrieval[-1] = {chunks, doc_aggs}
Retrieval->>Retrieval: kb_prompt(chunks) → formated_content
Retrieval->>Retrieval: set_output("formalized_content", formated_content)
Retrieval-->>-ThreadPool: outputs = {formalized_content: "..."}
ThreadPool-->>-Canvas: 批次完成
Canvas->>Retrieval: get_component("retrieval_0")["downstream"]
Canvas->>Canvas: path.extend(["llm_0"])
Canvas->>Canvas: yield {event: "node_finished", component_id: "retrieval_0"}
end
rect rgb(255, 250, 240)
Note over Canvas: ========== 批次 3:LLM ==========
Canvas->>Canvas: idx = 2, to = 3
Canvas->>Canvas: yield {event: "node_started", component_id: "llm_0"}
Canvas->>+ThreadPool: _run_batch(2, 3)
ThreadPool->>+LLM: invoke()
LLM->>LLM: get_input() → {knowledge: formalized_content, query: "..."}
LLM->>LLM: sys_prompt = "根据以下知识回答:\n{retrieval_0@formalized_content}"
LLM->>LLM: string_format(sys_prompt, {知识内容})
LLM->>LLM: 检查下游是否有 Message → 是
LLM->>LLM: set_output("content", partial(_stream_and_cite))
Note over LLM: 返回 partial 对象,延迟执行
LLM-->>-ThreadPool: outputs = {content: <partial>}
ThreadPool-->>-Canvas: 批次完成
Canvas->>Canvas: 检测到 partial,加入 partials 队列
Canvas->>LLM: get_component("llm_0")["downstream"]
Canvas->>Canvas: path.extend(["message_0"])
Note over Canvas: 暂不发送 node_finished
end
rect rgb(255, 240, 255)
Note over Canvas: ========== 批次 4:Message(触发流式) ==========
Canvas->>Canvas: idx = 3, to = 4
Canvas->>Canvas: yield {event: "node_started", component_id: "message_0"}
Canvas->>+ThreadPool: _run_batch(3, 4)
ThreadPool->>+Message: invoke(content="{llm_0@content}")
Message->>Message: get_input() → {content: <partial>}
Message->>Message: set_output("content", <partial>)
Message-->>-ThreadPool: outputs = {content: <partial>}
ThreadPool-->>-Canvas: 批次完成
Note over Canvas: 后处理:发现 Message 组件
Canvas->>Canvas: 检测到 Message.output("content") 是 partial
Canvas->>+Message: output("content")() → 调用 partial
Message->>+LLM: _stream_and_cite() 生成器
LLM->>+OpenAI: POST /v1/chat/completions (stream=true)
loop 流式 Token 生成
OpenAI-->>LLM: {"delta": {"content": "tok"}}
LLM-->>Message: yield "tok"
Message-->>Canvas: yield "tok"
Canvas->>Canvas: yield {event: "message", data: {content: "tok"}}
end
OpenAI-->>-LLM: {"finish_reason": "stop"}
LLM->>Canvas: add_assistant_output(full_answer)
LLM-->>-Message: 生成完毕
Message-->>-Canvas: 完成
Canvas->>Canvas: set_output("content", full_answer)
Canvas->>Canvas: 检查引用:re.search(r"\[ID:[ 0-9]+\]", content)
Canvas->>Canvas: yield {event: "message_end", data: {reference: [...]}}
Note over Canvas: 处理 partials 队列
Canvas->>Canvas: partials = ["llm_0"]
Canvas->>Canvas: yield {event: "node_finished", component_id: "llm_0"}
Canvas->>Canvas: yield {event: "node_finished", component_id: "message_0"}
end
Canvas->>Canvas: path = ["begin", "retrieval_0", "llm_0", "message_0"]
Canvas->>Canvas: idx = 4, len(path) = 4 → 退出循环
Canvas->>Canvas: yield {event: "workflow_finished", outputs: {...}}
时序图说明:
-
批次划分:
- Canvas 将 path 分为多个批次
- 每个批次包含当前 path[idx:to] 区间的组件
- 批次内组件并行执行
-
并行执行:
- 使用
ThreadPoolExecutor(max_workers=5) - 同时提交批次内所有组件的
invoke()调用 - 等待所有 Future 完成后继续
- 使用
-
路径扩展:
- 每个组件执行后,根据
downstream扩展 path - 动态添加下游组件到执行队列
- 每个组件执行后,根据
-
流式处理:
- LLM 返回
partial对象 - Message 组件调用
partial()触发生成器 - 逐 Token yield 到客户端
- LLM 返回
-
事件顺序:
node_started→ 批量执行 →node_finishedmessage事件在流式生成时逐个发送message_end包含引用信息
5.4 模块间交互总结
关键交互模式:
-
API → Canvas:
- API 层负责鉴权、会话管理、SSE 流式响应
- Canvas 负责工作流编排、组件调度、状态管理
-
Canvas → Component:
- Canvas 通过
invoke()统一调用组件 - 组件通过变量引用
{component_id@key}获取输入 - 组件通过
set_output()设置输出
- Canvas 通过
-
Component → LLMBundle:
- LLM/Agent 组件通过 LLMBundle 调用 LLM
- LLMBundle 统一管理 OpenAI/DeepSeek/Ollama/本地模型
-
Component → External Service:
- Retrieval → Elasticsearch(向量检索)
- CodeExec → Sandbox(代码执行)
- WebSearch → Tavily/DuckDuckGo(联网搜索)
-
流式传递链:
OpenAI API → LLMBundle → LLM Component → Canvas → API Layer → SSE → Client
6. 最佳实践与案例
6.1 简单问答 Agent
DSL 示例:
{
"components": {
"begin": {
"obj": {"component_name": "Begin", "params": {}},
"downstream": ["retrieval_0"]
},
"retrieval_0": {
"obj": {
"component_name": "Retrieval",
"params": {
"kb_ids": ["kb_uuid_1"],
"query": "{sys.query}",
"top_n": 6
}
},
"downstream": ["llm_0"]
},
"llm_0": {
"obj": {
"component_name": "LLM",
"params": {
"llm_id": "gpt-4",
"sys_prompt": "根据以下知识回答问题:\n{retrieval_0@content}",
"prompts": [{"role": "user", "content": "{sys.query}"}]
}
},
"downstream": ["message_0"]
},
"message_0": {
"obj": {
"component_name": "Message",
"params": {"content": "{llm_0@content}"}
},
"downstream": []
}
}
}
6.2 ReAct Agent(多工具调用)
DSL 示例:
{
"components": {
"begin": {"...": "..."},
"agent_0": {
"obj": {
"component_name": "Agent",
"params": {
"llm_id": "gpt-4",
"sys_prompt": "你是一个智能助手,可以使用以下工具完成任务。",
"tools": [
{
"component_name": "Retrieval",
"name": "knowledge_search",
"params": {"kb_ids": ["kb_uuid_1"]}
},
{
"component_name": "CodeExec",
"name": "code_interpreter",
"params": {"lang": "python"}
},
{
"component_name": "WebSearch",
"name": "web_search",
"params": {"engine": "tavily"}
}
],
"max_rounds": 5
}
},
"downstream": ["message_0"]
}
}
}
6.3 条件分支 Agent
DSL 示例:
{
"components": {
"categorize_0": {
"obj": {
"component_name": "Categorize",
"params": {
"llm_id": "gpt-4",
"items": [
{"key": "technical", "content": "{sys.query}", "examples": "代码错误、部署问题"},
{"key": "business", "content": "{sys.query}", "examples": "产品功能、价格咨询"}
]
}
},
"downstream": {
"technical": ["agent_tech"],
"business": ["agent_biz"]
}
},
"agent_tech": {"...": "技术支持 Agent"},
"agent_biz": {"...": "业务咨询 Agent"}
}
}
7. 性能优化建议
1. 并行执行:
- 无依赖的组件并行执行(ThreadPoolExecutor)
- 设置合理的 max_workers(默认 5)
2. 缓存策略:
- 检索结果缓存到 Canvas.retrieval
- LLM 响应缓存(相同 Prompt)
3. 超时控制:
- 组件级超时(通过
@timeout装饰器) - 默认超时:LLM 10 分钟,CodeExec 5 分钟
4. 异常处理:
- 组件失败时使用 exception_handler 回退
- 设置 max_retries(默认 0)