VoiceHelper后端服务核心实现
本文档详细介绍VoiceHelper智能语音助手系统的后端服务实现,涵盖Go微服务架构、gRPC通信、数据库设计等关键技术。
3. 后端服务核心实现
3.1 对话服务实现
// 对话服务主结构体
// 文件路径: backend/internal/service/chat.go
type ChatService struct {
db *sql.DB
cache *redis.Client
ragClient *rag.Client
voiceClient *voice.Client
config *ChatConfig
sessionManager *SessionManager
messageQueue chan *Message
contextManager *ContextManager
}
// 会话管理器
type SessionManager struct {
sessions map[string]*Session
mutex sync.RWMutex
db *sql.DB
cache *redis.Client
}
func (sm *SessionManager) CreateSession(userID string) (*Session, error) {
session := &Session{
ID: generateSessionID(),
UserID: userID,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
Context: make(map[string]interface{}),
Messages: []*Message{},
Status: SessionStatusActive,
}
// 保存到数据库
if err := sm.saveSession(session); err != nil {
return nil, err
}
// 缓存到Redis
sm.cacheSession(session)
return session, nil
}
func (sm *SessionManager) GetSession(sessionID string) (*Session, error) {
// 先从缓存获取
if session := sm.getCachedSession(sessionID); session != nil {
return session, nil
}
// 从数据库获取
session, err := sm.loadSessionFromDB(sessionID)
if err != nil {
return nil, err
}
// 缓存到Redis
sm.cacheSession(session)
return session, nil
}
// 消息处理
func (cs *ChatService) ProcessMessage(sessionID string, userMessage *Message) error {
// 获取会话
session, err := cs.sessionManager.GetSession(sessionID)
if err != nil {
return err
}
// 添加到消息历史
session.Messages = append(session.Messages, userMessage)
// 更新上下文
cs.contextManager.UpdateContext(session, userMessage)
// 异步处理AI响应
go cs.processAIResponse(session, userMessage)
return nil
}
func (cs *ChatService) processAIResponse(session *Session, userMessage *Message) {
// 构建检索请求
retrievalRequest := &rag.RetrievalRequest{
Query: userMessage.Content,
TopK: 5,
Filters: cs.buildFilters(session),
}
// 执行RAG检索
retrievalResult, err := cs.ragClient.Retrieve(retrievalRequest)
if err != nil {
cs.handleError(session, err)
return
}
// 构建提示词
prompt := cs.buildPrompt(session, userMessage, retrievalResult)
// 调用大模型生成响应
response, err := cs.generateResponse(prompt)
if err != nil {
cs.handleError(session, err)
return
}
// 保存AI响应
aiMessage := &Message{
ID: generateMessageID(),
SessionID: session.ID,
Role: MessageRoleAssistant,
Content: response,
ContentType: ContentTypeText,
Timestamp: time.Now(),
}
session.Messages = append(session.Messages, aiMessage)
cs.sessionManager.UpdateSession(session)
}
3.2 用户服务实现
// 用户服务结构体
// 文件路径: backend/internal/service/user.go
type UserService struct {
db *sql.DB
cache *redis.Client
jwt *JWTManager
bcrypt *BCryptHasher
}
// 用户注册
func (us *UserService) RegisterUser(req *RegisterRequest) (*User, error) {
// 验证用户信息
if err := us.validateUserInfo(req); err != nil {
return nil, err
}
// 检查用户是否已存在
if exists, err := us.userExists(req.Email); err != nil || exists {
return nil, ErrUserAlreadyExists
}
// 加密密码
hashedPassword, err := us.bcrypt.HashPassword(req.Password)
if err != nil {
return nil, err
}
// 创建用户
user := &User{
ID: generateUserID(),
Email: req.Email,
Username: req.Username,
PasswordHash: hashedPassword,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
Status: UserStatusActive,
}
// 保存到数据库
if err := us.saveUser(user); err != nil {
return nil, err
}
// 缓存用户信息
us.cacheUser(user)
return user, nil
}
// 用户登录
func (us *UserService) LoginUser(req *LoginRequest) (*LoginResponse, error) {
// 获取用户信息
user, err := us.getUserByEmail(req.Email)
if err != nil {
return nil, ErrInvalidCredentials
}
// 验证密码
if !us.bcrypt.VerifyPassword(req.Password, user.PasswordHash) {
return nil, ErrInvalidCredentials
}
// 生成JWT Token
token, err := us.jwt.GenerateToken(user.ID, user.Email)
if err != nil {
return nil, err
}
// 更新最后登录时间
user.LastLoginAt = time.Now()
us.updateUser(user)
return &LoginResponse{
User: user,
Token: token,
}, nil
}
// 权限验证
func (us *UserService) ValidatePermission(userID string, resource string, action string) bool {
// 获取用户角色
roles, err := us.getUserRoles(userID)
if err != nil {
return false
}
// 检查权限
for _, role := range roles {
if us.hasPermission(role, resource, action) {
return true
}
}
return false
}
3.3 数据集服务实现
// 数据集服务结构体
// 文件路径: backend/internal/service/dataset.go
type DatasetService struct {
db *sql.DB
minioClient *minio.Client
esClient *elasticsearch.Client
}
// 文档上传
func (ds *DatasetService) UploadDocument(req *UploadDocumentRequest) (*Document, error) {
// 验证文档格式
if err := ds.validateDocument(req.File); err != nil {
return nil, err
}
// 解析文档内容
content, metadata, err := ds.parseDocument(req.File)
if err != nil {
return nil, err
}
// 分块处理
chunks, err := ds.chunkDocument(content, metadata)
if err != nil {
return nil, err
}
// 创建文档记录
document := &Document{
ID: generateDocumentID(),
UserID: req.UserID,
Title: req.Title,
Content: content,
Metadata: metadata,
Chunks: chunks,
Status: DocumentStatusProcessing,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
// 保存到数据库
if err := ds.saveDocument(document); err != nil {
return nil, err
}
// 上传到对象存储
if err := ds.uploadToMinIO(document); err != nil {
return nil, err
}
// 异步处理向量化
go ds.processDocumentVectorization(document)
return document, nil
}
// 文档向量化处理
func (ds *DatasetService) processDocumentVectorization(document *Document) {
for _, chunk := range document.Chunks {
// 生成向量嵌入
embedding, err := ds.generateEmbedding(chunk.Content)
if err != nil {
log.Printf("向量化失败: %v", err)
continue
}
// 保存到向量数据库
vectorRecord := &VectorRecord{
DocumentID: document.ID,
ChunkID: chunk.ID,
Content: chunk.Content,
Embedding: embedding,
Metadata: chunk.Metadata,
}
if err := ds.saveVectorRecord(vectorRecord); err != nil {
log.Printf("向量保存失败: %v", err)
}
}
// 更新文档状态
document.Status = DocumentStatusCompleted
ds.updateDocument(document)
}
3.4 gRPC服务实现
// gRPC服务定义
// 文件路径: backend/api/proto/chat.proto
syntax = "proto3";
package chat;
service ChatService {
rpc CreateSession(CreateSessionRequest) returns (CreateSessionResponse);
rpc SendMessage(SendMessageRequest) returns (stream MessageResponse);
rpc GetSessionHistory(GetSessionHistoryRequest) returns (GetSessionHistoryResponse);
}
message CreateSessionRequest {
string user_id = 1;
map<string, string> context = 2;
}
message CreateSessionResponse {
string session_id = 1;
string status = 2;
}
message SendMessageRequest {
string session_id = 1;
string content = 2;
string content_type = 3;
map<string, string> metadata = 4;
}
message MessageResponse {
string message_id = 1;
string content = 2;
string role = 3;
bool is_streaming = 4;
bool is_final = 5;
}
// gRPC服务实现
// 文件路径: backend/internal/grpc/chat_server.go
type ChatServer struct {
chatService *service.ChatService
pb.UnimplementedChatServiceServer
}
func (s *ChatServer) CreateSession(ctx context.Context, req *pb.CreateSessionRequest) (*pb.CreateSessionResponse, error) {
session, err := s.chatService.CreateSession(req.UserId, req.Context)
if err != nil {
return nil, err
}
return &pb.CreateSessionResponse{
SessionId: session.ID,
Status: "created",
}, nil
}
func (s *ChatServer) SendMessage(req *pb.SendMessageRequest, stream pb.ChatService_SendMessageServer) error {
// 创建消息
message := &service.Message{
ID: generateMessageID(),
SessionID: req.SessionId,
Role: service.MessageRoleUser,
Content: req.Content,
ContentType: service.ContentType(req.ContentType),
Timestamp: time.Now(),
Metadata: req.Metadata,
}
// 处理消息并流式返回
return s.chatService.ProcessMessageStream(req.SessionId, message, func(response *service.Message) error {
return stream.Send(&pb.MessageResponse{
MessageId: response.ID,
Content: response.Content,
Role: string(response.Role),
IsStreaming: true,
IsFinal: false,
})
})
}
3.5 数据库设计
-- 用户表
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email VARCHAR(255) UNIQUE NOT NULL,
username VARCHAR(100) NOT NULL,
password_hash VARCHAR(255) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login_at TIMESTAMP
);
-- 会话表
CREATE TABLE sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
title VARCHAR(255),
context JSONB,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 消息表
CREATE TABLE messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(id),
role VARCHAR(20) NOT NULL,
content TEXT NOT NULL,
content_type VARCHAR(50) DEFAULT 'text',
metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 文档表
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
title VARCHAR(255) NOT NULL,
content TEXT,
metadata JSONB,
status VARCHAR(20) DEFAULT 'processing',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 文档块表
CREATE TABLE document_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID NOT NULL REFERENCES documents(id),
content TEXT NOT NULL,
metadata JSONB,
chunk_index INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 索引
CREATE INDEX idx_sessions_user_id ON sessions(user_id);
CREATE INDEX idx_messages_session_id ON messages(session_id);
CREATE INDEX idx_documents_user_id ON documents(user_id);
CREATE INDEX idx_document_chunks_document_id ON document_chunks(document_id);
3.6 缓存策略
// Redis缓存管理
// 文件路径: backend/internal/cache/redis_manager.go
type RedisManager struct {
client *redis.Client
config *RedisConfig
}
func (rm *RedisManager) CacheSession(session *Session) error {
key := fmt.Sprintf("session:%s", session.ID)
data, err := json.Marshal(session)
if err != nil {
return err
}
return rm.client.Set(context.Background(), key, data, time.Hour*24).Err()
}
func (rm *RedisManager) GetSession(sessionID string) (*Session, error) {
key := fmt.Sprintf("session:%s", sessionID)
data, err := rm.client.Get(context.Background(), key).Result()
if err != nil {
return nil, err
}
var session Session
if err := json.Unmarshal([]byte(data), &session); err != nil {
return nil, err
}
return &session, nil
}
func (rm *RedisManager) CacheUser(user *User) error {
key := fmt.Sprintf("user:%s", user.ID)
data, err := json.Marshal(user)
if err != nil {
return err
}
return rm.client.Set(context.Background(), key, data, time.Hour*12).Err()
}
// 分布式锁
func (rm *RedisManager) AcquireLock(key string, expiration time.Duration) (bool, error) {
result := rm.client.SetNX(context.Background(), key, "1", expiration)
return result.Val(), result.Err()
}
func (rm *RedisManager) ReleaseLock(key string) error {
return rm.client.Del(context.Background(), key).Err()
}
3.7 错误处理和日志
// 错误处理中间件
// 文件路径: backend/internal/middleware/error_handler.go
func ErrorHandler() gin.HandlerFunc {
return func(c *gin.Context) {
c.Next()
if len(c.Errors) > 0 {
err := c.Errors.Last()
// 记录错误日志
log.Printf("Error: %v", err)
// 根据错误类型返回相应状态码
switch err.Type {
case gin.ErrorTypeBind:
c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数错误"})
case gin.ErrorTypePublic:
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
default:
c.JSON(http.StatusInternalServerError, gin.H{"error": "内部服务器错误"})
}
}
}
}
// 结构化日志
// 文件路径: backend/internal/logger/logger.go
type Logger struct {
logger *logrus.Logger
}
func NewLogger() *Logger {
logger := logrus.New()
logger.SetFormatter(&logrus.JSONFormatter{})
logger.SetLevel(logrus.InfoLevel)
return &Logger{logger: logger}
}
func (l *Logger) LogRequest(c *gin.Context, duration time.Duration) {
l.logger.WithFields(logrus.Fields{
"method": c.Request.Method,
"path": c.Request.URL.Path,
"status": c.Writer.Status(),
"duration": duration,
"client_ip": c.ClientIP(),
"user_agent": c.Request.UserAgent(),
}).Info("HTTP Request")
}
func (l *Logger) LogError(err error, context map[string]interface{}) {
l.logger.WithFields(logrus.Fields{
"error": err.Error(),
"context": context,
}).Error("Application Error")
}