1. 安全系统概述

FastAPI 的安全认证系统是建立在依赖注入之上的,提供了完整的身份验证和授权解决方案。它不仅支持多种标准的安全协议(OAuth2、OpenID Connect、API Key等),还能自动生成相应的 OpenAPI 文档,在 Swagger UI 中提供交互式的认证功能。

1.1 安全系统架构

graph TB
    subgraph "安全方案层 (Security Schemes)"
        A[SecurityBase] --> B[HTTPBase]
        A --> C[APIKeyBase]
        A --> D[OAuth2]
        A --> E[OpenIdConnect]
        
        B --> B1[HTTPBasic]
        B --> B2[HTTPBearer]  
        B --> B3[HTTPDigest]
        
        C --> C1[APIKeyCookie]
        C --> C2[APIKeyHeader]
        C --> C3[APIKeyQuery]
        
        D --> D1[OAuth2PasswordBearer]
        D --> D2[OAuth2PasswordRequestForm]
        D --> D3[OAuth2AuthorizationCodeBearer]
    end
    
    subgraph "依赖注入层"
        F[Security Dependencies] --> G[SecurityScopes]
        F --> H[token validation]
        F --> I[permission check]
    end
    
    subgraph "OpenAPI集成层"
        J[Security Definitions] --> K[Swagger UI Integration]
        J --> L[ReDoc Integration]
    end
    
    A --> F
    F --> J

1.2 核心组件

SecurityBase: 所有安全方案的基类
SecurityScopes: 权限范围管理
依赖注入集成: 与 FastAPI 依赖系统的深度集成
OpenAPI 自动化: 自动生成认证相关的 API 文档
Swagger UI 集成: 提供可视化的认证测试界面

2. SecurityBase 基类分析

2.1 SecurityBase 核心实现

class SecurityBase:
    """
    所有安全方案的抽象基类
    
    定义了安全方案的基本接口和 OpenAPI 集成的标准
    
    Attributes:
        type_: 安全方案类型(apiKey, http, oauth2, openIdConnect)
        description: 安全方案描述,用于 API 文档
    """
    
    type_: str = ""  # 安全类型,子类必须设置
    
    def __init__(
        self,
        *,
        scheme_name: Optional[str] = None,      # 方案名称
        description: Optional[str] = None,      # 描述信息
        auto_error: bool = True,               # 是否自动抛出认证错误
    ):
        """
        初始化安全基类
        
        Args:
            scheme_name: 安全方案的名称,用于 OpenAPI 文档中的引用
            description: 安全方案的描述,显示在 API 文档中
            auto_error: 当认证失败时是否自动抛出 HTTPException
                       设为 False 可以在路径操作中自定义错误处理
                       
        Examples:
            # 自动错误处理(默认)
            oauth2_scheme = OAuth2PasswordBearer(
                tokenUrl="token",
                auto_error=True  # 认证失败时自动返回 401
            )
            
            # 手动错误处理
            optional_oauth2 = OAuth2PasswordBearer(
                tokenUrl="token", 
                auto_error=False  # 认证失败时返回 None,需要手动处理
            )
        """
        self.scheme_name = scheme_name
        self.description = description  
        self.auto_error = auto_error

    def __call__(self, request: Request) -> Optional[Any]:
        """
        安全方案的调用接口
        
        这是一个抽象方法,子类必须实现具体的认证逻辑
        
        Args:
            request: HTTP请求对象
            
        Returns:
            认证信息(如token、credentials等),失败时返回None
            
        Raises:
            HTTPException: 当 auto_error=True 且认证失败时抛出
        """
        raise NotImplementedError()  # pragma: no cover

    def get_openapi_security_scheme(self) -> Dict[str, Any]:
        """
        生成 OpenAPI 安全方案定义
        
        子类需要重写此方法,返回符合 OpenAPI 规范的安全方案定义
        
        Returns:
            Dict: OpenAPI 安全方案定义
            
        Examples:
            # OAuth2 方案的 OpenAPI 定义
            {
                "type": "oauth2",
                "flows": {
                    "password": {
                        "tokenUrl": "token",
                        "scopes": {}
                    }
                }
            }
        """
        return {"type": self.type_}

2.2 安全方案类型层次结构

