概述

AutoGPT集成服务模块提供了与第三方服务的完整集成能力,支持50+种主流服务的API集成。模块采用插件化架构设计,包含OAuth认证流程、API密钥管理、凭据存储、Webhook处理等核心功能。通过标准化的集成接口和安全的凭据管理机制,为用户提供了便捷的第三方服务接入能力。

1. 集成服务整体架构

1.1 集成服务设计原则

AutoGPT集成服务模块遵循以下核心设计原则:

  • 插件化架构:每个第三方服务独立封装,支持动态加载
  • 标准化接口:统一的OAuth和API密钥认证接口
  • 安全凭据管理:加密存储和安全传输用户凭据
  • 灵活配置:支持环境变量和动态配置
  • 扩展性设计:易于添加新的第三方服务集成

1.2 集成服务架构图

graph TB
    subgraph "AutoGPT集成服务架构"
        subgraph "用户接口层 - User Interface Layer"
            WebUI[Web用户界面]
            APIEndpoint[API端点]
            OAuthCallback[OAuth回调]
        end
        
        subgraph "集成管理层 - Integration Management Layer"
            IntegrationManager[集成管理器]
            CredentialsManager[凭据管理器]
            ProviderRegistry[提供商注册表]
            ConfigManager[配置管理器]
        end
        
        subgraph "认证处理层 - Authentication Layer"
            OAuthHandler[OAuth处理器]
            APIKeyHandler[API密钥处理器]
            TokenManager[令牌管理器]
            RefreshManager[刷新管理器]
        end
        
        subgraph "服务适配层 - Service Adapter Layer"
            GoogleAdapter[Google适配器]
            GitHubAdapter[GitHub适配器]
            NotionAdapter[Notion适配器]
            DiscordAdapter[Discord适配器]
            TwitterAdapter[Twitter适配器]
            OtherAdapters[其他适配器...]
        end
        
        subgraph "Webhook处理层 - Webhook Layer"
            WebhookManager[Webhook管理器]
            EventProcessor[事件处理器]
            SignatureValidator[签名验证器]
        end
        
        subgraph "凭据存储层 - Credentials Storage Layer"
            CredentialsStore[凭据存储]
            EncryptionService[加密服务]
            UserIntegrations[用户集成数据]
        end
        
        subgraph "第三方服务 - Third-party Services"
            GoogleAPI[Google APIs]
            GitHubAPI[GitHub API]
            NotionAPI[Notion API]
            DiscordAPI[Discord API]
            TwitterAPI[Twitter API]
            OtherAPIs[其他APIs...]
        end
        
        subgraph "存储层 - Storage Layer"
            PostgreSQL[(PostgreSQL数据库)]
            Redis[(Redis缓存)]
            SecretManager[(密钥管理器)]
        end
    end
    
    %% 连接关系
    WebUI --> IntegrationManager
    APIEndpoint --> IntegrationManager
    OAuthCallback --> OAuthHandler
    
    IntegrationManager --> CredentialsManager
    IntegrationManager --> ProviderRegistry
    IntegrationManager --> ConfigManager
    
    CredentialsManager --> OAuthHandler
    CredentialsManager --> APIKeyHandler
    CredentialsManager --> TokenManager
    
    OAuthHandler --> RefreshManager
    TokenManager --> RefreshManager
    
    IntegrationManager --> GoogleAdapter
    IntegrationManager --> GitHubAdapter
    IntegrationManager --> NotionAdapter
    IntegrationManager --> DiscordAdapter
    IntegrationManager --> TwitterAdapter
    
    GoogleAdapter --> GoogleAPI
    GitHubAdapter --> GitHubAPI
    NotionAdapter --> NotionAPI
    DiscordAdapter --> DiscordAPI
    TwitterAdapter --> TwitterAPI
    
    WebhookManager --> EventProcessor
    WebhookManager --> SignatureValidator
    
    CredentialsStore --> EncryptionService
    CredentialsStore --> UserIntegrations
    
    CredentialsStore --> PostgreSQL
    TokenManager --> Redis
    ConfigManager --> SecretManager

图1-1: AutoGPT集成服务架构图

此架构图展示了集成服务模块的完整分层结构。用户接口层提供OAuth认证和API管理界面,集成管理层协调各种集成服务,认证处理层处理OAuth和API密钥认证,服务适配层封装具体的第三方服务API,Webhook处理层处理第三方服务的回调事件,凭据存储层安全管理用户凭据。

1.3 集成服务提供商总览

graph LR
    subgraph "AI/ML服务"
        OpenAI[OpenAI]
        Anthropic[Anthropic]
        Groq[Groq]
        Replicate[Replicate]
        Ollama[Ollama]
        NVIDIA[NVIDIA]
    end
    
    subgraph "开发工具"
        GitHub[GitHub]
        Notion[Notion]
        E2B[E2B]
        V0[V0 by Vercel]
    end
    
    subgraph "社交媒体"
        Twitter[Twitter]
        Discord[Discord]
        Reddit[Reddit]
        Medium[Medium]
    end
    
    subgraph "生产力工具"
        Google[Google Workspace]
        Todoist[Todoist]
        HubSpot[HubSpot]
        Smartlead[Smartlead]
    end
    
    subgraph "多媒体服务"
        DID[D-ID]
        Ideogram[Ideogram]
        FAL[FAL]
        UnrealSpeech[Unreal Speech]
    end
    
    subgraph "数据服务"
        Pinecone[Pinecone]
        Mem0[Mem0]
        Jina[Jina]
        ZeroBounce[ZeroBounce]
    end

图1-2: 支持的第三方服务提供商

AutoGPT平台支持50+种第三方服务,涵盖AI/ML、开发工具、社交媒体、生产力工具、多媒体服务和数据服务等多个领域。

2. 提供商注册与管理

2.1 提供商枚举定义

# /autogpt_platform/backend/backend/integrations/providers.py

from enum import Enum
from typing import Any

