FastGPT-02-Service层

模块概览

职责与边界

packages/service 是 FastGPT 项目的服务层,包含所有核心业务逻辑的实现。该模块位于应用层(projects/app)和全局包(packages/global)之间,负责与数据库、外部服务交互,实现具体的业务功能。

核心职责

  • 业务逻辑实现:AI 模型调用、工作流执行、知识库管理、对话处理
  • 数据访问:MongoDB、Redis、向量数据库、对象存储的 CRUD 操作
  • 外部服务调用:LLM API、嵌入模型、重排模型、第三方插件
  • 后台任务处理:文件解析、向量化、Token 统计等异步任务
  • 权限与计费:用户权限检查、积分扣减、使用记录

边界

  • 不直接处理 HTTP 请求(由 projects/app 的 API Routes 处理)
  • 不包含 UI 组件(由 packages/web 提供)
  • 所有函数接受参数并返回结果,不依赖全局状态

输入输出

输入

  • 函数参数(业务对象、ID、配置)
  • 数据库查询结果
  • 外部 API 响应

输出

  • 业务对象(包含数据库文档和计算结果)
  • 异常(FastGPTError)
  • 副作用(数据库写入、文件存储、消息队列)

模块目录结构

packages/service/
├── common/           # 通用服务
│   ├── api/          # API 工具(分页、请求封装)
│   ├── buffer/       # 缓冲区(TTS、原始文本)
│   ├── bullmq/       # 消息队列
│   ├── cache/        # 缓存管理
│   ├── file/         # 文件处理(GridFS、图片、CSV、Multer)
│   ├── middle/       # 中间件(CORS、鉴权、限流、Tracks)
│   ├── mongo/        # MongoDB 连接和事务
│   ├── otel/         # OpenTelemetry 可观测
│   ├── redis/        # Redis 连接和缓存
│   ├── response/     # 响应格式化
│   ├── s3/           # 对象存储
│   ├── secret/       # 密钥管理
│   ├── string/       # 字符串处理(Tiktoken、Jieba)
│   ├── system/       # 系统工具(日志、配置)
│   └── vectorDB/     # 向量数据库(PG/Milvus/OceanBase)
├── core/             # 核心业务逻辑
│   ├── ai/           # AI 模型调用(LLM、嵌入、重排)
│   ├── app/          # 应用管理(CRUD、版本、权限)
│   ├── chat/         # 对话管理(保存、加载、历史)
│   ├── dataset/      # 知识库管理(数据集、集合、数据、搜索)
│   └── workflow/     # 工作流引擎(调度、节点执行、循环、Agent)
├── support/          # 支持服务
│   ├── activity/     # 活动管理
│   ├── mcp/          # MCP 工具调用
│   ├── openapi/      # OpenAPI Key 管理
│   ├── outLink/      # 外部链接(分享、嵌入)
│   ├── permission/   # 权限系统(RBAC、协作者)
│   ├── tmpData/      # 临时数据
│   ├── user/         # 用户管理(团队、登录、审计)
│   └── wallet/       # 计费系统(积分、账单、Usage)
├── thirdProvider/    # 第三方服务提供者
│   ├── doc2x/        # 文档转换服务
│   └── fastgptPlugin/# FastGPT 插件服务
├── type/             # 类型定义
│   ├── env.d.ts      # 环境变量类型
│   ├── next.d.ts     # Next.js 类型扩展
│   └── type.d.ts     # 通用类型
└── worker/           # 后台任务
    ├── countGptMessagesTokens/ # Token 统计
    ├── htmlStr2Md/     # HTML 转 Markdown
    ├── readFile/        # 文件解析(PDF、DOCX、Excel、CSV)
    └── text2Chunks/     # 文本分块

模块架构图

flowchart TB
    subgraph Service["packages/service"]
        subgraph Common["common/ 通用服务"]
            Mongo["MongoDB"]
            Redis["Redis"]
            VectorDB["VectorDB"]
            S3["S3"]
            BullMQ["BullMQ"]
            File["File"]
        end
        
        subgraph Core["core/ 核心业务"]
            AI["AI模型调用"]
            App["应用管理"]
            Chat["对话管理"]
            Dataset["知识库管理"]
            Workflow["工作流引擎"]
        end
        
        subgraph Support["support/ 支持服务"]
            User["用户管理"]
            Permission["权限系统"]
            Wallet["计费系统"]
            OutLink["外链管理"]
            OpenAPI["OpenAPI管理"]
        end
        
        subgraph Worker["worker/ 后台任务"]
            FileParser["文件解析"]
            TextChunk["文本分块"]
            TokenCount["Token统计"]
        end
    end
    
    subgraph External["外部依赖"]
        MongoDB[(MongoDB)]
        RedisDB[(Redis)]
        PG[(PostgreSQL<br/>向量数据库)]
        MinIO[(MinIO/S3)]
        LLM[LLM服务]
        EmbedAPI[嵌入模型]
    end
    
    subgraph Consumer["消费者"]
        AppAPI["projects/app<br/>API Routes"]
    end
    
    Core --> Common
    Support --> Common
    Support --> Core
    Worker --> Common
    Worker --> Core
    
    Common --> External
    Core --> External
    
    AppAPI --> Core
    AppAPI --> Support
    
    BullMQ -.消费.-> Worker
    
    AI --> LLM
    AI --> EmbedAPI
    Workflow --> AI
    Dataset --> VectorDB
    Chat --> Mongo
    
    style Service fill:#f0f8ff
    style Common fill:#fff4e1
    style Core fill:#e8f5e9
    style Support fill:#fce4ec
    style Worker fill:#f3e5f5
    style External fill:#e0f2f1
    style Consumer fill:#ffeeff

架构说明

分层结构

  1. common/:基础设施层,提供数据库连接、缓存、队列、文件处理等通用能力
  2. core/:核心业务层,实现 AI 调用、工作流执行、知识库检索等主要功能
  3. support/:支持业务层,提供用户、权限、计费等辅助功能
  4. worker/:后台任务层,处理耗时的异步任务

数据流向

API Routes → core/support 业务逻辑 → common 通用服务 → 外部存储/服务
                            BullMQ队列 → Worker后台任务

依赖规则

  • core/ 可依赖 common/
  • support/ 可依赖 common/ 和 core/
  • worker/ 可依赖 common/ 和 core/
  • 严禁 common/ 依赖 core/ 或 support/(避免循环依赖)