classDiagram
    class SecurityBase {
        <<abstract>>
        +type_: str
        +scheme_name: str
        +description: str
        +auto_error: bool
        +__call__(request)
        +get_openapi_security_scheme()
    }
    
    class HTTPBase {
        +scheme: str
        +bearerFormat: str
        +get_openapi_security_scheme()
    }
    
    class APIKeyBase {
        +name: str
        +get_openapi_security_scheme()
    }
    
    class OAuth2 {
        +flows: Dict
        +scheme_name: str
        +scopes: Dict[str, str]
        +get_openapi_security_scheme()
    }
    
    class HTTPBasic {
        +__call__(request)
    }
    
    class HTTPBearer {
        +__call__(request)  
    }
    
    class APIKeyHeader {
        +__call__(request)
    }
    
    class OAuth2PasswordBearer {
        +tokenUrl: str
        +__call__(request)
    }
    
    SecurityBase <|-- HTTPBase
    SecurityBase <|-- APIKeyBase  
    SecurityBase <|-- OAuth2
    SecurityBase <|-- OpenIdConnect
    
    HTTPBase <|-- HTTPBasic
    HTTPBase <|-- HTTPBearer
    HTTPBase <|-- HTTPDigest
    
    APIKeyBase <|-- APIKeyHeader
    APIKeyBase <|-- APIKeyCookie
    APIKeyBase <|-- APIKeyQuery
    
    OAuth2 <|-- OAuth2PasswordBearer
    OAuth2 <|-- OAuth2AuthorizationCodeBearer

3. HTTP 认证方案

3.1 HTTPBearer 实现分析

class HTTPBearer(HTTPBase):
    """
    HTTP Bearer Token 认证方案
    
    实现 RFC 6750 Bearer Token 标准
    从 HTTP Authorization 头中提取 Bearer token
    
    格式: Authorization: Bearer <token>
    """
    
    def __init__(
        self,
        *,
        bearerFormat: Optional[str] = None,     # Bearer token 格式(JWT、opaque等)
        scheme_name: Optional[str] = None,      # 方案名称
        description: Optional[str] = None,      # 描述
        auto_error: bool = True,               # 自动错误处理
    ):
        """
        初始化 HTTP Bearer 认证
        
        Args:
            bearerFormat: Bearer token 的格式说明
                         常见值: "JWT", "opaque", "base64" 等
            scheme_name: OpenAPI 中的方案名称
            description: 方案描述
            auto_error: 认证失败时是否自动抛出异常
            
        Examples:
            # JWT Bearer Token
            jwt_bearer = HTTPBearer(
                bearerFormat="JWT",
                description="JWT Bearer Token Authentication"
            )
            
            # 通用 Bearer Token
            bearer = HTTPBearer()
        """
        self.scheme = "Bearer"
        self.bearer_format = bearerFormat
        super().__init__(
            scheme=self.scheme,
            scheme_name=scheme_name,
            description=description,
            auto_error=auto_error,
        )

    def __call__(self, request: Request) -> Optional[HTTPAuthorizationCredentials]:
        """
        从请求中提取 Bearer Token
        
        Args:
            request: HTTP请求对象
            
        Returns:
            HTTPAuthorizationCredentials: 包含认证信息的对象
                .scheme: "Bearer"
                .credentials: 实际的 token 值
            None: 当认证失败且 auto_error=False 时
            
        Raises:
            HTTPException: 当认证失败且 auto_error=True 时
                - 403: 缺少 Authorization 头
                - 403: Authorization 头格式错误
                - 403: 不是 Bearer 类型认证
                
        Examples:
            # 正确的请求头
            # Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
            
            credentials = bearer_scheme(request)
            token = credentials.credentials  # 提取 token
        """
        authorization = request.headers.get("Authorization")
        
        # 检查是否存在 Authorization 头
        if not authorization:
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_403_FORBIDDEN,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None
        
        # 解析 Authorization 头格式
        # 期望格式: "Bearer <token>"
        try:
            scheme, credentials = authorization.split(" ", 1)
        except ValueError:
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_403_FORBIDDEN,
                    detail="Invalid authorization header format. Expected 'Bearer <token>'",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None
        
        # 验证认证方案
        if scheme.lower() != "bearer":
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_403_FORBIDDEN,
                    detail=f"Invalid authentication scheme. Expected 'Bearer', got '{scheme}'",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None
        
        # 返回认证凭据
        return HTTPAuthorizationCredentials(
            scheme=scheme,
            credentials=credentials
        )

    def get_openapi_security_scheme(self) -> Dict[str, Any]:
        """
        生成 OpenAPI 安全方案定义
        
        Returns:
            Dict: 符合 OpenAPI 3.0 规范的 HTTP Bearer 方案定义
        """
        scheme = {"type": "http", "scheme": "bearer"}
        if self.bearer_format:
            scheme["bearerFormat"] = self.bearer_format
        if self.description:
            scheme["description"] = self.description
        return scheme

