FastGPT-00-总览

0. 摘要

项目目标与能力边界

FastGPT 是一个基于 LLM(大语言模型)的 AI Agent 构建平台,旨在为开发者和企业提供开箱即用的数据处理、模型调用等能力。通过可视化的 Flow 编排界面,用户可以灵活组合各种节点,实现复杂的 AI 应用场景。

核心能力

  • 工作流可视化编排:通过拖拽节点构建复杂的 AI 工作流
  • 知识库管理(RAG):支持多种文档格式导入、向量检索、混合检索、重排序
  • 对话管理:完整的对话历史记录、变量传递、上下文管理
  • 多模型接入:支持接入 OpenAI、国内模型等多种 LLM 和嵌入模型
  • 工具调用(Agent):支持 HTTP 工具、MCP 工具、自定义工具等
  • 插件系统:支持第三方插件开发和集成
  • 多租户权限:团队管理、协作者权限、资源隔离
  • 使用计费:基于 Token 和积分的灵活计费体系

非目标

  • 不提供模型训练能力(仅调用已有模型)
  • 不是通用的工作流引擎(专注于 AI 场景)

运行环境与部署形态

技术栈

  • 前端:Next.js 14、React 18、TypeScript、Chakra UI
  • 后端:Next.js API Routes、Node.js ≥20
  • 数据库
    • MongoDB(业务数据存储)
    • PostgreSQL + PG Vector 或 Milvus(向量数据库)
  • 缓存:Redis
  • 对象存储:S3 兼容存储
  • 消息队列:BullMQ(基于 Redis)

部署形态

  • 单体应用:projects/app 作为主应用,包含前端和 API
  • 微服务(可选)
    • Sandbox 服务(projects/sandbox):独立的代码执行沙箱,基于 NestJS
    • MCP 服务器(projects/mcp_server):Model Context Protocol 实现
  • Monorepo 架构:使用 pnpm workspace 管理多个包
  • 容器化部署:支持 Docker Compose 和 Kubernetes(Helm Charts)

1. 整体架构图

flowchart TB
    subgraph Client["客户端层"]
        WebUI[Web界面<br/>Next.js SSR]
        API[API调用<br/>OpenAPI/SDK]
    end
    
    subgraph AppLayer["应用层 (projects/app)"]
        NextPages[Next.js Pages]
        APIRoutes[API Routes]
        PageComponents[页面组件]
    end
    
    subgraph ServiceLayer["服务层 (packages/service)"]
        CoreService[核心服务<br/>AI/App/Chat/Dataset/Workflow]
        CommonService[通用服务<br/>Mongo/Redis/S3/VectorDB]
        SupportService[支持服务<br/>User/Permission/Wallet]
        Worker[后台任务<br/>Worker]
    end
    
    subgraph GlobalLayer["全局层 (packages/global)"]
        CoreTypes[核心类型定义<br/>AI/App/Chat/Dataset/Workflow]
        CommonUtils[通用工具<br/>Error/File/String/Time]
        OpenAPITypes[OpenAPI类型]
    end
    
    subgraph WebLayer["Web组件层 (packages/web)"]
        Components[通用UI组件]
        Hooks[React Hooks]
        I18N[国际化]
    end
    
    subgraph External["外部服务"]
        Sandbox[沙箱服务<br/>NestJS]
        MCPServer[MCP服务器<br/>Model Context Protocol]
        Plugins[插件系统<br/>LLM/OCR/TTS/STT]
    end
    
    subgraph Storage["存储层"]
        MongoDB[(MongoDB<br/>业务数据)]
        VectorDB[(向量数据库<br/>PG/Milvus)]
        Redis[(Redis<br/>缓存/队列)]
        S3[(对象存储<br/>S3)]
    end
    
    subgraph ThirdParty["第三方服务"]
        LLM[LLM服务<br/>OpenAI/国内模型]
        Embedding[嵌入模型]
        Rerank[重排模型]
    end
    
    WebUI --> NextPages
    API --> APIRoutes
    NextPages --> PageComponents
    APIRoutes --> CoreService
    PageComponents --> Components
    
    CoreService --> CommonService
    CoreService --> SupportService
    CoreService --> GlobalLayer
    CommonService --> Storage
    CoreService --> Worker
    
    Worker --> Storage
    
    APIRoutes --> Sandbox
    CoreService --> MCPServer
    CoreService --> Plugins
    
    CoreService --> ThirdParty
    
    Components --> Hooks
    Components --> I18N
    
    style Client fill:#e1f5ff
    style AppLayer fill:#fff4e1
    style ServiceLayer fill:#e8f5e9
    style GlobalLayer fill:#f3e5f5
    style WebLayer fill:#fce4ec
    style External fill:#fff3e0
    style Storage fill:#e0f2f1
    style ThirdParty fill:#f1f8e9