核心数据结构与 Schema

1. MongoDB Schema 结构

FastGPT 使用 Mongoose 定义 MongoDB 的数据模型,主要 Schema 包括:

classDiagram
    class MongoChat {
        +ObjectId _id
        +string chatId
        +string teamId
        +string tmbId
        +string appId
        +string title
        +Date updateTime
        +Object variables
        +string source
        +string shareId
        +string outLinkUid
    }
    
    class MongoChatItem {
        +ObjectId _id
        +string chatId
        +string teamId
        +string appId
        +ChatRoleEnum obj
        +ChatItemValueItemType[] value
        +Date time
        +ObjectId dataId
        +string[] citeCollectionIds
        +Object responseData
        +number durationSeconds
    }
    
    class MongoApp {
        +ObjectId _id
        +string teamId
        +string tmbId
        +string name
        +string avatar
        +AppTypeEnum type
        +FlowNodeCommonType[] nodes
        +StoreEdgeItemType[] edges
        +ChatConfigType chatConfig
        +Date updateTime
        +string version
    }
    
    class MongoDataset {
        +ObjectId _id
        +string teamId
        +string name
        +string vectorModel
        +string agentModel
        +ChunkSettingsType chunkSettings
        +Date updateTime
    }
    
    class MongoDatasetCollection {
        +ObjectId _id
        +string datasetId
        +string name
        +DatasetCollectionTypeEnum type
        +string fileId
        +string rawLink
        +Date createTime
        +boolean forbid
    }
    
    class MongoDatasetData {
        +ObjectId _id
        +string datasetId
        +string collectionId
        +string q
        +string a
        +Date updateTime
        +number chunkIndex
    }
    
    class MongoTeam {
        +ObjectId _id
        +string name
        +string avatar
        +number balance
        +Date createTime
        +Date updateTime
        +string ownerId
    }
    
    class MongoUser {
        +ObjectId _id
        +string username
        +string password
        +string avatar
        +Date createTime
        +string timezone
    }
    
    MongoChat "1" *-- "0..*" MongoChatItem
    MongoApp "1" --> "0..*" MongoChat
    MongoDataset "1" *-- "0..*" MongoDatasetCollection
    MongoDatasetCollection "1" *-- "0..*" MongoDatasetData
    MongoTeam "1" *-- "0..*" MongoUser
    MongoTeam "1" *-- "0..*" MongoApp
    MongoTeam "1" *-- "0..*" MongoDataset

Schema 定义位置

  • Chat:core/chat/chatSchema.tschatItemSchema.ts
  • App:通过 packages/global 定义类型,Service 层直接使用
  • Dataset:core/dataset/**/schema.ts
  • User/Team:在 support/user/** 目录下

关键 API 与函数详解

1. AI 模型调用(core/ai)

1.1 LLM 调用核心函数

文件core/ai/llm/request.ts

// 创建 LLM 响应(统一封装流式和非流式调用)
export const createLLMResponse = async <T extends CompletionsBodyType>(
  args: CreateLLMResponseProps<T>
): Promise<LLMResponse> => {
  const { body, custonHeaders, userKey } = args;
  const { messages, useVision, requestOrigin, tools, toolCallMode } = body;

  // 1. 消息预处理
  const requestMessages = await loadRequestMessages({
    messages,
    useVision,
    origin: requestOrigin
  });

  // 2. 如果是 prompt 工具调用模式,重写消息
  const rewriteMessages = (() => {
    if (tools?.length && toolCallMode === 'prompt') {
      return promptToolCallMessageRewrite(requestMessages, tools);
    }
    return requestMessages;
  })();

  // 3. 格式化请求体(适配不同模型)
  const requestBody = await llmCompletionsBodyFormat({
    ...body,
    messages: rewriteMessages
  });

  // 4. 创建聊天补全(调用 OpenAI SDK 或兼容 API)
  const { response, isStreamResponse, getEmptyResponseTip } = await createChatCompletion({
    body: requestBody,
    userKey,
    options: {
      headers: {
        Accept: 'application/json, text/plain, */*',
        ...custonHeaders
      }
    }
  });

  // 5. 处理响应(流式或非流式)
  const { answerText, reasoningText, toolCalls, finish_reason, usage } = await (async () => {
    if (isStreamResponse) {
      return createStreamResponse({
        response,
        body,
        isAborted: args.isAborted,
        onStreaming: args.onStreaming,
        onReasoning: args.onReasoning,
        onToolCall: args.onToolCall,
        onToolParam: args.onToolParam
      });
    } else {
      return createCompleteResponse({
        response,
        body,
        onStreaming: args.onStreaming,
        onReasoning: args.onReasoning,
        onToolCall: args.onToolCall
      });
    }
  })();

  // 6. 构建助手消息(用于后续对话历史)
  const assistantMessage: ChatCompletionMessageParam[] = [
    {
      role: 'assistant',
      content: answerText || null,
      ...(toolCalls?.length ? { tool_calls: toolCalls } : {})
    }
  ];

  // 7. 返回完整结果
  return {
    answerText: answerText || '',
    finish_reason: finish_reason || 'stop',
    usage: usage,
    completeMessages: [...requestMessages, ...assistantMessage]
  };
};

核心逻辑说明

  1. 消息预处理loadRequestMessages):

    • 加载图片 URL(如果是多模态模型)
    • 处理知识库引用(数据集引用格式)
    • 格式化为 OpenAI 标准格式
  2. Prompt 工具调用重写promptToolCallMessageRewrite):

    • 将工具列表和说明拼接到 system prompt
    • 引导模型输出 1: 表示调用工具,0: 表示直接回答
    • 用于不支持 function calling 的模型
  3. 请求体格式化llmCompletionsBodyFormat):

    • 适配不同模型的特殊参数(如 GLM、文心等)
    • 处理 temperature、max_tokens 等参数
    • 添加 response_format(JSON mode)
  4. 流式响应处理createStreamResponse):

    • 逐块解析 SSE 数据
    • 提取 thinking 标签(推理过程)
    • 解析 tool_calls(工具调用)
    • 调用回调函数(onStreaming、onToolCall)
  5. 非流式响应处理createCompleteResponse):

    • 一次性返回完整内容
    • 解析 tool_calls
    • 提取推理内容

