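RAGFlow-07-Sandbox Module
Module Overview
1.1 Responsibilities and Positioning
The Sandbox module is RAGFlow's code execution sandbox. It gives Agents a safe environment for running code. Its main responsibilities are:
- Execution isolation: user code runs in a separate container so that malicious code cannot damage the host system
- Multi-language support: Python, JavaScript, Shell, R, SQL, and others
- Resource limits: caps on CPU, memory, and execution time
- Dependency management: common libraries (numpy, pandas, requests, and others) are preinstalled
- Result reporting: standard output, standard error, and the return value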
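1.2 Technology Stack
Core technologies:
- Docker: container-based isolation
- gVisor (runsc): an additional security layer (optional)
- FastAPI: the Executor Manager HTTP service
- Python: the implementation language of the executors
Related directories and files:
- sandbox/executor_manager/: the executor management service
- sandbox/sandbox_base_image/: the base image definition
- docker-compose.yml: service orchestration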
1. Module Architecture Diagram
flowchart TB
    subgraph "Agent Components"
        CodeExec[CodeExec Tool<br/>Code execution tool]
    end
    subgraph "Sandbox Service"
        ExecutorManager[Executor Manager<br/>FastAPI service<br/>:8080]
        subgraph "Executor Pool"
            PythonExecutor[Python Executor<br/>Docker Container]
            JSExecutor[JS Executor<br/>Docker Container]
            ShellExecutor[Shell Executor<br/>Docker Container]
        end
        ResourceMonitor[Resource Monitor<br/>CPU/Memory/Timeout]
        SecurityFilter[Security Filter<br/>Dangerous function detection]
    end
    subgraph "Docker Environment"
        BaseImage[Sandbox Base Image<br/>Preinstalled dependencies]
        Network[Isolated network<br/>sandbox-net]
        Volume[Temporary volume<br/>/tmp/sandbox]
    end
    CodeExec -->|HTTP POST /execute| ExecutorManager
    ExecutorManager --> SecurityFilter
    SecurityFilter --> ResourceMonitor
    ResourceMonitor --> PythonExecutor
    ResourceMonitor --> JSExecutor
    ResourceMonitor --> ShellExecutor
    PythonExecutor --> BaseImage
    JSExecutor --> BaseImage
    ShellExecutor --> BaseImage
    BaseImage --> Network
    BaseImage --> Volume
    ExecutorManager -->|Return result| CodeExec
2. Core Functionality in Detail
2.1 Executor Manager
2.1.1 Core Code
# sandbox/executor_manager/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import base64
import docker
import asyncio
from typing import Optional

app = FastAPI()
docker_client = docker.from_env()

class CodeExecutionRequest(BaseModel):
    code_b64: str              # Base64-encoded source code
    language: str              # python/javascript/shell
    arguments: dict = {}       # arguments passed to the code
    timeout: int = 60          # timeout in seconds
    max_memory: str = "256m"   # memory limit

class CodeExecutionResult(BaseModel):
    status: str                    # success/error/timeout
    stdout: str                    # standard output
    stderr: str                    # standard error
    result: Optional[str] = None   # return value (Python)
    error: Optional[str] = None    # error message
    execution_time: float          # execution time in seconds
@app.post("/execute", response_model=CodeExecutionResult)
async def execute_code(req: CodeExecutionRequest):
    """Execute the submitted code."""
    # 1. Decode the code
    try:
        code = base64.b64decode(req.code_b64).decode("utf-8")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid base64: {e}")
    # 2. Run the safety check
    if not is_safe_code(code, req.language):
        return CodeExecutionResult(
            status="error",
            stdout="",
            stderr="",
            error="Unsafe code detected",
            execution_time=0
        )
    # 3. Select the executor
    executor = get_executor(req.language)
    # 4. Execute the code
    import time
    start_time = time.time()
    try:
        result = await asyncio.wait_for(
            executor.execute(code, req.arguments),
            timeout=req.timeout
        )
        result["execution_time"] = time.time() - start_time
        return CodeExecutionResult(**result)
    except asyncio.TimeoutError:
        return CodeExecutionResult(
            status="timeout",
            stdout="",
            stderr="",
            error=f"Execution timeout ({req.timeout}s)",
            execution_time=req.timeout
        )
    except Exception as e:
        return CodeExecutionResult(
            status="error",
            stdout="",
            stderr="",
            error=str(e),
            execution_time=time.time() - start_time
        )
def is_safe_code(code: str, language: str) -> bool:
    """Safety check. Only Python is inspected; other languages pass through."""
    if language == "python":
        # Look for dangerous functions
        dangerous_patterns = [
            "os.system", "subprocess", "eval", "exec",
            "__import__", "open(", "compile"
        ]
        for pattern in dangerous_patterns:
            if pattern in code:
                return False
    return True

def get_executor(language: str):
    """Return the executor for the given language."""
    # PythonExecutor is shown in section 2.2; the other executor classes
    # are not shown in this document.
    if language == "python":
        return PythonExecutor()
    elif language == "javascript":
        return JavaScriptExecutor()
    elif language == "shell":
        return ShellExecutor()
    else:
        raise ValueError(f"Unsupported language: {language}")
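One caveat about the timeout in step 4: asyncio.wait_for can only interrupt a coroutine at an await point. If executor.execute makes blocking Docker SDK calls (container.wait(), for example), the timeout cannot fire until that call returns. The following is a minimal sketch of one possible workaround, assuming the blocking work is moved to a worker thread with asyncio.to_thread (Python 3.9+). The names blocking_execute and run_with_timeout are hypothetical, and time.sleep stands in for the Docker call.

```python
import asyncio
import time


def blocking_execute(duration: float) -> dict:
    """Hypothetical stand-in for a blocking Docker SDK call."""
    time.sleep(duration)
    return {"status": "success", "stdout": "", "stderr": ""}


async def run_with_timeout(duration: float, timeout: float) -> dict:
    """Run the blocking call in a worker thread so the event loop stays free."""
    try:
        return await asyncio.wait_for(
            asyncio.to_thread(blocking_execute, duration), timeout=timeout
        )
    except asyncio.TimeoutError:
        # The worker thread keeps running after the timeout; a real service
        # would also have to stop the container at this point.
        return {"status": "timeout", "stdout": "", "stderr": ""}


fast = asyncio.run(run_with_timeout(0.01, timeout=1.0))
slow = asyncio.run(run_with_timeout(0.5, timeout=0.05))
print(fast["status"], slow["status"])
```

The thread itself is not cancelled by the timeout, so stopping the container remains the only way to actually end the user code.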
2.2 Python Executor
2.2.1 Core Code
# sandbox/executor_manager/executors/python_executor.py
import docker
import json
import tempfile
import os

class PythonExecutor:
    def __init__(self):
        self.client = docker.from_env()
        self.image = "ragflow/sandbox-python:latest"

    async def execute(self, code: str, arguments: dict) -> dict:
        """Run Python code."""
        # 1. Build the wrapper script
        wrapper_code = f"""
import sys
import json
# Inject arguments (decoded from JSON so that true/false/null become Python values)
arguments = json.loads({json.dumps(arguments)!r})
# User code
try:
{self._indent_code(code, 4)}
    # Capture the return value, if any
    if 'result' in locals():
        print("__RESULT_START__")
        print(json.dumps(result))
        print("__RESULT_END__")
except Exception as e:
    print(f"ERROR: {{e}}", file=sys.stderr)
    sys.exit(1)
"""
        # 2. Write the script to a temporary file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
            f.write(wrapper_code)
            script_path = f.name
        container = None
        try:
            # 3. Start the Docker container
            container = self.client.containers.run(
                image=self.image,
                command=["python", "/code/script.py"],
                volumes={
                    script_path: {"bind": "/code/script.py", "mode": "ro"}
                },
                mem_limit="256m",
                cpu_period=100000,
                cpu_quota=50000,      # 50% CPU
                network_mode="none",  # disable networking
                detach=True,
                remove=False          # removed explicitly below, after the logs are read
            )
            # 4. Wait for the run to finish
            result = container.wait()
            stdout = container.logs(stdout=True, stderr=False).decode("utf-8")
            stderr = container.logs(stdout=False, stderr=True).decode("utf-8")
            # 5. Parse the return value
            return_value = None
            if "__RESULT_START__" in stdout:
                start = stdout.index("__RESULT_START__") + len("__RESULT_START__\n")
                end = stdout.index("__RESULT_END__")
                return_value = stdout[start:end].strip()
            # 6. Determine the status
            exit_code = result["StatusCode"]
            status = "success" if exit_code == 0 else "error"
            return {
                "status": status,
                "stdout": stdout,
                "stderr": stderr,
                "result": return_value,
                "error": stderr if exit_code != 0 else None
            }
        finally:
            # Remove the container and the temporary file
            if container is not None:
                container.remove(force=True)
            os.unlink(script_path)

    def _indent_code(self, code: str, spaces: int) -> str:
        """Indent every line of the code."""
        indent = " " * spaces
        return "\n".join([indent + line for line in code.split("\n")])
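Step 5 above leaves the marker lines inside stdout and returns the result as raw text. As an illustration only, the sketch below shows one way to separate the two. The helper name split_result is hypothetical and not part of the source. It searches from the end of the output, on the assumption that the wrapper prints the markers last.

```python
import json
from typing import Any, Optional, Tuple

RESULT_START = "__RESULT_START__"
RESULT_END = "__RESULT_END__"


def split_result(stdout: str) -> Tuple[str, Optional[Any]]:
    """Return (stdout without the marker block, decoded result or None)."""
    start = stdout.rfind(RESULT_START)
    end = stdout.rfind(RESULT_END)
    if start == -1 or end == -1 or end < start:
        return stdout, None
    payload = stdout[start + len(RESULT_START):end].strip()
    try:
        value = json.loads(payload)
    except json.JSONDecodeError:
        # Leave the output untouched if the payload is not valid JSON.
        return stdout, None
    cleaned = stdout[:start] + stdout[end + len(RESULT_END):].lstrip("\n")
    return cleaned, value


sample = "hello\n__RESULT_START__\n3.0\n__RESULT_END__\n"
cleaned, value = split_result(sample)
print(repr(cleaned), value)
```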
2.3 Docker Base Image
2.3.1 Dockerfile
# sandbox/sandbox_base_image/Dockerfile
FROM python:3.10-slim
# Install common libraries
RUN pip install --no-cache-dir \
    numpy==1.24.3 \
    pandas==2.0.2 \
    requests==2.31.0 \
    matplotlib==3.7.1 \
    scikit-learn==1.3.0
# Create the code directory
RUN mkdir /code
# Run as a non-root user
RUN useradd -m -u 1000 sandbox
USER sandbox
WORKDIR /code
2.3.2 Build and Push
# Build the image
cd sandbox/sandbox_base_image
docker build -t ragflow/sandbox-python:latest .
# Push to a private registry
docker tag ragflow/sandbox-python:latest registry.example.com/ragflow/sandbox-python:latest
docker push registry.example.com/ragflow/sandbox-python:latest
2.4 Security Mechanisms
2.4.1 Layered Defenses
1. Static code check:
import logging

def is_safe_code(code: str, language: str) -> bool:
    """Detect dangerous functions."""
    if language == "python":
        dangerous_patterns = [
            # System calls
            "os.system", "subprocess.run", "subprocess.Popen",
            # Dynamic execution
            "eval", "exec", "compile", "__import__",
            # File operations
            "open(", "file(", "io.open",
            # Network operations
            "socket", "urllib", "httplib",
            # Dangerous modules
            "ctypes", "cffi"
        ]
        for pattern in dangerous_patterns:
            if pattern in code:
                logging.warning(f"Dangerous pattern detected: {pattern}")
                return False
    return True
2. Docker resource limits:
container = docker_client.containers.run(
    image="ragflow/sandbox-python:latest",
    mem_limit="256m",                    # memory limit
    cpu_period=100000,
    cpu_quota=50000,                     # CPU limit (50%)
    network_mode="none",                 # disable networking
    pids_limit=50,                       # limit on the number of processes
    read_only=True,                      # read-only file system
    security_opt=["no-new-privileges"],  # forbid privilege escalation
    cap_drop=["ALL"],                    # drop all capabilities
    detach=True,
    remove=True
)
3. gVisor isolation (optional):
# docker-compose.yml
services:
  sandbox-executor-manager:
    image: ragflow/sandbox-executor-manager
    runtime: runsc  # use gVisor
    security_opt:
      - seccomp:unconfined
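The static check in item 1 matches substrings, so it rejects harmless names such as evaluate (which contains "eval") and can be sidestepped with simple string tricks. As an illustration only, the sketch below shows a syntax-tree-based variant built on Python's standard ast module. The deny-lists and the function name find_violations are assumptions for this example, not the project's actual policy. A check like this is still bypassable, so the container limits remain the real security boundary.

```python
import ast

# Illustrative deny-lists, not the project's actual policy.
BLOCKED_MODULES = {"os", "subprocess", "socket", "ctypes"}
BLOCKED_CALLS = {"eval", "exec", "compile", "open", "__import__"}


def find_violations(code: str) -> list:
    """Return human-readable descriptions of blocked imports and calls."""
    try:
        tree = ast.parse(code)
    except SyntaxError as exc:
        return [f"syntax error: {exc.msg}"]
    violations = []
    for node in ast.walk(tree):
        # Collect the module names of "import x" and "from x import y".
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [node.module or ""]
        else:
            names = []
        for name in names:
            if name.split(".")[0] in BLOCKED_MODULES:
                violations.append(f"import of {name}")
        # Flag direct calls to blocked built-ins such as eval(...).
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in BLOCKED_CALLS:
                violations.append(f"call to {node.func.id}()")
    return violations


ok = find_violations("score = evaluate(model)")
bad = find_violations("import os\nos.system('ls')\neval('1+1')")
print(ok, bad)
```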
3. Sequence Diagram
3.1 Code Execution Sequence
sequenceDiagram
    autonumber
    participant Agent as Agent Component
    participant Mgr as Executor Manager
    participant Filter as Security Filter
    participant Docker as Docker Engine
    participant Container as Python Container
    Agent->>Mgr: POST /execute
    note right of Agent: {code_b64, language, arguments}
    Mgr->>Mgr: Base64-decode the code
    Mgr->>Filter: Safety check
    Filter->>Filter: Detect dangerous functions
    alt Unsafe
        Filter-->>Mgr: Return error
        Mgr-->>Agent: status=error
    else Safe
        Filter-->>Mgr: Pass
        Mgr->>Mgr: Prepare the script (wrap the code)
        Mgr->>Docker: Create container
        Docker->>Container: Start container
        Container->>Container: Execute code
        note right of Container: Resource limits: 256MB / 50% CPU
        alt Execution succeeds
            Container-->>Docker: Return stdout/stderr
            Docker-->>Mgr: Return result
            Mgr->>Mgr: Parse the return value
            Mgr-->>Agent: status=success + result
        else Execution fails
            Container-->>Docker: Return error
            Docker-->>Mgr: Error information
            Mgr-->>Agent: status=error + error
        else Timeout
            Mgr->>Docker: Stop container
            Docker-->>Mgr: Container stopped
            Mgr-->>Agent: status=timeout
        end
        Docker->>Docker: Remove the container automatically
    end
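The limits noted in the diagram (256 MB of memory, 50% of one CPU) correspond to the mem_limit, cpu_period, and cpu_quota arguments used in section 2.4.1, where the CPU share is cpu_quota divided by cpu_period. As a rough sketch, the hypothetical helpers below derive those keyword arguments from a memory string and a CPU fraction. The names parse_memory and build_limits are not part of the source.

```python
def parse_memory(limit: str) -> int:
    """Convert a Docker-style size string such as "256m" to bytes."""
    units = {"b": 1, "k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}
    text = limit.strip().lower()
    if text and text[-1] in units:
        return int(text[:-1]) * units[text[-1]]
    return int(text)


def build_limits(max_memory: str = "256m", cpu_fraction: float = 0.5,
                 cpu_period: int = 100_000) -> dict:
    """Build keyword arguments for resource limits on a container run."""
    if cpu_fraction <= 0:
        raise ValueError("cpu_fraction must be positive")
    return {
        "mem_limit": parse_memory(max_memory),
        "cpu_period": cpu_period,
        # The quota is the CPU time allowed per period, in microseconds.
        "cpu_quota": int(cpu_period * cpu_fraction),
    }


limits = build_limits("256m", 0.5)
print(limits)
```

A dictionary like this could then be unpacked into the containers.run call, which would let the request's max_memory field take effect instead of a hardcoded value.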
4. Usage Example
4.1 Running Python Code
Request:
{
  "code_b64": "aW1wb3J0IG51bXB5IGFzIG5wCgpkYXRhID0gYXJndW1lbnRzWydkYXRhJ10KcmVzdWx0ID0gbnAubWVhbihkYXRhKQ==",
  "language": "python",
  "arguments": {
    "data": [1, 2, 3, 4, 5]
  },
  "timeout": 30
}
Code (after decoding):
import numpy as np

data = arguments['data']
result = np.mean(data)
Response (the code prints nothing itself, so stdout holds only the result markers emitted by the wrapper script):
{
  "status": "success",
  "stdout": "__RESULT_START__\n3.0\n__RESULT_END__\n",
  "stderr": "",
  "result": "3.0",
  "error": null,
  "execution_time": 0.52
}
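For reference, the request body above can be produced on the client side roughly as follows. This is a minimal sketch: the helper name build_request is hypothetical, and sending the payload (for example, with an HTTP client posting to the Executor Manager's /execute endpoint) is left out.

```python
import base64
import json


def build_request(code: str, arguments: dict,
                  language: str = "python", timeout: int = 30) -> dict:
    """Build the JSON body for POST /execute."""
    encoded = base64.b64encode(code.encode("utf-8")).decode("ascii")
    return {
        "code_b64": encoded,
        "language": language,
        "arguments": arguments,
        "timeout": timeout,
    }


code = "import numpy as np\n\ndata = arguments['data']\nresult = np.mean(data)"
payload = build_request(code, {"data": [1, 2, 3, 4, 5]})
body = json.dumps(payload)
print(body)
```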
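5. Performance Optimization Suggestions
1. Container pooling:
- Create a pool of containers ahead of time so that a new container is not created for every execution
- Reuse containers (reset their state after each run instead of destroying them)
2. Image optimization:
- Use an Alpine base image (smaller footprint)
- Use multi-stage builds (keep build-time tooling out of the final image)
3. Resource quotas:
- Assign different resource quotas according to tenant tier
- Queue requests that exceed the quota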
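The quota idea in item 3 could be sketched with one asyncio.Semaphore per tenant tier: requests beyond a tier's concurrency limit wait until a slot frees up. The tier names, limits, and the TierQuota class are assumptions made for illustration, not part of the source.

```python
import asyncio

# Hypothetical concurrency limits per tenant tier.
TIER_LIMITS = {"free": 1, "pro": 4}


class TierQuota:
    """Limit concurrent executions per tier; extra requests wait in line."""

    def __init__(self, limits: dict):
        self._limits = dict(limits)
        self._semaphores = {}

    def _semaphore(self, tier: str) -> asyncio.Semaphore:
        # Created lazily so the semaphore is built inside the running loop.
        if tier not in self._semaphores:
            self._semaphores[tier] = asyncio.Semaphore(self._limits[tier])
        return self._semaphores[tier]

    async def run(self, tier: str, job):
        async with self._semaphore(tier):
            return await job()


async def demo():
    quota = TierQuota(TIER_LIMITS)
    running = 0
    peak = 0

    async def job():
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stands in for a sandbox execution
        running -= 1
        return "done"

    results = await asyncio.gather(*(quota.run("free", job) for _ in range(3)))
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```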
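6. Best Practices
1. Security:
- Update the base image regularly (to pick up vulnerability fixes)
- Disable system calls that are not needed
- Use gVisor as an additional isolation layer
2. Observability:
- Log every execution (code, result, duration)
- Monitor container resource usage
- Raise alerts on abnormal code
3. Dependency management:
- Preinstall common libraries (to avoid installing at run time)
- Support user-defined dependencies (installed inside the sandbox)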
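As one possible shape for the execution log mentioned under observability, the sketch below builds a JSON log line per run. The function name make_log_record and the field names are hypothetical. It stores a hash of the code and truncated output to keep records small, which is an assumption of this example; the full code could be stored separately if auditing requires it.

```python
import hashlib
import json
import time


def make_log_record(code: str, language: str, status: str,
                    execution_time: float, stdout: str, stderr: str,
                    max_chars: int = 200) -> str:
    """Serialize one execution as a single JSON log line."""
    record = {
        "timestamp": time.time(),
        "language": language,
        "status": status,
        "execution_time": round(execution_time, 3),
        # Hash instead of the full source, to keep log lines short.
        "code_sha256": hashlib.sha256(code.encode("utf-8")).hexdigest(),
        "code_bytes": len(code.encode("utf-8")),
        "stdout_head": stdout[:max_chars],
        "stderr_head": stderr[:max_chars],
    }
    return json.dumps(record, ensure_ascii=False)


line = make_log_record("result = 1 + 1", "python", "success", 0.5234, "", "")
record = json.loads(line)
print(line)
```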