VoiceHelper后端服务核心实现

本文档详细介绍VoiceHelper智能语音助手系统的后端服务实现，涵盖Go微服务架构、gRPC通信、数据库设计等关键技术。

3. 后端服务核心实现

3.1 对话服务实现

// ChatService is the main struct of the conversation service.
// File path: backend/internal/service/chat.go
//
// It bundles the SQL handle, Redis client, RAG and voice clients,
// configuration, session and context managers and a message channel.
// The methods visible in this section use only sessionManager,
// contextManager and ragClient.
type ChatService struct {
    db            *sql.DB
    cache         *redis.Client
    ragClient     *rag.Client
    voiceClient   *voice.Client
    config        *ChatConfig
    sessionManager *SessionManager
    messageQueue  chan *Message
    contextManager *ContextManager
}

// SessionManager creates and looks up chat sessions.
//
// NOTE(review): the methods shown in this section only call the
// saveSession / loadSessionFromDB / cacheSession / getCachedSession helpers;
// the sessions map and mutex are not touched by any code visible here —
// presumably an in-process cache guarded by the lock, confirm against the
// helper implementations.
type SessionManager struct {
    sessions map[string]*Session
    mutex    sync.RWMutex
    db       *sql.DB
    cache    *redis.Client
}

// CreateSession builds a new active session for userID, persists it through
// saveSession and then hands it to cacheSession.
//
// CreatedAt and UpdatedAt are taken from a single clock reading, so a freshly
// created session always carries identical timestamps.
//
// It returns the session, or the error reported by saveSession; in that case
// cacheSession is not called.
func (sm *SessionManager) CreateSession(userID string) (*Session, error) {
    now := time.Now()
    session := &Session{
        ID:        generateSessionID(),
        UserID:    userID,
        CreatedAt: now,
        UpdatedAt: now,
        Context:   make(map[string]interface{}),
        Messages:  []*Message{},
        Status:    SessionStatusActive,
    }

    // Persist before caching so a failed save never leaves a cache entry behind.
    if err := sm.saveSession(session); err != nil {
        return nil, err
    }

    // The outcome of cacheSession is not inspected here.
    sm.cacheSession(session)

    return session, nil
}

// GetSession returns the session identified by sessionID.
//
// The cached copy is returned when getCachedSession yields one. Otherwise the
// session is loaded through loadSessionFromDB and handed to cacheSession
// before being returned. A load error is returned unchanged with a nil
// session.
func (sm *SessionManager) GetSession(sessionID string) (*Session, error) {
    cached := sm.getCachedSession(sessionID)
    if cached != nil {
        return cached, nil
    }

    loaded, loadErr := sm.loadSessionFromDB(sessionID)
    if loadErr != nil {
        return nil, loadErr
    }

    // Populate the cache for subsequent lookups.
    sm.cacheSession(loaded)
    return loaded, nil
}

// ProcessMessage records userMessage on the session identified by sessionID
// and starts generation of the assistant reply in the background.
//
// It returns the lookup error when the session cannot be fetched. Otherwise
// it returns nil right after the goroutine is started, without waiting for
// processAIResponse to finish.
//
// NOTE(review): the background goroutine is not tied to any context or wait
// group visible here, and it works on the same *Session value this call
// obtained — confirm how shutdown and concurrent calls on one session are
// meant to be handled.
func (cs *ChatService) ProcessMessage(sessionID string, userMessage *Message) error {
    sess, lookupErr := cs.sessionManager.GetSession(sessionID)
    if lookupErr != nil {
        return lookupErr
    }

    // Record the message in the history, then refresh the context with it.
    sess.Messages = append(sess.Messages, userMessage)
    cs.contextManager.UpdateContext(sess, userMessage)

    // The reply is produced asynchronously; its outcome is not reported to the caller.
    go cs.processAIResponse(sess, userMessage)
    return nil
}