class ProviderName(str, Enum):
    """
    集成服务提供商名称枚举
    
    此枚举扩展了str类型,支持接受任何字符串值,
    同时保持与现有提供商常量的向后兼容性。
    
    特性:
    1. 预定义常用提供商
    2. 支持动态添加自定义提供商
    3. 类型安全的字符串枚举
    4. Pydantic集成支持
    """
    
    # AI/ML服务提供商
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GROQ = "groq"
    REPLICATE = "replicate"
    OLLAMA = "ollama"
    NVIDIA = "nvidia"
    AIML_API = "aiml_api"
    LLAMA_API = "llama_api"
    OPEN_ROUTER = "open_router"
    
    # 开发工具
    GITHUB = "github"
    NOTION = "notion"
    E2B = "e2b"
    V0 = "v0"
    
    # 社交媒体
    TWITTER = "twitter"
    DISCORD = "discord"
    REDDIT = "reddit"
    MEDIUM = "medium"
    
    # 生产力工具
    GOOGLE = "google"
    GOOGLE_MAPS = "google_maps"
    TODOIST = "todoist"
    HUBSPOT = "hubspot"
    SMARTLEAD = "smartlead"
    
    # 多媒体服务
    D_ID = "d_id"
    IDEOGRAM = "ideogram"
    FAL = "fal"
    UNREAL_SPEECH = "unreal_speech"
    REVID = "revid"
    SCREENSHOTONE = "screenshotone"
    
    # 数据服务
    PINECONE = "pinecone"
    MEM0 = "mem0"
    JINA = "jina"
    ZEROBOUNCE = "zerobounce"
    ENRICHLAYER = "enrichlayer"
    
    # 其他服务
    HTTP = "http"
    SMTP = "smtp"
    APOLLO = "apollo"
    COMPASS = "compass"
    SLANT3D = "slant3d"
    OPENWEATHERMAP = "openweathermap"
    
    @classmethod
    def _missing_(cls, value: Any) -> "ProviderName":
        """
        允许任何字符串值作为ProviderName使用
        
        这使得SDK用户可以定义自定义提供商,
        而无需修改枚举定义。
        
        参数:
            value: 要创建的提供商名称
            
        返回:
            ProviderName: 伪枚举成员
        """
        if isinstance(value, str):
            # 创建一个行为类似枚举成员的伪成员
            pseudo_member = str.__new__(cls, value)
            pseudo_member._name_ = value.upper()
            pseudo_member._value_ = value
            return pseudo_member
        return None  # type: ignore
    
    @classmethod
    def __get_pydantic_json_schema__(cls, schema, handler):
        """
        为Pydantic提供JSON Schema定义
        
        允许在API文档中正确显示提供商选项
        """
        # 获取所有预定义的提供商值
        enum_values = [member.value for member in cls]
        
        return {
            "type": "string",
            "enum": enum_values,
            "description": "集成服务提供商名称",
            "examples": ["openai", "github", "notion", "discord"]
        }
    
    def is_ai_provider(self) -> bool:
        """检查是否为AI服务提供商"""
        ai_providers = {
            self.OPENAI, self.ANTHROPIC, self.GROQ, 
            self.REPLICATE, self.OLLAMA, self.NVIDIA,
            self.AIML_API, self.LLAMA_API, self.OPEN_ROUTER
        }
        return self in ai_providers
    
    def is_oauth_provider(self) -> bool:
        """检查是否支持OAuth认证"""
        oauth_providers = {
            self.GOOGLE, self.GITHUB, self.TWITTER,
            self.DISCORD, self.NOTION, self.TODOIST
        }
        return self in oauth_providers
    
    def get_display_name(self) -> str:
        """获取用户友好的显示名称"""
        display_names = {
            self.OPENAI: "OpenAI",
            self.ANTHROPIC: "Anthropic",
            self.GITHUB: "GitHub",
            self.GOOGLE: "Google",
            self.GOOGLE_MAPS: "Google Maps",
            self.D_ID: "D-ID",
            self.E2B: "E2B",
            self.V0: "v0 by Vercel",
            self.AIML_API: "AI/ML API",
            self.LLAMA_API: "Llama API",
            self.OPEN_ROUTER: "OpenRouter",
            self.UNREAL_SPEECH: "Unreal Speech",
            self.SCREENSHOTONE: "ScreenshotOne",
            self.ZEROBOUNCE: "ZeroBounce",
            self.ENRICHLAYER: "EnrichLayer",
            self.OPENWEATHERMAP: "OpenWeatherMap",
            self.SLANT3D: "Slant 3D",
        }
        return display_names.get(self, self.value.title())

class ProviderCategory(str, Enum):
    """提供商分类枚举"""
    
    AI_ML = "ai_ml"
    DEVELOPMENT = "development"
    SOCIAL_MEDIA = "social_media"
    PRODUCTIVITY = "productivity"
    MULTIMEDIA = "multimedia"
    DATA_SERVICES = "data_services"
    COMMUNICATION = "communication"
    ECOMMERCE = "ecommerce"
    ANALYTICS = "analytics"
    OTHER = "other"

def get_provider_category(provider: ProviderName) -> ProviderCategory:
    """
    获取提供商所属分类
    
    参数:
        provider: 提供商名称
        
    返回:
        ProviderCategory: 提供商分类
    """
    category_mapping = {
        # AI/ML服务
        ProviderName.OPENAI: ProviderCategory.AI_ML,
        ProviderName.ANTHROPIC: ProviderCategory.AI_ML,
        ProviderName.GROQ: ProviderCategory.AI_ML,
        ProviderName.REPLICATE: ProviderCategory.AI_ML,
        ProviderName.OLLAMA: ProviderCategory.AI_ML,
        ProviderName.NVIDIA: ProviderCategory.AI_ML,
        
        # 开发工具
        ProviderName.GITHUB: ProviderCategory.DEVELOPMENT,
        ProviderName.NOTION: ProviderCategory.DEVELOPMENT,
        ProviderName.E2B: ProviderCategory.DEVELOPMENT,
        ProviderName.V0: ProviderCategory.DEVELOPMENT,
        
        # 社交媒体
        ProviderName.TWITTER: ProviderCategory.SOCIAL_MEDIA,
        ProviderName.DISCORD: ProviderCategory.SOCIAL_MEDIA,
        ProviderName.REDDIT: ProviderCategory.SOCIAL_MEDIA,
        ProviderName.MEDIUM: ProviderCategory.SOCIAL_MEDIA,
        
        # 生产力工具
        ProviderName.GOOGLE: ProviderCategory.PRODUCTIVITY,
        ProviderName.TODOIST: ProviderCategory.PRODUCTIVITY,
        ProviderName.HUBSPOT: ProviderCategory.PRODUCTIVITY,
        ProviderName.SMARTLEAD: ProviderCategory.PRODUCTIVITY,
        
        # 多媒体服务
        ProviderName.D_ID: ProviderCategory.MULTIMEDIA,
        ProviderName.IDEOGRAM: ProviderCategory.MULTIMEDIA,
        ProviderName.FAL: ProviderCategory.MULTIMEDIA,
        ProviderName.UNREAL_SPEECH: ProviderCategory.MULTIMEDIA,
        
        # 数据服务
        ProviderName.PINECONE: ProviderCategory.DATA_SERVICES,
        ProviderName.MEM0: ProviderCategory.DATA_SERVICES,
        ProviderName.JINA: ProviderCategory.DATA_SERVICES,
        ProviderName.ZEROBOUNCE: ProviderCategory.DATA_SERVICES,
    }
    
    return category_mapping.get(provider, ProviderCategory.OTHER)

2.2 提供商注册表

# 提供商注册表实现

from typing import Dict, List, Optional, Type
from dataclasses import dataclass

@dataclass
class ProviderInfo:
    """
    提供商信息数据类
    
    包含提供商的基本信息和配置
    """
    name: ProviderName
    display_name: str
    category: ProviderCategory
    description: str
    auth_type: str  # "oauth", "api_key", "both"
    website_url: str
    documentation_url: str
    logo_url: Optional[str] = None
    is_enabled: bool = True
    requires_approval: bool = False
    rate_limits: Optional[Dict[str, int]] = None

