VoiceHelper第三方集成与扩展

本文档详细介绍VoiceHelper智能语音助手系统的第三方服务集成方案,涵盖豆包大模型、OpenAI、微信生态、云存储等外部服务的集成实现。

7. 第三方集成与扩展

7.1 豆包大模型集成

7.1.0 第三方集成架构总览

graph TB
    subgraph "VoiceHelper核心系统"
        Core[核心服务层]
        API[API网关]
        Auth[认证授权]
    end
    
    subgraph "AI模型服务"
        Doubao[豆包大模型<br/>ep-20241201140014-vbzjz]
        OpenAI[OpenAI GPT<br/>gpt-4/gpt-3.5-turbo]
        Claude[Claude-3<br/>claude-3-sonnet]
    end
    
    subgraph "微信生态"
        WxMP[微信小程序]
        WxPay[微信支付]
        WxLogin[微信登录]
    end
    
    subgraph "云存储服务"
        AliyunOSS[阿里云OSS]
        TencentCOS[腾讯云COS]
        MinIO[MinIO对象存储]
    end
    
    subgraph "监控告警"
        DingTalk[钉钉机器人]
        Email[邮件服务]
        SMS[短信服务]
    end
    
    Core --> Doubao
    Core --> OpenAI
    Core --> Claude
    
    API --> WxMP
    Auth --> WxLogin
    Core --> WxPay
    
    Core --> AliyunOSS
    Core --> TencentCOS
    Core --> MinIO
    
    Core --> DingTalk
    Core --> Email
    Core --> SMS
    
    style Core fill:#e1f5fe
    style Doubao fill:#f3e5f5
    style WxMP fill:#fff3e0
    style AliyunOSS fill:#e8f5e8

7.1.1 豆包API客户端实现

# 豆包大模型API客户端
# 文件路径: algo/clients/ark_client.py
import asyncio
import json
import aiohttp
from typing import Dict, Any, AsyncGenerator, Optional, List
import logging
from dataclasses import dataclass

@dataclass
class ArkConfig:
    """豆包API配置"""
    api_key: str
    base_url: str
    model_id: str
    timeout: int = 30
    max_retries: int = 3
    retry_delay: float = 1.0