// processAIResponse produces the assistant reply for userMessage and appends
// it to session.
//
// Steps: run a RAG retrieval for the message content (TopK set to 5, filters
// from buildFilters), build the prompt from the session, the message and the
// retrieval result, generate the response, append it to the session as an
// assistant text message and call UpdateSession. A retrieval or generation
// failure is passed to handleError and the function returns without
// appending a reply.
func (cs *ChatService) processAIResponse(session *Session, userMessage *Message) {
    hits, err := cs.ragClient.Retrieve(&rag.RetrievalRequest{
        Query:   userMessage.Content,
        TopK:    5,
        Filters: cs.buildFilters(session),
    })
    if err != nil {
        cs.handleError(session, err)
        return
    }

    reply, err := cs.generateResponse(cs.buildPrompt(session, userMessage, hits))
    if err != nil {
        cs.handleError(session, err)
        return
    }

    // Store the generated text as the assistant's turn in the history.
    session.Messages = append(session.Messages, &Message{
        ID:          generateMessageID(),
        SessionID:   session.ID,
        Role:        MessageRoleAssistant,
        Content:     reply,
        ContentType: ContentTypeText,
        Timestamp:   time.Now(),
    })
    cs.sessionManager.UpdateSession(session)
}

3.2 用户服务实现

// UserService implements user registration, login and permission checks.
// File path: backend/internal/service/user.go
//
// The methods visible in this section call jwt for token generation and
// bcrypt for password hashing and verification; db and cache are not
// referenced directly by them.
type UserService struct {
    db     *sql.DB
    cache  *redis.Client
    jwt    *JWTManager
    bcrypt *BCryptHasher
}

// RegisterUser validates req, makes sure the e-mail address is not taken,
// hashes the password and stores the new active user.
//
// Errors:
//   - the validation error from validateUserInfo;
//   - the lookup error from userExists when the existence check itself fails
//     (it is not reported as ErrUserAlreadyExists);
//   - ErrUserAlreadyExists when the e-mail is already registered;
//   - the hashing or save error.
//
// On success the user is handed to cacheUser and returned. CreatedAt and
// UpdatedAt come from a single clock reading.
//
// NOTE(review): the existence check and the insert are separate steps. The
// users table declares email UNIQUE, so a concurrent duplicate presumably
// surfaces as an error from saveUser — confirm it is mapped appropriately.
func (us *UserService) RegisterUser(req *RegisterRequest) (*User, error) {
    if err := us.validateUserInfo(req); err != nil {
        return nil, err
    }

    // Keep "lookup failed" and "already registered" apart so that a storage
    // failure is not presented to the client as a duplicate account.
    exists, err := us.userExists(req.Email)
    if err != nil {
        return nil, err
    }
    if exists {
        return nil, ErrUserAlreadyExists
    }

    // Only the hash of the password is stored.
    hashedPassword, err := us.bcrypt.HashPassword(req.Password)
    if err != nil {
        return nil, err
    }

    now := time.Now()
    user := &User{
        ID:           generateUserID(),
        Email:        req.Email,
        Username:     req.Username,
        PasswordHash: hashedPassword,
        CreatedAt:    now,
        UpdatedAt:    now,
        Status:       UserStatusActive,
    }

    if err := us.saveUser(user); err != nil {
        return nil, err
    }

    // The outcome of cacheUser is not inspected here.
    us.cacheUser(user)

    return user, nil
}

// LoginUser authenticates req and issues a JWT for the matching user.
//
// Any failure to load the user by e-mail, as well as a password mismatch, is
// reported as ErrInvalidCredentials. A token-generation failure is returned
// unchanged. On success LastLoginAt is set to the current time, updateUser is
// called, and the user is returned together with the token.
//
// NOTE(review): every getUserByEmail error — possibly including storage
// failures — is mapped to ErrInvalidCredentials, and the outcome of
// updateUser is not inspected; confirm both are intended.
func (us *UserService) LoginUser(req *LoginRequest) (*LoginResponse, error) {
    account, lookupErr := us.getUserByEmail(req.Email)
    if lookupErr != nil {
        return nil, ErrInvalidCredentials
    }

    if passwordOK := us.bcrypt.VerifyPassword(req.Password, account.PasswordHash); !passwordOK {
        return nil, ErrInvalidCredentials
    }

    signed, signErr := us.jwt.GenerateToken(account.ID, account.Email)
    if signErr != nil {
        return nil, signErr
    }

    // Stamp the login time on the record before handing it back.
    account.LastLoginAt = time.Now()
    us.updateUser(account)

    resp := &LoginResponse{User: account, Token: signed}
    return resp, nil
}