@dataclass
class HTTPAuthorizationCredentials:
    """
    HTTP 认证凭据数据类
    
    封装从 HTTP Authorization 头中提取的认证信息
    
    Attributes:
        scheme: 认证方案(Basic、Bearer、Digest等)
        credentials: 认证凭据(token、密码等)
    """
    scheme: str         # 认证方案名称
    credentials: str    # 认证凭据内容
    
    def __str__(self) -> str:
        return f"{self.scheme} {self.credentials}"

3.2 HTTPBasic 实现分析

class HTTPBasic(HTTPBase):
    """
    HTTP Basic 认证方案
    
    实现 RFC 7617 HTTP Basic Authentication 标准
    从 HTTP Authorization 头中提取用户名和密码
    
    格式: Authorization: Basic <base64(username:password)>
    """
    
    def __init__(
        self,
        *,
        scheme_name: Optional[str] = None,
        description: Optional[str] = None,
        auto_error: bool = True,
    ):
        """
        初始化 HTTP Basic 认证
        
        Args:
            scheme_name: OpenAPI 方案名称
            description: 方案描述  
            auto_error: 认证失败时是否自动抛出异常
        """
        self.scheme = "Basic"
        super().__init__(
            scheme=self.scheme,
            scheme_name=scheme_name,
            description=description,
            auto_error=auto_error,
        )

    def __call__(self, request: Request) -> Optional[HTTPBasicCredentials]:
        """
        从请求中提取 Basic 认证信息
        
        Args:
            request: HTTP请求对象
            
        Returns:
            HTTPBasicCredentials: 包含用户名和密码的对象
                .username: 解码后的用户名
                .password: 解码后的密码
            None: 当认证失败且 auto_error=False 时
            
        Raises:
            HTTPException: 当认证失败且 auto_error=True 时
        """
        authorization = request.headers.get("Authorization")
        
        if not authorization:
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Basic"},
                )
            else:
                return None
        
        try:
            scheme, credentials = authorization.split(" ", 1)
        except ValueError:
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Invalid authorization header format",
                    headers={"WWW-Authenticate": "Basic"},
                )
            else:
                return None
        
        if scheme.lower() != "basic":
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Invalid authentication scheme",
                    headers={"WWW-Authenticate": "Basic"},
                )
            else:
                return None
        
        try:
            # Base64 解码认证信息
            decoded = base64.b64decode(credentials).decode("utf-8")
            username, password = decoded.split(":", 1)
        except (ValueError, UnicodeDecodeError):
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Invalid basic authentication credentials",
                    headers={"WWW-Authenticate": "Basic"},
                )
            else:
                return None
        
        return HTTPBasicCredentials(username=username, password=password)

@dataclass  
class HTTPBasicCredentials:
    """
    HTTP Basic 认证凭据
    
    Attributes:
        username: 用户名
        password: 密码
    """
    username: str
    password: str

4. API Key 认证方案

4.1 APIKeyHeader 实现

class APIKeyHeader(APIKeyBase):
    """
    API Key Header 认证方案
    
    从 HTTP 请求头中提取 API Key
    常用于 API 服务的认证
    """
    
    def __init__(
        self,
        *,
        name: str,                             # API Key 的头部字段名
        scheme_name: Optional[str] = None,      # OpenAPI 方案名称
        description: Optional[str] = None,      # 方案描述
        auto_error: bool = True,               # 自动错误处理
    ):
        """
        初始化 API Key Header 认证
        
        Args:
            name: HTTP 头中 API Key 的字段名
                 常见名称: "X-API-Key", "X-Token", "Authorization" 等
            scheme_name: OpenAPI 方案名称
            description: 方案描述
            auto_error: 认证失败时是否自动抛出异常
            
        Examples:
            # 标准 API Key
            api_key_header = APIKeyHeader(
                name="X-API-Key",
                description="API Key for authentication"
            )
            
            # 自定义头部
            custom_auth = APIKeyHeader(
                name="X-Custom-Token", 
                description="Custom authentication token"
            )
        """
        self.name = name
        super().__init__(
            name=name,
            scheme_name=scheme_name,
            description=description,
            auto_error=auto_error,
        )

    def __call__(self, request: Request) -> Optional[str]:
        """
        从请求头中提取 API Key
        
        Args:
            request: HTTP请求对象
            
        Returns:
            str: API Key 值
            None: 当认证失败且 auto_error=False 时
            
        Raises:
            HTTPException: 当认证失败且 auto_error=True 时
                - 403: 缺少指定的 API Key 头部
                
        Examples:
            # 请求示例
            # X-API-Key: sk-1234567890abcdef
            
            api_key = api_key_scheme(request)
            # api_key = "sk-1234567890abcdef"
        """
        api_key = request.headers.get(self.name)
        
        if not api_key:
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_403_FORBIDDEN,
                    detail=f"Missing API Key in header: {self.name}",
                )
            else:
                return None
        
        return api_key

    def get_openapi_security_scheme(self) -> Dict[str, Any]:
        """
        生成 OpenAPI 安全方案定义
        
        Returns:
            Dict: OpenAPI API Key Header 方案定义
        """
        scheme = {
            "type": "apiKey",
            "name": self.name,
            "in": "header",
        }
        if self.description:
            scheme["description"] = self.description
        return scheme