class ProviderRegistry:
    """
    提供商注册表
    
    管理所有集成服务提供商的信息和配置
    """
    
    def __init__(self):
        self._providers: Dict[ProviderName, ProviderInfo] = {}
        self._initialize_default_providers()
    
    def _initialize_default_providers(self):
        """初始化默认提供商信息"""
        default_providers = [
            ProviderInfo(
                name=ProviderName.OPENAI,
                display_name="OpenAI",
                category=ProviderCategory.AI_ML,
                description="OpenAI GPT models and APIs",
                auth_type="api_key",
                website_url="https://openai.com",
                documentation_url="https://platform.openai.com/docs",
                logo_url="https://openai.com/favicon.ico",
                rate_limits={"requests_per_minute": 60}
            ),
            ProviderInfo(
                name=ProviderName.GITHUB,
                display_name="GitHub",
                category=ProviderCategory.DEVELOPMENT,
                description="GitHub repositories and APIs",
                auth_type="oauth",
                website_url="https://github.com",
                documentation_url="https://docs.github.com/en/rest",
                logo_url="https://github.com/favicon.ico",
                rate_limits={"requests_per_hour": 5000}
            ),
            ProviderInfo(
                name=ProviderName.GOOGLE,
                display_name="Google",
                category=ProviderCategory.PRODUCTIVITY,
                description="Google Workspace APIs",
                auth_type="oauth",
                website_url="https://google.com",
                documentation_url="https://developers.google.com",
                logo_url="https://google.com/favicon.ico",
                rate_limits={"requests_per_day": 100000}
            ),
            ProviderInfo(
                name=ProviderName.DISCORD,
                display_name="Discord",
                category=ProviderCategory.SOCIAL_MEDIA,
                description="Discord bot and API integration",
                auth_type="both",
                website_url="https://discord.com",
                documentation_url="https://discord.com/developers/docs",
                logo_url="https://discord.com/assets/favicon.ico",
                rate_limits={"requests_per_second": 50}
            ),
            # 添加更多默认提供商...
        ]
        
        for provider in default_providers:
            self._providers[provider.name] = provider
    
    def register_provider(self, provider_info: ProviderInfo):
        """
        注册新的提供商
        
        参数:
            provider_info: 提供商信息
        """
        self._providers[provider_info.name] = provider_info
    
    def get_provider(self, name: ProviderName) -> Optional[ProviderInfo]:
        """
        获取提供商信息
        
        参数:
            name: 提供商名称
            
        返回:
            Optional[ProviderInfo]: 提供商信息,如果不存在则返回None
        """
        return self._providers.get(name)
    
    def get_providers_by_category(self, category: ProviderCategory) -> List[ProviderInfo]:
        """
        按分类获取提供商列表
        
        参数:
            category: 提供商分类
            
        返回:
            List[ProviderInfo]: 该分类下的提供商列表
        """
        return [
            provider for provider in self._providers.values()
            if provider.category == category and provider.is_enabled
        ]
    
    def get_oauth_providers(self) -> List[ProviderInfo]:
        """获取支持OAuth的提供商列表"""
        return [
            provider for provider in self._providers.values()
            if provider.auth_type in ["oauth", "both"] and provider.is_enabled
        ]
    
    def get_api_key_providers(self) -> List[ProviderInfo]:
        """获取支持API密钥的提供商列表"""
        return [
            provider for provider in self._providers.values()
            if provider.auth_type in ["api_key", "both"] and provider.is_enabled
        ]
    
    def search_providers(self, query: str) -> List[ProviderInfo]:
        """
        搜索提供商
        
        参数:
            query: 搜索关键词
            
        返回:
            List[ProviderInfo]: 匹配的提供商列表
        """
        query_lower = query.lower()
        results = []
        
        for provider in self._providers.values():
            if not provider.is_enabled:
                continue
            
            # 搜索名称、显示名称和描述
            if (query_lower in provider.name.value.lower() or
                query_lower in provider.display_name.lower() or
                query_lower in provider.description.lower()):
                results.append(provider)
        
        return results
    
    def get_all_providers(self) -> List[ProviderInfo]:
        """获取所有启用的提供商"""
        return [
            provider for provider in self._providers.values()
            if provider.is_enabled
        ]

# 全局提供商注册表实例
provider_registry = ProviderRegistry()

3. OAuth认证处理

3.1 OAuth基础处理器

# /autogpt_platform/backend/backend/integrations/oauth/base.py

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from urllib.parse import urlencode
import httpx
from pydantic import BaseModel

from autogpt_libs.supabase_integration_credentials_store.types import (
    OAuth2Credentials,
    OAuthState,
)
from backend.integrations.providers import ProviderName

class OAuthConfig(BaseModel):
    """OAuth配置模型"""
    
    client_id: str
    client_secret: str
    redirect_uri: str
    scope: Optional[str] = None
    additional_params: Dict[str, str] = {}

class BaseOAuthHandler(ABC):
    """
    OAuth处理器基类
    
    定义了OAuth认证流程的标准接口,
    所有OAuth提供商都必须实现这些方法。
    
    OAuth流程:
    1. 生成授权URL
    2. 处理授权回调
    3. 交换访问令牌
    4. 刷新访问令牌
    5. 撤销访问令牌
    """
    
    PROVIDER_NAME: ProviderName
    
    def __init__(self, config: OAuthConfig):
        self.config = config
        self.client = httpx.AsyncClient()
    
    @property
    @abstractmethod
    def authorization_url(self) -> str:
        """授权服务器URL"""
        pass
    
    @property
    @abstractmethod
    def token_url(self) -> str:
        """令牌交换URL"""
        pass
    
    @property
    @abstractmethod
    def revoke_url(self) -> Optional[str]:
        """令牌撤销URL(可选)"""
        pass
    
    def get_authorization_url(
        self, 
        state: str, 
        extra_params: Optional[Dict[str, str]] = None
    ) -> str:
        """
        生成OAuth授权URL
        
        参数:
            state: 状态参数,用于防止CSRF攻击
            extra_params: 额外的查询参数
            
        返回:
            str: 完整的授权URL
        """
        params = {
            "client_id": self.config.client_id,
            "redirect_uri": self.config.redirect_uri,
            "response_type": "code",
            "state": state,
        }
        
        # 添加scope参数
        if self.config.scope:
            params["scope"] = self.config.scope
        
        # 添加额外参数
        if extra_params:
            params.update(extra_params)
        
        # 添加提供商特定参数
        params.update(self.config.additional_params)
        
        return f"{self.authorization_url}?{urlencode(params)}"
    
    async def exchange_code_for_tokens(
        self, 
        code: str, 
        state: str
    ) -> OAuth2Credentials:
        """
        交换授权码获取访问令牌
        
        参数:
            code: 授权码
            state: 状态参数
            
        返回:
            OAuth2Credentials: OAuth2凭据对象
            
        异常:
            OAuthError: 令牌交换失败时抛出
        """
        token_data = {
            "client_id": self.config.client_id,
            "client_secret": self.config.client_secret,
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": self.config.redirect_uri,
        }
        
        try:
            response = await self.client.post(
                self.token_url,
                data=token_data,
                headers={"Accept": "application/json"}
            )
            response.raise_for_status()
            
            token_response = response.json()
            
            # 获取用户信息
            user_info = await self.get_user_info(token_response["access_token"])
            
            # 创建OAuth2凭据
            credentials = OAuth2Credentials(
                provider=self.PROVIDER_NAME.value,
                access_token=token_response["access_token"],
                refresh_token=token_response.get("refresh_token"),
                access_token_expires_at=self._calculate_expiry(
                    token_response.get("expires_in")
                ),
                scopes=self._parse_scopes(token_response.get("scope")),
                username=user_info.get("username"),
                metadata={
                    "user_info": user_info,
                    "token_type": token_response.get("token_type", "Bearer"),
                }
            )
            
            return credentials
            
        except httpx.HTTPStatusError as e:
            error_detail = e.response.text
            raise OAuthError(f"Token exchange failed: {error_detail}")
        except Exception as e:
            raise OAuthError(f"Unexpected error during token exchange: {str(e)}")
    
    async def refresh_access_token(
        self, 
        credentials: OAuth2Credentials
    ) -> OAuth2Credentials:
        """
        刷新访问令牌
        
        参数:
            credentials: 现有的OAuth2凭据
            
        返回:
            OAuth2Credentials: 更新后的凭据
            
        异常:
            OAuthError: 令牌刷新失败时抛出
        """
        if not credentials.refresh_token:
            raise OAuthError("No refresh token available")
        
        refresh_data = {
            "client_id": self.config.client_id,
            "client_secret": self.config.client_secret,
            "refresh_token": credentials.refresh_token.get_secret_value(),
            "grant_type": "refresh_token",
        }
        
        try:
            response = await self.client.post(
                self.token_url,
                data=refresh_data,
                headers={"Accept": "application/json"}
            )
            response.raise_for_status()
            
            token_response = response.json()
            
            # 更新凭据
            credentials.access_token = token_response["access_token"]
            if "refresh_token" in token_response:
                credentials.refresh_token = token_response["refresh_token"]
            
            credentials.access_token_expires_at = self._calculate_expiry(
                token_response.get("expires_in")
            )
            
            return credentials
            
        except httpx.HTTPStatusError as e:
            error_detail = e.response.text
            raise OAuthError(f"Token refresh failed: {error_detail}")
        except Exception as e:
            raise OAuthError(f"Unexpected error during token refresh: {str(e)}")
    
    async def revoke_token(self, credentials: OAuth2Credentials) -> bool:
        """
        撤销访问令牌
        
        参数:
            credentials: 要撤销的凭据
            
        返回:
            bool: 是否成功撤销
        """
        if not self.revoke_url:
            return True  # 如果不支持撤销,认为成功
        
        try:
            revoke_data = {
                "token": credentials.access_token.get_secret_value(),
                "client_id": self.config.client_id,
                "client_secret": self.config.client_secret,
            }
            
            response = await self.client.post(
                self.revoke_url,
                data=revoke_data
            )
            
            return response.status_code in [200, 204]
            
        except Exception as e:
            logger.error(f"Token revocation failed: {e}")
            return False
    
    @abstractmethod
    async def get_user_info(self, access_token: str) -> Dict[str, Any]:
        """
        获取用户信息
        
        参数:
            access_token: 访问令牌
            
        返回:
            Dict[str, Any]: 用户信息字典
        """
        pass
    
    def _calculate_expiry(self, expires_in: Optional[int]) -> Optional[int]:
        """计算令牌过期时间"""
        if expires_in is None:
            return None
        
        import time
        return int(time.time()) + expires_in
    
    def _parse_scopes(self, scope_string: Optional[str]) -> List[str]:
        """解析权限范围字符串"""
        if not scope_string:
            return []
        
        return scope_string.split()
    
    async def __aenter__(self):
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.client.aclose()