架构说明

1. 分层职责

客户端层

  • Web UI:基于 Next.js 的 SSR 应用,提供可视化的工作流编辑器、知识库管理、对话界面等
  • API 调用:支持通过 OpenAPI 标准接口或 SDK 直接调用

应用层(projects/app)

  • Next.js Pages:页面路由和 SSR 渲染
  • API Routes:RESTful API 端点实现
  • 职责:请求路由、身份验证、参数验证、响应格式化

服务层(packages/service)

  • 核心服务:AI 模型调用、应用管理、聊天会话、知识库、工作流引擎
  • 通用服务:数据库连接、缓存管理、文件存储、向量检索
  • 支持服务:用户管理、权限控制、计费系统
  • 后台任务:异步任务处理(文件解析、向量化、Token 统计等)
  • 职责:业务逻辑实现、数据访问、外部服务调用

全局层(packages/global)

  • 核心类型定义:所有业务模型的 TypeScript 类型和接口
  • 通用工具:错误处理、文件工具、字符串处理、时间工具
  • 常量定义:枚举、配置常量等
  • 职责:类型安全、代码复用、跨层共享

Web 组件层(packages/web)

  • 通用 UI 组件:按钮、表单、模态框、图标等
  • React Hooks:自定义 Hooks
  • 国际化:多语言支持
  • 职责:UI 复用、交互逻辑封装

2. 数据流与控制流

同步调用流程

客户端请求 → API Routes → 身份验证 → 参数验证 → Service层业务逻辑 → 数据库/外部服务 → 响应返回

异步任务流程

API Routes → 创建任务 → BullMQ队列 → Worker消费 → 执行任务 → 更新状态 → MongoDB

工作流执行流程

工作流触发 → 解析节点和边 → 构建执行队列 → 按依赖顺序执行节点 → 变量传递 → 结果汇总

3. 跨进程通信

  • 主应用 ↔ Sandbox:HTTP API(POST 请求)
  • 主应用 ↔ MCP Server:HTTP + SSE(双向流式通信)
  • 主应用 ↔ Plugins:HTTP API 或 gRPC
  • Service ↔ Worker:BullMQ 消息队列
  • Service ↔ 第三方 LLM:HTTP/HTTPS(支持流式响应)

4. 状态管理

  • 服务端状态:存储在 MongoDB(持久化)和 Redis(缓存)
  • 客户端状态:React State、Zustand(全局状态)
  • 会话状态:变量存储在 Chat 文档中,支持跨轮对话
  • 工作流运行时状态:内存中的节点队列和变量池

5. 高可用与扩展性

  • 水平扩展:应用层无状态,可启动多个实例
  • 数据库扩展:MongoDB 支持分片,向量数据库支持集群
  • 缓存层:Redis 提供缓存和会话共享
  • 异步解耦:通过消息队列实现任务异步化
  • 限流与熔断:支持 API 限流、并发控制

2. 全局时序图(主要业务闭环)

2.1 对话流程(Chat Completion)