使用示例

import { createLLMResponse } from '@fastgpt/service/core/ai/llm/request';

const result = await createLLMResponse({
  body: {
    model: 'gpt-4',
    temperature: 0.7,
    maxTokens: 2000,
    messages: [
      { role: 'system', content: '你是一个友好的助手' },
      { role: 'user', content: '你好' }
    ],
    stream: true
  },
  onStreaming({ text }) {
    // 实时推送到客户端
    res.write(`data: ${JSON.stringify({ content: text })}\n\n`);
  },
  onToolCall({ call }) {
    console.log('工具调用:', call.function.name, call.function.arguments);
  }
});

console.log('最终答案:', result.answerText);
console.log('Token 消耗:', result.usage);

1.2 向量嵌入(Embedding)

文件core/ai/embedding.ts

// 根据文本生成向量
export const getVectorsByText = async ({
  model,
  input,
  type = 'query'
}: {
  model: EmbeddingModelItemType;
  input: string | string[];
  type?: 'query' | 'db';
}): Promise<{
  vectors: number[][];
  tokens: number;
}> => {
  // 1. 格式化输入
  const inputs = Array.isArray(input) ? input : [input];
  
  // 2. 批处理配置
  const batchSize = model.batchSize || 50;
  const allVectors: number[][] = [];
  let totalTokens = 0;

  // 3. 分批调用嵌入模型
  for (let i = 0; i < inputs.length; i += batchSize) {
    const batch = inputs.slice(i, i + batchSize);
    
    const { vectors, tokens } = await callEmbeddingModel({
      model,
      inputs: batch,
      type
    });
    
    allVectors.push(...vectors);
    totalTokens += tokens;
  }

  return {
    vectors: allVectors,
    tokens: totalTokens
  };
};

// 调用嵌入模型API
const callEmbeddingModel = async ({
  model,
  inputs,
  type
}: {
  model: EmbeddingModelItemType;
  inputs: string[];
  type: 'query' | 'db';
}): Promise<{
  vectors: number[][];
  tokens: number;
}> => {
  const ai = getAIApi();

  // 构建请求参数
  const requestBody = {
    model: model.model,
    input: inputs,
    encoding_format: 'float',
    ...(model.defaultConfig || {})
  };

  const response = await ai.embeddings.create(requestBody);

  // 提取向量
  const vectors = response.data.map((item: any) => item.embedding);

  // 归一化处理(如果模型配置要求)
  const normalizedVectors = model.normalization
    ? vectors.map(vec => normalizeVector(vec))
    : vectors;

  return {
    vectors: normalizedVectors,
    tokens: response.usage?.total_tokens || 0
  };
};

// 向量归一化
const normalizeVector = (vector: number[]): number[] => {
  const magnitude = Math.sqrt(vector.reduce((sum, val) => sum + val * val, 0));
  return vector.map(val => val / magnitude);
};

使用场景

  • 知识库数据向量化(存储到向量数据库)
  • 用户查询向量化(用于检索)
  • 批量处理文档分块

注意事项

  • 批处理大小(batchSize)影响性能和稳定性,默认 50
  • 归一化(normalization)确保余弦相似度计算准确
  • Token 统计用于计费

2. 工作流引擎(core/workflow)

2.1 工作流调度核心逻辑

文件core/workflow/dispatch/index.ts

