FastGPT-04-Sandbox、MCP与插件系统


第一部分:Sandbox 沙箱服务(projects/sandbox)

模块概览

职责:提供隔离的代码执行环境,安全地运行用户自定义的 JavaScript/Python 代码。

技术栈:NestJS + VM2(JavaScript 沙箱)+ Subprocess(Python 沙箱)

目录结构

projects/sandbox/
├── src/
   ├── sandbox/
      ├── sandbox.controller.ts    # HTTP API 控制器
      ├── sandbox.service.ts       # 沙箱执行服务
      ├── jsFn/                    # JS 函数执行
      ├── dto/                     # DTO 定义
      └── constants.ts             # 常量配置
   ├── app.module.ts                # 应用模块
   └── main.ts                      # 入口文件
├── requirements.txt                  # Python 依赖
└── Dockerfile                        # 容器化配置

核心 API

POST /sandbox/js(执行 JavaScript 代码):

// sandbox.controller.ts

@Controller('sandbox')
export class SandboxController {
  constructor(private readonly sandboxService: SandboxService) {}

  @Post('js')
  async executeJs(@Body() body: ExecuteJsDto) {
    const { code, params, timeout = 10000 } = body;

    try {
      const result = await this.sandboxService.runJsCode({
        code,
        params,
        timeout
      });

      return {
        success: true,
        result
      };
    } catch (error) {
      return {
        success: false,
        error: error.message
      };
    }
  }

  @Post('python')
  async executePython(@Body() body: ExecutePythonDto) {
    const { code, params, timeout = 30000 } = body;

    try {
      const result = await this.sandboxService.runPythonCode({
        code,
        params,
        timeout
      });

      return {
        success: true,
        result
      };
    } catch (error) {
      return {
        success: false,
        error: error.message
      };
    }
  }
}

核心服务实现

JavaScript 沙箱(使用 VM2):

// sandbox.service.ts

import { NodeVM } from 'vm2';
import { Injectable } from '@nestjs/common';

@Injectable()
export class SandboxService {
  async runJsCode({
    code,
    params,
    timeout
  }: {
    code: string;
    params: Record<string, any>;
    timeout: number;
  }): Promise<any> {
    // 创建沙箱环境
    const vm = new NodeVM({
      console: 'off',      // 禁用 console
      sandbox: params,     // 注入参数
      timeout,             // 超时时间
      eval: false,         // 禁用 eval
      wasm: false,         // 禁用 WebAssembly
      require: {
        external: false,   // 禁止引入外部模块
        builtin: ['crypto', 'buffer'], // 仅允许部分内置模块
        root: './',
        mock: {}
      }
    });

    // 包装用户代码
    const wrappedCode = `
      module.exports = async function main(params) {
        ${code}
      }
    `;

    // 执行代码
    const fn = vm.run(wrappedCode, 'sandbox.js');
    const result = await Promise.race([
      fn(params),
      new Promise((_, reject) =>
        setTimeout(() => reject(new Error('Timeout')), timeout)
      )
    ]);

    return result;
  }

  async runPythonCode({
    code,
    params,
    timeout
  }: {
    code: string;
    params: Record<string, any>;
    timeout: number;
  }): Promise<any> {
    const { spawn } = require('child_process');

    return new Promise((resolve, reject) => {
      // 创建 Python 脚本
      const script = `
import json
import sys

# 从 stdin 读取参数
params = json.loads(sys.stdin.read())

# 用户代码
${code}

# 输出结果
print(json.dumps(result))
`;

      // 启动 Python 进程
      const python = spawn('python3', ['-c', script], {
        timeout
      });

      let stdout = '';
      let stderr = '';

      python.stdout.on('data', (data) => {
        stdout += data.toString();
      });

      python.stderr.on('data', (data) => {
        stderr += data.toString();
      });

      python.on('close', (code) => {
        if (code === 0) {
          try {
            const result = JSON.parse(stdout);
            resolve(result);
          } catch (error) {
            reject(new Error(`Parse error: ${error.message}`));
          }
        } else {
          reject(new Error(stderr || `Exit code: ${code}`));
        }
      });

      python.on('error', (error) => {
        reject(error);
      });

      // 传入参数
      python.stdin.write(JSON.stringify(params));
      python.stdin.end();
    });
  }
}

安全限制

资源限制

  • CPU:限制执行时间(10-30秒)
  • 内存:进程内存上限(256MB)
  • 网络:禁止网络访问
  • 文件系统:只读访问,禁止写入

代码限制

  • 禁用 evalFunction 构造函数
  • 禁止引入外部模块(除白名单)
  • 禁止访问 processrequire 等全局对象

使用示例