sequenceDiagram
    autonumber
    participant C as 客户端
    participant API as API Routes<br/>/v2/chat/completions
    participant Auth as 身份验证
    participant ChatS as Chat Service
    participant WF as Workflow Engine
    participant Node as 工作流节点<br/>(AI/Dataset/Agent)
    participant LLM as LLM服务
    participant VDB as 向量数据库
    participant Mongo as MongoDB
    
    C->>API: POST 对话请求<br/>{messages, stream}
    API->>Auth: 验证 API Key / Token
    Auth-->>API: 用户信息 {teamId, tmbId}
    
    API->>ChatS: initChat(appId, chatId)
    ChatS->>Mongo: 查询应用配置和历史
    Mongo-->>ChatS: 返回配置和变量
    ChatS-->>API: 初始化结果
    
    API->>WF: dispatchWorkflow(nodes, edges)
    
    Note over WF: 构建执行队列<br/>按依赖关系排序节点
    
    loop 执行每个节点
        WF->>Node: dispatch(nodeType, inputs)
        
        alt 数据集检索节点
            Node->>VDB: 向量检索(query, topK)
            VDB-->>Node: 返回相关文档片段
        else AI对话节点
            Node->>LLM: completions(messages, model)
            LLM-->>Node: 流式返回生成内容
        else Agent工具调用
            Node->>LLM: completions(messages, tools)
            LLM-->>Node: tool_calls
            Node->>Node: 执行工具节点
            Node->>LLM: completions(messages + tool_results)
            LLM-->>Node: 最终回答
        end
        
        Node-->>WF: 节点输出结果
        Note over WF: 更新变量池<br/>触发下游节点
    end
    
    WF-->>API: 工作流执行结果<br/>{flowResponses, usages}
    
    API->>ChatS: saveChat(chatId, userContent, aiContent)
    ChatS->>Mongo: 保存对话记录
    Mongo-->>ChatS: 保存成功
    
    API-->>C: SSE流式响应<br/>JSON响应

时序图说明

入口与鉴权

  1. 请求验证:客户端发送对话请求,携带 Authorization header(Bearer token 或 API Key)
  2. 身份验证:验证 token 有效性,获取用户所属团队(teamId)和成员(tmbId)
  3. 权限检查:验证用户对应用的访问权限(读取、执行)

对话初始化

  1. 查询配置:从 MongoDB 加载应用的工作流配置(nodes、edges)
  2. 历史加载:获取对话历史和上下文变量
  3. 变量初始化:设置系统变量(cTime、appId 等)和用户变量

工作流执行

  1. 构建队列:分析节点依赖关系,确定执行顺序
  2. 节点调度:根据节点类型(FlowNodeTypeEnum)调用对应的 dispatch 函数
  3. 并发控制:同一团队下限制并发执行的节点数量
  4. 变量传递:节点输出自动注入到下游节点的输入

节点类型执行逻辑

数据集检索节点(datasetSearchNode)

  • 查询扩展(可选):使用 LLM 生成多个查询变体
  • 向量检索:调用 VectorDB 的相似度搜索
  • 全文检索(可选):使用 Jieba 分词后的关键词匹配
  • 混合检索:RRF(Reciprocal Rank Fusion)融合向量和全文结果
  • 重排序(可选):使用 Rerank 模型优化排序
  • Token 过滤:根据 maxTokens 限制截断

AI 对话节点(chatNode)

  • 构建 messages:system prompt + 历史 + 用户输入 + 知识库引用
  • 调用 LLM:支持流式和非流式
  • 流式响应:通过 SSE 实时推送到客户端

Agent 工具调用节点(agent)

  • 第一轮:LLM 判断是否需要调用工具,返回 tool_calls
  • 工具执行:运行对应的工具节点(HTTP、插件、MCP)
  • 第二轮:将工具执行结果作为 tool 消息追加到 messages,再次调用 LLM
  • 递归调用:最多执行 maxRunToolTimes 次

响应与保存

  1. 汇总结果:收集所有节点的输出、Token 消耗、耗时
  2. 保存对话:将用户消息和 AI 回复存入 MongoChatItem
  3. 计费记录:创建 Usage 记录,扣减团队积分
  4. 流式推送:通过 SSE 将结果实时返回客户端

幂等与重试

  • 幂等性:chatId + dataId 保证消息不重复
  • 重试策略:LLM 调用失败时自动重试(最多 3 次),指数退避
  • 超时控制:每个节点有独立的超时时间(默认 60s),整个工作流超时 300s
  • 失败回退:节点执行失败时记录错误,继续执行其他节点(除非是关键节点)

资源上界

  • 并发限制:每个团队同时执行的节点数上限(默认 10)
  • Token 上界:单次对话的 Token 总量限制(知识库、LLM 输入输出)
  • 工作流深度:嵌套调用深度不超过 20 层
  • 历史长度:对话历史最多保留 30 轮(可配置)

2.2 知识库导入与检索流程