// 运行工作流(核心调度函数)
export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowResponse> => {
  let {
    res,
    runtimeNodes = [],
    runtimeEdges = [],
    defaultSkipNodeQueue,
    histories = [],
    variables = {},
    externalProvider,
    retainDatasetCite = true,
    version = 'v1',
    responseDetail = true,
    responseAllData = true,
    usageId,
    concatUsage,
    runningUserInfo: { teamId }
  } = data;

  // 1. 检查递归深度(防止无限递归)
  data.workflowDispatchDeep++;
  const isRootRuntime = data.workflowDispatchDeep === 1;
  if (data.workflowDispatchDeep > 20) {
    return {
      flowResponses: [],
      flowUsages: [],
      assistantResponses: [],
      newVariables: variables
    };
  }

  const startTime = Date.now();
  const isDebugMode = data.mode === 'debug';

  /* 
    工作流队列控制
    特点:
      1. 可以控制一个 team 下,并发 run 的节点数量
      2. 每个节点,同时只会执行一个
      3. 采用回调的方式,避免深度递归
    方案:
      - 使用 activeRunQueue 记录待运行检查的节点
      - 每次添加新节点,以及节点运行结束后,均会执行一次 processActiveNode
      - checkNodeCanRun 会检查该节点状态:没满足运行条件/运行/跳过
  */
  class WorkflowQueue {
    runtimeNodesMap = new Map(runtimeNodes.map((item) => [item.nodeId, item]));
    runtimeEdgesMap = new Map(runtimeEdges.map((item) => [
      `${item.source}-${item.target}`,
      item
    ]));
    
    // 待处理的节点队列
    activeRunQueue: string[] = [];
    
    // 正在运行的节点集合
    runningNodeSet = new Set<string>();
    
    // 节点执行结果
    nodeResponses: Record<string, NodeResponseCompleteType> = {};
    
    // 并发控制
    maxConcurrent = 10; // 同一团队最大并发节点数

    // 添加节点到队列
    addToQueue(nodeId: string) {
      if (!this.activeRunQueue.includes(nodeId)) {
        this.activeRunQueue.push(nodeId);
      }
      this.processActiveNode();
    }

    // 处理队列中的节点
    async processActiveNode() {
      // 达到并发上限,等待
      if (this.runningNodeSet.size >= this.maxConcurrent) {
        return;
      }

      const nodeId = this.activeRunQueue.shift();
      if (!nodeId) return;

      const node = this.runtimeNodesMap.get(nodeId);
      if (!node) return;

      // 检查节点能否执行
      const status = this.checkNodeCanRun(nodeId);

      if (status === 'wait') {
        // 重新加入队列末尾
        this.activeRunQueue.push(nodeId);
        return;
      }

      if (status === 'skip') {
        // 跳过节点,激活下游边
        this.skipNode(nodeId);
        // 继续处理下一个节点
        this.processActiveNode();
        return;
      }

      // status === 'run',执行节点
      this.runningNodeSet.add(nodeId);
      
      try {
        const result = await this.executeNode(node);
        this.nodeResponses[nodeId] = result;
        
        // 激活下游节点
        this.activateDownstreamNodes(nodeId);
      } catch (error) {
        // 错误处理
        this.handleNodeError(nodeId, error);
      } finally {
        this.runningNodeSet.delete(nodeId);
        // 继续处理队列
        this.processActiveNode();
      }
    }

    // 检查节点是否可以执行
    checkNodeCanRun(nodeId: string): 'wait' | 'run' | 'skip' {
      const node = this.runtimeNodesMap.get(nodeId);
      if (!node) return 'skip';

      // 入口节点直接执行
      if (node.isEntry) return 'run';

      // 查找所有指向该节点的边
      const incomingEdges = runtimeEdges.filter(edge => edge.target === nodeId);
      
      if (incomingEdges.length === 0) {
        return 'skip'; // 孤立节点
      }

      // 检查所有入边是否都已激活
      const allActive = incomingEdges.every(edge => edge.status === 'active');
      if (allActive) {
        return 'run';
      }

      // 检查是否有任何入边被跳过
      const anySkipped = incomingEdges.some(edge => edge.status === 'skipped');
      if (anySkipped && !allActive) {
        return 'skip';
      }

      return 'wait';
    }

    // 执行节点
    async executeNode(node: RuntimeNodeItemType): Promise<NodeResponseCompleteType> {
      // 解析节点输入(从变量池或上游节点获取)
      const nodeInputs = this.resolveNodeInputs(node);

      // 根据节点类型调用对应的 dispatch 函数
      const dispatchFn = callbackMap[node.flowNodeType];
      if (!dispatchFn) {
        throw new Error(`Unknown node type: ${node.flowNodeType}`);
      }

      // 执行节点逻辑
      const result = await dispatchFn({
        ...data,
        node,
        params: nodeInputs,
        runtimeNodes,
        runtimeEdges,
        variables
      });

      // 将节点输出存储到变量池
      Object.keys(result).forEach(key => {
        variables[`${node.nodeId}.${key}`] = result[key];
      });

      return result;
    }

    // 解析节点输入
    resolveNodeInputs(node: RuntimeNodeItemType): Record<string, any> {
      const inputs: Record<string, any> = {};
      
      node.inputs.forEach(input => {
        const value = getReferenceVariableValue({
          value: input.value,
          variables,
          nodes: runtimeNodes
        });
        inputs[input.key] = value;
      });

      return inputs;
    }

    // 激活下游节点
    activateDownstreamNodes(nodeId: string) {
      const outgoingEdges = runtimeEdges.filter(edge => edge.source === nodeId);
      
      outgoingEdges.forEach(edge => {
        edge.status = 'active';
        this.addToQueue(edge.target);
      });
    }

    // 跳过节点
    skipNode(nodeId: string) {
      const outgoingEdges = runtimeEdges.filter(edge => edge.source === nodeId);
      
      outgoingEdges.forEach(edge => {
        edge.status = 'skipped';
        this.addToQueue(edge.target);
      });
    }
  }

  // 2. 创建工作流队列
  const queue = new WorkflowQueue();

  // 3. 找到所有入口节点,加入队列
  const entryNodes = runtimeNodes.filter(node => node.isEntry);
  entryNodes.forEach(node => queue.addToQueue(node.nodeId));

  // 4. 等待所有节点执行完毕
  await new Promise((resolve) => {
    const checkInterval = setInterval(() => {
      if (queue.activeRunQueue.length === 0 && queue.runningNodeSet.size === 0) {
        clearInterval(checkInterval);
        resolve(true);
      }
    }, 100);
  });

  // 5. 汇总结果
  const flowResponses = Object.values(queue.nodeResponses);
  const flowUsages = flowResponses
    .filter(res => res.usage)
    .map(res => res.usage);
  const assistantResponses = flowResponses
    .filter(res => res.assistantResponses)
    .flatMap(res => res.assistantResponses);

  const durationSeconds = (Date.now() - startTime) / 1000;

  return {
    flowResponses,
    flowUsages,
    assistantResponses,
    newVariables: variables,
    durationSeconds
  };
};

核心机制说明

  1. 队列调度

    • 使用 activeRunQueue 管理待检查的节点
    • 使用 runningNodeSet 跟踪正在执行的节点
    • 限制并发数量(默认 10),防止资源耗尽
  2. 节点状态检查

    • wait:等待上游节点完成
    • run:所有上游边已激活,执行节点
    • skip:上游有跳过的边,跳过该节点
  3. 变量传递

    • 节点输出存储到全局变量池 variables[nodeId.outputKey] = value
    • 下游节点通过引用 ['nodeId', 'outputKey'] 获取值
  4. 错误处理

    • 如果节点配置了 catchError: true,错误不中断工作流
    • 否则错误会传播到顶层,中止整个工作流
  5. 递归防护

    • 限制嵌套深度不超过 20 层
    • 防止循环调用或无限递归

2.2 Agent 工具调用

文件core/workflow/dispatch/ai/agent/toolCall.ts

