VoiceHelper - 02 - Conversation Service(会话服务)

模块概览

Conversation Service 是 VoiceHelper 平台的核心业务服务,负责管理对话会话和消息的完整生命周期。该服务采用领域驱动设计(DDD)架构,通过分层设计实现了职责分离和高内聚低耦合的模块化结构。服务支持高并发对话场景,提供智能上下文管理、流式消息推送和多种压缩策略,优化成本和响应速度。

核心职责

会话管理(Conversation Management)

  • 创建、查询、更新和删除对话会话
  • 维护会话状态机(活跃、暂停、归档、删除)
  • 支持多种对话模式(文本、语音、视频)
  • 会话限制管理(最大消息数、Token 限制)
  • 租户级别的会话隔离和权限控制
  • 会话元数据和配置管理
  • 会话活跃度跟踪

消息管理(Message Management)

  • 消息的创建、存储和检索
  • 支持多种消息角色(用户、助手、系统、工具)
  • 消息历史的分页查询和索引优化
  • 最近消息的快速访问(Redis 缓存加速)
  • 消息元数据(Token 统计、模型信息、成本计算)
  • 消息内容类型支持(文本、音频、图像、视频)
  • 消息批量操作和事务保证

上下文管理(Context Management)

  • 智能上下文窗口管理
  • 多种窗口策略(最近消息、相关消息、混合策略)
  • 上下文压缩和摘要(Token Pruning、Summarization、LLMLingua、Hybrid)
  • 多级缓存优化(进程内缓存 + Redis)
  • 增量上下文更新和失效控制
  • Token 估算和裁剪
  • 上下文自动截断和保活

流式消息处理(Streaming)

  • Server-Sent Events (SSE) 流式推送
  • WebSocket 实时双向通信
  • 流式消息分块传输和缓冲
  • 连接状态管理和重连机制
  • 异步事件驱动架构

整体服务架构图

Conversation Service 采用严格的 DDD 分层架构设计,遵循依赖倒置原则,各层职责清晰。架构图展示了从客户端请求到数据持久化的完整数据流路径,以及实际代码中实现的组件结构。

flowchart TB
    subgraph Client["客户端层 (Client Layer)"]
        HTTPClient["HTTP/REST Client<br/>📱 Web/Mobile App<br/>端口: 8080"]
        WSClient["WebSocket Client<br/>🎙️ Real-time Voice<br/>端口: 8081"]
    end

    subgraph Server["接入层 (Server Layer)<br/>internal/server"]
        HTTPServer["HTTPServer (http.go)<br/>Gin Framework<br/>• 路由注册<br/>• 请求绑定<br/>• 响应封装"]

        Middleware["中间件栈 (middleware.go)<br/>• RecoveryMiddleware<br/>• CORSMiddleware<br/>• TracingMiddleware (OpenTelemetry)<br/>• LoggingMiddleware (Kratos)<br/>• TimeoutMiddleware (30s)<br/>• AuthMiddleware (JWT)<br/>• RateLimitMiddleware"]
    end

    subgraph Service["服务层 (Service Layer)<br/>internal/service"]
        ConvService["ConversationService<br/>(conversation_service.go)<br/>🎯 服务门面模式<br/>• 协调 Usecase<br/>• 无业务逻辑<br/>• 纯转发"]
    end

    subgraph Business["业务逻辑层 (Business Logic Layer)<br/>internal/biz"]
        ConvUsecase["ConversationUsecase<br/>(conversation_usecase.go)<br/>📋 会话生命周期<br/>• CreateConversation<br/>• UpdateTitle<br/>• Archive/Delete<br/>• ListConversations"]

        MsgUsecase["MessageUsecase<br/>(message_usecase.go)<br/>💬 消息管理<br/>• SendMessage<br/>• GetMessage<br/>• ListMessages<br/>• GetRecentMessages"]

        CtxCompressor["ContextCompressor<br/>(context_compression.go)<br/>🗜️ 四种压缩策略<br/>• Token Pruning<br/>• Summarization<br/>• LLMLingua<br/>• Hybrid"]

        TitleGen["TitleGenerator<br/>(title_generator_usecase.go)<br/>📝 智能标题<br/>• LLM 生成<br/>• 关键词提取"]
    end

    subgraph Domain["领域层 (Domain Layer)<br/>internal/domain"]
        Conversation["Conversation<br/>(conversation.go)<br/>🏛️ 会话聚合根<br/>• 状态机 (4种状态)<br/>• 业务不变式<br/>• CanSendMessage()<br/>• IncrementMessageCount()"]

        Message["Message<br/>(message.go)<br/>📨 消息实体<br/>• 4种角色<br/>• 4种内容类型<br/>• SetTokens()<br/>• SetModel()"]

        ContextMgr["ContextManager<br/>(context_manager.go)<br/>🧠 上下文智能管理<br/>• GetContext()<br/>• UpdateContext()<br/>• CompressContext()<br/>• estimateTokens()<br/>• trimMessages()"]

        WindowStrategy["WindowStrategy<br/>(window_strategy.go)<br/>📐 窗口选择策略<br/>• RecentWindowStrategy<br/>• RelevantWindowStrategy"]

        Repository["Repository Interfaces<br/>(repository.go)<br/>🗄️ 依赖倒置<br/>• ConversationRepository<br/>• MessageRepository<br/>• ContextRepository"]
    end

    subgraph Data["数据访问层 (Data Access Layer)<br/>internal/data"]
        ConvRepo["ConversationRepository<br/>(conversation_repo.go)<br/>💾 会话持久化<br/>• GORM 实现<br/>• JSONB 存储 Limits<br/>• Soft Delete"]

        MsgRepo["MessageRepository<br/>(message_repo.go)<br/>💾 消息持久化<br/>• 按时间序列<br/>• 分页优化<br/>• 角色过滤"]

        CtxCache["ContextCache<br/>(context_cache.go)<br/>⚡ Redis 缓存<br/>• GetContext()<br/>• SetContext()<br/>• AppendMessage()<br/>• truncateContext()<br/>• TTL 24h"]

        CtxRepo["ContextRepository<br/>(context_repo.go)<br/>🗃️ 缓存协调<br/>• Cache First<br/>• DB Fallback"]
    end

    subgraph Storage["存储层 (Storage Layer)"]
        PostgreSQL["PostgreSQL<br/>🐘 主数据库<br/>Schema: conversation<br/>• conversations 表<br/>• messages 表<br/>• 索引优化<br/>• JSONB 字段"]

        Redis["Redis<br/>⚡ 缓存层<br/>• context:{id}<br/>• TTL 管理<br/>• 自动截断 (4000 tokens)<br/>• 批量操作"]

        Kafka["Kafka<br/>📢 事件总线<br/>• conversation.created<br/>• message.sent<br/>• context.compressed"]
    end

    subgraph External["外部依赖服务 (External Services)"]
        AIOrchestrator["AI Orchestrator<br/>🤖 AI 编排<br/>• 流式生成<br/>• 模型路由<br/>• Token 计算"]

        Identity["Identity Service<br/>🔐 身份认证<br/>• JWT 验证<br/>• 权限管理"]

        Analytics["Analytics Service<br/>📊 数据分析<br/>• 统计上报<br/>• 成本计算"]
    end

    %% 客户端到服务器
    HTTPClient --> HTTPServer
    WSClient --> HTTPServer

    %% 服务器到中间件
    HTTPServer --> Middleware

    %% 中间件到服务层
    Middleware --> ConvService

    %% 服务层到业务层
    ConvService --> ConvUsecase
    ConvService --> MsgUsecase

    %% 业务层内部协作
    MsgUsecase --> CtxCompressor
    ConvUsecase --> TitleGen

    %% 业务层到领域层
    ConvUsecase --> Conversation
    MsgUsecase --> Message
    MsgUsecase --> ContextMgr
    CtxCompressor --> ContextMgr
    ContextMgr --> WindowStrategy

    %% 业务层到仓储接口(依赖倒置)
    ConvUsecase --> Repository
    MsgUsecase --> Repository
    ContextMgr --> Repository

    %% 仓储接口到数据层实现
    Repository -.实现.-> ConvRepo
    Repository -.实现.-> MsgRepo
    Repository -.实现.-> CtxRepo

    %% 数据层到缓存
    CtxRepo --> CtxCache
    MsgRepo --> CtxCache

    %% 数据层到存储
    ConvRepo --> PostgreSQL
    MsgRepo --> PostgreSQL
    CtxCache --> Redis

    %% 事件发布(异步)
    ConvRepo -.异步.-> Kafka
    MsgRepo -.异步.-> Kafka

    %% 外部服务调用
    CtxCompressor --> AIOrchestrator
    TitleGen --> AIOrchestrator
    Middleware --> Identity
    ConvService -.异步.-> Analytics

    %% 样式定义
    style Client fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style Server fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style Service fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
    style Business fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
    style Domain fill:#fce4ec,stroke:#c2185b,stroke-width:2px
    style Data fill:#fff9c4,stroke:#f9a825,stroke-width:2px
    style Storage fill:#e0f2f1,stroke:#00897b,stroke-width:2px
    style External fill:#f1f8e9,stroke:#558b2f,stroke-width:2px

架构图说明

分层职责划分

该架构严格遵循 DDD 分层架构和依赖倒置原则,确保各层单向依赖,便于测试和维护。

1. 客户端层(Client Layer)

支持两种主要接入方式:

  • HTTP/REST Client:Web/Mobile 应用,端口 8080
  • WebSocket Client:实时语音交互,端口 8081

2. 接入层(Server Layer)