// 在主应用中调用 Sandbox
import axios from 'axios';

const result = await axios.post('http://sandbox:3001/sandbox/js', {
  code: `
    // 用户自定义代码
    const sum = params.numbers.reduce((a, b) => a + b, 0);
    return { sum };
  `,
  params: {
    numbers: [1, 2, 3, 4, 5]
  },
  timeout: 10000
});

console.log(result.data); // { success: true, result: { sum: 15 } }

第二部分:MCP 服务器(projects/mcp_server)

模块概览

职责:实现 Model Context Protocol(MCP),为 LLM 提供工具调用的标准接口。

技术栈:Node.js + @modelcontextprotocol/sdk

目录结构

projects/mcp_server/
├── src/
│   ├── api/
│   │   ├── fastgpt.ts               # FastGPT API封装
│   │   └── request.ts               # HTTP请求工具
│   ├── index.ts                     # MCP服务器实现
│   ├── init.ts                      # 初始化配置
│   ├── type.d.ts                    # 类型定义
│   └── utils/
│       └── log.ts                   # 日志工具
├── package.json
└── Dockerfile

MCP 服务器实现

核心代码

// index.ts

import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  ListToolsRequestSchema,
  CallToolRequestSchema,
  type Tool
} from '@modelcontextprotocol/sdk/types.js';
import { getFastGPTTools, callFastGPTTool } from './api/fastgpt.js';

// 创建 MCP 服务器
const server = new Server(
  {
    name: 'fastgpt-mcp-server',
    version: '1.0.0'
  },
  {
    capabilities: {
      tools: {}
    }
  }
);

// 注册工具列表处理器
server.setRequestHandler(ListToolsRequestSchema, async () => {
  try {
    // 从 FastGPT 获取可用工具
    const tools = await getFastGPTTools();

    return {
      tools: tools.map((tool) => ({
        name: tool.name,
        description: tool.description,
        inputSchema: {
          type: 'object',
          properties: tool.parameters,
          required: tool.required || []
        }
      }))
    };
  } catch (error) {
    console.error('Error listing tools:', error);
    return { tools: [] };
  }
});

// 注册工具调用处理器
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  try {
    // 调用 FastGPT 工具
    const result = await callFastGPTTool({
      toolName: name,
      arguments: args
    });

    return {
      content: [
        {
          type: 'text',
          text: JSON.stringify(result)
        }
      ]
    };
  } catch (error) {
    return {
      content: [
        {
          type: 'text',
          text: `Error: ${error.message}`
        }
      ],
      isError: true
    };
  }
});

// 启动服务器
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error('FastGPT MCP server running on stdio');
}

main().catch((error) => {
  console.error('Fatal error:', error);
  process.exit(1);
});

FastGPT API 封装

// api/fastgpt.ts

import axios from 'axios';

const FASTGPT_API_URL = process.env.FASTGPT_API_URL || 'http://localhost:3000';
const FASTGPT_API_KEY = process.env.FASTGPT_API_KEY;

// 获取工具列表
export async function getFastGPTTools(): Promise<Tool[]> {
  const response = await axios.get(
    `${FASTGPT_API_URL}/api/support/mcp/server/toolList`,
    {
      headers: {
        Authorization: `Bearer ${FASTGPT_API_KEY}`
      }
    }
  );

  return response.data.tools;
}

// 调用工具
export async function callFastGPTTool({
  toolName,
  arguments: args
}: {
  toolName: string;
  arguments: Record<string, any>;
}): Promise<any> {
  const response = await axios.post(
    `${FASTGPT_API_URL}/api/support/mcp/server/toolCall`,
    {
      toolName,
      inputs: args
    },
    {
      headers: {
        Authorization: `Bearer ${FASTGPT_API_KEY}`
      }
    }
  );

  return response.data.result;
}

MCP 协议说明

MCP(Model Context Protocol) 是一个标准协议,用于 LLM 与外部工具之间的通信:

协议流程

  1. 工具注册:MCP 服务器向客户端提供工具列表
  2. 工具调用:LLM 决定调用某个工具,发送调用请求
  3. 执行工具:MCP 服务器执行工具逻辑,返回结果
  4. 结果返回:LLM 根据工具结果生成最终回答

使用场景

  • Claude Desktop:Anthropic 的 Claude 桌面应用支持 MCP
  • 其他 MCP 客户端:任何支持 MCP 的客户端都可以调用 FastGPT 工具

配置示例(Claude Desktop):

{
  "mcpServers": {
    "fastgpt": {
      "command": "node",
      "args": ["/path/to/fastgpt/projects/mcp_server/dist/index.js"],
      "env": {
        "FASTGPT_API_URL": "http://localhost:3000",
        "FASTGPT_API_KEY": "your-api-key"
      }
    }
  }
}