sequenceDiagram
    autonumber
    participant U as 用户
    participant API as API Routes
    participant FileS as File Service
    participant S3 as 对象存储
    participant Worker as Worker进程
    participant Parser as 文件解析器
    participant Embed as 嵌入模型
    participant VDB as 向量数据库
    participant Mongo as MongoDB
    
    U->>API: 上传文件<br/>POST /dataset/collection/create
    API->>FileS: 处理上传文件
    FileS->>S3: 存储原始文件
    S3-->>FileS: 返回fileId
    
    FileS->>Mongo: 创建Collection记录<br/>{name, fileId, status: waiting}
    Mongo-->>FileS: 返回collectionId
    FileS->>Worker: 发送文件解析任务<br/>BullMQ
    
    API-->>U: 返回collectionId<br/>状态: waiting
    
    Worker->>S3: 下载文件
    S3-->>Worker: 文件流
    Worker->>Parser: 解析文件内容<br/>(PDF/DOCX/TXT等)
    Parser-->>Worker: 提取文本和结构
    
    Note over Worker: 文本分块<br/>RecursiveCharacterTextSplitter
    
    loop 每个文本块
        Worker->>Embed: 生成向量<br/>embedding(text)
        Embed-->>Worker: vector[1536]
        Worker->>VDB: 插入向量<br/>{id, vector, metadata}
        Worker->>Mongo: 插入DatasetData<br/>{q, a, vector}
    end
    
    Worker->>Mongo: 更新Collection状态<br/>status: active
    
    Note over U: 用户发起检索
    
    U->>API: 搜索知识库<br/>POST /dataset/searchTest
    API->>Embed: 生成查询向量<br/>embedding(query)
    Embed-->>API: queryVector[1536]
    
    API->>VDB: 向量检索<br/>search(queryVector, topK)
    VDB-->>API: 返回相似文档<br/>[{id, score, metadata}]
    
    API->>Mongo: 查询完整数据<br/>DatasetData.find({_id: {$in: ids}})
    Mongo-->>API: 返回文档内容<br/>{q, a, source}
    
    API-->>U: 返回检索结果<br/>[{score, q, a}]

时序图说明

文件上传阶段

  1. 上传请求:用户通过 API 上传文件,支持多种格式(PDF、DOCX、TXT、Markdown、CSV、Excel 等)
  2. 文件存储:原始文件先存储到 S3 或 MinIO,生成唯一的 fileId
  3. 创建 Collection:在 MongoDB 中创建 DatasetCollection 文档,状态为 waiting
  4. 任务入队:将文件解析任务发送到 BullMQ 队列,立即返回给用户

异步解析与向量化

  1. Worker 消费:后台 Worker 进程从队列中获取任务
  2. 文件下载:从 S3 下载文件到本地临时目录
  3. 内容解析
    • PDF:使用 pdf-parse 或调用 pdf-marker/mineru 插件(支持图表提取)
    • DOCX:使用 mammoth.js 提取文本和结构
    • Excel/CSV:按行解析,支持 Q&A 格式
    • 网页:使用 cheerio 解析 HTML,提取正文
  4. 文本分块:使用 RecursiveCharacterTextSplitter 按规则切分(默认 512 tokens,重叠 50)
  5. 向量生成:批量调用嵌入模型(如 text-embedding-ada-002)生成向量
  6. 存储
    • 向量存储到 VectorDB(PG Vector 或 Milvus)
    • 元数据和文本存储到 MongoDB(DatasetData 集合)
  7. 状态更新:Collection 状态更新为 active

检索阶段

  1. 查询向量化:用户查询文本通过同一嵌入模型生成 queryVector
  2. 向量检索:VectorDB 执行 ANN(近似最近邻)搜索,返回 topK 个最相似的文档 ID 和相似度分数
  3. 数据补全:根据 ID 从 MongoDB 查询完整的文档内容(q、a、source 等)
  4. 结果返回:返回排序后的检索结果,包含相似度分数和来源信息

边界条件

  • 文件大小限制:默认最大 100MB,可通过配置调整
  • 解析超时:单个文件解析超时 10 分钟,超时则标记为失败
  • 重试机制:解析失败自动重试 3 次,仍失败则记录错误日志
  • 并发控制:Worker 并发处理任务数限制为 CPU 核心数
  • 相似度阈值:默认相似度 < 0.3 的结果会被过滤(可配置)

异常与回退

  • 解析失败:Collection 状态标记为 error,记录错误信息,用户可重新上传
  • 向量化失败:部分数据块失败时,跳过该块,继续处理其他块
  • 存储失败:VectorDB 或 MongoDB 写入失败时,整个任务标记为失败,需重新执行

3. 模块边界与交互图

3.1 模块清单