// 运行 Agent 工具调用(递归)
export const runToolCall = async (
  props: DispatchToolModuleProps & {
    maxRunToolTimes: number;
  },
  response?: RunToolResponse
): Promise<RunToolResponse> => {
  const {
    messages,
    toolNodes,
    toolModel,
    maxRunToolTimes,
    ...workflowProps
  } = props;

  const runningTimes = response?.runTimes || 0;

  // 1. 达到最大调用次数,返回
  if (runningTimes >= maxRunToolTimes) {
    return response || {
      runTimes: runningTimes,
      assistantResponses: [],
      completeMessages: messages,
      usage: { totalPoints: 0, tokens: 0 }
    };
  }

  // 2. 构建工具列表
  const tools: ChatCompletionTool[] = toolNodes.map(node => ({
    type: 'function',
    function: {
      name: node.nodeId,
      description: node.toolDescription || node.intro || '',
      parameters: {
        type: 'object',
        properties: node.inputs.reduce((acc, input) => {
          acc[input.key] = {
            type: input.valueType,
            description: input.label,
            ...(input.required ? {} : { optional: true })
          };
          return acc;
        }, {} as Record<string, any>),
        required: node.inputs
          .filter(input => input.required)
          .map(input => input.key)
      }
    }
  }));

  // 3. 调用 LLM 获取工具调用决策
  const llmResponse = await createLLMResponse({
    body: {
      model: toolModel.model,
      temperature: toolModel.temperature || 0.7,
      messages: messages,
      tools,
      tool_choice: 'auto'
    },
    onStreaming({ text }) {
      // 实时推送 AI 回复
      workflowProps.workflowStreamResponse?.({
        event: SseResponseEventEnum.answer,
        data: textAdaptGptResponse({ text })
      });
    },
    onToolCall({ call }) {
      // 推送工具调用信息
      workflowProps.workflowStreamResponse?.({
        event: SseResponseEventEnum.toolCall,
        data: {
          toolName: call.function.name,
          params: call.function.arguments
        }
      });
    }
  });

  // 4. 检查是否有工具调用
  const { toolCalls, completeMessages, usage } = llmResponse;

  if (!toolCalls || toolCalls.length === 0) {
    // 没有工具调用,直接返回 AI 回答
    return {
      runTimes: runningTimes + 1,
      assistantResponses: [{
        type: 'text',
        text: llmResponse.answerText
      }],
      completeMessages,
      usage: {
        totalPoints: response?.usage.totalPoints || 0 + usage.totalPoints,
        tokens: response?.usage.tokens || 0 + usage.tokens
      }
    };
  }

  // 5. 批量执行工具节点
  const toolResponses: ToolRunResponseItemType[] = [];
  
  for (const toolCall of toolCalls) {
    const toolNodeId = toolCall.function.name;
    const toolNode = toolNodes.find(node => node.nodeId === toolNodeId);
    
    if (!toolNode) {
      toolResponses.push({
        toolCallId: toolCall.id,
        toolName: toolCall.function.name,
        toolResponse: '工具不存在'
      });
      continue;
    }

    try {
      // 解析工具参数
      const toolParams = JSON.parse(toolCall.function.arguments);
      
      // 执行工具节点(可能是 HTTP 请求、数据库查询、插件调用等)
      const toolResult = await runWorkflow({
        ...workflowProps,
        runtimeNodes: [toolNode],
        runtimeEdges: [],
        variables: {
          ...workflowProps.variables,
          ...toolParams
        }
      });

      toolResponses.push({
        toolCallId: toolCall.id,
        toolName: toolCall.function.name,
        toolResponse: JSON.stringify(toolResult.flowResponses[0])
      });
    } catch (error) {
      toolResponses.push({
        toolCallId: toolCall.id,
        toolName: toolCall.function.name,
        toolResponse: `工具执行失败: ${getErrText(error)}`
      });
    }
  }

  // 6. 构建工具响应消息
  const toolMessages: ChatCompletionMessageParam[] = toolResponses.map(resp => ({
    role: 'tool',
    tool_call_id: resp.toolCallId,
    content: resp.toolResponse
  }));

  // 7. 递归调用,继续对话
  return runToolCall(
    props,
    {
      runTimes: runningTimes + 1,
      assistantResponses: [
        ...(response?.assistantResponses || []),
        {
          type: 'tool',
          tools: toolResponses
        }
      ],
      completeMessages: [...completeMessages, ...toolMessages],
      usage: {
        totalPoints: (response?.usage.totalPoints || 0) + usage.totalPoints,
        tokens: (response?.usage.tokens || 0) + usage.tokens
      }
    }
  );
};

Agent 工具调用流程

  1. 第一轮 LLM 调用

    • 传入 tools 参数(工具名称、描述、参数 schema)
    • LLM 判断是否需要调用工具,返回 tool_calls
  2. 工具执行

    • 根据 tool_calls 找到对应的工具节点
    • 解析参数,执行工具逻辑(可能是 HTTP 请求、数据库查询、插件调用)
    • 收集工具执行结果
  3. 第二轮 LLM 调用

    • 将工具结果作为 tool 消息追加到对话历史
    • LLM 根据工具结果生成最终回答,或继续调用其他工具
  4. 递归终止条件

    • 达到最大调用次数(maxRunToolTimes,默认 5)
    • LLM 不再返回 tool_calls

注意事项

  • 工具节点必须有 toolDescription 字段,描述工具功能
  • 工具参数通过节点的 inputs 定义
  • 工具执行失败不会中止整个流程,会将错误信息返回给 LLM

3. 知识库检索(core/dataset/search)

3.1 混合检索核心逻辑

文件core/dataset/search/controller.ts