第三部分:插件系统(plugins/)

模块概览

职责:扩展 FastGPT 的能力,提供额外的模型服务(LLM、OCR、TTS、STT、重排序等)。

目录结构

plugins/
├── model/
│   ├── llm-Baichuan2/       # 百川LLM
│   ├── llm-ChatGLM2/        # ChatGLM LLM
│   ├── ocr-surya/           # Surya OCR
│   ├── pdf-marker/          # PDF Marker解析
│   ├── pdf-mineru/          # MinerU PDF解析
│   ├── pdf-mistral/         # Mistral PDF解析
│   ├── rerank-bge/          # BGE重排模型
│   ├── stt-sensevoice/      # SenseVoice语音识别
│   └── tts-cosevoice/       # CoseVoice语音合成
└── webcrawler/              # 网络爬虫
    └── SPIDER/

插件类型

1. LLM 插件(llm-Baichuan2)

功能:提供本地部署的百川 LLM API。

实现(Python FastAPI):

# llm-Baichuan2/server.py

from fastapi import FastAPI, Request
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

app = FastAPI()

# 加载模型
model_path = "/models/Baichuan2-13B-Chat"
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    torch_dtype=torch.float16,
    device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_path)

@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    max_tokens = body.get("max_tokens", 2000)
    temperature = body.get("temperature", 0.7)

    # 构建提示词
    prompt = ""
    for msg in messages:
        role = msg.get("role")
        content = msg.get("content")
        if role == "system":
            prompt += f"System: {content}\n"
        elif role == "user":
            prompt += f"User: {content}\n"
        elif role == "assistant":
            prompt += f"Assistant: {content}\n"

    prompt += "Assistant: "

    # 生成回复
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=max_tokens,
        temperature=temperature,
        do_sample=True
    )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    answer = response.split("Assistant: ")[-1].strip()

    return {
        "id": "chatcmpl-xxx",
        "object": "chat.completion",
        "model": "Baichuan2-13B-Chat",
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": answer
                },
                "finish_reason": "stop"
            }
        ],
        "usage": {
            "prompt_tokens": len(inputs.input_ids[0]),
            "completion_tokens": len(outputs[0]) - len(inputs.input_ids[0]),
            "total_tokens": len(outputs[0])
        }
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

部署

# llm-Baichuan2/Dockerfile

FROM pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY server.py .

CMD ["python", "server.py"]

配置(在 FastGPT 的 model.json 中):

{
  "provider": "Baichuan",
  "model": "Baichuan2-13B-Chat",
  "name": "百川2-13B",
  "type": "llm",
  "maxContext": 4096,
  "maxResponse": 2000,
  "requestUrl": "http://llm-baichuan:8000/v1/chat/completions",
  "functionCall": false,
  "inputPrice": 0,
  "outputPrice": 0
}

2. 重排模型插件(rerank-bge)

功能:对检索结果进行重新排序,提高相关性。

实现

# rerank-bge/server.py

from fastapi import FastAPI
from sentence_transformers import CrossEncoder
import torch

app = FastAPI()

# 加载重排模型
model = CrossEncoder(
    'BAAI/bge-reranker-large',
    max_length=512,
    device='cuda' if torch.cuda.is_available() else 'cpu'
)

@app.post("/rerank")
async def rerank(request: dict):
    query = request.get("query")
    documents = request.get("documents", [])

    # 构建输入对
    pairs = [[query, doc] for doc in documents]

    # 计算相关性分数
    scores = model.predict(pairs, batch_size=32)

    # 返回排序后的结果
    results = [
        {"index": i, "score": float(score)}
        for i, score in enumerate(scores)
    ]
    results.sort(key=lambda x: x["score"], reverse=True)

    return {
        "results": results
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)

在 FastGPT 中使用

// 在 Service 层调用重排
import axios from 'axios';

const { data } = await axios.post('http://rerank-bge:8001/rerank', {
  query: '如何使用知识库?',
  documents: [
    '知识库用于存储文档...',
    '创建知识库的步骤...',
    '上传文件到知识库...'
  ]
});

// data.results: [{ index: 1, score: 0.95 }, { index: 0, score: 0.78 }, ...]

3. OCR 插件(ocr-surya)

功能:从图片中提取文字。

实现

# ocr-surya/server.py

from fastapi import FastAPI, File, UploadFile
from surya.ocr import run_ocr
from PIL import Image
import io

app = FastAPI()