class OAuthError(Exception):
    """OAuth认证异常"""
    pass

3.2 具体OAuth处理器实现

# GitHub OAuth处理器实现

from backend.integrations.oauth.base import BaseOAuthHandler, OAuthConfig
from backend.integrations.providers import ProviderName

class GitHubOAuthHandler(BaseOAuthHandler):
    """
    GitHub OAuth处理器
    
    实现GitHub OAuth 2.0认证流程
    """
    
    PROVIDER_NAME = ProviderName.GITHUB
    
    @property
    def authorization_url(self) -> str:
        return "https://github.com/login/oauth/authorize"
    
    @property
    def token_url(self) -> str:
        return "https://github.com/login/oauth/access_token"
    
    @property
    def revoke_url(self) -> Optional[str]:
        return None  # GitHub不支持令牌撤销
    
    async def get_user_info(self, access_token: str) -> Dict[str, Any]:
        """
        获取GitHub用户信息
        
        参数:
            access_token: GitHub访问令牌
            
        返回:
            Dict[str, Any]: 用户信息
        """
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/vnd.github.v3+json",
        }
        
        try:
            response = await self.client.get(
                "https://api.github.com/user",
                headers=headers
            )
            response.raise_for_status()
            
            user_data = response.json()
            
            return {
                "id": str(user_data["id"]),
                "username": user_data["login"],
                "name": user_data.get("name"),
                "email": user_data.get("email"),
                "avatar_url": user_data.get("avatar_url"),
                "profile_url": user_data.get("html_url"),
                "company": user_data.get("company"),
                "location": user_data.get("location"),
                "bio": user_data.get("bio"),
                "public_repos": user_data.get("public_repos"),
                "followers": user_data.get("followers"),
                "following": user_data.get("following"),
            }
            
        except httpx.HTTPStatusError as e:
            raise OAuthError(f"Failed to get GitHub user info: {e.response.text}")

# Google OAuth处理器实现

class GoogleOAuthHandler(BaseOAuthHandler):
    """
    Google OAuth处理器
    
    实现Google OAuth 2.0认证流程
    """
    
    PROVIDER_NAME = ProviderName.GOOGLE
    
    @property
    def authorization_url(self) -> str:
        return "https://accounts.google.com/o/oauth2/v2/auth"
    
    @property
    def token_url(self) -> str:
        return "https://oauth2.googleapis.com/token"
    
    @property
    def revoke_url(self) -> Optional[str]:
        return "https://oauth2.googleapis.com/revoke"
    
    def get_authorization_url(
        self, 
        state: str, 
        extra_params: Optional[Dict[str, str]] = None
    ) -> str:
        """Google特定的授权URL生成"""
        # Google需要access_type=offline来获取refresh_token
        google_params = {
            "access_type": "offline",
            "prompt": "consent",  # 强制显示同意屏幕以获取refresh_token
        }
        
        if extra_params:
            google_params.update(extra_params)
        
        return super().get_authorization_url(state, google_params)
    
    async def get_user_info(self, access_token: str) -> Dict[str, Any]:
        """
        获取Google用户信息
        
        参数:
            access_token: Google访问令牌
            
        返回:
            Dict[str, Any]: 用户信息
        """
        headers = {
            "Authorization": f"Bearer {access_token}",
        }
        
        try:
            response = await self.client.get(
                "https://www.googleapis.com/oauth2/v2/userinfo",
                headers=headers
            )
            response.raise_for_status()
            
            user_data = response.json()
            
            return {
                "id": user_data["id"],
                "username": user_data.get("email"),
                "name": user_data.get("name"),
                "email": user_data.get("email"),
                "avatar_url": user_data.get("picture"),
                "verified_email": user_data.get("verified_email"),
                "locale": user_data.get("locale"),
                "given_name": user_data.get("given_name"),
                "family_name": user_data.get("family_name"),
            }
            
        except httpx.HTTPStatusError as e:
            raise OAuthError(f"Failed to get Google user info: {e.response.text}")

# Discord OAuth处理器实现

class DiscordOAuthHandler(BaseOAuthHandler):
    """
    Discord OAuth处理器
    
    实现Discord OAuth 2.0认证流程
    """
    
    PROVIDER_NAME = ProviderName.DISCORD
    
    @property
    def authorization_url(self) -> str:
        return "https://discord.com/api/oauth2/authorize"
    
    @property
    def token_url(self) -> str:
        return "https://discord.com/api/oauth2/token"
    
    @property
    def revoke_url(self) -> Optional[str]:
        return "https://discord.com/api/oauth2/token/revoke"
    
    async def get_user_info(self, access_token: str) -> Dict[str, Any]:
        """
        获取Discord用户信息
        
        参数:
            access_token: Discord访问令牌
            
        返回:
            Dict[str, Any]: 用户信息
        """
        headers = {
            "Authorization": f"Bearer {access_token}",
        }
        
        try:
            response = await self.client.get(
                "https://discord.com/api/users/@me",
                headers=headers
            )
            response.raise_for_status()
            
            user_data = response.json()
            
            return {
                "id": user_data["id"],
                "username": user_data["username"],
                "discriminator": user_data.get("discriminator"),
                "email": user_data.get("email"),
                "avatar": user_data.get("avatar"),
                "verified": user_data.get("verified"),
                "locale": user_data.get("locale"),
                "mfa_enabled": user_data.get("mfa_enabled"),
                "premium_type": user_data.get("premium_type"),
            }
            
        except httpx.HTTPStatusError as e:
            raise OAuthError(f"Failed to get Discord user info: {e.response.text}")

4. 凭据管理系统

4.1 集成凭据管理器

# /autogpt_platform/backend/backend/integrations/creds_manager.py

import logging
from typing import Dict, List, Optional
from uuid import uuid4

from autogpt_libs.supabase_integration_credentials_store.types import (
    Credentials,
    OAuth2Credentials,
    APIKeyCredentials,
    OAuthState,
)
from backend.integrations.credentials_store import IntegrationCredentialsStore
from backend.integrations.oauth import get_oauth_handler
from backend.integrations.providers import ProviderName

logger = logging.getLogger(__name__)