class ArkClient:
    """豆包大模型API客户端"""
    
    def __init__(self, config: ArkConfig):
        self.config = config
        self.session = None
        self.logger = logging.getLogger(__name__)
    
    async def __aenter__(self):
        """异步上下文管理器入口"""
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.config.timeout),
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            }
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步上下文管理器退出"""
        if self.session:
            await self.session.close()
    
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 2000,
        stream: bool = False,
        **kwargs
    ) -> Dict[str, Any]:
        """聊天完成API调用"""
        
        payload = {
            "model": self.config.model_id,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stream": stream,
            **kwargs
        }
        
        url = f"{self.config.base_url}/chat/completions"
        
        for attempt in range(self.config.max_retries):
            try:
                async with self.session.post(url, json=payload) as response:
                    if response.status == 200:
                        if stream:
                            return self._handle_stream_response(response)
                        else:
                            return await response.json()
                    else:
                        error_text = await response.text()
                        self.logger.error(f"API error {response.status}: {error_text}")
                        
                        if response.status == 429:  # 限流
                            await asyncio.sleep(self.config.retry_delay * (2 ** attempt))
                            continue
                        elif response.status >= 500:  # 服务器错误
                            await asyncio.sleep(self.config.retry_delay * (2 ** attempt))
                            continue
                        else:
                            raise Exception(f"API error {response.status}: {error_text}")
                            
            except asyncio.TimeoutError:
                self.logger.warning(f"Request timeout, attempt {attempt + 1}")
                if attempt < self.config.max_retries - 1:
                    await asyncio.sleep(self.config.retry_delay * (2 ** attempt))
                    continue
                raise
            except Exception as e:
                self.logger.error(f"Request failed: {e}")
                if attempt < self.config.max_retries - 1:
                    await asyncio.sleep(self.config.retry_delay * (2 ** attempt))
                    continue
                raise
        
        raise Exception(f"Max retries ({self.config.max_retries}) exceeded")
    
    async def _handle_stream_response(self, response) -> AsyncGenerator[Dict[str, Any], None]:
        """处理流式响应"""
        async for line in response.content:
            line = line.decode('utf-8').strip()
            
            if line.startswith('data: '):
                data = line[6:]  # 移除 'data: ' 前缀
                
                if data == '[DONE]':
                    break
                
                try:
                    chunk = json.loads(data)
                    yield chunk
                except json.JSONDecodeError:
                    continue
    
    async def embeddings(
        self,
        texts: List[str],
        model: Optional[str] = None
    ) -> Dict[str, Any]:
        """文本嵌入API调用"""
        
        payload = {
            "model": model or "text-embedding-v1",
            "input": texts
        }
        
        url = f"{self.config.base_url}/embeddings"
        
        async with self.session.post(url, json=payload) as response:
            if response.status == 200:
                return await response.json()
            else:
                error_text = await response.text()
                raise Exception(f"Embeddings API error {response.status}: {error_text}")
    
    async def function_call(
        self,
        messages: List[Dict[str, str]],
        functions: List[Dict[str, Any]],
        function_call: Optional[str] = "auto"
    ) -> Dict[str, Any]:
        """函数调用API"""
        
        payload = {
            "model": self.config.model_id,
            "messages": messages,
            "functions": functions,
            "function_call": function_call
        }
        
        url = f"{self.config.base_url}/chat/completions"
        
        async with self.session.post(url, json=payload) as response:
            if response.status == 200:
                return await response.json()
            else:
                error_text = await response.text()
                raise Exception(f"Function call API error {response.status}: {error_text}")

# 豆包客户端工厂
class ArkClientFactory:
    """豆包客户端工厂"""
    
    _instances = {}
    
    @classmethod
    def get_client(cls, config: ArkConfig) -> ArkClient:
        """获取客户端实例(单例模式)"""
        key = f"{config.base_url}_{config.model_id}"
        
        if key not in cls._instances:
            cls._instances[key] = ArkClient(config)
        
        return cls._instances[key]
    
    @classmethod
    async def close_all(cls):
        """关闭所有客户端"""
        for client in cls._instances.values():
            if client.session:
                await client.session.close()
        cls._instances.clear()

7.1.2 豆包模型路由器

# 豆包模型路由器
# 文件路径: algo/router/ark_router.py
from typing import Dict, Any, List, Optional
import asyncio
from dataclasses import dataclass
from enum import Enum

class ModelCapability(Enum):
    """模型能力枚举"""
    CHAT = "chat"
    EMBEDDING = "embedding"
    FUNCTION_CALL = "function_call"
    CODE_GENERATION = "code_generation"
    MULTIMODAL = "multimodal"

@dataclass
class ModelInfo:
    """模型信息"""
    model_id: str
    name: str
    capabilities: List[ModelCapability]
    max_tokens: int
    cost_per_1k_tokens: float
    latency_ms: int
    quality_score: float

class ArkModelRouter:
    """豆包模型路由器"""
    
    def __init__(self):
        self.models = self._initialize_models()
        self.performance_stats = {}
    
    def _initialize_models(self) -> Dict[str, ModelInfo]:
        """初始化模型信息"""
        return {
            "doubao-pro": ModelInfo(
                model_id="ep-20241201140014-vbzjz",
                name="豆包Pro",
                capabilities=[
                    ModelCapability.CHAT,
                    ModelCapability.FUNCTION_CALL,
                    ModelCapability.CODE_GENERATION
                ],
                max_tokens=4096,
                cost_per_1k_tokens=0.1,
                latency_ms=300,
                quality_score=0.92
            ),
            "doubao-lite": ModelInfo(
                model_id="ep-20241201140014-lite",
                name="豆包Lite",
                capabilities=[ModelCapability.CHAT],
                max_tokens=2048,
                cost_per_1k_tokens=0.05,
                latency_ms=150,
                quality_score=0.85
            ),
            "doubao-embedding": ModelInfo(
                model_id="text-embedding-v1",
                name="豆包嵌入模型",
                capabilities=[ModelCapability.EMBEDDING],
                max_tokens=512,
                cost_per_1k_tokens=0.02,
                latency_ms=100,
                quality_score=0.90
            )
        }
    
    def select_model(
        self,
        capability: ModelCapability,
        priority: str = "balanced",  # balanced, cost, speed, quality
        max_tokens: Optional[int] = None
    ) -> Optional[ModelInfo]:
        """选择最适合的模型"""
        
        # 过滤具备所需能力的模型
        candidates = [
            model for model in self.models.values()
            if capability in model.capabilities
        ]
        
        if not candidates:
            return None
        
        # 根据token限制过滤
        if max_tokens:
            candidates = [
                model for model in candidates
                if model.max_tokens >= max_tokens
            ]
        
        if not candidates:
            return None
        
        # 根据优先级选择
        if priority == "cost":
            return min(candidates, key=lambda m: m.cost_per_1k_tokens)
        elif priority == "speed":
            return min(candidates, key=lambda m: m.latency_ms)
        elif priority == "quality":
            return max(candidates, key=lambda m: m.quality_score)
        else:  # balanced
            # 综合评分:质量40% + 速度30% + 成本30%
            def score(model):
                speed_score = 1.0 - (model.latency_ms / 1000)  # 归一化速度分数
                cost_score = 1.0 - (model.cost_per_1k_tokens / 1.0)  # 归一化成本分数
                return (
                    0.4 * model.quality_score +
                    0.3 * max(0, speed_score) +
                    0.3 * max(0, cost_score)
                )
            
            return max(candidates, key=score)
    
    def get_model_by_id(self, model_id: str) -> Optional[ModelInfo]:
        """根据ID获取模型信息"""
        for model in self.models.values():
            if model.model_id == model_id:
                return model
        return None
    
    async def update_performance_stats(
        self,
        model_id: str,
        latency_ms: float,
        success: bool,
        tokens_used: int
    ):
        """更新模型性能统计"""
        if model_id not in self.performance_stats:
            self.performance_stats[model_id] = {
                "avg_latency": latency_ms,
                "success_rate": 1.0 if success else 0.0,
                "total_requests": 1,
                "total_tokens": tokens_used
            }
        else:
            stats = self.performance_stats[model_id]
            total_requests = stats["total_requests"]
            
            # 更新平均延迟
            stats["avg_latency"] = (
                (stats["avg_latency"] * total_requests + latency_ms) /
                (total_requests + 1)
            )
            
            # 更新成功率
            stats["success_rate"] = (
                (stats["success_rate"] * total_requests + (1.0 if success else 0.0)) /
                (total_requests + 1)
            )
            
            stats["total_requests"] += 1
            stats["total_tokens"] += tokens_used

7.2 OpenAI集成

7.2.0 多模型路由架构图

sequenceDiagram
    participant Client as 客户端
    participant Router as 统一路由器
    participant ArkClient as 豆包客户端
    participant OpenAIClient as OpenAI客户端
    participant ClaudeClient as Claude客户端
    participant Monitor as 性能监控

    Client->>Router: 发送请求
    Router->>Router: 分析请求类型和优先级
    
    alt 成本优先策略
        Router->>ArkClient: 路由到豆包模型
        ArkClient->>ArkClient: 调用豆包API
        ArkClient-->>Router: 返回响应
    else 质量优先策略
        Router->>OpenAIClient: 路由到GPT-4
        OpenAIClient->>OpenAIClient: 调用OpenAI API
        OpenAIClient-->>Router: 返回响应
    else 故障转移
        Router->>ArkClient: 尝试豆包模型
        ArkClient-->>Router: 返回错误
        Router->>OpenAIClient: 故障转移到OpenAI
        OpenAIClient-->>Router: 返回响应
    end
    
    Router->>Monitor: 记录性能指标
    Router-->>Client: 返回最终响应

7.2.1 OpenAI客户端实现

# OpenAI API客户端
# 文件路径: algo/clients/openai_client.py
import openai
from openai import AsyncOpenAI
from typing import Dict, Any, List, Optional, AsyncGenerator
import asyncio
import logging
from dataclasses import dataclass

@dataclass
class OpenAIConfig:
    """OpenAI配置"""
    api_key: str
    base_url: Optional[str] = None
    organization: Optional[str] = None
    timeout: int = 30
    max_retries: int = 3

class OpenAIClient:
    """OpenAI API客户端"""
    
    def __init__(self, config: OpenAIConfig):
        self.config = config
        self.client = AsyncOpenAI(
            api_key=config.api_key,
            base_url=config.base_url,
            organization=config.organization,
            timeout=config.timeout,
            max_retries=config.max_retries
        )
        self.logger = logging.getLogger(__name__)
    
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: str = "gpt-3.5-turbo",
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        stream: bool = False,
        **kwargs
    ) -> Dict[str, Any]:
        """聊天完成"""
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                stream=stream,
                **kwargs
            )
            
            if stream:
                return self._handle_stream_response(response)
            else:
                return response.model_dump()
                
        except Exception as e:
            self.logger.error(f"OpenAI chat completion error: {e}")
            raise
    
    async def _handle_stream_response(self, response) -> AsyncGenerator[Dict[str, Any], None]:
        """处理流式响应"""
        async for chunk in response:
            yield chunk.model_dump()
    
    async def embeddings(
        self,
        texts: List[str],
        model: str = "text-embedding-3-large"
    ) -> Dict[str, Any]:
        """文本嵌入"""
        try:
            response = await self.client.embeddings.create(
                model=model,
                input=texts
            )
            return response.model_dump()
            
        except Exception as e:
            self.logger.error(f"OpenAI embeddings error: {e}")
            raise
    
    async def image_generation(
        self,
        prompt: str,
        model: str = "dall-e-3",
        size: str = "1024x1024",
        quality: str = "standard",
        n: int = 1
    ) -> Dict[str, Any]:
        """图像生成"""
        try:
            response = await self.client.images.generate(
                model=model,
                prompt=prompt,
                size=size,
                quality=quality,
                n=n
            )
            return response.model_dump()
            
        except Exception as e:
            self.logger.error(f"OpenAI image generation error: {e}")
            raise
    
    async def speech_to_text(
        self,
        audio_file: bytes,
        model: str = "whisper-1",
        language: Optional[str] = None
    ) -> Dict[str, Any]:
        """语音转文字"""
        try:
            response = await self.client.audio.transcriptions.create(
                model=model,
                file=audio_file,
                language=language
            )
            return response.model_dump()
            
        except Exception as e:
            self.logger.error(f"OpenAI speech to text error: {e}")
            raise
    
    async def text_to_speech(
        self,
        text: str,
        model: str = "tts-1",
        voice: str = "alloy",
        response_format: str = "mp3"
    ) -> bytes:
        """文字转语音"""
        try:
            response = await self.client.audio.speech.create(
                model=model,
                voice=voice,
                input=text,
                response_format=response_format
            )
            return response.content
            
        except Exception as e:
            self.logger.error(f"OpenAI text to speech error: {e}")
            raise

7.2.2 多模型统一接口

# 多模型统一接口
# 文件路径: algo/clients/unified_client.py
from typing import Dict, Any, List, Optional, AsyncGenerator, Union
from enum import Enum
import asyncio
from .ark_client import ArkClient, ArkConfig
from .openai_client import OpenAIClient, OpenAIConfig

class ModelProvider(Enum):
    """模型提供商"""
    ARK = "ark"
    OPENAI = "openai"
    CLAUDE = "claude"

class UnifiedLLMClient:
    """统一大模型客户端"""
    
    def __init__(self):
        self.clients = {}
        self.default_provider = ModelProvider.ARK
    
    def add_ark_client(self, config: ArkConfig, name: str = "default"):
        """添加豆包客户端"""
        self.clients[f"ark_{name}"] = ArkClient(config)
    
    def add_openai_client(self, config: OpenAIConfig, name: str = "default"):
        """添加OpenAI客户端"""
        self.clients[f"openai_{name}"] = OpenAIClient(config)
    
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        provider: Optional[ModelProvider] = None,
        model: Optional[str] = None,
        **kwargs
    ) -> Union[Dict[str, Any], AsyncGenerator[Dict[str, Any], None]]:
        """统一聊天完成接口"""
        
        provider = provider or self.default_provider
        client_key = f"{provider.value}_default"
        
        if client_key not in self.clients:
            raise ValueError(f"Client for provider {provider.value} not found")
        
        client = self.clients[client_key]
        
        if provider == ModelProvider.ARK:
            return await client.chat_completion(messages, **kwargs)
        elif provider == ModelProvider.OPENAI:
            model = model or "gpt-3.5-turbo"
            return await client.chat_completion(messages, model=model, **kwargs)
        else:
            raise ValueError(f"Unsupported provider: {provider.value}")
    
    async def embeddings(
        self,
        texts: List[str],
        provider: Optional[ModelProvider] = None,
        model: Optional[str] = None
    ) -> Dict[str, Any]:
        """统一嵌入接口"""
        
        provider = provider or self.default_provider
        client_key = f"{provider.value}_default"
        
        if client_key not in self.clients:
            raise ValueError(f"Client for provider {provider.value} not found")
        
        client = self.clients[client_key]
        
        if provider == ModelProvider.ARK:
            return await client.embeddings(texts, model)
        elif provider == ModelProvider.OPENAI:
            model = model or "text-embedding-3-large"
            return await client.embeddings(texts, model)
        else:
            raise ValueError(f"Unsupported provider: {provider.value}")
    
    async def fallback_chat_completion(
        self,
        messages: List[Dict[str, str]],
        providers: List[ModelProvider],
        **kwargs
    ) -> Dict[str, Any]:
        """故障转移聊天完成"""
        
        last_error = None
        
        for provider in providers:
            try:
                return await self.chat_completion(
                    messages, 
                    provider=provider, 
                    **kwargs
                )
            except Exception as e:
                last_error = e
                continue
        
        raise Exception(f"All providers failed. Last error: {last_error}")

7.3 微信生态集成

7.3.0 微信生态集成架构图

graph TB
    subgraph "微信生态"
        WxUser[微信用户]
        WxMP[微信小程序]
        WxPay[微信支付]
        WxLogin[微信登录]
        WxShare[微信分享]
    end
    
    subgraph "VoiceHelper后端"
        Gateway[API网关]
        AuthService[认证服务]
        PaymentService[支付服务]
        UserService[用户服务]
        ChatService[对话服务]
    end
    
    subgraph "微信服务器"
        WxServer[微信服务器]
        WxPayServer[微信支付服务器]
    end
    
    WxUser --> WxMP
    WxMP --> Gateway
    
    WxMP --> WxLogin
    WxLogin --> AuthService
    AuthService --> WxServer
    
    WxMP --> WxPay
    WxPay --> PaymentService
    PaymentService --> WxPayServer
    
    WxMP --> WxShare
    WxShare --> UserService
    
    Gateway --> ChatService
    Gateway --> UserService
    
    style WxMP fill:#e1f5fe
    style Gateway fill:#f3e5f5
    style WxServer fill:#fff3e0

7.3.1 微信小程序API集成

// 微信小程序API集成
// 文件路径: miniprogram/utils/api.js

class WeChatAPI {
  constructor(config) {
    this.baseURL = config.baseURL
    this.timeout = config.timeout || 10000
    this.token = wx.getStorageSync('token') || ''
  }

  // 统一请求方法
  async request(options) {
    const { url, method = 'GET', data = {}, header = {} } = options
    
    return new Promise((resolve, reject) => {
      wx.request({
        url: `${this.baseURL}${url}`,
        method,
        data,
        header: {
          'Content-Type': 'application/json',
          'Authorization': this.token ? `Bearer ${this.token}` : '',
          ...header
        },
        timeout: this.timeout,
        success: (res) => {
          if (res.statusCode === 200) {
            resolve(res.data)
          } else if (res.statusCode === 401) {
            // Token过期,重新登录
            this.reLogin()
            reject(new Error('登录已过期'))
          } else {
            reject(new Error(`请求失败: ${res.statusCode}`))
          }
        },
        fail: (err) => {
          reject(new Error(`网络错误: ${err.errMsg}`))
        }
      })
    })
  }

  // 微信登录
  async wxLogin() {
    return new Promise((resolve, reject) => {
      wx.login({
        success: async (res) => {
          if (res.code) {
            try {
              const result = await this.request({
                url: '/api/auth/wx-login',
                method: 'POST',
                data: { code: res.code }
              })
              
              this.token = result.token
              wx.setStorageSync('token', result.token)
              wx.setStorageSync('userInfo', result.userInfo)
              
              resolve(result)
            } catch (error) {
              reject(error)
            }
          } else {
            reject(new Error('获取登录凭证失败'))
          }
        },
        fail: reject
      })
    })
  }

  // 获取用户信息
  async getUserProfile() {
    return new Promise((resolve, reject) => {
      wx.getUserProfile({
        desc: '用于完善用户资料',
        success: (res) => {
          resolve(res.userInfo)
        },
        fail: reject
      })
    })
  }

  // 发送对话消息
  async sendMessage(sessionId, content, contentType = 'text') {
    return this.request({
      url: '/api/chat/message',
      method: 'POST',
      data: {
        sessionId,
        content,
        contentType
      }
    })
  }

  // 上传文件
  async uploadFile(filePath, fileName) {
    return new Promise((resolve, reject) => {
      wx.uploadFile({
        url: `${this.baseURL}/api/files/upload`,
        filePath,
        name: 'file',
        formData: {
          fileName
        },
        header: {
          'Authorization': this.token ? `Bearer ${this.token}` : ''
        },
        success: (res) => {
          if (res.statusCode === 200) {
            resolve(JSON.parse(res.data))
          } else {
            reject(new Error(`上传失败: ${res.statusCode}`))
          }
        },
        fail: reject
      })
    })
  }

  // 录音功能
  startRecord() {
    const recorderManager = wx.getRecorderManager()
    
    recorderManager.start({
      duration: 60000, // 最长60秒
      sampleRate: 16000,
      numberOfChannels: 1,
      encodeBitRate: 48000,
      format: 'mp3'
    })
    
    return recorderManager
  }

  // 语音转文字
  async speechToText(audioPath) {
    return this.request({
      url: '/api/voice/transcribe',
      method: 'POST',
      data: {
        audioPath
      }
    })
  }

  // 重新登录
  async reLogin() {
    try {
      await this.wxLogin()
    } catch (error) {
      wx.showToast({
        title: '登录失败',
        icon: 'error'
      })
    }
  }

  // 支付功能
  async wxPay(orderInfo) {
    return new Promise((resolve, reject) => {
      wx.requestPayment({
        ...orderInfo,
        success: resolve,
        fail: reject
      })
    })
  }

  // 分享功能
  async shareToFriends(title, path, imageUrl) {
    return new Promise((resolve, reject) => {
      wx.shareAppMessage({
        title,
        path,
        imageUrl,
        success: resolve,
        fail: reject
      })
    })
  }
}

// 导出API实例
const api = new WeChatAPI({
  baseURL: 'https://api.voicehelper.com'
})

export default api

7.3.2 微信支付集成

// 微信支付集成
// 文件路径: backend/internal/payment/wechat_pay.go
package payment

import (
    "context"
    "crypto/md5"
    "encoding/xml"
    "fmt"
    "io"
    "net/http"
    "sort"
    "strings"
    "time"
)

type WeChatPayConfig struct {
    AppID     string
    MchID     string
    APIKey    string
    NotifyURL string
}

type WeChatPayClient struct {
    config *WeChatPayConfig
    client *http.Client
}

type UnifiedOrderRequest struct {
    AppID          string `xml:"appid"`
    MchID          string `xml:"mch_id"`
    NonceStr       string `xml:"nonce_str"`
    Sign           string `xml:"sign"`
    Body           string `xml:"body"`
    OutTradeNo     string `xml:"out_trade_no"`
    TotalFee       int    `xml:"total_fee"`
    SpbillCreateIP string `xml:"spbill_create_ip"`
    NotifyURL      string `xml:"notify_url"`
    TradeType      string `xml:"trade_type"`
    OpenID         string `xml:"openid,omitempty"`
}

type UnifiedOrderResponse struct {
    ReturnCode string `xml:"return_code"`
    ReturnMsg  string `xml:"return_msg"`
    AppID      string `xml:"appid"`
    MchID      string `xml:"mch_id"`
    NonceStr   string `xml:"nonce_str"`
    Sign       string `xml:"sign"`
    ResultCode string `xml:"result_code"`
    PrepayID   string `xml:"prepay_id"`
    TradeType  string `xml:"trade_type"`
}

func NewWeChatPayClient(config *WeChatPayConfig) *WeChatPayClient {
    return &WeChatPayClient{
        config: config,
        client: &http.Client{
            Timeout: 30 * time.Second,
        },
    }
}

func (w *WeChatPayClient) UnifiedOrder(ctx context.Context, req *UnifiedOrderRequest) (*UnifiedOrderResponse, error) {
    // 设置基本参数
    req.AppID = w.config.AppID
    req.MchID = w.config.MchID
    req.NonceStr = generateNonceStr()
    req.NotifyURL = w.config.NotifyURL
    
    // 生成签名
    req.Sign = w.generateSign(req)
    
    // 构建XML请求
    xmlData, err := xml.Marshal(req)
    if err != nil {
        return nil, fmt.Errorf("marshal request error: %v", err)
    }
    
    // 发送请求
    resp, err := w.client.Post(
        "https://api.mch.weixin.qq.com/pay/unifiedorder",
        "application/xml",
        strings.NewReader(string(xmlData)),
    )
    if err != nil {
        return nil, fmt.Errorf("request error: %v", err)
    }
    defer resp.Body.Close()
    
    // 解析响应
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, fmt.Errorf("read response error: %v", err)
    }
    
    var response UnifiedOrderResponse
    if err := xml.Unmarshal(body, &response); err != nil {
        return nil, fmt.Errorf("unmarshal response error: %v", err)
    }
    
    return &response, nil
}

func (w *WeChatPayClient) generateSign(req *UnifiedOrderRequest) string {
    // 参数排序
    params := map[string]string{
        "appid":            req.AppID,
        "mch_id":           req.MchID,
        "nonce_str":        req.NonceStr,
        "body":             req.Body,
        "out_trade_no":     req.OutTradeNo,
        "total_fee":        fmt.Sprintf("%d", req.TotalFee),
        "spbill_create_ip": req.SpbillCreateIP,
        "notify_url":       req.NotifyURL,
        "trade_type":       req.TradeType,
    }
    
    if req.OpenID != "" {
        params["openid"] = req.OpenID
    }
    
    // 按key排序
    var keys []string
    for k := range params {
        keys = append(keys, k)
    }
    sort.Strings(keys)
    
    // 构建签名字符串
    var signStr strings.Builder
    for i, k := range keys {
        if i > 0 {
            signStr.WriteString("&")
        }
        signStr.WriteString(fmt.Sprintf("%s=%s", k, params[k]))
    }
    signStr.WriteString("&key=" + w.config.APIKey)
    
    // MD5签名
    h := md5.New()
    h.Write([]byte(signStr.String()))
    return strings.ToUpper(fmt.Sprintf("%x", h.Sum(nil)))
}

func generateNonceStr() string {
    return fmt.Sprintf("%d", time.Now().UnixNano())
}

// 支付结果通知处理
func (w *WeChatPayClient) HandleNotify(xmlData []byte) (*PaymentNotification, error) {
    var notify PaymentNotification
    if err := xml.Unmarshal(xmlData, &notify); err != nil {
        return nil, fmt.Errorf("unmarshal notify error: %v", err)
    }
    
    // 验证签名
    if !w.verifyNotifySign(&notify) {
        return nil, fmt.Errorf("invalid signature")
    }
    
    return &notify, nil
}

type PaymentNotification struct {
    ReturnCode    string `xml:"return_code"`
    ReturnMsg     string `xml:"return_msg"`
    AppID         string `xml:"appid"`
    MchID         string `xml:"mch_id"`
    NonceStr      string `xml:"nonce_str"`
    Sign          string `xml:"sign"`
    ResultCode    string `xml:"result_code"`
    OpenID        string `xml:"openid"`
    TradeType     string `xml:"trade_type"`
    BankType      string `xml:"bank_type"`
    TotalFee      int    `xml:"total_fee"`
    TransactionID string `xml:"transaction_id"`
    OutTradeNo    string `xml:"out_trade_no"`
    TimeEnd       string `xml:"time_end"`
}

func (w *WeChatPayClient) verifyNotifySign(notify *PaymentNotification) bool {
    // 构建签名参数
    params := map[string]string{
        "return_code":    notify.ReturnCode,
        "return_msg":     notify.ReturnMsg,
        "appid":          notify.AppID,
        "mch_id":         notify.MchID,
        "nonce_str":      notify.NonceStr,
        "result_code":    notify.ResultCode,
        "openid":         notify.OpenID,
        "trade_type":     notify.TradeType,
        "bank_type":      notify.BankType,
        "total_fee":      fmt.Sprintf("%d", notify.TotalFee),
        "transaction_id": notify.TransactionID,
        "out_trade_no":   notify.OutTradeNo,
        "time_end":       notify.TimeEnd,
    }
    
    // 生成签名
    expectedSign := w.generateSignFromParams(params)
    return expectedSign == notify.Sign
}

func (w *WeChatPayClient) generateSignFromParams(params map[string]string) string {
    // 按key排序
    var keys []string
    for k := range params {
        if params[k] != "" {
            keys = append(keys, k)
        }
    }
    sort.Strings(keys)
    
    // 构建签名字符串
    var signStr strings.Builder
    for i, k := range keys {
        if i > 0 {
            signStr.WriteString("&")
        }
        signStr.WriteString(fmt.Sprintf("%s=%s", k, params[k]))
    }
    signStr.WriteString("&key=" + w.config.APIKey)
    
    // MD5签名
    h := md5.New()
    h.Write([]byte(signStr.String()))
    return strings.ToUpper(fmt.Sprintf("%x", h.Sum(nil)))
}

7.4 云存储服务集成

7.4.1 阿里云OSS集成

// 阿里云OSS集成
// 文件路径: backend/internal/storage/aliyun_oss.go
package storage

import (
    "bytes"
    "fmt"
    "io"
    "path/filepath"
    "time"
    
    "github.com/aliyun/aliyun-oss-go-sdk/oss"
)

type AliyunOSSConfig struct {
    Endpoint        string
    AccessKeyID     string
    AccessKeySecret string
    BucketName      string
    Region          string
}

type AliyunOSSClient struct {
    client *oss.Client
    bucket *oss.Bucket
    config *AliyunOSSConfig
}

func NewAliyunOSSClient(config *AliyunOSSConfig) (*AliyunOSSClient, error) {
    client, err := oss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret)
    if err != nil {
        return nil, fmt.Errorf("create OSS client error: %v", err)
    }
    
    bucket, err := client.Bucket(config.BucketName)
    if err != nil {
        return nil, fmt.Errorf("get bucket error: %v", err)
    }
    
    return &AliyunOSSClient{
        client: client,
        bucket: bucket,
        config: config,
    }, nil
}

func (a *AliyunOSSClient) UploadFile(objectKey string, data []byte, contentType string) error {
    reader := bytes.NewReader(data)
    
    options := []oss.Option{
        oss.ContentType(contentType),
        oss.Meta("upload-time", time.Now().Format(time.RFC3339)),
    }
    
    err := a.bucket.PutObject(objectKey, reader, options...)
    if err != nil {
        return fmt.Errorf("upload file error: %v", err)
    }
    
    return nil
}

func (a *AliyunOSSClient) DownloadFile(objectKey string) ([]byte, error) {
    body, err := a.bucket.GetObject(objectKey)
    if err != nil {
        return nil, fmt.Errorf("download file error: %v", err)
    }
    defer body.Close()
    
    data, err := io.ReadAll(body)
    if err != nil {
        return nil, fmt.Errorf("read file error: %v", err)
    }
    
    return data, nil
}

func (a *AliyunOSSClient) DeleteFile(objectKey string) error {
    err := a.bucket.DeleteObject(objectKey)
    if err != nil {
        return fmt.Errorf("delete file error: %v", err)
    }
    
    return nil
}

func (a *AliyunOSSClient) GetFileURL(objectKey string, expireTime time.Duration) (string, error) {
    signedURL, err := a.bucket.SignURL(objectKey, oss.HTTPGet, int64(expireTime.Seconds()))
    if err != nil {
        return "", fmt.Errorf("generate signed URL error: %v", err)
    }
    
    return signedURL, nil
}

func (a *AliyunOSSClient) ListFiles(prefix string, maxKeys int) ([]oss.ObjectProperties, error) {
    lsRes, err := a.bucket.ListObjects(oss.Prefix(prefix), oss.MaxKeys(maxKeys))
    if err != nil {
        return nil, fmt.Errorf("list files error: %v", err)
    }
    
    return lsRes.Objects, nil
}

// 分片上传大文件
func (a *AliyunOSSClient) MultipartUpload(objectKey string, filePath string) error {
    chunks, err := oss.SplitFileByPartNum(filePath, 3)
    if err != nil {
        return fmt.Errorf("split file error: %v", err)
    }
    
    fd, err := os.Open(filePath)
    if err != nil {
        return fmt.Errorf("open file error: %v", err)
    }
    defer fd.Close()
    
    // 初始化分片上传
    imur, err := a.bucket.InitiateMultipartUpload(objectKey)
    if err != nil {
        return fmt.Errorf("initiate multipart upload error: %v", err)
    }
    
    var parts []oss.UploadPart
    for _, chunk := range chunks {
        fd.Seek(chunk.Offset, 0)
        
        // 上传分片
        part, err := a.bucket.UploadPart(imur, fd, chunk.Size, chunk.Number)
        if err != nil {
            // 取消分片上传
            a.bucket.AbortMultipartUpload(imur)
            return fmt.Errorf("upload part error: %v", err)
        }
        parts = append(parts, part)
    }
    
    // 完成分片上传
    _, err = a.bucket.CompleteMultipartUpload(imur, parts)
    if err != nil {
        return fmt.Errorf("complete multipart upload error: %v", err)
    }
    
    return nil
}

7.4.2 腾讯云COS集成

// 腾讯云COS集成
// 文件路径: backend/internal/storage/tencent_cos.go
package storage

import (
    "bytes"
    "context"
    "fmt"
    "net/http"
    "net/url"
    "time"
    
    "github.com/tencentyun/cos-go-sdk-v5"
)

type TencentCOSConfig struct {
    SecretID  string
    SecretKey string
    Region    string
    Bucket    string
}

type TencentCOSClient struct {
    client *cos.Client
    config *TencentCOSConfig
}

func NewTencentCOSClient(config *TencentCOSConfig) *TencentCOSClient {
    u, _ := url.Parse(fmt.Sprintf("https://%s.cos.%s.myqcloud.com", config.Bucket, config.Region))
    b := &cos.BaseURL{BucketURL: u}
    
    client := cos.NewClient(b, &http.Client{
        Transport: &cos.AuthorizationTransport{
            SecretID:  config.SecretID,
            SecretKey: config.SecretKey,
        },
    })
    
    return &TencentCOSClient{
        client: client,
        config: config,
    }
}

func (t *TencentCOSClient) UploadFile(ctx context.Context, objectKey string, data []byte, contentType string) error {
    reader := bytes.NewReader(data)
    
    _, err := t.client.Object.Put(ctx, objectKey, reader, &cos.ObjectPutOptions{
        ObjectPutHeaderOptions: &cos.ObjectPutHeaderOptions{
            ContentType: contentType,
        },
    })
    
    if err != nil {
        return fmt.Errorf("upload file error: %v", err)
    }
    
    return nil
}

func (t *TencentCOSClient) DownloadFile(ctx context.Context, objectKey string) ([]byte, error) {
    resp, err := t.client.Object.Get(ctx, objectKey, nil)
    if err != nil {
        return nil, fmt.Errorf("download file error: %v", err)
    }
    defer resp.Body.Close()
    
    data, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, fmt.Errorf("read file error: %v", err)
    }
    
    return data, nil
}

func (t *TencentCOSClient) DeleteFile(ctx context.Context, objectKey string) error {
    _, err := t.client.Object.Delete(ctx, objectKey)
    if err != nil {
        return fmt.Errorf("delete file error: %v", err)
    }
    
    return nil
}

func (t *TencentCOSClient) GetPresignedURL(ctx context.Context, objectKey string, expireTime time.Duration) (string, error) {
    presignedURL, err := t.client.Object.GetPresignedURL(ctx, http.MethodGet, objectKey, t.config.SecretID, t.config.SecretKey, expireTime, nil)
    if err != nil {
        return "", fmt.Errorf("generate presigned URL error: %v", err)
    }
    
    return presignedURL.String(), nil
}

func (t *TencentCOSClient) ListFiles(ctx context.Context, prefix string, maxKeys int) (*cos.BucketGetResult, error) {
    opt := &cos.BucketGetOptions{
        Prefix:  prefix,
        MaxKeys: maxKeys,
    }
    
    result, _, err := t.client.Bucket.Get(ctx, opt)
    if err != nil {
        return nil, fmt.Errorf("list files error: %v", err)
    }
    
    return result, nil
}

7.5 监控和告警集成

7.5.1 钉钉告警集成

// 钉钉告警集成
// 文件路径: backend/internal/notification/dingtalk.go
package notification

import (
    "bytes"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
    "strconv"
    "time"
)

type DingTalkConfig struct {
    WebhookURL string
    Secret     string
}

type DingTalkClient struct {
    config *DingTalkConfig
    client *http.Client
}

type DingTalkMessage struct {
    MsgType string `json:"msgtype"`
    Text    *Text  `json:"text,omitempty"`
    At      *At    `json:"at,omitempty"`
}

type Text struct {
    Content string `json:"content"`
}

type At struct {
    AtMobiles []string `json:"atMobiles,omitempty"`
    IsAtAll   bool     `json:"isAtAll,omitempty"`
}

func NewDingTalkClient(config *DingTalkConfig) *DingTalkClient {
    return &DingTalkClient{
        config: config,
        client: &http.Client{
            Timeout: 10 * time.Second,
        },
    }
}

func (d *DingTalkClient) SendTextMessage(content string, atMobiles []string, isAtAll bool) error {
    message := &DingTalkMessage{
        MsgType: "text",
        Text: &Text{
            Content: content,
        },
        At: &At{
            AtMobiles: atMobiles,
            IsAtAll:   isAtAll,
        },
    }
    
    return d.sendMessage(message)
}

func (d *DingTalkClient) SendAlertMessage(title, content, level string) error {
    emoji := "🔔"
    switch level {
    case "critical":
        emoji = "🚨"
    case "warning":
        emoji = "⚠️"
    case "info":
        emoji = "ℹ️"
    }
    
    message := fmt.Sprintf("%s %s\n\n%s\n\n时间: %s", 
        emoji, title, content, time.Now().Format("2006-01-02 15:04:05"))
    
    return d.SendTextMessage(message, nil, level == "critical")
}

func (d *DingTalkClient) sendMessage(message *DingTalkMessage) error {
    // 生成签名
    timestamp := time.Now().UnixNano() / 1e6
    sign := d.generateSign(timestamp)
    
    // 构建URL
    webhookURL := fmt.Sprintf("%s&timestamp=%d&sign=%s", 
        d.config.WebhookURL, timestamp, url.QueryEscape(sign))
    
    // 序列化消息
    data, err := json.Marshal(message)
    if err != nil {
        return fmt.Errorf("marshal message error: %v", err)
    }
    
    // 发送请求
    resp, err := d.client.Post(webhookURL, "application/json", bytes.NewBuffer(data))
    if err != nil {
        return fmt.Errorf("send request error: %v", err)
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("request failed with status: %d", resp.StatusCode)
    }
    
    return nil
}

func (d *DingTalkClient) generateSign(timestamp int64) string {
    stringToSign := fmt.Sprintf("%d\n%s", timestamp, d.config.Secret)
    
    h := hmac.New(sha256.New, []byte(d.config.Secret))
    h.Write([]byte(stringToSign))
    
    return base64.StdEncoding.EncodeToString(h.Sum(nil))
}

7.5.2 邮件告警集成

// 邮件告警集成
// 文件路径: backend/internal/notification/email.go
package notification

import (
    "fmt"
    "net/smtp"
    "strings"
)

type EmailConfig struct {
    SMTPHost     string
    SMTPPort     int
    Username     string
    Password     string
    FromAddress  string
    FromName     string
}

type EmailClient struct {
    config *EmailConfig
}

func NewEmailClient(config *EmailConfig) *EmailClient {
    return &EmailClient{
        config: config,
    }
}

func (e *EmailClient) SendEmail(to []string, subject, body string) error {
    auth := smtp.PlainAuth("", e.config.Username, e.config.Password, e.config.SMTPHost)
    
    msg := e.buildMessage(to, subject, body)
    
    addr := fmt.Sprintf("%s:%d", e.config.SMTPHost, e.config.SMTPPort)
    
    err := smtp.SendMail(addr, auth, e.config.FromAddress, to, []byte(msg))
    if err != nil {
        return fmt.Errorf("send email error: %v", err)
    }
    
    return nil
}

func (e *EmailClient) SendAlertEmail(to []string, title, content, level string) error {
    subject := fmt.Sprintf("[VoiceHelper Alert - %s] %s", strings.ToUpper(level), title)
    
    body := fmt.Sprintf(`
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>%s</title>
</head>
<body>
    <div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;">
        <div style="background-color: %s; color: white; padding: 20px; text-align: center;">
            <h1>%s</h1>
        </div>
        <div style="padding: 20px; background-color: #f9f9f9;">
            <h2>告警详情</h2>
            <p style="white-space: pre-wrap;">%s</p>
            <hr>
            <p><strong>时间:</strong> %s</p>
            <p><strong>系统:</strong> VoiceHelper智能语音助手</p>
        </div>
    </div>
</body>
</html>
    `, title, e.getLevelColor(level), title, content, time.Now().Format("2006-01-02 15:04:05"))
    
    return e.SendEmail(to, subject, body)
}

func (e *EmailClient) buildMessage(to []string, subject, body string) string {
    msg := fmt.Sprintf("From: %s <%s>\r\n", e.config.FromName, e.config.FromAddress)
    msg += fmt.Sprintf("To: %s\r\n", strings.Join(to, ","))
    msg += fmt.Sprintf("Subject: %s\r\n", subject)
    msg += "MIME-Version: 1.0\r\n"
    msg += "Content-Type: text/html; charset=UTF-8\r\n"
    msg += "\r\n"
    msg += body
    
    return msg
}

func (e *EmailClient) getLevelColor(level string) string {
    switch level {
    case "critical":
        return "#dc3545"
    case "warning":
        return "#ffc107"
    case "info":
        return "#17a2b8"
    default:
        return "#6c757d"
    }
}

相关文档