// 搜索数据集数据(主函数)
export async function searchDatasetData(
  props: SearchDatasetDataProps
): Promise<SearchDatasetDataResponse> {
  let {
    teamId,
    reRankQuery,
    queries,
    model,
    similarity = 0,
    limit: maxTokens,
    searchMode = DatasetSearchModeEnum.embedding,
    embeddingWeight = 0.5,
    usingReRank = false,
    rerankModel,
    rerankWeight = 0.5,
    datasetIds = [],
    collectionFilterMatch
  } = props;

  // 1. 初始化参数
  searchMode = DatasetSearchModeMap[searchMode] || DatasetSearchModeEnum.embedding;
  usingReRank = usingReRank && !!getDefaultRerankModel();

  // 2. 计算检索数量
  const { embeddingLimit, fullTextLimit } = (() => {
    if (searchMode === DatasetSearchModeEnum.embedding) {
      return { embeddingLimit: 100, fullTextLimit: 0 };
    }
    if (searchMode === DatasetSearchModeEnum.fullTextRecall) {
      return { embeddingLimit: 0, fullTextLimit: 100 };
    }
    return { embeddingLimit: 80, fullTextLimit: 60 };
  })();

  // 3. 获取禁用的集合
  const collections = await MongoDatasetCollection.find(
    {
      teamId,
      datasetId: { $in: datasetIds },
      forbid: true
    },
    '_id'
  );
  const forbidCollectionIdList = collections.map(item => String(item._id));

  // 4. 执行多查询检索
  const [{ tokens, embeddingRecallResults }, { fullTextRecallResults }] = await Promise.all([
    // 向量检索
    embeddingRecall({
      queries,
      limit: embeddingLimit,
      forbidCollectionIdList,
      filterCollectionIdList
    }),
    // 全文检索
    fullTextRecall({
      queries,
      limit: fullTextLimit,
      filterCollectionIdList,
      forbidCollectionIdList
    })
  ]);

  // 5. RRF 融合结果
  const rrfEmbRecall = datasetSearchResultConcat(
    embeddingRecallResults.map((list) => ({ weight: 1, list }))
  ).slice(0, embeddingLimit);
  
  const rrfFTRecall = datasetSearchResultConcat(
    fullTextRecallResults.map((list) => ({ weight: 1, list }))
  ).slice(0, fullTextLimit);

  // 6. 重排序(可选)
  const { results: reRankResults, inputTokens: reRankInputTokens } = await (async () => {
    if (!usingReRank || (!rrfEmbRecall.length && !rrfFTRecall.length)) {
      return { results: [], inputTokens: 0 };
    }

    // 合并向量和全文检索结果
    const allResults = [...rrfEmbRecall, ...rrfFTRecall];
    const uniqueResults = Array.from(
      new Map(allResults.map(item => [item.id, item])).values()
    );

    // 调用重排模型
    const { results, tokens } = await reRankRecall({
      query: reRankQuery,
      inputs: uniqueResults.map(item => ({
        id: item.id,
        text: item.q + '\n' + item.a
      })),
      model: rerankModel
    });

    return {
      results: results.map(r => ({
        ...uniqueResults.find(ur => ur.id === r.id)!,
        score: r.score
      })),
      inputTokens: tokens
    };
  })();

  // 7. 合并所有结果
  const allSearchResults = (() => {
    if (searchMode === DatasetSearchModeEnum.embedding) {
      return rrfEmbRecall;
    }
    if (searchMode === DatasetSearchModeEnum.fullTextRecall) {
      return rrfFTRecall;
    }

    // 混合检索:加权融合
    return datasetSearchResultConcat([
      { weight: embeddingWeight, list: rrfEmbRecall },
      { weight: 1 - embeddingWeight, list: rrfFTRecall }
    ]);
  })();

  // 8. 应用重排结果(如果有)
  const finalResults = usingReRank && reRankResults.length > 0
    ? datasetSearchResultConcat([
        { weight: rerankWeight, list: reRankResults },
        { weight: 1 - rerankWeight, list: allSearchResults }
      ])
    : allSearchResults;

  // 9. 过滤低相似度结果
  const filteredResults = finalResults.filter(item => item.score >= similarity);

  // 10. 按 Token 限制截断
  const { result: limitedResults, tokens: usedTokens } = await filterDatasetDataByMaxTokens({
    results: filteredResults,
    maxTokens
  });

  return {
    searchRes: limitedResults,
    tokens: tokens + reRankInputTokens,
    usedTokens
  };
}

检索算法详解

向量检索(embeddingRecall)

const embeddingRecall = async ({ queries, limit, forbidCollectionIdList }) => {
  // 1. 批量生成查询向量
  const { vectors, tokens } = await getVectorsByText({
    model: embeddingModel,
    input: queries,
    type: 'query'
  });

  // 2. 对每个查询向量执行 ANN 搜索
  const embeddingRecallResults = await Promise.all(
    vectors.map(async (vector) => {
      return recallFromVectorStore({
        vector,
        limit,
        datasetIds,
        forbidCollectionIdList
      });
    })
  );

  return { tokens, embeddingRecallResults };
};

全文检索(fullTextRecall)

const fullTextRecall = async ({ queries, limit }) => {
  // 1. 使用 Jieba 分词提取关键词
  const fullTextRecallResults = await Promise.all(
    queries.map(async (query) => {
      const keywords = await jiebaSplit({ text: query });
      
      // 2. 在 MongoDB 中执行全文搜索
      const results = await MongoDatasetDataText.find({
        datasetId: { $in: datasetIds },
        $text: { $search: keywords.join(' ') }
      })
      .sort({ score: { $meta: 'textScore' } })
      .limit(limit);

      return results.map(item => ({
        id: item.dataId,
        score: item.score,
        q: item.q,
        a: item.a
      }));
    })
  );

  return { fullTextRecallResults };
};

RRF 融合算法

export const datasetSearchResultConcat = (
  results: { weight: number; list: SearchDataResponseItemType[] }[]
): SearchDataResponseItemType[] => {
  const scoreMap = new Map<string, number>();

  results.forEach(({ weight, list }) => {
    list.forEach((item, index) => {
      const id = item.id;
      // RRF 公式:score = weight / (k + rank)
      const rrfScore = weight / (60 + index + 1);
      scoreMap.set(id, (scoreMap.get(id) || 0) + rrfScore);
    });
  });

  // 按融合后的分数排序
  const merged = Array.from(scoreMap.entries())
    .sort((a, b) => b[1] - a[1])
    .map(([id, score]) => {
      const item = results
        .flatMap(r => r.list)
        .find(item => item.id === id)!;
      return { ...item, score };
    });

  return merged;
};

4. 对话管理(core/chat)

4.1 保存对话

文件core/chat/saveChat.ts

// 保存对话记录
export async function saveChat(props: Props) {
  const {
    chatId,
    appId,
    teamId,
    tmbId,
    nodes,
    appChatConfig,
    variables,
    isUpdateUseTime,
    newTitle,
    source,
    sourceName,
    shareId,
    outLinkUid,
    userContent,
    aiContent,
    durationSeconds,
    errorMsg,
    metadata = {}
  } = props;

  if (!chatId || chatId === 'NO_RECORD_HISTORIES') return;

  try {
    // 1. 查询现有对话
    const chat = await MongoChat.findOne({ appId, chatId }, '_id metadata');

    // 2. 合并元数据
    const metadataUpdate = {
      ...chat?.metadata,
      ...metadata
    };

    // 3. 格式化保存的内容(移除知识库引用)
    const { aiResponse, nodeResponses } = formatAiContent({
      aiContent,
      durationSeconds,
      errorMsg
    });

    const processedContent = [userContent, aiResponse];

    // 4. 使用 MongoDB 事务保存
    await mongoSessionRun(async (session) => {
      // 4.1 创建对话项(用户消息 + AI回复)
      const [{ _id: chatItemIdHuman }, { _id: chatItemIdAi, dataId }] = 
        await MongoChatItem.create(
          processedContent.map((item) => ({
            chatId,
            teamId,
            tmbId,
            appId,
            ...item
          })),
          { session, ordered: true }
        );

      // 4.2 创建对话项响应(节点执行详情)
      if (nodeResponses) {
        await MongoChatItemResponse.create(
          nodeResponses.map((item) => ({
            teamId,
            appId,
            chatId,
            chatItemDataId: dataId,
            data: item
          })),
          { session, ordered: true }
        );
      }

      // 4.3 更新或创建 Chat 文档
      await MongoChat.updateOne(
        { appId, chatId },
        {
          $set: {
            teamId,
            tmbId,
            appId,
            chatId,
            variableList: appChatConfig.variables,
            welcomeText: appChatConfig.welcomeText,
            variables: variables || {},
            pluginInputs,
            title: newTitle,
            source,
            sourceName,
            shareId,
            outLinkUid,
            metadata: metadataUpdate,
            updateTime: new Date()
          }
        },
        { session, upsert: true }
      );

      // 4.4 推送对话日志(异步)
      pushChatLog({
        chatId,
        chatItemIdHuman: String(chatItemIdHuman),
        chatItemIdAi: String(chatItemIdAi),
        appId
      });
    });

    // 5. 创建对话数据日志(统计)
    await createChatDataLog({
      chatId,
      appId,
      teamId,
      tmbId
    });
  } catch (error) {
    addLog.error(`update chat history error`, error);
  }
}