# 类似地,还有 APIKeyCookie 和 APIKeyQuery

class APIKeyCookie(APIKeyBase):
    """API Key Cookie 认证 - 从 Cookie 中提取 API Key"""
    
    def __call__(self, request: Request) -> Optional[str]:
        api_key = request.cookies.get(self.name)
        if not api_key and self.auto_error:
            raise HTTPException(
                status_code=HTTP_403_FORBIDDEN,
                detail=f"Missing API Key in cookie: {self.name}",
            )
        return api_key

class APIKeyQuery(APIKeyBase): 
    """API Key Query 认证 - 从查询参数中提取 API Key"""
    
    def __call__(self, request: Request) -> Optional[str]:
        api_key = request.query_params.get(self.name)
        if not api_key and self.auto_error:
            raise HTTPException(
                status_code=HTTP_403_FORBIDDEN,
                detail=f"Missing API Key in query parameter: {self.name}",
            )
        return api_key

5. OAuth2 认证方案

5.1 OAuth2PasswordBearer 深度分析

class OAuth2PasswordBearer(OAuth2):
    """
    OAuth2 密码流 Bearer Token 认证
    
    实现 OAuth2 Resource Owner Password Credentials Grant 流程
    这是最常用的 OAuth2 实现,适合第一方应用
    
    工作流程:
    1. 用户提供用户名和密码
    2. 客户端向认证服务器发送凭据到 tokenUrl
    3. 认证服务器返回访问令牌
    4. 客户端使用 Bearer Token 访问受保护资源
    """
    
    def __init__(
        self,
        tokenUrl: str,                         # 获取token的URL端点
        scheme_name: Optional[str] = None,      # OpenAPI方案名称
        scopes: Optional[Dict[str, str]] = None, # 权限范围定义
        description: Optional[str] = None,      # 方案描述
        auto_error: bool = True,               # 自动错误处理
    ):
        """
        初始化 OAuth2 密码流认证
        
        Args:
            tokenUrl: 获取访问令牌的端点URL
                     客户端将向此URL发送用户名密码获取token
            scheme_name: OpenAPI中的方案名称
            scopes: 权限范围字典,格式: {scope_name: description}
                   例如: {"read": "Read access", "write": "Write access"}
            description: 方案描述,用于API文档
            auto_error: 认证失败时是否自动抛出HTTPException
            
        Examples:
            # 基础OAuth2认证
            oauth2_scheme = OAuth2PasswordBearer(
                tokenUrl="token"
            )
            
            # 带权限范围的OAuth2认证
            oauth2_scheme = OAuth2PasswordBearer(
                tokenUrl="auth/token",
                scopes={
                    "users:read": "Read user information",
                    "users:write": "Modify user information", 
                    "admin": "Administrative privileges"
                },
                description="OAuth2 with Password and Bearer token"
            )
        """
        if not scopes:
            scopes = {}
        
        # OAuth2 flows 定义
        flows = OAuthFlows(
            password=OAuthFlowPassword(
                tokenUrl=tokenUrl,
                scopes=scopes
            )
        )
        
        super().__init__(
            flows=flows,
            scheme_name=scheme_name,
            description=description,
            auto_error=auto_error,
        )

    def __call__(self, request: Request) -> Optional[str]:
        """
        从请求中提取 OAuth2 Bearer Token
        
        Args:
            request: HTTP请求对象
            
        Returns:
            str: 提取的访问令牌(不包含 "Bearer " 前缀)
            None: 当认证失败且 auto_error=False 时
            
        Raises:
            HTTPException: 当认证失败且 auto_error=True 时
                - 401: 缺少或无效的 Authorization 头
                
        Examples:
            # 请求头示例
            # Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
            
            token = oauth2_scheme(request)
            # token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
        """
        authorization = request.headers.get("Authorization")
        
        # 检查Authorization头是否存在
        if not authorization:
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None
        
        # 解析Bearer Token
        try:
            scheme, token = authorization.split(" ", 1)
        except ValueError:
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Invalid authorization header format",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None
        
        # 验证认证方案
        if scheme.lower() != "bearer":
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Invalid authentication scheme",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None
        
        return token

5.2 OAuth2PasswordRequestForm 表单处理

