RAGFlow-06-Admin模块

模块概览

1.1 职责与定位

Admin 模块是 RAGFlow 的后台管理服务,提供系统管理、租户管理、权限控制等功能。主要职责包括:

  1. 租户管理:创建、更新、删除、查询租户(Tenant)
  2. 用户管理:用户注册、登录、权限分配、团队管理
  3. 系统配置管理:LLM 配置、Embedding 配置、系统参数
  4. 权限控制:基于角色的访问控制(RBAC)
  5. API Key 管理:生成、撤销、查询 API Key
  6. 审计日志:记录关键操作日志
  7. 资源配额管理:Token 配额、存储配额

1.2 技术栈

框架

  • FastAPI:Admin Server 使用 FastAPI 框架
  • 认证:JWT Token + API Key 双重认证
  • 数据库:与主服务共享 MySQL 数据库

核心依赖

  • admin/server/admin_server.py:Admin Server 主入口
  • admin/server/auth.py:认证与授权
  • admin/server/roles.py:角色定义与权限映射
  • admin/server/services.py:业务逻辑层

1. 模块架构图

flowchart TB
    subgraph "客户端"
        AdminCLI[Admin CLI<br/>命令行工具]
        AdminUI[Admin Web UI<br/>管理界面]
    end

    subgraph "Admin Server"
        FastAPI[FastAPI 应用]
        AuthMiddleware[认证中间件<br/>JWT + API Key]
        RoleMiddleware[权限中间件<br/>RBAC]
        
        subgraph "路由层"
            TenantRoutes[租户路由<br/>/tenants]
            UserRoutes[用户路由<br/>/users]
            ConfigRoutes[配置路由<br/>/configs]
            AuditRoutes[审计路由<br/>/audits]
        end
        
        subgraph "服务层"
            TenantService[TenantService]
            UserService[UserService]
            ConfigService[ConfigService]
            AuditService[AuditService]
        end
    end

    subgraph "存储层"
        MySQL[(MySQL<br/>租户/用户/配置)]
        Redis[(Redis<br/>会话缓存)]
    end

    AdminCLI --> FastAPI
    AdminUI --> FastAPI
    
    FastAPI --> AuthMiddleware
    AuthMiddleware --> RoleMiddleware
    
    RoleMiddleware --> TenantRoutes
    RoleMiddleware --> UserRoutes
    RoleMiddleware --> ConfigRoutes
    RoleMiddleware --> AuditRoutes
    
    TenantRoutes --> TenantService
    UserRoutes --> UserService
    ConfigRoutes --> ConfigService
    AuditRoutes --> AuditService
    
    TenantService --> MySQL
    UserService --> MySQL
    ConfigService --> MySQL
    AuditService --> MySQL
    
    AuthMiddleware --> Redis

2. 核心功能详细剖析

2.1 认证机制

2.1.1 JWT Token 认证

功能:用户登录后颁发 JWT Token,后续请求携带 Token 访问

核心代码

# admin/server/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from datetime import datetime, timedelta

SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24  # 24 小时

security = HTTPBearer()

def create_access_token(data: dict, expires_delta: timedelta = None):
    """创建 JWT Token"""
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """从 Token 中获取当前用户"""
    token = credentials.credentials
    
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise HTTPException(status_code=401, detail="Invalid token")
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    
    # 从数据库查询用户
    from admin.server.services import UserService
    user = UserService.get_by_id(user_id)
    if user is None:
        raise HTTPException(status_code=401, detail="User not found")
    
    return user

2.1.2 API Key 认证

功能:为第三方应用颁发 API Key,支持权限范围限制

核心代码

# admin/server/auth.py
from fastapi import Header, HTTPException

async def verify_api_key(x_api_key: str = Header(...)):
    """验证 API Key"""
    from admin.server.services import APIKeyService
    
    api_key = APIKeyService.get_by_key(x_api_key)
    if not api_key:
        raise HTTPException(status_code=401, detail="Invalid API Key")
    
    # 检查是否过期
    if api_key.expired_at and api_key.expired_at < datetime.now():
        raise HTTPException(status_code=401, detail="API Key expired")
    
    # 检查是否被撤销
    if api_key.revoked:
        raise HTTPException(status_code=401, detail="API Key revoked")
    
    # 记录使用次数
    APIKeyService.increment_usage(api_key.id)
    
    return api_key