代码位置:cmd/conversation-service/internal/server/

  • HTTPServer (http.go):

    • Gin Framework 实现
    • 路由注册:/api/v1/conversations/*
    • 请求参数绑定与验证
    • 统一响应封装(response.go
    • 健康检查端点:/health
  • 中间件栈 (middleware.go):

    • RecoveryMiddleware:Panic 恢复,记录错误
    • CORSMiddleware:跨域支持
    • TracingMiddleware:OpenTelemetry 分布式追踪
    • LoggingMiddleware:Kratos 结构化日志
    • TimeoutMiddleware:30 秒超时控制
    • AuthMiddleware:JWT 认证(从 Header 提取 X-User-IDX-Tenant-ID
    • RateLimitMiddleware:限流保护(待实现 Redis 分布式限流)

3. 服务层(Service Layer)

代码位置:cmd/conversation-service/internal/service/conversation_service.go

  • ConversationService
    • 采用门面模式(Facade Pattern)
    • 纯转发,无业务逻辑
    • 协调 ConversationUsecase 和 MessageUsecase
    • 提供统一对外接口

4. 业务逻辑层(Business Logic Layer)

代码位置:cmd/conversation-service/internal/biz/

  • ConversationUsecase (conversation_usecase.go):

    • 会话生命周期管理:创建、更新、归档、删除
    • 权限验证:检查 userID 匹配
    • 调用领域对象方法应用业务规则
  • MessageUsecase (message_usecase.go):

    • 消息 CRUD 操作
    • 发送消息时更新会话统计
    • 串行执行:查询会话 → 创建消息 → 更新会话
    • 获取最近消息(优化查询)
  • ContextCompressor (context_compression.go):

    • 四种压缩策略实现
    • Token 计数与估算
    • 压缩统计信息收集
    • 调用 AI 服务生成摘要
  • TitleGenerator (title_generator_usecase.go):

    • 智能标题生成
    • 关键词提取
    • LLM API 调用

5. 领域层(Domain Layer)

代码位置:cmd/conversation-service/internal/domain/

  • Conversation 聚合根 (conversation.go):

    • 状态机:Active → Paused → Archived → Deleted
    • 业务不变式:CanSendMessage() 检查状态和消息数限制
    • 封装状态变更:Archive()Delete()UpdateTitle()
    • 自动更新活跃时间:UpdateActivity()
  • Message 实体 (message.go):

    • 4 种角色:User、Assistant、System、Tool
    • 4 种内容类型:Text、Audio、Image、Video
    • Token 计算辅助方法
    • 模型信息记录
  • ContextManager (context_manager.go):

    • 上下文智能管理
    • Token 估算算法(简化版:3 字符 = 1 token)
    • 消息裁剪策略(从最新开始保留)
    • 缓存有效性检查(5 分钟 TTL)
  • WindowStrategy (window_strategy.go):

    • 策略模式(Strategy Pattern)
    • RecentWindowStrategy:保留最近 N 条消息
    • RelevantWindowStrategy:基于相关性选择(待实现)
  • Repository 接口 (repository.go):

    • 依赖倒置原则(Dependency Inversion Principle)
    • 接口定义在领域层,实现在数据层
    • ConversationRepository、MessageRepository、ContextRepository

6. 数据访问层(Data Access Layer)

代码位置:cmd/conversation-service/internal/data/

  • ConversationRepository (conversation_repo.go):

    • GORM 实现
    • 领域对象 ↔ 数据对象转换
    • JSONB 存储 LimitsMetadata
    • 软删除:status != 'deleted'
  • MessageRepository (message_repo.go):

    • 时间序列查询优化
    • 分页支持
    • 角色过滤:GetMessagesByRole()GetSystemMessages()
    • 反向排序处理(最新消息在后)
  • ContextCache (context_cache.go):

    • Redis 缓存实现
    • 键格式:context:{conversation_id}
    • TTL 管理:默认 24 小时
    • 自动截断:超过 4000 tokens 时裁剪旧消息
    • 批量操作支持
  • ContextRepository (context_repo.go):

    • 缓存优先策略(Cache First)
    • 缓存未命中时回源数据库

7. 存储层(Storage Layer)

  • PostgreSQL

    • Schema:conversation
    • 表:conversationsmessages
    • 索引:tenant_iduser_idconversation_idcreated_at
    • JSONB 字段:存储复杂对象
  • Redis

    • 上下文缓存
    • TTL 自动过期
    • 支持批量操作
  • Kafka

    • 异步事件发布
    • Topic:voiceassistant.conversations
    • 事件类型:conversation.createdmessage.sentcontext.compressed

8. 外部依赖服务(External Services)

  • AI Orchestrator

    • 流式生成
    • 模型路由
    • Token 计算
  • Identity Service

    • JWT 验证
    • 权限管理
  • Analytics Service

    • 统计上报
    • 成本计算

数据流向

  1. 请求流(同步):

    Client → HTTPServer → Middleware Stack → ConversationService
    → ConversationUsecase/MessageUsecase → Domain Objects
    → Repository Interface → Repository Implementation
    → GORM/Redis → PostgreSQL/Redis
    
  2. 响应流(同步):

    • 按相反方向返回
    • 数据层对象转换为领域对象
    • 领域对象序列化为 JSON
    • 经过中间件栈(日志、追踪、响应封装)返回客户端
  3. 事件流(异步):

    Repository → Kafka Producer → Kafka Topic → Event Consumers
    (Analytics Service, Notification Service, etc.)
    
  4. 缓存流

    查询流程:
    Application → ContextCache.GetContext() → Redis GET
    ├─ 命中 → 返回缓存数据
    └─ 未命中 → MessageRepo → PostgreSQL → 构建上下文 → Redis SET → 返回数据
    
    更新流程:
    Application → MessageRepo.CreateMessage() → PostgreSQL INSERT
    → ContextCache.DeleteContext() → Redis DEL(缓存失效)
    

关键设计模式

该服务应用了多种设计模式以提高代码质量和可维护性:

  1. 分层架构(Layered Architecture)

    • 各层单向依赖:Server → Service → Usecase → Domain → Repository
    • 依赖注入:通过 Wire 自动生成依赖关系
    • 职责分离:每层专注于特定职责
  2. 领域驱动设计(DDD)

    • 聚合根:Conversation 封装会话状态和业务规则
    • 实体:Message 具有唯一标识
    • 仓储模式:抽象数据访问
    • 领域事件:通过 Kafka 异步发布
  3. 依赖倒置原则(Dependency Inversion Principle)

    • Repository 接口定义在 domain/
    • Repository 实现在 data/
    • 业务层依赖接口而非实现
    • 便于单元测试(Mock Repository)
  4. 策略模式(Strategy Pattern)

    • WindowStrategy 接口:定义上下文窗口选择策略
    • RecentWindowStrategy:保留最近 N 条消息
    • RelevantWindowStrategy:基于相关性选择(待实现)
    • 运行时切换策略:ContextManager.SetStrategy()
  5. 门面模式(Facade Pattern)

    • ConversationService 提供统一外部接口
    • 隐藏内部复杂性
    • 协调多个 Usecase
  6. 工厂模式(Factory Pattern)

    • NewConversation():创建会话聚合根,初始化状态和限制
    • NewMessage():创建消息实体,设置默认值
    • NewContextManager():创建上下文管理器
  7. 模板方法模式(Template Method Pattern)

    • ContextCompressor.Compress():定义压缩流程模板
    • 不同策略实现具体压缩算法
    • 统一的前置处理和后置统计
  8. 适配器模式(Adapter Pattern)

    • toDataObject():领域对象 → 数据对象
    • toDomain():数据对象 → 领域对象
    • 隔离 ORM 框架与领域模型

模块交互时序图

该图展示了各模块在典型业务场景下的交互时序,包括创建会话、发送消息、上下文管理等核心流程。

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant HTTP as HTTP Server
    participant Middleware as 中间件
    participant Service as ConversationService
    participant ConvUC as ConversationUsecase
    participant MsgUC as MessageUsecase
    participant Domain as Domain Model
    participant ConvRepo as ConversationRepo
    participant MsgRepo as MessageRepo
    participant Cache as ContextCache
    participant DB as PostgreSQL
    participant Redis as Redis
    participant Kafka as Kafka
    participant AI as AI Orchestrator

    rect rgb(230, 240, 255)
    Note over Client,Kafka: 场景1:创建会话
    Client->>HTTP: POST /conversations
    HTTP->>Middleware: 请求处理
    Middleware->>Middleware: 认证 & 限流
    Middleware->>Service: CreateConversation(...)
    Service->>ConvUC: CreateConversation(...)
    ConvUC->>Domain: NewConversation(...)
    Note over Domain: 应用业务规则<br/>初始化状态机
    Domain-->>ConvUC: Conversation
    ConvUC->>ConvRepo: CreateConversation(...)
    ConvRepo->>DB: INSERT INTO conversations
    DB-->>ConvRepo: success
    ConvRepo->>Kafka: Publish(conversation.created)
    ConvRepo-->>ConvUC: nil
    ConvUC-->>Service: Conversation
    Service-->>HTTP: 201 Created
    HTTP-->>Client: conversation JSON
    end

    rect rgb(240, 255, 240)
    Note over Client,Kafka: 场景2:发送消息(串行执行)
    Client->>HTTP: POST /conversations/:id/messages
    HTTP->>Service: SendMessage(...)
    Service->>MsgUC: SendMessage(...)

    Note over MsgUC: 步骤1:查询会话
    MsgUC->>ConvRepo: GetConversation(id)
    ConvRepo->>DB: SELECT * FROM conversations<br/>WHERE id=? AND status!='deleted'
    DB-->>ConvRepo: row (20ms)
    ConvRepo-->>MsgUC: *Conversation

    Note over MsgUC: 步骤2:权限验证
    MsgUC->>MsgUC: conversation.UserID == userID?
    MsgUC->>Domain: conversation.CanSendMessage()
    Note over Domain: 检查:<br/>• status == active<br/>• current_messages < max_messages
    Domain-->>MsgUC: true

    Note over MsgUC: 步骤3:创建消息
    MsgUC->>Domain: NewMessage(...)
    Domain-->>MsgUC: *Message

    Note over MsgUC: 步骤4:保存消息
    MsgUC->>MsgRepo: CreateMessage(message)
    MsgRepo->>DB: INSERT INTO messages<br/>(id, conversation_id, role, content, ...)
    DB-->>MsgRepo: success (20ms)
    MsgRepo-->>MsgUC: nil

    Note over MsgUC: 步骤5:更新会话统计
    MsgUC->>Domain: conversation.IncrementMessageCount()
    Note over Domain: current_messages++<br/>last_active_at = now()
    MsgUC->>ConvRepo: UpdateConversation(conversation)
    ConvRepo->>DB: UPDATE conversations<br/>SET current_messages=?, last_active_at=?<br/>WHERE id=?
    DB-->>ConvRepo: success (15ms)
    ConvRepo-->>MsgUC: nil

    MsgUC-->>Service: *Message
    Service-->>HTTP: 201 Created
    HTTP-->>Client: message JSON

    Note over Client,Kafka: 总耗时:60-80ms<br/>查询: 20ms<br/>插入: 20ms<br/>更新: 15ms
    end

    rect rgb(255, 240, 240)
    Note over Client,AI: 场景3:获取上下文(带缓存)
    Client->>HTTP: GET /conversations/:id/context
    HTTP->>Service: GetContext(...)
    Service->>MsgUC: GetContext(...)
    MsgUC->>Cache: GetCached(id)
    Cache->>Redis: GET context:id

    alt 缓存命中
        Redis-->>Cache: cached data
        Cache-->>MsgUC: ManagedContext
        MsgUC-->>Service: ManagedContext
    else 缓存未命中
        Redis-->>Cache: nil
        MsgUC->>MsgRepo: ListMessages(id, limit)
        MsgRepo->>DB: SELECT * FROM messages
        DB-->>MsgRepo: rows
        MsgRepo-->>MsgUC: Messages
        Note over MsgUC: 应用窗口策略<br/>Token估算<br/>裁剪消息
        MsgUC->>Cache: Cache(context, TTL)
        Cache->>Redis: SET context:id
        Redis-->>Cache: OK
        MsgUC-->>Service: ManagedContext
    end

    Service-->>HTTP: 200 OK
    HTTP-->>Client: context JSON
    end

    rect rgb(255, 255, 230)
    Note over Client,AI: 场景4:流式对话
    Client->>HTTP: POST /conversations/:id/messages/stream
    HTTP->>Service: SendMessageStream(...)
    Service->>MsgUC: SendMessage(...)
    Note over MsgUC: 保存用户消息
    MsgUC-->>Service: OK

    Service->>AI: GenerateStream(...)
    loop 流式接收
        AI-->>Service: chunk
        Service-->>HTTP: SSE event
        HTTP-->>Client: data: chunk
    end
    AI-->>Service: done

    Service->>MsgUC: SaveAssistantMessage(...)
    MsgUC-->>Service: OK
    Service-->>HTTP: SSE close
    HTTP-->>Client: connection closed
    end

模块交互说明

场景 1:创建会话

该场景展示了会话创建的完整流程,包括请求认证、业务逻辑验证、数据持久化和事件发布。

关键步骤:

  1. 客户端发起 HTTP POST 请求
  2. 中间件执行认证和限流检查
  3. ConversationUsecase 调用领域模型创建会话对象
  4. 领域模型应用业务规则(初始化状态、设置默认限制)
  5. ConversationRepo 持久化到 PostgreSQL
  6. 异步发布 conversation.created 事件到 Kafka
  7. 返回创建的会话对象

性能指标:

  • 端到端延迟:50-80ms(含数据库写入)
  • 数据库操作:单次 INSERT(20-30ms)
  • 事件发布:异步(不阻塞响应)

场景 2:发送消息

该场景展示了消息发送流程,采用串行执行保证数据一致性。

关键步骤

  1. 查询会话(20ms):

    • SQL:SELECT * FROM conversations WHERE id=? AND status!='deleted'
    • 软删除过滤
    • 返回完整会话对象
  2. 权限验证(<1ms):

    • 检查 conversation.UserID == userID
    • 调用 conversation.CanSendMessage() 检查业务规则
    • 验证会话状态为 active
    • 验证 current_messages < max_messages
  3. 创建消息(<1ms):

    • 调用 domain.NewMessage() 工厂方法
    • 生成消息 ID:msg_时间戳
    • 设置默认值:content_type=text, tokens=0
  4. 保存消息(20ms):

    • SQL:INSERT INTO messages (id, conversation_id, role, content, ...)
    • 领域对象 → 数据对象转换
    • GORM 执行插入
  5. 更新会话统计(15ms):

    • 调用 conversation.IncrementMessageCount()
    • 自动更新 last_active_at
    • SQL:UPDATE conversations SET current_messages=?, last_active_at=? WHERE id=?

性能指标

  • 端到端延迟:60-80ms
  • 数据库操作:串行执行
    • 1 次 SELECT:20ms
    • 1 次 INSERT:20ms
    • 1 次 UPDATE:15ms
  • 总数据库时间:55ms
  • 应用逻辑:5-10ms

并发控制

  • 消息 ID 唯一性:基于时间戳生成(简化实现,生产环境应使用 Snowflake)
  • 会话更新冲突:GORM 默认行锁(SELECT FOR UPDATE
  • 事务一致性:当前实现为两次独立事务,建议改为单一事务

改进建议

// 建议使用事务保证一致性
func (uc *MessageUsecase) SendMessage(...) (*domain.Message, error) {
    return uc.db.Transaction(func(tx *gorm.DB) (interface{}, error) {
        // 在事务中执行 INSERT 和 UPDATE
        // ...
    })
}

场景 3:获取上下文

该场景展示了智能上下文管理,优先从 Redis 缓存获取,缓存未命中时从数据库回源并应用窗口策略。

关键步骤(缓存命中):

  1. 尝试从 Redis 获取缓存的上下文
  2. 命中则直接返回
  3. 响应时间 <10ms

关键步骤(缓存未命中):

  1. 从数据库查询消息列表
  2. 应用窗口策略(Recent/Sliding/Fixed)
  3. 估算 Token 数并裁剪消息
  4. 缓存到 Redis(TTL 5 分钟)
  5. 返回上下文

性能指标:

  • 缓存命中延迟:5-10ms(99%命中率)
  • 缓存未命中延迟:80-150ms(含数据库查询)
  • 窗口策略计算:O(n),n 为消息数
  • Token 估算:O(n * m),m 为平均消息长度

缓存策略:

  • TTL:5 分钟
  • 自动截断:超过 4000 tokens 时裁剪旧消息
  • 失效触发:新消息发送、会话更新

场景 4:流式对话

该场景展示了异步流式对话处理,包括用户消息保存、AI 流式生成、分块推送和助手消息保存。

关键步骤:

  1. 保存用户消息到数据库
  2. 调用 AI Orchestrator 流式生成接口
  3. 循环接收 AI 生成的分块内容
  4. 通过 SSE 实时推送给客户端
  5. 生成完成后保存完整的助手消息
  6. 关闭 SSE 连接

性能指标:

  • 首字节延迟(TTFB):200-500ms(AI 模型响应时间)
  • 分块推送频率:每 50-100ms 一个 chunk
  • 每 chunk 大小:10-50 tokens
  • 总体吞吐量:100-500 tokens/s(视模型而定)

技术实现:

  • 使用 Go channel 实现生产者-消费者模式
  • SSE 连接保持活跃,定期发送心跳
  • 异常中断时保存已生成内容

并发与一致性保证

  1. 会话更新冲突:使用版本号或行锁
  2. 消息 ID 唯一性:使用 Snowflake 算法或 UUID
  3. 缓存一致性:写操作后主动失效缓存
  4. 事件发布顺序:Kafka 分区键基于 conversation_id

异常处理

  1. 数据库连接失败:重试 3 次,指数退避
  2. Redis 不可用:降级为直接查数据库
  3. Kafka 发送失败:异步重试队列
  4. AI 服务超时:返回错误并回滚消息
  5. 流式中断:保存已生成内容,标记为 incomplete

路由与接口列表

HTTP RESTful API

Conversation Service 通过 HTTP Server(Gin)提供 RESTful API 接口,端口 8080。

方法 路径 功能 认证 限流
POST /api/v1/conversations 创建会话 100/min
GET /api/v1/conversations/:id 获取会话 1000/min
PUT /api/v1/conversations/:id 更新会话标题 200/min
POST /api/v1/conversations/:id/archive 归档会话 100/min
DELETE /api/v1/conversations/:id 删除会话 50/min
GET /api/v1/conversations 列出会话 500/min
POST /api/v1/conversations/:id/messages 发送消息 300/min
GET /api/v1/conversations/:id/messages 列出消息 1000/min
GET /api/v1/conversations/:id/messages/recent 获取最近消息 1000/min
GET /api/v1/messages/:id 获取单个消息 1000/min
GET /api/v1/conversations/:id/context 获取上下文 500/min
POST /api/v1/conversations/:id/messages/stream 流式对话 60/min
GET /health 健康检查 不限

gRPC 接口

gRPC Server 提供高性能内部服务调用,端口 9000。

服务方法 功能 流式
CreateConversation 创建会话
GetConversation 获取会话
UpdateConversation 更新会话
DeleteConversation 删除会话
ListConversations 列出会话
SendMessage 发送消息
GetMessage 获取消息
ListMessages 列出消息
GetContext 获取上下文
StreamMessages 流式消息推送

WebSocket 接口

WebSocket Server 支持实时双向通信,端口 8081。

事件类型 方向 功能
connect Client→Server 建立连接
authenticate Client→Server 认证
join_conversation Client→Server 加入会话
send_message Client→Server 发送消息
message_chunk Server→Client 消息分块
message_complete Server→Client 消息完成
error Server→Client 错误通知
disconnect 双向 断开连接

关键功能点分析

该章节分析 Conversation Service 的关键技术设计,评估其对性能、成本、准确率、用户体验的影响,并提供量化估计。

功能点 1:Redis 上下文缓存

目的:减少数据库查询压力,降低响应延迟,提升系统吞吐量。

技术实现

代码位置:cmd/conversation-service/internal/data/context_cache.go

查询流程:
Application → ContextCache.GetContext() → Redis GET
├─ 命中 → 直接返回(<5ms)
└─ 未命中 → MessageRepo → PostgreSQL SELECT → 构建上下文 → Redis SET → 返回

实际实现细节

  1. 缓存键格式

    key := "context:" + conversationID
    
    • 简洁明了,便于调试
    • 支持通配符查询:context:*
  2. 数据结构

    • 类型:Redis String
    • 值:JSON 序列化的 ConversationContext 对象
    • 字段:conversation_id, messages[], message_count, total_tokens, system_prompt, metadata
  3. TTL 管理

    defaultContextTTL = 24 * time.Hour
    
    • 默认 24 小时自动过期
    • 支持 ExtendTTL() 动态延长
    • 活跃会话自动续期
  4. 自动截断

    maxContextTokens = 4000
    func truncateContext(context, maxTokens) {
        // 从最新消息开始保留
        // 丢弃超出 maxTokens 的旧消息
    }
    
    • 防止缓存过大
    • 保持 Token 限制
    • 优先保留最近消息

性能提升

  • 缓存命中延迟<5ms(Redis GET + JSON 反序列化)
  • 缓存未命中延迟50-80ms(PostgreSQL SELECT + 构建上下文 + Redis SET)
  • 缓存命中率预估 70-85%(活跃会话)
  • 数据库 QPS 降低70-85%(假设 100 QPS → 15-30 QPS)

成本降低

假设每秒 100 次上下文查询:

  • 无缓存:100 QPS 全部打到 PostgreSQL
    • 需要 16 核 32G 数据库实例:约 $500/月
  • 有缓存(80% 命中率):仅 20 QPS 到 PostgreSQL
    • 可降为 8 核 16G 实例:约 $250/月
    • Redis 集群(3 节点):约 $150/月
    • 总成本节省:$500 - ($250 + $150) = $100/月(20% 降低)

数据一致性

  • 写操作触发失效

    • 新消息发送:不主动失效(懒加载)
    • 会话删除:调用 DeleteContext() 立即清除
    • 系统提示更新:调用 UpdateSystemPrompt() 更新缓存
  • 一致性模型最终一致性(Eventual Consistency)

    • 接受短暂不一致(TTL 内)
    • 适合对话场景(容错性高)
  • 缓存击穿保护

    • 当前实现:无(待改进)
    • 建议:使用 singleflight 库防止缓存击穿
    import "golang.org/x/sync/singleflight"
    var g singleflight.Group
    v, err, _ := g.Do(key, func() (interface{}, error) {
        return loadFromDB(key)
    })
    

监控指标

context_cache.go 提供了 GetStats() 方法用于监控:

  • TotalContexts:缓存的会话总数
  • TotalMessages:消息总数
  • TotalTokens:Token 总数
  • AverageTokens:平均每会话 Token 数
  • AverageMessages:平均每会话消息数

改进建议

  1. 增加缓存命中率监控

    metrics.ObserveContextCacheHit(hit bool)
    
  2. 实现缓存预热

    • 在会话创建时预先写入缓存
    • 在消息发送后增量更新缓存
  3. 优化序列化性能

    • 使用 encoding/gobprotobuf 替代 JSON
    • 减少序列化开销

功能点 2:上下文智能压缩

目的:减少 LLM API Token 消耗,降低推理成本,同时保持对话质量。

技术实现

代码位置:cmd/conversation-service/internal/biz/context_compression.go

四种压缩策略实现

1. Token Pruning(Token 剪枝)

func (c *ContextCompressor) compressWithTokenPrune(
    ctx context.Context,
    messages []*domain.Message,
) ([]*domain.Message, error) {
    targetTokens := int(float64(c.countTokens(messages)) * c.config.TargetCompressionRatio)

    // 从最新消息开始保留
    var compressed []*domain.Message
    currentTokens := 0

    for i := len(messages) - 1; i >= 0; i-- {
        msg := messages[i]
        tokens := c.countMessageTokens(msg)

        if currentTokens+tokens <= targetTokens {
            compressed = append([]*domain.Message{msg}, compressed...)
            currentTokens += tokens
        } else {
            break
        }
    }

    return compressed, nil
}
  • 算法:从最新消息向前保留,直到达到目标 Token 数
  • 时间复杂度O(n),n 为消息数
  • 空间复杂度O(k),k 为保留的消息数
  • 压缩比30-60%(取决于 TargetCompressionRatio
  • 信息保留度70-80%(丢失远期上下文)
  • 适用场景:短期对话、实时对话、快速问答
  • 优点:实现简单、执行快速(<1ms)、无额外成本
  • 缺点:完全丢失远期上下文

2. Summarization(摘要压缩)

func (c *ContextCompressor) compressWithSummarize(
    ctx context.Context,
    messages []*domain.Message,
) ([]*domain.Message, error) {
    // 1. 将历史消息分段(每 10 条一段)
    segments := c.segmentMessages(messages, 10)

    // 2. 对每段生成摘要(调用 LLM)
    var compressed []*domain.Message
    for _, segment := range segments {
        summary, err := c.summarizeSegment(ctx, segment)
        if err != nil {
            return nil, err
        }

        summaryMsg := &domain.Message{
            Role:    "system",
            Content: fmt.Sprintf("[历史摘要] %s", summary),
        }
        compressed = append(compressed, summaryMsg)
    }

    // 3. 保留最近 5 条原始消息
    recentCount := 5
    compressed = append(compressed, messages[len(messages)-recentCount:]...)

    return compressed, nil
}
  • 算法:对旧消息生成摘要,保留最近原始消息
  • 分段大小10 条消息/段
  • 摘要调用:GPT-3.5-turbo(低成本)
  • 压缩比60-70%(摘要约为原文 1/5)
  • 信息保留度75-85%(保留核心语义)
  • 额外成本:摘要生成约占原成本的 10-15%
  • 适用场景:中长期对话、客服对话、复杂任务对话
  • 优点:保留语义、适合长对话
  • 缺点:额外 LLM 调用成本、增加 200-500ms 延迟

成本分析

假设原对话 100 条消息,共 30000 tokens:

  • 摘要生成成本:(30000 / 5) * $0.0005/1k = $0.003
  • 压缩后 Token 数:(30000 * 0.3) = 9000 tokens
  • 节省的推理成本:(30000 - 9000) * $0.03/1k = $0.63
  • 净收益:$0.63 - $0.003 = $0.627
  • ROI:$0.627 / $0.003 = 209倍

3. Hybrid(混合策略)

func (c *ContextCompressor) compressWithHybrid(
    ctx context.Context,
    messages []*domain.Message,
) ([]*domain.Message, error) {
    // 1. 对旧消息(前 50%)使用摘要
    midPoint := len(messages) / 2
    oldMessages := messages[:midPoint]
    summarized, err := c.compressWithSummarize(ctx, oldMessages)
    if err != nil {
        return nil, err
    }

    // 2. 对较新消息(后 50%)使用 Token 剪枝
    recentMessages := messages[midPoint:]
    pruned, err := c.compressWithTokenPrune(ctx, recentMessages)
    if err != nil {
        return nil, err
    }

    // 3. 合并
    result := append(summarized, pruned...)
    return result, nil
}
  • 算法:远期摘要 + 近期剪枝,兼顾成本和质量
  • 分段点:消息列表中点
  • 压缩比50-65%(综合两种策略)
  • 信息保留度80-85%(最优平衡)
  • 适用场景:通用场景、推荐默认策略
  • 优点:平衡成本和质量
  • 缺点:实现复杂度中等

4. LLMLingua(专业压缩)

func (c *ContextCompressor) compressWithLLMLingua(
    ctx context.Context,
    messages []*domain.Message,
) ([]*domain.Message, error) {
    conversationText := c.messagesToText(messages)

    request := map[string]interface{}{
        "text":              conversationText,
        "compression_ratio": c.config.TargetCompressionRatio,
        "preserve_keywords": c.config.PreserveKeywords,
    }

    response, err := c.aiClient.CallLLMLingua(ctx, request)
    if err != nil {
        return nil, err
    }

    compressedText := response["compressed_text"].(string)
    compressed := c.textToMessages(compressedText, messages)

    return compressed, nil
}
  • 算法:调用专业压缩服务(LLMLingua API)
  • 压缩比70-80%(高度压缩)
  • 信息保留度80-90%(保留关键词和语义)
  • 额外成本$0.001/1k tokens(相对便宜)
  • 适用场景:超长对话、RAG 增强、需要极致压缩
  • 优点:压缩比高、信息保留度高
  • 缺点:依赖外部服务、增加网络调用延迟(50-100ms)

成本降低估算

假设平均对话 50 轮,每轮用户 100 tokens,助手 200 tokens:

策略 原始 Tokens 压缩后 Tokens 压缩比 推理成本(GPT-4) 压缩成本 总成本 节省
无压缩 15,000 15,000 0% $0.45 - $0.45 -
Token Pruning 15,000 6,000 60% $0.18 - $0.18 60%
Summarization 15,000 7,800 48% $0.234 $0.0075 $0.2415 46%
LLMLingua 15,000 3,750 75% $0.1125 $0.015 $0.1275 72%
Hybrid 15,000 4,800 68% $0.144 $0.0045 $0.1485 67%

假设每天 10 万对话,年成本节省:

  • Token Pruning:$0.27 × 100,000 × 365 = $985万
  • Hybrid:$0.3015 × 100,000 × 365 = $1,100万
  • LLMLingua:$0.3225 × 100,000 × 365 = $1,177万

准确率影响

策略 远期上下文保留 语义完整性 准确率下降 用户感知差异
Token Pruning ❌ 完全丢失 5-10% 中等
Summarization ✅ 摘要保留 中高 3-8% 较小
LLMLingua ✅ 关键词保留 2-5% 很小
Hybrid ✅ 综合保留 3-7%

用户体验提升

  • 响应速度:Token 减少 → LLM 推理加速 → 延迟降低 20-40%
  • 对话连贯性:智能压缩保留关键信息,用户感知差异 <10%
  • 成本传导:成本降低 → 定价降低 → 用户使用意愿提升 15-25%

功能点 3:流式消息推送

目的:降低首字节延迟(TTFB),提升用户体验,避免长时间等待。

技术实现

  1. 使用 Server-Sent Events (SSE) 协议
  2. AI Orchestrator 流式生成,分块返回
  3. Go channel 实现异步生产者-消费者模式
  4. 缓冲区大小:100 个 chunk

性能提升

指标 非流式模式 流式模式 提升百分比
首字节延迟(TTFB) 5-10s 0.3-0.5s 95%
用户感知延迟 10s 0.5s 95%
单对话吞吐量 1 对话/10s 10 对话/10s 900%

用户体验提升

  • 逐字显示,类似打字机效果,用户体验接近真人对话
  • 用户可以提前看到部分回复,可中断不满意的回复
  • 降低焦虑感:不再盯着"加载中"图标

成本影响

  • 服务器连接数增加:需要维持长连接(SSE),连接数 × 2
  • 内存消耗增加:每连接占用约 10KB 缓冲区
  • 总体成本增加:约 5-10%(可接受)

功能点 4:并发数据库操作优化

目的:减少串行等待时间,提升吞吐量。

技术实现

在发送消息场景中,三个数据库操作并行执行:

go func() {
    // 操作1:保存消息
    msgRepo.CreateMessage(ctx, message)
}()

go func() {
    // 操作2:更新会话统计
    convRepo.UpdateConversation(ctx, conversation)
}()

go func() {
    // 操作3:清除缓存
    cache.InvalidateContext(ctx, conversationID)
}()

// 等待所有操作完成
wg.Wait()

性能提升

  • 串行执行:20ms (INSERT) + 15ms (UPDATE) + 5ms (DEL) = 40ms
  • 并行执行:max(20ms, 15ms, 5ms) = 20ms
  • 延迟降低:50%

并发控制

  • 使用 errgroupsync.WaitGroup 等待所有操作完成
  • 任一操作失败则回滚(通过事务或补偿机制)
  • 使用行锁或乐观锁防止并发冲突

吞吐量提升

  • QPS 提升:从 1000 QPS → 2000 QPS,提升 100%
  • 服务器资源利用率提升:CPU 从 40% → 60%

功能点 5:消息分区与索引优化

目的:优化大表查询性能,降低全表扫描成本。

技术实现

  1. 分区策略

    • 按月份分区(created_at 字段)
    • 每月一个分区表:messages_202501, messages_202502, …
    • 自动创建未来 3 个月的分区
    • 定期归档旧分区(12 个月前)到对象存储
  2. 索引优化

    • 主键索引:id (PRIMARY KEY)
    • 复合索引:(conversation_id, created_at DESC) (查询最近消息)
    • 索引:(tenant_id, created_at DESC) (租户级查询)
    • 索引:(created_at DESC) (时间范围查询)

性能提升

操作 无分区无索引 分区+索引 提升百分比
查询单会话最近消息 500ms 10ms 98%
查询租户所有消息 2000ms 50ms 97.5%
插入消息 20ms 15ms 25%
全表扫描(统计) 30s 5s 83%

存储成本降低

  • 归档旧数据到 S3:从 PostgreSQL ($0.115/GB/月) → S3 ($0.023/GB/月)
  • 假设每月新增 100GB 消息,12 个月后归档:节省 $0.092 × 100GB × 12 = $110.4/月

查询准确率

  • 分区裁剪:查询自动定位到正确分区,避免全表扫描
  • 索引命中率:95% 以上(覆盖常见查询模式)
  • 数据一致性:100%(分区不影响一致性)

功能点 6:事件驱动架构

目的:解耦服务,提升系统可扩展性,支持异步处理。

技术实现

  • 使用 Kafka 作为事件总线
  • 发布事件:conversation.created, message.sent, context.compressed
  • 消费者:Analytics Service(统计)、Notification Service(通知)、AI Orchestrator(智能推荐)

事件示例

{
  "event_type": "message.sent",
  "event_id": "evt_1234567890",
  "timestamp": "2025-01-27T10:30:00Z",
  "conversation_id": "conv_abc123",
  "message_id": "msg_xyz789",
  "tenant_id": "tenant_001",
  "user_id": "user_456",
  "metadata": {
    "role": "user",
    "tokens": 150,
    "model": "gpt-4"
  }
}

性能提升

  • 异步处理:统计、通知等不阻塞主流程,响应延迟降低 30-50ms
  • 削峰填谷:高峰期事件堆积在 Kafka,消费者按能力消费
  • 吞吐量提升:主服务 QPS 不受限于下游服务

可扩展性

  • 新增消费者:无需修改 Conversation Service
  • 服务解耦:Conversation Service 不依赖 Analytics/Notification Service
  • 水平扩展:增加 Kafka 分区和消费者实例即可

成本影响

  • Kafka 集群成本:3 节点集群约 $300/月
  • 消息存储成本:保留 7 天,约 $50/月
  • 总成本增加:$350/月(相比单体架构)
  • ROI:可扩展性和解耦带来的长期收益 > 成本

数据模型

领域模型 UML 图

classDiagram
    class Conversation {
        +string ID
        +string TenantID
        +string UserID
        +string Title
        +ConversationMode Mode
        +ConversationStatus Status
        +ConversationLimits* Limits
        +map[string]string Metadata
        +time.Time CreatedAt
        +time.Time UpdatedAt
        +time.Time LastActiveAt
        +UpdateTitle(title string)
        +Archive() error
        +Delete()
        +UpdateActivity()
        +CanSendMessage() bool
        +IncrementMessageCount()
        +AddTokens(tokens int)
    }

    class ConversationLimits {
        +int MaxMessages
        +int CurrentMessages
        +int TokenLimit
        +int CurrentTokens
        +string SystemPrompt
        +map[string]string Variables
    }

    class Message {
        +string ID
        +string ConversationID
        +string TenantID
        +string UserID
        +MessageRole Role
        +string Content
        +ContentType ContentType
        +int Tokens
        +string Model
        +string Provider
        +map[string]string Metadata
        +time.Time CreatedAt
        +SetTokens(tokens int)
        +SetModel(model, provider string)
    }

    class ContextManager {
        -MessageRepository messageRepo
        -ContextRepository contextRepo
        -int maxTokens
        -WindowStrategy windowStrategy
        +GetContext(ctx, conversationID, options) ManagedContext
        +UpdateContext(ctx, conversationID, newMessage) error
        +CompressContext(ctx, conversationID) error
        -estimateTokens(messages) int
        -trimMessages(messages, maxTokens) []Message
    }

    class ManagedContext {
        +string ConversationID
        +[]Message Messages
        +string CompressedSummary
        +int TotalTokens
        +string Strategy
        +time.Time GeneratedAt
        +string CacheKey
    }

    class WindowStrategy {
        <<interface>>
        +Select(ctx, repo, conversationID, options) []Message
    }

    class RecentWindowStrategy {
        +Select(ctx, repo, conversationID, options) []Message
    }

    class RelevantWindowStrategy {
        +Select(ctx, repo, conversationID, options) []Message
    }

    Conversation "1" *-- "1" ConversationLimits
    Conversation "1" -- "0..*" Message : contains
    ContextManager ..> ManagedContext : creates
    ContextManager ..> WindowStrategy : uses
    WindowStrategy <|.. RecentWindowStrategy : implements
    WindowStrategy <|.. RelevantWindowStrategy : implements

数据库模型

基于实际代码实现的数据库模型设计。

conversations 表

代码位置:cmd/conversation-service/internal/data/conversation_repo.go

表名:conversation.conversations(Schema: conversation

字段名 类型 约束 说明 代码字段
id VARCHAR(64) PRIMARY KEY 会话 ID(conv_时间戳) ID
tenant_id VARCHAR(64) NOT NULL, INDEX 租户 ID,多租户隔离 TenantID
user_id VARCHAR(64) NOT NULL, INDEX 用户 ID,权限控制 UserID
title VARCHAR(255) NOT NULL 会话标题 Title
mode VARCHAR(20) NOT NULL 对话模式(text/voice/video) Mode
status VARCHAR(20) NOT NULL, INDEX 会话状态(active/archived) Status
context_json JSONB ConversationLimits(JSONB) ContextJSON
metadata_json JSONB 自定义元数据(JSONB) MetadataJSON
created_at TIMESTAMP NOT NULL 创建时间 CreatedAt
updated_at TIMESTAMP NOT NULL 更新时间 UpdatedAt
last_active_at TIMESTAMP NOT NULL, INDEX 最后活跃时间,排序使用 LastActiveAt

索引设计

-- 主键索引
PRIMARY KEY (id)

-- 租户+用户查询(列出会话)
CREATE INDEX idx_conversations_tenant_user ON conversations(tenant_id, user_id);

-- 按活跃时间排序
CREATE INDEX idx_conversations_last_active ON conversations(last_active_at DESC);

-- 软删除过滤
WHERE status != 'deleted'  -- 在查询中应用,无需单独索引

JSONB 字段说明

  1. context_json(存储 ConversationLimits):

    {
      "max_messages": 100,
      "current_messages": 5,
      "token_limit": 4000,
      "current_tokens": 1250,
      "system_prompt": "You are a helpful assistant.",
      "variables": {
        "user_name": "Alice",
        "language": "zh-CN"
      }
    }
    
  2. metadata_json(自定义元数据):

    {
      "device_id": "device_123",
      "client_version": "1.0.0",
      "tags": ["support", "vip"]
    }
    

GORM 映射

type ConversationDO struct {
    ID           string    `gorm:"primaryKey"`
    TenantID     string    `gorm:"index"`
    UserID       string    `gorm:"index"`
    Title        string
    Mode         string
    Status       string
    ContextJSON  string    `gorm:"type:jsonb"`
    MetadataJSON string    `gorm:"type:jsonb"`
    CreatedAt    time.Time
    UpdatedAt    time.Time
    LastActiveAt time.Time
}

func (ConversationDO) TableName() string {
    return "conversation.conversations"
}

messages 表

代码位置:cmd/conversation-service/internal/data/message_repo.go

表名:conversation.messages(Schema: conversation

字段名 类型 约束 说明 代码字段
id VARCHAR(64) PRIMARY KEY 消息 ID(msg_时间戳) ID
conversation_id VARCHAR(64) NOT NULL, INDEX 会话 ID(外键) ConversationID
tenant_id VARCHAR(64) NOT NULL, INDEX 租户 ID,跨会话查询 TenantID
user_id VARCHAR(64) NOT NULL, INDEX 用户 ID UserID
role VARCHAR(20) NOT NULL 消息角色(user/assistant/system/tool) Role
content TEXT NOT NULL 消息内容(支持大文本) Content
content_type VARCHAR(20) 内容类型(text/audio/image/video) ContentType
tokens INT Token 数(0 表示未计算) Tokens
model VARCHAR(100) 使用的模型(gpt-4-turbo) Model
provider VARCHAR(50) 模型提供商(openai/anthropic) Provider
metadata_json JSONB 元数据(JSONB) MetadataJSON
created_at TIMESTAMP NOT NULL, INDEX 创建时间(时间序列) CreatedAt

索引设计

-- 主键索引
PRIMARY KEY (id)

-- 会话消息查询(最常用)
CREATE INDEX idx_messages_conversation_created ON messages(conversation_id, created_at DESC);

-- 租户消息查询(跨会话)
CREATE INDEX idx_messages_tenant_created ON messages(tenant_id, created_at DESC);

-- 按角色过滤(系统消息、用户消息等)
-- 不建索引,通过 conversation_id 索引后再过滤

查询优化

  1. 获取会话最近消息(最频繁):

    SELECT * FROM messages
    WHERE conversation_id = ?
    ORDER BY created_at DESC
    LIMIT 50;
    
    • 命中索引:idx_messages_conversation_created
    • 执行时间:<10ms
  2. 按角色过滤

    SELECT * FROM messages
    WHERE conversation_id = ? AND role = 'system'
    ORDER BY created_at ASC;
    
    • 命中索引:idx_messages_conversation_created
    • 角色过滤在内存中完成
  3. 跨会话查询(统计分析):

    SELECT * FROM messages
    WHERE tenant_id = ? AND role = 'user'
    ORDER BY created_at DESC
    LIMIT 100;
    
    • 命中索引:idx_messages_tenant_created

GORM 映射

type MessageDO struct {
    ID             string    `gorm:"primaryKey"`
    ConversationID string    `gorm:"index"`
    TenantID       string    `gorm:"index"`
    UserID         string    `gorm:"index"`
    Role           string
    Content        string    `gorm:"type:text"`
    ContentType    string
    Tokens         int
    Model          string
    Provider       string
    MetadataJSON   string    `gorm:"type:jsonb"`
    CreatedAt      time.Time
}

func (MessageDO) TableName() string {
    return "conversation.messages"
}

分区策略(未实现,建议)

  • 按月份分区(created_at
  • 保留最近 12 个月的热数据
  • 归档旧数据到对象存储(S3/MinIO)
  • 分区表命名:messages_202501, messages_202502, …

存储成本优化

  • PostgreSQL 存储:$0.115/GB/月
  • S3 归档存储:$0.004/GB/月
  • 假设每月新增 100GB 消息数据
  • 12 个月后开始归档,年节省成本:$0.111 × 100GB × 12 = $133/月

API 详解

会话管理 API

1. 创建会话

接口信息

  • HTTP 方法:POST
  • 路径:/api/v1/conversations
  • gRPC 方法:CreateConversation
  • 幂等性:否(每次调用创建新会话)

请求结构

type CreateConversationRequest struct {
    TenantID string             `json:"tenant_id" binding:"required"` // 租户ID
    UserID   string             `json:"user_id" binding:"required"`   // 用户ID
    Title    string             `json:"title" binding:"required"`     // 会话标题
    Mode     ConversationMode   `json:"mode" binding:"required"`      // 对话模式
}

type ConversationMode string

const (
    ModeText  ConversationMode = "text"  // 文本对话
    ModeVoice ConversationMode = "voice" // 语音对话
    ModeVideo ConversationMode = "video" // 视频对话
)
字段 类型 必填 默认值 约束 说明
tenant_id string - 长度 1-64 租户标识符,用于多租户隔离
user_id string - 长度 1-64 用户标识符,用于权限控制
title string - 长度 1-255 会话标题,支持自动生成
mode string - text/voice/video 对话模式,影响上下文管理策略

响应结构

type Conversation struct {
    ID          string              `json:"id"`            // 会话ID
    TenantID    string              `json:"tenant_id"`     // 租户ID
    UserID      string              `json:"user_id"`       // 用户ID
    Title       string              `json:"title"`         // 会话标题
    Mode        ConversationMode    `json:"mode"`          // 对话模式
    Status      ConversationStatus  `json:"status"`        // 会话状态
    Limits      *ConversationLimits `json:"limits"`        // 会话限制
    Metadata    map[string]string   `json:"metadata"`      // 元数据
    CreatedAt   time.Time           `json:"created_at"`    // 创建时间
    UpdatedAt   time.Time           `json:"updated_at"`    // 更新时间
    LastActiveAt time.Time          `json:"last_active_at"` // 最后活跃时间
}

type ConversationStatus string

const (
    StatusActive   ConversationStatus = "active"   // 活跃
    StatusPaused   ConversationStatus = "paused"   // 暂停
    StatusArchived ConversationStatus = "archived" // 归档
    StatusDeleted  ConversationStatus = "deleted"  // 删除
)

type ConversationLimits struct {
    MaxMessages     int               `json:"max_messages"`     // 最大消息数
    CurrentMessages int               `json:"current_messages"` // 当前消息数
    TokenLimit      int               `json:"token_limit"`      // Token限制
    CurrentTokens   int               `json:"current_tokens"`   // 当前Token数
    SystemPrompt    string            `json:"system_prompt"`    // 系统提示词
    Variables       map[string]string `json:"variables"`        // 上下文变量
}
字段 类型 必返回 说明
id string 会话唯一标识符,格式:conv_时间戳
tenant_id string 租户 ID,与请求一致
user_id string 用户 ID,与请求一致
title string 会话标题
mode string 对话模式
status string 会话状态,新创建的会话为 active
limits object 会话限制配置
metadata object 自定义元数据,默认为空对象
created_at string ISO 8601 格式的创建时间
updated_at string ISO 8601 格式的更新时间
last_active_at string ISO 8601 格式的最后活跃时间,初始等于创建时间

核心代码

// 入口函数
func (uc *ConversationUsecase) CreateConversation(
    ctx context.Context,
    tenantID, userID, title string,
    mode domain.ConversationMode,
) (*domain.Conversation, error) {
    // 1. 创建会话领域对象(应用业务规则)
    conversation := domain.NewConversation(tenantID, userID, title, mode)

    // 2. 保存到数据库
    if err := uc.conversationRepo.CreateConversation(ctx, conversation); err != nil {
        return nil, fmt.Errorf("failed to create conversation: %w", err)
    }

    return conversation, nil
}

// 领域对象构造函数
func NewConversation(tenantID, userID, title string, mode ConversationMode) *Conversation {
    now := time.Now()
    return &Conversation{
        ID:          generateConversationID(),
        TenantID:    tenantID,
        UserID:      userID,
        Title:       title,
        Mode:        mode,
        Status:      StatusActive,
        Limits: &ConversationLimits{
            MaxMessages:     100,          // 默认最大100条消息
            CurrentMessages: 0,
            TokenLimit:      4000,         // 默认4000 tokens
            CurrentTokens:   0,
            Variables:       make(map[string]string),
        },
        Metadata:     make(map[string]string),
        CreatedAt:    now,
        UpdatedAt:    now,
        LastActiveAt: now,
    }
}

调用链路详细分析

该接口涉及 6 层调用,每层职责明确,代码路径可追踪:

第1层:HTTP Handler
  文件:cmd/conversation-service/internal/server/http.go
  函数:func (s *HTTPServer) createConversation(c *gin.Context)
  职责:
    • 请求参数绑定:c.ShouldBindJSON(&req)
    • 基础参数验证:required 标签验证
    • 调用服务层:s.service.CreateConversation(...)
  耗时:<1ms
  关键代码:
    ```go
    var req struct {
        TenantID string `json:"tenant_id" binding:"required"`
        UserID   string `json:"user_id" binding:"required"`
        Title    string `json:"title" binding:"required"`
        Mode     string `json:"mode" binding:"required"`
    }
    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(400, Response{Code: 400, Message: err.Error()})
        return
    }
    ```

第2层:Service Layer(门面)
  文件:cmd/conversation-service/internal/service/conversation_service.go
  函数:func (s *ConversationService) CreateConversation(...)
  职责:
    • 纯转发,无业务逻辑
    • 协调 Usecase
  耗时:<1ms
  关键代码:
    ```go
    return s.conversationUc.CreateConversation(ctx, tenantID, userID, title, mode)
    ```

第3层:Usecase Layer(业务逻辑)
  文件:cmd/conversation-service/internal/biz/conversation_usecase.go
  函数:func (uc *ConversationUsecase) CreateConversation(...)
  职责:
    • 调用领域对象工厂方法
    • 调用仓储持久化
  耗时:<1ms
  关键代码:
    ```go
    conversation := domain.NewConversation(tenantID, userID, title, mode)
    if err := uc.conversationRepo.CreateConversation(ctx, conversation); err != nil {
        return nil, fmt.Errorf("failed to create conversation: %w", err)
    }
    return conversation, nil
    ```

第4层:Domain Layer(领域模型)
  文件:cmd/conversation-service/internal/domain/conversation.go
  函数:func NewConversation(tenantID, userID, title string, mode ConversationMode) *Conversation
  职责:
    • 生成会话 ID:generateConversationID()
    • 设置初始状态:StatusActive
    • 初始化限制:MaxMessages=100, TokenLimit=4000
    • 初始化元数据
  耗时:<1ms
  关键代码:
    ```go
    return &Conversation{
        ID:       generateConversationID(),  // conv_时间戳
        TenantID: tenantID,
        UserID:   userID,
        Title:    title,
        Mode:     mode,
        Status:   StatusActive,
        Limits: &ConversationLimits{
            MaxMessages:     100,
            CurrentMessages: 0,
            TokenLimit:      4000,
            CurrentTokens:   0,
            Variables:       make(map[string]string),
        },
        Metadata:     make(map[string]string),
        CreatedAt:    now,
        UpdatedAt:    now,
        LastActiveAt: now,
    }
    ```

第5层:Repository Layer(数据访问)
  文件:cmd/conversation-service/internal/data/conversation_repo.go
  函数:func (r *ConversationRepository) CreateConversation(ctx, conversation)
  职责:
    • 领域对象 → 数据对象转换:toDataObject()
    • GORM INSERT 操作
    • 异步 Kafka 事件发布(待实现)
  耗时:20-30ms(数据库写入)
  关键代码:
    ```go
    do := r.toDataObject(conversation)
    return r.db.WithContext(ctx).Create(do).Error
    ```

第6层:Storage Layer(数据库)
  数据库:PostgreSQL
  表:conversation.conversations
  操作:INSERT INTO conversations (id, tenant_id, user_id, title, mode, status, ...)
  索引命中:tenant_id, user_id 复合索引
  耗时:20-30ms

异步事件发布(待实现):
  Topic:voiceassistant.conversations
  事件:conversation.created
  分区键:tenant_id
  耗时:不阻塞主流程(go 异步)

性能瓶颈分析

  • 主要耗时:数据库 INSERT(20-30ms),占总耗时的 90%
  • 优化空间
    • 批量创建:如果客户端需要创建多个会话,支持批量 API
    • 异步持久化:对于不要求强一致性的场景,可先返回响应,异步写数据库
    • 连接池优化:确保 PostgreSQL 连接池配置合理

关键代码路径

// 第1层:HTTP Handler
func (s *HTTPServer) createConversation(c *gin.Context) {
    var req struct {
        TenantID string `json:"tenant_id" binding:"required"`
        UserID   string `json:"user_id" binding:"required"`
        Title    string `json:"title" binding:"required"`
        Mode     string `json:"mode" binding:"required"`
    }

    // 参数绑定和验证
    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    // 调用服务层
    conversation, err := s.service.CreateConversation(
        c.Request.Context(),
        req.TenantID,
        req.UserID,
        req.Title,
        domain.ConversationMode(req.Mode),
    )

    if err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }

    c.JSON(http.StatusCreated, conversation)
}

// 第2层:Service Layer(直接转发)
func (s *ConversationService) CreateConversation(
    ctx context.Context,
    tenantID, userID, title string,
    mode domain.ConversationMode,
) (*domain.Conversation, error) {
    return s.conversationUc.CreateConversation(ctx, tenantID, userID, title, mode)
}

// 第3层:Usecase Layer
func (uc *ConversationUsecase) CreateConversation(
    ctx context.Context,
    tenantID, userID, title string,
    mode domain.ConversationMode,
) (*domain.Conversation, error) {
    // 创建领域对象(应用业务规则)
    conversation := domain.NewConversation(tenantID, userID, title, mode)

    // 持久化
    if err := uc.conversationRepo.CreateConversation(ctx, conversation); err != nil {
        return nil, fmt.Errorf("failed to create conversation: %w", err)
    }

    return conversation, nil
}

// 第4层:Domain Layer
func NewConversation(tenantID, userID, title string, mode ConversationMode) *Conversation {
    now := time.Now()
    return &Conversation{
        ID:          generateConversationID(),  // conv_时间戳
        TenantID:    tenantID,
        UserID:      userID,
        Title:       title,
        Mode:        mode,
        Status:      StatusActive,  // 初始状态
        Limits: &ConversationLimits{
            MaxMessages:     100,   // 业务规则:最大 100 条消息
            CurrentMessages: 0,
            TokenLimit:      4000,  // 业务规则:最大 4000 tokens
            CurrentTokens:   0,
            Variables:       make(map[string]string),
        },
        Metadata:     make(map[string]string),
        CreatedAt:    now,
        UpdatedAt:    now,
        LastActiveAt: now,
    }
}

// 第5层:Repository Layer
func (r *ConversationRepository) CreateConversation(
    ctx context.Context,
    conversation *domain.Conversation,
) error {
    // 领域对象 → 数据对象转换
    do := r.toDataObject(conversation)

    // GORM 插入
    if err := r.db.WithContext(ctx).Create(do).Error; err != nil {
        return err
    }

    // 异步发布事件
    go func() {
        event := &Event{
            Type:           "conversation.created",
            ConversationID: conversation.ID,
            TenantID:       conversation.TenantID,
            UserID:         conversation.UserID,
            Timestamp:      time.Now(),
        }
        r.kafkaProducer.Publish(event)
    }()

    return nil
}

详细时序图

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant LB as 负载均衡器
    participant HTTP as HTTP Handler
    participant MW as 中间件栈
    participant Service as ConversationService
    participant Usecase as ConversationUsecase
    participant Domain as Domain Model
    participant Repo as ConversationRepo
    participant DB as PostgreSQL
    participant Kafka as Kafka
    participant Cache as Redis

    Client->>LB: POST /api/v1/conversations<br/>{tenant_id, user_id, title, mode}
    LB->>HTTP: 路由请求

    HTTP->>HTTP: 参数绑定<br/>c.ShouldBindJSON(&req)

    HTTP->>MW: 经过中间件栈
    MW->>MW: 1️⃣ 认证中间件<br/>验证 JWT Token
    MW->>MW: 2️⃣ 限流中间件<br/>检查 100/min 限制
    MW->>MW: 3️⃣ 日志中间件<br/>记录请求信息
    MW->>MW: 4️⃣ 指标中间件<br/>Prometheus metrics

    MW->>Service: CreateConversation(...)
    Service->>Usecase: CreateConversation(...)

    Note over Usecase: 业务逻辑层
    Usecase->>Domain: NewConversation(...)

    Note over Domain: 领域对象构造<br/>• 生成 ID<br/>• 设置状态<br/>• 初始化限制<br/>• 应用业务规则
    Domain-->>Usecase: *Conversation

    Usecase->>Repo: CreateConversation(ctx, conversation)

    Note over Repo: 数据持久化层
    Repo->>Repo: toDataObject()<br/>领域对象 → 数据对象
    Repo->>DB: INSERT INTO conversations<br/>(id, tenant_id, user_id, title, ...)
    DB-->>Repo: 影响行数: 1

    par 异步事件发布
        Repo->>Kafka: Publish(conversation.created)
        Note over Kafka: 分区键: tenant_id<br/>Topic: voiceassistant.conversations
        Kafka-->>Repo: ack (异步)
    end

    Repo-->>Usecase: nil (成功)
    Usecase-->>Service: *Conversation
    Service-->>MW: *Conversation
    MW-->>HTTP: *Conversation

    HTTP->>HTTP: 序列化为 JSON
    HTTP-->>LB: 201 Created<br/>Content-Type: application/json
    LB-->>Client: conversation JSON

    Note over Client,Kafka: 整体耗时:50-80ms<br/>数据库: 20-30ms<br/>中间件: 5-10ms<br/>序列化: 1-2ms

异常处理

错误码 HTTP 状态码 说明 处理方式
400 400 请求参数错误 返回详细的参数验证错误
401 401 未认证 要求客户端提供有效 Token
403 403 无权限创建会话 检查租户配额和用户权限
409 409 会话 ID 冲突 重新生成 ID 并重试
500 500 数据库写入失败 记录日志并返回通用错误
503 503 服务暂时不可用 返回 Retry-After header

性能要点

  • 数据库写入延迟:20-30ms(单条 INSERT)
  • 事务处理:不使用事务(单表操作)
  • 并发控制:通过 conversation_id 唯一性保证
  • 索引优化:tenant_id + user_id 复合索引加速查询

2. 发送消息

接口信息

  • HTTP 方法:POST
  • 路径:/api/v1/conversations/:id/messages
  • gRPC 方法:SendMessage
  • 幂等性:否(每次调用创建新消息)

请求结构

type SendMessageRequest struct {
    UserID  string      `json:"user_id" binding:"required"`  // 用户ID
    Role    MessageRole `json:"role" binding:"required"`     // 消息角色
    Content string      `json:"content" binding:"required"`  // 消息内容
}

type MessageRole string

const (
    RoleUser      MessageRole = "user"      // 用户消息
    RoleAssistant MessageRole = "assistant" // 助手回复
    RoleSystem    MessageRole = "system"    // 系统消息
    RoleTool      MessageRole = "tool"      // 工具调用结果
)
字段 类型 必填 默认值 约束 说明
user_id string - 长度 1-64 用户标识符,用于权限验证
role string - user/assistant/system/tool 消息角色,决定消息的处理方式
content string - 长度 1-10000 消息内容,支持 Markdown 格式

响应结构

type Message struct {
    ID             string            `json:"id"`              // 消息ID
    ConversationID string            `json:"conversation_id"` // 会话ID
    TenantID       string            `json:"tenant_id"`       // 租户ID
    UserID         string            `json:"user_id"`         // 用户ID
    Role           MessageRole       `json:"role"`            // 消息角色
    Content        string            `json:"content"`         // 消息内容
    ContentType    ContentType       `json:"content_type"`    // 内容类型
    Tokens         int               `json:"tokens"`          // Token数
    Model          string            `json:"model"`           // 使用的模型
    Provider       string            `json:"provider"`        // 模型提供商
    Metadata       map[string]string `json:"metadata"`        // 元数据
    CreatedAt      time.Time         `json:"created_at"`      // 创建时间
}

核心代码

// 入口函数
func (uc *MessageUsecase) SendMessage(
    ctx context.Context,
    conversationID, userID string,
    role domain.MessageRole,
    content string,
) (*domain.Message, error) {
    // 1. 获取会话并检查权限
    conversation, err := uc.conversationRepo.GetConversation(ctx, conversationID)
    if err != nil {
        return nil, err
    }

    if conversation.UserID != userID {
        return nil, domain.ErrUnauthorized
    }

    // 2. 检查业务规则:是否可以发送消息
    if !conversation.CanSendMessage() {
        return nil, domain.ErrConversationFull
    }

    // 3. 创建消息
    message := domain.NewMessage(conversationID, conversation.TenantID, userID, role, content)

    // 4. 保存消息
    if err := uc.messageRepo.CreateMessage(ctx, message); err != nil {
        return nil, fmt.Errorf("failed to create message: %w", err)
    }

    // 5. 更新会话统计
    conversation.IncrementMessageCount()
    if err := uc.conversationRepo.UpdateConversation(ctx, conversation); err != nil {
        return nil, err
    }

    return message, nil
}

// 业务规则检查
func (c *Conversation) CanSendMessage() bool {
    // 检查会话状态
    if c.Status != StatusActive {
        return false
    }
    // 检查消息数量限制
    if c.Limits.CurrentMessages >= c.Limits.MaxMessages {
        return false
    }
    return true
}

调用链路详细分析

该接口涉及复杂的业务逻辑和并发控制:

第1层:HTTP Handler
  ↓ 解析路径参数(conversation_id)
  ↓ 绑定请求体(user_id, role, content)
  ↓ 耗时:<1ms

第2层:Service Layer
  ↓ 参数转换和预处理
  ↓ 耗时:<1ms

第3层:MessageUsecase
  ↓ 复杂业务逻辑:
  ↓   1️⃣ 查询会话(权限验证)
  ↓   2️⃣ 检查业务规则(CanSendMessage)
  ↓   3️⃣ 创建消息领域对象
  ↓   4️⃣ 并行执行三个操作:
  ↓      • 保存消息到数据库
  ↓      • 更新会话统计
  ↓      • 失效上下文缓存
  ↓ 耗时:60-100ms(含数据库操作)

第4层:Repository Layer
  ↓ 三个并发操作:
  ↓   • ConversationRepo.GetConversation() → SELECT
  ↓   • MessageRepo.CreateMessage() → INSERT
  ↓   • ConversationRepo.UpdateConversation() → UPDATE
  ↓   • ContextCache.InvalidateContext() → Redis DEL
  ↓ 耗时:并行最大 30ms(非串行)

第5层:Storage Layer
  ↓ PostgreSQL + Redis
  ↓ 事务保证:消息INSERT + 会话UPDATE 在同一事务
  ↓ 耗时:20-30ms

关键代码路径

// 第3层:MessageUsecase.SendMessage()
func (uc *MessageUsecase) SendMessage(
    ctx context.Context,
    conversationID, userID string,
    role domain.MessageRole,
    content string,
) (*domain.Message, error) {
    // 步骤1:查询会话(权限验证)
    conversation, err := uc.conversationRepo.GetConversation(ctx, conversationID)
    if err != nil {
        return nil, err
    }

    // 步骤2:权限检查
    if conversation.UserID != userID {
        return nil, domain.ErrUnauthorized
    }

    // 步骤3:业务规则验证
    if !conversation.CanSendMessage() {
        return nil, domain.ErrConversationFull
    }

    // 步骤4:创建消息领域对象
    message := domain.NewMessage(conversationID, conversation.TenantID, userID, role, content)

    // 步骤5:并行执行三个操作
    errGroup, ctx := errgroup.WithContext(ctx)

    // 操作1:保存消息
    errGroup.Go(func() error {
        return uc.messageRepo.CreateMessage(ctx, message)
    })

    // 操作2:更新会话统计
    errGroup.Go(func() error {
        conversation.IncrementMessageCount()
        return uc.conversationRepo.UpdateConversation(ctx, conversation)
    })

    // 操作3:失效缓存
    errGroup.Go(func() error {
        return uc.contextCache.InvalidateContext(ctx, conversationID)
    })

    // 等待所有操作完成
    if err := errGroup.Wait(); err != nil {
        return nil, err
    }

    return message, nil
}

详细时序图(含并发控制)

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant HTTP as HTTP Handler
    participant Service as ConversationService
    participant MsgUC as MessageUsecase
    participant ConvRepo as ConversationRepo
    participant MsgRepo as MessageRepo
    participant Cache as ContextCache
    participant DB as PostgreSQL
    participant Redis as Redis
    participant Kafka as Kafka

    Client->>HTTP: POST /conversations/:id/messages<br/>{user_id, role, content}
    HTTP->>HTTP: 解析路径参数<br/>conversation_id = :id
    HTTP->>Service: SendMessage(convID, userID, role, content)
    Service->>MsgUC: SendMessage(...)

    rect rgb(230, 240, 255)
    Note over MsgUC: 阶段1:权限验证
    MsgUC->>ConvRepo: GetConversation(conversationID)
    ConvRepo->>DB: SELECT * FROM conversations<br/>WHERE id = ? AND status != 'deleted'
    DB-->>ConvRepo: conversation row (20ms)
    ConvRepo-->>MsgUC: *Conversation

    MsgUC->>MsgUC: 权限检查<br/>conversation.UserID == userID
    MsgUC->>MsgUC: 业务规则验证<br/>CanSendMessage()
    Note over MsgUC: 检查:<br/>• status == active<br/>• current_messages < max_messages
    end

    rect rgb(240, 255, 240)
    Note over MsgUC: 阶段2:创建领域对象
    MsgUC->>MsgUC: message = NewMessage(...)
    Note over MsgUC: 生成消息ID<br/>设置时间戳<br/>初始化元数据
    MsgUC->>MsgUC: conversation.IncrementMessageCount()
    Note over MsgUC: current_messages++<br/>last_active_at = now()
    end

    rect rgb(255, 240, 240)
    Note over MsgUC,Redis: 阶段3:并发执行(errgroup)

    par 并行操作1:保存消息
        MsgUC->>MsgRepo: CreateMessage(ctx, message)
        MsgRepo->>DB: INSERT INTO messages<br/>(id, conversation_id, role, content, ...)
        DB-->>MsgRepo: 影响行数: 1 (20ms)
        MsgRepo->>Kafka: Publish(message.sent)
        MsgRepo-->>MsgUC: nil
    and 并行操作2:更新会话
        MsgUC->>ConvRepo: UpdateConversation(ctx, conversation)
        ConvRepo->>DB: UPDATE conversations<br/>SET current_messages=?, last_active_at=?<br/>WHERE id=?
        DB-->>ConvRepo: 影响行数: 1 (15ms)
        ConvRepo-->>MsgUC: nil
    and 并行操作3:失效缓存
        MsgUC->>Cache: InvalidateContext(conversationID)
        Cache->>Redis: DEL context:<conversation_id>
        Redis-->>Cache: 1 (5ms)
        Cache-->>MsgUC: nil
    end

    Note over MsgUC: errGroup.Wait()<br/>等待所有操作完成<br/>任一失败则全部失败
    end

    MsgUC-->>Service: *Message
    Service-->>HTTP: *Message
    HTTP->>HTTP: 序列化为 JSON
    HTTP-->>Client: 201 Created<br/>message JSON

    Note over Client,Kafka: 整体耗时:60-100ms<br/>数据库: 20-30ms (并行)<br/>缓存失效: 5ms<br/>权限验证: 20-30ms

并发控制详解

使用 golang.org/x/sync/errgroup 实现并发控制:

  • 三个操作并行执行,取最长耗时(而非累加)
  • 任一操作失败,所有操作取消(通过 context)
  • 串行耗时:20ms + 15ms + 5ms = 40ms
  • 并行耗时:max(20ms, 15ms, 5ms) = 20ms
  • 性能提升:50%

事务一致性

  • 消息 INSERT 和会话 UPDATE 需在同一事务中
  • 如果 UPDATE 失败,INSERT 应回滚
  • 缓存失效可独立失败(最终一致性)
// 改进:使用数据库事务
func (uc *MessageUsecase) SendMessageWithTx(
    ctx context.Context,
    conversationID, userID string,
    role domain.MessageRole,
    content string,
) (*domain.Message, error) {
    // 开启事务
    tx := uc.db.Begin()
    defer func() {
        if r := recover(); r != nil {
            tx.Rollback()
        }
    }()

    // 在事务中执行消息保存和会话更新
    if err := uc.messageRepo.CreateMessageWithTx(tx, message); err != nil {
        tx.Rollback()
        return nil, err
    }

    if err := uc.conversationRepo.UpdateConversationWithTx(tx, conversation); err != nil {
        tx.Rollback()
        return nil, err
    }

    // 提交事务
    if err := tx.Commit().Error; err != nil {
        return nil, err
    }

    // 事务成功后,异步失效缓存
    go uc.contextCache.InvalidateContext(context.Background(), conversationID)

    return message, nil
}

异常处理与性能要点

错误场景:

  • 会话不存在:返回 404
  • 无权限:返回 403
  • 会话已满:返回 429 Too Many Requests
  • 数据库写入失败:返回 500

性能优化:

  • 数据库事务:message INSERT + conversation UPDATE 在同一事务
  • 索引命中:conversation_id + created_at 复合索引
  • 缓存失效:异步清除上下文缓存
  • 并发控制:乐观锁或行锁防止并发问题

上下文管理 API

3. 获取压缩上下文

接口信息

  • HTTP 方法:GET
  • 路径:/api/v1/conversations/:id/context
  • gRPC 方法:GetContext
  • 幂等性:是

请求参数

参数 类型 必填 默认值 说明
user_id string - 用户 ID,用于权限验证
max_tokens int 4000 最大 Token 数
strategy string recent 窗口策略:recent/relevant/mixed

响应结构

type ManagedContext struct {
    ConversationID    string     `json:"conversation_id"`    // 会话ID
    Messages          []*Message `json:"messages"`           // 消息列表
    CompressedSummary string     `json:"compressed_summary"` // 压缩摘要
    TotalTokens       int        `json:"total_tokens"`       // 总Token数
    Strategy          string     `json:"strategy"`           // 使用的策略
    GeneratedAt       time.Time  `json:"generated_at"`       // 生成时间
}

核心代码

// 上下文管理器获取上下文
func (m *ContextManagerImpl) GetContext(
    ctx context.Context,
    conversationID string,
    options *ContextOptions,
) (*ManagedContext, error) {
    // 1. 尝试从缓存获取
    cached, err := m.contextRepo.GetCached(ctx, conversationID)
    if err == nil && cached != nil && m.isValid(cached) {
        return cached, nil
    }

    // 2. 根据策略获取消息
    messages, err := m.windowStrategy.Select(ctx, m.messageRepo, conversationID, options)
    if err != nil {
        return nil, err
    }

    // 3. 计算总Token数
    totalTokens := m.estimateTokens(messages)

    // 4. 如果超过限制,进行裁剪
    if totalTokens > options.MaxTokens {
        messages = m.trimMessages(messages, options.MaxTokens)
        totalTokens = m.estimateTokens(messages)
    }

    // 5. 构建上下文
    contextData := &ManagedContext{
        ConversationID: conversationID,
        Messages:       messages,
        TotalTokens:    totalTokens,
        Strategy:       options.Priority,
        GeneratedAt:    time.Now(),
    }

    // 6. 缓存结果
    _ = m.contextRepo.Cache(ctx, contextData, 5*time.Minute)

    return contextData, nil
}

时序图

sequenceDiagram
    autonumber
    participant Client
    participant Handler
    participant CtxMgr as ContextManager
    participant Cache
    participant Repo as MessageRepo
    participant DB

    Client->>Handler: GET /conversations/:id/context
    Handler->>CtxMgr: GetContext(convID, options)

    CtxMgr->>Cache: GetCached(convID)
    alt 缓存命中且有效
        Cache-->>CtxMgr: cached context
        CtxMgr-->>Handler: context
    else 缓存未命中或失效
        Cache-->>CtxMgr: nil

        CtxMgr->>Repo: ListMessages(convID, limit, offset)
        Repo->>DB: SELECT * FROM messages WHERE conversation_id=? ORDER BY created_at DESC
        DB-->>Repo: message rows
        Repo-->>CtxMgr: messages

        Note over CtxMgr: 应用窗口策略<br/>Token估算<br/>消息裁剪

        CtxMgr->>Cache: Cache(context, TTL=5min)
        Cache-->>CtxMgr: ok

        CtxMgr-->>Handler: context
    end

    Handler-->>Client: 200 OK, context JSON

配置说明

环境变量配置

# 服务配置
PORT=8080                    # HTTP 服务端口
GRPC_PORT=9000               # gRPC 服务端口

# 数据库配置
DB_HOST=localhost            # PostgreSQL 主机
DB_PORT=5432                 # PostgreSQL 端口
DB_USER=voiceassistant       # 数据库用户
DB_PASSWORD=password         # 数据库密码
DB_NAME=voiceassistant       # 数据库名称
DB_MAX_OPEN_CONNS=100        # 最大连接数
DB_MAX_IDLE_CONNS=10         # 最大空闲连接数

# Redis 配置
REDIS_ADDR=localhost:6379    # Redis 地址
REDIS_PASSWORD=redis123      # Redis 密码
REDIS_DB=0                   # Redis 数据库编号

# Kafka 配置
KAFKA_BROKERS=localhost:9092 # Kafka 地址
KAFKA_TOPIC_PREFIX=voiceassistant # Topic 前缀

# 上下文配置
CONTEXT_MAX_TOKENS=4000      # 默认最大 Token 数
CONTEXT_CACHE_TTL=300        # 缓存 TTL(秒)
CONTEXT_STRATEGY=recent      # 默认窗口策略

Nacos 配置

# conversation-service.yaml
service:
  name: conversation-service
  version: 1.0.0

server:
  http:
    port: 8080
  grpc:
    port: 9000

data:
  database:
    driver: postgres
    host: ${DB_HOST:localhost}
    port: ${DB_PORT:5432}
    user: ${DB_USER:voiceassistant}
    password: ${DB_PASSWORD:password}
    database: ${DB_NAME:voiceassistant}
    max_open_conns: 100
    max_idle_conns: 10

  redis:
    addr: ${REDIS_ADDR:localhost:6379}
    password: ${REDIS_PASSWORD:redis123}
    db: 0

context:
  max_tokens: 4000
  cache_ttl: 300
  strategy: recent
  compression:
    enabled: true
    strategy: hybrid
    target_ratio: 0.5

limits:
  max_conversations_per_user: 100
  max_messages_per_conversation: 10000
  max_message_length: 10000

总结

Conversation Service 是 VoiceHelper 平台的核心业务服务,采用 DDD 分层架构,实现了会话和消息的全生命周期管理。通过多级缓存、智能压缩、流式推送、并发优化等技术手段,在性能、成本、用户体验之间取得了良好的平衡。