模块路径 模块名称 职责 对外接口
packages/global 全局通用包 类型定义、常量、工具函数 TypeScript 模块导出
packages/service 服务层 业务逻辑、数据访问 函数导出
packages/web Web 组件 UI 组件、Hooks React 组件导出
projects/app 主应用 API Routes、页面 HTTP API、Web 页面
projects/sandbox 沙箱服务 代码执行隔离 HTTP API (NestJS)
projects/mcp_server MCP 服务器 Model Context Protocol HTTP + SSE
plugins/model 模型插件 LLM/嵌入/重排/OCR/TTS/STT HTTP API 或本地调用
plugins/webcrawler 网络爬虫 网页内容抓取 HTTP API

3.2 模块交互矩阵

调用方 → 被调方 packages/global packages/service packages/web projects/app sandbox mcp_server plugins
projects/app 导入类型和工具 导入业务逻辑 导入组件 - HTTP 同步 HTTP + SSE HTTP 同步
packages/service 导入类型和工具 内部调用 - - HTTP 同步 - HTTP 同步
packages/web 导入类型和工具 - 内部调用 - - - -
sandbox - - - 被调用 - - -
mcp_server - - - 被调用 - - -
plugins - - - 被调用 - - -

3.3 数据一致性要求

交互 同步/异步 错误语义 一致性要求
API → Service 同步 异常抛出 强一致
Service → MongoDB 同步 事务回滚 强一致(事务)
Service → VectorDB 同步 失败重试 最终一致
Service → Worker 异步(队列) 任务重试 最终一致
Service → LLM 同步(流式) 失败重试 弱一致(幂等)
App → Sandbox 同步 HTTP 超时返回错误 强一致
App → MCP 双向流式 连接断开重连 最终一致

4. 关键设计与权衡

4.1 数据一致性

MongoDB 事务

  • 对话保存(Chat + ChatItem)使用事务保证原子性
  • Collection 和 DatasetData 的创建使用事务
  • 计费记录(Usage)和积分扣减使用事务

最终一致性场景

  • 向量数据库与 MongoDB 之间:允许短暂不一致,通过后台任务同步
  • 缓存(Redis)与数据库:缓存过期后重新加载,容忍短暂脏读
  • Worker 任务:失败自动重试,保证最终执行成功

4.2 并发与锁策略

乐观锁

  • 使用版本号(version)或 ETag 实现并发控制
  • 应用于工作流配置更新、知识库数据修改

分布式锁

  • 使用 Redis 实现分布式锁
  • 场景:防止重复创建资源、限流

队列并发控制

  • BullMQ 的 concurrency 配置限制并发 Worker 数量
  • 工作流执行时,团队维度限制并发节点数

4.3 性能关键路径

P95 延迟目标

  • 简单对话(不含知识库):< 2s
  • 知识库检索 + 对话:< 5s
  • Agent 工具调用(1 轮):< 8s

性能优化点

  1. 缓存策略

    • 应用配置缓存(Redis,TTL 5min)
    • 模型配置缓存(Redis,TTL 10min)
    • 用户权限缓存(Redis,TTL 1min)
  2. 数据库索引

    • MongoDB:chatId、appId、teamId、tmbId 建立复合索引
    • VectorDB:向量索引(HNSW 或 IVF)
  3. 异步化

    • 文件解析和向量化完全异步
    • 对话日志推送异步
    • Token 统计异步
  4. 流式响应

    • LLM 响应采用 SSE 流式传输
    • 工作流节点支持流式输出
  5. 批处理

    • 向量生成批量调用(batch size 50)
    • MongoDB 批量插入

内存峰值

  • 单个工作流执行内存上限:500MB
  • Worker 进程内存上限:2GB
  • 文件解析内存上限:100MB/文件

I/O 热点

  • MongoDB 连接池:50 连接
  • VectorDB 连接池:20 连接
  • Redis 连接池:100 连接
  • LLM 并发请求数:团队维度限制(默认 5)

4.4 可观测性

日志

  • 使用 @fastgpt/service/common/system/log 统一日志
  • 级别:debug、info、warn、error
  • 结构化日志:包含 traceId、teamId、appId、chatId

监控指标

  • API 请求:QPS、P95、错误率
  • 工作流:执行次数、平均耗时、节点失败率
  • 数据库:连接数、慢查询、缓存命中率
  • 队列:任务积压数、处理速率、重试次数
  • LLM:Token 消耗、平均延迟、错误率

