RAGFlow-06-Admin模块
模块概览
1.1 职责与定位
Admin 模块是 RAGFlow 的后台管理服务,提供系统管理、租户管理、权限控制等功能。主要职责包括:
- 租户管理:创建、更新、删除、查询租户(Tenant)
- 用户管理:用户注册、登录、权限分配、团队管理
- 系统配置管理:LLM 配置、Embedding 配置、系统参数
- 权限控制:基于角色的访问控制(RBAC)
- API Key 管理:生成、撤销、查询 API Key
- 审计日志:记录关键操作日志
- 资源配额管理:Token 配额、存储配额
1.2 技术栈
框架:
- FastAPI:Admin Server 使用 FastAPI 框架
- 认证:JWT Token + API Key 双重认证
- 数据库:与主服务共享 MySQL 数据库
核心依赖:
admin/server/admin_server.py:Admin Server 主入口admin/server/auth.py:认证与授权admin/server/roles.py:角色定义与权限映射admin/server/services.py:业务逻辑层
1. 模块架构图
flowchart TB
subgraph "客户端"
AdminCLI[Admin CLI<br/>命令行工具]
AdminUI[Admin Web UI<br/>管理界面]
end
subgraph "Admin Server"
FastAPI[FastAPI 应用]
AuthMiddleware[认证中间件<br/>JWT + API Key]
RoleMiddleware[权限中间件<br/>RBAC]
subgraph "路由层"
TenantRoutes[租户路由<br/>/tenants]
UserRoutes[用户路由<br/>/users]
ConfigRoutes[配置路由<br/>/configs]
AuditRoutes[审计路由<br/>/audits]
end
subgraph "服务层"
TenantService[TenantService]
UserService[UserService]
ConfigService[ConfigService]
AuditService[AuditService]
end
end
subgraph "存储层"
MySQL[(MySQL<br/>租户/用户/配置)]
Redis[(Redis<br/>会话缓存)]
end
AdminCLI --> FastAPI
AdminUI --> FastAPI
FastAPI --> AuthMiddleware
AuthMiddleware --> RoleMiddleware
RoleMiddleware --> TenantRoutes
RoleMiddleware --> UserRoutes
RoleMiddleware --> ConfigRoutes
RoleMiddleware --> AuditRoutes
TenantRoutes --> TenantService
UserRoutes --> UserService
ConfigRoutes --> ConfigService
AuditRoutes --> AuditService
TenantService --> MySQL
UserService --> MySQL
ConfigService --> MySQL
AuditService --> MySQL
AuthMiddleware --> Redis
2. 核心功能详细剖析
2.1 认证机制
2.1.1 JWT Token 认证
功能:用户登录后颁发 JWT Token,后续请求携带 Token 访问
核心代码:
# admin/server/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from datetime import datetime, timedelta
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 小时
security = HTTPBearer()
def create_access_token(data: dict, expires_delta: timedelta = None):
"""创建 JWT Token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""从 Token 中获取当前用户"""
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid token")
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
# 从数据库查询用户
from admin.server.services import UserService
user = UserService.get_by_id(user_id)
if user is None:
raise HTTPException(status_code=401, detail="User not found")
return user
2.1.2 API Key 认证
功能:为第三方应用颁发 API Key,支持权限范围限制
核心代码:
# admin/server/auth.py
from fastapi import Header, HTTPException
async def verify_api_key(x_api_key: str = Header(...)):
"""验证 API Key"""
from admin.server.services import APIKeyService
api_key = APIKeyService.get_by_key(x_api_key)
if not api_key:
raise HTTPException(status_code=401, detail="Invalid API Key")
# 检查是否过期
if api_key.expired_at and api_key.expired_at < datetime.now():
raise HTTPException(status_code=401, detail="API Key expired")
# 检查是否被撤销
if api_key.revoked:
raise HTTPException(status_code=401, detail="API Key revoked")
# 记录使用次数
APIKeyService.increment_usage(api_key.id)
return api_key
2.2 角色与权限
2.2.1 角色定义
# admin/server/roles.py
from enum import Enum
class Role(str, Enum):
"""角色枚举"""
SUPER_ADMIN = "super_admin" # 超级管理员(所有权限)
TENANT_ADMIN = "tenant_admin" # 租户管理员(租户内所有权限)
USER = "user" # 普通用户(基本权限)
GUEST = "guest" # 访客(只读权限)
class Permission(str, Enum):
"""权限枚举"""
# 租户管理
TENANT_CREATE = "tenant:create"
TENANT_READ = "tenant:read"
TENANT_UPDATE = "tenant:update"
TENANT_DELETE = "tenant:delete"
# 用户管理
USER_CREATE = "user:create"
USER_READ = "user:read"
USER_UPDATE = "user:update"
USER_DELETE = "user:delete"
# 配置管理
CONFIG_READ = "config:read"
CONFIG_UPDATE = "config:update"
# 角色权限映射
ROLE_PERMISSIONS = {
Role.SUPER_ADMIN: [p for p in Permission], # 所有权限
Role.TENANT_ADMIN: [
Permission.USER_CREATE,
Permission.USER_READ,
Permission.USER_UPDATE,
Permission.USER_DELETE,
Permission.CONFIG_READ,
Permission.CONFIG_UPDATE
],
Role.USER: [
Permission.USER_READ,
Permission.CONFIG_READ
],
Role.GUEST: [
Permission.USER_READ,
Permission.CONFIG_READ
]
}
2.2.2 权限检查装饰器
# admin/server/auth.py
from functools import wraps
from admin.server.roles import Permission, ROLE_PERMISSIONS
def require_permission(permission: Permission):
"""权限检查装饰器"""
def decorator(func):
@wraps(func)
async def wrapper(*args, current_user=None, **kwargs):
if not current_user:
raise HTTPException(status_code=401, detail="Not authenticated")
# 检查用户角色是否拥有该权限
user_permissions = ROLE_PERMISSIONS.get(current_user.role, [])
if permission not in user_permissions:
raise HTTPException(
status_code=403,
detail=f"Permission denied: {permission}"
)
return await func(*args, current_user=current_user, **kwargs)
return wrapper
return decorator
2.3 租户管理
2.3.1 租户服务
功能:创建、更新、删除、查询租户
核心代码:
# admin/server/services.py
class TenantService:
@staticmethod
def create_tenant(name: str, email: str, quota: dict) -> Tenant:
"""创建租户"""
from api.db import Tenant
# 1. 检查租户是否已存在
existing = Tenant.select().where(Tenant.email == email).first()
if existing:
raise ValueError("Tenant already exists")
# 2. 创建租户
tenant = Tenant(
id=get_uuid(),
name=name,
email=email,
status=TenantStatus.ACTIVE,
llm_id="", # 待配置
embd_id="", # 待配置
token_num=0,
token_num_max=quota.get("token_num_max", 1000000),
chunk_num=0,
chunk_num_max=quota.get("chunk_num_max", 100000),
parse_num=0,
parse_num_max=quota.get("parse_num_max", 10000)
)
tenant.save()
# 3. 创建默认管理员用户
admin_user = UserService.create_user(
tenant_id=tenant.id,
email=email,
nickname=f"{name} Admin",
password=generate_random_password(),
role=Role.TENANT_ADMIN
)
# 4. 审计日志
AuditService.log("tenant.create", tenant.id, {"name": name})
return tenant
@staticmethod
def update_quota(tenant_id: str, quota: dict):
"""更新租户配额"""
tenant = Tenant.get_by_id(tenant_id)
if not tenant:
raise ValueError("Tenant not found")
tenant.token_num_max = quota.get("token_num_max", tenant.token_num_max)
tenant.chunk_num_max = quota.get("chunk_num_max", tenant.chunk_num_max)
tenant.parse_num_max = quota.get("parse_num_max", tenant.parse_num_max)
tenant.save()
AuditService.log("tenant.update_quota", tenant_id, quota)
@staticmethod
def delete_tenant(tenant_id: str):
"""删除租户(软删除)"""
tenant = Tenant.get_by_id(tenant_id)
if not tenant:
raise ValueError("Tenant not found")
# 软删除(设置状态为 DELETED)
tenant.status = TenantStatus.DELETED
tenant.save()
AuditService.log("tenant.delete", tenant_id, {})
2.3.2 租户路由
# admin/server/routes.py
from fastapi import APIRouter, Depends
from admin.server.auth import get_current_user, require_permission
from admin.server.roles import Permission
from admin.server.models import TenantCreateRequest, TenantResponse
router = APIRouter(prefix="/api/admin/tenants", tags=["Tenants"])
@router.post("/", response_model=TenantResponse)
@require_permission(Permission.TENANT_CREATE)
async def create_tenant(
req: TenantCreateRequest,
current_user = Depends(get_current_user)
):
"""创建租户"""
tenant = TenantService.create_tenant(
name=req.name,
email=req.email,
quota=req.quota
)
return TenantResponse.from_orm(tenant)
@router.get("/{tenant_id}", response_model=TenantResponse)
@require_permission(Permission.TENANT_READ)
async def get_tenant(
tenant_id: str,
current_user = Depends(get_current_user)
):
"""查询租户"""
tenant = TenantService.get_by_id(tenant_id)
if not tenant:
raise HTTPException(status_code=404, detail="Tenant not found")
return TenantResponse.from_orm(tenant)
@router.delete("/{tenant_id}")
@require_permission(Permission.TENANT_DELETE)
async def delete_tenant(
tenant_id: str,
current_user = Depends(get_current_user)
):
"""删除租户"""
TenantService.delete_tenant(tenant_id)
return {"message": "Tenant deleted successfully"}
2.4 用户管理
2.4.1 用户服务
# admin/server/services.py
import bcrypt
class UserService:
@staticmethod
def create_user(tenant_id: str, email: str, nickname: str, password: str, role: Role) -> User:
"""创建用户"""
from api.db import User
# 1. 检查邮箱是否已存在
existing = User.select().where(User.email == email).first()
if existing:
raise ValueError("User already exists")
# 2. 密码加密
password_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
# 3. 创建用户
user = User(
id=get_uuid(),
tenant_id=tenant_id,
email=email,
nickname=nickname,
password=password_hash,
role=role,
status=UserStatus.ACTIVE
)
user.save()
AuditService.log("user.create", user.id, {"email": email, "role": role})
return user
@staticmethod
def verify_password(email: str, password: str) -> User:
"""验证密码"""
user = User.select().where(User.email == email).first()
if not user:
raise ValueError("User not found")
if not bcrypt.checkpw(password.encode(), user.password.encode()):
raise ValueError("Invalid password")
return user
@staticmethod
def update_role(user_id: str, new_role: Role):
"""更新用户角色"""
user = User.get_by_id(user_id)
if not user:
raise ValueError("User not found")
user.role = new_role
user.save()
AuditService.log("user.update_role", user_id, {"new_role": new_role})
2.5 配置管理
2.5.1 LLM 配置服务
# admin/server/services.py
class ConfigService:
@staticmethod
def set_llm_config(tenant_id: str, llm_type: str, llm_config: dict):
"""设置 LLM 配置"""
from api.db.services.llm_service import TenantLLMService
# 验证配置
required_fields = ["model_name", "api_key", "base_url"]
for field in required_fields:
if field not in llm_config:
raise ValueError(f"Missing field: {field}")
# 保存配置
TenantLLMService.save(
tenant_id=tenant_id,
llm_type=llm_type,
llm_name=llm_config["model_name"],
model_type=llm_config.get("model_type", "UNKNOWN"),
api_key=llm_config["api_key"],
api_base=llm_config["base_url"]
)
AuditService.log("config.set_llm", tenant_id, {"llm_type": llm_type})
@staticmethod
def get_llm_config(tenant_id: str, llm_type: str) -> dict:
"""获取 LLM 配置"""
from api.db.services.llm_service import TenantLLMService
llm = TenantLLMService.query(tenant_id=tenant_id, llm_type=llm_type)
if not llm:
return None
return {
"model_name": llm.llm_name,
"model_type": llm.model_type,
"base_url": llm.api_base
# 不返回 api_key(安全)
}
3. Admin CLI 客户端
3.1 命令行工具
# admin/client/admin_client.py
import click
import requests
BASE_URL = "http://localhost:8001"
@click.group()
def cli():
"""RAGFlow Admin CLI"""
pass
@cli.command()
@click.option("--email", required=True, help="Admin email")
@click.option("--password", required=True, help="Admin password")
def login(email, password):
"""登录并获取 Token"""
resp = requests.post(f"{BASE_URL}/api/admin/login", json={
"email": email,
"password": password
})
if resp.status_code == 200:
token = resp.json()["access_token"]
# 保存 Token 到本地
with open(".admin_token", "w") as f:
f.write(token)
click.echo("Login successful!")
else:
click.echo(f"Login failed: {resp.text}")
@cli.command()
@click.option("--name", required=True, help="Tenant name")
@click.option("--email", required=True, help="Tenant email")
@click.option("--token-quota", default=1000000, help="Token quota")
def create_tenant(name, email, token_quota):
"""创建租户"""
token = open(".admin_token").read().strip()
resp = requests.post(
f"{BASE_URL}/api/admin/tenants",
json={
"name": name,
"email": email,
"quota": {"token_num_max": token_quota}
},
headers={"Authorization": f"Bearer {token}"}
)
if resp.status_code == 200:
click.echo(f"Tenant created: {resp.json()}")
else:
click.echo(f"Failed: {resp.text}")
if __name__ == "__main__":
cli()
使用示例:
# 登录
python admin_client.py login --email admin@example.com --password admin123
# 创建租户
python admin_client.py create-tenant --name "ACME Corp" --email acme@example.com --token-quota 5000000
# 查询租户
python admin_client.py get-tenant --tenant-id <tenant_id>
4. 时序图
4.1 租户创建时序
sequenceDiagram
autonumber
participant CLI as Admin CLI
participant API as Admin API
participant Auth as 认证中间件
participant Service as TenantService
participant DB as MySQL
CLI->>API: POST /api/admin/tenants
note right of CLI: 携带 JWT Token
API->>Auth: 验证 Token
Auth->>Auth: 解析 JWT
Auth->>DB: 查询用户
DB-->>Auth: 返回用户信息
Auth->>Auth: 检查权限(TENANT_CREATE)
Auth-->>API: 认证通过
API->>Service: TenantService.create_tenant()
Service->>DB: 检查邮箱是否存在
DB-->>Service: 不存在
Service->>DB: INSERT INTO tenant
Service->>DB: INSERT INTO user(默认管理员)
Service->>DB: INSERT INTO audit_log
DB-->>Service: 成功
Service-->>API: 返回 Tenant
API-->>CLI: 200 OK + Tenant 信息
5. 最佳实践
1. 安全性:
- JWT Secret 使用环境变量,定期轮换
- API Key 支持权限范围限制(Scope)
- 密码使用 bcrypt 加密,不明文存储
2. 审计日志:
- 记录所有关键操作(创建、删除、更新)
- 日志包含操作人、时间、操作内容
- 支持日志查询与导出
3. 配额管理:
- 定期检查租户配额使用情况
- 超过配额时拒绝新请求,返回 429 错误
- 提供配额预警通知
4. 权限设计:
- 最小权限原则(Principle of Least Privilege)
- 支持自定义角色(未来扩展)
- 权限粒度适中(不要过细)