// ValidatePermission reports whether at least one role of userID grants
// action on resource. It returns false when the roles cannot be loaded or
// when no role matches.
func (us *UserService) ValidatePermission(userID string, resource string, action string) bool {
    roles, err := us.getUserRoles(userID)
    if err != nil {
        // Deny when the roles are unavailable.
        return false
    }

    for _, r := range roles {
        if !us.hasPermission(r, resource, action) {
            continue
        }
        // The first matching role is enough.
        return true
    }
    return false
}

3.3 数据集服务实现

// DatasetService handles document upload and vectorization.
// File path: backend/internal/service/dataset.go
//
// It holds a SQL handle, a MinIO client and an Elasticsearch client. None of
// the three is referenced directly by the methods visible in this section;
// presumably they are used by the save/upload helpers — confirm.
type DatasetService struct {
    db          *sql.DB
    minioClient *minio.Client
    esClient    *elasticsearch.Client
}

// UploadDocument validates, parses and chunks the uploaded file, stores the
// resulting document and schedules its vectorization.
//
// The record is created with status DocumentStatusProcessing and with
// CreatedAt and UpdatedAt taken from one clock reading, so both timestamps
// are identical on a new document. It is saved to the database, uploaded to
// object storage, and then processDocumentVectorization is started in a
// goroutine; the call returns without waiting for it.
//
// Any validation, parsing, chunking, save or upload error is returned
// unchanged with a nil document.
//
// NOTE(review): when uploadToMinIO fails, the record written by saveDocument
// is not removed or re-marked here — confirm whether cleanup happens
// elsewhere.
func (ds *DatasetService) UploadDocument(req *UploadDocumentRequest) (*Document, error) {
    if err := ds.validateDocument(req.File); err != nil {
        return nil, err
    }

    content, metadata, err := ds.parseDocument(req.File)
    if err != nil {
        return nil, err
    }

    chunks, err := ds.chunkDocument(content, metadata)
    if err != nil {
        return nil, err
    }

    now := time.Now()
    document := &Document{
        ID:        generateDocumentID(),
        UserID:    req.UserID,
        Title:     req.Title,
        Content:   content,
        Metadata:  metadata,
        Chunks:    chunks,
        Status:    DocumentStatusProcessing,
        CreatedAt: now,
        UpdatedAt: now,
    }

    if err := ds.saveDocument(document); err != nil {
        return nil, err
    }

    if err := ds.uploadToMinIO(document); err != nil {
        return nil, err
    }

    // Embedding runs in the background; the caller gets the document while
    // its status is still DocumentStatusProcessing.
    go ds.processDocumentVectorization(document)

    return document, nil
}

// processDocumentVectorization embeds every chunk of document, stores one
// vector record per chunk and finally updates the document status.
//
// A chunk whose embedding or save fails is logged and counted, and processing
// continues with the next chunk. The document is marked
// DocumentStatusCompleted only when every chunk was stored; otherwise its
// status is left untouched and a summary is logged, so a partially indexed
// document is not reported as complete. A nil document is ignored.
//
// NOTE(review): if the project defines a dedicated failure status, set it in
// the failure branch instead of leaving the previous status in place.
func (ds *DatasetService) processDocumentVectorization(document *Document) {
    if document == nil {
        return
    }

    failed := 0
    for _, chunk := range document.Chunks {
        embedding, err := ds.generateEmbedding(chunk.Content)
        if err != nil {
            log.Printf("向量化失败: %v", err)
            failed++
            continue
        }

        vectorRecord := &VectorRecord{
            DocumentID: document.ID,
            ChunkID:    chunk.ID,
            Content:    chunk.Content,
            Embedding:  embedding,
            Metadata:   chunk.Metadata,
        }

        if err := ds.saveVectorRecord(vectorRecord); err != nil {
            log.Printf("向量保存失败: %v", err)
            failed++
        }
    }

    // Do not claim completion when some chunks never reached the vector store.
    if failed > 0 {
        log.Printf("文档 %v 向量化未完成: %d/%d 个分块失败", document.ID, failed, len(document.Chunks))
        return
    }

    document.Status = DocumentStatusCompleted
    ds.updateDocument(document)
}

