📚 文档导航
📋 项目概述
VoiceHelper是一个企业级智能语音助手平台,支持多模态交互(文本+语音)、GraphRAG检索系统和智能Agent功能。该项目采用现代微服务架构,具备高可用性、可扩展性和多平台支持能力。
🎯 核心特性
- 多模态交互: 文本SSE流式 + WebSocket语音全双工
- GraphRAG系统: 知识图谱构建 + 多跳推理 + 融合排序
- 智能Agent: 多推理模式 + 工具生态 + MCP协议支持
- 全平台覆盖: Web、移动端、桌面端、小程序、浏览器扩展
🏗️ 整体架构概览
系统架构图
graph TB
subgraph "客户端层 Client Layer"
WEB[Web应用<br/>Next.js + React]
MOBILE[移动端<br/>React Native]
DESKTOP[桌面端<br/>Electron]
MINI[微信小程序<br/>原生]
EXT[浏览器扩展<br/>Chrome/Firefox]
end
subgraph "网关层 Gateway Layer"
GATEWAY[Go后端网关<br/>Gin + WebSocket]
LB[负载均衡器<br/>Nginx/Traefik]
end
subgraph "服务层 Service Layer"
ALGO[Python算法服务<br/>FastAPI + AsyncIO]
AUTH[认证服务<br/>JWT + OAuth2]
ADMIN[管理服务<br/>FastAPI]
end
subgraph "数据层 Data Layer"
PG[(PostgreSQL<br/>主数据库)]
REDIS[(Redis<br/>缓存+会话)]
NEO4J[(Neo4j<br/>知识图谱)]
MINIO[(MinIO<br/>对象存储)]
end
subgraph "AI服务层 AI Services"
LLM[大语言模型<br/>OpenAI/豆包/GLM]
ASR[语音识别<br/>Whisper/Edge]
TTS[语音合成<br/>Edge-TTS/Azure]
EMBED[向量模型<br/>BGE/OpenAI]
end
WEB --> LB
MOBILE --> LB
DESKTOP --> LB
MINI --> LB
EXT --> LB
LB --> GATEWAY
GATEWAY --> ALGO
GATEWAY --> AUTH
GATEWAY --> ADMIN
ALGO --> PG
ALGO --> REDIS
ALGO --> NEO4J
ALGO --> MINIO
ALGO --> LLM
ALGO --> ASR
ALGO --> TTS
ALGO --> EMBED
GATEWAY --> REDIS
AUTH --> PG
ADMIN --> PG
style WEB fill:#e1f5fe
style GATEWAY fill:#f3e5f5
style ALGO fill:#e8f5e8
style PG fill:#fff3e0
技术栈分析
前端技术栈
- 框架: Next.js 14 (App Router) + React 18
- 样式: Tailwind CSS + Framer Motion
- 状态管理: Zustand + React Query
- 实时通信: EventSource (SSE) + WebSocket
- 类型系统: TypeScript 5.0+
- 测试: Jest + Testing Library
- 构建: Webpack + SWC
后端技术栈
- 网关服务: Go 1.23 + Gin + Gorilla WebSocket
- 算法服务: Python 3.11 + FastAPI + AsyncIO
- 认证: JWT + OAuth2 + RBAC
- 数据库: PostgreSQL 15 + Redis 7 + Neo4j 5
- 存储: MinIO (S3兼容)
- 监控: Prometheus + OpenTelemetry + Jaeger
AI/ML技术栈
- 语言模型: OpenAI GPT、字节跳动豆包、智谱GLM
- 语音识别: OpenAI Whisper、Azure Speech、Edge Speech
- 语音合成: Edge-TTS、Azure TTS、本地TTS
- 向量模型: BGE-large-zh、OpenAI Embeddings
- 图数据库: Neo4j (知识图谱存储)
- 向量数据库: FAISS (本地索引)
🔄 系统交互时序图
多模态对话交互流程
sequenceDiagram
participant U as 用户
participant F as 前端应用
participant G as Go网关
participant A as 算法服务
participant L as LLM服务
participant D as 数据库
Note over U,D: 文本对话流程
U->>F: 输入文本消息
F->>G: POST /api/v2/chat/stream
G->>A: 转发到算法服务
A->>D: 检索相关文档
D-->>A: 返回相关片段
A->>L: 构建提示+调用LLM
L-->>A: 流式返回响应
A-->>G: SSE流式数据
G-->>F: 转发SSE流
F-->>U: 实时展示回复
Note over U,D: 语音对话流程
U->>F: 开始语音输入
F->>G: WebSocket连接 /api/v2/voice/ws
G->>A: 建立语音会话
loop 音频流处理
U->>F: 音频数据块
F->>G: 发送音频帧
G->>A: 转发音频数据
A->>A: ASR实时识别
A-->>G: 部分识别结果
G-->>F: 返回转录文本
F-->>U: 显示实时转录
end
A->>D: 检索+推理
A->>L: 生成回复
A-->>G: TTS音频流
G-->>F: 语音回复数据
F-->>U: 播放语音回复
🎯 核心API分析
Go网关服务API
1. 流式聊天接口
入口函数: backend/internal/handlers/v2_chat.go:StreamChat
// StreamChat 处理流式聊天请求
// @Summary 流式聊天
// @Description 发送消息并获取流式回复,支持SSE协议
// @Tags Chat
// @Accept json
// @Produce text/event-stream
// @Param request body ChatRequest true "聊天请求"
// @Success 200 {object} StreamResponse "流式响应"
// @Router /api/v2/chat/stream [post]
func (h *V2ChatHandlerSimple) StreamChat(c *gin.Context) {
// 1. 解析请求参数
var req ChatRequest
if err := c.ShouldBindJSON(&req); err != nil {
logger.Error("Invalid request format", zap.Error(err))
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request format"})
return
}
// 2. 参数验证
if req.Message == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "Message cannot be empty"})
return
}
// 3. 设置SSE响应头
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
// 4. 构建转发请求到算法服务
algoReq := AlgoServiceRequest{
Messages: []Message{
{Role: "user", Content: req.Message}
},
ConversationID: req.ConversationID,
StreamID: req.StreamID,
RequestID: req.RequestID,
Model: req.Model,
Temperature: req.Temperature,
MaxTokens: req.MaxTokens,
}
// 5. 发送请求到算法服务并流式转发响应
if err := h.forwardToAlgoService(c, algoReq); err != nil {
logger.Error("Failed to forward request", zap.Error(err))
// 发送错误事件
c.SSEvent("error", gin.H{
"error": "Internal server error",
"code": "ALGO_SERVICE_ERROR"
})
}
}
// forwardToAlgoService 转发请求到算法服务
// @param c gin上下文,用于SSE流式响应
// @param req 转发的请求体
// @return error 转发过程中的错误
func (h *V2ChatHandlerSimple) forwardToAlgoService(c *gin.Context, req AlgoServiceRequest) error {
// 1. 序列化请求体
reqBody, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("failed to marshal request: %w", err)
}
// 2. 创建HTTP请求
httpReq, err := http.NewRequest("POST", h.algoServiceURL+"/query", bytes.NewBuffer(reqBody))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
// 3. 设置请求头
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Accept", "application/x-ndjson")
// 4. 发送请求
client := &http.Client{Timeout: 0} // 无超时,支持长连接
resp, err := client.Do(httpReq)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
// 5. 检查响应状态
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("algo service returned status %d", resp.StatusCode)
}
// 6. 流式读取并转发响应
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
// 解析NDJSON格式的响应
var response map[string]interface{}
if err := json.Unmarshal([]byte(line), &response); err != nil {
logger.Warn("Failed to parse response line", zap.String("line", line))
continue
}
// 转发为SSE事件
eventType := "data"
if errMsg, exists := response["error"]; exists {
eventType = "error"
}
c.SSEvent(eventType, response)
c.Writer.Flush() // 强制刷新缓冲区
}
return scanner.Err()
}
2. WebSocket语音接口
入口函数: backend/internal/handlers/v2_voice.go:HandleWebSocket
// V2VoiceHandler 语音处理器v2版本
type V2VoiceHandler struct {
algoServiceURL string // 算法服务URL
activeConnections map[string]*VoiceSession // 活跃连接映射
mu sync.RWMutex // 读写锁保护
cleanupTicker *time.Ticker // 清理定时器
}
// VoiceSession 语音会话结构
type VoiceSession struct {
ID string `json:"session_id"` // 会话ID
UserID string `json:"user_id"` // 用户ID
ConversationID string `json:"conversation_id"` // 对话ID
Connection *websocket.Conn `json:"-"` // WebSocket连接
AlgoConn *websocket.Conn `json:"-"` // 与算法服务的连接
Status string `json:"status"` // 会话状态: active/paused/ended
CreatedAt time.Time `json:"created_at"` // 创建时间
LastActivity time.Time `json:"last_activity"` // 最后活动时间
AudioConfig AudioConfig `json:"audio_config"` // 音频配置
Metrics VoiceMetrics `json:"metrics"` // 性能指标
CancelFunc context.CancelFunc `json:"-"` // 取消函数
}
// HandleWebSocket 处理WebSocket语音连接
// @Summary WebSocket语音流处理
// @Description 建立WebSocket连接进行实时语音交互
// @Tags Voice
// @Accept application/json
// @Produce application/json
// @Success 101 {object} VoiceSession "连接升级成功"
// @Router /api/v2/voice/stream [get]
func (h *V2VoiceHandler) HandleWebSocket(c *gin.Context) {
// 1. 升级HTTP连接为WebSocket
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.Error("WebSocket upgrade failed", zap.Error(err))
return
}
defer conn.Close()
// 2. 生成会话ID和提取用户信息
sessionID := generateSessionID()
userID := extractUserID(c) // 从JWT token或header提取
conversationID := c.Query("conversation_id")
// 3. 创建会话上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 4. 建立与算法服务的WebSocket连接
algoConn, err := h.connectToAlgoService(ctx, sessionID)
if err != nil {
logger.Error("Failed to connect to algo service", zap.Error(err))
conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"error","error":"Service unavailable"}`))
return
}
defer algoConn.Close()
// 5. 创建语音会话
session := &VoiceSession{
ID: sessionID,
UserID: userID,
ConversationID: conversationID,
Connection: conn,
AlgoConn: algoConn,
Status: "active",
CreatedAt: time.Now(),
LastActivity: time.Now(),
AudioConfig: getDefaultAudioConfig(),
CancelFunc: cancel,
}
// 6. 注册会话
h.mu.Lock()
h.activeConnections[sessionID] = session
h.mu.Unlock()
// 7. 发送会话建立确认
initMsg := map[string]interface{}{
"type": "session_started",
"session_id": sessionID,
"config": session.AudioConfig,
}
conn.WriteJSON(initMsg)
// 8. 启动消息处理协程
go h.handleAlgoServiceMessages(session)
// 9. 处理客户端消息(阻塞主协程)
h.handleClientMessages(session)
// 10. 清理会话
h.cleanup(sessionID)
}
// handleClientMessages 处理来自客户端的消息
// @param session 语音会话对象
func (h *V2VoiceHandler) handleClientMessages(session *VoiceSession) {
defer session.CancelFunc() // 确保上下文取消
for {
// 1. 读取WebSocket消息
messageType, message, err := session.Connection.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Error("WebSocket error", zap.Error(err))
}
break
}
// 2. 更新活动时间
session.LastActivity = time.Now()
// 3. 根据消息类型处理
switch messageType {
case websocket.TextMessage:
// 处理控制消息(JSON格式)
var controlMsg map[string]interface{}
if err := json.Unmarshal(message, &controlMsg); err != nil {
logger.Warn("Invalid control message", zap.Error(err))
continue
}
h.handleControlMessage(session, controlMsg)
case websocket.BinaryMessage:
// 处理音频数据
h.handleAudioData(session, message)
default:
logger.Warn("Unsupported message type", zap.Int("type", messageType))
}
}
}
// handleAudioData 处理音频数据
// @param session 语音会话
// @param audioData 音频字节数据
func (h *V2VoiceHandler) handleAudioData(session *VoiceSession, audioData []byte) {
// 1. 构建音频消息
audioMsg := map[string]interface{}{
"type": "audio_chunk",
"session_id": session.ID,
"conversation_id": session.ConversationID,
"audio_chunk": base64.StdEncoding.EncodeToString(audioData),
"timestamp": time.Now().UnixNano() / 1e6, // 毫秒时间戳
"seq": session.Metrics.AudioPackets,
}
// 2. 转发到算法服务
if err := session.AlgoConn.WriteJSON(audioMsg); err != nil {
logger.Error("Failed to forward audio to algo service", zap.Error(err))
session.Connection.WriteJSON(map[string]interface{}{
"type": "error",
"error": "Failed to process audio",
})
return
}
// 3. 更新指标
session.Metrics.AudioPackets++
session.Metrics.AudioBytes += int64(len(audioData))
}
// connectToAlgoService 连接到算法服务的WebSocket
// @param ctx 上下文
// @param sessionID 会话ID
// @return (*websocket.Conn, error) WebSocket连接和错误
func (h *V2VoiceHandler) connectToAlgoService(ctx context.Context, sessionID string) (*websocket.Conn, error) {
// 1. 构建WebSocket URL
wsURL := strings.Replace(h.algoServiceURL, "http", "ws", 1) + "/voice/stream"
// 2. 设置连接头
header := http.Header{}
header.Set("Session-ID", sessionID)
// 3. 建立WebSocket连接
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
ReadBufferSize: 4096,
WriteBufferSize: 4096,
}
conn, _, err := dialer.DialContext(ctx, wsURL, header)
if err != nil {
return nil, fmt.Errorf("failed to dial algo service: %w", err)
}
return conn, nil
}
Python算法服务API
1. 文档检索接口
入口函数: algo/app/main.py:query_documents
@app.post("/query")
async def query_documents(request: QueryRequest, http_request: Request):
"""
文档查询接口,返回流式响应
Args:
request (QueryRequest): 查询请求对象,包含消息列表和检索参数
http_request (Request): FastAPI请求对象,用于获取客户端信息
Returns:
StreamingResponse: 流式NDJSON响应,包含检索结果和LLM生成内容
Raises:
VoiceHelperError: 自定义业务异常
流程说明:
1. 参数验证 - 检查消息列表是否为空
2. 日志记录 - 记录查询开始和相关统计信息
3. 委托处理 - 调用RetrieveService的stream_query方法
4. 流式响应 - 返回application/x-ndjson格式的流
"""
start_time = time.time()
# 业务日志记录 - 记录查询请求的关键信息
logger.business("文档查询请求", context={
"messages_count": len(request.messages) if request.messages else 0,
"top_k": getattr(request, 'top_k', None),
"client_ip": http_request.client.host if http_request.client else "unknown",
})
try:
# 参数验证 - 确保请求包含有效的消息
if not request.messages or len(request.messages) == 0:
raise VoiceHelperError(ErrorCode.RAG_INVALID_QUERY, "没有提供查询消息")
# 记录查询开始 - 便于问题排查和性能分析
logger.info("开始处理查询请求", context={
"messages_count": len(request.messages),
"last_message": request.messages[-1].content[:100] if request.messages else "",
})
# 生成流式响应 - 委托给专门的检索服务处理
return StreamingResponse(
retrieve_service.stream_query(request),
media_type="application/x-ndjson" # 新行分隔的JSON格式
)
except VoiceHelperError:
# 重新抛出自定义异常,由全局异常处理器处理
raise
except Exception as e:
# 处理未预期的异常
logger.exception("文档查询失败", e, context={
"messages_count": len(request.messages) if request.messages else 0,
})
raise VoiceHelperError(ErrorCode.RAG_RETRIEVAL_FAILED, f"查询失败: {str(e)}")
核心检索服务: algo/core/retrieve.py:RetrieveService
class RetrieveService:
"""
检索服务 - 负责文档检索、GraphRAG推理和LLM生成
主要功能:
- 多路召回: BGE向量检索 + BM25文本检索 + GraphRAG图推理
- 智能重排: 基于多维度相关性的融合排序
- 流式生成: SSE协议实时返回检索结果和生成内容
- 缓存优化: Redis缓存热点查询结果
"""
def __init__(self):
"""初始化检索服务及其依赖组件"""
self.rag_service = self._init_rag_service() # BGE+FAISS向量检索
self.graph_rag = self._init_graph_rag() # GraphRAG图推理
self.llm_client = self._init_llm_client() # 大语言模型客户端
self.cache_manager = self._init_cache() # Redis缓存管理
async def stream_query(self, request: QueryRequest) -> AsyncGenerator[str, None]:
"""
流式查询处理 - 核心检索和生成流程
Args:
request (QueryRequest): 包含用户消息和检索参数的请求对象
Yields:
str: NDJSON格式的响应数据,包含多种事件类型:
- retrieval_start: 检索开始
- retrieval_progress: 检索进度
- retrieval_result: 检索结果
- generation_start: 生成开始
- generation_chunk: 生成片段
- generation_done: 生成完成
- error: 错误信息
检索流程:
1. 查询预处理 - 提取关键词、意图识别、查询重写
2. 多路召回 - 并行执行向量检索、文本检索、图推理
3. 结果融合 - 去重、重排、相关性打分
4. 上下文构建 - 整理检索结果为LLM提示
5. 流式生成 - 调用LLM并实时返回结果
"""
query_id = self._generate_query_id()
start_time = time.time()
try:
# 1. 查询预处理
yield self._create_event("retrieval_start", {
"query_id": query_id,
"timestamp": int(time.time() * 1000)
})
# 提取最后一条用户消息作为查询
user_query = request.messages[-1].content if request.messages else ""
# 查询增强 - 关键词提取和查询重写
enhanced_query = await self._enhance_query(user_query)
# 2. 多路召回并行处理
retrieval_tasks = [
self._vector_retrieval(enhanced_query, request.top_k or 10),
self._text_retrieval(enhanced_query, request.top_k or 10),
self._graph_retrieval(enhanced_query, request.top_k or 5)
]
# 执行并行检索
vector_results, text_results, graph_results = await asyncio.gather(*retrieval_tasks)
# 3. 结果融合和重排
yield self._create_event("retrieval_progress", {
"stage": "fusion",
"vector_count": len(vector_results),
"text_count": len(text_results),
"graph_count": len(graph_results)
})
fused_results = self._fuse_results(vector_results, text_results, graph_results)
final_results = self._rerank_results(fused_results, user_query)
# 4. 返回检索结果
yield self._create_event("retrieval_result", {
"results": [self._format_result(r) for r in final_results[:10]],
"total_found": len(final_results),
"retrieval_time_ms": (time.time() - start_time) * 1000
})
# 5. 构建LLM上下文
context = self._build_context(final_results, request.messages)
# 6. 流式生成回复
yield self._create_event("generation_start", {"model": "gpt-3.5-turbo"})
full_response = ""
async for chunk in self._stream_llm_response(context, request):
full_response += chunk
yield self._create_event("generation_chunk", {"text": chunk})
# 7. 生成完成
yield self._create_event("generation_done", {
"full_text": full_response,
"total_time_ms": (time.time() - start_time) * 1000,
"token_count": len(full_response.split()) # 简单估算
})
except Exception as e:
logger.exception(f"Stream query failed: {e}")
yield self._create_event("error", {
"error": str(e),
"query_id": query_id
})
async def _vector_retrieval(self, query: str, top_k: int) -> List[RetrievalResult]:
"""
向量检索 - 使用BGE模型和FAISS索引
Args:
query (str): 查询文本
top_k (int): 返回结果数量
Returns:
List[RetrievalResult]: 按相似度排序的检索结果
"""
# 查询缓存
cache_key = f"vector:{hashlib.md5(query.encode()).hexdigest()}:{top_k}"
cached = await self.cache_manager.get(cache_key)
if cached:
return cached
# 向量化查询
query_embedding = await self.rag_service.embed_query(query)
# FAISS检索
similar_docs = await self.rag_service.similarity_search(
query_embedding,
k=top_k,
threshold=0.7 # 相似度阈值
)
# 格式化结果
results = []
for doc, score in similar_docs:
result = RetrievalResult(
content=doc.page_content,
metadata=doc.metadata,
score=float(score),
source="vector_search",
doc_id=doc.metadata.get("doc_id"),
chunk_id=doc.metadata.get("chunk_id")
)
results.append(result)
# 缓存结果
await self.cache_manager.set(cache_key, results, ttl=300) # 5分钟缓存
return results
async def _graph_retrieval(self, query: str, top_k: int) -> List[RetrievalResult]:
"""
图检索 - 基于知识图谱的多跳推理
Args:
query (str): 查询文本
top_k (int): 返回结果数量
Returns:
List[RetrievalResult]: 图推理得到的相关实体和关系
"""
if not self.graph_rag:
return []
# 实体识别
entities = await self.graph_rag.extract_entities(query)
# 多跳图遍历
graph_results = []
for entity in entities[:3]: # 限制实体数量
# 1跳邻居
neighbors = await self.graph_rag.get_neighbors(entity, depth=1)
# 2跳推理路径
paths = await self.graph_rag.find_reasoning_paths(entity, max_depth=2)
for neighbor in neighbors[:top_k//3]:
result = RetrievalResult(
content=f"实体: {entity} -> 关系: {neighbor['relation']} -> {neighbor['target']}",
metadata={
"entity": entity,
"relation": neighbor['relation'],
"target": neighbor['target'],
"reasoning_path": neighbor.get('path', [])
},
score=neighbor.get('confidence', 0.8),
source="graph_search",
doc_id=f"graph_{entity}",
chunk_id=neighbor.get('id')
)
graph_results.append(result)
return sorted(graph_results, key=lambda x: x.score, reverse=True)[:top_k]
2. 语音处理接口
入口函数: algo/app/main.py:websocket_voice_stream
@app.websocket("/voice/stream")
async def websocket_voice_stream(websocket: WebSocket):
"""
WebSocket语音流接口 - 实时语音交互处理
Args:
websocket (WebSocket): WebSocket连接对象
处理流程:
1. 连接建立 - 接受WebSocket连接并初始化会话
2. 消息循环 - 持续接收和处理音频数据
3. 实时ASR - 边收边转录,支持部分结果
4. RAG检索 - 完整句子触发知识检索
5. 流式TTS - 将回复转换为语音数据返回
6. 异常处理 - 优雅处理连接断开和错误
消息格式:
接收: {"type": "audio_chunk", "audio_chunk": "base64data", "seq": 123}
发送: {"type": "asr_partial", "text": "部分识别...", "seq": 123}
{"type": "asr_final", "text": "完整句子", "seq": 123}
{"type": "llm_response", "text": "AI回复", "seq": 124}
{"type": "tts_audio", "audio_data": "base64data", "seq": 124}
"""
try:
# 1. 接受WebSocket连接
await websocket.accept()
logger.info("WebSocket语音连接建立")
# 2. 委托给WebSocket处理器
await websocket_handler.handle_websocket_connection(websocket)
except Exception as e:
logger.exception("WebSocket语音流处理失败", e)
try:
# 发送错误信息并关闭连接
await websocket.close(code=1011, reason="Internal server error")
except:
pass # 连接可能已经关闭
WebSocket处理器: algo/core/websocket_voice.py:WebSocketVoiceHandler
class WebSocketVoiceHandler:
"""
WebSocket语音处理器 - 管理实时语音交互会话
主要功能:
- 会话管理: 创建、维护、清理语音会话
- 音频处理: 实时ASR、VAD、音频格式转换
- 智能对话: 结合RAG检索和LLM生成
- 语音合成: TTS生成并流式返回音频
"""
def __init__(self, enhanced_voice_service: EnhancedVoiceService):
"""
初始化WebSocket语音处理器
Args:
enhanced_voice_service: 增强语音服务实例,提供ASR/TTS能力
"""
self.voice_service = enhanced_voice_service
self.active_sessions: Dict[str, VoiceSession] = {}
self.session_lock = asyncio.Lock()
# 启动会话清理任务
self.cleanup_task = asyncio.create_task(self._cleanup_sessions_periodically())
async def handle_websocket_connection(self, websocket: WebSocket):
"""
处理WebSocket连接的主要逻辑
Args:
websocket: WebSocket连接对象
处理流程:
1. 会话初始化 - 创建会话ID和音频缓冲区
2. 消息循环 - 接收客户端音频数据
3. 音频处理 - ASR识别和语音对话
4. 结果返回 - 实时发送识别结果和AI回复
5. 会话清理 - 连接断开时的资源清理
"""
session_id = self._generate_session_id()
try:
# 1. 创建语音会话
session = VoiceSession(
session_id=session_id,
websocket=websocket,
audio_buffer=b"",
transcript_buffer="",
last_activity=datetime.now(),
status="active"
)
# 2. 注册会话
async with self.session_lock:
self.active_sessions[session_id] = session
# 3. 发送会话建立确认
await websocket.send_json({
"type": "session_started",
"session_id": session_id,
"config": {
"sample_rate": 16000,
"channels": 1,
"format": "pcm"
}
})
# 4. 消息处理循环
while True:
try:
# 接收WebSocket消息
data = await websocket.receive()
if data["type"] == "websocket.disconnect":
logger.info(f"Client disconnected: {session_id}")
break
# 处理JSON控制消息
if data["type"] == "websocket.receive" and "text" in data:
message = json.loads(data["text"])
await self._handle_control_message(session, message)
# 处理二进制音频数据
elif data["type"] == "websocket.receive" and "bytes" in data:
await self._handle_audio_data(session, data["bytes"])
except WebSocketDisconnect:
logger.info(f"WebSocket disconnected: {session_id}")
break
except Exception as e:
logger.error(f"Error processing message: {e}")
await websocket.send_json({
"type": "error",
"error": str(e)
})
break
finally:
# 5. 清理会话
await self._cleanup_session(session_id)
async def _handle_audio_data(self, session: VoiceSession, audio_data: bytes):
"""
处理音频数据 - 核心语音处理流程
Args:
session: 当前语音会话
audio_data: 音频字节数据
处理步骤:
1. 音频缓冲 - 累积音频数据直到足够长度
2. VAD检测 - 语音活动检测,过滤静音
3. ASR识别 - 实时语音转文本,支持部分结果
4. 句子检测 - 检测完整句子并触发后续处理
5. RAG对话 - 调用检索和生成服务
6. TTS合成 - 将回复转换为语音并返回
"""
# 1. 更新会话活动时间
session.last_activity = datetime.now()
# 2. 累积音频数据
session.audio_buffer += audio_data
# 3. 检查是否有足够的音频数据进行处理(约0.5秒)
if len(session.audio_buffer) < 8000: # 16000 * 0.5秒
return
try:
# 4. 语音活动检测 (VAD)
if not self._detect_speech_activity(session.audio_buffer[-8000:]):
# 如果检测到静音超过阈值,触发最终识别
if len(session.audio_buffer) > 32000: # 2秒静音
await self._process_final_audio(session)
return
# 5. 实时ASR处理
partial_text = await self.voice_service.asr_service.transcribe(
session.audio_buffer,
is_final=False,
language="zh-CN"
)
if partial_text:
# 发送部分识别结果
await session.websocket.send_json({
"type": "asr_partial",
"text": partial_text,
"timestamp": int(time.time() * 1000)
})
session.transcript_buffer = partial_text
# 6. 检测是否为完整句子
if self._is_complete_sentence(partial_text):
await self._process_complete_sentence(session, partial_text)
except Exception as e:
logger.error(f"Audio processing error: {e}")
await session.websocket.send_json({
"type": "error",
"error": f"Audio processing failed: {str(e)}"
})
async def _process_complete_sentence(self, session: VoiceSession, text: str):
"""
处理完整句子 - 触发RAG检索和对话生成
Args:
session: 语音会话
text: 识别出的完整文本
处理流程:
1. 最终ASR - 获得更准确的识别结果
2. RAG检索 - 基于用户问题检索相关知识
3. LLM生成 - 结合检索结果生成回复
4. TTS合成 - 将文本回复转换为语音
5. 流式返回 - 实时发送音频数据给客户端
"""
try:
# 1. 获取最终ASR结果
final_text = await self.voice_service.asr_service.transcribe(
session.audio_buffer,
is_final=True,
language="zh-CN"
)
if not final_text:
final_text = text
# 2. 发送最终识别结果
await session.websocket.send_json({
"type": "asr_final",
"text": final_text,
"timestamp": int(time.time() * 1000)
})
# 3. RAG检索和对话生成
await session.websocket.send_json({
"type": "processing_start",
"message": "正在思考..."
})
# 构建查询请求
from core.models import QueryRequest, Message
query_request = QueryRequest(
messages=[Message(role="user", content=final_text)],
top_k=5,
temperature=0.3
)
# 4. 流式处理RAG响应
full_response = ""
references = []
async for response_chunk in self.voice_service.retrieve_service.stream_query(query_request):
chunk_data = json.loads(response_chunk)
if chunk_data["type"] == "retrieval_result":
references = chunk_data["data"]["results"]
elif chunk_data["type"] == "generation_chunk":
text_chunk = chunk_data["data"]["text"]
full_response += text_chunk
# 发送文本回复片段
await session.websocket.send_json({
"type": "llm_response_chunk",
"text": text_chunk,
"timestamp": int(time.time() * 1000)
})
elif chunk_data["type"] == "generation_done":
full_response = chunk_data["data"]["full_text"]
break
# 5. 发送完整文本回复
await session.websocket.send_json({
"type": "llm_response_final",
"text": full_response,
"references": references[:3], # 限制引用数量
"timestamp": int(time.time() * 1000)
})
# 6. TTS语音合成
if full_response.strip():
await self._synthesize_and_send_audio(session, full_response)
# 7. 清空缓冲区,准备下一轮对话
session.audio_buffer = b""
session.transcript_buffer = ""
except Exception as e:
logger.exception(f"Complete sentence processing error: {e}")
await session.websocket.send_json({
"type": "error",
"error": f"Processing failed: {str(e)}"
})
async def _synthesize_and_send_audio(self, session: VoiceSession, text: str):
"""
合成语音并流式发送
Args:
session: 语音会话
text: 要合成的文本
TTS流程:
1. 文本预处理 - 清理和分段
2. 语音合成 - 调用TTS服务生成音频
3. 音频分块 - 将音频切分为小块
4. 流式发送 - 逐块发送音频数据
"""
try:
# 1. 发送TTS开始信号
await session.websocket.send_json({
"type": "tts_start",
"text": text,
"timestamp": int(time.time() * 1000)
})
# 2. 语音合成
audio_stream = await self.voice_service.tts_service.synthesize_streaming(
text=text,
voice="zh-CN-XiaoxiaoNeural", # Edge-TTS中文女声
rate="+0%",
pitch="+0Hz"
)
# 3. 流式发送音频数据
chunk_id = 0
async for audio_chunk in audio_stream:
if audio_chunk:
# Base64编码音频数据
audio_b64 = base64.b64encode(audio_chunk).decode('utf-8')
await session.websocket.send_json({
"type": "tts_audio",
"audio_data": audio_b64,
"chunk_id": chunk_id,
"format": "mp3",
"sample_rate": 16000,
"timestamp": int(time.time() * 1000)
})
chunk_id += 1
# 控制发送速率,避免缓冲区溢出
await asyncio.sleep(0.01)
# 4. 发送TTS完成信号
await session.websocket.send_json({
"type": "tts_complete",
"total_chunks": chunk_id,
"timestamp": int(time.time() * 1000)
})
except Exception as e:
logger.error(f"TTS synthesis error: {e}")
await session.websocket.send_json({
"type": "error",
"error": f"Speech synthesis failed: {str(e)}"
})
📊 数据结构设计
核心数据模型UML图
classDiagram
class User {
+String user_id
+String username
+String nickname
+String email
+String avatar_url
+DateTime created_at
+DateTime updated_at
+DateTime last_login
+UserStatus status
+UserPreferences preferences
+authenticate() bool
+updateProfile(data: dict) void
}
class Conversation {
+String conversation_id
+String user_id
+String title
+ConversationStatus status
+DateTime created_at
+DateTime updated_at
+DateTime ended_at
+int message_count
+ConversationMetadata metadata
+addMessage(message: Message) void
+updateStatus(status: ConversationStatus) void
}
class Message {
+String message_id
+String conversation_id
+String user_id
+MessageRole role
+String content
+ContentType content_type
+DateTime created_at
+MessageMetadata metadata
+List~Attachment~ attachments
+List~ToolCall~ tool_calls
+List~Reference~ references
+formatForDisplay() String
}
class VoiceSession {
+String session_id
+String user_id
+String conversation_id
+SessionStatus status
+DateTime created_at
+DateTime ended_at
+VoiceSessionSettings settings
+VoiceMetrics metrics
+WebSocket connection
+startSession() void
+endSession() void
+updateMetrics(data: dict) void
}
class Document {
+String document_id
+String dataset_id
+String title
+String content
+String content_type
+String url
+DateTime created_at
+DateTime updated_at
+DocumentMetadata metadata
+List~DocumentChunk~ chunks
+vectorize() List~float~
+extractChunks() List~DocumentChunk~
}
class DocumentChunk {
+String chunk_id
+String document_id
+String content
+int start_index
+int end_index
+List~float~ embedding
+ChunkMetadata metadata
+calculateSimilarity(query: List~float~) float
}
class RetrievalResult {
+String doc_id
+String chunk_id
+String content
+float score
+String source
+dict metadata
+formatForLLM() String
}
class ToolCall {
+String tool_call_id
+String tool_name
+dict parameters
+DateTime created_at
+ToolCallStatus status
+ToolResult result
+ErrorInfo error
+execute() ToolResult
}
%% 关系定义
User ||--o{ Conversation : "owns"
Conversation ||--o{ Message : "contains"
User ||--o{ VoiceSession : "creates"
VoiceSession ||--o{ Message : "generates"
Document ||--o{ DocumentChunk : "split into"
DocumentChunk ||--o{ RetrievalResult : "becomes"
Message ||--o{ ToolCall : "triggers"
Message ||--o{ RetrievalResult : "references"
%% 枚举类型
class UserStatus {
<<enumeration>>
ACTIVE
INACTIVE
BANNED
}
class ConversationStatus {
<<enumeration>>
ACTIVE
ENDED
ARCHIVED
}
class MessageRole {
<<enumeration>>
USER
ASSISTANT
SYSTEM
TOOL
}
class ContentType {
<<enumeration>>
TEXT
AUDIO
IMAGE
FILE
TOOL_CALL
TOOL_RESULT
}
语音处理数据流图
flowchart TD
A[用户语音输入] --> B[WebSocket连接]
B --> C[音频数据缓冲]
C --> D{VAD检测}
D -->|有语音| E[实时ASR处理]
D -->|静音| F[等待或结束]
E --> G[部分识别结果]
G --> H{完整句子?}
H -->|否| E
H -->|是| I[最终ASR识别]
I --> J[RAG检索流程]
J --> K[向量检索]
J --> L[图谱推理]
J --> M[文本检索]
K --> N[结果融合]
L --> N
M --> N
N --> O[LLM生成]
O --> P[流式文本回复]
P --> Q[TTS语音合成]
Q --> R[音频流返回]
R --> S[用户播放]
G -.-> T[前端显示部分文本]
P -.-> U[前端显示完整回复]
style A fill:#e1f5fe
style J fill:#f3e5f5
style O fill:#e8f5e8
style S fill:#fff3e0