📋 模块概述

Python算法服务是VoiceHelper系统的核心AI引擎,负责RAG检索、GraphRAG推理、语音处理和智能对话生成。基于FastAPI异步框架构建,集成多种AI模型和向量数据库,提供高性能的智能算法服务。

🏗️ 服务架构图

graph TB
    subgraph "Python算法服务架构"
        FASTAPI[FastAPI服务器<br/>异步HTTP/WebSocket]
        
        subgraph "核心服务层"
            INGEST[文档入库服务<br/>IngestService]
            RETRIEVE[检索服务<br/>RetrieveService]
            VOICE[语音服务<br/>VoiceService]
            ENHANCED[增强语音服务<br/>EnhancedVoiceService]
        end
        
        subgraph "AI模型层"
            BGE[BGE向量模型<br/>文档嵌入]
            WHISPER[Whisper模型<br/>语音识别]
            TTS[Edge-TTS<br/>语音合成]
            LLM[大语言模型<br/>OpenAI/豆包/GLM]
        end
        
        subgraph "数据存储层"
            FAISS[FAISS索引<br/>向量检索]
            NEO4J[Neo4j图数据库<br/>知识图谱]
            REDIS[Redis缓存<br/>会话管理]
            MINIO[MinIO存储<br/>文档文件]
        end
        
        subgraph "推理引擎"
            GRAPHRAG[GraphRAG引擎<br/>图推理]
            REASONING[推理模块<br/>多种推理模式]
            AGENT[Agent系统<br/>工具调用]
        end
    end
    
    FASTAPI --> INGEST
    FASTAPI --> RETRIEVE
    FASTAPI --> VOICE
    FASTAPI --> ENHANCED
    
    RETRIEVE --> BGE
    RETRIEVE --> LLM
    RETRIEVE --> FAISS
    RETRIEVE --> NEO4J
    RETRIEVE --> GRAPHRAG
    
    VOICE --> WHISPER
    VOICE --> TTS
    ENHANCED --> WHISPER
    ENHANCED --> TTS
    
    INGEST --> BGE
    INGEST --> FAISS
    INGEST --> NEO4J
    INGEST --> MINIO
    
    GRAPHRAG --> REASONING
    GRAPHRAG --> AGENT
    
    style FASTAPI fill:#e3f2fd
    style RETRIEVE fill:#f3e5f5
    style GRAPHRAG fill:#e8f5e8
    style LLM fill:#fff3e0

🚀 核心API详细分析

1. 文档入库API

入口函数详细解析

文件位置: algo/app/main.py:ingest_documents

@app.post("/ingest", response_model=IngestResponse)
async def ingest_documents(
    request: IngestRequest,
    background_tasks: BackgroundTasks,
    http_request: Request
):
    """
    Document ingestion endpoint: validate the upload and schedule background processing.

    Validation happens synchronously:
    - the file list is non-empty;
    - every file has a filename and content;
    - the extension is one of .txt/.pdf/.docx/.md/.html;
    - the total size is at most 100 MiB.

    Parsing, chunking, vectorization and indexing are delegated to
    ``ingest_service.process_ingest_task``, scheduled via FastAPI
    ``BackgroundTasks``. They therefore run after this response is sent.

    Args:
        request: Ingestion request. This function reads ``files`` (each
            with ``filename`` and ``content``). It also reads
            ``collection_name``, with a ``'default'`` fallback, for logging only.
        background_tasks: FastAPI background task manager used to schedule
            the ingestion job.
        http_request: Raw HTTP request; only ``client.host`` is read, for logging.

    Returns:
        IngestResponse with the generated ``task_id``, ``status="pending"``
        and a human-readable message.

    Raises:
        VoiceHelperError(RAG_INVALID_QUERY): invalid input (no files, empty
            filename/content, unsupported extension, total size over the limit).
        VoiceHelperError(RAG_INDEXING_FAILED): any unexpected error while
            validating or scheduling; the original exception is chained as
            ``__cause__``. Indexing errors in the background job are NOT
            reported here.
    """
    # Monotonic clock: a duration measurement must not be affected by wall-clock jumps.
    start_time = time.perf_counter()
    files = request.files

    # === Phase 1: request logging and validation ===
    logger.business("文档入库请求", context={
        "files_count": len(files) if files else 0,
        "collection_name": getattr(request, 'collection_name', 'default'),
        "client_ip": http_request.client.host if http_request.client else "unknown",
    })

    try:
        if not files:
            raise VoiceHelperError(
                ErrorCode.RAG_INVALID_QUERY,
                "没有提供文档文件"
            )

        # str.endswith accepts a tuple, so one call checks every allowed suffix.
        allowed_types = ('.txt', '.pdf', '.docx', '.md', '.html')
        max_total_size = 100 * 1024 * 1024  # 100 MiB

        total_size = 0
        for file in files:
            if not file.filename or not file.content:
                raise VoiceHelperError(
                    ErrorCode.RAG_INVALID_QUERY,
                    f"文件 {file.filename} 内容为空"
                )

            if not file.filename.lower().endswith(allowed_types):
                raise VoiceHelperError(
                    ErrorCode.RAG_INVALID_QUERY,
                    f"不支持的文件类型: {file.filename}"
                )

            # Text content is measured as UTF-8 bytes; binary content as-is.
            content = file.content
            total_size += len(content.encode('utf-8') if isinstance(content, str) else content)

        if total_size > max_total_size:
            raise VoiceHelperError(
                ErrorCode.RAG_INVALID_QUERY,
                f"文件总大小超过限制: {total_size / 1024 / 1024:.2f}MB"
            )

        # === Phase 2: task creation and scheduling ===
        task_id = ingest_service.generate_task_id()

        logger.info(f"生成入库任务ID: {task_id}", context={
            "task_id": task_id,
            "files_count": len(files),
            "total_size_mb": total_size / 1024 / 1024,
        })

        # Runs after the response has been sent (FastAPI BackgroundTasks semantics).
        background_tasks.add_task(
            ingest_service.process_ingest_task,
            task_id,
            request
        )

        # === Phase 3: performance logging and response ===
        duration_ms = (time.perf_counter() - start_time) * 1000
        logger.performance("文档入库任务创建", duration_ms, context={
            "task_id": task_id,
            "files_count": len(files),
        })

        return IngestResponse(
            task_id=task_id,
            status="pending",
            message=f"已创建入库任务,正在处理 {len(files)} 个文件"
        )

    except VoiceHelperError:
        # Business errors already carry the right code; propagate unchanged.
        raise
    except Exception as e:
        logger.exception("文档入库失败", e, context={
            "files_count": len(files) if files else 0,
        })
        # Chain the cause so the original traceback is preserved.
        raise VoiceHelperError(
            ErrorCode.RAG_INDEXING_FAILED,
            f"文档入库失败: {str(e)}"
        ) from e

入库服务核心实现

文件位置: algo/core/ingest.py:IngestService