class IntegrationCredentialsManager:
    """
    集成凭据管理器
    
    负责管理用户的第三方服务凭据,包括:
    1. OAuth认证流程管理
    2. API密钥存储和验证
    3. 凭据的增删改查操作
    4. 凭据的加密存储
    5. 凭据的自动刷新
    """
    
    def __init__(self):
        self.store = IntegrationCredentialsStore()
    
    async def initiate_oauth_flow(
        self, 
        user_id: str, 
        provider: ProviderName,
        redirect_uri: str,
        scopes: Optional[List[str]] = None
    ) -> tuple[str, str]:
        """
        启动OAuth认证流程
        
        参数:
            user_id: 用户ID
            provider: 服务提供商
            redirect_uri: 重定向URI
            scopes: 权限范围列表
            
        返回:
            tuple[str, str]: (授权URL, 状态参数)
            
        异常:
            ValueError: 不支持的提供商或配置错误
        """
        try:
            # 获取OAuth处理器
            oauth_handler = await get_oauth_handler(provider)
            
            # 生成状态参数
            state = str(uuid4())
            
            # 创建OAuth状态记录
            oauth_state = OAuthState(
                state=state,
                provider=provider.value,
                user_id=user_id,
                redirect_uri=redirect_uri,
                scopes=scopes or [],
                created_at=datetime.utcnow(),
            )
            
            # 存储OAuth状态
            await self.store.store_oauth_state(user_id, oauth_state)
            
            # 生成授权URL
            extra_params = {}
            if scopes:
                extra_params["scope"] = " ".join(scopes)
            
            auth_url = oauth_handler.get_authorization_url(state, extra_params)
            
            logger.info(f"OAuth flow initiated for user {user_id}, provider {provider}")
            return auth_url, state
            
        except Exception as e:
            logger.error(f"Failed to initiate OAuth flow: {e}")
            raise ValueError(f"OAuth flow initiation failed: {str(e)}")
    
    async def complete_oauth_flow(
        self, 
        user_id: str, 
        code: str, 
        state: str
    ) -> Credentials:
        """
        完成OAuth认证流程
        
        参数:
            user_id: 用户ID
            code: 授权码
            state: 状态参数
            
        返回:
            Credentials: 创建的凭据对象
            
        异常:
            ValueError: 状态验证失败或令牌交换失败
        """
        try:
            # 验证并获取OAuth状态
            oauth_state = await self.store.get_oauth_state(user_id, state)
            if not oauth_state:
                raise ValueError("Invalid or expired OAuth state")
            
            # 获取OAuth处理器
            oauth_handler = await get_oauth_handler(oauth_state.provider)
            
            # 交换授权码获取令牌
            credentials = await oauth_handler.exchange_code_for_tokens(code, state)
            
            # 设置凭据ID和用户ID
            credentials.id = str(uuid4())
            credentials.user_id = user_id
            
            # 存储凭据
            await self.store.add_creds(user_id, credentials)
            
            # 清理OAuth状态
            await self.store.delete_oauth_state(user_id, state)
            
            logger.info(f"OAuth flow completed for user {user_id}, provider {oauth_state.provider}")
            return credentials
            
        except Exception as e:
            logger.error(f"Failed to complete OAuth flow: {e}")
            raise ValueError(f"OAuth flow completion failed: {str(e)}")
    
    async def add_api_key_credentials(
        self,
        user_id: str,
        provider: ProviderName,
        api_key: str,
        title: Optional[str] = None,
        metadata: Optional[Dict] = None
    ) -> APIKeyCredentials:
        """
        添加API密钥凭据
        
        参数:
            user_id: 用户ID
            provider: 服务提供商
            api_key: API密钥
            title: 凭据标题
            metadata: 额外元数据
            
        返回:
            APIKeyCredentials: 创建的API密钥凭据
        """
        try:
            # 创建API密钥凭据
            credentials = APIKeyCredentials(
                id=str(uuid4()),
                user_id=user_id,
                provider=provider.value,
                api_key=api_key,
                title=title or f"{provider.get_display_name()} API Key",
                metadata=metadata or {},
                created_at=datetime.utcnow(),
            )
            
            # 验证API密钥(如果提供商支持)
            if await self._validate_api_key(provider, api_key):
                credentials.is_valid = True
            
            # 存储凭据
            await self.store.add_creds(user_id, credentials)
            
            logger.info(f"API key credentials added for user {user_id}, provider {provider}")
            return credentials
            
        except Exception as e:
            logger.error(f"Failed to add API key credentials: {e}")
            raise ValueError(f"API key addition failed: {str(e)}")
    
    async def get_user_credentials(
        self, 
        user_id: str, 
        provider: Optional[ProviderName] = None
    ) -> List[Credentials]:
        """
        获取用户凭据列表
        
        参数:
            user_id: 用户ID
            provider: 可选的提供商过滤
            
        返回:
            List[Credentials]: 凭据列表
        """
        try:
            all_credentials = await self.store.get_all_creds(user_id)
            
            if provider:
                # 过滤特定提供商的凭据
                filtered_credentials = [
                    cred for cred in all_credentials
                    if cred.provider == provider.value
                ]
                return filtered_credentials
            
            return all_credentials
            
        except Exception as e:
            logger.error(f"Failed to get user credentials: {e}")
            return []
    
    async def get_credentials_by_id(
        self, 
        user_id: str, 
        credentials_id: str
    ) -> Optional[Credentials]:
        """
        根据ID获取特定凭据
        
        参数:
            user_id: 用户ID
            credentials_id: 凭据ID
            
        返回:
            Optional[Credentials]: 凭据对象,如果不存在则返回None
        """
        try:
            all_credentials = await self.store.get_all_creds(user_id)
            
            for cred in all_credentials:
                if cred.id == credentials_id:
                    return cred
            
            return None
            
        except Exception as e:
            logger.error(f"Failed to get credentials by ID: {e}")
            return None
    
    async def update_credentials(
        self, 
        user_id: str, 
        credentials_id: str,
        updates: Dict
    ) -> Optional[Credentials]:
        """
        更新凭据信息
        
        参数:
            user_id: 用户ID
            credentials_id: 凭据ID
            updates: 要更新的字段
            
        返回:
            Optional[Credentials]: 更新后的凭据对象
        """
        try:
            # 获取现有凭据
            credentials = await self.get_credentials_by_id(user_id, credentials_id)
            if not credentials:
                return None
            
            # 应用更新
            for key, value in updates.items():
                if hasattr(credentials, key):
                    setattr(credentials, key, value)
            
            # 更新时间戳
            credentials.updated_at = datetime.utcnow()
            
            # 保存更新
            await self.store.update_creds(user_id, credentials)
            
            logger.info(f"Credentials updated for user {user_id}, ID {credentials_id}")
            return credentials
            
        except Exception as e:
            logger.error(f"Failed to update credentials: {e}")
            return None
    
    async def delete_credentials(
        self, 
        user_id: str, 
        credentials_id: str
    ) -> bool:
        """
        删除凭据
        
        参数:
            user_id: 用户ID
            credentials_id: 凭据ID
            
        返回:
            bool: 是否删除成功
        """
        try:
            # 获取凭据
            credentials = await self.get_credentials_by_id(user_id, credentials_id)
            if not credentials:
                return False
            
            # 如果是OAuth凭据,尝试撤销令牌
            if isinstance(credentials, OAuth2Credentials):
                try:
                    oauth_handler = await get_oauth_handler(credentials.provider)
                    await oauth_handler.revoke_token(credentials)
                except Exception as e:
                    logger.warning(f"Failed to revoke OAuth token: {e}")
            
            # 从存储中删除
            await self.store.delete_creds(user_id, credentials_id)
            
            logger.info(f"Credentials deleted for user {user_id}, ID {credentials_id}")
            return True
            
        except Exception as e:
            logger.error(f"Failed to delete credentials: {e}")
            return False
    
    async def refresh_oauth_credentials(
        self, 
        user_id: str, 
        credentials_id: str
    ) -> Optional[OAuth2Credentials]:
        """
        刷新OAuth凭据
        
        参数:
            user_id: 用户ID
            credentials_id: 凭据ID
            
        返回:
            Optional[OAuth2Credentials]: 刷新后的凭据
        """
        try:
            # 获取凭据
            credentials = await self.get_credentials_by_id(user_id, credentials_id)
            if not isinstance(credentials, OAuth2Credentials):
                return None
            
            # 检查是否需要刷新
            if not credentials.is_expired():
                return credentials
            
            # 获取OAuth处理器并刷新令牌
            oauth_handler = await get_oauth_handler(credentials.provider)
            refreshed_credentials = await oauth_handler.refresh_access_token(credentials)
            
            # 更新存储
            await self.store.update_creds(user_id, refreshed_credentials)
            
            logger.info(f"OAuth credentials refreshed for user {user_id}, ID {credentials_id}")
            return refreshed_credentials
            
        except Exception as e:
            logger.error(f"Failed to refresh OAuth credentials: {e}")
            return None
    
    async def _validate_api_key(self, provider: ProviderName, api_key: str) -> bool:
        """
        验证API密钥有效性
        
        参数:
            provider: 服务提供商
            api_key: API密钥
            
        返回:
            bool: 是否有效
        """
        # 这里可以实现具体的API密钥验证逻辑
        # 例如调用提供商的验证端点
        try:
            # 示例:OpenAI API密钥验证
            if provider == ProviderName.OPENAI:
                headers = {"Authorization": f"Bearer {api_key}"}
                async with httpx.AsyncClient() as client:
                    response = await client.get(
                        "https://api.openai.com/v1/models",
                        headers=headers,
                        timeout=10
                    )
                    return response.status_code == 200
            
            # 其他提供商的验证逻辑...
            
            return True  # 默认认为有效
            
        except Exception as e:
            logger.warning(f"API key validation failed for {provider}: {e}")
            return False
    
    async def cleanup_expired_states(self):
        """清理过期的OAuth状态"""
        try:
            await self.store.cleanup_expired_oauth_states()
            logger.info("Expired OAuth states cleaned up")
        except Exception as e:
            logger.error(f"Failed to cleanup expired OAuth states: {e}")
    
    async def get_provider_statistics(self, user_id: str) -> Dict[str, int]:
        """
        获取用户的提供商使用统计
        
        参数:
            user_id: 用户ID
            
        返回:
            Dict[str, int]: 提供商使用统计
        """
        try:
            credentials = await self.get_user_credentials(user_id)
            
            stats = {}
            for cred in credentials:
                provider = cred.provider
                if provider not in stats:
                    stats[provider] = 0
                stats[provider] += 1
            
            return stats
            
        except Exception as e:
            logger.error(f"Failed to get provider statistics: {e}")
            return {}