class OAuth2PasswordRequestForm:
    """
    OAuth2 密码流请求表单
    
    用于处理 OAuth2 密码流的登录请求表单数据
    符合 OAuth2 RFC 6749 标准
    
    表单字段:
    - username: 用户名(必需)
    - password: 密码(必需) 
    - scope: 权限范围(可选,空格分隔的字符串)
    - client_id: 客户端ID(可选)
    - client_secret: 客户端密钥(可选)
    - grant_type: 授权类型(固定为"password")
    """
    
    def __init__(
        self,
        *,
        grant_type: str = Form(regex="password"),  # 固定为password
        username: str = Form(),                    # 用户名
        password: str = Form(),                    # 密码
        scope: str = Form(""),                     # 权限范围
        client_id: Optional[str] = Form(None),     # 客户端ID
        client_secret: Optional[str] = Form(None), # 客户端密钥
    ):
        """
        OAuth2 密码流表单数据
        
        Args:
            grant_type: 授权类型,OAuth2密码流固定为"password"
            username: 用户名,用于身份验证
            password: 密码,用于身份验证
            scope: 权限范围字符串,多个范围用空格分隔
                  例如: "users:read users:write admin"
            client_id: 客户端标识符(可选)
            client_secret: 客户端密钥(可选)
            
        Examples:
            # 在token端点使用
            @app.post("/token")
            async def login(form_data: OAuth2PasswordRequestForm = Depends()):
                user = authenticate_user(form_data.username, form_data.password)
                if not user:
                    raise HTTPException(status_code=400, detail="Invalid credentials")
                
                # 解析权限范围
                scopes = form_data.scopes
                
                access_token = create_access_token(
                    data={"sub": user.username, "scopes": scopes}
                )
                return {"access_token": access_token, "token_type": "bearer"}
        """
        self.grant_type = grant_type
        self.username = username
        self.password = password
        self.scope = scope
        self.client_id = client_id
        self.client_secret = client_secret

    @property
    def scopes(self) -> List[str]:
        """
        解析权限范围字符串
        
        将空格分隔的权限范围字符串解析为列表
        
        Returns:
            List[str]: 权限范围列表
            
        Examples:
            form_data.scope = "users:read users:write admin"
            form_data.scopes  # ["users:read", "users:write", "admin"]
        """
        return self.scope.split() if self.scope else []

6. 权限范围管理 - SecurityScopes

6.1 SecurityScopes 实现

class SecurityScopes:
    """
    安全权限范围管理
    
    用于管理和验证 OAuth2/OpenID Connect 的权限范围
    与依赖注入系统集成,自动处理权限验证
    
    Attributes:
        scopes: 所需的权限范围列表
        scope_str: 权限范围的字符串表示
    """
    
    def __init__(self, scopes: Optional[List[str]] = None):
        """
        初始化权限范围
        
        Args:
            scopes: 权限范围列表
                   例如: ["users:read", "users:write", "admin"]
        """
        self.scopes = scopes or []
        # 生成权限范围的字符串表示(用于WWW-Authenticate头)
        self.scope_str = " ".join(self.scopes)

    def __str__(self) -> str:
        """返回权限范围的字符串表示"""
        return self.scope_str
    
    def __repr__(self) -> str:
        """返回权限范围的详细表示"""
        return f"SecurityScopes(scopes={self.scopes})"

# 在依赖函数中使用 SecurityScopes
def verify_token_with_scopes(
    security_scopes: SecurityScopes,           # 自动注入所需权限
    token: str = Depends(oauth2_scheme)        # OAuth2 token
) -> User:
    """
    验证token并检查权限
    
    Args:
        security_scopes: FastAPI自动注入的所需权限范围
        token: OAuth2 访问令牌
        
    Returns:
        User: 验证通过且有足够权限的用户
        
    Raises:
        HTTPException: 
            - 401: token无效
            - 403: 权限不足
    """
    # 解码和验证token
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        token_scopes: List[str] = payload.get("scopes", [])
    except JWTError:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": f"Bearer scope=\"{security_scopes.scope_str}\""},
        )
    
    # 获取用户
    user = get_user(username=username)
    if user is None:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": f"Bearer scope=\"{security_scopes.scope_str}\""},
        )
    
    # 检查权限范围
    for required_scope in security_scopes.scopes:
        if required_scope not in token_scopes:
            raise HTTPException(
                status_code=HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": f"Bearer scope=\"{security_scopes.scope_str}\""},
            )
    
    return user

6.2 Security() 函数深度分析