class IngestService:
    """
    Document ingestion service.

    Handles parsing, chunking, vectorization, vector-index updates and
    knowledge-graph building for background ingestion jobs.

    Responsibilities visible in this class:
    - Parse one uploaded file into a ``Document`` (txt/pdf/docx/md/html),
      dispatching to a per-format ``_parse_*`` helper.
    - Split documents into paragraph-based ``DocumentChunk``s with optional
      character overlap between consecutive chunks.
    - Delegate vectorization, index updates and graph building to helper
      methods (``_vectorize_chunks``, ``_update_vector_index``,
      ``_build_knowledge_graph``) that are not shown here.
    - Track per-task progress/status in ``self.task_status``.
    """

    def __init__(self):
        """Initialize the service's dependencies and task-tracking state."""
        self.text_splitter = self._init_text_splitter()      # text splitter
        self.embedding_service = self._init_embedding()       # embedding service
        self.vector_store = self._init_vector_store()        # vector store
        self.graph_store = self._init_graph_store()          # graph database
        self.document_parser = self._init_parser()           # document parser
        self.task_manager = self._init_task_manager()        # task manager

        # Per-task status objects, keyed by task_id.
        self.task_status: Dict[str, TaskStatus] = {}
        # Bounded queue (at most 100 pending items).
        self.processing_queue = asyncio.Queue(maxsize=100)
        # Worker handles; presumably populated by _start_workers() — verify.
        self.worker_pool = []

        # Start background workers (implementation not shown here).
        self._start_workers()

    async def process_ingest_task(self, task_id: str, request: IngestRequest):
        """
        Run a full ingestion job for ``request``, recording status under ``task_id``.

        Steps:
            1. Register a ``TaskStatus`` with status ``"processing"``.
            2. Ensure the target collection exists.
            3. For each file: parse -> chunk -> vectorize. A failing file is
               logged, recorded in ``result["failed_files"]`` and skipped; the
               job continues with the remaining files.
            4. Update the vector index with all chunks (if any).
            5. Build the knowledge graph from all parsed documents (if any).
            6. Mark the task ``"completed"`` with summary statistics.

        Progress only moves forward:
            - per-file work spans 0-80%;
            - vector indexing reports 80%;
            - graph building reports 90%;
            - completion reports 100%.

        Args:
            task_id: Unique task identifier generated by the caller.
            request: Ingestion request. Reads ``files`` and ``metadata``, and
                optionally ``collection_name`` / ``chunk_size`` / ``chunk_overlap``.

        Raises:
            VoiceHelperError(RAG_INDEXING_FAILED): on any failure outside the
                per-file loop. The task status (if already registered) is set
                to ``"failed"`` first, and the original error is chained.
        """
        # Monotonic clock for the job duration.
        start_time = time.perf_counter()

        try:
            # === Phase 1: task initialization ===
            total_files = len(request.files)
            now = datetime.now()
            self.task_status[task_id] = TaskStatus(
                task_id=task_id,
                status="processing",
                progress=0,
                total_files=total_files,
                processed_files=0,
                created_at=now,
                updated_at=now
            )

            collection_name = getattr(request, 'collection_name', 'default')
            logger.info(f"开始处理入库任务: {task_id}", context={
                "task_id": task_id,
                "files_count": total_files,
                "collection": collection_name
            })

            await self._ensure_collection_exists(collection_name)

            # === Phase 2: per-file processing ===
            chunk_size = getattr(request, 'chunk_size', 1000)
            chunk_overlap = getattr(request, 'chunk_overlap', 200)

            all_documents = []
            all_chunks = []
            failed_files = []
            processed_files = 0

            for file_index, file in enumerate(request.files):
                try:
                    # Scale file progress into 0-80% so that the later fixed
                    # milestones (80/90/100) never move progress backwards.
                    progress = int(file_index / total_files * 80)
                    await self._update_task_progress(task_id, progress, f"处理文件: {file.filename}")

                    document = await self._parse_single_document(file, collection_name, request.metadata)

                    chunks = await self._split_document_into_chunks(
                        document,
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )

                    vectorized_chunks = await self._vectorize_chunks(chunks)

                    all_documents.append(document)
                    all_chunks.extend(vectorized_chunks)
                    processed_files += 1

                    logger.info(f"文件处理完成: {file.filename}", context={
                        "task_id": task_id,
                        "file_index": file_index + 1,
                        "chunks_count": len(chunks),
                        "progress": f"{processed_files}/{total_files}"
                    })

                except Exception as e:
                    # Best-effort per file: record the failure and keep going.
                    logger.error(f"文件处理失败: {file.filename}", context={
                        "task_id": task_id,
                        "error": str(e),
                        "file_index": file_index
                    })
                    failed_files.append({"filename": file.filename, "error": str(e)})

            # === Phase 3: vector index update ===
            if all_chunks:
                await self._update_task_progress(task_id, 80, "构建向量索引...")

                index_update_result = await self._update_vector_index(all_chunks, collection_name)

                logger.info("向量索引构建完成", context={
                    "task_id": task_id,
                    "chunks_indexed": len(all_chunks),
                    "index_size": index_update_result.get("total_vectors", 0)
                })

            # === Phase 4: knowledge graph ===
            # Always defined so the result below never depends on a conditional binding.
            graph_result: Dict = {}
            if all_documents:
                await self._update_task_progress(task_id, 90, "构建知识图谱...")

                graph_result = await self._build_knowledge_graph(all_documents, collection_name)

                logger.info("知识图谱构建完成", context={
                    "task_id": task_id,
                    "entities_count": graph_result.get("entities_count", 0),
                    "relations_count": graph_result.get("relations_count", 0)
                })

            # === Phase 5: completion ===
            total_duration = time.perf_counter() - start_time

            # Look the status up fresh: helpers awaited above may have replaced it.
            task = self.task_status[task_id]
            task.status = "completed"
            task.progress = 100
            task.processed_files = processed_files
            task.completed_at = datetime.now()
            task.duration_seconds = total_duration
            task.result = {
                "documents_processed": len(all_documents),
                "chunks_created": len(all_chunks),
                "vectors_indexed": len(all_chunks),
                "entities_extracted": graph_result.get("entities_count", 0),
                "processing_time_seconds": total_duration,
                # Additive key: which files were skipped, and why.
                "failed_files": failed_files,
            }

            logger.business("文档入库任务完成", context={
                "task_id": task_id,
                "documents_processed": len(all_documents),
                "chunks_created": len(all_chunks),
                "processing_time_seconds": total_duration,
                "success_rate": f"{processed_files}/{total_files}"
            })

        except Exception as e:
            logger.exception(f"入库任务失败: {task_id}", e)

            # The failure may happen before the status was registered; don't
            # let a KeyError here mask the real error.
            task = self.task_status.get(task_id)
            if task is not None:
                task.status = "failed"
                task.error = str(e)
                task.completed_at = datetime.now()

            raise VoiceHelperError(
                ErrorCode.RAG_INDEXING_FAILED,
                f"入库任务失败: {str(e)}"
            ) from e

    async def _parse_single_document(
        self,
        file: IngestFile,
        collection_name: str,
        metadata: Optional[Dict] = None
    ) -> Document:
        """
        Parse one uploaded file into a ``Document``.

        The parser is chosen by the lower-cased filename extension: txt, pdf,
        docx, md or html map to ``_parse_txt`` / ``_parse_pdf`` /
        ``_parse_docx`` / ``_parse_markdown`` / ``_parse_html``. The extracted
        text is passed through ``_clean_text_content``.

        The ``doc_id`` combines collection, filename, a Unix timestamp and a
        random suffix. The suffix keeps IDs (and the chunk IDs derived from
        them) unique when the same filename is ingested more than once within
        the same second.

        Args:
            file: Uploaded file with ``filename`` and ``content`` (str or bytes).
            collection_name: Target collection; stored in the metadata.
            metadata: Extra metadata merged last, so it overrides the built-in keys.

        Returns:
            The parsed ``Document``.

        Raises:
            ValueError: unsupported extension or any parsing failure; the
                original exception is chained.
        """
        import uuid  # local import: random suffix for collision-free doc IDs

        # Extension -> parser method name; resolved lazily with getattr so only
        # the parser actually needed is looked up.
        parser_names = {
            'txt': '_parse_txt',
            'pdf': '_parse_pdf',
            'docx': '_parse_docx',
            'md': '_parse_markdown',
            'html': '_parse_html',
        }

        try:
            file_extension = file.filename.lower().split('.')[-1]

            parser_name = parser_names.get(file_extension)
            if parser_name is None:
                raise ValueError(f"不支持的文件格式: {file_extension}")
            content = await getattr(self, parser_name)(file.content)

            cleaned_content = self._clean_text_content(content)

            raw = file.content
            file_size = len(raw.encode('utf-8') if isinstance(raw, str) else raw)

            return Document(
                doc_id=f"{collection_name}_{file.filename}_{int(time.time())}_{uuid.uuid4().hex[:8]}",
                title=file.filename,
                content=cleaned_content,
                metadata={
                    "filename": file.filename,
                    "file_type": file_extension,
                    "file_size": file_size,
                    "collection": collection_name,
                    "created_at": datetime.now().isoformat(),
                    "content_length": len(cleaned_content),
                    **(metadata or {})
                }
            )

        except Exception as e:
            logger.error(f"文档解析失败: {file.filename}", error=str(e))
            raise ValueError(f"文档解析失败: {str(e)}") from e

    def _build_chunk(
        self,
        document: Document,
        text: str,
        start_index: int,
        chunk_index: int
    ) -> DocumentChunk:
        """Build the ``chunk_index``-th ``DocumentChunk`` of ``document`` from raw chunk ``text``."""
        return DocumentChunk(
            chunk_id=f"{document.doc_id}_chunk_{chunk_index}",
            doc_id=document.doc_id,
            content=text.strip(),
            start_index=start_index,
            # end_index uses the unstripped length (including the "\n\n" joiners).
            end_index=start_index + len(text),
            metadata={
                "chunk_index": chunk_index,
                "chunk_type": "paragraph_based",
                "original_doc_title": document.title,
                **document.metadata
            }
        )

    async def _split_document_into_chunks(
        self,
        document: Document,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ) -> List[DocumentChunk]:
        """
        Split a document into paragraph-based chunks with optional character overlap.

        Paragraphs come from ``_split_by_paragraphs``. They are packed greedily:
        a paragraph joins the current chunk while ``len(chunk) + len(para) <=
        chunk_size``, otherwise the current chunk is emitted and a new one starts.
        When ``chunk_overlap > 0``, the new chunk is prefixed with the last
        ``chunk_overlap`` characters of the previous one (or all of it, if it is
        shorter). A single paragraph longer than ``chunk_size`` still becomes one
        oversized chunk here; ``_validate_and_optimize_chunks`` runs afterwards.

        ``start_index`` / ``end_index`` are offsets in the text formed by joining
        the paragraphs with ``"\\n\\n"``. They equal offsets into the original
        content only if ``_split_by_paragraphs`` split on exactly that separator
        (not verifiable here). Offsets are always non-negative and
        non-decreasing.

        Args:
            document: Document to split.
            chunk_size: Target maximum chunk size in characters.
            chunk_overlap: Characters carried over between consecutive chunks.

        Returns:
            The chunk list after ``_validate_and_optimize_chunks``.

        Raises:
            ValueError: on any failure; the original exception is chained.
        """
        try:
            chunks = []
            paragraphs = self._split_by_paragraphs(document.content)

            current_chunk = ""
            current_start = 0
            chunk_index = 0

            for para in paragraphs:
                if len(current_chunk) + len(para) <= chunk_size:
                    current_chunk += para + "\n\n"
                    continue

                # Emit the current chunk (empty only before the very first paragraph).
                flushed, flushed_start = current_chunk, current_start
                if flushed:
                    chunks.append(self._build_chunk(document, flushed, flushed_start, chunk_index))
                    chunk_index += 1

                if chunk_overlap > 0 and chunks:
                    overlap_text = flushed[-chunk_overlap:] if len(flushed) > chunk_overlap else flushed
                else:
                    overlap_text = ""
                current_chunk = overlap_text + para + "\n\n"

                # The new chunk starts where its carried-over overlap began inside
                # the flushed chunk; this can never go below flushed_start.
                current_start = flushed_start + len(flushed) - len(overlap_text)

            if current_chunk:
                chunks.append(self._build_chunk(document, current_chunk, current_start, chunk_index))

            chunks = self._validate_and_optimize_chunks(chunks)

            content_length = len(document.content)
            chunk_chars = sum(len(c.content) for c in chunks)
            logger.info(f"文档分块完成: {document.title}", context={
                "doc_id": document.doc_id,
                "total_chunks": len(chunks),
                "avg_chunk_size": chunk_chars / len(chunks) if chunks else 0,
                # Guard against empty documents; may exceed 100 because of overlap.
                "content_coverage": chunk_chars / content_length * 100 if content_length else 0
            })

            return chunks

        except Exception as e:
            logger.error(f"文档分块失败: {document.title}", error=str(e))
            raise ValueError(f"文档分块失败: {str(e)}") from e