5. Webhook处理系统

5.1 Webhook基础管理器

# /autogpt_platform/backend/backend/integrations/webhooks/_base.py

from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, List, Optional, TypeVar
import hashlib
import hmac
import logging
from fastapi import Request, HTTPException

from backend.integrations.providers import ProviderName

logger = logging.getLogger(__name__)

# Webhook事件类型变量
WT = TypeVar('WT')

class WebhookEvent:
    """
    Webhook事件基类
    
    所有Webhook事件都应该继承此类
    """
    
    def __init__(
        self, 
        provider: str, 
        event_type: str, 
        data: Dict[str, Any],
        timestamp: Optional[int] = None
    ):
        self.provider = provider
        self.event_type = event_type
        self.data = data
        self.timestamp = timestamp or int(time.time())
        self.id = self._generate_event_id()
    
    def _generate_event_id(self) -> str:
        """生成事件唯一ID"""
        import time
        import uuid
        return f"{self.provider}_{self.event_type}_{uuid.uuid4().hex[:8]}_{int(time.time())}"
    
    def to_dict(self) -> Dict[str, Any]:
        """转换为字典格式"""
        return {
            "id": self.id,
            "provider": self.provider,
            "event_type": self.event_type,
            "data": self.data,
            "timestamp": self.timestamp,
        }