3.4 gRPC服务实现

// gRPC服务定义
// 文件路径: backend/api/proto/chat.proto
syntax = "proto3";

package chat;

// ChatService exposes session creation, message sending and history lookup.
service ChatService {
  // CreateSession is a unary call returning the new session's id and status.
  rpc CreateSession(CreateSessionRequest) returns (CreateSessionResponse);
  // SendMessage is server-streaming: one request yields a stream of
  // MessageResponse values.
  rpc SendMessage(SendMessageRequest) returns (stream MessageResponse);
  // GetSessionHistory is a unary call.
  // NOTE(review): GetSessionHistoryRequest and GetSessionHistoryResponse are
  // not declared in the part of the file shown here — confirm they exist.
  rpc GetSessionHistory(GetSessionHistoryRequest) returns (GetSessionHistoryResponse);
}

// CreateSessionRequest is the input of ChatService.CreateSession: the user
// the session is created for and a string-to-string context map.
message CreateSessionRequest {
  string user_id = 1;
  map<string, string> context = 2;
}

// CreateSessionResponse is the result of ChatService.CreateSession. The Go
// handler shown in this document fills status with the literal "created".
message CreateSessionResponse {
  string session_id = 1;
  string status = 2;
}

// SendMessageRequest is the input of ChatService.SendMessage. The Go handler
// shown in this document copies all four fields into the user message it
// passes to the chat service.
message SendMessageRequest {
  string session_id = 1;
  string content = 2;
  string content_type = 3;
  map<string, string> metadata = 4;
}

// MessageResponse is one element of the SendMessage response stream.
// NOTE(review): the Go handler shown in this document always sends
// is_streaming = true and is_final = false — confirm how the end of a reply
// is meant to be signalled.
message MessageResponse {
  string message_id = 1;
  string content = 2;
  string role = 3;
  bool is_streaming = 4;
  bool is_final = 5;
}

// ChatServer is the gRPC server implementation of the chat API.
// File path: backend/internal/grpc/chat_server.go
//
// It delegates to the chat service and embeds
// pb.UnimplementedChatServiceServer.
type ChatServer struct {
    chatService *service.ChatService
    pb.UnimplementedChatServiceServer
}

// CreateSession handles the CreateSession RPC by delegating to the chat
// service with the request's user id and context map. On success it answers
// with the new session id and the status "created"; a service error is
// returned to the caller unchanged. ctx is not used by this handler.
//
// NOTE(review): a CreateSession method on service.ChatService is not visible
// in this document (only SessionManager.CreateSession, which takes just the
// user id) — confirm the callee and its signature.
func (s *ChatServer) CreateSession(ctx context.Context, req *pb.CreateSessionRequest) (*pb.CreateSessionResponse, error) {
    created, createErr := s.chatService.CreateSession(req.UserId, req.Context)
    if createErr != nil {
        return nil, createErr
    }

    resp := &pb.CreateSessionResponse{Status: "created"}
    resp.SessionId = created.ID
    return resp, nil
}

// SendMessage handles the server-streaming SendMessage RPC.
//
// It wraps the request in a user message stamped with a fresh id and the
// current time, then asks the chat service to process it. Every message the
// service emits through the callback is forwarded to the client stream. The
// error returned by ProcessMessageStream is the RPC result.
//
// NOTE(review): every forwarded frame carries IsStreaming=true and
// IsFinal=false; no frame with IsFinal=true is sent from here — confirm how
// clients are expected to detect the end of a reply.
func (s *ChatServer) SendMessage(req *pb.SendMessageRequest, stream pb.ChatService_SendMessageServer) error {
    incoming := &service.Message{
        ID:          generateMessageID(),
        SessionID:   req.SessionId,
        Role:        service.MessageRoleUser,
        Content:     req.Content,
        ContentType: service.ContentType(req.ContentType),
        Timestamp:   time.Now(),
        Metadata:    req.Metadata,
    }

    // forward relays one service message to the client stream.
    forward := func(response *service.Message) error {
        out := &pb.MessageResponse{
            MessageId:   response.ID,
            Content:     response.Content,
            Role:        string(response.Role),
            IsStreaming: true,
            IsFinal:     false,
        }
        return stream.Send(out)
    }

    return s.chatService.ProcessMessageStream(req.SessionId, incoming, forward)
}