2. 检索查询API

流式检索核心实现

文件位置: algo/core/retrieve.py:RetrieveService.stream_query

async def stream_query(self, request: QueryRequest) -> AsyncGenerator[str, None]:
    """
    Stream a RAG answer: multi-route recall, fusion + rerank, then streamed LLM generation.

    Args:
        request: Query request. Fields read here:
            - ``messages`` (last one is the user query);
            - ``top_k`` (falls back to 10 if falsy);
            - ``temperature`` (falls back to 0.7 only if ``None``).
            Optional attributes read via ``getattr``:
            - ``retrieval_mode`` ('hybrid' | 'vector' | 'text' | 'graph',
              default 'hybrid');
            - ``model`` (default 'gpt-3.5-turbo');
            - ``max_tokens`` (default 2048).

    Yields:
        Events built by ``self._create_event``, in this order:
        - ``retrieval_start``;
        - ``retrieval_progress`` (multi_recall, then fusion_rerank);
        - ``retrieval_result``;
        - ``generation_start``;
        - ``generation_chunk`` (zero or more);
        - ``generation_done``.
        On failure an ``error`` event is yielded instead of the remaining
        events. A retrieval timeout yields an ``error`` event with code
        ``RETRIEVAL_TIMEOUT``.

    Pipeline:
        1. Query enhancement from the last message plus conversation history.
        2. Concurrent recall routes chosen by ``retrieval_mode``:
           - vector and BM25 get ``top_k`` each;
           - graph gets ``max(1, top_k // 2)``;
           - all routes share a 30 s timeout.
        3. Fusion, rerank, and a quality filter (min score 0.3, at most
           ``top_k`` results).
        4. Context from the top-5 filtered results, then streamed generation.
    """
    query_id = self._generate_query_id()
    # Monotonic clock for durations; time.time() is used only for event timestamps.
    start_time = time.perf_counter()

    try:
        # === Phase 1: initialization and query preprocessing ===
        retrieval_mode = getattr(request, 'retrieval_mode', 'hybrid')

        yield self._create_event("retrieval_start", {
            "query_id": query_id,
            "timestamp": int(time.time() * 1000),
            "mode": retrieval_mode
        })

        if not request.messages:
            raise ValueError("没有提供查询消息")

        user_query = request.messages[-1].content
        conversation_history = request.messages[:-1]

        enhanced_query = await self._enhance_user_query(user_query, conversation_history)

        logger.info("开始检索查询", context={
            "query_id": query_id,
            "original_query": user_query[:100],
            "enhanced_query": enhanced_query[:100],
            "history_length": len(conversation_history)
        })

        # === Phase 2: concurrent multi-route recall ===
        top_k = request.top_k or 10

        # (route name, coroutine) pairs: results are matched back by name rather
        # than by positional index, which depended on which routes were enabled.
        routes = []
        if retrieval_mode in ('hybrid', 'vector'):
            routes.append(("vector", self._vector_retrieval(enhanced_query, top_k, query_id)))
        if retrieval_mode in ('hybrid', 'text'):
            routes.append(("bm25", self._bm25_retrieval(enhanced_query, top_k, query_id)))
        if retrieval_mode in ('hybrid', 'graph'):
            # Half the budget for graph reasoning, but never zero.
            routes.append(("graph", self._graph_reasoning_retrieval(enhanced_query, max(1, top_k // 2), query_id)))

        yield self._create_event("retrieval_progress", {
            "stage": "multi_recall",
            "tasks_count": len(routes),
            "progress": 0.1
        })

        try:
            retrieval_results = await asyncio.wait_for(
                asyncio.gather(*(coro for _, coro in routes), return_exceptions=True),
                timeout=30.0
            )
        except asyncio.TimeoutError:
            logger.error("检索任务超时", context={"query_id": query_id})
            yield self._create_event("error", {
                "error": "检索超时,请稍后重试",
                "code": "RETRIEVAL_TIMEOUT"
            })
            return

        results_by_route = {"vector": [], "bm25": [], "graph": []}
        for i, ((route, _), result) in enumerate(zip(routes, retrieval_results)):
            # gather(return_exceptions=True) can also return CancelledError,
            # which is a BaseException (not an Exception) since Python 3.8.
            if isinstance(result, BaseException):
                logger.error(f"检索任务 {i} ({route}) 失败", error=str(result))
                continue
            results_by_route[route] = result

        vector_results = results_by_route["vector"]
        bm25_results = results_by_route["bm25"]
        graph_results = results_by_route["graph"]

        # === Phase 3: fusion and reranking ===
        yield self._create_event("retrieval_progress", {
            "stage": "fusion_rerank",
            "vector_count": len(vector_results),
            "bm25_count": len(bm25_results),
            "graph_count": len(graph_results),
            "progress": 0.6
        })

        fused_results = await self._fuse_retrieval_results(
            vector_results=vector_results,
            bm25_results=bm25_results,
            graph_results=graph_results,
            original_query=user_query,
            enhanced_query=enhanced_query
        )

        reranked_results = await self._rerank_results(
            results=fused_results,
            query=enhanced_query,
            top_k=top_k
        )

        filtered_results = self._filter_low_quality_results(
            reranked_results,
            min_score=0.3,  # minimum relevance score
            max_results=top_k
        )

        # === Phase 4: emit retrieval results ===
        formatted_results = []
        for rank, result in enumerate(filtered_results, start=1):
            formatted_results.append({
                "rank": rank,
                "doc_id": result.doc_id,
                "chunk_id": result.chunk_id,
                "title": result.metadata.get("title", ""),
                # Preview only: truncate long chunk text to 500 characters.
                "content": result.content[:500] + "..." if len(result.content) > 500 else result.content,
                "score": float(result.score),
                "source": result.source,
                "metadata": {
                    k: v for k, v in result.metadata.items()
                    if k in ["filename", "file_type", "created_at", "section"]
                }
            })

        retrieval_time = (time.perf_counter() - start_time) * 1000
        yield self._create_event("retrieval_result", {
            "results": formatted_results,
            "total_found": len(filtered_results),
            "retrieval_time_ms": retrieval_time,
            "retrieval_modes": retrieval_mode,
            "query_enhancement": {
                "original": user_query[:100],
                "enhanced": enhanced_query[:100] if enhanced_query != user_query else None
            }
        })

        # === Phase 5: LLM context ===
        augmented_context = await self._build_augmented_context(
            query=user_query,
            conversation_history=conversation_history,
            retrieval_results=filtered_results[:5],  # top-5 results as context
            enhanced_query=enhanced_query
        )

        system_prompt = self._build_system_prompt(retrieval_mode, len(filtered_results))

        # === Phase 6: streamed generation ===
        model_name = getattr(request, 'model', 'gpt-3.5-turbo')
        # An explicit 0.0 is a valid temperature; only fall back when unset.
        temperature = 0.7 if request.temperature is None else request.temperature
        max_tokens = getattr(request, 'max_tokens', 2048)

        yield self._create_event("generation_start", {
            "model": model_name,
            "context_length": len(augmented_context),
            "temperature": temperature,
            "max_tokens": max_tokens
        })

        response_parts = []
        chunk_count = 0
        generation_start = time.perf_counter()

        async for chunk in self._stream_llm_response(
            system_prompt=system_prompt,
            augmented_context=augmented_context,
            model=model_name,
            temperature=temperature,
            max_tokens=max_tokens
        ):
            # Skip only empty chunks: whitespace-only chunks ("\n", " ") carry
            # formatting and must be both streamed and kept in the full text.
            if not chunk:
                continue
            response_parts.append(chunk)
            chunk_count += 1

            yield self._create_event("generation_chunk", {
                "text": chunk,
                "chunk_index": chunk_count
            })

        full_response = "".join(response_parts)

        # === Phase 7: completion and metrics ===
        generation_time = (time.perf_counter() - generation_start) * 1000
        total_time = (time.perf_counter() - start_time) * 1000

        # NOTE: whitespace-split word count, only a rough proxy for tokens
        # (it undercounts CJK text, which contains no spaces).
        approx_tokens = len(full_response.split())
        generation_metrics = {
            "total_tokens": approx_tokens,
            "generation_time_ms": generation_time,
            "tokens_per_second": approx_tokens / (generation_time / 1000) if generation_time > 0 else 0,
            "chunks_generated": chunk_count
        }

        retrieval_metrics = {
            "retrieval_time_ms": retrieval_time,
            "total_results": len(fused_results),
            "filtered_results": len(filtered_results),
            "vector_results": len(vector_results),
            "bm25_results": len(bm25_results),
            "graph_results": len(graph_results)
        }

        yield self._create_event("generation_done", {
            "full_text": full_response,
            "query_id": query_id,
            "total_time_ms": total_time,
            "generation_metrics": generation_metrics,
            "retrieval_metrics": retrieval_metrics,
            "context_sources": [
                {
                    "doc_id": r.doc_id,
                    "title": r.metadata.get("title", ""),
                    "score": float(r.score)
                }
                for r in filtered_results[:3]  # top-3 sources
            ]
        })

        logger.business("检索查询完成", context={
            "query_id": query_id,
            "total_time_ms": total_time,
            "results_count": len(filtered_results),
            "response_length": len(full_response),
            "retrieval_mode": retrieval_mode
        })

    except Exception as e:
        # Report the failure to the client as an error event instead of
        # breaking the stream with an exception.
        logger.exception(f"检索查询失败: {query_id}", e)

        yield self._create_event("error", {
            "error": str(e),
            "query_id": query_id,
            "code": "QUERY_PROCESSING_ERROR",
            "timestamp": int(time.time() * 1000)
        })

🧠 GraphRAG推理引擎

知识图谱构建与推理

flowchart TD
    A[原始文档] --> B[实体识别NER]
    A --> C[关系抽取RE]
    
    B --> D[实体消歧]
    C --> E[关系验证]
    
    D --> F[Neo4j图数据库]
    E --> F
    
    F --> G[图嵌入GraphEmbedding]
    F --> H[社区发现Community]
    F --> I[路径分析PathFinding]
    
    G --> J[向量化实体]
    H --> K[聚类子图]
    I --> L[推理路径]
    
    J --> M[混合检索HybridRetrieval]
    K --> M
    L --> M
    
    M --> N[多跳推理MultiHop]
    N --> O[答案生成]
    
    style A fill:#e3f2fd
    style F fill:#f3e5f5
    style M fill:#e8f5e8
    style O fill:#fff3e0

GraphRAG核心实现

文件位置: algo/core/graph_rag.py:GraphRAG

class GraphRAG:
    """
    GraphRAG - retrieval-augmented generation over a knowledge graph.

    Intended capabilities (as listed in the design):
    - Entity recognition: find entities in document text (``_extract_entities``)
    - Relation extraction: find semantic relations between entities (``_extract_relations``)
    - Graph construction: persist entities/relations per document
    - Multi-hop reasoning: variable-length path queries over the graph
    - Community detection and path analysis (not implemented by the methods shown here)
    """

    def __init__(self, neo4j_client, embedding_service):
        """
        Initialise the GraphRAG system.

        Args:
            neo4j_client: graph client; ``_multi_hop_reasoning`` awaits
                ``neo4j_client.run(query, **params)`` and iterates the result.
            embedding_service: embedding service, stored for later use.
        """
        self.neo4j = neo4j_client
        self.embedding_service = embedding_service
        self.entity_recognizer = self._init_ner_model()
        self.relation_extractor = self._init_re_model()
        self.graph_embedder = self._init_graph_embedder()

    async def build_knowledge_graph(self, documents: "List[Document]") -> Dict[str, int]:
        """
        Build the knowledge graph from a batch of documents.

        Each document is processed sequentially: entity extraction, relation
        extraction, then persistence of both (tagged with ``doc.doc_id``).

        Args:
            documents: documents exposing ``content`` and ``doc_id``.

        Returns:
            dict: ``{"entities_count": int, "relations_count": int}``, the totals
            of what the ``_store_*`` helpers reported as stored.
        """
        entities_count = 0
        relations_count = 0

        for doc in documents:
            # 1. Entity recognition
            entities = await self._extract_entities(doc.content)

            # 2. Relation extraction, conditioned on the entities just found
            relations = await self._extract_relations(doc.content, entities)

            # 3. Persist to the graph database
            doc_entities = await self._store_entities(entities, doc.doc_id)
            doc_relations = await self._store_relations(relations, doc.doc_id)

            entities_count += len(doc_entities)
            relations_count += len(doc_relations)

        return {
            "entities_count": entities_count,
            "relations_count": relations_count
        }

    async def reasoning_retrieval(self, query: str, max_depth: int = 2) -> "List[ReasoningResult]":
        """
        Retrieve reasoning paths for a query by walking the graph.

        Args:
            query: query text; entities are extracted from it as start nodes.
            max_depth: maximum number of hops per path (paths need >= 1 hop).

        Returns:
            List[ReasoningResult]: results sorted by confidence, highest first.
            Empty when no entity is recognised in ``query``.
        """
        # 1. Identify the key entities in the query
        query_entities = await self._extract_entities(query)

        if not query_entities:
            return []

        # 2. Multi-hop traversal from each start entity
        reasoning_results = []

        for entity in query_entities[:3]:  # cap the number of start entities
            paths = await self._multi_hop_reasoning(entity, max_depth)

            for path in paths:
                result = ReasoningResult(
                    entities=path['entities'],
                    relations=path['relations'],
                    reasoning_path=path['path'],
                    confidence=path['confidence'],
                    # _multi_hop_reasoning does not produce an 'evidence' key, so
                    # indexing it raised KeyError for every path.
                    # NOTE(review): confirm ReasoningResult accepts a list here.
                    evidence=path.get('evidence', [])
                )
                reasoning_results.append(result)

        # 3. Highest-confidence paths first
        return sorted(reasoning_results, key=lambda x: x.confidence, reverse=True)

    async def _multi_hop_reasoning(self, start_entity: str, max_depth: int) -> List[Dict]:
        """
        Enumerate graph paths of 1..``max_depth`` hops starting at ``start_entity``.

        Args:
            start_entity: ``name`` property of the start ``Entity`` node.
            max_depth: maximum path length in hops.

        Returns:
            List[Dict]: up to 50 paths, shortest first, each with keys
            ``entities``, ``relations``, ``path``, ``depth`` and ``confidence``.
            Empty when ``max_depth`` < 1, because no path of >= 1 hop is allowed.

        Raises:
            TypeError: if ``max_depth`` is not an int. Cypher cannot take
                variable-length bounds as parameters, so the value is
                interpolated into the query text and must be a plain integer.
        """
        if not isinstance(max_depth, int):
            raise TypeError(f"max_depth must be an int, got {type(max_depth).__name__}")
        if max_depth < 1:
            return []

        paths = []

        # Braces around the node property map are doubled so that str.format
        # emits them literally. Unescaped, `{name: $entity}` is parsed as a
        # format field and raises KeyError('name'). The entity name remains a
        # bound query parameter.
        cypher_query = """
        MATCH path = (start:Entity {{name: $entity}})-[*1..{max_depth}]-(end:Entity)
        WHERE start <> end
        RETURN path, length(path) as depth,
               [node in nodes(path) | node.name] as entity_path,
               [rel in relationships(path) | type(rel)] as relation_path
        ORDER BY depth ASC
        LIMIT 50
        """.format(max_depth=max_depth)

        results = await self.neo4j.run(cypher_query, entity=start_entity)

        # NOTE(review): assumes the client returns a synchronously iterable
        # result; an async driver result would need `async for` instead.
        for record in results:
            path_info = {
                'entities': record['entity_path'],
                'relations': record['relation_path'],
                'path': record['path'],
                'depth': record['depth'],
                'confidence': self._calculate_path_confidence(record['path'])
            }
            paths.append(path_info)

        return paths

🎙️ 语音处理模块

语音处理架构图

graph TB
    subgraph "语音处理流水线"
        AUDIO[音频输入<br/>WebSocket流]
        
        subgraph "音频预处理"
            VAD[语音活动检测<br/>Voice Activity Detection]
            DENOISE[噪声抑制<br/>Noise Suppression] 
            RESAMPLE[重采样<br/>16kHz单声道]
        end
        
        subgraph "语音识别ASR"
            WHISPER[Whisper模型<br/>多语言识别]
            AZURE_ASR[Azure Speech<br/>实时识别]
            LOCAL_ASR[本地ASR<br/>离线识别]
        end
        
        subgraph "语音合成TTS"  
            EDGE_TTS[Edge-TTS<br/>多语言合成]
            AZURE_TTS[Azure TTS<br/>神经网络语音]
            LOCAL_TTS[本地TTS<br/>离线合成]
        end
        
        subgraph "语音后处理"
            SPEED[语速调节<br/>Speed Control]
            PITCH[音调调节<br/>Pitch Control]
            VOLUME[音量标准化<br/>Volume Normalize]
        end
    end
    
    AUDIO --> VAD
    VAD --> DENOISE
    DENOISE --> RESAMPLE
    
    RESAMPLE --> WHISPER
    RESAMPLE --> AZURE_ASR  
    RESAMPLE --> LOCAL_ASR
    
    EDGE_TTS --> SPEED
    AZURE_TTS --> PITCH
    LOCAL_TTS --> VOLUME
    
    style AUDIO fill:#e3f2fd
    style WHISPER fill:#f3e5f5
    style EDGE_TTS fill:#e8f5e8

增强语音服务实现

文件位置: algo/core/enhanced_voice_services.py:EnhancedVoiceService

class EnhancedVoiceService:
    """
    Enhanced voice service: combines ASR, TTS and RAG dialogue into one
    voice-interaction pipeline.

    - ASR/TTS are delegated to ``EnhancedASRService`` / ``EnhancedTTSService``.
    - Each ``session_id`` keeps its own audio/transcript buffers and dialogue history.
    - Recognised utterances are answered through ``retrieve_service.stream_query``
      (when a retrieve service is configured), and the answer is streamed back as TTS audio.
    - Latency and throughput counters are accumulated in ``self.metrics``.
    """

    # Past (user, assistant) turns sent to the retriever as chat history.
    _PROMPT_HISTORY_TURNS = 5
    # Maximum number of turns retained in a session's context_history.
    _MAX_HISTORY_TURNS = 10

    def __init__(self, config: VoiceConfig, retrieve_service=None):
        """
        Initialise the enhanced voice service.

        Args:
            config (VoiceConfig): voice configuration. This class reads
                ``min_audio_chunk_size``, ``asr_chunk_size`` and ``tts_config``.
            retrieve_service: optional retrieval service used for RAG dialogue.
                When None, recognised text is emitted without generating an answer.

        Note:
            ``asyncio.create_task`` needs a running event loop, so construct this
            object from inside async code.
        """
        self.config = config
        self.retrieve_service = retrieve_service

        # ASR and TTS back-ends
        self.asr_service = EnhancedASRService(config)
        self.tts_service = EnhancedTTSService(config)

        # session_id -> per-session state; creation/lookup is guarded by session_lock
        self.active_sessions: Dict[str, VoiceSessionState] = {}
        self.session_lock = asyncio.Lock()

        # Performance counters
        self.metrics = VoiceServiceMetrics()
        # Number of samples behind metrics.avg_asr_latency (running arithmetic mean)
        self._asr_latency_samples = 0

        # Background task, presumably expiring idle sessions (defined elsewhere)
        self.cleanup_task = asyncio.create_task(self._cleanup_expired_sessions())

    @staticmethod
    def _running_mean(previous_mean: float, new_value: float, previous_count: int) -> float:
        """Return the arithmetic mean after adding ``new_value`` to ``previous_count`` samples."""
        return previous_mean + (new_value - previous_mean) / (previous_count + 1)

    async def _get_or_create_session(self, session_id: str, request) -> "VoiceSessionState":
        """Return the session for ``session_id`` (creating it if needed) and refresh its activity time."""
        async with self.session_lock:
            session = self.active_sessions.get(session_id)
            if session is None:
                session = VoiceSessionState(
                    session_id=session_id,
                    conversation_id=request.conversation_id,
                    language=getattr(request, 'language', 'zh-CN'),
                    audio_buffer=b"",
                    transcript_buffer="",
                    last_activity=datetime.now(),
                    context_history=[]
                )
                self.active_sessions[session_id] = session
            session.last_activity = datetime.now()
        return session

    async def process_voice_query(self, request: VoiceQueryRequest) -> AsyncGenerator[VoiceQueryResponse, None]:
        """
        Run the full voice-query pipeline for one incoming audio chunk.

        Args:
            request (VoiceQueryRequest): voice query request
                - session_id: str session ID (generated when empty)
                - audio_chunk: str Base64-encoded audio data
                - is_final: bool whether this is the last audio chunk
                - language: str language code, default 'zh-CN'
                - conversation_id: str conversation ID

        Yields:
            VoiceQueryResponse events:
                - asr_partial: partial ASR result
                - asr_final: final ASR result
                - processing_start: "thinking" notice before RAG
                - llm_response_chunk: streamed answer fragment
                - llm_response_final: complete answer with references
                - tts_start / tts_audio / tts_complete: synthesized speech
                - error: failure description
        """
        session_id = request.session_id or self._generate_session_id()
        start_time = time.time()

        try:
            # === Stage 1: session lookup / creation ===
            session = await self._get_or_create_session(session_id, request)

            # === Stage 2: decode and buffer the audio chunk ===
            audio_chunk = getattr(request, 'audio_chunk', None)
            if audio_chunk:
                try:
                    audio_data = base64.b64decode(audio_chunk)
                except (ValueError, TypeError) as e:  # binascii.Error subclasses ValueError
                    yield VoiceQueryResponse(
                        type="error",
                        session_id=session_id,
                        error=f"音频解码失败: {str(e)}",
                        timestamp=int(time.time() * 1000)
                    )
                    return

                session.audio_buffer += audio_data
                self.metrics.total_audio_bytes += len(audio_data)
                self.metrics.audio_packets_received += 1

            # === Stage 3: partial ASR once enough audio is buffered ===
            if len(session.audio_buffer) >= self.config.min_audio_chunk_size:
                asr_start_time = time.time()
                # Only the most recent asr_chunk_size bytes are sent for the partial pass
                partial_result = await self.asr_service.transcribe_partial(
                    audio_data=session.audio_buffer[-self.config.asr_chunk_size:],
                    language=session.language,
                    session_id=session_id
                )

                asr_latency = (time.time() - asr_start_time) * 1000
                # True running mean. The former (avg + new) / 2 started from 0
                # and weighted the latest sample by one half.
                self.metrics.avg_asr_latency = self._running_mean(
                    self.metrics.avg_asr_latency, asr_latency, self._asr_latency_samples
                )
                self._asr_latency_samples += 1

                if partial_result and partial_result.text.strip():
                    yield VoiceQueryResponse(
                        type="asr_partial",
                        session_id=session_id,
                        text=partial_result.text,
                        confidence=partial_result.confidence,
                        timestamp=int(time.time() * 1000)
                    )
                    session.transcript_buffer = partial_result.text

            # === Stage 4: sentence boundary or last chunk -> final ASR ===
            is_complete_sentence = self._detect_sentence_boundary(
                session.transcript_buffer,
                session.audio_buffer
            )
            if not (is_complete_sentence or getattr(request, 'is_final', False)):
                return

            final_result = await self.asr_service.transcribe_final(
                audio_data=session.audio_buffer,
                language=session.language,
                session_id=session_id
            )
            # The buffered audio has now been consumed by final ASR. Reset the
            # buffers even when nothing was recognised or a later stage fails;
            # otherwise stale audio leaks into the next utterance.
            session.audio_buffer = b""
            session.transcript_buffer = ""

            final_text = (final_result.text or "").strip() if final_result else ""
            if not final_text:
                return

            yield VoiceQueryResponse(
                type="asr_final",
                session_id=session_id,
                text=final_text,
                confidence=final_result.confidence,
                timestamp=int(time.time() * 1000)
            )

            # === Stages 5-6: RAG answer generation and TTS ===
            if self.retrieve_service:
                async for event in self._run_rag_dialogue(session, session_id, final_text, request):
                    yield event

        except Exception as e:
            logger.exception(f"语音查询处理失败: {session_id}", e)

            yield VoiceQueryResponse(
                type="error",
                session_id=session_id,
                error=f"语音处理失败: {str(e)}",
                timestamp=int(time.time() * 1000)
            )

        finally:
            # Per-call latency as a running mean over all queries so far
            total_time = (time.time() - start_time) * 1000
            self.metrics.avg_query_time = self._running_mean(
                self.metrics.avg_query_time, total_time, self.metrics.total_queries
            )
            self.metrics.total_queries += 1

    async def _run_rag_dialogue(self, session, session_id: str, final_text: str, request):
        """
        Answer ``final_text`` through the retrieval service, then speak the answer.

        Yields processing_start, llm_response_chunk*, llm_response_final and the
        TTS events. A retrieval ``error`` event is forwarded as a voice ``error``
        event and ends the turn. A non-empty answer is appended to
        ``session.context_history``, which is trimmed to ``_MAX_HISTORY_TURNS``.
        """
        yield VoiceQueryResponse(
            type="processing_start",
            session_id=session_id,
            message="正在思考...",
            timestamp=int(time.time() * 1000)
        )

        from core.models import QueryRequest, Message

        # Chat history: the most recent turns, then the new user utterance
        messages = []
        for ctx in session.context_history[-self._PROMPT_HISTORY_TURNS:]:
            messages.append(Message(role="user", content=ctx['user_query']))
            messages.append(Message(role="assistant", content=ctx['assistant_response']))
        messages.append(Message(role="user", content=final_text))

        query_request = QueryRequest(
            messages=messages,
            top_k=5,
            temperature=0.3,
            collection_name=getattr(request, 'collection_name', 'default')
        )

        full_response = ""
        references = []

        async for response_chunk in self.retrieve_service.stream_query(query_request):
            try:
                chunk_data = json.loads(response_chunk)
            except json.JSONDecodeError:
                continue  # skip frames that are not valid JSON
            if not isinstance(chunk_data, dict):
                continue

            event_type = chunk_data.get("type")
            if event_type == "retrieval_result":
                references = chunk_data["data"]["results"][:3]  # keep top-3 references

            elif event_type == "generation_chunk":
                text_chunk = chunk_data["data"]["text"]
                full_response += text_chunk
                yield VoiceQueryResponse(
                    type="llm_response_chunk",
                    session_id=session_id,
                    text=text_chunk,
                    timestamp=int(time.time() * 1000)
                )

            elif event_type == "generation_done":
                full_response = chunk_data["data"]["full_text"]
                break

            elif event_type == "error":
                # Surface retrieval failures instead of silently ending with no answer.
                data = chunk_data.get("data")
                detail = data.get("error") if isinstance(data, dict) else None
                yield VoiceQueryResponse(
                    type="error",
                    session_id=session_id,
                    error=f"检索失败: {detail}",
                    timestamp=int(time.time() * 1000)
                )
                return

        if not full_response.strip():
            return

        yield VoiceQueryResponse(
            type="llm_response_final",
            session_id=session_id,
            text=full_response,
            references=[
                {
                    "title": ref.get("title", ""),
                    "content": ref.get("content", "")[:200],
                    "score": ref.get("score", 0.0)
                }
                for ref in references
            ],
            timestamp=int(time.time() * 1000)
        )

        # _synthesize_and_stream_tts is an async generator. Awaiting it raises
        # TypeError, so iterate it and forward every TTS event to the caller.
        async for tts_event in self._synthesize_and_stream_tts(
            text=full_response,
            session_id=session_id,
            language=session.language,
            voice_config=self.config.tts_config
        ):
            yield tts_event

        session.context_history.append({
            'user_query': final_text,
            'assistant_response': full_response,
            'timestamp': datetime.now().isoformat(),
            'references': references
        })
        if len(session.context_history) > self._MAX_HISTORY_TURNS:
            session.context_history = session.context_history[-self._MAX_HISTORY_TURNS:]

    async def _synthesize_and_stream_tts(
        self,
        text: str,
        session_id: str,
        language: str,
        voice_config: dict
    ) -> AsyncGenerator[VoiceQueryResponse, None]:
        """
        Synthesize ``text`` and stream it as Base64 audio events.

        Args:
            text: text to synthesize (split into segments of <= 500 chars)
            session_id: session ID stamped on every event
            language: language code passed to the TTS service
            voice_config: optional ``voice_id``/``rate``/``pitch`` overrides

        Yields:
            tts_start, one tts_audio per non-empty audio chunk, then tts_complete.
            On failure, a single ``error`` event (the exception is not re-raised).
        """
        voice_config = voice_config or {}
        try:
            # 1. TTS start event
            yield VoiceQueryResponse(
                type="tts_start",
                session_id=session_id,
                text=text,
                timestamp=int(time.time() * 1000)
            )

            # 2. Split the text into TTS-sized segments
            text_segments = self._split_text_for_tts(text, max_length=500)

            # 3. Stream-synthesize each segment
            chunk_index = 0
            for segment in text_segments:
                if not segment.strip():
                    continue

                async for audio_chunk in self.tts_service.synthesize_streaming(
                    text=segment,
                    voice_id=voice_config.get("voice_id", "zh-CN-XiaoxiaoNeural"),
                    language=language,
                    rate=voice_config.get("rate", "+0%"),
                    pitch=voice_config.get("pitch", "+0Hz")
                ):
                    if audio_chunk:
                        audio_b64 = base64.b64encode(audio_chunk).decode('utf-8')

                        # NOTE(review): format/sample rate are fixed here regardless
                        # of the TTS provider's actual output — confirm.
                        yield VoiceQueryResponse(
                            type="tts_audio",
                            session_id=session_id,
                            audio_data=audio_b64,
                            chunk_index=chunk_index,
                            audio_format="mp3",
                            sample_rate=16000,
                            timestamp=int(time.time() * 1000)
                        )

                        chunk_index += 1

                        # Brief pause between chunks (flow control)
                        await asyncio.sleep(0.01)

            # 4. TTS completion event
            yield VoiceQueryResponse(
                type="tts_complete",
                session_id=session_id,
                total_chunks=chunk_index,
                timestamp=int(time.time() * 1000)
            )

        except Exception as e:
            logger.error(f"TTS合成失败: {session_id}", error=str(e))
            yield VoiceQueryResponse(
                type="error",
                session_id=session_id,
                error=f"语音合成失败: {str(e)}",
                timestamp=int(time.time() * 1000)
            )

📊 性能优化与监控

性能指标收集

class AlgoServiceMetrics:
    """
    Performance counters for the algorithm service.

    Every figure starts at zero. Request-level totals are plain attributes;
    retrieval, voice and LLM figures live in per-area dicts keyed by metric name.
    """

    def __init__(self):
        # Request-level totals
        self.request_count = self.error_count = 0
        self.total_response_time = 0

        # Retrieval stage timings
        self.retrieval_metrics = dict.fromkeys(
            ("avg_retrieval_time", "vector_search_time", "graph_search_time", "rerank_time"),
            0,
        )

        # Voice pipeline figures
        self.voice_metrics = dict.fromkeys(
            ("avg_asr_latency", "avg_tts_latency", "active_sessions", "total_audio_processed"),
            0,
        )

        # LLM generation figures
        self.llm_metrics = dict.fromkeys(
            ("avg_generation_time", "tokens_per_second", "total_tokens_generated"),
            0,
        )

🛠️ 最佳实践

1. 异步编程实践

# 高并发异步处理
async def process_multiple_requests(
    requests: "List[QueryRequest]",
    max_concurrency: int = 10,
) -> "List[QueryResponse]":
    """
    Process several query requests concurrently, bounded by a semaphore.

    Args:
        requests: requests forwarded to ``retrieve_service.process_query``.
        max_concurrency: maximum number of requests in flight at once (>= 1).

    Returns:
        Results of the requests that succeeded, in input order. Failed or
        cancelled requests are logged and omitted, so the returned list can be
        shorter than ``requests``.

    Raises:
        ValueError: if ``max_concurrency`` < 1 (a zero-permit semaphore would
            block forever).
    """
    import logging

    if max_concurrency < 1:
        raise ValueError(f"max_concurrency must be >= 1, got {max_concurrency}")

    semaphore = asyncio.Semaphore(max_concurrency)

    async def process_single_request(request):
        async with semaphore:
            return await retrieve_service.process_query(request)

    results = await asyncio.gather(
        *(process_single_request(req) for req in requests),
        return_exceptions=True,
    )

    successes = []
    for outcome in results:
        # Check BaseException, not Exception: a cancelled child comes back as
        # asyncio.CancelledError (a BaseException since Python 3.8) and would
        # otherwise be returned as if it were a response.
        if isinstance(outcome, BaseException):
            logging.getLogger(__name__).warning("批量请求中的单个请求失败,已忽略", exc_info=outcome)
            continue
        successes.append(outcome)
    return successes

2. 缓存优化策略

# 多层缓存实现
class MultiLevelCache:
    """
    Two-level cache: an in-process dict (L1) in front of Redis (L2).

    Values are stored JSON-encoded in Redis. L1 entries live at most
    ``min(ttl, memory_ttl)`` seconds, so the process does not keep serving a
    value after Redis has expired it. L1 holds at most ``max_memory_items``
    entries; the oldest inserted entry is evicted first.

    Args:
        redis_client: async Redis client with awaitable ``get``/``setex``.
            Defaults to ``redis.asyncio.Redis()``. The synchronous
            ``redis.Redis`` used previously returns plain values that cannot be
            awaited.
        memory_ttl: upper bound, in seconds, on the lifetime of an L1 entry.
        max_memory_items: maximum number of L1 entries.
    """

    def __init__(self, redis_client=None, memory_ttl: float = 60.0, max_memory_items: int = 10000):
        self.memory_cache = {}  # L1: key -> decoded value
        self._memory_expiry = {}  # key -> time.monotonic() deadline for the L1 entry
        self.memory_ttl = memory_ttl
        self.max_memory_items = max_memory_items
        if redis_client is None:
            from redis import asyncio as aioredis
            redis_client = aioredis.Redis()
        self.redis_client = redis_client  # L2: Redis

    def _evict(self, key: str) -> None:
        """Drop ``key`` from L1 if present."""
        self.memory_cache.pop(key, None)
        self._memory_expiry.pop(key, None)

    def _remember(self, key: str, value: Any, lifetime: float) -> None:
        """Store ``value`` in L1 for ``lifetime`` seconds, evicting the oldest entries when full."""
        self._evict(key)  # re-insert so the key moves to the newest position
        while self.memory_cache and len(self.memory_cache) >= self.max_memory_items:
            self._evict(next(iter(self.memory_cache)))
        self.memory_cache[key] = value
        self._memory_expiry[key] = time.monotonic() + lifetime

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key`` from L1, then Redis; None on a miss."""
        # L1 lookup. Entries without a recorded deadline never expire.
        if key in self.memory_cache:
            if time.monotonic() < self._memory_expiry.get(key, float("inf")):
                return self.memory_cache[key]
            self._evict(key)

        # L2 lookup
        raw = await self.redis_client.get(key)
        if not raw:
            return None

        value = json.loads(raw)
        # Backfill L1. The remaining Redis TTL is unknown here, so memory_ttl bounds it.
        self._remember(key, value, self.memory_ttl)
        return value

    async def set(self, key: str, value: Any, ttl: int = 3600):
        """Store ``value`` in Redis with ``ttl`` seconds, then in L1."""
        # Serialize and write Redis first. If either step fails, L1 is left
        # untouched and never holds a value Redis does not have.
        payload = json.dumps(value)
        await self.redis_client.setex(key, ttl, payload)
        self._remember(key, value, min(ttl, self.memory_ttl))

3. 错误处理和恢复

# 自动重试和熔断机制
class RetryableService:
    """
    Wrap a service with retries (exponential backoff) and a simple circuit breaker.

    Args:
        service: object whose async methods are invoked by name.
        max_retries: extra attempts after the first failure (total attempts =
            max_retries + 1).
        circuit_breaker_threshold: number of calls that exhausted all retries
            before the circuit opens.
        recovery_timeout: seconds after the last failure before an open
            circuit lets calls through again.
        backoff_base: base delay in seconds; retry ``k`` (0-based) sleeps
            ``backoff_base * 2**k``.

    ``last_failure_time`` is a ``time.monotonic()`` reading, so wall-clock
    adjustments cannot hold the circuit open or close it early.
    """

    def __init__(self, service, max_retries=3, circuit_breaker_threshold=5,
                 recovery_timeout=30.0, backoff_base=1.0):
        self.service = service
        self.max_retries = max_retries
        self.circuit_breaker_threshold = circuit_breaker_threshold
        self.recovery_timeout = recovery_timeout
        self.backoff_base = backoff_base
        self.failure_count = 0
        self.last_failure_time = None

    async def call_with_retry(self, method_name: str, *args, **kwargs):
        """
        Await ``service.<method_name>(*args, **kwargs)``, retrying on exceptions.

        Raises:
            ServiceUnavailableError: if the circuit breaker is open.
            AttributeError: immediately, if the service has no such method.
            Exception: the last exception once all attempts are exhausted.
        """
        if self._is_circuit_open():
            raise ServiceUnavailableError("Service circuit breaker is open")

        # Resolve the method once, outside the retry loop. A missing method is
        # a programming error, not a transient failure, so it must not be
        # retried with backoff.
        method = getattr(self.service, method_name)
        last_attempt = max(self.max_retries, 0)

        for attempt in range(last_attempt + 1):
            try:
                result = await method(*args, **kwargs)
            except Exception:
                if attempt == last_attempt:
                    self.failure_count += 1
                    self.last_failure_time = time.monotonic()
                    raise
                # Exponential backoff before the next attempt
                await asyncio.sleep(self.backoff_base * 2 ** attempt)
            else:
                # Any success resets the breaker
                self.failure_count = 0
                return result

    def _is_circuit_open(self) -> bool:
        """Return True while the breaker is open; close it once ``recovery_timeout`` has elapsed."""
        if self.failure_count < self.circuit_breaker_threshold:
            return False

        if (self.last_failure_time is None
                or time.monotonic() - self.last_failure_time > self.recovery_timeout):
            self.failure_count = 0
            return False

        return True

这份Python算法服务的详细分析涵盖了核心API实现、GraphRAG推理引擎、语音处理模块、性能优化和最佳实践,为开发者提供了深入理解系统架构和实现细节的完整指南。