MongoDB 事务说明

FastGPT 使用 MongoDB 事务保证对话保存的原子性:

export const mongoSessionRun = async <T>(
  fn: (session: ClientSession) => Promise<T>
): Promise<T> => {
  const session = await mongoose.startSession();
  session.startTransaction();

  try {
    const result = await fn(session);
    await session.commitTransaction();
    return result;
  } catch (error) {
    await session.abortTransaction();
    throw error;
  } finally {
    await session.endSession();
  }
};

对话项结构

{
  _id: ObjectId,
  chatId: 'unique_chat_id',
  teamId: ObjectId,
  appId: ObjectId,
  obj: 'Human' | 'AI',
  value: [
    {
      type: 'text',
      text: { content: '用户问题或AI回答' }
    },
    {
      type: 'tool',
      tools: [
        {
          toolName: '天气查询',
          toolParams: { city: '北京' },
          toolResponse: '{"temp": 15, "weather": "晴"}'
        }
      ]
    },
    {
      type: 'interactive',
      interactive: {
        type: 'userSelect',
        params: {
          options: ['选项A', '选项B']
        }
      }
    }
  ],
  time: Date,
  durationSeconds: 3.5,
  citeCollectionIds: [ObjectId],  // 引用的知识库集合ID
  responseData: [...]              // 节点执行详情
}

模块间调用链与时序图

场景:完整对话流程(包含知识库检索和 Agent 工具调用)

sequenceDiagram
    autonumber
    participant API as API Routes
    participant Chat as Chat Service
    participant WF as Workflow Engine
    participant DS as Dataset Search
    participant VDB as Vector DB
    participant Agent as Agent Tool
    participant LLM as LLM API
    participant Save as Save Chat
    participant Mongo as MongoDB
    
    API->>Chat: initChat(appId, chatId)
    Chat->>Mongo: 查询应用配置和历史
    Mongo-->>Chat: 返回配置
    Chat-->>API: 初始化结果
    
    API->>WF: dispatchWorkflow(nodes, edges)
    Note over WF: 解析工作流<br/>构建执行队列
    
    WF->>DS: dispatch(datasetSearchNode)
    DS->>LLM: queryExtension(query)
    LLM-->>DS: 扩展查询
    DS->>VDB: embedding recall
    VDB-->>DS: 向量检索结果
    DS->>Mongo: fulltext recall
    Mongo-->>DS: 全文检索结果
    DS->>LLM: rerank(results)
    LLM-->>DS: 重排后结果
    DS-->>WF: 返回知识库引用
    
    WF->>Agent: dispatch(agentNode)
    Agent->>LLM: completions(tools)
    LLM-->>Agent: tool_calls
    
    loop 工具执行
        Agent->>WF: runWorkflow(toolNode)
        WF->>WF: 执行HTTP/Plugin节点
        WF-->>Agent: 工具执行结果
        Agent->>LLM: completions(tool_results)
        LLM-->>Agent: 继续或结束
    end
    
    Agent-->>WF: 最终回答
    WF-->>API: 工作流执行结果
    
    API->>Save: saveChat(userContent, aiContent)
    Save->>Mongo: 开启事务
    Save->>Mongo: 创建 ChatItem (Human)
    Save->>Mongo: 创建 ChatItem (AI)
    Save->>Mongo: 创建 ChatItemResponse
    Save->>Mongo: 更新 Chat
    Save->>Mongo: 提交事务
    Mongo-->>Save: 保存成功
    
    Save-->>API: 保存完成
    API-->>API: 返回SSE流式响应

关键时间节点

  1. 初始化(0-100ms):查询配置和历史
  2. 知识库检索(100-1000ms):向量化、检索、重排
  3. Agent 调用(1000-5000ms):LLM 决策、工具执行、生成回答
  4. 保存对话(5000-5200ms):MongoDB 事务写入
  5. 总耗时:2-10 秒(取决于工具调用轮数)

配置与可观测

1. 数据库连接配置

MongoDB 连接common/mongo/init.ts):

const options = {
  bufferCommands: true,
  maxConnecting: 30,           // 最大连接数
  maxPoolSize: 30,             // 连接池最大大小
  minPoolSize: 20,             // 连接池最小大小
  connectTimeoutMS: 60000,     // 连接超时
  waitQueueTimeoutMS: 60000,   // 等待队列超时
  socketTimeoutMS: 60000,      // Socket 超时
  maxIdleTimeMS: 300000,       // 最大空闲时间
  retryWrites: true,           // 自动重试写入
  retryReads: true             // 自动重试读取
};

await db.connect(url, options);

Redis 连接common/redis/index.ts):

const redis = new Redis({
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379,
  password: process.env.REDIS_PASSWORD,
  db: Number(process.env.REDIS_DB) || 0,
  lazyConnect: true,
  maxRetriesPerRequest: 3,
  retryStrategy: (times) => {
    return Math.min(times * 50, 2000);
  }
});

向量数据库连接common/vectorDB/controller.ts):