3.5 数据库设计

-- users: one row per account. email must be unique; status defaults to
-- 'active'; last_login_at has no default and stays NULL until it is set.
-- updated_at only has an insert-time default; no trigger refreshing it on
-- UPDATE is shown here.
-- NOTE(review): gen_random_uuid() needs PostgreSQL 13+ or the pgcrypto
-- extension, and TIMESTAMP stores no time zone — confirm both fit the
-- deployment.
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) UNIQUE NOT NULL,
    username VARCHAR(100) NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_login_at TIMESTAMP
);

-- sessions: one row per chat session, owned by a user through user_id.
-- context is stored as JSONB; status defaults to 'active'; title is optional.
-- The foreign key declares no ON DELETE action.
CREATE TABLE sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    title VARCHAR(255),
    context JSONB,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- messages: one row per message of a session. role and content are required;
-- content_type defaults to 'text'; metadata is optional JSONB.
-- The foreign key declares no ON DELETE action.
CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id UUID NOT NULL REFERENCES sessions(id),
    role VARCHAR(20) NOT NULL,
    content TEXT NOT NULL,
    content_type VARCHAR(50) DEFAULT 'text',
    metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- documents: one row per uploaded document, owned by a user through user_id.
-- title is required; content and metadata are optional; status defaults to
-- 'processing'.
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    title VARCHAR(255) NOT NULL,
    content TEXT,
    metadata JSONB,
    status VARCHAR(20) DEFAULT 'processing',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- document_chunks: one row per chunk of a document. chunk_index is required;
-- no uniqueness constraint on (document_id, chunk_index) is declared here.
CREATE TABLE document_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    document_id UUID NOT NULL REFERENCES documents(id),
    content TEXT NOT NULL,
    metadata JSONB,
    chunk_index INTEGER NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes: one single-column index on each foreign-key column, for looking
-- up child rows by their parent id.
CREATE INDEX idx_sessions_user_id ON sessions(user_id);
CREATE INDEX idx_messages_session_id ON messages(session_id);
CREATE INDEX idx_documents_user_id ON documents(user_id);
CREATE INDEX idx_document_chunks_document_id ON document_chunks(document_id);

3.6 缓存策略

// RedisManager wraps a Redis client with the caching and locking helpers of
// this service.
// File path: backend/internal/cache/redis_manager.go
//
// The methods visible in this section use only client; config is not read by
// them.
type RedisManager struct {
    client *redis.Client
    config *RedisConfig
}

// CacheSession stores session in Redis as JSON under the key "session:<ID>"
// with a 24-hour expiry. It returns the marshalling error, or the error
// reported by the Redis SET command.
func (rm *RedisManager) CacheSession(session *Session) error {
    const ttl = 24 * time.Hour

    redisKey := fmt.Sprintf("session:%s", session.ID)
    payload, marshalErr := json.Marshal(session)
    if marshalErr != nil {
        return marshalErr
    }

    status := rm.client.Set(context.Background(), redisKey, payload, ttl)
    return status.Err()
}

// GetSession reads the JSON stored under "session:<sessionID>" and decodes it
// into a Session. The Redis error or the decoding error is returned unchanged
// with a nil session.
//
// NOTE(review): with go-redis a missing key is presumably reported as the
// redis.Nil error, which this method passes through — confirm callers treat
// it as a cache miss.
func (rm *RedisManager) GetSession(sessionID string) (*Session, error) {
    raw, getErr := rm.client.Get(context.Background(), "session:"+sessionID).Result()
    if getErr != nil {
        return nil, getErr
    }

    cached := new(Session)
    if decodeErr := json.Unmarshal([]byte(raw), cached); decodeErr != nil {
        return nil, decodeErr
    }
    return cached, nil
}