def Security(
    dependency: Callable[..., Any],            # 安全依赖函数
    *,
    scopes: Optional[Sequence[str]] = None,    # 所需权限范围
    use_cache: bool = True,                   # 是否使用缓存
) -> Any:
    """
    安全依赖装饰器
    
    Security() 是 Depends() 的特殊版本,专门用于安全相关的依赖
    它会自动处理权限范围,并在 OpenAPI 中生成相应的安全需求
    
    Args:
        dependency: 安全依赖函数,通常是验证函数
        scopes: 此端点所需的权限范围列表
        use_cache: 是否缓存依赖结果
        
    Returns:
        Depends: 包装后的依赖对象,包含安全信息
        
    Examples:
        # 基础安全依赖
        @app.get("/users/me")
        async def read_users_me(
            current_user: User = Security(verify_token_with_scopes)
        ):
            return current_user
        
        # 带权限范围的安全依赖
        @app.get("/admin/users")
        async def list_all_users(
            current_user: User = Security(
                verify_token_with_scopes,
                scopes=["admin", "users:read"]  # 需要admin和users:read权限
            )
        ):
            return get_all_users()
        
        # 多级权限验证
        @app.delete("/admin/users/{user_id}")  
        async def delete_user(
            user_id: int,
            current_user: User = Security(
                verify_token_with_scopes,
                scopes=["admin", "users:write", "users:delete"]
            )
        ):
            return delete_user_by_id(user_id)
    """
    if scopes is None:
        scopes = []
    
    return Depends(
        dependency=dependency,
        use_cache=use_cache,
        # 关键:将scopes信息传递给依赖系统
        _scopes=list(scopes),  # 内部字段,用于权限管理
    )

7. OpenAPI 集成

7.1 安全方案的 OpenAPI 文档生成

def get_openapi_security_schemes(
    routes: List[BaseRoute]
) -> Dict[str, Dict[str, Any]]:
    """
    从路由中提取所有安全方案,生成 OpenAPI 安全定义
    
    遍历所有路由,收集使用的安全方案,并生成相应的 OpenAPI 定义
    
    Args:
        routes: 应用的所有路由列表
        
    Returns:
        Dict: OpenAPI securitySchemes 定义
        
    Examples:
        # 生成的 OpenAPI securitySchemes 示例
        {
            "OAuth2PasswordBearer": {
                "type": "oauth2",
                "flows": {
                    "password": {
                        "tokenUrl": "/token",
                        "scopes": {
                            "users:read": "Read user information",
                            "users:write": "Write user information",
                            "admin": "Administrative access"
                        }
                    }
                }
            },
            "APIKeyHeader": {
                "type": "apiKey",
                "name": "X-API-Key",
                "in": "header"
            }
        }
    """
    security_schemes: Dict[str, Dict[str, Any]] = {}
    
    for route in routes:
        if isinstance(route, APIRoute):
            # 遍历路由的所有安全需求
            for security_requirement in route.dependant.security_requirements:
                security_scheme = security_requirement.security_scheme
                scheme_name = security_scheme.scheme_name or security_scheme.__class__.__name__
                
                if scheme_name not in security_schemes:
                    # 生成 OpenAPI 安全方案定义
                    security_schemes[scheme_name] = security_scheme.get_openapi_security_scheme()
    
    return security_schemes

def get_openapi_security_requirements(
    dependant: Dependant
) -> List[Dict[str, List[str]]]:
    """
    生成路径操作的安全需求
    
    为单个路径操作生成 OpenAPI security 需求列表
    
    Args:
        dependant: 路径操作的依赖对象
        
    Returns:
        List[Dict]: OpenAPI security requirements
        
    Examples:
        # 生成的安全需求示例
        [
            {
                "OAuth2PasswordBearer": ["users:read", "users:write"]
            },
            {
                "APIKeyHeader": []
            }
        ]
    """
    security_requirements: List[Dict[str, List[str]]] = []
    
    for security_requirement in dependant.security_requirements:
        security_scheme = security_requirement.security_scheme
        scheme_name = security_scheme.scheme_name or security_scheme.__class__.__name__
        scopes = list(security_requirement.scopes or [])
        
        security_requirements.append({scheme_name: scopes})
    
    return security_requirements

7.2 Swagger UI 集成

FastAPI 自动将安全方案集成到 Swagger UI 中:

sequenceDiagram
    participant User as 用户
    participant SwaggerUI as Swagger UI
    participant FastAPI as FastAPI应用
    participant AuthServer as 认证服务器

    User->>SwaggerUI: 访问 /docs
    SwaggerUI->>FastAPI: 获取 OpenAPI JSON
    FastAPI-->>SwaggerUI: 返回包含安全定义的 OpenAPI
    SwaggerUI->>SwaggerUI: 渲染认证按钮和表单
    
    User->>SwaggerUI: 点击 "Authorize" 按钮
    SwaggerUI->>SwaggerUI: 显示认证表单
    User->>SwaggerUI: 输入认证信息
    
    alt OAuth2 Password Flow
        SwaggerUI->>AuthServer: POST /token (username, password)
        AuthServer-->>SwaggerUI: 返回 access_token
    else API Key
        User->>SwaggerUI: 直接输入 API Key
    end
    
    SwaggerUI->>SwaggerUI: 存储认证信息
    
    User->>SwaggerUI: 测试 API 端点
    SwaggerUI->>FastAPI: 添加 Authorization 头发送请求
    FastAPI-->>SwaggerUI: 返回API响应
    SwaggerUI-->>User: 显示响应结果