Trace

  • 支持 OpenTelemetry
  • 追踪整个工作流执行链路

4.5 配置项

环境变量(核心)

# MongoDB
MONGODB_URI=mongodb://localhost:27017/fastgpt

# PostgreSQL (向量数据库)
PG_URL=postgresql://user:pass@localhost:5432/postgres

# Redis
REDIS_URL=redis://localhost:6379

# S3对象存储
S3_ENDPOINT=http://localhost:9000
S3_ACCESS_KEY=minioadmin
S3_SECRET_KEY=minioadmin
S3_BUCKET_NAME=fastgpt

# OneAPI (多模型代理)
ONEAPI_URL=http://localhost:3000/v1
ONEAPI_KEY=sk-xxx

# 商业版(可选)
FASTGPT_PRO_URL=
FASTGPT_PRO_KEY=

# 沙箱服务(可选)
SANDBOX_URL=http://localhost:3001

# MCP服务器(可选)
MCP_SERVER_URL=http://localhost:3002

应用配置(data/config.json)

  • 模型配置:可用的 LLM、嵌入模型、重排模型
  • 系统配置:文件大小限制、Token 限制、并发限制
  • 功能开关:是否启用知识库、是否启用插件等

5. 典型使用示例与最佳实践

5.1 最小可运行示例

场景:创建一个简单的 AI 对话应用

// 1. 通过API创建应用
const createAppResponse = await fetch('/api/core/app/create', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    name: '我的第一个应用',
    type: 'simple',
    avatar: '/icon/logo.svg'
  })
});

const { appId } = await createAppResponse.json();

// 2. 配置工作流(最简单的AI对话)
await fetch(`/api/core/app/update`, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    appId,
    nodes: [
      {
        nodeId: 'workflowStartNodeId',
        flowNodeType: 'workflowStart',
        position: { x: 300, y: 300 },
        inputs: [
          {
            key: 'userChatInput',
            renderTypeList: ['reference'],
            valueType: 'string',
            label: '用户问题',
            required: true
          }
        ]
      },
      {
        nodeId: 'aiChatNodeId',
        flowNodeType: 'chatNode',
        position: { x: 800, y: 300 },
        inputs: [
          {
            key: 'model',
            value: 'gpt-3.5-turbo',
            valueType: 'string'
          },
          {
            key: 'systemPrompt',
            value: '你是一个友好的AI助手',
            valueType: 'string'
          },
          {
            key: 'userChatInput',
            value: ['workflowStartNodeId', 'userChatInput'],
            valueType: 'string'
          }
        ]
      }
    ],
    edges: [
      {
        source: 'workflowStartNodeId',
        target: 'aiChatNodeId',
        sourceHandle: 'workflowStartNodeId-source-right',
        targetHandle: 'aiChatNodeId-target-left'
      }
    ]
  })
});

// 3. 发起对话
const chatResponse = await fetch('/api/v2/chat/completions', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    chatId: 'unique_chat_id', // 使用nanoid()生成
    appId: appId,
    stream: true,
    messages: [
      {
        role: 'user',
        content: '你好,介绍一下自己'
      }
    ]
  })
});

// 处理SSE流式响应
const reader = chatResponse.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  
  const chunk = decoder.decode(value);
  const lines = chunk.split('\n').filter(line => line.trim());
  
  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = JSON.parse(line.slice(6));
      console.log(data.choices[0].delta.content);
    }
  }
}

5.2 知识库 + RAG 示例

场景:创建带知识库的问答应用

// 1. 创建知识库
const createDatasetResponse = await fetch('/api/core/dataset/create', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    name: '产品文档知识库',
    type: 'dataset',
    avatar: '/icon/logo.svg',
    vectorModel: 'text-embedding-ada-002'
  })
});

const { datasetId } = await createDatasetResponse.json();

// 2. 上传文档
const formData = new FormData();
formData.append('file', file); // File对象
formData.append('datasetId', datasetId);
formData.append('mode', 'auto'); // 自动分块