@app.post("/ocr")
async def ocr(file: UploadFile = File(...)):
    # 读取图片
    contents = await file.read()
    image = Image.open(io.BytesIO(contents))

    # 执行 OCR
    result = run_ocr([image], langs=['en', 'zh'])

    # 提取文本
    text = "\n".join([line.text for line in result[0].text_lines])

    return {
        "text": text,
        "lines": [
            {
                "text": line.text,
                "confidence": line.confidence,
                "bbox": line.bbox
            }
            for line in result[0].text_lines
        ]
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8002)

4. PDF 解析插件(pdf-marker)

功能:解析 PDF,保留结构(表格、图片、公式)。

特点

  • 识别文档结构(标题、段落、表格)
  • 提取图片和图表
  • 识别数学公式(LaTeX)
  • 输出 Markdown 格式

部署

# docker-compose.yml

version: '3.8'
services:
  pdf-marker:
    image: your-registry/pdf-marker:latest
    ports:
      - "8003:8000"
    environment:
      - MODEL_PATH=/models
    volumes:
      - ./models:/models
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

插件开发规范

标准 HTTP API

  • POST /process:处理请求
  • 输入:JSON 格式的参数
  • 输出:JSON 格式的结果
  • 错误:返回 HTTP 错误码和错误信息

Docker 容器化

  • 每个插件独立容器
  • 通过环境变量配置
  • 支持 GPU 加速(如需要)

性能要求

  • 响应时间 < 5s(正常情况)
  • 支持并发请求
  • 内存占用可控

模块间交互

时序图:代码节点执行

sequenceDiagram
    autonumber
    participant WF as Workflow Engine
    participant API as App API
    participant SB as Sandbox Service
    participant VM as VM2/Python
    
    WF->>API: dispatch(codeNode)
    API->>SB: POST /sandbox/js<br/>{code, params}
    SB->>VM: 创建沙箱环境
    SB->>VM: 注入参数
    SB->>VM: 执行用户代码
    VM-->>SB: 返回结果
    SB-->>API: {success, result}
    API-->>WF: 节点输出

时序图:MCP 工具调用

sequenceDiagram
    autonumber
    participant LLM as LLM (Claude)
    participant MCP as MCP Client
    participant Server as MCP Server
    participant FG as FastGPT API
    
    LLM->>MCP: 需要调用工具
    MCP->>Server: ListTools()
    Server->>FG: GET /api/support/mcp/server/toolList
    FG-->>Server: 返回工具列表
    Server-->>MCP: tools[]
    
    MCP->>LLM: 可用工具
    LLM->>MCP: CallTool(name, args)
    MCP->>Server: CallTool(name, args)
    Server->>FG: POST /api/support/mcp/server/toolCall
    FG->>FG: 执行工具逻辑
    FG-->>Server: 返回结果
    Server-->>MCP: 工具结果
    MCP-->>LLM: 工具结果

部署与配置

Docker Compose 示例

# docker-compose.yml

version: '3.8'
services:
  # FastGPT 主应用
  fastgpt:
    image: registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt:latest
    ports:
      - "3000:3000"
    environment:
      - MONGODB_URI=mongodb://mongo:27017/fastgpt
      - SANDBOX_URL=http://sandbox:3001
    depends_on:
      - mongo
      - sandbox

  # Sandbox 沙箱服务
  sandbox:
    image: registry.cn-hangzhou.aliyuncs.com/fastgpt/sandbox:latest
    ports:
      - "3001:3001"
    environment:
      - NODE_ENV=production

  # MCP 服务器(可选)
  mcp_server:
    image: your-registry/fastgpt-mcp-server:latest
    environment:
      - FASTGPT_API_URL=http://fastgpt:3000
      - FASTGPT_API_KEY=your-api-key

  # 插件服务
  llm-baichuan:
    image: your-registry/llm-baichuan2:latest
    ports:
      - "8000:8000"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  rerank-bge:
    image: your-registry/rerank-bge:latest
    ports:
      - "8001:8001"

  mongodb:
    image: mongo:5.0
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db

volumes:
  mongo_data:

总结与最佳实践

安全注意事项

Sandbox

  • 严格限制资源(CPU、内存、时间)
  • 禁止网络访问
  • 限制可用模块白名单
  • 定期更新沙箱环境

MCP

  • 使用 API Key 鉴权
  • 限制工具调用频率
  • 记录所有工具调用日志
  • 验证工具参数

插件

  • 插件间网络隔离
  • 输入参数校验
  • 输出结果过滤
  • 容器资源限制

性能优化

  1. Sandbox

    • 复用 VM 实例(池化)
    • 预编译常用代码
    • 缓存依赖模块
  2. MCP

    • 工具列表缓存
    • 批量工具调用
    • 异步执行
  3. 插件

    • 模型预加载
    • 批处理请求
    • GPU 加速