// CacheUser stores user in Redis as JSON under the key "user:<ID>" with a
// 12-hour expiry. It returns the marshalling error, or the error reported by
// the Redis SET command.
func (rm *RedisManager) CacheUser(user *User) error {
    const ttl = 12 * time.Hour

    redisKey := fmt.Sprintf("user:%s", user.ID)
    payload, marshalErr := json.Marshal(user)
    if marshalErr != nil {
        return marshalErr
    }

    status := rm.client.Set(context.Background(), redisKey, payload, ttl)
    return status.Err()
}

// AcquireLock tries to take the lock named key by issuing SETNX with the
// value "1" and the given expiration. It returns the command's boolean
// result together with its error.
//
// NOTE(review): the stored value is the constant "1", so nothing identifies
// the holder — confirm that releasing without an ownership check is
// acceptable for the callers.
func (rm *RedisManager) AcquireLock(key string, expiration time.Duration) (bool, error) {
    return rm.client.SetNX(context.Background(), key, "1", expiration).Result()
}

// ReleaseLock deletes key and returns the error of the DEL command.
//
// NOTE(review): the key is deleted unconditionally; this method does not
// check who currently holds the lock — confirm that is intended.
func (rm *RedisManager) ReleaseLock(key string) error {
    deleted := rm.client.Del(context.Background(), key)
    return deleted.Err()
}

3.7 错误处理和日志

// ErrorHandler returns a Gin middleware that, after the remaining handlers
// have run, logs the last error recorded on the context and writes a JSON
// error response for it.
// File path: backend/internal/middleware/error_handler.go
//
// Mapping by error type:
//   - gin.ErrorTypeBind   -> 400 with a fixed "bad request parameters" text;
//   - gin.ErrorTypePublic -> 500 with the error's own message;
//   - anything else       -> 500 with a fixed "internal server error" text.
//
// NOTE(review): the type is compared with equality; gin also offers
// Error.IsType for flag-style matching — confirm exact matching is intended.
func ErrorHandler() gin.HandlerFunc {
    return func(c *gin.Context) {
        c.Next()

        if len(c.Errors) == 0 {
            return
        }

        last := c.Errors.Last()
        log.Printf("Error: %v", last)

        // Start from the generic answer and specialise per error type.
        status, message := http.StatusInternalServerError, "内部服务器错误"
        switch last.Type {
        case gin.ErrorTypeBind:
            status, message = http.StatusBadRequest, "请求参数错误"
        case gin.ErrorTypePublic:
            message = last.Error()
        }

        c.JSON(status, gin.H{"error": message})
    }
}

// Logger is the structured logger of the backend; it wraps a logrus logger.
// File path: backend/internal/logger/logger.go
type Logger struct {
    logger *logrus.Logger
}

// NewLogger returns a Logger backed by a new logrus logger that emits JSON
// and logs at Info level and above.
func NewLogger() *Logger {
    wrapped := &Logger{logger: logrus.New()}
    wrapped.logger.SetFormatter(&logrus.JSONFormatter{})
    wrapped.logger.SetLevel(logrus.InfoLevel)
    return wrapped
}

// LogRequest writes one Info entry titled "HTTP Request" with the request
// method, URL path, response status, duration, client IP and user agent.
func (l *Logger) LogRequest(c *gin.Context, duration time.Duration) {
    req := c.Request
    fields := logrus.Fields{
        "method":     req.Method,
        "path":       req.URL.Path,
        "status":     c.Writer.Status(),
        "duration":   duration,
        "client_ip":  c.ClientIP(),
        "user_agent": req.UserAgent(),
    }

    entry := l.logger.WithFields(fields)
    entry.Info("HTTP Request")
}

// LogError writes one Error entry titled "Application Error" carrying the
// error text and the supplied context map.
//
// A nil err is logged with the text "<nil>" instead of panicking, so the
// context of an accidental call is still recorded.
func (l *Logger) LogError(err error, context map[string]interface{}) {
    // Calling Error on a nil error value would panic inside the logging helper.
    message := "<nil>"
    if err != nil {
        message = err.Error()
    }

    l.logger.WithFields(logrus.Fields{
        "error":   message,
        "context": context,
    }).Error("Application Error")
}

相关文档