8. 完整的认证系统实现示例

8.1 JWT Token 认证系统

from datetime import datetime, timedelta
from typing import Dict, List, Optional

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm, SecurityScopes
from pydantic import BaseModel
from jose import JWTError, jwt
from passlib.context import CryptContext

# 配置
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2方案
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "users:read": "Read user information",
        "users:write": "Modify user information",
        "admin": "Administrative privileges",
    }
)

app = FastAPI()

# === 数据模型 ===
class Token(BaseModel):
    """访问令牌响应模型"""
    access_token: str
    token_type: str = "bearer"
    expires_in: int
    scope: str

class TokenData(BaseModel):
    """令牌数据模型"""
    username: Optional[str] = None
    scopes: List[str] = []

class User(BaseModel):
    """用户模型"""
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: bool = False
    scopes: List[str] = []

class UserInDB(User):
    """数据库中的用户模型(包含密码hash)"""
    hashed_password: str

# === 模拟数据库 ===
fake_users_db = {
    "alice": {
        "username": "alice",
        "full_name": "Alice Johnson",
        "email": "alice@example.com", 
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # secret
        "disabled": False,
        "scopes": ["users:read", "users:write"],
    },
    "bob": {
        "username": "bob",
        "full_name": "Bob Smith",
        "email": "bob@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # secret
        "disabled": False,
        "scopes": ["users:read"],
    },
    "admin": {
        "username": "admin",
        "full_name": "Administrator",
        "email": "admin@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # secret
        "disabled": False,
        "scopes": ["admin", "users:read", "users:write"],
    }
}

# === 工具函数 ===
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """验证密码"""
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """生成密码hash"""
    return pwd_context.hash(password)

def get_user(db: Dict[str, dict], username: str) -> Optional[UserInDB]:
    """从数据库获取用户"""
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None

def authenticate_user(db: Dict[str, dict], username: str, password: str) -> Optional[UserInDB]:
    """验证用户身份"""
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return None
    return user

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """创建访问令牌"""
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# === 依赖函数 ===
async def get_current_user(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme)
) -> User:
    """
    获取当前用户并验证权限
    
    Args:
        security_scopes: 所需权限范围(由FastAPI自动注入)
        token: OAuth2访问令牌
        
    Returns:
        User: 验证通过的用户
        
    Raises:
        HTTPException: 认证失败或权限不足
    """
    # 准备认证异常
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    
    try:
        # 解码JWT令牌
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(username=username, scopes=token_scopes)
    except JWTError:
        raise credentials_exception
    
    # 获取用户信息
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    
    # 验证权限范围
    for required_scope in security_scopes.scopes:
        if required_scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    
    return user

async def get_current_active_user(
    current_user: User = Security(get_current_user)
) -> User:
    """获取当前活跃用户"""
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# === API 端点 ===
@app.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends()
) -> Token:
    """
    OAuth2兼容的令牌登录端点
    
    接受用户名密码,返回访问令牌
    """
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # 处理权限范围
    user_scopes = user.scopes
    request_scopes = form_data.scopes
    granted_scopes = []
    
    # 只授予用户拥有的权限
    for scope in request_scopes:
        if scope in user_scopes:
            granted_scopes.append(scope)
    
    # 如果没有请求特定权限,授予所有用户权限
    if not request_scopes:
        granted_scopes = user_scopes
    
    access_token = create_access_token(
        data={"sub": user.username, "scopes": granted_scopes}
    )
    
    return Token(
        access_token=access_token,
        token_type="bearer",
        expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
        scope=" ".join(granted_scopes)
    )

@app.get("/users/me", response_model=User)
async def read_users_me(
    current_user: User = Security(get_current_active_user, scopes=["users:read"])
) -> User:
    """获取当前用户信息(需要users:read权限)"""
    return current_user

@app.get("/users/{username}", response_model=User)  
async def read_user(
    username: str,
    current_user: User = Security(get_current_active_user, scopes=["users:read"])
) -> User:
    """获取指定用户信息(需要users:read权限)"""
    user = get_user(fake_users_db, username)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return User(**user.dict())