await fetch('/api/core/dataset/collection/create', {
  method: 'POST',
  headers: {
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: formData
});

// 3. 配置工作流(知识库检索 + AI对话)
await fetch(`/api/core/app/update`, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    appId,
    nodes: [
      {
        nodeId: 'workflowStart',
        flowNodeType: 'workflowStart',
        // ... 省略其他配置
      },
      {
        nodeId: 'datasetSearch',
        flowNodeType: 'datasetSearchNode',
        inputs: [
          {
            key: 'datasetIds',
            value: [datasetId],
            valueType: 'selectDataset'
          },
          {
            key: 'query',
            value: ['workflowStart', 'userChatInput'],
            valueType: 'string'
          },
          {
            key: 'similarity',
            value: 0.5, // 相似度阈值
            valueType: 'number'
          },
          {
            key: 'limit',
            value: 5000, // 最大Token
            valueType: 'number'
          },
          {
            key: 'searchMode',
            value: 'embedding', // 向量检索
            valueType: 'string'
          }
        ]
      },
      {
        nodeId: 'aiChat',
        flowNodeType: 'chatNode',
        inputs: [
          {
            key: 'model',
            value: 'gpt-4',
            valueType: 'string'
          },
          {
            key: 'systemPrompt',
            value: '你是一个专业的客服,请根据知识库内容回答用户问题。如果知识库中没有相关信息,请诚实告知。',
            valueType: 'string'
          },
          {
            key: 'userChatInput',
            value: ['workflowStart', 'userChatInput'],
            valueType: 'string'
          },
          {
            key: 'quoteQA', // 引用知识库
            value: ['datasetSearch', 'quoteQA'],
            valueType: 'datasetQuote'
          }
        ]
      }
    ],
    edges: [
      {
        source: 'workflowStart',
        target: 'datasetSearch'
      },
      {
        source: 'datasetSearch',
        target: 'aiChat'
      }
    ]
  })
});

// 4. 测试检索效果
const searchTestResponse = await fetch('/api/core/dataset/searchTest', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    datasetId,
    text: '如何使用知识库?',
    similarity: 0.5,
    limit: 5000
  })
});

const searchResults = await searchTestResponse.json();
console.log('检索结果:', searchResults);

5.3 Agent 工具调用示例

场景:创建能调用天气查询工具的 Agent

// 1. 创建HTTP工具
const createToolResponse = await fetch('/api/core/app/httpTools/create', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    name: '天气查询',
    description: '查询指定城市的天气信息',
    url: 'https://api.weather.com/query',
    method: 'GET',
    params: [
      {
        key: 'city',
        type: 'string',
        required: true,
        description: '城市名称,如:北京、上海'
      }
    ]
  })
});

const { toolId } = await createToolResponse.json();

// 2. 配置Agent工作流
await fetch(`/api/core/app/update`, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    appId,
    nodes: [
      {
        nodeId: 'workflowStart',
        flowNodeType: 'workflowStart',
        // ... 省略
      },
      {
        nodeId: 'agent',
        flowNodeType: 'agent',
        inputs: [
          {
            key: 'model',
            value: 'gpt-4', // Agent需要支持function calling的模型
            valueType: 'string'
          },
          {
            key: 'systemPrompt',
            value: '你是一个智能助手,可以帮用户查询天气信息。',
            valueType: 'string'
          },
          {
            key: 'userChatInput',
            value: ['workflowStart', 'userChatInput'],
            valueType: 'string'
          },
          {
            key: 'maxRunTimes',
            value: 5, // 最多执行5轮工具调用
            valueType: 'number'
          }
        ]
      },
      {
        nodeId: 'weatherTool',
        flowNodeType: 'tool',
        inputs: [
          {
            key: 'toolId',
            value: toolId,
            valueType: 'string'
          }
        ]
      }
    ],
    edges: [
      {
        source: 'workflowStart',
        target: 'agent'
      },
      {
        source: 'agent',
        target: 'weatherTool',
        sourceHandle: 'agent-source-tool'
      },
      {
        source: 'weatherTool',
        target: 'agent',
        targetHandle: 'agent-target-tool'
      }
    ]
  })
});

// 3. 测试工具调用
const chatResponse = await fetch('/api/v2/chat/completions', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    chatId: 'unique_chat_id',
    appId: appId,
    stream: false,
    messages: [
      {
        role: 'user',
        content: '帮我查一下北京今天的天气'
      }
    ]
  })
});

const result = await chatResponse.json();
console.log('AI回复:', result.choices[0].message.content);
console.log('工具调用记录:', result.usage.toolCalls);

5.4 扩展点:自定义插件接入

场景:开发一个自定义的情感分析插件

// 插件代码(独立服务)
import express from 'express';

