RAGFlow-07-Sandbox Module

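Module Overview

1.1 Responsibilities and Positioning

The Sandbox module is RAGFlow's code-execution sandbox; it gives Agents a safe environment for running code. Its main responsibilities are:

  1. Execution isolation: run user code in a separate container so that malicious code cannot damage the host system
  2. Multi-language support: Python, JavaScript, Shell, R, SQL, and others
  3. Resource limits: CPU, memory, and execution time
  4. Dependency management: common libraries (numpy, pandas, requests, etc.) are preinstalled
  5. Result return: standard output, standard error, and the return value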

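1.2 Technology Stack

Core technologies

  • Docker: container-based isolation
  • gVisor (runsc): additional security layer (optional)
  • FastAPI: HTTP service for the Executor Manager
  • Python: implementation language of the executors

Dependencies

  • sandbox/executor_manager/: executor management service
  • sandbox/sandbox_base_image/: base image definition
  • docker-compose.yml: service orchestration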

1. Module Architecture Diagram

flowchart TB
    subgraph "Agent Components"
        CodeExec[CodeExec Tool<br/>Code execution tool]
    end

    subgraph "Sandbox Service"
        ExecutorManager[Executor Manager<br/>FastAPI service<br/>:8080]
        
        subgraph "Executor Pool"
            PythonExecutor[Python Executor<br/>Docker Container]
            JSExecutor[JS Executor<br/>Docker Container]
            ShellExecutor[Shell Executor<br/>Docker Container]
        end
        
        ResourceMonitor[Resource Monitor<br/>CPU/Memory/Timeout]
        SecurityFilter[Security Filter<br/>Dangerous function detection]
    end

    subgraph "Docker Environment"
        BaseImage[Sandbox Base Image<br/>Preinstalled dependencies]
        Network[Isolated network<br/>sandbox-net]
        Volume[Temporary volume<br/>/tmp/sandbox]
    end

    CodeExec -->|HTTP POST /execute| ExecutorManager
    
    ExecutorManager --> SecurityFilter
    SecurityFilter --> ResourceMonitor
    
    ResourceMonitor --> PythonExecutor
    ResourceMonitor --> JSExecutor
    ResourceMonitor --> ShellExecutor
    
    PythonExecutor --> BaseImage
    JSExecutor --> BaseImage
    ShellExecutor --> BaseImage
    
    BaseImage --> Network
    BaseImage --> Volume
    
    ExecutorManager -->|Return result| CodeExec

2. Core Functionality in Detail

2.1 Executor Manager

2.1.1 Core Code

# sandbox/executor_manager/main.py
import asyncio
import base64
import time
from typing import Optional

import docker
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()
docker_client = docker.from_env()

class CodeExecutionRequest(BaseModel):
    code_b64: str  # Base64-encoded source code
    language: str  # python/javascript/shell
    arguments: dict = {}  # arguments passed to the code
    timeout: int = 60  # timeout in seconds
    max_memory: str = "256m"  # maximum memory

class CodeExecutionResult(BaseModel):
    status: str  # success/error/timeout
    stdout: str  # standard output
    stderr: str  # standard error
    result: Optional[str] = None  # return value (Python)
    error: Optional[str] = None  # error message
    execution_time: float  # execution time in seconds

@app.post("/execute", response_model=CodeExecutionResult)
async def execute_code(req: CodeExecutionRequest):
    """Execute the submitted code and return its output."""
    # 1. Decode the code
    try:
        code = base64.b64decode(req.code_b64).decode("utf-8")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid base64: {e}")
    
    # 2. Safety check
    if not is_safe_code(code, req.language):
        return CodeExecutionResult(
            status="error",
            stdout="",
            stderr="",
            error="Unsafe code detected",
            execution_time=0
        )
    
    # 3. Select an executor; an unsupported language is a client error,
    #    so report it as 400 instead of letting ValueError become a 500
    try:
        executor = get_executor(req.language)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    
    # 4. Execute the code. The timeout can only fire while execute() is
    #    suspended at an await, so blocking calls inside the executor
    #    must not run directly on the event loop.
    start_time = time.monotonic()
    
    try:
        result = await asyncio.wait_for(
            executor.execute(code, req.arguments),
            timeout=req.timeout
        )
        result["execution_time"] = time.monotonic() - start_time
        return CodeExecutionResult(**result)
    
    except asyncio.TimeoutError:
        return CodeExecutionResult(
            status="timeout",
            stdout="",
            stderr="",
            error=f"Execution timeout ({req.timeout}s)",
            execution_time=req.timeout
        )
    
    except Exception as e:
        return CodeExecutionResult(
            status="error",
            stdout="",
            stderr="",
            error=str(e),
            execution_time=time.monotonic() - start_time
        )

def is_safe_code(code: str, language: str) -> bool:
    """Reject code that contains a known-dangerous pattern (substring match)."""
    if language == "python":
        # Dangerous functions and modules
        dangerous_patterns = [
            "os.system", "subprocess", "eval", "exec",
            "__import__", "open(", "compile"
        ]
        for pattern in dangerous_patterns:
            if pattern in code:
                return False
    
    # Other languages are not checked here
    return True

def get_executor(language: str):
    """Return the executor for the given language."""
    if language == "python":
        return PythonExecutor()
    elif language == "javascript":
        return JavaScriptExecutor()
    elif language == "shell":
        return ShellExecutor()
    else:
        raise ValueError(f"Unsupported language: {language}")
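
Steps 1 and 3 of the handler above can be tightened into a single validation helper. The following is a minimal sketch, not code from the RAGFlow source: `decode_request` and `SUPPORTED_LANGUAGES` are hypothetical names introduced for illustration. It relies on the fact that `base64.b64decode` silently discards characters outside the Base64 alphabet unless `validate=True` is passed.

```python
import base64
import binascii

# Hypothetical whitelist mirroring the branches of get_executor()
SUPPORTED_LANGUAGES = {"python", "javascript", "shell"}


def decode_request(code_b64: str, language: str) -> str:
    """Return the decoded source, or raise ValueError with a client-facing message."""
    if language not in SUPPORTED_LANGUAGES:
        raise ValueError(f"Unsupported language: {language}")
    try:
        # validate=True rejects characters outside the Base64 alphabet
        # instead of silently dropping them
        raw = base64.b64decode(code_b64, validate=True)
    except binascii.Error as e:
        raise ValueError(f"Invalid base64: {e}") from e
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError as e:
        raise ValueError(f"Code is not valid UTF-8: {e}") from e
```

A handler could call this once and map any `ValueError` to an HTTP 400 response, so malformed input never reaches the executor.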

2.2 Python Executor

2.2.1 Core Code

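# sandbox/executor_manager/executors/python_executor.py
import asyncio
import json
import os
import tempfile

import docker

class PythonExecutor:
    def __init__(self):
        self.client = docker.from_env()
        self.image = "ragflow/sandbox-python:latest"
    
    async def execute(self, code: str, arguments: dict) -> dict:
        """Execute Python code in a throwaway container."""
        # 1. Build the wrapper script. Arguments are passed through json.loads
        #    so that JSON literals (true/false/null) become Python values. The
        #    indented user code starts at column 0 of the template so that
        #    every line lands exactly one level inside the try block.
        wrapper_code = f"""
import sys
import json

# Injected arguments
arguments = json.loads({json.dumps(arguments)!r})

# User code
try:
{self._indent_code(code, 4)}
    
    # Emit the return value, if the user code set one
    if 'result' in locals():
        print("__RESULT_START__")
        print(json.dumps(result))
        print("__RESULT_END__")
except Exception as e:
    print(f"ERROR: {{e}}", file=sys.stderr)
    sys.exit(1)
"""
        
        # 2. Write the script to a temporary file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
            f.write(wrapper_code)
            script_path = f.name
        # The container runs as the non-root "sandbox" user, while
        # NamedTemporaryFile creates the file with mode 0600
        os.chmod(script_path, 0o644)
        
        container = None
        try:
            # 3. Start the Docker container. It is removed explicitly in the
            #    finally block: with remove=True the container may already be
            #    gone when the logs are read after wait().
            container = self.client.containers.run(
                image=self.image,
                command=["python", "/code/script.py"],
                volumes={
                    script_path: {"bind": "/code/script.py", "mode": "ro"}
                },
                mem_limit="256m",
                cpu_period=100000,
                cpu_quota=50000,  # 50% of one CPU
                network_mode="none",  # no network access
                detach=True
            )
            
            # 4. Wait for completion in a worker thread so the event loop
            #    (and the caller's asyncio.wait_for timeout) is not blocked
            result = await asyncio.to_thread(container.wait)
            stdout = container.logs(stdout=True, stderr=False).decode("utf-8")
            stderr = container.logs(stdout=False, stderr=True).decode("utf-8")
            
            # 5. Parse the return value (only if both markers are present)
            return_value = None
            if "__RESULT_START__" in stdout and "__RESULT_END__" in stdout:
                start = stdout.index("__RESULT_START__") + len("__RESULT_START__\n")
                end = stdout.index("__RESULT_END__")
                return_value = stdout[start:end].strip()
            
            # 6. Determine the status
            exit_code = result["StatusCode"]
            status = "success" if exit_code == 0 else "error"
            
            return {
                "status": status,
                "stdout": stdout,
                "stderr": stderr,
                "result": return_value,
                "error": stderr if exit_code != 0 else None
            }
        
        finally:
            # Kill and remove the container (also on timeout or cancellation),
            # then delete the temporary script
            try:
                if container is not None:
                    container.remove(force=True)
            finally:
                os.unlink(script_path)
    
    def _indent_code(self, code: str, spaces: int) -> str:
        """Indent every line of the code by the given number of spaces."""
        indent = " " * spaces
        return "\n".join([indent + line for line in code.split("\n")])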
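
The marker parsing in step 5 can be factored into a pure function that is easy to test without Docker. This is a minimal sketch under stated assumptions: `split_result` is a hypothetical helper, and unlike the executor above it also removes the marker block from the stdout handed back to the caller and decodes the JSON payload.

```python
import json
from typing import Any, Optional, Tuple

START, END = "__RESULT_START__", "__RESULT_END__"


def split_result(stdout: str) -> Tuple[str, Optional[Any]]:
    """Split raw container stdout into (user_stdout, parsed_result)."""
    # Use the last occurrence of each marker: the wrapper prints its block
    # after the user code has finished, so earlier look-alikes are user output.
    start = stdout.rfind(START)
    end = stdout.rfind(END)
    if start == -1 or end == -1 or end < start:
        return stdout, None  # no complete marker block
    payload = stdout[start + len(START):end].strip()
    try:
        value = json.loads(payload)
    except json.JSONDecodeError:
        return stdout, None  # leave the output untouched if it is not JSON
    user_stdout = stdout[:start] + stdout[end + len(END):].lstrip("\n")
    return user_stdout, value
```

Keeping this logic out of the container-handling code means a malformed or missing marker block degrades to "no result" instead of raising inside the executor.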

2.3 Docker Base Image

2.3.1 Dockerfile

# sandbox/sandbox_base_image/Dockerfile
FROM python:3.10-slim

# Install commonly used libraries
RUN pip install --no-cache-dir \
    numpy==1.24.3 \
    pandas==2.0.2 \
    requests==2.31.0 \
    matplotlib==3.7.1 \
    scikit-learn==1.3.0

# Create the code directory
RUN mkdir /code

# Run as a non-root user
RUN useradd -m -u 1000 sandbox
USER sandbox

WORKDIR /code

2.3.2 Build and Push

# Build the image
cd sandbox/sandbox_base_image
docker build -t ragflow/sandbox-python:latest .

# Push to a private registry
docker tag ragflow/sandbox-python:latest registry.example.com/ragflow/sandbox-python:latest
docker push registry.example.com/ragflow/sandbox-python:latest

2.4 Security Mechanisms

2.4.1 Defense in Depth

1. Static code check

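import logging

def is_safe_code(code: str, language: str) -> bool:
    """Detect dangerous functions by substring matching.

    This is a coarse first filter: it also rejects harmless names that
    contain a pattern (for example "evaluate") and can be evaded by
    obfuscation, so the container limits remain the real security boundary.
    """
    if language == "python":
        dangerous_patterns = [
            # System calls
            "os.system", "subprocess.run", "subprocess.Popen",
            # Dynamic execution
            "eval", "exec", "compile", "__import__",
            # File operations
            "open(", "file(", "io.open",
            # Network operations
            "socket", "urllib", "httplib",
            # Dangerous modules
            "ctypes", "cffi"
        ]
        
        for pattern in dangerous_patterns:
            if pattern in code:
                logging.warning(f"Dangerous pattern detected: {pattern}")
                return False
    
    return True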
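
Substring matching flags any identifier that merely contains a pattern. One possible refinement is to inspect the parsed syntax tree instead. The sketch below is an illustration, not the check RAGFlow ships: `find_violations`, `BANNED_CALLS`, and `BANNED_MODULES` are hypothetical names, and the banned sets are only loosely modeled on the pattern list above. It is still a heuristic, because calls reached through `getattr` or aliases are not caught.

```python
import ast

# Hypothetical deny-lists for illustration
BANNED_CALLS = {"eval", "exec", "compile", "open", "__import__"}
BANNED_MODULES = {"os", "subprocess", "socket", "ctypes", "cffi"}


def find_violations(code: str) -> list[str]:
    """Return human-readable findings; an empty list means nothing was flagged."""
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        return [f"syntax error: {e.msg}"]
    findings = []
    for node in ast.walk(tree):
        # Collect module names from "import x" and "from x import y"
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [node.module or ""]
        else:
            names = []
        for name in names:
            if name.split(".")[0] in BANNED_MODULES:
                findings.append(f"import of {name}")
        # Flag direct calls to banned builtins, e.g. eval(...)
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in BANNED_CALLS:
                findings.append(f"call to {node.func.id}()")
    return findings
```

With this approach `model.evaluate(x)` passes, because the call target is an attribute rather than the builtin `eval`, while `eval('1+1')` and `import os` are reported.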

2. Docker resource limits

container = docker_client.containers.run(
    image="ragflow/sandbox-python:latest",
    mem_limit="256m",  # memory limit
    cpu_period=100000,
    cpu_quota=50000,  # CPU limit (quota / period = 50% of one CPU)
    network_mode="none",  # no network access
    pids_limit=50,  # limit on the number of processes
    read_only=True,  # read-only root filesystem
    security_opt=["no-new-privileges"],  # forbid privilege escalation
    cap_drop=["ALL"],  # drop all Linux capabilities
    detach=True,
    remove=True
)
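
The CPU limit above is expressed as a quota per scheduling period, which is easy to get wrong by hand. The following sketch shows the arithmetic; `cpu_limit_kwargs` and `parse_mem_limit` are hypothetical helpers, not part of the Docker SDK, and the size parser handles only the single-letter `b`/`k`/`m`/`g` suffixes.

```python
def cpu_limit_kwargs(cpus: float, period_us: int = 100_000) -> dict:
    """Translate a CPU share (0.5 = half a core) into cpu_period/cpu_quota."""
    if cpus <= 0:
        raise ValueError("cpus must be positive")
    # quota / period is the fraction of one CPU the container may use
    return {"cpu_period": period_us, "cpu_quota": int(cpus * period_us)}


def parse_mem_limit(limit: str) -> int:
    """Convert a Docker-style size string such as '256m' into bytes."""
    units = {"b": 1, "k": 1024, "m": 1024**2, "g": 1024**3}
    suffix = limit[-1].lower()
    if suffix in units:
        return int(limit[:-1]) * units[suffix]
    return int(limit)  # a bare number is already in bytes
```

For example, `cpu_limit_kwargs(0.5)` yields the same `cpu_period=100000` and `cpu_quota=50000` pair used in the call above, and the resulting dict can be unpacked into `containers.run(**kwargs)`.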

3. gVisor isolation (optional)

# docker-compose.yml
services:
  sandbox-executor-manager:
    image: ragflow/sandbox-executor-manager
    runtime: runsc  # use gVisor
    security_opt:
      - seccomp:unconfined

3. Sequence Diagram

3.1 Code Execution Sequence

sequenceDiagram
    autonumber
    participant Agent as Agent Component
    participant Mgr as Executor Manager
    participant Filter as Security Filter
    participant Docker as Docker Engine
    participant Container as Python Container

    Agent->>Mgr: POST /execute
    note right of Agent: {code_b64, language, arguments}

    Mgr->>Mgr: Base64-decode the code
    Mgr->>Filter: Safety check
    Filter->>Filter: Detect dangerous functions
    alt Unsafe
        Filter-->>Mgr: Return error
        Mgr-->>Agent: status=error
    else Safe
        Filter-->>Mgr: Pass

        Mgr->>Mgr: Prepare the script (wrap the code)
        Mgr->>Docker: Create container
        Docker->>Container: Start container
        Container->>Container: Execute code
        note right of Container: Resource limits: 256MB / 50% CPU
        
        alt Execution succeeds
            Container-->>Docker: Return stdout/stderr
            Docker-->>Mgr: Return result
            Mgr->>Mgr: Parse return value
            Mgr-->>Agent: status=success + result
        else Execution fails
            Container-->>Docker: Return error
            Docker-->>Mgr: Error message
            Mgr-->>Agent: status=error + error
        else Timeout
            Mgr->>Docker: Stop container
            Docker-->>Mgr: Container stopped
            Mgr-->>Agent: status=timeout
        end

        Docker->>Docker: Remove container automatically
    end
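
The timeout branch of the diagram, in which the manager stops the container and reports `status=timeout`, can be sketched with `asyncio.wait_for` around a blocking wait that runs in a worker thread. This is an illustrative sketch: `run_with_timeout` is a hypothetical helper, `blocking_call` stands in for something like a container wait, and `on_timeout` stands in for the stop or kill call. The `exit_code` key is likewise an assumption of this example.

```python
import asyncio
import time
from typing import Callable


async def run_with_timeout(blocking_call: Callable[[], dict],
                           timeout: float,
                           on_timeout: Callable[[], None]) -> dict:
    """Run a blocking call in a worker thread; on timeout, invoke the stop hook."""
    started = time.monotonic()
    try:
        # to_thread keeps the event loop free, so the timeout can actually fire
        result = await asyncio.wait_for(asyncio.to_thread(blocking_call), timeout)
        result["status"] = "success" if result.get("exit_code") == 0 else "error"
    except asyncio.TimeoutError:
        # The worker thread is still blocked; stopping the container is what
        # lets it return, so the hook must not be skipped.
        on_timeout()
        result = {"status": "timeout"}
    result["execution_time"] = round(time.monotonic() - started, 3)
    return result
```

Cancelling the awaited future does not interrupt the thread itself, which is why the stop hook matters: without it a timed-out container would keep running in the background.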

4. Usage Example

4.1 Python Code Execution

Request

{
  "code_b64": "aW1wb3J0IG51bXB5IGFzIG5wCgpkYXRhID0gYXJndW1lbnRzWydkYXRhJ10KcmVzdWx0ID0gbnAubWVhbihkYXRhKQ==",
  "language": "python",
  "arguments": {
    "data": [1, 2, 3, 4, 5]
  },
  "timeout": 30
}

Code (decoded)

import numpy as np

data = arguments['data']
result = np.mean(data)

Response

{
  "status": "success",
  "stdout": "__RESULT_START__\n3.0\n__RESULT_END__\n",
  "stderr": "",
  "result": "3.0",
  "error": null,
  "execution_time": 0.52
}
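
On the client side, the request body above can be produced with the standard library alone. A minimal sketch: `build_execute_payload` is a hypothetical helper name, and the field names follow the `CodeExecutionRequest` model from section 2.1.1.

```python
import base64
import json


def build_execute_payload(code: str, arguments: dict, timeout: int = 30) -> str:
    """Serialize a request body for POST /execute."""
    return json.dumps({
        # The service expects the source as Base64-encoded UTF-8
        "code_b64": base64.b64encode(code.encode("utf-8")).decode("ascii"),
        "language": "python",
        "arguments": arguments,
        "timeout": timeout,
    })


# The decoded code from the example above
CODE = "import numpy as np\n\ndata = arguments['data']\nresult = np.mean(data)"
```

Calling `build_execute_payload(CODE, {"data": [1, 2, 3, 4, 5]})` reproduces the `code_b64` value shown in the request; the string can then be sent with any HTTP client.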

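5. Performance Optimization Suggestions

1. Container pooling

  • Pre-create a pool of containers so that an execution does not pay the container start-up cost
  • Reuse containers (reset their state after each execution instead of destroying them)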
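
The pooling idea can be sketched independently of Docker. In this illustration `WarmPool` is a hypothetical class: `create` stands in for starting an idle container and `reset` for cleaning its state after a run. A worker whose run or cleanup fails is replaced rather than reused; a real pool would also destroy the discarded one.

```python
import queue
from contextlib import contextmanager
from typing import Callable, Generic, Iterator, TypeVar

T = TypeVar("T")


class WarmPool(Generic[T]):
    """A fixed-size pool of pre-created workers (for example idle containers)."""

    def __init__(self, create: Callable[[], T], reset: Callable[[T], None], size: int):
        self._create = create
        self._reset = reset
        self._idle: "queue.Queue[T]" = queue.Queue()
        for _ in range(size):
            self._idle.put(create())  # pay the start-up cost up front

    @contextmanager
    def lease(self, wait_seconds: float = 5.0) -> Iterator[T]:
        """Borrow a worker; clean it (or replace it) before it returns to the pool."""
        worker = self._idle.get(timeout=wait_seconds)  # queue.Empty if exhausted
        try:
            yield worker
            self._reset(worker)
        except Exception:
            worker = self._create()  # never reuse a worker in an unknown state
            raise
        finally:
            self._idle.put(worker)
```

Usage is `with pool.lease() as container: ...`; the pool size then also acts as a cap on concurrent executions.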

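2. Image optimization

  • Use a smaller base image such as Alpine (note that Alpine uses musl libc, so packages like numpy and pandas need musllinux wheels or a source build)
  • Use multi-stage builds (keep build-time tools and intermediate layers out of the final image)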

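3. Resource quotas

  • Assign different resource quotas according to tenant tier
  • Queue requests that exceed the quota instead of rejecting them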
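
Queuing over-quota requests can be sketched with one semaphore per tenant. Everything here is hypothetical: the `TenantQuota` class, the tier names, and the limits in `TIER_LIMITS` are assumptions for illustration, and the quota covers only concurrent executions, not CPU or memory.

```python
import asyncio
from typing import Awaitable, Callable, Dict

# Hypothetical tiers: maximum concurrent executions per tenant
TIER_LIMITS = {"free": 1, "pro": 4}


class TenantQuota:
    def __init__(self, limits: Dict[str, int]):
        self._limits = limits
        self._slots: Dict[str, asyncio.Semaphore] = {}

    def _semaphore(self, tenant_id: str, tier: str) -> asyncio.Semaphore:
        # Create the tenant's semaphore lazily, sized by its tier
        if tenant_id not in self._slots:
            self._slots[tenant_id] = asyncio.Semaphore(self._limits[tier])
        return self._slots[tenant_id]

    async def run(self, tenant_id: str, tier: str,
                  job: Callable[[], Awaitable[object]]) -> object:
        """Await job() once a slot is free; callers over quota wait their turn."""
        async with self._semaphore(tenant_id, tier):
            return await job()
```

A request over quota simply suspends inside `run` until an earlier execution of the same tenant finishes, so other tenants are unaffected.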

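6. Best Practices

1. Security

  • Update the base image regularly (to pick up vulnerability fixes)
  • Disable system calls that are not needed
  • Use gVisor as an additional isolation layer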

2. Observability

  • Log every execution (code, result, duration)
  • Monitor container resource usage
  • Alert on abnormal code
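
One way to make executions observable is to emit a single structured log line per run. The sketch below is an assumption-laden illustration: `audit_record`, `log_execution`, the logger name, and the 10-second slow threshold are all hypothetical. It records a hash and size of the code rather than the code itself, which is a design choice of this example; a deployment that needs the full source would store it alongside the hash.

```python
import hashlib
import json
import logging

logger = logging.getLogger("sandbox.audit")  # hypothetical logger name


def audit_record(code: str, language: str, status: str, execution_time: float) -> str:
    """Build one JSON log line describing an execution."""
    encoded = code.encode("utf-8")
    return json.dumps({
        "code_sha256": hashlib.sha256(encoded).hexdigest(),
        "code_bytes": len(encoded),
        "language": language,
        "status": status,
        "execution_time": round(execution_time, 3),
    }, sort_keys=True)


def log_execution(code: str, language: str, status: str,
                  execution_time: float, slow_threshold: float = 10.0) -> int:
    """Log the record; failures, timeouts, and slow runs go out at WARNING."""
    abnormal = status != "success" or execution_time > slow_threshold
    level = logging.WARNING if abnormal else logging.INFO
    logger.log(level, audit_record(code, language, status, execution_time))
    return level
```

Alerting rules can then key on the log level or on the `status` field without parsing free-form messages.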

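3. Dependency management

  • Preinstall common libraries (avoid installing packages at run time)
  • Support user-defined dependencies (installed inside the sandbox)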