2.2 角色与权限

2.2.1 角色定义

# admin/server/roles.py
from enum import Enum

class Role(str, Enum):
    """角色枚举"""
    SUPER_ADMIN = "super_admin"  # 超级管理员(所有权限)
    TENANT_ADMIN = "tenant_admin"  # 租户管理员(租户内所有权限)
    USER = "user"  # 普通用户(基本权限)
    GUEST = "guest"  # 访客(只读权限)

class Permission(str, Enum):
    """权限枚举"""
    # 租户管理
    TENANT_CREATE = "tenant:create"
    TENANT_READ = "tenant:read"
    TENANT_UPDATE = "tenant:update"
    TENANT_DELETE = "tenant:delete"
    
    # 用户管理
    USER_CREATE = "user:create"
    USER_READ = "user:read"
    USER_UPDATE = "user:update"
    USER_DELETE = "user:delete"
    
    # 配置管理
    CONFIG_READ = "config:read"
    CONFIG_UPDATE = "config:update"

# 角色权限映射
ROLE_PERMISSIONS = {
    Role.SUPER_ADMIN: [p for p in Permission],  # 所有权限
    Role.TENANT_ADMIN: [
        Permission.USER_CREATE,
        Permission.USER_READ,
        Permission.USER_UPDATE,
        Permission.USER_DELETE,
        Permission.CONFIG_READ,
        Permission.CONFIG_UPDATE
    ],
    Role.USER: [
        Permission.USER_READ,
        Permission.CONFIG_READ
    ],
    Role.GUEST: [
        Permission.USER_READ,
        Permission.CONFIG_READ
    ]
}

2.2.2 权限检查装饰器

# admin/server/auth.py
from functools import wraps
from admin.server.roles import Permission, ROLE_PERMISSIONS