const app = express();
app.use(express.json());

// 插件必须实现的接口
app.post('/analyze', async (req, res) => {
  const { text } = req.body;
  
  // 调用情感分析模型(示例)
  const sentiment = await analyzeSentiment(text);
  
  res.json({
    sentiment: sentiment.label, // positive/negative/neutral
    score: sentiment.score
  });
});

app.listen(3010, () => {
  console.log('情感分析插件启动在 3010 端口');
});

// 在FastGPT中使用插件
// 1. 创建插件应用
const createPluginResponse = await fetch('/api/core/app/create', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    name: '情感分析插件',
    type: 'plugin',
    avatar: '/icon/plugin.svg'
  })
});

const { pluginId } = await createPluginResponse.json();

// 2. 配置插件工作流
await fetch(`/api/core/app/update`, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY'
  },
  body: JSON.stringify({
    appId: pluginId,
    nodes: [
      {
        nodeId: 'pluginInput',
        flowNodeType: 'pluginInput',
        inputs: [
          {
            key: 'text',
            valueType: 'string',
            label: '待分析文本',
            required: true
          }
        ]
      },
      {
        nodeId: 'httpRequest',
        flowNodeType: 'httpRequest468',
        inputs: [
          {
            key: 'url',
            value: 'http://localhost:3010/analyze',
            valueType: 'string'
          },
          {
            key: 'method',
            value: 'POST',
            valueType: 'string'
          },
          {
            key: 'body',
            value: JSON.stringify({
              text: '{{text}}'
            }),
            valueType: 'string'
          }
        ]
      },
      {
        nodeId: 'pluginOutput',
        flowNodeType: 'pluginOutput',
        inputs: [
          {
            key: 'sentiment',
            value: ['httpRequest', 'sentiment'],
            valueType: 'string',
            label: '情感倾向'
          },
          {
            key: 'score',
            value: ['httpRequest', 'score'],
            valueType: 'number',
            label: '置信度'
          }
        ]
      }
    ],
    edges: [
      {
        source: 'pluginInput',
        target: 'httpRequest'
      },
      {
        source: 'httpRequest',
        target: 'pluginOutput'
      }
    ]
  })
});

// 3. 在主应用中使用插件
// 在主应用的工作流中添加"运行插件"节点
{
  nodeId: 'runPlugin',
  flowNodeType: 'pluginModule',
  inputs: [
    {
      key: 'pluginId',
      value: pluginId,
      valueType: 'string'
    },
    {
      key: 'text',
      value: ['workflowStart', 'userChatInput'],
      valueType: 'string'
    }
  ]
}

5.5 规模化注意事项

性能优化

  1. 数据库索引:确保 MongoDB 和向量数据库建立了必要的索引
  2. 缓存预热:启动时预加载热点数据(模型配置、系统配置)
  3. 连接池配置:根据并发量调整数据库连接池大小
  4. 限流策略:对每个团队设置 API 调用频率限制

高可用部署

  1. 应用多副本:至少部署 3 个 app 实例,使用负载均衡
  2. 数据库集群:MongoDB 副本集 + 读写分离,向量数据库集群
  3. Redis 哨兵:使用 Redis Sentinel 或 Cluster 保证缓存可用性
  4. 对象存储:使用云服务商的 S3(如 AWS S3、阿里云 OSS)

监控告警

  1. 关键指标:API 错误率、P95 延迟、数据库连接数、队列积压
  2. 告警规则:错误率 > 5%、P95 > 10s、队列积压 > 1000
  3. 日志聚合:使用 ELK 或 Loki 收集日志

成本优化

  1. Token 消耗:监控 LLM Token 使用量,避免滥用
  2. 存储成本:定期清理过期对话和文件
  3. 向量数据库:使用量化技术减少存储开销

6. 版本信息

  • 当前版本:v4.13.2
  • 最低 Node.js 版本:20
  • 包管理器:pnpm ≥ 9.15.9
  • 主要依赖
    • Next.js 14.2.32
    • React 18.3.1
    • MongoDB Driver(通过 Mongoose)
    • Redis(通过 ioredis)
    • BullMQ
    • Minio(S3 客户端)

7. 参考资源

  • 官方文档:https://doc.fastgpt.io
  • GitHub 仓库:https://github.com/labring/FastGPT
  • API 文档:/openapi(应用内查看)
  • 社区交流:飞书话题群(见 README)