RAGFlow-01-API模块
模块概览
1.1 职责与定位
API 模块是 RAGFlow 的统一入口层,承担以下核心职责:
- HTTP API 网关:提供 RESTful API,接收前端/SDK 请求,实现路由分发
- 认证与授权:基于 Flask-Login + JWT Token 实现用户认证,支持 OAuth/OIDC 单点登录
- 业务服务编排:封装业务逻辑到 Service 层,协调 RAG/Agent/DeepDoc 等引擎模块
- 数据持久化:通过 Peewee ORM 操作 MySQL/PostgreSQL,管理用户、知识库、文档、对话等元数据
- 任务调度:将文档解析、GraphRAG 构建等耗时任务推送到 Redis 队列,由 TaskExecutor 异步处理
- API 文档生成:集成 Flasgger 自动生成 Swagger 文档(访问
/apidocs/)
1.2 输入与输出
输入:
- HTTP 请求(JSON 格式),包含 Authorization Header(Bearer Token)或 Session Cookie
- Web Socket 连接(用于实时通知,如文档解析进度)
- SDK 调用(Python/JavaScript),封装 HTTP 请求
输出:
- JSON 响应(统一格式:
{code, message, data}) - SSE 流式响应(对话生成)
- 静态文件(文档下载、图片预览)
1.3 上下游依赖
上游调用方:
- Web 前端(React SPA)
- Python SDK(
ragflow_sdk) - 第三方应用(通过 API Key 调用)
下游依赖:
- MySQL/PostgreSQL:元数据存储(用户、知识库、文档、对话、任务)
- Redis:Session 存储、任务队列(Redis Stream)、LLM 缓存、分布式锁
- Elasticsearch/Infinity:向量检索与全文检索
- MinIO/S3:文件存储(原始文档、图片、头像)
- RAG 模块:检索、重排、生成
- Agent 模块:工作流执行
- DeepDoc 模块:文档解析(通过 TaskExecutor 异步调用)
- GraphRAG 模块:图谱构建(通过 TaskExecutor 异步调用)
1.4 生命周期
启动阶段:
ragflow_server.py加载配置(settings.py、service_conf.yaml)- 初始化数据库连接池(MySQL、Redis、ES、MinIO)
- 注册 Blueprint 路由(
user_app、kb_app、document_app、dialog_app、sdk/*) - 启动 Flask HTTP 服务器(默认端口 80)
- 启动后台线程
update_progress(定期更新文档解析进度)
运行阶段:
- HTTP 请求到达 → 认证装饰器
@login_required/@token_required校验 → 路由分发 → Service 层处理 → 返回响应 - 长时任务通过 Redis Stream 推送到队列,立即返回任务 ID
停止阶段:
- 接收 SIGINT/SIGTERM 信号 → 关闭 MCP 连接 → 停止后台线程 → 关闭数据库连接
1. 服务启动与初始化架构
1.1 服务启动流程图
flowchart TB
Start([ragflow_server.py 启动]) --> InitLogger[初始化日志系统]
InitLogger --> InitSettings[加载配置文件<br/>settings.init_settings]
InitSettings --> InitDB[初始化数据库连接池<br/>init_web_db]
InitDB --> InitData[初始化默认数据<br/>init_web_data]
InitData --> InitRuntimeConfig[初始化运行时配置<br/>RuntimeConfig.init_env]
InitRuntimeConfig --> RegisterBlueprints[注册路由 Blueprint<br/>扫描 apps/ 目录]
RegisterBlueprints --> InitPlugins[加载插件<br/>GlobalPluginManager.load_plugins]
InitPlugins --> StartBackgroundThread[启动后台线程<br/>update_progress]
StartBackgroundThread --> StartHTTPServer[启动 HTTP 服务器<br/>Werkzeug run_simple]
StartHTTPServer --> Ready([服务就绪])
subgraph "配置初始化详情"
InitSettings --> LoadYAML[加载 service_conf.yaml]
LoadYAML --> LoadLLMFactories[加载 llm_factories.json]
LoadLLMFactories --> ConnectDB[连接 MySQL/PostgreSQL]
ConnectDB --> ConnectRedis[连接 Redis]
ConnectRedis --> ConnectES[连接 Elasticsearch/Infinity]
ConnectES --> ConnectMinIO[连接 MinIO/S3]
ConnectMinIO --> InitRetriever[初始化 Retriever]
end
启动流程要点说明:
- 日志初始化:调用
init_root_logger("ragflow_server"),配置日志级别与格式 - 配置加载:
- 加载
conf/service_conf.yaml:数据库、Redis、ES、MinIO 连接信息 - 加载
conf/llm_factories.json:LLM 工厂配置(OpenAI、Ollama、DeepSeek 等) - 加载环境变量:覆盖配置文件中的敏感信息(如 API Key)
- 加载
- 数据库初始化:
init_web_db():创建所有 Peewee Model 对应的表(User、Tenant、Knowledgebase、Document 等)init_web_data():插入默认数据(默认租户、LLM 配置等)
- Blueprint 注册:
- 扫描
api/apps/目录下所有*_app.py文件 - 为每个 Blueprint 分配 URL 前缀(如
/v1/user、/api/v1/datasets)
- 扫描
- 后台线程启动:
update_progress线程:每 6 秒更新文档解析进度(从 Redis 读取状态更新到 MySQL)
- HTTP 服务器启动:
- 使用 Werkzeug 内置服务器(生产环境建议使用 Gunicorn)
- 默认监听
0.0.0.0:80(可通过配置修改)
1.2 整体模块架构图
flowchart TB
subgraph "外部请求层"
WebUI[Web UI<br/>React SPA]
SDK[Python SDK]
HTTP[第三方 HTTP Client]
end
subgraph "API 层(Flask 应用)"
Server[ragflow_server.py<br/>Flask App 入口]
subgraph "认证与中间件层"
LoginManager[login_manager<br/>Flask-Login]
TokenAuth[token_required<br/>JWT 校验装饰器]
OAuth[OAuth/OIDC<br/>第三方登录适配器]
CORS[CORS 跨域]
ErrorHandler[全局异常处理器]
end
subgraph "路由层(Blueprint)"
direction LR
UserApp[user_app<br/>用户管理]
KBApp[kb_app<br/>知识库管理]
DocApp[document_app<br/>文档管理]
DialogApp[dialog_app<br/>对话配置]
ConvApp[conversation_app<br/>会话管理]
CanvasApp[canvas_app<br/>Agent 画布]
ChunkApp[chunk_app<br/>切片管理]
SearchApp[search_app<br/>检索管理]
SDKRoutes[sdk/*<br/>SDK 专用路由]
end
subgraph "Service 业务服务层"
direction TB
UserSvc[UserService<br/>用户/租户]
KBSvc[KnowledgebaseService<br/>知识库]
DocSvc[DocumentService<br/>文档]
FileSvc[FileService<br/>文件]
DialogSvc[DialogService<br/>对话配置]
ConvSvc[ConversationService<br/>会话]
TaskSvc[TaskService<br/>任务调度]
LLMSvc[LLMService<br/>LLM 调用封装]
TenantLLMSvc[TenantLLMService<br/>租户 LLM 配置]
end
subgraph "数据访问层(ORM)"
Models[(db_models.py<br/>Peewee Models<br/>User/Tenant/KB/Doc/...)]
end
end
subgraph "核心引擎层"
direction TB
RAGEngine[RAG Engine<br/>检索增强生成]
RAGRetriever[Retriever<br/>混合检索]
RAGRerank[Reranker<br/>重排序]
RAGGenerator[Generator<br/>LLM 生成]
AgentEngine[Agent Engine<br/>工作流执行]
AgentCanvas[Canvas<br/>DAG 图执行器]
AgentComponents[Components<br/>LLM/Tool/Switch/Iteration]
end
subgraph "存储层"
MySQL[(MySQL/PG<br/>元数据存储)]
Redis[(Redis<br/>Session/Queue/Cache)]
ES[(Elasticsearch/Infinity<br/>向量+全文检索)]
MinIO[(MinIO/S3<br/>文件对象存储)]
end
subgraph "外部依赖层"
LLMProviders[LLM 提供商<br/>OpenAI/Ollama/DeepSeek/Azure]
EmbeddingProviders[Embedding 模型<br/>bge-large-zh/bce-embedding]
RerankProviders[Rerank 模型<br/>bge-reranker-v2-m3]
end
WebUI --> Server
SDK --> Server
HTTP --> Server
Server --> LoginManager
Server --> TokenAuth
Server --> OAuth
Server --> CORS
Server --> ErrorHandler
LoginManager --> UserApp
TokenAuth --> SDKRoutes
UserApp --> UserSvc
KBApp --> KBSvc
DocApp --> DocSvc
DocApp --> FileSvc
DialogApp --> DialogSvc
ConvApp --> ConvSvc
ConvApp --> DialogSvc
CanvasApp --> AgentEngine
ChunkApp --> DocSvc
SearchApp --> RAGEngine
UserSvc --> Models
KBSvc --> Models
DocSvc --> Models
DialogSvc --> Models
ConvSvc --> Models
FileSvc --> Models
TaskSvc --> Models
Models --> MySQL
ConvSvc --> DialogSvc
DialogSvc --> RAGEngine
DialogSvc --> LLMSvc
LLMSvc --> TenantLLMSvc
RAGEngine --> RAGRetriever
RAGEngine --> RAGRerank
RAGEngine --> RAGGenerator
RAGRetriever --> ES
RAGGenerator --> LLMProviders
RAGRerank --> RerankProviders
AgentEngine --> AgentCanvas
AgentCanvas --> AgentComponents
AgentComponents --> LLMProviders
TaskSvc -.推送任务.-> Redis
FileSvc --> MinIO
TenantLLMSvc --> Redis
LoginManager --> Redis
LLMSvc --> EmbeddingProviders
1.3 模块交互矩阵
| 调用方 | 被调方 | 调用场景 | 同步/异步 | 错误语义 | 一致性要求 |
|---|---|---|---|---|---|
| user_app | UserService | 登录、注册、修改用户信息 | 同步 | 认证失败返回 401 | 强一致(MySQL 事务) |
| kb_app | KnowledgebaseService | 创建、更新、删除知识库 | 同步 | 名称冲突返回 400 | 强一致(UNIQUE 约束) |
| document_app | FileService + DocumentService | 上传文档 | 同步(上传)+ 异步(解析) | 文件格式错误返回 415 | 最终一致(任务队列) |
| document_app | TaskService | 推送解析任务 | 异步 | 队列满返回 503 | 最终一致(At-Least-Once) |
| conversation_app | DialogService | 对话配置 | 同步 | 知识库不存在返回 404 | 强一致 |
| conversation_app | RAG Engine | 检索 + 生成 | 流式同步 | ES 超时返回 504 | 弱一致(缓存可用) |
| canvas_app | Agent Engine | 工作流执行 | 同步/流式 | 组件执行失败返回 500 | 无需一致性(无状态) |
| dialog_service | Retriever | 混合检索 | 同步 | 索引不存在返回 404 | 弱一致(ES Refresh) |
| dialog_service | LLMBundle | LLM 生成 | 同步/流式 | API Key 无效返回 401 | 无需一致性 |
| task_service | Redis Queue | 推送任务 | 异步 | 连接失败返回 503 | At-Least-Once |
| TaskExecutor | DeepDoc Parser | 文档解析 | 同步 | 解析失败更新 progress=-1 | 最终一致 |
| TaskExecutor | Elasticsearch | 批量索引 | 同步 | 批量失败部分重试 | 最终一致 |
关键设计决策:
-
同步 vs 异步:
- 同步:元数据操作(CRUD)、认证、检索、生成
- 异步:文档解析、GraphRAG 构建、Raptor 聚类
-
一致性保证:
- 强一致:用户、知识库、对话配置(MySQL 事务 + UNIQUE 约束)
- 最终一致:文档解析进度(任务队列 + 定时同步)
- 弱一致:LLM 缓存、检索结果(可接受短暂过期)
-
错误语义:
- 4xx 客户端错误:参数错误、认证失败、权限不足(不应重试)
- 5xx 服务器错误:数据库连接失败、ES 超时、LLM API 错误(可重试)
架构要点说明
1. 分层职责明确
- 路由层(Blueprint):每个
*_app.py文件负责一类资源的 API 路由,使用 Flask Blueprint 注册到应用 - Service 层:封装业务逻辑,隔离路由层与数据访问层,单一职责原则
- ORM 层:Peewee 模型定义,提供类型安全的数据库操作,支持事务与连接池
2. 认证机制
- Session 认证(Web UI):Flask-Session 存储在 Redis,支持多实例共享
- Token 认证(SDK/API):JWT Token 放在 Authorization Header,无状态
- OAuth/OIDC:支持 GitHub、Google 等第三方登录,配置在
service_conf.yaml的oauth节
3. 扩展点
- 自定义装饰器:
@login_required、@token_required、@check_team_permission - 错误处理:全局异常捕获
app.errorhandler(Exception),返回统一 JSON 错误响应 - 请求校验:Pydantic 模型(
api/validation.py)校验请求参数,自动生成 API 文档
4. 状态持有位置
- 用户 Session:Redis(key:
flask_session:<session_id>,TTL: 7 天) - 任务进度:MySQL
Task表(progress字段 0-1,status字段枚举) - 对话上下文:MySQL
Conversation表(多轮对话历史)
5. 资源占用
- 内存:Flask Worker 进程 ~200MB/进程,建议 4-8 进程
- 连接数:MySQL 连接池 32,Redis 连接池 10,ES 连接池 10
- QPS 上界:单实例 ~500 QPS(无 LLM 调用),受 MySQL 与 ES 性能限制
2. 数据模型与数据库设计
2.1 核心数据模型 UML 图
classDiagram
class User {
+String id [PK]
+String email [UQ]
+String password_hash
+String nickname
+String avatar
+String access_token [UQ]
+DateTime create_time
+login()
+to_json()
}
class Tenant {
+String id [PK]
+String name
+String llm_id
+String embd_id
+JSONField llm_config
+JSONField asr_config
+Int credit
}
class UserTenant {
+String id [PK]
+String tenant_id [FK]
+String user_id [FK]
+String role [ENUM]
+String invited_by
+DateTime create_time
}
class Knowledgebase {
+String id [PK]
+String tenant_id [FK]
+String name [UQ per tenant]
+String avatar
+String description
+String embd_id
+String parser_id
+JSONField parser_config
+String language
+String permission
+Int doc_num
+Int token_num
+Int chunk_num
+DateTime create_time
}
class Document {
+String id [PK]
+String kb_id [FK]
+String name
+String location
+Int size
+String type
+String parser_id
+JSONField parser_config
+String source_type
+Float progress
+Int chunk_num
+Int token_num
+String status
+DateTime create_time
}
class File {
+String id [PK]
+String tenant_id [FK]
+String created_by [FK]
+String name
+String location
+String type
+String source_type
+Int size
+DateTime create_time
}
class File2Document {
+String id [PK]
+String file_id [FK]
+String document_id [FK]
}
class Task {
+String id [PK]
+String doc_id [FK]
+Int from_page
+Int to_page
+String status
+Float progress
+String progress_msg
+DateTime begin_at
+DateTime create_time
}
class Dialog {
+String id [PK]
+String tenant_id [FK]
+String name
+String description
+String icon
+ListField kb_ids
+String llm_id
+JSONField llm_setting
+JSONField prompt_config
+Float similarity_threshold
+Float vector_similarity_weight
+Int top_n
+String rerank_id
+DateTime create_time
}
class Conversation {
+String id [PK]
+String dialog_id [FK]
+String name
+String message [JSON]
+String reference [JSON]
+Int tokens_num
+DateTime create_time
}
User "1" --> "N" UserTenant : belongs to
Tenant "1" --> "N" UserTenant : has
Tenant "1" --> "N" Knowledgebase : owns
Knowledgebase "1" --> "N" Document : contains
Document "1" --> "N" Task : has tasks
Document "1" --> "1" File2Document : maps to
File "1" --> "1" File2Document : maps to
Tenant "1" --> "N" Dialog : owns
Dialog "1" --> "N" Conversation : has
User "1" --> "N" File : uploads
2.2 关键字段说明
User 表(用户)
| 字段 | 类型 | 必填 | 约束 | 说明 |
|---|---|---|---|---|
| id | String(32) | 是 | 主键, UUID | 用户唯一标识 |
| String(128) | 是 | 唯一 | 登录邮箱 | |
| password | String(256) | 否 | - | 密码哈希(OAuth 用户无密码) |
| nickname | String(128) | 是 | - | 显示名称 |
| avatar | String(1024) | 否 | - | 头像 URL |
| access_token | String(32) | 是 | 唯一 | API Token(UUID) |
| is_active | String(1) | 是 | 默认 ‘1’ | 账户状态(‘1’ 启用,‘0’ 禁用) |
| create_time | BigInt | 是 | - | 创建时间戳(毫秒) |
Knowledgebase 表(知识库)
| 字段 | 类型 | 必填 | 约束 | 说明 |
|---|---|---|---|---|
| id | String(32) | 是 | 主键, UUID | 知识库唯一标识 |
| tenant_id | String(32) | 是 | 外键 User.id | 所属租户 |
| name | String(128) | 是 | 租户内唯一 | 知识库名称 |
| embd_id | String(128) | 是 | - | Embedding 模型 ID |
| parser_id | String(128) | 是 | 枚举 | 解析器类型(naive/book/qa 等) |
| parser_config | JSON | 是 | - | 解析器配置(切片大小、重叠等) |
| permission | String(16) | 是 | 默认 ‘me’ | 权限(‘me’ 私有,’team’ 团队) |
| doc_num | Int | 是 | 默认 0 | 文档数量 |
| chunk_num | Int | 是 | 默认 0 | 切片数量 |
| token_num | BigInt | 是 | 默认 0 | Token 总数 |
Document 表(文档)
| 字段 | 类型 | 必填 | 约束 | 说明 |
|---|---|---|---|---|
| id | String(32) | 是 | 主键, UUID | 文档唯一标识 |
| kb_id | String(32) | 是 | 外键 Knowledgebase.id | 所属知识库 |
| name | String(256) | 是 | - | 文档名称 |
| location | String(512) | 是 | - | MinIO 存储路径 |
| size | Int | 是 | - | 文件大小(字节) |
| type | String(32) | 是 | - | 文件类型(pdf/docx/excel 等) |
| parser_id | String(128) | 是 | - | 解析器类型(继承自知识库) |
| parser_config | JSON | 是 | - | 解析器配置(可覆盖知识库配置) |
| progress | Float | 是 | 默认 0.0 | 解析进度(0.0-1.0,-1 表示失败) |
| chunk_num | Int | 是 | 默认 0 | 切片数量 |
| status | String(16) | 是 | 默认 ‘1’ | 状态(‘1’ 有效,‘0’ 删除) |
Dialog 表(对话配置)
| 字段 | 类型 | 必填 | 约束 | 说明 |
|---|---|---|---|---|
| id | String(32) | 是 | 主键, UUID | 对话唯一标识 |
| tenant_id | String(32) | 是 | 外键 User.id | 所属租户 |
| name | String(128) | 是 | - | 对话名称 |
| kb_ids | List[String] | 是 | - | 关联知识库 ID 列表 |
| llm_id | String(128) | 是 | - | LLM 模型 ID |
| llm_setting | JSON | 是 | - | LLM 参数(temperature/max_tokens 等) |
| prompt_config | JSON | 是 | - | Prompt 配置(system/prologue/quote 等) |
| similarity_threshold | Float | 是 | 默认 0.2 | 相似度阈值 |
| vector_similarity_weight | Float | 是 | 默认 0.7 | 向量权重(1-weight 为全文权重) |
| top_n | Int | 是 | 默认 6 | 返回 Top N 结果 |
| rerank_id | String(128) | 否 | - | Rerank 模型 ID |
Conversation 表(对话记录)
| 字段 | 类型 | 必填 | 约束 | 说明 |
|---|---|---|---|---|
| id | String(32) | 是 | 主键, UUID | 会话唯一标识 |
| dialog_id | String(32) | 是 | 外键 Dialog.id | 所属对话配置 |
| name | String(256) | 否 | - | 会话名称(首轮问题摘要) |
| message | JSON | 是 | - | 消息列表(role/content/doc_aggs) |
| reference | JSON | 否 | - | 引用的 chunk 信息 |
| tokens_num | Int | 是 | 默认 0 | 累计 Token 消耗 |
3. API 详细规格
3.1 用户认证 API
3.1.1 用户登录(POST /v1/user/login)
基本信息:
- 名称:
login - 协议与方法:HTTP POST
/v1/user/login - 幂等性:否(多次登录会刷新 access_token)
请求结构体:
{
"email": "user@example.com", # 用户邮箱,必填
"password": "<encrypted>" # RSA 加密后的密码,必填
}
字段表:
| 字段 | 类型 | 必填 | 约束 | 说明 |
|---|---|---|---|---|
| String | 是 | 长度 5-128 | 用户注册邮箱 | |
| password | String | 是 | 加密后长度 ~256 | 前端使用公钥 RSA 加密,后端解密 |
响应结构体:
{
"code": 0,
"message": "Welcome back!",
"data": {
"id": "user_uuid",
"email": "user@example.com",
"nickname": "张三",
"avatar": "http://minio/avatar.png",
"access_token": "new_uuid_token",
"create_time": 1697000000000
}
}
字段表:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| code | Int | 是 | 错误码(0 成功,100 认证失败,500 服务器错误) |
| message | String | 是 | 人类可读消息 |
| data.id | String | 是 | 用户 UUID |
| data.access_token | String | 是 | 新生成的 API Token,用于后续请求 |
入口函数与签名:
@manager.route("/login", methods=["POST"])
def login():
"""用户登录"""
if not request.json:
return get_json_result(data=False, code=settings.RetCode.AUTHENTICATION_ERROR, message="Unauthorized!")
email = request.json.get("email", "")
users = UserService.query(email=email)
if not users:
return get_json_result(
data=False,
code=settings.RetCode.AUTHENTICATION_ERROR,
message=f"Email: {email} is not registered!",
)
# 解密密码
password = request.json.get("password")
password = decrypt(password) # RSA 解密
# 校验密码
user = UserService.query_user(email, password)
# 检查账户状态
if user and user.is_active == "0":
return get_json_result(
data=False,
code=settings.RetCode.FORBIDDEN,
message="This account has been disabled, please contact the administrator!",
)
elif user:
# 登录成功,刷新 access_token
response_data = user.to_json()
user.access_token = get_uuid() # 生成新 Token
login_user(user) # Flask-Login 记录登录态
user.update_time = current_timestamp()
user.save() # 更新数据库
return construct_response(data=response_data, auth=user.get_id(), message="Welcome back!")
else:
return get_json_result(
data=False,
code=settings.RetCode.AUTHENTICATION_ERROR,
message="Email and password do not match!",
)
关键代码解释:
- 密码解密:前端使用 RSA 公钥加密密码,后端使用私钥解密,防止明文传输
- Token 刷新:每次登录生成新 UUID 作为 access_token,旧 Token 自动失效
- Session 存储:
login_user(user)将用户 ID 写入 Flask-Session(存储在 Redis) - 数据库更新:更新
update_time记录最后登录时间
调用链与上游函数:
# api/apps/__init__.py
@login_manager.request_loader
def load_user(web_request):
"""从 Authorization Header 或 Session 加载用户"""
authorization = web_request.headers.get("Authorization")
if authorization:
# Token 认证(SDK 调用)
jwt = Serializer(secret_key=settings.SECRET_KEY)
access_token = str(jwt.loads(authorization)) # 解析 JWT
user = UserService.query(access_token=access_token, status=StatusEnum.VALID.value)
return user[0] if user else None
else:
# Session 认证(Web UI)
return None # Flask-Login 会从 Session 读取
# api/db/services/user_service.py
class UserService(CommonService):
@classmethod
def query_user(cls, email, password):
"""通过邮箱和密码查询用户"""
users = cls.query(email=email, status=StatusEnum.VALID.value)
if not users:
return None
user = users[0]
# 校验密码哈希
if check_password_hash(user.password, password):
return user
return None
时序图:
sequenceDiagram
autonumber
participant C as 客户端(前端/SDK)
participant API as /v1/user/login
participant UserSvc as UserService
participant MySQL as MySQL
participant Redis as Redis
C->>API: POST 登录请求(email + encrypted_password)
API->>API: decrypt(password) - RSA 解密
API->>UserSvc: query(email)
UserSvc->>MySQL: SELECT * FROM user WHERE email=?
MySQL-->>UserSvc: 返回用户记录
alt 用户不存在
API-->>C: 401 Unauthorized
else 用户存在
API->>UserSvc: query_user(email, password)
UserSvc->>UserSvc: check_password_hash()
alt 密码错误
API-->>C: 401 密码错误
else 密码正确
UserSvc-->>API: 返回 User 对象
API->>API: user.access_token = get_uuid()
API->>API: login_user(user) - 写入 Session
API->>Redis: 存储 Session
Redis-->>API: OK
API->>MySQL: UPDATE user SET access_token=?, update_time=?
MySQL-->>API: OK
API-->>C: 200 OK + user_data + access_token
end
end
说明要点:
- 幂等点:不幂等,每次登录生成新 Token
- 失败重试:密码错误不应重试;网络错误可重试
- 锁与版本控制:无需锁,单用户串行操作
- 超时设定:MySQL 查询超时 5 秒,Redis 超时 3 秒
- 性能要点:密码哈希校验慢(~100ms),使用 bcrypt;建议前端限流(1 秒内最多 1 次)
3.1.2 用户注册(POST /v1/user/register)
基本信息:
- 名称:
register - 协议与方法:HTTP POST
/v1/user/register - 幂等性:是(重复注册相同邮箱返回已存在错误)
请求结构体:
{
"email": "new@example.com",
"nickname": "张三",
"password": "<encrypted>",
"captcha": "123456" # 可选,邮箱验证码
}
响应结构体:
{
"code": 0,
"message": "Register successfully!",
"data": {
"id": "user_uuid",
"email": "new@example.com",
"access_token": "uuid_token"
}
}
核心代码(简化):
@manager.route("/register", methods=["POST"])
def register():
req = request.json
# 校验邮箱是否已注册
if UserService.query(email=req["email"]):
return get_data_error_result(message="Email already registered!")
# 校验验证码(如果启用)
if settings.REQUIRE_EMAIL_VERIFICATION:
if not verify_otp(req["email"], req["captcha"]):
return get_data_error_result(message="Invalid captcha!")
# 创建用户
user_id = get_uuid()
tenant_id = get_uuid()
password_hash = generate_password_hash(decrypt(req["password"]))
# 1. 创建 Tenant
TenantService.save(
id=tenant_id,
name=req["nickname"],
llm_id=settings.DEFAULT_LLM,
embd_id=settings.DEFAULT_EMBEDDING
)
# 2. 创建 User
UserService.save(
id=user_id,
email=req["email"],
nickname=req["nickname"],
password=password_hash,
access_token=get_uuid()
)
# 3. 关联 User 与 Tenant
UserTenantService.save(
tenant_id=tenant_id,
user_id=user_id,
role=UserTenantRole.OWNER.value
)
# 4. 初始化 LLM 配置
for llm in get_init_tenant_llm():
TenantLLMService.save(tenant_id=tenant_id, **llm)
return get_json_result(data=user.to_json())
关键点:
- 注册是事务性操作,创建 User、Tenant、UserTenant、TenantLLM 四个表记录,失败则回滚
- 新用户自动成为其 Tenant 的 Owner,拥有全部权限
- 默认 LLM 配置从
llm_factories.json读取,包含 OpenAI/Ollama/Azure 等
3.2 知识库管理 API
3.2.1 创建知识库(POST /api/v1/datasets)
基本信息:
- 名称:
create_dataset - 协议与方法:HTTP POST
/api/v1/datasets - 幂等性:是(租户内知识库名称唯一,重复创建返回错误)
请求结构体:
class CreateDatasetReq(BaseModel):
name: str = Field(..., min_length=1, max_length=128, description="知识库名称")
avatar: Optional[str] = Field(None, description="Base64 编码的头像")
description: Optional[str] = Field("", description="描述")
embedding_model: Optional[str] = Field(None, description="Embedding 模型名称")
permission: Optional[str] = Field("me", description="权限:me(私有)、team(团队)")
chunk_method: str = Field("naive", description="切片方法")
parser_config: Optional[Dict] = Field(None, description="解析器配置")
字段表:
| 字段 | 类型 | 必填 | 约束/默认 | 说明 |
|---|---|---|---|---|
| name | string | 是 | 长度 1-128 | 知识库名称(租户内唯一) |
| avatar | string | 否 | Base64 | 知识库头像 |
| description | string | 否 | 默认 "" | 描述信息 |
| embedding_model | string | 否 | 默认继承租户 | Embedding 模型(如 BAAI/bge-large-zh-v1.5) |
| permission | string | 否 | 默认 me | 权限:me/team |
| chunk_method | string | 是 | 默认 naive | 切片方法:naive/book/qa/table/laws 等 |
| parser_config | object | 否 | 默认根据 chunk_method 生成 | 解析器配置 JSON |
响应结构体:
{
"code": 0,
"message": "success",
"data": {
"id": "kb_uuid",
"name": "我的知识库",
"avatar": "base64...",
"embedding_model": "BAAI/bge-large-zh-v1.5",
"chunk_method": "naive",
"parser_config": {
"chunk_token_count": 128,
"layout_recognize": true,
"delimiter": "\\n!?。;!?"
},
"permission": "me",
"doc_num": 0,
"chunk_num": 0,
"create_time": 1697000000000
}
}
入口函数核心代码:
@manager.route("/datasets", methods=["POST"])
@token_required
def create(tenant_id):
"""创建知识库"""
req, err = validate_and_parse_json_request(request, CreateDatasetReq)
if err is not None:
return get_error_argument_result(err)
# 1. 校验名称唯一性
if KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id, status=StatusEnum.VALID.value):
return get_error_operating_result(message=f"Dataset name '{req['name']}' already exists")
# 2. 生成默认配置
req["parser_config"] = get_parser_config(req["parser_id"], req["parser_config"])
req["id"] = get_uuid()
req["tenant_id"] = tenant_id
req["created_by"] = tenant_id
# 3. 继承租户默认 Embedding 模型
ok, t = TenantService.get_by_id(tenant_id)
if not req.get("embd_id"):
req["embd_id"] = t.embd_id
else:
# 校验 Embedding 模型是否可用
ok, err = verify_embedding_availability(req["embd_id"], tenant_id)
if not ok:
return err
# 4. 保存到数据库
if not KnowledgebaseService.save(**req):
return get_error_data_result(message="Create dataset error.(Database error)")
# 5. 返回创建结果
ok, k = KnowledgebaseService.get_by_id(req["id"])
response_data = remap_dictionary_keys(k.to_dict())
return get_result(data=response_data)
关键逻辑解释:
- @token_required 装饰器:从 Authorization Header 提取 JWT,解析出 tenant_id,验证 Token 有效性
- 名称唯一性检查:租户内知识库名称必须唯一,避免冲突
- parser_config 生成:根据 chunk_method 从预定义模板生成默认配置,用户可覆盖
- Embedding 模型校验:检查租户是否配置了该模型,避免运行时错误
- 返回字段映射:
remap_dictionary_keys将数据库字段名映射为 API 字段名(如embd_id→embedding_model)
调用链上游函数:
# api/utils/api_utils.py
def get_parser_config(parser_id, user_config):
"""获取解析器配置"""
# 从预定义模板加载默认配置
default_config = DEFAULT_PARSER_CONFIG.get(parser_id, {})
if user_config:
# 用户配置覆盖默认配置
return deep_merge(default_config, user_config)
return default_config
# api/db/services/knowledgebase_service.py
class KnowledgebaseService(CommonService):
model = Knowledgebase
@classmethod
def save(cls, **kwargs):
"""保存知识库"""
# 设置默认值
kwargs.setdefault("doc_num", 0)
kwargs.setdefault("chunk_num", 0)
kwargs.setdefault("token_num", 0)
kwargs.setdefault("create_time", current_timestamp())
kwargs.setdefault("update_time", current_timestamp())
return super().save(**kwargs)
时序图:
sequenceDiagram
autonumber
participant C as 客户端(SDK/Web UI)
participant API as /api/v1/datasets
participant TokenAuth as token_required
participant KBSvc as KnowledgebaseService
participant TenantSvc as TenantService
participant MySQL as MySQL
C->>API: POST 创建知识库请求 + Authorization Header
API->>TokenAuth: 校验 Token
TokenAuth->>MySQL: 查询 User (access_token=?)
MySQL-->>TokenAuth: 返回 User
TokenAuth-->>API: 返回 tenant_id
API->>API: validate_and_parse_json_request()
API->>KBSvc: get_or_none(name, tenant_id)
KBSvc->>MySQL: SELECT * FROM knowledgebase WHERE name=? AND tenant_id=?
MySQL-->>KBSvc: 返回空(名称未占用)
API->>TenantSvc: get_by_id(tenant_id)
TenantSvc->>MySQL: SELECT * FROM tenant WHERE id=?
MySQL-->>TenantSvc: 返回 Tenant
TenantSvc-->>API: 返回默认 embd_id
API->>API: get_parser_config(parser_id, parser_config)
API->>KBSvc: save(id, name, embd_id, parser_config, ...)
KBSvc->>MySQL: INSERT INTO knowledgebase VALUES (...)
MySQL-->>KBSvc: OK
API->>KBSvc: get_by_id(kb_id)
KBSvc->>MySQL: SELECT * FROM knowledgebase WHERE id=?
MySQL-->>KBSvc: 返回 Knowledgebase
KBSvc-->>API: 返回 KB 对象
API->>API: remap_dictionary_keys()
API-->>C: 200 OK + KB 数据
说明要点:
- 幂等性:名称唯一性约束保证幂等,重复创建返回 400 错误而非重复记录
- 失败重试:数据库插入失败可重试(幂等键为 name + tenant_id)
- 锁与并发:无需锁,MySQL UNIQUE 约束保证原子性
- 超时设定:API 总超时 10 秒,数据库操作超时 5 秒
- 性能要点:INSERT 操作 ~10ms,QPS 上限 ~1000
3.2.2 更新知识库(PUT /api/v1/datasets/{dataset_id})
基本信息:
- 名称:
update_dataset - 协议与方法:HTTP PUT
/api/v1/datasets/{dataset_id} - 幂等性:是(多次提交相同更新,结果一致)
请求结构体:
class UpdateDatasetReq(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=128)
avatar: Optional[str] = None
description: Optional[str] = None
embedding_model: Optional[str] = None
permission: Optional[str] = None
chunk_method: Optional[str] = None
parser_config: Optional[Dict] = None
pagerank: Optional[int] = Field(None, ge=0, le=10, description="PageRank 权重")
字段表:
| 字段 | 类型 | 必填 | 约束 | 说明 |
|---|---|---|---|---|
| name | string | 否 | 长度 1-128 | 新名称(租户内唯一) |
| embedding_model | string | 否 | - | 更改 Embedding 模型(仅当 chunk_num=0 时允许) |
| chunk_method | string | 否 | - | 更改切片方法(触发重新解析) |
| parser_config | object | 否 | - | 部分更新配置(深度合并) |
| pagerank | int | 否 | 0-10 | PageRank 权重(仅 Elasticsearch 支持) |
核心代码:
@manager.route("/datasets/<dataset_id>", methods=["PUT"])
@token_required
def update(tenant_id, dataset_id):
"""更新知识库"""
req, err = validate_and_parse_json_request(request, UpdateDatasetReq, exclude_unset=True)
if not req:
return get_error_argument_result(message="No properties were modified")
# 1. 校验权限
kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id)
if kb is None:
return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'")
# 2. 深度合并 parser_config
if req.get("parser_config"):
req["parser_config"] = deep_merge(kb.parser_config, req["parser_config"])
# 3. 校验 embedding_model 修改
if "embd_id" in req:
if kb.chunk_num != 0 and req["embd_id"] != kb.embd_id:
return get_error_data_result(
message=f"When chunk_num ({kb.chunk_num}) > 0, embedding_model must remain {kb.embd_id}")
# 4. 更新 PageRank(同步更新 ES)
if "pagerank" in req and req["pagerank"] != kb.pagerank:
if req["pagerank"] > 0:
settings.docStoreConn.update(
{"kb_id": kb.id},
{PAGERANK_FLD: req["pagerank"]},
search.index_name(kb.tenant_id),
kb.id
)
else:
settings.docStoreConn.update(
{"exists": PAGERANK_FLD},
{"remove": PAGERANK_FLD},
search.index_name(kb.tenant_id),
kb.id
)
# 5. 更新数据库
if not KnowledgebaseService.update_by_id(kb.id, req):
return get_error_data_result(message="Update dataset error.(Database error)")
# 6. 返回更新结果
ok, k = KnowledgebaseService.get_by_id(kb.id)
response_data = remap_dictionary_keys(k.to_dict())
return get_result(data=response_data)
关键限制:
- Embedding 模型不可变:当知识库已有 chunks 时,禁止修改 embedding_model(向量维度不兼容)
- PageRank 同步:更新 MySQL 的同时,同步更新 Elasticsearch 的 PageRank 字段(影响检索排序)
- 深度合并:
parser_config采用深度合并策略,用户仅需提供变更部分,保留未修改字段
3.2.3 删除知识库(DELETE /api/v1/datasets)
基本信息:
- 名称:
delete_datasets - 协议与方法:HTTP DELETE
/api/v1/datasets - 幂等性:是(重复删除返回成功)
请求结构体:
{
"ids": ["kb_uuid_1", "kb_uuid_2"] // null 表示删除全部,[] 表示不删除
}
响应结构体:
{
"code": 0,
"message": "Successfully deleted 2 datasets",
"data": {
"success_count": 2,
"errors": []
}
}
核心代码:
@manager.route("/datasets", methods=["DELETE"])
@token_required
def delete(tenant_id):
"""批量删除知识库"""
req, err = validate_and_parse_json_request(request, DeleteDatasetReq)
# 1. 获取待删除的知识库列表
kb_id_instance_pairs = []
if req["ids"] is None:
# null 表示删除全部
kbs = KnowledgebaseService.query(tenant_id=tenant_id)
for kb in kbs:
kb_id_instance_pairs.append((kb.id, kb))
else:
# 指定 ID 列表
for kb_id in req["ids"]:
kb = KnowledgebaseService.get_or_none(id=kb_id, tenant_id=tenant_id)
if kb is None:
return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for datasets: '{kb_id}'")
kb_id_instance_pairs.append((kb_id, kb))
# 2. 逐个删除知识库
errors = []
success_count = 0
for kb_id, kb in kb_id_instance_pairs:
# 2.1 删除所有文档
for doc in DocumentService.query(kb_id=kb_id):
if not DocumentService.remove_document(doc, tenant_id):
errors.append(f"Remove document '{doc.id}' error for dataset '{kb_id}'")
continue
# 2.2 删除文件关联
f2d = File2DocumentService.get_by_document_id(doc.id)
FileService.filter_delete([File.id == f2d[0].file_id])
File2DocumentService.delete_by_document_id(doc.id)
# 2.3 删除知识库
if not KnowledgebaseService.delete_by_id(kb_id):
errors.append(f"Delete dataset error for {kb_id}")
continue
success_count += 1
# 3. 返回结果
if not errors:
return get_result()
return get_result(data={"success_count": success_count, "errors": errors[:5]})
关键点:
- 级联删除:删除知识库前,先删除所有文档、文件、ES 索引、MinIO 文件
- 部分失败:允许部分成功,返回成功数量与错误列表
- 软删除:
delete_by_id实际上是更新status='0',非物理删除
3.3 文档管理 API
3.3.1 上传文档(POST /v1/documents/upload)
基本信息:
- 名称:
upload_document - 协议与方法:HTTP POST
/v1/documents/upload(multipart/form-data) - 幂等性:否(重复上传会创建新文档记录)
请求结构体(form-data):
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| file | File | 是 | 上传的文件(最大 1GB) |
| kb_id | string | 是 | 目标知识库 ID |
| run | int | 否 | 是否立即解析(1 是,0 否) |
| parser_config | string | 否 | JSON 字符串,覆盖知识库配置 |
响应结构体:
{
"code": 0,
"message": "success",
"data": {
"doc_id": "doc_uuid",
"name": "example.pdf",
"size": 1024000,
"type": "pdf",
"progress": 0.0,
"chunk_num": 0,
"create_time": 1697000000000
}
}
核心代码:
@manager.route("/documents/upload", methods=["POST"])
@login_required
def upload():
"""上传文档"""
kb_id = request.form.get("kb_id")
if not kb_id:
return get_error_argument_result(message="kb_id is required")
# 1. 校验知识库权限
kb = KnowledgebaseService.get_or_none(id=kb_id, tenant_id=current_user.id)
if not kb:
return get_error_permission_result(message="Knowledge base not found")
# 2. 获取上传文件
file = request.files.get("file")
if not file:
return get_error_argument_result(message="file is required")
# 3. 校验文件大小
file_size = len(file.read())
file.seek(0)
if file_size > settings.MAX_FILE_SIZE:
return get_error_argument_result(message=f"File size exceeds {settings.MAX_FILE_SIZE} bytes")
# 4. 上传到 MinIO
file_location = f"{current_user.id}/{kb_id}/{get_uuid()}/{file.filename}"
STORAGE_IMPL.put(bucket=settings.MINIO_BUCKET, key=file_location, data=file.read())
# 5. 创建 File 记录
file_id = get_uuid()
FileService.save(
id=file_id,
tenant_id=current_user.id,
created_by=current_user.id,
name=file.filename,
location=file_location,
type=get_file_type(file.filename),
size=file_size
)
# 6. 创建 Document 记录
doc_id = get_uuid()
DocumentService.save(
id=doc_id,
kb_id=kb_id,
name=file.filename,
location=file_location,
size=file_size,
type=get_file_type(file.filename),
parser_id=kb.parser_id,
parser_config=kb.parser_config,
progress=0.0,
chunk_num=0
)
# 7. 创建 File2Document 关联
File2DocumentService.save(file_id=file_id, document_id=doc_id)
# 8. 推送解析任务(如果 run=1)
if request.form.get("run") == "1":
TaskService.push_parsing_task(doc_id, kb_id, current_user.id)
return get_result(data={"doc_id": doc_id})
关键流程:
- 文件存储:先上传到 MinIO,获取 location 后再插入数据库(避免孤儿文件)
- 三表关联:File(文件元数据)、Document(文档元数据)、File2Document(多对一关系)
- 异步解析:推送任务到 Redis Stream,TaskExecutor 后台消费
时序图:
sequenceDiagram
autonumber
participant C as 客户端
participant API as /v1/documents/upload
participant MinIO as MinIO
participant MySQL as MySQL
participant Redis as Redis 队列
participant TaskExec as TaskExecutor
C->>API: POST 上传文件(multipart/form-data)
API->>API: 校验文件大小
API->>MinIO: PUT 上传文件
MinIO-->>API: 返回存储路径
API->>MySQL: INSERT INTO file
API->>MySQL: INSERT INTO document
API->>MySQL: INSERT INTO file2document
MySQL-->>API: OK
alt run=1(立即解析)
API->>Redis: 推送解析任务
Redis-->>API: OK
API-->>C: 201 Created + doc_id
Redis->>TaskExec: 拉取任务
TaskExec->>MinIO: 下载文件
TaskExec->>TaskExec: 调用 DeepDoc 解析
TaskExec->>MySQL: 更新 progress
else run=0(不解析)
API-->>C: 201 Created + doc_id
end
3.4 对话 API
3.4.1 创建对话配置(POST /api/v1/chats)
基本信息:
- 名称:
create_chat - 协议与方法:HTTP POST
/api/v1/chats - 幂等性:否(每次创建新对话配置)
请求结构体:
{
"name": "技术支持助手",
"description": "回答技术问题的助手",
"avatar": "base64...",
"dataset_ids": ["kb_uuid_1", "kb_uuid_2"],
"llm": {
"model_name": "gpt-4",
"temperature": 0.7,
"max_tokens": 2048,
"top_p": 0.9
},
"prompt": {
"prompt": "你是一个专业的技术支持助手...",
"opener": "你好!我是技术支持助手,有什么可以帮您?",
"show_quote": true,
"variables": [{"key": "knowledge", "optional": false}]
},
"similarity_threshold": 0.2,
"keywords_similarity_weight": 0.3,
"top_n": 6,
"rerank_model": "BAAI/bge-reranker-v2-m3"
}
字段表:
| 字段 | 类型 | 必填 | 约束/默认 | 说明 |
|---|---|---|---|---|
| name | string | 是 | 长度 1-128 | 对话名称(租户内唯一) |
| dataset_ids | array | 否 | 默认 [] | 关联的知识库 ID 列表 |
| llm.model_name | string | 否 | 默认继承租户 | LLM 模型名称 |
| llm.temperature | float | 否 | 默认 0.1 | 温度参数(0-1) |
| llm.max_tokens | int | 否 | 默认 512 | 最大 Token 数 |
| prompt.prompt | string | 是 | - | System Prompt(必须包含 {knowledge}) |
| prompt.opener | string | 否 | 默认欢迎语 | 首次对话开场白 |
| prompt.show_quote | bool | 否 | 默认 true | 是否显示引用来源 |
| similarity_threshold | float | 否 | 默认 0.2 | 相似度阈值(0-1) |
| keywords_similarity_weight | float | 否 | 默认 0.3 | 关键词权重(1-此值为向量权重) |
| top_n | int | 否 | 默认 6 | 返回 Top N 结果 |
| rerank_model | string | 否 | 默认 "" | Rerank 模型名称 |
响应结构体:
{
"code": 0,
"message": "success",
"data": {
"id": "chat_uuid",
"name": "技术支持助手",
"dataset_ids": ["kb_uuid_1", "kb_uuid_2"],
"llm": {"model_name": "gpt-4", ...},
"prompt": {...},
"create_time": 1697000000000
}
}
核心代码:
@manager.route("/chats", methods=["POST"])
@token_required
def create(tenant_id):
"""创建对话配置"""
req = request.json
# 1. 校验知识库权限与状态
ids = req.get("dataset_ids", [])
for kb_id in ids:
if not KnowledgebaseService.accessible(kb_id=kb_id, user_id=tenant_id):
return get_error_data_result(f"You don't own the dataset {kb_id}")
kbs = KnowledgebaseService.query(id=kb_id)
if kbs[0].chunk_num == 0:
return get_error_data_result(f"The dataset {kb_id} doesn't own parsed file")
# 2. 校验 Embedding 模型一致性
kbs = KnowledgebaseService.get_by_ids(ids) if ids else []
embd_ids = [kb.embd_id for kb in kbs]
if len(set(embd_ids)) > 1:
return get_result(message='Datasets use different embedding models.', code=settings.RetCode.AUTHENTICATION_ERROR)
# 3. 处理 LLM 配置
llm = req.get("llm", {})
if "model_name" in llm:
req["llm_id"] = llm.pop("model_name")
req["llm_setting"] = llm
# 4. 处理 Prompt 配置(字段映射)
prompt = req.get("prompt", {})
key_mapping = {
"parameters": "variables",
"prologue": "opener",
"quote": "show_quote",
"system": "prompt"
}
for new_key, old_key in key_mapping.items():
if old_key in prompt:
prompt[new_key] = prompt.pop(old_key)
req["prompt_config"] = prompt
# 5. 校验 Prompt 变量
for p in req["prompt_config"]["parameters"]:
if not p["optional"] and req["prompt_config"]["system"].find("{%s}" % p["key"]) < 0:
return get_error_data_result(message=f"Parameter '{p['key']}' is not used")
# 6. 保存到数据库
req["id"] = get_uuid()
req["tenant_id"] = tenant_id
req["kb_ids"] = ids
if not DialogService.save(**req):
return get_error_data_result(message="Fail to new a chat!")
return get_result(data=req)
关键校验:
- Embedding 一致性:多个知识库必须使用相同 Embedding 模型(向量空间一致)
- 知识库状态:必须有已解析文档(chunk_num > 0),否则无法检索
- Prompt 变量校验:必填变量(如
{knowledge})必须出现在 System Prompt 中
3.4.2 对话补全(POST /api/v1/chats/{chat_id}/completions)
基本信息:
- 名称:
chat_completions - 协议与方法:HTTP POST
/api/v1/chats/{chat_id}/completions - 幂等性:是(相同 conversation_id + question 返回相同结果)
请求结构体:
{
"conversation_id": "conv_uuid", // 可选,不提供则创建新会话
"question": "RAGFlow 如何部署?",
"stream": true // 是否流式返回
}
响应结构体(非流式):
{
"code": 0,
"message": "success",
"data": {
"answer": "RAGFlow 部署步骤如下:\n1. 安装 Docker...",
"reference": [
{
"doc_id": "doc_uuid",
"doc_name": "部署文档.pdf",
"chunk_id": "chunk_uuid",
"content": "...部署步骤...",
"similarity": 0.85
}
],
"conversation_id": "conv_uuid"
}
}
响应结构体(流式 SSE):
data: {"answer": "RAGFlow", "reference": []}
data: {"answer": " 部署", "reference": []}
data: {"answer": "步骤如下", "reference": []}
...
data: {"answer": "", "reference": [...]} // 最后一条包含完整 reference
核心代码:
@manager.route("/chats/<chat_id>/completions", methods=["POST"])
@token_required
def completions(tenant_id, chat_id):
"""对话补全"""
req = request.json
# 1. 校验对话权限
if not DialogService.query(tenant_id=tenant_id, id=chat_id):
return get_error_data_result(message="You do not own the chat")
# 2. 获取或创建会话
conversation_id = req.get("conversation_id")
if not conversation_id:
conversation_id = get_uuid()
ConversationService.save(
id=conversation_id,
dialog_id=chat_id,
name=req["question"][:50], # 首轮问题作为会话名
message=[],
reference=[]
)
# 3. 调用 RAG 引擎
from rag.app.conversation import chat
dialog = DialogService.get_by_id(chat_id)
conv = ConversationService.get_by_id(conversation_id)
if req.get("stream"):
# 流式返回(SSE)
def generate():
for chunk in chat(
dialog=dialog[1],
messages=conv[1].message,
question=req["question"],
stream=True,
**dialog[1].to_dict()
):
yield f"data: {json.dumps(chunk)}\n\n"
return Response(generate(), mimetype="text/event-stream")
else:
# 非流式返回
answer, reference = chat(
dialog=dialog[1],
messages=conv[1].message,
question=req["question"],
stream=False,
**dialog[1].to_dict()
)
# 4. 保存对话记录
conv[1].message.append({"role": "user", "content": req["question"]})
conv[1].message.append({"role": "assistant", "content": answer})
conv[1].reference = reference
conv[1].save()
return get_result(data={
"answer": answer,
"reference": reference,
"conversation_id": conversation_id
})
关键点:
- 流式与非流式:流式使用 SSE 协议,前端逐 Token 渲染;非流式等待完整响应
- 会话管理:conversation_id 关联多轮对话,message 字段存储完整历史
- RAG 引擎调用:
rag.app.conversation.chat封装检索→重排→生成流程
4. 核心 API 调用链路深度剖析
本节从上游接口入手,自上而下详细剖析每个关键 API 的完整调用链路,包括路由层、Service 层、存储层的详细交互。
4.1 用户登录完整调用链路
API 端点:POST /v1/user/login
4.1.1 调用链路拓扑图
graph TB
Client[客户端] -->|1. POST /v1/user/login| UserApp[user_app.py::login]
UserApp -->|2. request.json.get| RequestData{解析请求体}
RequestData -->|3. email| UserService1[UserService.query<br/>查询用户是否存在]
UserService1 -->|4. SELECT| MySQL1[(MySQL)]
RequestData -->|5. password| DecryptFunc[decrypt 函数<br/>RSA 解密密码]
UserService1 -->|6. 用户存在| UserService2[UserService.query_user<br/>校验密码]
UserService2 -->|7. check_password_hash| PasswordCheck{密码校验}
PasswordCheck -->|8. 密码正确| GenerateToken[get_uuid<br/>生成新 access_token]
GenerateToken -->|9. login_user| FlaskLogin[Flask-Login<br/>写入 Session]
FlaskLogin -->|10. Session.set| Redis1[(Redis)]
GenerateToken -->|11. user.save| MySQL2[(MySQL)]
MySQL2 -->|12. UPDATE user| UpdateToken[更新 access_token<br/>和 update_time]
UpdateToken -->|13. to_json| ResponseData[构造响应数据]
ResponseData -->|14. 200 OK| Client
style UserApp fill:#e1f5ff
style UserService1 fill:#fff4e1
style UserService2 fill:#fff4e1
style DecryptFunc fill:#ffe1f5
style FlaskLogin fill:#e1ffe1
style Redis1 fill:#f0f0f0
style MySQL1 fill:#f0f0f0
style MySQL2 fill:#f0f0f0
4.1.2 关键代码调用链
第 1 层:路由层(user_app.py)
# api/apps/user_app.py: 67-138
@manager.route("/login", methods=["POST"])
def login():
# 步骤 1: 获取请求数据
email = request.json.get("email", "")
password = request.json.get("password")
# 步骤 2: 查询用户(调用 Service 层)
users = UserService.query(email=email)
if not users:
return get_json_result(code=settings.RetCode.AUTHENTICATION_ERROR,
message=f"Email: {email} is not registered!")
# 步骤 3: 解密密码(调用工具函数)
password = decrypt(password) # RSA 私钥解密
# 步骤 4: 验证密码(调用 Service 层)
user = UserService.query_user(email, password)
# 步骤 5: 生成新 Token 并登录
user.access_token = get_uuid() # 生成 UUID Token
login_user(user) # Flask-Login 写入 Session(存储到 Redis)
# 步骤 6: 更新数据库
user.update_time = current_timestamp()
user.save() # Peewee ORM 执行 UPDATE
# 步骤 7: 返回响应
return construct_response(data=user.to_json(), auth=user.get_id())
第 2 层:Service 层(user_service.py)
# api/db/services/user_service.py
class UserService(CommonService):
model = User # Peewee Model
@classmethod
@DB.connection_context()
def query(cls, **kwargs):
"""查询用户(步骤 2)"""
# 调用 Peewee ORM 查询
users = cls.model.select().where(
cls.model.email == kwargs.get('email')
).execute()
return list(users)
@classmethod
def query_user(cls, email, password):
"""验证用户密码(步骤 4)"""
users = cls.query(email=email, status=StatusEnum.VALID.value)
if not users:
return None
user = users[0]
# 使用 werkzeug.security 校验密码哈希
if check_password_hash(user.password, password):
return user
return None
第 3 层:ORM 层(db_models.py)
# api/db/db_models.py
class User(BaseModel):
id = CharField(max_length=32, primary_key=True)
email = CharField(max_length=128, unique=True)
password = CharField(max_length=256, null=True)
access_token = CharField(max_length=32, unique=True)
nickname = CharField(max_length=128)
avatar = CharField(max_length=1024, null=True)
is_active = CharField(max_length=1, default='1')
create_time = BigIntegerField()
update_time = BigIntegerField()
def save(self, *args, **kwargs):
"""保存用户(步骤 6)"""
# Peewee 自动生成 SQL:
# UPDATE user SET access_token=?, update_time=? WHERE id=?
return super().save(*args, **kwargs)
第 4 层:认证层(Flask-Login)
# api/apps/__init__.py: 145-177
@login_manager.request_loader
def load_user(web_request):
"""从 Authorization Header 或 Session 加载用户"""
authorization = web_request.headers.get("Authorization")
if authorization:
# Token 认证(SDK 调用)
jwt = Serializer(secret_key=settings.SECRET_KEY)
access_token = str(jwt.loads(authorization))
user = UserService.query(access_token=access_token,
status=StatusEnum.VALID.value)
return user[0] if user else None
else:
# Session 认证(Web UI),Flask-Login 自动从 Redis 读取
return None
4.1.3 时序图(包含存储层交互)
sequenceDiagram
autonumber
participant C as 客户端
participant R as user_app::login
participant US as UserService
participant Crypt as decrypt 函数
participant FL as Flask-Login
participant M as MySQL
participant Red as Redis
C->>R: POST /v1/user/login<br/>{email, encrypted_password}
R->>US: UserService.query(email=email)
US->>M: SELECT * FROM user<br/>WHERE email=? AND status='1'
M-->>US: User 记录
US-->>R: users 列表
alt 用户不存在
R-->>C: 401 Unauthorized<br/>"Email not registered"
end
R->>Crypt: decrypt(password)
Crypt->>Crypt: 使用 RSA 私钥解密
Crypt-->>R: 明文密码
R->>US: UserService.query_user(email, password)
US->>US: check_password_hash(user.password, password)
alt 密码错误
US-->>R: None
R-->>C: 401 Unauthorized<br/>"Password mismatch"
end
US-->>R: User 对象
R->>R: user.access_token = get_uuid()
R->>FL: login_user(user)
FL->>Red: SET flask_session:<session_id><br/>{user_id: xxx}
Red-->>FL: OK
R->>R: user.update_time = now()
R->>M: UPDATE user<br/>SET access_token=?, update_time=?<br/>WHERE id=?
M-->>R: OK
R->>R: response = user.to_json()
R-->>C: 200 OK<br/>{id, email, access_token, ...}
时序图关键要点:
-
第 1-4 步(用户查询):
- 路由层调用
UserService.query(email) - Service 层通过 Peewee ORM 生成 SQL:
SELECT * FROM user WHERE email=? AND status='1' - MySQL 返回用户记录或空结果
- 性能指标:索引查询,耗时约 1-5ms
- 路由层调用
-
第 5-6 步(密码解密):
- 前端使用公钥 RSA 加密密码(2048 位),密文长度 ~344 字节(Base64)
- 后端使用私钥解密,耗时约 5-10ms
- 安全要点:防止明文传输被抓包
-
第 7-10 步(密码校验):
- 使用
werkzeug.security.check_password_hash校验 - 密码哈希算法:bcrypt(默认 12 轮)
- 性能瓶颈:bcrypt 校验耗时约 100-200ms(安全性与性能平衡)
- 使用
-
第 11-14 步(Session 写入):
login_user(user)将用户 ID 写入 Flask Session- Session 存储在 Redis:
flask_session:<uuid>,TTL 默认 7 天 - 并发安全:Redis SET 操作原子性保证
-
第 15-16 步(数据库更新):
- 更新
access_token(新生成的 UUID)和update_time - SQL:
UPDATE user SET access_token=?, update_time=? WHERE id=? - 幂等性:多次登录生成新 Token,旧 Token 自动失效
- 更新
-
第 17-18 步(响应返回):
user.to_json()序列化用户对象(排除 password 字段)- 返回 200 OK,包含新
access_token
4.2 文档上传完整调用链路
API 端点:POST /v1/documents/upload(multipart/form-data)
4.2.1 调用链路拓扑图
graph TB
Client[客户端] -->|1. POST multipart| DocApp[document_app.py::upload]
DocApp -->|2. request.files.getlist| FileObjs{获取上传文件列表}
DocApp -->|3. request.form.get| KBId[获取 kb_id 参数]
KBId -->|4. get_by_id| KBService[KnowledgebaseService<br/>查询知识库]
KBService -->|5. SELECT| MySQL1[(MySQL)]
DocApp -->|6. check_kb_team_permission| PermCheck{权限校验}
FileObjs -->|7. for file_obj| FileService[FileService.upload_document<br/>批量上传]
FileService -->|8. STORAGE_IMPL.put| MinIO[(MinIO)]
MinIO -->|9. 返回 location| FileService
FileService -->|10. FileService.save| MySQL2[(MySQL)]
MySQL2 -->|11. INSERT file| FileRec[File 记录]
FileService -->|12. DocumentService.insert| MySQL3[(MySQL)]
MySQL3 -->|13. INSERT document| DocRec[Document 记录]
FileService -->|14. File2DocumentService.save| MySQL4[(MySQL)]
MySQL4 -->|15. INSERT file2document| Mapping[关联记录]
Mapping -->|16. run=1?| CheckRun{是否立即解析}
CheckRun -->|17. Yes| TaskService[TaskService.queue_tasks<br/>推送解析任务]
TaskService -->|18. XADD| Redis[(Redis Stream)]
Redis -->|19. 返回任务 ID| DocApp
DocApp -->|20. 200 OK| Client
style DocApp fill:#e1f5ff
style FileService fill:#fff4e1
style KBService fill:#fff4e1
style TaskService fill:#fff4e1
style MinIO fill:#f0f0f0
style MySQL1 fill:#f0f0f0
style MySQL2 fill:#f0f0f0
style MySQL3 fill:#f0f0f0
style MySQL4 fill:#f0f0f0
style Redis fill:#f0f0f0
4.2.2 关键代码调用链
第 1 层:路由层(document_app.py)
# api/apps/document_app.py: 52-84
@manager.route("/upload", methods=["POST"])
@login_required
@validate_request("kb_id")
def upload():
kb_id = request.form.get("kb_id")
file_objs = request.files.getlist("file")
# 步骤 1: 校验知识库权限
e, kb = KnowledgebaseService.get_by_id(kb_id)
if not e:
raise LookupError("Can't find this knowledgebase!")
if not check_kb_team_permission(kb, current_user.id):
return get_json_result(code=settings.RetCode.AUTHENTICATION_ERROR)
# 步骤 2: 批量上传文档(调用 FileService)
err, files = FileService.upload_document(kb, file_objs, current_user.id)
if err:
return get_json_result(data=files, message="\n".join(err),
code=settings.RetCode.SERVER_ERROR)
return get_json_result(data=files)
第 2 层:Service 层(file_service.py)
# api/db/services/file_service.py
class FileService(CommonService):
@classmethod
@DB.connection_context()
def upload_document(cls, kb, file_objs, user_id):
"""批量上传文档(步骤 2)"""
errors = []
uploaded_files = []
for file_obj in file_objs:
try:
# 步骤 2.1: 生成存储路径
file_id = get_uuid()
location = f"{user_id}/{kb.id}/{file_id}/{file_obj.filename}"
# 步骤 2.2: 上传到 MinIO
file_bin = file_obj.read()
STORAGE_IMPL.put(bucket=kb.id, key=location, data=file_bin)
# 步骤 2.3: 创建 File 记录
cls.save(
id=file_id,
tenant_id=user_id,
created_by=user_id,
name=file_obj.filename,
location=location,
type=get_file_type(file_obj.filename),
size=len(file_bin)
)
# 步骤 2.4: 创建 Document 记录
doc_id = get_uuid()
DocumentService.insert({
"id": doc_id,
"kb_id": kb.id,
"name": file_obj.filename,
"location": location,
"size": len(file_bin),
"type": get_file_type(file_obj.filename),
"parser_id": kb.parser_id,
"parser_config": kb.parser_config,
"progress": 0.0,
"chunk_num": 0
})
# 步骤 2.5: 创建关联记录
File2DocumentService.save(file_id=file_id, document_id=doc_id)
# 步骤 2.6: 推送解析任务(如果 run=1)
if request.form.get("run") == "1":
from api.db.services.task_service import queue_tasks
queue_tasks(
doc={"id": doc_id, "kb_id": kb.id, ...},
bucket=kb.id,
name=location,
priority=0
)
uploaded_files.append((file_obj.filename, file_bin))
except Exception as e:
errors.append(f"Upload {file_obj.filename} failed: {str(e)}")
return errors, uploaded_files
第 3 层:任务调度层(task_service.py)
# api/db/services/task_service.py: 326-431
def queue_tasks(doc: dict, bucket: str, name: str, priority: int):
"""创建并推送解析任务"""
parse_task_array = []
# 步骤 1: 根据文档类型生成任务列表
if doc["type"] == FileType.PDF.value:
# PDF 按页范围拆分任务
file_bin = STORAGE_IMPL.get(bucket, name)
pages = PdfParser.total_page_number(doc["name"], file_bin)
page_size = doc["parser_config"].get("task_page_size", 12)
for p in range(0, pages, page_size):
task = {
"id": get_uuid(),
"doc_id": doc["id"],
"from_page": p,
"to_page": min(p + page_size, pages),
"progress": 0.0,
"begin_at": datetime.now()
}
parse_task_array.append(task)
else:
# 其他文件类型单任务
task = {
"id": get_uuid(),
"doc_id": doc["id"],
"from_page": 0,
"to_page": 100000000,
"progress": 0.0
}
parse_task_array.append(task)
# 步骤 2: 计算任务摘要(用于去重)
for task in parse_task_array:
hasher = xxhash.xxh64()
hasher.update(doc["id"].encode("utf-8"))
hasher.update(str(task["from_page"]).encode("utf-8"))
hasher.update(str(task["to_page"]).encode("utf-8"))
task["digest"] = hasher.hexdigest()
task["priority"] = priority
# 步骤 3: 插入任务到数据库
bulk_insert_into_db(Task, parse_task_array, replace_on_conflict=True)
DocumentService.begin2parse(doc["id"]) # 更新文档状态为 "解析中"
# 步骤 4: 推送任务到 Redis Stream
for task in parse_task_array:
if task["progress"] < 1.0: # 仅推送未完成任务
REDIS_CONN.queue_product(
queue_name=get_svr_queue_name(priority),
message=task
)
4.2.3 时序图(包含异步任务推送)
sequenceDiagram
autonumber
participant C as 客户端
participant D as document_app::upload
participant FS as FileService
participant KB as KnowledgebaseService
participant MinIO as MinIO
participant M as MySQL
participant TS as TaskService
participant Red as Redis Stream
C->>D: POST /v1/documents/upload<br/>(multipart/form-data)
D->>D: request.files.getlist("file")
D->>D: request.form.get("kb_id")
D->>KB: KnowledgebaseService.get_by_id(kb_id)
KB->>M: SELECT * FROM knowledgebase<br/>WHERE id=?
M-->>KB: KB 记录
KB-->>D: kb 对象
D->>D: check_kb_team_permission(kb, user_id)
D->>FS: FileService.upload_document(kb, file_objs, user_id)
loop 每个文件
FS->>FS: file_id = get_uuid()<br/>location = "{user_id}/{kb_id}/{file_id}/{filename}"
FS->>MinIO: STORAGE_IMPL.put(bucket, location, file_bin)
MinIO-->>FS: OK
FS->>M: INSERT INTO file<br/>(id, name, location, size, ...)
M-->>FS: OK
FS->>M: INSERT INTO document<br/>(id, kb_id, name, location, parser_id, ...)
M-->>FS: doc_id
FS->>M: INSERT INTO file2document<br/>(file_id, document_id)
M-->>FS: OK
end
FS-->>D: (errors, uploaded_files)
alt run=1(立即解析)
D->>TS: TaskService.queue_tasks(doc, bucket, name, priority)
TS->>MinIO: STORAGE_IMPL.get(bucket, name)
MinIO-->>TS: file_bin
TS->>TS: PdfParser.total_page_number(file_bin)<br/>计算页数
TS->>TS: 生成 parse_task_array<br/>(按 page_size 拆分)
TS->>M: BULK INSERT INTO task<br/>(id, doc_id, from_page, to_page, ...)
M-->>TS: OK
TS->>M: UPDATE document<br/>SET status='1' (解析中)
M-->>TS: OK
loop 每个未完成任务
TS->>Red: XADD ragflow_svr<br/>{doc_id, from_page, to_page, ...}
Red-->>TS: task_id
end
TS-->>D: 任务推送完成
end
D-->>C: 200 OK<br/>[{doc_id, name, size, progress=0.0}]
时序图关键要点:
-
第 1-7 步(权限校验):
- 查询知识库是否存在且属于当前租户
- 调用
check_kb_team_permission校验用户权限(Owner/Editor) - 性能指标:单次查询耗时约 2-5ms
-
第 8-11 步(文件上传):
- 生成存储路径:
{tenant_id}/{kb_id}/{file_id}/{filename} - 调用 MinIO SDK 上传文件(支持断点续传)
- 性能瓶颈:10MB 文件上传耗时约 500-1000ms(取决于网络带宽)
- 生成存储路径:
-
第 12-17 步(元数据插入):
- 依次插入
file、document、file2document三张表 - 事务性:使用 Peewee
@DB.connection_context()保证原子性 - 失败回滚:任何步骤失败,自动回滚前面的插入操作
- 依次插入
-
第 18-26 步(任务推送):
- 根据文档类型计算任务数量:
- PDF:按
task_page_size(默认 12 页)拆分 - Excel:按 3000 行拆分
- 其他:单任务
- PDF:按
- 计算任务摘要(xxhash)用于去重和复用
- 推送到 Redis Stream:
XADD ragflow_svr * field1 value1 field2 value2 ...
- 根据文档类型计算任务数量:
-
第 27-28 步(响应返回):
- 返回文档列表,包含
doc_id、progress=0.0 - 前端轮询
/documents/{doc_id}查询解析进度
- 返回文档列表,包含
4.3 对话补全完整调用链路
API 端点:POST /v1/conversation/completion
4.3.1 调用链路拓扑图
graph TB
Client[客户端] -->|1. POST completion| ConvApp[conversation_app.py::completion]
ConvApp -->|2. get_by_id| ConvService[ConversationService<br/>获取会话]
ConvService -->|3. SELECT| MySQL1[(MySQL)]
ConvApp -->|4. get_by_id| DialogService[DialogService<br/>获取对话配置]
DialogService -->|5. SELECT| MySQL2[(MySQL)]
DialogService -->|6. chat 函数| RAGChat[dialog_service::chat<br/>RAG 核心逻辑]
RAGChat -->|7. get_models| ModelsInit[初始化模型<br/>embd/rerank/chat/tts]
ModelsInit -->|8. LLMBundle| LLMService[LLMService<br/>租户 LLM 配置]
LLMService -->|9. get_api_key| Redis1[(Redis)]
RAGChat -->|10. retriever.retrieval| Retriever[Retriever.retrieval<br/>混合检索]
Retriever -->|11. embd_mdl.encode| EmbeddingAPI[Embedding 模型<br/>编码查询向量]
Retriever -->|12. hybrid_search| ES[(Elasticsearch)]
ES -->|13. 返回候选 chunks| Retriever
Retriever -->|14. rerank_mdl.rerank| RerankAPI[Rerank 模型<br/>重排序]
RerankAPI -->|15. 返回 Top-K| Retriever
Retriever -->|16. 格式化 chunks| RAGChat
RAGChat -->|17. kb_prompt| BuildPrompt[构建 Prompt<br/>system + knowledge]
BuildPrompt -->|18. chat_mdl.chat_streamly| ChatAPI[Chat 模型<br/>流式生成]
ChatAPI -->|19. yield chunks| RAGChat
RAGChat -->|20. yield SSE| ConvApp
ConvApp -->|21. SSE 流| Client
style ConvApp fill:#e1f5ff
style ConvService fill:#fff4e1
style DialogService fill:#fff4e1
style RAGChat fill:#ffe1f5
style Retriever fill:#e1ffe1
style BuildPrompt fill:#ffe1e1
style ES fill:#f0f0f0
style MySQL1 fill:#f0f0f0
style MySQL2 fill:#f0f0f0
style Redis1 fill:#f0f0f0
4.3.2 关键代码调用链
第 1 层:路由层(conversation_app.py)
# api/apps/conversation_app.py: 167-250
@manager.route("/completion", methods=["POST"])
@login_required
@validate_request("conversation_id", "messages")
def completion():
req = request.json
msg = [m for m in req["messages"] if m["role"] != "system"]
message_id = msg[-1].get("id")
# 步骤 1: 获取会话与对话配置
e, conv = ConversationService.get_by_id(req["conversation_id"])
conv.message = deepcopy(req["messages"])
e, dia = DialogService.get_by_id(conv.dialog_id)
# 步骤 2: 准备 reference 字段
if not conv.reference:
conv.reference = []
conv.reference.append({"chunks": [], "doc_aggs": []})
# 步骤 3: 流式生成
def stream():
nonlocal dia, msg, req, conv
try:
for ans in chat(dia, msg, True, **req): # 调用 RAG 引擎
ans = structure_answer(conv, ans, message_id, conv.id)
yield "data:" + json.dumps({
"code": 0,
"message": "",
"data": ans
}, ensure_ascii=False) + "\n\n"
ConversationService.update_by_id(conv.id, conv.to_dict())
except Exception as e:
yield "data:" + json.dumps({
"code": 500,
"message": str(e),
"data": {"answer": "**ERROR**: " + str(e)}
}, ensure_ascii=False) + "\n\n"
# 步骤 4: 返回 SSE 响应
if req.get("stream", True):
resp = Response(stream(), mimetype="text/event-stream")
resp.headers.add_header("Cache-control", "no-cache")
resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8")
return resp
第 2 层:RAG 核心逻辑(dialog_service.py)
# api/db/services/dialog_service.py: 338-617
def chat(dialog, messages, stream=True, **kwargs):
"""RAG 核心流程:检索 + 重排 + 生成"""
# 步骤 1: 初始化模型
kbs, embd_mdl, rerank_mdl, chat_mdl, tts_mdl = get_models(dialog)
# 步骤 2: 提取查询(最近 3 轮对话)
questions = [m["content"] for m in messages if m["role"] == "user"][-3:]
# 步骤 3: 多轮对话精炼(可选)
if len(questions) > 1 and dialog.prompt_config.get("refine_multiturn"):
questions = [full_question(dialog.tenant_id, dialog.llm_id, messages)]
else:
questions = questions[-1:]
# 步骤 4: 关键词提取(可选)
if dialog.prompt_config.get("keyword", False):
questions[-1] += keyword_extraction(chat_mdl, questions[-1])
# 步骤 5: 混合检索
retriever = settings.retriever # Retriever 单例
kbinfos = retriever.retrieval(
question=questions[-1],
embd_mdl=embd_mdl,
tenant_ids=[dialog.tenant_id],
kb_ids=dialog.kb_ids,
page=1,
page_size=dialog.top_n or 6,
similarity_threshold=dialog.similarity_threshold or 0.2,
vector_similarity_weight=dialog.vector_similarity_weight or 0.7,
top=1024, # 初筛 1024 个候选
rerank_mdl=rerank_mdl
)
# 步骤 6: 格式化检索结果
knowledges = kb_prompt(kbinfos, max_tokens=chat_mdl.max_length)
# 步骤 7: 构建 Prompt
prompt_config = dialog.prompt_config
system_prompt = prompt_config["system"].replace("{knowledge}", knowledges)
# 步骤 8: 构建消息列表
msg = [
{"role": "system", "content": system_prompt},
*messages[:-1], # 历史消息
{"role": "user", "content": questions[-1]}
]
# 步骤 9: 流式生成
if stream:
last_ans = ""
answer = ""
for ans in chat_mdl.chat_streamly(system_prompt, msg[1:], gen_conf):
answer = ans
delta_ans = ans[len(last_ans):]
if num_tokens_from_string(delta_ans) < 16:
continue # 累积到 16 tokens 再返回
last_ans = answer
yield {
"answer": answer,
"reference": {},
"audio_binary": tts(tts_mdl, delta_ans) # TTS(可选)
}
# 最后一个 chunk 包含完整 reference
yield {
"answer": answer,
"reference": kbinfos, # 检索结果
"prompt": system_prompt
}
第 3 层:混合检索(rag/nlp/search.py)
# rag/nlp/search.py
class Dealer:
def retrieval(self, question, embd_mdl, tenant_ids, kb_ids,
page, page_size, similarity_threshold,
vector_similarity_weight, top, rerank_mdl=None, **kwargs):
"""混合检索:向量检索 + 全文检索 + RRF 融合"""
# 步骤 1: 编码查询向量
query_vector = embd_mdl.encode([question])[0] # 768 维向量
# 步骤 2: 构建 ES 查询(Hybrid Search)
index_name = search.index_name(tenant_ids[0])
query = {
"bool": {
"must": [
{"terms": {"kb_id": kb_ids}}, # 过滤知识库
{
"bool": {
"should": [
# 向量检索(KNN)
{
"script_score": {
"query": {"match_all": {}},
"script": {
"source": "cosineSimilarity(params.query_vector, 'q_vec') + 1.0",
"params": {"query_vector": query_vector}
}
}
},
# 全文检索(BM25)
{
"multi_match": {
"query": question,
"fields": ["content_with_weight", "title^2"],
"type": "best_fields"
}
}
]
}
}
],
"filter": {
"range": {
"similarity": {"gte": similarity_threshold} # 相似度阈值
}
}
}
}
# 步骤 3: 执行 ES 查询
response = self.conn.search(
index=index_name,
body={"query": query, "size": top, "from": 0}
)
chunks = [hit["_source"] for hit in response["hits"]["hits"]]
# 步骤 4: Rerank 重排序(可选)
if rerank_mdl:
pairs = [(question, chunk["content"]) for chunk in chunks]
scores = rerank_mdl.rerank(pairs)
chunks = [chunk for _, chunk in sorted(zip(scores, chunks), reverse=True)]
chunks = chunks[:page_size] # Top-K
# 步骤 5: 格式化返回结果
return {
"total": len(chunks),
"chunks": chunks,
"doc_aggs": self._aggregate_by_doc(chunks) # 按文档聚合
}
第 4 层:LLM 调用(rag/llm/chat_model.py)
# rag/llm/chat_model.py
class ChatModel(Base):
def chat_streamly(self, system, history, gen_conf):
"""流式生成"""
# 调用 LLM API(OpenAI/Ollama/DeepSeek)
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": system},
*history
],
stream=True,
**gen_conf
)
for chunk in response:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
4.3.3 时序图(RAG 全流程)
sequenceDiagram
autonumber
participant C as 客户端
participant CA as conversation_app::completion
participant CS as ConversationService
participant DS as DialogService
participant Chat as dialog_service::chat
participant R as Retriever
participant EM as Embedding 模型
participant ES as Elasticsearch
participant RM as Rerank 模型
participant CM as Chat 模型
participant M as MySQL
C->>CA: POST /completion<br/>{conversation_id, messages, stream=true}
CA->>CS: ConversationService.get_by_id(conversation_id)
CS->>M: SELECT * FROM conversation WHERE id=?
M-->>CS: Conversation 记录
CS-->>CA: conv 对象
CA->>DS: DialogService.get_by_id(conv.dialog_id)
DS->>M: SELECT * FROM dialog WHERE id=?
M-->>DS: Dialog 配置
DS-->>CA: dia 对象
CA->>Chat: chat(dia, messages, stream=True)
Chat->>Chat: get_models(dia)<br/>初始化 embd/rerank/chat 模型
Chat->>Chat: questions = messages[-3:]<br/>提取最近 3 轮对话
alt 多轮对话精炼
Chat->>CM: full_question(messages)<br/>LLM 精炼查询
CM-->>Chat: 精炼后的查询
end
alt 关键词提取
Chat->>CM: keyword_extraction(question)
CM-->>Chat: 关键词列表
end
Chat->>R: retriever.retrieval(question, embd_mdl, kb_ids, ...)
R->>EM: embd_mdl.encode([question])
EM-->>R: query_vector (768 维)
R->>ES: hybrid_search<br/>(vector + BM25 + RRF)
ES->>ES: KNN 向量检索<br/>cosine_similarity
ES->>ES: BM25 全文检索<br/>multi_match
ES->>ES: RRF 融合排序<br/>score = 1/(60 + rank)
ES-->>R: Top 1024 候选 chunks
alt Rerank 重排序
R->>RM: rerank_mdl.rerank(pairs)
RM-->>R: rerank_scores
R->>R: 按 score 排序<br/>取 Top-K
end
R-->>Chat: kbinfos (chunks + doc_aggs)
Chat->>Chat: kb_prompt(kbinfos, max_tokens)<br/>格式化检索结果
Chat->>Chat: system = prompt.replace("{knowledge}", knowledges)
Chat->>Chat: 构建消息列表 msg
Chat->>CM: chat_mdl.chat_streamly(system, msg, gen_conf)
CM->>CM: LLM API (OpenAI/Ollama)
loop 流式生成
CM-->>Chat: yield chunk ("RAG")
Chat-->>CA: yield {"answer": "RAG", "reference": {}}
CA-->>C: SSE data: {...}
CM-->>Chat: yield chunk ("Flow")
Chat-->>CA: yield {"answer": "Flow", "reference": {}}
CA-->>C: SSE data: {...}
end
CM-->>Chat: 流结束
Chat-->>CA: yield {"answer": "", "reference": kbinfos}
CA-->>C: SSE data: {"reference": [...]}
CA->>CS: ConversationService.update_by_id(conv.id, conv.to_dict())
CS->>M: UPDATE conversation<br/>SET message=?, reference=?
M-->>CS: OK
时序图关键要点:
-
第 1-10 步(初始化):
- 从 MySQL 加载 Conversation 和 Dialog 配置
- 初始化模型:Embedding、Rerank、Chat、TTS(从 Redis 读取租户 API Key)
- 性能指标:初始化耗时约 10-20ms
-
第 11-15 步(查询精炼):
- 多轮对话精炼:使用 LLM 将多轮对话改写为单轮完整查询
- 关键词提取:使用 LLM 提取查询关键词(补充到查询末尾)
- 可选功能:通过
prompt_config配置启用
-
第 16-23 步(混合检索):
- 向量检索:使用 Embedding 模型编码查询(768 维),ES KNN 检索(cosine 相似度)
- 全文检索:ES BM25 算法,匹配
content_with_weight和title字段 - RRF 融合:
score = 1/(60 + rank_vector) + 1/(60 + rank_bm25) - 性能指标:初筛 1024 个候选耗时约 50-100ms
-
第 24-27 步(Rerank 重排序):
- 使用 Rerank 模型(如
bge-reranker-v2-m3)重新打分 - 输入:
(query, chunk_content)对 - 输出:相关性分数(0-1)
- 取 Top-K(默认 6 个)
- 性能瓶颈:Rerank 耗时约 100-200ms(100 个候选)
- 使用 Rerank 模型(如
-
第 28-32 步(Prompt 构建):
- 格式化检索结果:
kb_prompt将 chunks 拼接为文本 - 替换占位符:
{knowledge}→ 检索结果文本 - 构建消息列表:
[system, ...history, user_question] - Token 限制:确保总 Token 数不超过 LLM
max_tokens
- 格式化检索结果:
-
第 33-42 步(流式生成):
- 调用 Chat 模型 API(
stream=True) - 逐 Token 返回生成结果(SSE 协议)
- 累积到 16 tokens 再返回(减少网络开销)
- 最后一个 chunk 包含完整
reference - 性能指标:生成 500 tokens 耗时约 5-10 秒(取决于 LLM)
- 调用 Chat 模型 API(
-
第 43-45 步(保存会话):
- 更新 Conversation 的
message和reference字段 message:追加 user 和 assistant 消息reference:追加检索结果- 数据库更新:
UPDATE conversation SET message=?, reference=?
- 更新 Conversation 的
5. 核心业务流程时序图
5.1 文档上传与解析全流程
sequenceDiagram
autonumber
participant U as 用户
participant API as Flask API
participant DocSvc as DocumentService
participant FileSvc as FileService
participant MinIO as MinIO
participant MySQL as MySQL
participant Redis as Redis Stream
participant TaskExec as TaskExecutor
participant DeepDoc as DeepDoc 解析器
participant Pipeline as Pipeline
participant ES as Elasticsearch
U->>API: POST /v1/documents/upload (file, kb_id, run=1)
API->>FileSvc: 校验文件大小与类型
FileSvc-->>API: OK
API->>MinIO: PUT 上传文件
MinIO-->>API: 返回 location
API->>MySQL: BEGIN TRANSACTION
API->>FileSvc: save(file_id, name, location, size)
FileSvc->>MySQL: INSERT INTO file
API->>DocSvc: save(doc_id, kb_id, name, location, parser_id)
DocSvc->>MySQL: INSERT INTO document
API->>FileSvc: save file2document(file_id, doc_id)
FileSvc->>MySQL: INSERT INTO file2document
API->>MySQL: COMMIT
MySQL-->>API: OK
API->>Redis: XADD 推送解析任务 (doc_id, kb_id, tenant_id)
Redis-->>API: 返回任务 ID
API-->>U: 201 Created (doc_id, progress=0.0)
Note over U,API: --- 异步分界线 ---
TaskExec->>Redis: XREADGROUP 拉取任务
Redis-->>TaskExec: 返回任务 (doc_id, kb_id, tenant_id)
TaskExec->>MinIO: GET 下载文件
MinIO-->>TaskExec: 返回文件流
TaskExec->>DeepDoc: parse(file_stream, parser_id, parser_config)
DeepDoc->>DeepDoc: 布局分析(YOLO 模型)
DeepDoc->>DeepDoc: OCR 识别文本
DeepDoc->>DeepDoc: 表格识别与结构化
DeepDoc-->>TaskExec: 返回解析结果 (sections, tables, images)
TaskExec->>Pipeline: run(doc_id, sections, kb.parser_config)
Pipeline->>Pipeline: Tokenizer 分词
Pipeline->>Pipeline: Splitter 切片 (chunk_size=128)
Pipeline->>Pipeline: Extractor 提取实体(NER)
Pipeline->>EmbedModel: batch_encode(chunks)
EmbedModel-->>Pipeline: 返回向量 (768 维)
Pipeline->>ES: bulk_index(chunks, vectors, metadata)
ES-->>Pipeline: 索引成功
Pipeline->>MySQL: UPDATE document SET progress=1.0, chunk_num=X
MySQL-->>Pipeline: OK
TaskExec->>Redis: XACK 确认任务完成
TaskExec-->>U: (WebSocket/Polling) 通知解析完成
流程要点:
- 同步阶段(API 处理):文件上传 → 数据库记录 → 推送任务 → 立即返回
- 异步阶段(TaskExecutor 处理):下载文件 → 解析 → 切片 → Embedding → 索引
- 进度更新:TaskExecutor 每 10 秒更新一次 progress(0.1 → 0.5 → 0.8 → 1.0)
- 错误处理:任何环节失败,progress 设为 -1,error_message 记录详情
说明要点:
本时序图展示文档上传到解析完成的完整异步流程:
-
同步阶段(API处理,第1-18步):
- 文件上传到MinIO
- 创建File、Document、File2Document三张表
- 推送任务到Redis Stream
- 立即返回客户端(progress=0.0)
-
异步阶段(TaskExecutor处理,第19步开始):
- 从Redis Stream拉取任务
- 调用DeepDoc解析器
- 执行Pipeline处理(分词、切片、Embedding)
- 批量索引到Elasticsearch
- 更新进度到MySQL
-
进度更新机制:
update_progress后台线程每6秒从MySQL读取任务状态- 更新Document表的progress字段(0.0 → 0.1 → 0.5 → 1.0)
- 前端通过轮询
GET /documents/{doc_id}获取进度
-
错误处理:
- 解析失败:progress设为-1,error_message记录错误
- 任务重试:重新推送到Redis Stream
- 部分成功:已解析的chunks保留,未解析的任务继续
5.2 RAG 对话查询全流程
此时序图已在第 4.3 节详细展示,此处省略重复内容。主要包括:
- Dialog 和 Conversation 配置加载
- 多轮对话精炼与关键词提取
- 混合检索(向量 + 全文 + RRF 融合)
- Rerank 重排序
- Prompt 构建与流式生成
- 会话保存
流程要点:
- 查询改写:使用 NLP 提取关键词,生成多个查询变体(同义词扩展)
- 混合检索:向量检索(语义相似度)+ 全文检索(关键词匹配),RRF 融合
- 重排序:使用 Rerank 模型重新打分,提升精准度(可选)
- LLM 缓存:相同问题直接返回缓存答案,降低成本
- 流式响应:使用 SSE 协议,前端逐 Token 渲染,提升体验
性能指标:
- 检索阶段:50-100ms(1024个候选)
- Rerank阶段:100-200ms(100个候选)
- LLM生成:5-10秒(500 tokens,取决于模型)
- 端到端延迟:6-12秒
5.3 Agent 工作流执行全流程
sequenceDiagram
autonumber
participant U as 用户
participant API as canvas_app
participant Canvas as Canvas Graph
participant Begin as Begin 组件
participant LLM1 as LLM 组件1<br/>(工具判断)
participant Tool as Tool 组件<br/>(Retrieval)
participant ES as Elasticsearch
participant LLM2 as LLM 组件2<br/>(结果生成)
participant Answer as Answer 组件
participant MySQL as MySQL
U->>API: POST /canvases/{canvas_id}/run<br/>{query, files}
API->>MySQL: SELECT canvas WHERE id=?
MySQL-->>API: 返回 Canvas DSL
API->>Canvas: Canvas(dsl, tenant_id)
Canvas->>Canvas: 解析 DSL<br/>构建 DAG 图
Canvas->>Canvas: 初始化全局变量<br/>sys.query, sys.files
Canvas->>Begin: invoke()
Note over Begin: 入口组件<br/>直接传递输入
Begin-->>Canvas: output(query=query, files=files)
Canvas->>Canvas: 查找下游组件<br/>根据 downstream 连线
Canvas->>LLM1: invoke(query=query)
LLM1->>LLM1: 构建 Prompt<br/>"判断是否需要工具..."
LLM1->>LLM1: 调用 LLM API
Note over LLM1: LLM 返回 JSON:<br/>{need_tool: true,<br/>tool_name: "retrieval",<br/>args: {...}}
LLM1-->>Canvas: output(need_tool=True,<br/>tool_name="retrieval",<br/>args={"query": "RAGFlow 部署"})
alt 需要工具调用
Canvas->>Tool: invoke(tool_name="retrieval",<br/>args={...})
Tool->>ES: retrieval(query="RAGFlow 部署",<br/>kb_ids=[...])
ES->>ES: 混合检索<br/>(向量 + BM25)
ES-->>Tool: 返回 Top-K chunks
Tool->>Tool: 格式化检索结果<br/>为文本
Tool-->>Canvas: output(tool_result="检索到 3 条结果:<br/>1. ...")
Canvas->>LLM2: invoke(query=query,<br/>tool_result=tool_result)
LLM2->>LLM2: 构建 Prompt<br/>"根据检索结果回答..."
LLM2->>LLM2: 调用 LLM API
LLM2-->>Canvas: output(need_tool=False,<br/>answer="根据检索结果...")
else 不需要工具
Canvas->>LLM2: invoke(query=query)
LLM2->>LLM2: 直接生成答案
LLM2-->>Canvas: output(answer="...")
end
Canvas->>Answer: invoke(answer=answer)
Answer->>Answer: 格式化输出<br/>(Markdown 渲染)
Answer-->>Canvas: output(final_answer=answer)
Canvas->>Canvas: 收集执行路径<br/>path = [Begin, LLM1, Tool, LLM2, Answer]
Canvas-->>API: 返回最终结果<br/>{answer, references, path}
API->>MySQL: INSERT INTO canvas_run<br/>(canvas_id, path, duration, ...)
MySQL-->>API: OK
API-->>U: 200 OK<br/>{answer, references, path}
流程要点说明:
-
DAG 图构建(第1-5步):
- 从 MySQL 加载 Canvas DSL(JSON 格式)
- 解析 DSL 构建有向无环图(DAG)
- 每个节点是一个组件(Begin/LLM/Tool/Switch/Answer 等)
- 边表示数据流向(upstream → downstream)
-
组件执行模型(第6-10步):
- 每个组件实现
invoke()方法(输入参数)和output()方法(输出结果) - Canvas 按拓扑排序顺序执行组件
- 上游组件的
output()作为下游组件的invoke()输入
- 每个组件实现
-
动态路由(第11-20步):
- LLM 组件返回结构化 JSON:
{need_tool: bool, tool_name: str, args: dict} - Canvas 根据
need_tool决定是否调用 Tool 组件 - Switch 组件支持条件分支(if-else)
- Iteration 组件支持循环执行(while)
- LLM 组件返回结构化 JSON:
-
工具调用(第13-17步):
- Tool 组件封装检索、文件读取、HTTP 请求等功能
- 检索工具调用 Elasticsearch 混合检索
- 返回格式化的文本结果供 LLM 使用
-
状态管理(第21-24步):
- Canvas 维护全局状态:
sys.query、sys.files、sys.history - 组件间共享状态,实现多步推理
- 记录执行路径(path)用于调试与可视化
- Canvas 维护全局状态:
-
执行日志(第25-27步):
- 保存执行记录到 MySQL:canvas_id、path、duration、status
- 用于统计分析与性能优化
关键设计:
- 有向无环图(DAG):组件间通过
upstream/downstream连接,保证无循环依赖 - 动态路由:LLM 组件判断是否需要工具调用,Switch 组件实现条件分支
- 状态传递:每个组件
output()的结果作为下游invoke()的输入 - 循环支持:Iteration 组件支持循环执行,直到 LLM 不再请求工具或达到最大迭代次数
性能指标:
- 单个 LLM 调用:1-3秒
- 检索调用:50-100ms
- 端到端延迟:3-10秒(取决于组件数量和 LLM 次数)
5.4 知识库构建与 GraphRAG 流程
sequenceDiagram
autonumber
participant U as 用户
participant API as kb_app
participant KBSvc as KnowledgebaseService
participant TaskSvc as TaskService
participant Redis as Redis Stream
participant TaskExec as TaskExecutor
participant GraphRAG as GraphRAG Engine
participant ES as Elasticsearch
participant MySQL as MySQL
U->>API: POST /api/v1/datasets/{kb_id}/chunk_method<br/>{chunk_method: "knowledge_graph"}
API->>KBSvc: get_by_id(kb_id)
KBSvc->>MySQL: SELECT * FROM knowledgebase WHERE id=?
MySQL-->>KBSvc: 返回 KB
KBSvc-->>API: kb 对象
API->>API: 校验权限<br/>check_kb_team_permission
API->>KBSvc: update_by_id(kb_id,<br/>{parser_id: "knowledge_graph"})
KBSvc->>MySQL: UPDATE knowledgebase<br/>SET parser_id=?, parser_config=?
MySQL-->>KBSvc: OK
API->>TaskSvc: queue_raptor_o_graphrag_tasks<br/>(doc, "graphrag", priority=0)
TaskSvc->>MySQL: INSERT INTO task<br/>(id, doc_id, task_type="graphrag", ...)
MySQL-->>TaskSvc: OK
TaskSvc->>Redis: XADD ragflow_svr<br/>{task_type: "graphrag", doc_ids: [...]}
Redis-->>TaskSvc: task_id
TaskSvc-->>API: 返回 task_id
API-->>U: 202 Accepted<br/>{task_id, status: "queued"}
Note over U,API: --- 异步分界线 ---
TaskExec->>Redis: XREADGROUP ragflow_group<br/>BLOCK 5000 COUNT 1
Redis-->>TaskExec: 返回 graphrag 任务
TaskExec->>ES: 查询所有 chunks<br/>SELECT * FROM {kb_id}
ES-->>TaskExec: 返回 chunks 列表
TaskExec->>GraphRAG: build_graph(chunks, kb_config)
GraphRAG->>GraphRAG: 1. 实体识别<br/>使用 NER 模型提取实体
GraphRAG->>GraphRAG: 2. 关系抽取<br/>使用 LLM 抽取三元组<br/>(subject, predicate, object)
GraphRAG->>GraphRAG: 3. 实体消歧<br/>合并同义实体
GraphRAG->>GraphRAG: 4. 社区检测<br/>Leiden 算法聚类
GraphRAG->>GraphRAG: 5. 社区摘要<br/>使用 LLM 生成摘要
GraphRAG-->>TaskExec: 返回知识图谱<br/>{entities, relations, communities}
TaskExec->>ES: bulk_index(entities + relations)
ES-->>TaskExec: 索引成功
TaskExec->>MySQL: UPDATE task<br/>SET progress=1.0, status='done'
MySQL-->>TaskExec: OK
TaskExec->>Redis: XACK ragflow_svr<br/>ragflow_group {task_id}
Redis-->>TaskExec: OK
TaskExec-->>U: (WebSocket/Polling)<br/>通知 GraphRAG 构建完成
流程要点:
-
知识库配置更新(第1-7步):
- 用户选择
chunk_method = "knowledge_graph" - 更新知识库的
parser_id和parser_config - 触发 GraphRAG 构建任务
- 用户选择
-
任务推送(第8-12步):
- 创建
task_type="graphrag"任务 - 推送到 Redis Stream
- 立即返回
202 Accepted(异步处理)
- 创建
-
GraphRAG 构建(第13-22步):
- 实体识别:使用 NER 模型(如 SpaCy)提取命名实体
- 关系抽取:使用 LLM 提取三元组(主语、谓语、宾语)
- 实体消歧:合并同义实体(如"美国"和"USA")
- 社区检测:使用 Leiden 算法聚类相关实体
- 社区摘要:为每个社区生成摘要(供检索使用)
-
索引更新(第23-24步):
- 将实体和关系批量索引到 Elasticsearch
- 支持实体检索和关系查询
-
进度更新(第25-28步):
- 更新任务进度到 MySQL
- 确认任务完成(XACK)
- 通知前端
性能指标:
- 1000个 chunks 的知识图谱构建:10-30分钟
- 实体识别:~5秒/100 chunks
- 关系抽取:~20秒/100 chunks(调用 LLM)
- 社区检测:~10秒/1000 entities
- 端到端延迟:取决于文档数量和 LLM 速度
6. 核心数据结构详细说明
6.1 Parser Config 结构
# naive 方法(通用切片)
{
"chunk_token_count": 128, # 切片 Token 数量
"delimiter": "\\n!?;。;!?", # 分隔符
"layout_recognize": True, # 是否识别布局
"task_page_size": 12 # 每个任务处理页数
}
# qa 方法(问答对)
{
"raptor": {
"use_raptor": False, # 是否使用 RAPTOR
"prompt": "...", # RAPTOR 摘要 Prompt
"max_token": 256,
"threshold": 0.1,
"max_cluster": 64
}
}
# table 方法(表格识别)
{
"chunk_token_count": 128,
"html4excel": True # Excel 转 HTML
}
# book 方法(书籍章节)
{
"chunk_token_count": 512, # 章节级切片
"delimiter": "\\n\\n", # 段落分隔
"toc": { # 目录识别
"use_toc": True,
"toc_prompt": "识别目录..."
}
}
# knowledge_graph 方法(知识图谱)
{
"entity_types": ["PERSON", "ORG", "LOC", "DATE"], # 实体类型
"relation_prompt": "提取三元组...", # 关系抽取 Prompt
"community_algorithm": "leiden", # 社区检测算法
"summary_prompt": "生成社区摘要..." # 摘要 Prompt
}
6.2 LLM Setting 结构
{
"model_name": "gpt-4", # 模型名称(必填)
"temperature": 0.7, # 温度参数(0-1)
"top_p": 0.9, # Nucleus 采样
"presence_penalty": 0.0, # 存在惩罚(-2 到 2)
"frequency_penalty": 0.0, # 频率惩罚(-2 到 2)
"max_tokens": 512 # 最大 Token 数
}
6.3 Prompt Config 结构
{
"system": "你是一个智能助手...\n知识库:\n{knowledge}", # System Prompt(必填)
"prologue": "你好!我是你的助手,有什么可以帮您?", # 开场白
"parameters": [ # 变量列表
{
"key": "knowledge", # 变量名
"optional": False # 是否可选
}
],
"quote": True, # 是否显示引用
"empty_response": "抱歉,未找到相关内容!", # 空结果回复
"tts": False, # 是否启用 TTS
"refine_multiturn": True # 是否精炼多轮对话
}
7. 最佳实践与使用建议
7.1 API 认证最佳实践
方案选择:
- Web UI:使用 Session 认证(Cookie),支持 Remember Me
- SDK/API:使用 Token 认证(Authorization Header),无状态
- 第三方集成:使用 API Key,定期轮换
Token 管理:
# 1. 获取 Token(登录后)
response = requests.post("http://ragflow/v1/user/login", json={
"email": "user@example.com",
"password": encrypt_password("password")
})
token = response.json()["data"]["access_token"]
# 2. 使用 Token 调用 API
headers = {"Authorization": f"ragflow-{token}"}
response = requests.post("http://ragflow/api/v1/datasets", headers=headers, json={...})
# 3. Token 刷新(重新登录)
if response.status_code == 401:
# Token 过期,重新登录
login()
7.2 文档上传批量处理
场景:批量上传 100+ 文档
from concurrent.futures import ThreadPoolExecutor
import os
def upload_file(file_path, kb_id, access_token):
"""上传单个文件"""
with open(file_path, 'rb') as f:
files = {'file': (os.path.basename(file_path), f)}
data = {'kb_id': kb_id, 'run': '1'}
headers = {'Authorization': f'ragflow-{access_token}'}
response = requests.post(
'http://ragflow/v1/documents/upload',
files=files,
data=data,
headers=headers
)
return response.json()
# 并发上传(限制并发数避免服务器过载)
files = ['doc1.pdf', 'doc2.docx', ...]
with ThreadPoolExecutor(max_workers=4) as executor:
results = executor.map(lambda f: upload_file(f, kb_id, token), files)
for result in results:
print(f"Uploaded: {result['data']['doc_id']}")
注意:
- 限制并发数(建议 <= 4),避免触发限流
- 大文件(> 100MB)使用分块上传(需自行实现)
- 上传后轮询
/documents/{doc_id}查询解析进度
7.3 对话流式响应最佳实践
前端实现(JavaScript):
const eventSource = new EventSource(
`http://ragflow/api/v1/chats/${chatId}/completions?stream=true`,
{
headers: {
'Authorization': `ragflow-${accessToken}`,
'Content-Type': 'application/json'
},
method: 'POST',
body: JSON.stringify({
conversation_id: convId,
question: userQuestion
})
}
);
let fullAnswer = '';
let reference = [];
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
fullAnswer += data.answer || '';
reference = data.reference || reference;
// 更新 UI(逐字渲染)
document.getElementById('answer').innerText = fullAnswer;
if (data.reference && data.reference.length > 0) {
// 渲染引用来源
renderReferences(data.reference);
}
};
eventSource.onerror = (error) => {
console.error('SSE Error:', error);
eventSource.close();
};
7.4 知识库配置调优
场景:优化检索精度与速度
参数调优:
| 参数 | 默认值 | 推荐范围 | 说明 |
|---|---|---|---|
| similarity_threshold | 0.2 | 0.1-0.4 | 阈值越高,结果越精准但可能遗漏 |
| vector_similarity_weight | 0.7 | 0.5-0.9 | 向量权重,技术文档建议 0.7,新闻类建议 0.5 |
| top_n | 6 | 3-10 | 返回结果数,影响 LLM 上下文长度 |
| rerank_model | "" | bge-reranker-v2-m3 | 启用 Rerank 提升 10-20% 精度 |
| chunk_token_count | 128 | 64-512 | 切片大小,长文档建议 256,短问答建议 64 |
实战经验:
- 技术文档:chunk_size=256, vector_weight=0.7, rerank=True
- 客服问答:chunk_size=64, vector_weight=0.5, top_n=3
- 法律文本:chunk_size=512, vector_weight=0.8, rerank=True, parser_id=laws
8. 故障排查与监控
8.1 常见错误码
| 错误码 | 含义 | 排查方向 |
|---|---|---|
| 100 | 认证失败 | 检查 Token 是否过期/正确 |
| 101 | 权限不足 | 检查资源是否属于当前租户 |
| 102 | 参数错误 | 检查请求体是否符合 API 规范 |
| 201 | 数据不存在 | 检查资源 ID 是否正确 |
| 202 | 数据重复 | 检查名称是否重复 |
| 500 | 服务器错误 | 查看后端日志 |
8.2 日志查询
# 查看 API 日志
docker logs -f ragflow-server | grep ERROR
# 查看任务执行日志
docker logs -f ragflow-task-executor | grep "doc_id=<doc_uuid>"
# 查看 MySQL 慢查询
docker exec -it ragflow-mysql mysql -e "SELECT * FROM mysql.slow_log LIMIT 10;"
# 查看 Redis 队列积压
docker exec -it ragflow-redis redis-cli XINFO STREAM ragflow_svr
8.3 性能监控指标
关键指标:
- API QPS:
requests_per_second> 100 - 平均延迟:
avg_response_time< 500ms - 错误率:
error_rate< 1% - 数据库连接数:
mysql_connections< 32(连接池上限) - Redis 内存:
redis_memory_used< 8GB - ES 查询延迟:
es_query_latency_p95< 300ms
告警规则:
- API 5xx 错误率 > 1%,持续 5 分钟
- Task 队列积压 > 1000,持续 10 分钟
- MySQL 连接数 > 30,持续 5 分钟
- Redis 内存使用 > 90%