def require_permission(permission: Permission):
    """权限检查装饰器"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, current_user=None, **kwargs):
            if not current_user:
                raise HTTPException(status_code=401, detail="Not authenticated")
            
            # 检查用户角色是否拥有该权限
            user_permissions = ROLE_PERMISSIONS.get(current_user.role, [])
            if permission not in user_permissions:
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: {permission}"
                )
            
            return await func(*args, current_user=current_user, **kwargs)
        return wrapper
    return decorator

2.3 租户管理

2.3.1 租户服务

功能:创建、更新、删除、查询租户

核心代码

# admin/server/services.py
class TenantService:
    @staticmethod
    def create_tenant(name: str, email: str, quota: dict) -> Tenant:
        """创建租户"""
        from api.db import Tenant
        
        # 1. 检查租户是否已存在
        existing = Tenant.select().where(Tenant.email == email).first()
        if existing:
            raise ValueError("Tenant already exists")
        
        # 2. 创建租户
        tenant = Tenant(
            id=get_uuid(),
            name=name,
            email=email,
            status=TenantStatus.ACTIVE,
            llm_id="",  # 待配置
            embd_id="",  # 待配置
            token_num=0,
            token_num_max=quota.get("token_num_max", 1000000),
            chunk_num=0,
            chunk_num_max=quota.get("chunk_num_max", 100000),
            parse_num=0,
            parse_num_max=quota.get("parse_num_max", 10000)
        )
        tenant.save()
        
        # 3. 创建默认管理员用户
        admin_user = UserService.create_user(
            tenant_id=tenant.id,
            email=email,
            nickname=f"{name} Admin",
            password=generate_random_password(),
            role=Role.TENANT_ADMIN
        )
        
        # 4. 审计日志
        AuditService.log("tenant.create", tenant.id, {"name": name})
        
        return tenant
    
    @staticmethod
    def update_quota(tenant_id: str, quota: dict):
        """更新租户配额"""
        tenant = Tenant.get_by_id(tenant_id)
        if not tenant:
            raise ValueError("Tenant not found")
        
        tenant.token_num_max = quota.get("token_num_max", tenant.token_num_max)
        tenant.chunk_num_max = quota.get("chunk_num_max", tenant.chunk_num_max)
        tenant.parse_num_max = quota.get("parse_num_max", tenant.parse_num_max)
        tenant.save()
        
        AuditService.log("tenant.update_quota", tenant_id, quota)
    
    @staticmethod
    def delete_tenant(tenant_id: str):
        """删除租户(软删除)"""
        tenant = Tenant.get_by_id(tenant_id)
        if not tenant:
            raise ValueError("Tenant not found")
        
        # 软删除(设置状态为 DELETED)
        tenant.status = TenantStatus.DELETED
        tenant.save()
        
        AuditService.log("tenant.delete", tenant_id, {})

2.3.2 租户路由

# admin/server/routes.py
from fastapi import APIRouter, Depends
from admin.server.auth import get_current_user, require_permission
from admin.server.roles import Permission
from admin.server.models import TenantCreateRequest, TenantResponse

router = APIRouter(prefix="/api/admin/tenants", tags=["Tenants"])

@router.post("/", response_model=TenantResponse)
@require_permission(Permission.TENANT_CREATE)
async def create_tenant(
    req: TenantCreateRequest,
    current_user = Depends(get_current_user)
):
    """创建租户"""
    tenant = TenantService.create_tenant(
        name=req.name,
        email=req.email,
        quota=req.quota
    )
    return TenantResponse.from_orm(tenant)

@router.get("/{tenant_id}", response_model=TenantResponse)
@require_permission(Permission.TENANT_READ)
async def get_tenant(
    tenant_id: str,
    current_user = Depends(get_current_user)
):
    """查询租户"""
    tenant = TenantService.get_by_id(tenant_id)
    if not tenant:
        raise HTTPException(status_code=404, detail="Tenant not found")
    return TenantResponse.from_orm(tenant)

@router.delete("/{tenant_id}")
@require_permission(Permission.TENANT_DELETE)
async def delete_tenant(
    tenant_id: str,
    current_user = Depends(get_current_user)
):
    """删除租户"""
    TenantService.delete_tenant(tenant_id)
    return {"message": "Tenant deleted successfully"}

2.4 用户管理

2.4.1 用户服务

# admin/server/services.py
import bcrypt

class UserService:
    @staticmethod
    def create_user(tenant_id: str, email: str, nickname: str, password: str, role: Role) -> User:
        """创建用户"""
        from api.db import User
        
        # 1. 检查邮箱是否已存在
        existing = User.select().where(User.email == email).first()
        if existing:
            raise ValueError("User already exists")
        
        # 2. 密码加密
        password_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
        
        # 3. 创建用户
        user = User(
            id=get_uuid(),
            tenant_id=tenant_id,
            email=email,
            nickname=nickname,
            password=password_hash,
            role=role,
            status=UserStatus.ACTIVE
        )
        user.save()
        
        AuditService.log("user.create", user.id, {"email": email, "role": role})
        return user
    
    @staticmethod
    def verify_password(email: str, password: str) -> User:
        """验证密码"""
        user = User.select().where(User.email == email).first()
        if not user:
            raise ValueError("User not found")
        
        if not bcrypt.checkpw(password.encode(), user.password.encode()):
            raise ValueError("Invalid password")
        
        return user
    
    @staticmethod
    def update_role(user_id: str, new_role: Role):
        """更新用户角色"""
        user = User.get_by_id(user_id)
        if not user:
            raise ValueError("User not found")
        
        user.role = new_role
        user.save()
        
        AuditService.log("user.update_role", user_id, {"new_role": new_role})

2.5 配置管理

2.5.1 LLM 配置服务

# admin/server/services.py
class ConfigService:
    @staticmethod
    def set_llm_config(tenant_id: str, llm_type: str, llm_config: dict):
        """设置 LLM 配置"""
        from api.db.services.llm_service import TenantLLMService
        
        # 验证配置
        required_fields = ["model_name", "api_key", "base_url"]
        for field in required_fields:
            if field not in llm_config:
                raise ValueError(f"Missing field: {field}")
        
        # 保存配置
        TenantLLMService.save(
            tenant_id=tenant_id,
            llm_type=llm_type,
            llm_name=llm_config["model_name"],
            model_type=llm_config.get("model_type", "UNKNOWN"),
            api_key=llm_config["api_key"],
            api_base=llm_config["base_url"]
        )
        
        AuditService.log("config.set_llm", tenant_id, {"llm_type": llm_type})
    
    @staticmethod
    def get_llm_config(tenant_id: str, llm_type: str) -> dict:
        """获取 LLM 配置"""
        from api.db.services.llm_service import TenantLLMService
        
        llm = TenantLLMService.query(tenant_id=tenant_id, llm_type=llm_type)
        if not llm:
            return None
        
        return {
            "model_name": llm.llm_name,
            "model_type": llm.model_type,
            "base_url": llm.api_base
            # 不返回 api_key(安全)
        }

3. Admin CLI 客户端

3.1 命令行工具

# admin/client/admin_client.py
import click
import requests

BASE_URL = "http://localhost:8001"

@click.group()
def cli():
    """RAGFlow Admin CLI"""
    pass

@cli.command()
@click.option("--email", required=True, help="Admin email")
@click.option("--password", required=True, help="Admin password")
def login(email, password):
    """登录并获取 Token"""
    resp = requests.post(f"{BASE_URL}/api/admin/login", json={
        "email": email,
        "password": password
    })
    
    if resp.status_code == 200:
        token = resp.json()["access_token"]
        # 保存 Token 到本地
        with open(".admin_token", "w") as f:
            f.write(token)
        click.echo("Login successful!")
    else:
        click.echo(f"Login failed: {resp.text}")

@cli.command()
@click.option("--name", required=True, help="Tenant name")
@click.option("--email", required=True, help="Tenant email")
@click.option("--token-quota", default=1000000, help="Token quota")
def create_tenant(name, email, token_quota):
    """创建租户"""
    token = open(".admin_token").read().strip()
    
    resp = requests.post(
        f"{BASE_URL}/api/admin/tenants",
        json={
            "name": name,
            "email": email,
            "quota": {"token_num_max": token_quota}
        },
        headers={"Authorization": f"Bearer {token}"}
    )
    
    if resp.status_code == 200:
        click.echo(f"Tenant created: {resp.json()}")
    else:
        click.echo(f"Failed: {resp.text}")

if __name__ == "__main__":
    cli()

使用示例

# 登录
python admin_client.py login --email admin@example.com --password admin123

# 创建租户
python admin_client.py create-tenant --name "ACME Corp" --email acme@example.com --token-quota 5000000

# 查询租户
python admin_client.py get-tenant --tenant-id <tenant_id>

4. 时序图

4.1 租户创建时序

sequenceDiagram
    autonumber
    participant CLI as Admin CLI
    participant API as Admin API
    participant Auth as 认证中间件
    participant Service as TenantService
    participant DB as MySQL

    CLI->>API: POST /api/admin/tenants
    note right of CLI: 携带 JWT Token

    API->>Auth: 验证 Token
    Auth->>Auth: 解析 JWT
    Auth->>DB: 查询用户
    DB-->>Auth: 返回用户信息
    Auth->>Auth: 检查权限(TENANT_CREATE)
    Auth-->>API: 认证通过

    API->>Service: TenantService.create_tenant()
    Service->>DB: 检查邮箱是否存在
    DB-->>Service: 不存在
    Service->>DB: INSERT INTO tenant
    Service->>DB: INSERT INTO user(默认管理员)
    Service->>DB: INSERT INTO audit_log
    DB-->>Service: 成功

    Service-->>API: 返回 Tenant
    API-->>CLI: 200 OK + Tenant 信息

5. 最佳实践

1. 安全性

  • JWT Secret 使用环境变量,定期轮换
  • API Key 支持权限范围限制(Scope)
  • 密码使用 bcrypt 加密,不明文存储

2. 审计日志

  • 记录所有关键操作(创建、删除、更新)
  • 日志包含操作人、时间、操作内容
  • 支持日志查询与导出

3. 配额管理

  • 定期检查租户配额使用情况
  • 超过配额时拒绝新请求,返回 429 错误
  • 提供配额预警通知

4. 权限设计

  • 最小权限原则(Principle of Least Privilege)
  • 支持自定义角色(未来扩展)
  • 权限粒度适中(不要过细)