class BaseWebhooksManager(ABC, Generic[WT]):
    """
    Webhook管理器基类
    
    定义了处理第三方服务Webhook的标准接口
    
    功能:
    1. 验证Webhook签名
    2. 解析Webhook事件
    3. 处理Webhook事件
    4. 管理Webhook订阅
    """
    
    PROVIDER_NAME: ProviderName
    
    def __init__(self, webhook_secret: Optional[str] = None):
        self.webhook_secret = webhook_secret
    
    @abstractmethod
    async def verify_webhook_signature(
        self, 
        request: Request, 
        payload: bytes
    ) -> bool:
        """
        验证Webhook签名
        
        参数:
            request: FastAPI请求对象
            payload: 请求载荷
            
        返回:
            bool: 签名是否有效
        """
        pass
    
    @abstractmethod
    async def parse_webhook_event(
        self, 
        request: Request, 
        payload: Dict[str, Any]
    ) -> Optional[WT]:
        """
        解析Webhook事件
        
        参数:
            request: FastAPI请求对象
            payload: 解析后的载荷
            
        返回:
            Optional[WT]: 解析后的事件对象,如果无法解析则返回None
        """
        pass
    
    @abstractmethod
    async def handle_webhook_event(self, event: WT) -> Dict[str, Any]:
        """
        处理Webhook事件
        
        参数:
            event: Webhook事件对象
            
        返回:
            Dict[str, Any]: 处理结果
        """
        pass
    
    async def process_webhook(self, request: Request) -> Dict[str, Any]:
        """
        处理Webhook请求的完整流程
        
        参数:
            request: FastAPI请求对象
            
        返回:
            Dict[str, Any]: 处理结果
            
        异常:
            HTTPException: 请求验证失败或处理错误时抛出
        """
        try:
            # 读取请求载荷
            payload_bytes = await request.body()
            
            # 验证签名
            if not await self.verify_webhook_signature(request, payload_bytes):
                logger.warning(f"Invalid webhook signature from {self.PROVIDER_NAME}")
                raise HTTPException(status_code=401, detail="Invalid signature")
            
            # 解析JSON载荷
            try:
                import json
                payload_dict = json.loads(payload_bytes.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                logger.error(f"Failed to parse webhook payload: {e}")
                raise HTTPException(status_code=400, detail="Invalid JSON payload")
            
            # 解析事件
            event = await self.parse_webhook_event(request, payload_dict)
            if not event:
                logger.warning(f"Failed to parse webhook event from {self.PROVIDER_NAME}")
                raise HTTPException(status_code=400, detail="Invalid event format")
            
            # 处理事件
            result = await self.handle_webhook_event(event)
            
            logger.info(f"Successfully processed webhook from {self.PROVIDER_NAME}")
            return result
            
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Unexpected error processing webhook: {e}")
            raise HTTPException(status_code=500, detail="Internal server error")
    
    def _verify_hmac_signature(
        self, 
        payload: bytes, 
        signature: str, 
        secret: str,
        algorithm: str = "sha256"
    ) -> bool:
        """
        验证HMAC签名
        
        参数:
            payload: 请求载荷
            signature: 提供的签名
            secret: 密钥
            algorithm: 哈希算法
            
        返回:
            bool: 签名是否有效
        """
        if not secret:
            logger.warning("No webhook secret configured")
            return False
        
        try:
            # 计算期望的签名
            expected_signature = hmac.new(
                secret.encode('utf-8'),
                payload,
                getattr(hashlib, algorithm)
            ).hexdigest()
            
            # 安全比较签名
            return hmac.compare_digest(signature, expected_signature)
            
        except Exception as e:
            logger.error(f"Error verifying HMAC signature: {e}")
            return False
    
    async def get_webhook_url(self, base_url: str) -> str:
        """
        获取Webhook URL
        
        参数:
            base_url: 基础URL
            
        返回:
            str: 完整的Webhook URL
        """
        return f"{base_url}/webhooks/{self.PROVIDER_NAME.value}"
    
    async def subscribe_to_events(
        self, 
        events: List[str], 
        webhook_url: str,
        credentials: Any
    ) -> Dict[str, Any]:
        """
        订阅Webhook事件
        
        参数:
            events: 要订阅的事件类型列表
            webhook_url: Webhook URL
            credentials: 认证凭据
            
        返回:
            Dict[str, Any]: 订阅结果
        """
        # 默认实现,子类可以重写
        return {
            "success": True,
            "message": "Webhook subscription not implemented for this provider"
        }
    
    async def unsubscribe_from_events(
        self, 
        subscription_id: str,
        credentials: Any
    ) -> Dict[str, Any]:
        """
        取消订阅Webhook事件
        
        参数:
            subscription_id: 订阅ID
            credentials: 认证凭据
            
        返回:
            Dict[str, Any]: 取消订阅结果
        """
        # 默认实现,子类可以重写
        return {
            "success": True,
            "message": "Webhook unsubscription not implemented for this provider"
        }

5.2 具体Webhook管理器实现

# GitHub Webhook管理器实现

from backend.integrations.webhooks._base import BaseWebhooksManager, WebhookEvent
from backend.integrations.providers import ProviderName

class GitHubWebhookEvent(WebhookEvent):
    """GitHub Webhook事件"""
    
    def __init__(self, event_type: str, data: Dict[str, Any], delivery_id: str):
        super().__init__("github", event_type, data)
        self.delivery_id = delivery_id
        self.repository = data.get("repository", {}).get("full_name")
        self.sender = data.get("sender", {}).get("login")

class GithubWebhooksManager(BaseWebhooksManager[GitHubWebhookEvent]):
    """
    GitHub Webhook管理器
    
    处理GitHub的Webhook事件,包括:
    - push事件
    - pull request事件
    - issue事件
    - release事件
    """
    
    PROVIDER_NAME = ProviderName.GITHUB
    
    async def verify_webhook_signature(
        self, 
        request: Request, 
        payload: bytes
    ) -> bool:
        """
        验证GitHub Webhook签名
        
        GitHub使用HMAC-SHA256签名
        """
        signature_header = request.headers.get("X-Hub-Signature-256")
        if not signature_header:
            return False
        
        # GitHub签名格式: "sha256=<signature>"
        if not signature_header.startswith("sha256="):
            return False
        
        signature = signature_header[7:]  # 移除"sha256="前缀
        
        return self._verify_hmac_signature(
            payload, 
            signature, 
            self.webhook_secret,
            "sha256"
        )
    
    async def parse_webhook_event(
        self, 
        request: Request, 
        payload: Dict[str, Any]
    ) -> Optional[GitHubWebhookEvent]:
        """
        解析GitHub Webhook事件
        """
        event_type = request.headers.get("X-GitHub-Event")
        delivery_id = request.headers.get("X-GitHub-Delivery")
        
        if not event_type or not delivery_id:
            return None
        
        return GitHubWebhookEvent(
            event_type=event_type,
            data=payload,
            delivery_id=delivery_id
        )
    
    async def handle_webhook_event(self, event: GitHubWebhookEvent) -> Dict[str, Any]:
        """
        处理GitHub Webhook事件
        """
        try:
            if event.event_type == "push":
                return await self._handle_push_event(event)
            elif event.event_type == "pull_request":
                return await self._handle_pull_request_event(event)
            elif event.event_type == "issues":
                return await self._handle_issues_event(event)
            elif event.event_type == "release":
                return await self._handle_release_event(event)
            else:
                logger.info(f"Unhandled GitHub event type: {event.event_type}")
                return {"success": True, "message": "Event received but not processed"}
                
        except Exception as e:
            logger.error(f"Error handling GitHub webhook event: {e}")
            return {"success": False, "error": str(e)}
    
    async def _handle_push_event(self, event: GitHubWebhookEvent) -> Dict[str, Any]:
        """处理push事件"""
        data = event.data
        repository = data.get("repository", {})
        commits = data.get("commits", [])
        
        logger.info(
            f"Push event: {len(commits)} commits to {repository.get('full_name')} "
            f"on branch {data.get('ref', '').replace('refs/heads/', '')}"
        )
        
        # 这里可以触发相关的自动化流程
        # 例如:触发CI/CD、更新项目状态等
        
        return {
            "success": True,
            "event_type": "push",
            "repository": repository.get("full_name"),
            "commits_count": len(commits),
            "branch": data.get("ref", "").replace("refs/heads/", "")
        }
    
    async def _handle_pull_request_event(self, event: GitHubWebhookEvent) -> Dict[str, Any]:
        """处理pull request事件"""
        data = event.data
        action = data.get("action")
        pull_request = data.get("pull_request", {})
        
        logger.info(
            f"Pull request {action}: #{pull_request.get('number')} "
            f"in {event.repository}"
        )
        
        return {
            "success": True,
            "event_type": "pull_request",
            "action": action,
            "pr_number": pull_request.get("number"),
            "repository": event.repository
        }
    
    async def _handle_issues_event(self, event: GitHubWebhookEvent) -> Dict[str, Any]:
        """处理issues事件"""
        data = event.data
        action = data.get("action")
        issue = data.get("issue", {})
        
        logger.info(
            f"Issue {action}: #{issue.get('number')} "
            f"in {event.repository}"
        )
        
        return {
            "success": True,
            "event_type": "issues",
            "action": action,
            "issue_number": issue.get("number"),
            "repository": event.repository
        }
    
    async def _handle_release_event(self, event: GitHubWebhookEvent) -> Dict[str, Any]:
        """处理release事件"""
        data = event.data
        action = data.get("action")
        release = data.get("release", {})
        
        logger.info(
            f"Release {action}: {release.get('tag_name')} "
            f"in {event.repository}"
        )
        
        return {
            "success": True,
            "event_type": "release",
            "action": action,
            "tag_name": release.get("tag_name"),
            "repository": event.repository
        }

# Slack Webhook管理器实现(示例)

class SlackWebhookEvent(WebhookEvent):
    """Slack Webhook事件"""
    
    def __init__(self, event_type: str, data: Dict[str, Any]):
        super().__init__("slack", event_type, data)
        self.team_id = data.get("team_id")
        self.user_id = data.get("event", {}).get("user")
        self.channel_id = data.get("event", {}).get("channel")

class SlackWebhooksManager(BaseWebhooksManager[SlackWebhookEvent]):
    """
    Slack Webhook管理器
    
    处理Slack的事件API Webhook
    """
    
    PROVIDER_NAME = ProviderName.SLACK  # 假设已添加到ProviderName
    
    async def verify_webhook_signature(
        self, 
        request: Request, 
        payload: bytes
    ) -> bool:
        """
        验证Slack Webhook签名
        
        Slack使用时间戳和签名验证
        """
        timestamp = request.headers.get("X-Slack-Request-Timestamp")
        signature = request.headers.get("X-Slack-Signature")
        
        if not timestamp or not signature:
            return False
        
        # 检查时间戳是否在5分钟内
        import time
        if abs(time.time() - int(timestamp)) > 60 * 5:
            return False
        
        # 构建签名字符串
        sig_basestring = f"v0:{timestamp}:{payload.decode('utf-8')}"
        
        # 计算期望签名
        expected_signature = f"v0={hmac.new(self.webhook_secret.encode(), sig_basestring.encode(), hashlib.sha256).hexdigest()}"
        
        return hmac.compare_digest(signature, expected_signature)
    
    async def parse_webhook_event(
        self, 
        request: Request, 
        payload: Dict[str, Any]
    ) -> Optional[SlackWebhookEvent]:
        """
        解析Slack Webhook事件
        """
        # Slack URL验证
        if payload.get("type") == "url_verification":
            return SlackWebhookEvent("url_verification", payload)
        
        # 事件回调
        if payload.get("type") == "event_callback":
            event_data = payload.get("event", {})
            event_type = event_data.get("type")
            
            if event_type:
                return SlackWebhookEvent(event_type, payload)
        
        return None
    
    async def handle_webhook_event(self, event: SlackWebhookEvent) -> Dict[str, Any]:
        """
        处理Slack Webhook事件
        """
        if event.event_type == "url_verification":
            # URL验证响应
            return {"challenge": event.data.get("challenge")}
        
        elif event.event_type == "message":
            return await self._handle_message_event(event)
        
        elif event.event_type == "app_mention":
            return await self._handle_mention_event(event)
        
        else:
            logger.info(f"Unhandled Slack event type: {event.event_type}")
            return {"success": True}
    
    async def _handle_message_event(self, event: SlackWebhookEvent) -> Dict[str, Any]:
        """处理消息事件"""
        event_data = event.data.get("event", {})
        text = event_data.get("text", "")
        user = event_data.get("user")
        channel = event_data.get("channel")
        
        logger.info(f"Slack message from {user} in {channel}: {text[:50]}...")
        
        return {"success": True, "event_type": "message"}
    
    async def _handle_mention_event(self, event: SlackWebhookEvent) -> Dict[str, Any]:
        """处理@提及事件"""
        event_data = event.data.get("event", {})
        text = event_data.get("text", "")
        user = event_data.get("user")
        channel = event_data.get("channel")
        
        logger.info(f"Slack mention from {user} in {channel}: {text[:50]}...")
        
        # 这里可以触发自动回复或其他操作
        
        return {"success": True, "event_type": "app_mention"}

6. 集成服务API端点

6.1 OAuth认证端点

# OAuth认证相关的API端点

from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import RedirectResponse
from typing import List, Optional

from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.integrations.providers import ProviderName
from autogpt_libs.auth.dependencies import requires_user
from backend.data.model import User

router = APIRouter(prefix="/integrations", tags=["integrations"])

@router.get("/oauth/authorize")
async def initiate_oauth_flow(
    provider: ProviderName,
    redirect_uri: str = Query(..., description="OAuth重定向URI"),
    scopes: Optional[str] = Query(None, description="权限范围,空格分隔"),
    user: User = Depends(requires_user),
    creds_manager: IntegrationCredentialsManager = Depends()
):
    """
    启动OAuth认证流程
    
    参数:
        provider: 服务提供商
        redirect_uri: 重定向URI
        scopes: 权限范围
        user: 当前用户
        
    返回:
        重定向到OAuth授权页面
    """
    try:
        # 解析权限范围
        scope_list = scopes.split() if scopes else None
        
        # 启动OAuth流程
        auth_url, state = await creds_manager.initiate_oauth_flow(
            user_id=user.id,
            provider=provider,
            redirect_uri=redirect_uri,
            scopes=scope_list
        )
        
        # 重定向到授权页面
        return RedirectResponse(url=auth_url)
        
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.get("/oauth/callback")
async def oauth_callback(
    code: str = Query(..., description="授权码"),
    state: str = Query(..., description="状态参数"),
    error: Optional[str] = Query(None, description="错误信息"),
    user: User = Depends(requires_user),
    creds_manager: IntegrationCredentialsManager = Depends()
):
    """
    处理OAuth回调
    
    参数:
        code: 授权码
        state: 状态参数
        error: 错误信息(如果有)
        user: 当前用户
        
    返回:
        认证结果
    """
    if error:
        raise HTTPException(
            status_code=400, 
            detail=f"OAuth authorization failed: {error}"
        )
    
    try:
        # 完成OAuth流程
        credentials = await creds_manager.complete_oauth_flow(
            user_id=user.id,
            code=code,
            state=state
        )
        
        return {
            "success": True,
            "message": "OAuth authentication completed successfully",
            "credentials_id": credentials.id,
            "provider": credentials.provider,
        }
        
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.post("/api-key")
async def add_api_key_credentials(
    provider: ProviderName,
    api_key: str,
    title: Optional[str] = None,
    user: User = Depends(requires_user),
    creds_manager: IntegrationCredentialsManager = Depends()
):
    """
    添加API密钥凭据
    
    参数:
        provider: 服务提供商
        api_key: API密钥
        title: 凭据标题
        user: 当前用户
        
    返回:
        创建的凭据信息
    """
    try:
        credentials = await creds_manager.add_api_key_credentials(
            user_id=user.id,
            provider=provider,
            api_key=api_key,
            title=title
        )
        
        return {
            "success": True,
            "message": "API key credentials added successfully",
            "credentials_id": credentials.id,
            "provider": credentials.provider,
            "title": credentials.title,
        }
        
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.get("/credentials")
async def get_user_credentials(
    provider: Optional[ProviderName] = Query(None, description="过滤特定提供商"),
    user: User = Depends(requires_user),
    creds_manager: IntegrationCredentialsManager = Depends()
):
    """
    获取用户凭据列表
    
    参数:
        provider: 可选的提供商过滤
        user: 当前用户
        
    返回:
        凭据列表
    """
    try:
        credentials = await creds_manager.get_user_credentials(
            user_id=user.id,
            provider=provider
        )
        
        # 脱敏处理,不返回敏感信息
        safe_credentials = []
        for cred in credentials:
            safe_cred = {
                "id": cred.id,
                "provider": cred.provider,
                "title": getattr(cred, 'title', ''),
                "created_at": cred.created_at,
                "expires_at": getattr(cred, 'expires_at', None),
                "is_valid": getattr(cred, 'is_valid', True),
                "type": "oauth" if hasattr(cred, 'access_token') else "api_key"
            }
            
            # 添加OAuth特定信息
            if hasattr(cred, 'scopes'):
                safe_cred["scopes"] = cred.scopes
            if hasattr(cred, 'username'):
                safe_cred["username"] = cred.username
            
            safe_credentials.append(safe_cred)
        
        return {
            "success": True,
            "credentials": safe_credentials,
            "total": len(safe_credentials)
        }
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.delete("/credentials/{credentials_id}")
async def delete_credentials(
    credentials_id: str,
    user: User = Depends(requires_user),
    creds_manager: IntegrationCredentialsManager = Depends()
):
    """
    删除凭据
    
    参数:
        credentials_id: 凭据ID
        user: 当前用户
        
    返回:
        删除结果
    """
    try:
        success = await creds_manager.delete_credentials(
            user_id=user.id,
            credentials_id=credentials_id
        )
        
        if success:
            return {
                "success": True,
                "message": "Credentials deleted successfully"
            }
        else:
            raise HTTPException(status_code=404, detail="Credentials not found")
            
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/credentials/{credentials_id}/refresh")
async def refresh_oauth_credentials(
    credentials_id: str,
    user: User = Depends(requires_user),
    creds_manager: IntegrationCredentialsManager = Depends()
):
    """
    刷新OAuth凭据
    
    参数:
        credentials_id: 凭据ID
        user: 当前用户
        
    返回:
        刷新结果
    """
    try:
        credentials = await creds_manager.refresh_oauth_credentials(
            user_id=user.id,
            credentials_id=credentials_id
        )
        
        if credentials:
            return {
                "success": True,
                "message": "OAuth credentials refreshed successfully",
                "expires_at": credentials.access_token_expires_at
            }
        else:
            raise HTTPException(
                status_code=404, 
                detail="Credentials not found or not OAuth credentials"
            )
            
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))