// 根据环境变量选择向量数据库
const getVectorObj = () => {
  if (PG_ADDRESS) return new PgVectorCtrl();
  if (OCEANBASE_ADDRESS) return new ObVectorCtrl();
  if (MILVUS_ADDRESS) return new MilvusCtrl();
  
  return new PgVectorCtrl(); // 默认 PostgreSQL
};

2. 日志与监控

日志工具common/system/log.ts):

export const addLog = {
  info: (message: string, meta?: any) => {
    console.log(`[INFO] ${message}`, meta);
  },
  warn: (message: string, meta?: any) => {
    console.warn(`[WARN] ${message}`, meta);
  },
  error: (message: string, error?: any, meta?: any) => {
    console.error(`[ERROR] ${message}`, error, meta);
  },
  debug: (message: string, meta?: any) => {
    if (process.env.NODE_ENV === 'development') {
      console.debug(`[DEBUG] ${message}`, meta);
    }
  }
};

使用示例

import { addLog } from '@fastgpt/service/common/system/log';

try {
  const result = await callLLM(params);
  addLog.info('LLM call success', {
    model: params.model,
    tokens: result.usage.tokens
  });
} catch (error) {
  addLog.error('LLM call failed', error, {
    model: params.model,
    prompt: params.messages
  });
  throw error;
}

3. 性能监控指标

关键指标

  • LLM 调用延迟:P50、P95、P99
  • 向量检索延迟:单次检索时间
  • 数据库查询延迟:慢查询监控
  • 队列积压:Worker 任务积压数
  • 缓存命中率:Redis 缓存命中率
  • 错误率:各服务的错误率

OpenTelemetry 集成common/otel/):

import { trace, SpanStatusCode } from '@opentelemetry/api';

const tracer = trace.getTracer('fastgpt-service');

export const withSpan = async <T>(
  name: string,
  fn: () => Promise<T>
): Promise<T> => {
  const span = tracer.startSpan(name);
  
  try {
    const result = await fn();
    span.setStatus({ code: SpanStatusCode.OK });
    return result;
  } catch (error) {
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: getErrText(error)
    });
    throw error;
  } finally {
    span.end();
  }
};

后台任务(Worker)

文件解析任务

文件worker/readFile/index.ts

// 解析文件(根据扩展名调度不同的解析器)
export const readFileRawText = async ({
  extension,
  filePath,
  encoding = 'utf-8'
}: {
  extension: string;
  filePath: string;
  encoding?: string;
}): Promise<string> => {
  switch (extension) {
    case 'txt':
    case 'md':
    case 'json':
      return readTxtFile(filePath, encoding);
    
    case 'pdf':
      return readPdfFile(filePath);
    
    case 'docx':
      return readDocxFile(filePath);
    
    case 'xlsx':
    case 'xls':
      return readExcelFile(filePath);
    
    case 'csv':
      return readCsvFile(filePath);
    
    case 'html':
    case 'htm':
      return readHtmlFile(filePath);
    
    default:
      throw new Error(`Unsupported file type: ${extension}`);
  }
};

// PDF 解析(调用 pdf-parse 或外部插件)
const readPdfFile = async (filePath: string): Promise<string> => {
  // 检查是否配置了外部 PDF 解析服务
  if (process.env.PDF_PARSE_SERVICE_URL) {
    return callExternalPdfParser(filePath);
  }
  
  // 使用内置 pdf-parse
  const dataBuffer = await fs.readFile(filePath);
  const data = await pdfParse(dataBuffer);
  return data.text;
};

// DOCX 解析
const readDocxFile = async (filePath: string): Promise<string> => {
  const result = await mammoth.extractRawText({ path: filePath });
  return result.value;
};

BullMQ 队列配置

import { Queue, Worker } from 'bullmq';
import { getRedisConnection } from '../common/redis';

// 创建文件解析队列
export const fileParseQueue = new Queue('file-parse', {
  connection: getRedisConnection(),
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    },
    removeOnComplete: 100,
    removeOnFail: 1000
  }
});

// 创建 Worker 消费任务
export const fileParseWorker = new Worker(
  'file-parse',
  async (job) => {
    const { collectionId, fileId, filePath } = job.data;
    
    try {
      // 1. 下载文件
      const localPath = await downloadFileFromS3(fileId);
      
      // 2. 解析文件
      const rawText = await readFileRawText({
        extension: getFileExtension(filePath),
        filePath: localPath
      });
      
      // 3. 文本分块
      const chunks = await text2Chunks({
        text: rawText,
        chunkSize: 512,
        chunkOverlap: 50
      });
      
      // 4. 向量化并存储
      for (const chunk of chunks) {
        await insertDatasetDataVector({
          datasetId,
          collectionId,
          q: chunk,
          a: '',
          model: embeddingModel,
          inputs: [chunk]
        });
      }
      
      // 5. 更新 Collection 状态
      await MongoDatasetCollection.updateOne(
        { _id: collectionId },
        { $set: { status: 'active' } }
      );
      
      return { success: true, chunksCount: chunks.length };
    } catch (error) {
      addLog.error('File parse failed', error, { collectionId, fileId });
      throw error;
    }
  },
  {
    connection: getRedisConnection(),
    concurrency: 5 // 最多5个并发任务
  }
);

总结与最佳实践

开发规范

  1. 函数命名:动词开头,清晰表达功能(如 createLLMResponsesearchDatasetData
  2. 错误处理:使用 try-catch,抛出 FastGPTError,记录日志
  3. 类型安全:所有函数参数和返回值必须定义类型
  4. 异步操作:使用 async/await,避免回调地狱
  5. 事务管理:涉及多个数据库操作时使用 MongoDB 事务

性能优化

  1. 批处理:向量生成、数据库查询尽量批量操作
  2. 缓存策略:热点数据(模型配置、用户权限)使用 Redis 缓存
  3. 异步化:耗时操作(文件解析、向量化)使用 BullMQ 异步处理
  4. 连接池:合理配置数据库连接池大小
  5. 限流:团队维度限制并发请求数

测试建议

  1. 单元测试:测试纯函数逻辑(字符串处理、数据转换)
  2. 集成测试:测试数据库操作、外部 API 调用
  3. E2E 测试:测试完整的业务流程(对话、知识库检索)
  4. 性能测试:压测关键路径(LLM 调用、向量检索)