@app.put("/users/{username}")
async def update_user(
    username: str,
    user_update: dict,
    current_user: User = Security(get_current_active_user, scopes=["users:write"])
) -> dict:
    """更新用户信息(需要users:write权限)"""
    if username not in fake_users_db:
        raise HTTPException(status_code=404, detail="User not found")
    
    # 这里应该有实际的更新逻辑
    return {"message": f"User {username} updated successfully"}

@app.get("/admin/users")
async def list_all_users(
    current_user: User = Security(get_current_active_user, scopes=["admin"])
) -> List[User]:
    """列出所有用户(需要admin权限)"""
    users = []
    for user_data in fake_users_db.values():
        users.append(User(**user_data))
    return users

@app.delete("/admin/users/{username}")
async def delete_user(
    username: str,
    current_user: User = Security(
        get_current_active_user, 
        scopes=["admin", "users:write"]  # 需要admin和users:write权限
    )
) -> dict:
    """删除用户(需要admin和users:write权限)"""
    if username not in fake_users_db:
        raise HTTPException(status_code=404, detail="User not found")
    
    if username == current_user.username:
        raise HTTPException(status_code=400, detail="Cannot delete yourself")
    
    # 这里应该有实际的删除逻辑
    return {"message": f"User {username} deleted successfully"}

# 可选的权限验证端点
@app.get("/verify-token")
async def verify_token(
    current_user: User = Depends(get_current_active_user)
) -> dict:
    """验证令牌有效性(任何有效用户都可访问)"""
    return {
        "valid": True,
        "user": current_user.username,
        "scopes": current_user.scopes
    }

9. 安全最佳实践

9.1 令牌安全

# ✅ 好的做法:使用强密钥
SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32))

# ✅ 好的做法:设置合理的过期时间
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# ✅ 好的做法:使用HTTPS
# 在生产环境中,确保所有认证相关的端点都使用HTTPS

# ❌ 避免:在日志中记录敏感信息
# logger.info(f"User token: {token}")  # 危险!

# ✅ 改进:只记录非敏感信息
logger.info(f"User {username} authenticated successfully")

9.2 权限设计

# ✅ 好的做法:细粒度权限
SCOPES = {
    "users:read": "Read user information",
    "users:write": "Modify user information", 
    "users:delete": "Delete users",
    "posts:read": "Read posts",
    "posts:write": "Create and modify posts",
    "posts:delete": "Delete posts",
    "admin": "Administrative privileges",
}

# ✅ 好的做法:最小权限原则
@app.get("/posts")
async def list_posts(
    # 只需要读取权限
    current_user: User = Security(get_current_user, scopes=["posts:read"])
):
    pass

# ✅ 好的做法:角色基权限
ROLE_SCOPES = {
    "reader": ["users:read", "posts:read"],
    "editor": ["users:read", "posts:read", "posts:write"],
    "admin": ["admin", "users:read", "users:write", "users:delete", "posts:read", "posts:write", "posts:delete"],
}

9.3 错误处理

# ✅ 好的做法:统一的错误响应
class AuthError(HTTPException):
    """认证错误基类"""
    pass

class TokenExpiredError(AuthError):
    """令牌过期错误"""
    def __init__(self):
        super().__init__(
            status_code=401,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"}
        )

class InsufficientPermissionError(AuthError):
    """权限不足错误"""
    def __init__(self, required_scopes: List[str]):
        scope_str = " ".join(required_scopes)
        super().__init__(
            status_code=403,
            detail="Insufficient permissions",
            headers={"WWW-Authenticate": f'Bearer scope="{scope_str}"'}
        )

# ✅ 好的做法:详细的错误日志
async def get_current_user(security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # ... 验证逻辑
    except jwt.ExpiredSignatureError:
        logger.warning(f"Expired token used from IP: {request.client.host}")
        raise TokenExpiredError()
    except jwt.JWTError as e:
        logger.warning(f"Invalid token: {str(e)}")
        raise AuthError(status_code=401, detail="Invalid token")

10. 总结

FastAPI 的安全认证系统具有以下特点:

  1. 标准兼容:完全兼容 OAuth2、OpenID Connect、HTTP Basic/Bearer 等标准
  2. 自动化文档:自动生成 OpenAPI 安全定义和 Swagger UI 集成
  3. 依赖注入集成:与依赖系统深度集成,提供优雅的认证依赖
  4. 权限管理:支持细粒度的权限范围控制和验证
  5. 灵活性:支持多种认证方案和自定义扩展
  6. 类型安全:基于 Python 类型提示,提供完整的类型检查

这个系统不仅功能强大,而且易于使用和扩展,是现代 Web API 安全认证的优秀实现。下一章我们将分析 FastAPI 的中间件系统。