VoiceHelper - 14 - Indexing Service

模块概览

Indexing Service(索引服务)是 VoiceHelper 平台 RAG 能力的前置处理服务,负责将上传的文档解析、分块、向量化,并存储到 Milvus 向量数据库和 Neo4j 知识图谱。该服务作为文档索引流水线的协调者,确保文档从原始格式转换为可检索的结构化数据。

核心职责

文档解析

  • 多格式支持:PDF、DOCX、TXT、Markdown、HTML、Excel、PPT
  • 文本提取:从二进制文件中提取纯文本内容
  • 元数据提取:标题、作者、创建时间等元信息
  • 布局保留:保持文档结构(段落、标题层级)

语义分块

  • RecursiveCharacterTextSplitter:基于语义边界分块
  • 自适应分块:根据文档类型调整策略
  • Token 估算:中英文混合场景的 Token 计数
  • 重叠策略:chunk_overlap 保证上下文连贯性

向量化

  • BGE-M3 Embedding:多语言向量化模型
  • 批量处理:batch_size=32,提升吞吐量
  • 异步处理:asyncio 并发,降低延迟
  • 维度标准化:1024 维向量,支持余弦相似度

向量存储

  • Milvus 批量插入:高效写入向量数据库
  • 索引优化:HNSW 索引,平衡速度和准确性
  • 租户隔离:tenant_id 字段实现多租户数据隔离
  • 元数据关联:chunk_id、doc_id、content 联合存储

知识图谱构建

  • 实体提取:NER 模型识别人名、地名、机构等
  • 关系抽取:识别实体间的语义关系
  • Neo4j 存储:图数据库存储实体和关系
  • 异步构建:不阻塞主流程,后台任务完成

任务队列

  • Kafka 消费者:从 document.created 事件消费任务
  • 异步处理:每个任务独立协程执行
  • 失败重试:指数退避重试,最多 3 次
  • 状态回写:处理结果写回 PostgreSQL

技术架构

综合架构全景图

flowchart TB
    subgraph External["外部系统"]
        KnowledgeService["Knowledge Service<br/>文档上传"]
        RetrievalService["Retrieval Service<br/>向量检索"]
    end

    subgraph KafkaCluster["Kafka 集群"]
        Topic["document.events<br/>文档事件流"]
    end

    subgraph IndexingService["Indexing Service (本服务)"]
        subgraph API["API 层"]
            HealthAPI["/health, /readiness"]
            StatsAPI["/stats"]
            IncrementalAPI["/incremental/*"]
        end

        subgraph Core["核心处理层"]
            Consumer["Kafka Consumer<br/>事件消费"]
            IncrementalIndexer["IncrementalIndexer<br/>增量索引协调器"]
            VersionManager["VersionManager<br/>版本管理"]
            DocumentProcessor["DocumentProcessor<br/>文档处理协调器"]
        end

        subgraph Modules["功能模块"]
            ParserFactory["ParserFactory<br/>解析器工厂"]
            Parsers["Parsers<br/>PDF/Word/Excel/..."]
            Chunker["DocumentChunker<br/>语义分块"]
            Embedder["BGE_M3_Embedder<br/>向量化"]
            GraphBuilder["GraphBuilder<br/>图谱构建"]
        end
    end

    subgraph Adapters["适配器层"]
        MinIOClient["MinIOClient<br/>文档下载"]
        VectorStoreClient["VectorStoreClient<br/>向量存储适配器"]
        Neo4jClient["Neo4jClient<br/>图数据库适配器"]
    end

    subgraph StorageAdapter["Vector-Store-Adapter"]
        AdapterAPI["适配器 API<br/>/collections/*/insert"]
        MilvusImpl["Milvus 实现"]
        PgVectorImpl["PgVector 实现"]
    end

    subgraph Storage["存储层"]
        MinIO["MinIO<br/>对象存储"]
        Redis["Redis<br/>版本缓存"]
        Milvus["Milvus<br/>向量数据库"]
        Neo4j["Neo4j<br/>知识图谱"]
    end

    %% 外部系统连接
    KnowledgeService -->|上传文档| MinIO
    KnowledgeService -->|发送事件| Topic
    RetrievalService -->|检索向量| Milvus

    %% Kafka 事件流
    Topic --> Consumer

    %% API 调用
    External -.->|HTTP| HealthAPI
    External -.->|HTTP| IncrementalAPI

    %% 核心流程
    Consumer --> IncrementalIndexer
    IncrementalAPI --> IncrementalIndexer

    IncrementalIndexer --> VersionManager
    IncrementalIndexer --> DocumentProcessor

    VersionManager --> Redis

    DocumentProcessor --> ParserFactory
    ParserFactory --> Parsers
    DocumentProcessor --> Chunker
    DocumentProcessor --> Embedder
    DocumentProcessor --> GraphBuilder

    %% 适配器调用
    DocumentProcessor --> MinIOClient
    DocumentProcessor --> VectorStoreClient
    GraphBuilder --> Neo4jClient

    MinIOClient --> MinIO
    VectorStoreClient --> AdapterAPI
    Neo4jClient --> Neo4j

    %% 适配器内部
    AdapterAPI --> MilvusImpl
    AdapterAPI --> PgVectorImpl
    MilvusImpl --> Milvus

    %% 样式
    style External fill:#e1f5fe
    style KafkaCluster fill:#fff3e0
    style IndexingService fill:#f3e5f5
    style API fill:#e8f5e9
    style Core fill:#fff9c4
    style Modules fill:#fce4ec
    style Adapters fill:#e0f2f1
    style StorageAdapter fill:#f1f8e9
    style Storage fill:#ede7f6

    %% 关键路径标注
    linkStyle 2,3,5,6,7,8,9,10 stroke:#f00,stroke-width:2px

架构说明

  1. 事件驱动:Knowledge Service 上传文档后发送 Kafka 事件,Indexing Service 异步消费
  2. 增量索引:IncrementalIndexer 通过 VersionManager 检测变更,避免重复索引
  3. 模块化设计:解析、分块、向量化、图谱构建各司其职,低耦合高内聚
  4. 适配器模式:VectorStoreClient 通过 vector-store-adapter 统一访问向量存储,支持多后端切换
  5. 版本管理:Redis 缓存文档版本信息,支持快速变更检测
  6. 关键路径(红色标注):Kafka → Consumer → IncrementalIndexer → DocumentProcessor → VectorStoreClient → Milvus

整体分层架构图

flowchart TB
    subgraph External["外部触发层"]
        Upload["Knowledge Service<br/>文档上传"]
        Kafka["Kafka<br/>document.created事件"]
    end

    subgraph IndexingSvc["Indexing Service"]
        Consumer["Kafka Consumer<br/>任务消费"]
        Processor["DocumentProcessor<br/>处理协调"]
    end

    subgraph Parsers["解析器"]
        ParserFactory["ParserFactory<br/>解析器工厂"]
        PDFParser["PDF Parser"]
        DOCXParser["DOCX Parser"]
        TXTParser["TXT Parser"]
        MarkdownParser["Markdown Parser"]
    end

    subgraph Processing["处理层"]
        Chunker["DocumentChunker<br/>语义分块"]
        Embedder["BGE-M3 Embedder<br/>向量化"]
        GraphBuilder["GraphBuilder<br/>图谱构建"]
    end

    subgraph Storage["存储"]
        MinIO["MinIO<br/>文档下载"]
        Milvus["Milvus<br/>向量存储"]
        Neo4j["Neo4j<br/>知识图谱"]
        PostgreSQL["PostgreSQL<br/>状态回写"]
    end

    Upload --> Kafka
    Kafka --> Consumer
    Consumer --> Processor

    Processor --> MinIO
    MinIO --> ParserFactory

    ParserFactory --> PDFParser
    ParserFactory --> DOCXParser
    ParserFactory --> TXTParser
    ParserFactory --> MarkdownParser

    ParserFactory --> Chunker
    Chunker --> Embedder
    Embedder --> Milvus

    Chunker --> GraphBuilder
    GraphBuilder --> Neo4j

    Processor --> PostgreSQL

    style External fill:#e3f2fd
    style IndexingSvc fill:#fff3e0
    style Parsers fill:#f3e5f5
    style Processing fill:#e8f5e9
    style Storage fill:#e0f2f1

服务分层架构详图

flowchart LR
    subgraph API["API层"]
        Health["/health<br/>健康检查"]
        Readiness["/readiness<br/>就绪检查"]
        Stats["/stats<br/>统计信息"]
        Trigger["/trigger<br/>手动触发"]
    end

    subgraph Infrastructure["基础设施层"]
        KafkaConsumer["KafkaConsumer<br/>消息消费"]
        MinIOClient["MinIOClient<br/>对象存储"]
        Neo4jClient["Neo4jClient<br/>图数据库"]
        VectorStoreClient["VectorStoreClient<br/>向量数据库"]
    end

    subgraph Core["核心业务层"]
        DocumentProcessor["DocumentProcessor<br/>文档处理协调器"]

        subgraph Parsers2["解析模块"]
            ParserFactory2["ParserFactory"]
            BaseParser["BaseParser接口"]
            PDFParser2["PDFParser"]
            WordParser["WordParser"]
            ExcelParser["ExcelParser"]
            HTMLParser["HTMLParser"]
        end

        subgraph Processing2["处理模块"]
            DocumentChunker2["DocumentChunker<br/>固定分块"]
            AdaptiveChunker["AdaptiveChunker<br/>自适应分块"]
            BGE_M3_Embedder2["BGE_M3_Embedder<br/>向量化"]
            CachedEmbedder["CachedEmbedder<br/>缓存向量化"]
        end

        subgraph GraphModule["图谱模块"]
            EntityExtractor["EntityExtractor<br/>实体提取"]
            GraphBuilder2["GraphBuilder<br/>图谱构建"]
        end
    end

    API --> DocumentProcessor
    KafkaConsumer --> DocumentProcessor

    DocumentProcessor --> MinIOClient
    DocumentProcessor --> ParserFactory2
    DocumentProcessor --> DocumentChunker2
    DocumentProcessor --> BGE_M3_Embedder2
    DocumentProcessor --> VectorStoreClient
    DocumentProcessor --> GraphBuilder2

    ParserFactory2 -.实例化.-> BaseParser
    BaseParser -.实现.-> PDFParser2
    BaseParser -.实现.-> WordParser
    BaseParser -.实现.-> ExcelParser
    BaseParser -.实现.-> HTMLParser

    DocumentChunker2 -.继承.-> AdaptiveChunker
    BGE_M3_Embedder2 -.继承.-> CachedEmbedder

    GraphBuilder2 --> EntityExtractor
    GraphBuilder2 --> Neo4jClient

    style API fill:#e3f2fd
    style Infrastructure fill:#fff3e0
    style Core fill:#e8f5e9
    style Parsers2 fill:#f3e5f5
    style Processing2 fill:#fce4ec
    style GraphModule fill:#e0f2f1

模块间交互与依赖图

graph TB
    subgraph "主流程"
        A[main.py] --> B[FastAPI App]
        B --> C[DocumentEventConsumer]
        B --> D[DocumentProcessor]
    end

    subgraph "DocumentProcessor依赖"
        D --> E[ParserFactory]
        D --> F[DocumentChunker]
        D --> G[BGE_M3_Embedder]
        D --> H[GraphBuilder]
        D --> I[MinIOClient]
        D --> J[VectorStoreClient]
        D --> K[Neo4jClient]
    end

    subgraph "Parser层级"
        E --> L[BaseParser接口]
        L --> M[PDFParser]
        L --> N[WordParser]
        L --> O[ExcelParser]
        L --> P[HTMLParser]
        L --> Q[MarkdownParser]
        L --> R[TextParser]
        L --> S[PPTParser]
    end

    subgraph "Chunker层级"
        F --> T[RecursiveCharacterTextSplitter<br/>LangChain]
        F --> U[AdaptiveChunker]
    end

    subgraph "Embedder层级"
        G --> V[SentenceTransformer<br/>BAAI/bge-m3]
        G --> W[CachedEmbedder]
    end

    subgraph "GraphBuilder依赖"
        H --> X[EntityExtractor]
        H --> K
    end

    subgraph "外部库"
        M --> Y[pdfplumber]
        M --> Z[PyPDF2]
        N --> AA[python-docx]
        O --> AB[openpyxl]
        P --> AC[BeautifulSoup4]
        S --> AD[python-pptx]
    end

    C -.消费事件.-> D

    style A fill:#e3f2fd
    style B fill:#e3f2fd
    style D fill:#fff3e0
    style H fill:#e8f5e9
    style E fill:#f3e5f5
    style F fill:#fce4ec
    style G fill:#e1bee7

架构说明

分层架构详解

API 层(接口层)

Indexing Service 提供 6 个核心 HTTP 端点:

  • /health:健康检查,返回服务状态和版本号
  • /readiness:就绪检查,验证 Kafka Consumer、DocumentProcessor、MinIO、Milvus、Neo4j 连接状态
  • /stats:统计信息,返回已处理文档数、chunk 数、向量数、图谱节点数、增量索引跳过率等
  • /trigger:手动触发文档处理(仅测试环境使用)
  • /incremental/check:增量索引检查,返回文档是否需要重新索引
  • /incremental/reindex:强制重新索引文档

API 层使用 FastAPI 框架,支持异步请求处理。集成中间件链:

  1. RateLimitMiddleware:速率限制(默认 60 次/分钟,1000 次/小时)
  2. RequestIDMiddleware:请求 ID 生成,用于链路追踪
  3. StructuredLoggingMiddleware:结构化日志记录
  4. CORSMiddleware:跨域请求支持

Prometheus metrics 通过 /metrics 端点暴露,监控维度包含 tenant_id、document_type 等标签,核心指标:

  • indexing_documents_processed_total:文档处理总数(按状态和租户分组)
  • indexing_document_processing_seconds:文档处理耗时直方图(P50/P95/P99)
  • indexing_chunks_created_total:分块创建总数
  • indexing_vectors_stored_total:向量存储总数
  • indexing_embedding_seconds:向量化耗时直方图
  • indexing_minio_download_seconds:MinIO 下载耗时直方图
  • indexing_errors_total:错误计数(按错误类型分组)

基础设施层(Infrastructure Layer)

该层封装外部系统的客户端,提供统一的接口:

  • KafkaConsumer:消费 Kafka document.events topic,接收文档上传事件。使用 confluent_kafka 库,配置自动提交偏移量(5 秒间隔),最大轮询间隔 5 分钟。支持事件类型分发(document.uploaded/updated/deleted)。

  • MinIOClient:对象存储客户端,负责文档下载。连接 MinIO S3 兼容 API,下载超时 30 秒,支持多租户文件路径隔离(tenant_id/date/filename)。

  • VectorStoreClient:Milvus 向量数据库客户端,批量插入向量。连接 Milvus 19530 端口,支持按 tenant_id 分区,单批次最大 10000 条记录,插入超时 30 秒。

  • Neo4jClient:Neo4j 图数据库客户端,批量创建节点和关系。使用 Bolt 协议连接,支持事务(ACID),连接池最大 50 个连接,生命周期 3600 秒。

核心业务层(Core Layer)

DocumentProcessor(文档处理协调器)

核心协调器,编排 6 个步骤的处理流水线:

  1. 文档下载_download_document):调用 MinIOClient 下载文件,返回 bytes
  2. 文档解析_parse_document):调用 ParserFactory 选择解析器,提取纯文本
  3. 语义分块_chunk_document):调用 DocumentChunker 分块,生成 chunk_id 和 metadata
  4. 向量化_vectorize_chunks):调用 BGE_M3_Embedder 批量向量化
  5. 向量存储_store_vectors):调用 VectorStoreClient 批量插入 Milvus
  6. 图谱构建_build_graph):调用 GraphBuilder 异步构建知识图谱

每个步骤独立可测试,失败时记录错误日志,更新统计信息。统计数据包括:total_processed(总处理数)、total_success(成功数)、total_failed(失败数)、total_chunks(总 chunk 数)、total_vectors(总向量数)。

解析模块(Parser Module)

ParserFactory:工厂模式,根据文件扩展名动态选择解析器。扩展名与解析器的映射关系:

  • .pdf → PDFParser
  • .docx, .doc → WordParser
  • .xlsx, .xls → ExcelParser
  • .html, .htm → HTMLParser
  • .md → MarkdownParser
  • .txt → TextParser
  • .pptx, .ppt → PPTParser

BaseParser 接口:定义统一的解析接口,包含 2 个方法:

  • parse(file_data: bytes) -> str:解析文件,返回纯文本
  • extract_metadata(file_data: bytes) -> Dict:提取元数据(标题、作者、页数等)
  • clean_text(text: str) -> str:清理文本(去除多余空格、特殊字符)

PDFParser 实现:双层降级策略,确保解析成功率

  • 主解析器:pdfplumber,准确率高(95%+),支持表格和布局识别
  • 备用解析器:PyPDF2,兼容性好,处理加密或复杂布局 PDF
  • 降级条件:pdfplumber 提取文本少于 100 字符时,切换到 PyPDF2
  • 元数据提取:使用 PyPDF2 读取 PDF 元数据(/Title、/Author、/Pages 等)

处理模块(Processing Module)

DocumentChunker(固定分块器)

使用 LangChain 的 RecursiveCharacterTextSplitter,按语义边界递归分块:

  • 分块参数

    • chunk_size:500 字符(平衡上下文长度和检索粒度)
    • chunk_overlap:50 字符(10%重叠,保证上下文连贯性)
  • 分隔符优先级(从高到低):

    1. \n\n(段落分隔)
    2. \n(行分隔)
    3. 。!?;(中文句号)
    4. . ! ? ;(英文句号)
    5. (空格)
    6. ``(字符级切分,兜底策略)
  • Chunk ID 生成:格式 {document_id}_{index}_{text_hash},text_hash 取 MD5 前 8 位,保证内容变化时 ID 变化,支持增量更新。

  • Token 估算:中文字符 × 1.5 + 英文单词 × 1.3,用于控制 LLM 上下文长度。实际应用中可使用 tiktoken 库精确计算。

AdaptiveChunker(自适应分块器)

继承 DocumentChunker,根据文档类型调整分块策略:

文档类型 chunk_size chunk_overlap 分隔符优先级 适用场景
code 800 100 \n\nclass, \n\ndef, \n\n, \n 代码文件,按类/函数分割
markdown 600 60 \n##, \n###, \n\n, \n Markdown 文档,按标题层级分割
technical 700 70 \n\n, \n, , . 技术文档,保留术语完整性
general 500 50 默认分隔符 通用文档

BGE_M3_Embedder(向量化模块)

基于 Sentence-Transformers 库加载 BGE-M3 模型:

  • 模型:BAAI/bge-m3,多语言向量化模型(支持中英文)
  • 维度:1024 维,平衡向量表达能力和存储成本
  • 批量处理:batch_size=32,相比单条处理提升吞吐量 10 倍+
  • 归一化:L2 归一化,支持余弦相似度计算(范围 [-1, 1])
  • 设备:优先 GPU(CUDA),降级到 CPU

批量向量化流程:

  1. 过滤空文本,替换为单空格
  2. 分批处理(每批 32 条),避免 GPU 内存溢出
  3. SentenceTransformer.encode() 推理,返回 numpy 数组
  4. 转换为 List[List[float]],用于 Milvus 插入

CachedEmbedder(缓存向量化)

继承 BGE_M3_Embedder,添加 LRU 缓存:

  • 缓存容量:10000 条,约占用 40MB 内存(1024 维 × 4 字节 × 10000)
  • 缓存键:文本哈希值(hash(text))
  • 命中率:重复文档或相似文本场景,命中率 30-50%
  • 性能提升:缓存命中时,延迟从 50ms 降至 <1ms,提升 50 倍+
  • 驱逐策略:简单 FIFO,缓存满时删除第一个元素

图谱模块(Graph Module)

EntityExtractor(实体提取器)

基于 NER 模型提取实体:

  • 模型:可选 spaCy、BERT-based NER、LLM-based extraction
  • 实体类型:人名(PERSON)、地名(LOCATION)、机构(ORGANIZATION)、时间(DATE)、数量(QUANTITY)等
  • 置信度阈值:0.7,过滤低置信度实体,减少噪声

GraphBuilder(图谱构建器)

负责构建知识图谱:

  1. 实体提取extract_entities):调用 EntityExtractor 识别实体
  2. 关系抽取extract_relationships):识别实体间的语义关系(依赖句法分析或关系抽取模型)
  3. 图谱存储build):批量创建 Neo4j 节点和关系
    • 节点:(Entity {name, type, document_id, tenant_id})
    • 关系:(e1)-[RELATED_TO {type, confidence}]->(e2)
  4. 异步执行:使用 asyncio.create_task(),不阻塞向量存储

存储层说明

Milvus 向量存储

  • Collection:documents,按 tenant_id 分区,支持多租户隔离
  • 索引类型:HNSW(Hierarchical Navigable Small World),平衡速度和准确性
  • 索引参数:M=16(每个节点的邻居数),efConstruction=200(构建时搜索深度)
  • 度量类型:COSINE(余弦相似度),范围 [-1, 1]
  • 字段 Schema
    • chunk_id(主键,VARCHAR)
    • document_id(VARCHAR)
    • tenant_id(VARCHAR,分区键)
    • content(VARCHAR,原始文本)
    • embedding(FloatVector,1024 维)
    • metadata(JSON,扩展字段)

Neo4j 知识图谱

  • 节点类型:Entity(实体节点)、Document(文档节点)
  • 关系类型:RELATED_TO(实体关联)、BELONGS_TO(文档归属)
  • 索引:Entity.name、Entity.type、Document.document_id 建立索引,加速查询
  • 事务:批量操作使用事务(ACID),确保一致性

PostgreSQL 状态回写

  • :documents,存储文档元数据和处理状态
  • 字段:document_id、tenant_id、status(PENDING/PROCESSING/COMPLETED/FAILED)、chunks_count、created_at、updated_at
  • 更新时机:处理完成或失败时,回写 status 和 chunks_count
  • 查询接口:Knowledge Service 查询处理进度,展示给用户

数据模型

领域模型 UML 图

classDiagram
    class DocumentProcessor {
        -ParserFactory parser_factory
        -DocumentChunker chunker
        -BGE_M3_Embedder embedder
        -VectorStoreClient vector_store_client
        -MinIOClient minio_client
        -Neo4jClient neo4j_client
        -GraphBuilder graph_builder
        -dict stats
        +process_document(document_id, tenant_id, file_path)
        -_download_document()
        -_parse_document()
        -_chunk_document()
        -_vectorize_chunks()
        -_store_vectors()
        -_build_graph()
        +get_stats()
    }

    class ParserFactory {
        -dict parsers
        +get_parser(file_path)
        +register_parser(extension, parser)
    }

    class BaseParser {
        <<interface>>
        +parse(file_data)
        +extract_metadata(file_data)
        +clean_text(text)
    }

    class PDFParser {
        +parse(file_data)
        -_parse_with_pdfplumber()
        -_parse_with_pypdf2()
        +extract_metadata()
    }

    class DocumentChunker {
        -int chunk_size
        -int chunk_overlap
        -RecursiveCharacterTextSplitter splitter
        +chunk(text, document_id)
        -_generate_chunk_id()
        -_estimate_tokens()
    }

    class AdaptiveChunker {
        -dict strategies
        +chunk(text, document_id, doc_type)
    }

    class BGE_M3_Embedder {
        -string model_name
        -int dimension
        -SentenceTransformer model
        +embed_query(text)
        +embed_batch(texts)
    }

    class GraphBuilder {
        -Neo4jClient neo4j_client
        +extract(text)
        +build(entities, relationships, doc_id)
    }

    class Chunk {
        +string id
        +string content
        +int index
        +int tokens
        +dict metadata
    }

    class ProcessingResult {
        +string document_id
        +string status
        +int chunks_count
        +float duration
        +string error
        +is_success()
    }

    DocumentProcessor "1" *-- "1" ParserFactory
    DocumentProcessor "1" *-- "1" DocumentChunker
    DocumentProcessor "1" *-- "1" BGE_M3_Embedder
    DocumentProcessor "1" *-- "1" GraphBuilder
    DocumentProcessor ..> ProcessingResult : returns

    ParserFactory ..> BaseParser : creates
    BaseParser <|.. PDFParser : implements

    DocumentChunker <|-- AdaptiveChunker : extends
    DocumentChunker ..> Chunk : creates

数据结构说明

Chunk 结构

字段 类型 说明
id string chunk 唯一标识符,格式:{doc_id}_{index}_{hash}
content string chunk 文本内容,长度 500 字符(可配置)
index int chunk 在文档中的序号,从 0 开始
tokens int 估算的 Token 数量,用于控制 LLM 上下文长度
metadata dict 元数据,包含 document_id、chunk_index、total_chunks

ProcessingResult 结构

字段 类型 说明
document_id string 文档 ID
status string 处理状态:success/failed
chunks_count int 生成的 chunk 数量
duration float 处理耗时(秒)
error string 错误信息(仅 status=failed 时)

调用链路深度分析

上游接口:Kafka 事件触发

Indexing Service 的主流程由 Kafka 事件驱动,从 Knowledge Service 上传文档开始。

入口点:DocumentEventConsumer

# app/infrastructure/kafka_consumer.py
class DocumentEventConsumer:
    async def start(self):
        """启动Kafka消费者"""
        self.consumer = Consumer(self.config)
        self.consumer.subscribe([self.topic])  # document.events
        self._running = True
        await self._consume_loop()

关键代码路径

  1. main.py 启动阶段
# main.py (行 56-89)
@asynccontextmanager
async def lifespan(app: FastAPI):
    kafka_consumer = DocumentEventConsumer()
    document_processor = DocumentProcessor()

    # 异步启动Kafka消费者
    asyncio.create_task(kafka_consumer.start())

    yield

    # 关闭资源
    await kafka_consumer.stop()
    await document_processor.cleanup()
  1. 消息消费循环
# app/infrastructure/kafka_consumer.py (行 94-120)
async def _consume_loop(self):
    while self._running:
        msg = self.consumer.poll(timeout=1.0)
        if msg and not msg.error():
            await self._handle_message(msg)
  1. 事件分发
# app/infrastructure/kafka_consumer.py (行 121-153)
async def _handle_message(self, msg):
    event = json.loads(msg.value().decode("utf-8"))
    event_type = event.get("event_type")  # document.uploaded
    payload = event.get("payload", {})

    # 分发到注册的处理器
    handlers = self.handlers.get(event_type, [])
    for handler in handlers:
        await handler(payload)

模块内部时序图

1. Kafka 消费与事件分发时序图

sequenceDiagram
    autonumber
    participant Main as main.py
    participant App as FastAPI App
    participant Consumer as DocumentEventConsumer
    participant Kafka as Kafka Broker
    participant Handler as document_uploaded_handler
    participant Processor as DocumentProcessor

    Main->>App: uvicorn.run(app)
    App->>App: lifespan startup
    App->>Consumer: __init__()
    App->>Processor: __init__()

    App->>Consumer: asyncio.create_task(start())
    Consumer->>Kafka: subscribe("document.events")

    loop 消费循环
        Consumer->>Kafka: poll(timeout=1.0)
        Kafka-->>Consumer: message

        Consumer->>Consumer: _handle_message(msg)
        Consumer->>Consumer: json.loads(msg.value())
        Consumer->>Consumer: 提取event_type和payload

        Consumer->>Handler: await handler(payload)
        Handler->>Processor: process_document(doc_id, tenant_id, file_path)
        Processor-->>Handler: ProcessingResult
        Handler-->>Consumer: success
    end

    Note over Consumer,Kafka: 轮询间隔1秒,最大轮询间隔5分钟<br/>自动提交偏移量(5秒间隔)

时序图说明

  • 步骤 1-5(初始化):FastAPI 应用启动时,初始化 Consumer 和 Processor,异步启动消费循环
  • 步骤 6-7(订阅):Consumer 连接 Kafka,订阅 document.events topic
  • 步骤 8-14(消费循环):轮询消息(1 秒超时),解析 JSON,分发到处理器
  • 并发控制:消费循环串行处理,但 DocumentProcessor 内部使用 asyncio 并发
  • 容错机制:poll 超时或解析失败时,记录错误日志,继续下一轮
  • 性能特征:单个 Consumer 吞吐量约 10-20 条/秒(受 DocumentProcessor 处理速度限制)

2. DocumentProcessor 主流程时序图

sequenceDiagram
    autonumber
    participant Processor as DocumentProcessor
    participant MinIO as MinIOClient
    participant Parser as ParserFactory
    participant PDF as PDFParser
    participant Chunker as DocumentChunker
    participant Embedder as BGE_M3_Embedder
    participant Vector as VectorStoreClient
    participant Graph as GraphBuilder
    participant Neo4j as Neo4jClient

    Processor->>Processor: process_document(doc_id, tenant_id, file_path)
    Processor->>Processor: start_time = time.time()

    rect rgb(230, 240, 255)
        Note over Processor,MinIO: 步骤1:下载文档
        Processor->>MinIO: _download_document(file_path)
        MinIO->>MinIO: download_file(file_path)
        MinIO-->>Processor: file_data (bytes)
    end

    rect rgb(255, 240, 230)
        Note over Processor,PDF: 步骤2:解析文档
        Processor->>Parser: _parse_document(file_data, file_path)
        Parser->>Parser: get_parser(file_path)
        Parser->>PDF: parse(file_data)
        PDF->>PDF: _parse_with_pdfplumber(file_data)
        alt pdfplumber成功且文本>100字符
            PDF-->>Parser: text
        else 降级到PyPDF2
            PDF->>PDF: _parse_with_pypdf2(file_data)
            PDF-->>Parser: text
        end
        Parser->>Parser: clean_text(text)
        Parser-->>Processor: text (纯文本)
    end

    rect rgb(230, 255, 230)
        Note over Processor,Chunker: 步骤3:语义分块
        Processor->>Chunker: _chunk_document(text, doc_id)
        Chunker->>Chunker: splitter.split_text(text)
        loop 遍历每个chunk
            Chunker->>Chunker: _generate_chunk_id(doc_id, index, text)
            Chunker->>Chunker: _estimate_tokens(text)
        end
        Chunker-->>Processor: chunks (List[Dict])
    end

    rect rgb(255, 230, 255)
        Note over Processor,Embedder: 步骤4:批量向量化
        Processor->>Embedder: _vectorize_chunks(chunks)
        Embedder->>Embedder: embed_batch(texts)
        loop 分批处理 (batch_size=32)
            Embedder->>Embedder: model.encode(batch)
        end
        Embedder-->>Processor: embeddings (List[1024-dim])
    end

    rect rgb(230, 255, 255)
        Note over Processor,Vector: 步骤5:存储向量
        Processor->>Vector: _store_vectors(chunks, embeddings, doc_id, tenant_id)
        Vector->>Vector: 准备数据 (chunk_id, content, embedding, metadata)
        Vector->>Vector: insert_batch(data)
        Vector-->>Processor: success
    end

    par 步骤6:异步构建图谱(不阻塞)
        Processor->>Graph: asyncio.create_task(_build_graph(...))
        Graph->>Graph: extract(text)
        Graph->>Graph: _extract_entities(text)
        Graph->>Graph: _extract_relationships(text, entities)
        Graph->>Neo4j: batch_create_nodes(entities)
        Graph->>Neo4j: batch_create_relationships(relationships)
        Neo4j-->>Graph: success
    end

    Processor->>Processor: 更新统计信息
    Processor->>Processor: duration = time.time() - start_time
    Processor-->>Processor: return {status, doc_id, chunks_count, duration}

调用链路详解

步骤 1:文档下载(_download_document

  • 调用栈DocumentProcessor.process_document()DocumentProcessor._download_document()MinIOClient.download_file()
  • 关键代码
# app/core/document_processor.py (行 111-114)
async def _download_document(self, file_path: str) -> bytes:
    logger.info(f"Downloading document: {file_path}")
    return await self.minio_client.download_file(file_path)
  • 性能特征:网络 IO,延迟 1-2 秒(10MB 文件),取决于 MinIO 与服务间的网络带宽
  • 容错:下载失败时抛出异常,外层捕获并记录到 failed 统计

步骤 2:文档解析(_parse_document

  • 调用栈DocumentProcessor._parse_document()ParserFactory.get_parser()PDFParser.parse()
  • 关键代码
# app/core/document_processor.py (行 116-128)
async def _parse_document(self, file_data: bytes, file_path: str) -> str:
    logger.info(f"Parsing document: {file_path}")
    parser = self.parser_factory.get_parser(file_path)
    text = await parser.parse(file_data)
    logger.info(f"Parsed document, text length: {len(text)} chars")
    return text
  • PDFParser 降级策略
# app/core/parsers/pdf_parser.py (行 18-33)
async def parse(self, file_data: bytes, **kwargs) -> str:
    try:
        # 主解析器:pdfplumber
        text = await self._parse_with_pdfplumber(file_data)

        if not text or len(text) < 100:
            # 降级到PyPDF2
            logger.warning("pdfplumber failed, trying PyPDF2")
            text = await self._parse_with_pypdf2(file_data)

        return self.clean_text(text)
    except Exception as e:
        logger.error(f"Failed to parse PDF: {e}")
        raise
  • 性能特征:CPU 密集,10 页 PDF 约 2-3 秒,100 页约 15-20 秒
  • 准确率:pdfplumber 准确率 95%+,PyPDF2 准确率 85%+(降级场景)

步骤 3:语义分块(_chunk_document

  • 调用栈DocumentProcessor._chunk_document()DocumentChunker.chunk()RecursiveCharacterTextSplitter.split_text()
  • 关键代码
# app/core/document_processor.py (行 130-138)
async def _chunk_document(self, text: str, document_id: str) -> List[Dict]:
    logger.info(f"Chunking document: {document_id}")
    chunks = await self.chunker.chunk(text, document_id)
    logger.info(f"Created {len(chunks)} chunks")
    return chunks
  • 分块实现
# app/core/chunker.py (行 40-77)
async def chunk(self, text: str, document_id: str) -> List[Dict]:
    if not text or not text.strip():
        return []

    # LangChain分块
    chunks = self.splitter.split_text(text)

    # 构建结果
    result = []
    for i, chunk_text in enumerate(chunks):
        chunk_id = self._generate_chunk_id(document_id, i, chunk_text)
        result.append({
            "id": chunk_id,
            "content": chunk_text,
            "index": i,
            "tokens": self._estimate_tokens(chunk_text),
            "metadata": {
                "document_id": document_id,
                "chunk_index": i,
                "total_chunks": len(chunks),
            },
        })
    return result
  • 性能特征:纯计算,O(n)复杂度,10000 字符约 0.5-1 秒
  • 分块效果:500 字符/chunk,10 页 PDF(约 5000 字)生成 10-15 个 chunks

步骤 4:批量向量化(_vectorize_chunks

  • 调用栈DocumentProcessor._vectorize_chunks()BGE_M3_Embedder.embed_batch()SentenceTransformer.encode()
  • 关键代码
# app/core/document_processor.py (行 140-152)
async def _vectorize_chunks(self, chunks: List[Dict]) -> List[List[float]]:
    logger.info(f"Vectorizing {len(chunks)} chunks")
    texts = [chunk["content"] for chunk in chunks]
    embeddings = await self.embedder.embed_batch(texts)
    logger.info(f"Generated {len(embeddings)} embeddings")
    return embeddings
  • 批量向量化实现
# app/core/embedder.py (行 57-84)
async def embed_batch(self, texts: List[str]) -> List[List[float]]:
    if not texts:
        return []

    # 过滤空文本
    valid_texts = [text if text and text.strip() else " " for text in texts]

    # 批量生成向量
    embeddings = self.model.encode(
        valid_texts,
        batch_size=self.batch_size,  # 32
        show_progress_bar=len(valid_texts) > 100,
        convert_to_numpy=True,
    )

    return embeddings.tolist()
  • 性能特征:GPU 推理,batch_size=32,50 个 chunks 约 3-5 秒(GPU),CPU 约 30-50 秒
  • 吞吐量提升:批量处理相比单条提升 10 倍+(GPU 并行)

步骤 5:向量存储(_store_vectors

  • 调用栈DocumentProcessor._store_vectors()VectorStoreClient.insert_batch()
  • 关键代码
# app/core/document_processor.py (行 154-175)
async def _store_vectors(self, chunks: List[Dict], embeddings: List[List[float]],
                       document_id: str, tenant_id: str = None):
    logger.info(f"Storing {len(embeddings)} vectors to Milvus")

    # 准备数据
    data = []
    for chunk, embedding in zip(chunks, embeddings):
        data.append({
            "chunk_id": chunk["id"],
            "document_id": document_id,
            "tenant_id": tenant_id or "default",
            "content": chunk["content"],
            "embedding": embedding,
            "metadata": chunk.get("metadata", {}),
        })

    # 批量插入
    await self.vector_store_client.insert_batch(data)
  • 性能特征:网络 IO + 磁盘 IO,50 个向量约 1-2 秒,单批次最大 10000 条
  • 索引更新:插入后 Milvus 异步更新 HNSW 索引,不阻塞插入操作

步骤 6:图谱构建(_build_graph,异步)

  • 调用栈asyncio.create_task(DocumentProcessor._build_graph())GraphBuilder.extract()GraphBuilder.build()
  • 关键代码
# app/core/document_processor.py (行 77-80)
asyncio.create_task(
    self._build_graph(text, document_id, tenant_id)
)

# app/core/document_processor.py (行 176-196)
async def _build_graph(self, text: str, document_id: str, tenant_id: str = None):
    try:
        entities, relationships = await self.graph_builder.extract(text)
        await self.graph_builder.build(
            entities=entities,
            relationships=relationships,
            document_id=document_id,
            tenant_id=tenant_id,
        )
    except Exception as e:
        logger.error(f"Error building graph: {e}", exc_info=True)
  • 性能特征:异步执行,不阻塞主流程,10 页 PDF 约 5-10 秒
  • 容错:图谱构建失败不影响向量存储,仅记录错误日志

3. PDFParser 解析流程时序图

sequenceDiagram
    autonumber
    participant Processor as DocumentProcessor
    participant Factory as ParserFactory
    participant PDF as PDFParser
    participant Plumber as pdfplumber
    participant PyPDF as PyPDF2

    Processor->>Factory: get_parser(file_path)
    Factory->>Factory: 提取扩展名: .pdf
    Factory->>PDF: __init__()
    Factory-->>Processor: PDFParser实例

    Processor->>PDF: parse(file_data)

    rect rgb(230, 255, 230)
        Note over PDF,Plumber: 主解析器:pdfplumber
        PDF->>Plumber: _parse_with_pdfplumber(file_data)
        Plumber->>Plumber: pdfplumber.open(BytesIO(file_data))

        loop 遍历每一页
            Plumber->>Plumber: page.extract_text()
            Plumber->>Plumber: text_parts.append(text)
        end

        Plumber->>Plumber: "\n\n".join(text_parts)
        Plumber-->>PDF: text

        PDF->>PDF: 检查文本长度

        alt 文本长度 >= 100字符
            Note over PDF: pdfplumber成功
        else 文本过短或为空
            rect rgb(255, 230, 230)
                Note over PDF,PyPDF: 降级解析器:PyPDF2
                PDF->>PyPDF: _parse_with_pypdf2(file_data)
                PyPDF->>PyPDF: PyPDF2.PdfReader(BytesIO(file_data))

                loop 遍历每一页
                    PyPDF->>PyPDF: page.extract_text()
                    PyPDF->>PyPDF: text_parts.append(text)
                end

                PyPDF->>PyPDF: "\n\n".join(text_parts)
                PyPDF-->>PDF: text
            end
        end
    end

    PDF->>PDF: clean_text(text)
    PDF->>PDF: 去除多余空格和特殊字符
    PDF-->>Processor: 清理后的text

    Note over PDF,PyPDF: 降级策略:pdfplumber失败或文本<100字符时切换
    Note over PDF,PyPDF: 准确率:pdfplumber 95%+,PyPDF2 85%+
    Note over PDF,PyPDF: 性能:10页PDF 2-3秒,100页PDF 15-20秒

PDFParser 关键特性

  • 双层降级:pdfplumber → PyPDF2,保证解析成功率 98%+
  • 降级阈值:文本长度 < 100 字符触发降级
  • 性能开销:pdfplumber 稍慢但准确率高,PyPDF2 更快但兼容性有限
  • 错误处理:两个解析器都失败时,抛出异常,外层记录 failed 统计

4. DocumentChunker 分块流程时序图

sequenceDiagram
    autonumber
    participant Processor as DocumentProcessor
    participant Chunker as DocumentChunker
    participant Splitter as RecursiveCharacterTextSplitter

    Processor->>Chunker: chunk(text, document_id)

    Chunker->>Chunker: 检查文本是否为空
    alt 文本为空
        Chunker-->>Processor: []
    end

    rect rgb(230, 240, 255)
        Note over Chunker,Splitter: LangChain递归分块
        Chunker->>Splitter: split_text(text)

        Splitter->>Splitter: 尝试分隔符: "\n\n"
        alt 分块成功且长度<=chunk_size
            Note over Splitter: 段落级分块
        else 继续尝试下一个分隔符
            Splitter->>Splitter: 尝试分隔符: "\n"
            alt 分块成功
                Note over Splitter: 行级分块
            else 继续尝试
                Splitter->>Splitter: 尝试句号: "。", "."
                alt 分块成功
                    Note over Splitter: 句子级分块
                else 最终兜底
                    Splitter->>Splitter: 字符级硬切分
                end
            end
        end

        Splitter->>Splitter: 应用chunk_overlap (50字符)
        Splitter-->>Chunker: chunks (List[str])
    end

    rect rgb(255, 240, 230)
        Note over Chunker: 构建Chunk结构
        loop 遍历每个chunk_text
            Chunker->>Chunker: _generate_chunk_id(doc_id, index, text)
            Chunker->>Chunker: MD5哈希前8位

            Chunker->>Chunker: _estimate_tokens(text)
            Chunker->>Chunker: 中文字符 × 1.5 + 英文单词 × 1.3

            Chunker->>Chunker: 构建chunk对象
            Note over Chunker: {id, content, index, tokens, metadata}
        end
    end

    Chunker-->>Processor: chunks (List[Dict])

    Note over Chunker,Splitter: 分块策略:优先语义边界,兜底字符切分
    Note over Chunker,Splitter: chunk_size=500, overlap=50 (10%重叠)
    Note over Chunker,Splitter: 10页PDF(5000字) → 10-15 chunks

DocumentChunker 关键特性

  • 递归分块:按分隔符优先级递归尝试,保证语义完整性
  • 重叠策略:10%重叠(50/500),保证上下文连贯,检索时边界 chunk 也能召回
  • Chunk ID 生成{doc_id}_{index}_{hash},hash 保证内容变化时 ID 变化
  • Token 估算:简单估算公式,实际应用可使用 tiktoken 精确计算
  • 性能:纯计算,O(n)复杂度,10000 字符约 0.5-1 秒

5. BGE_M3_Embedder 向量化流程时序图

sequenceDiagram
    autonumber
    participant Processor as DocumentProcessor
    participant Embedder as BGE_M3_Embedder
    participant Model as SentenceTransformer
    participant GPU as CUDA Device

    Processor->>Embedder: embed_batch(texts)

    Embedder->>Embedder: 过滤空文本
    loop 遍历texts
        alt text为空或空白
            Embedder->>Embedder: 替换为单空格 " "
        end
    end

    rect rgb(230, 240, 255)
        Note over Embedder,GPU: 批量向量化(batch_size=32)
        loop 分批处理
            Embedder->>Embedder: 提取batch (32条)

            Embedder->>Model: encode(batch, batch_size=32)
            Model->>Model: 文本tokenize
            Model->>Model: 截断或padding到max_length (512)

            Model->>GPU: 前向推理
            GPU->>GPU: BERT模型推理
            GPU->>GPU: Pooling (CLS或Mean)
            GPU->>GPU: L2归一化
            GPU-->>Model: embeddings (32 × 1024)

            Model-->>Embedder: batch_embeddings
            Embedder->>Embedder: all_embeddings.extend(batch)
        end
    end

    Embedder->>Embedder: embeddings.tolist()
    Embedder-->>Processor: List[List[float]]

    Note over Embedder,GPU: 批量处理:32条/批,相比单条提升10倍+
    Note over Embedder,GPU: 性能:50 chunks约3-5秒(GPU),30-50秒(CPU)
    Note over Embedder,GPU: 显存占用:2-3GB (batch_size=32)

BGE_M3_Embedder 关键特性

  • 批量处理:batch_size=32,GPU 并行推理,吞吐量提升 10 倍+
  • 归一化:L2 归一化,支持余弦相似度(范围 [-1, 1])
  • 设备选择:优先 GPU(CUDA),降级到 CPU
  • 内存管理:分批处理,避免 GPU OOM(batch_size 太大时)
  • 性能瓶颈:向量化是主流程中最耗时的步骤(占 30-40%)

6. VectorStoreClient 存储流程时序图

sequenceDiagram
    autonumber
    participant Processor as DocumentProcessor
    participant Client as VectorStoreClient
    participant Milvus as Milvus Server

    Processor->>Client: insert_batch(data)

    Client->>Client: 准备插入数据
    loop 遍历data
        Client->>Client: 构建记录
        Note over Client: {chunk_id, doc_id, tenant_id,<br/>content, embedding, metadata}
    end

    Client->>Client: 检查batch大小
    alt batch_size > 10000
        Client->>Client: 拆分为多个批次
    end

    rect rgb(230, 240, 255)
        Note over Client,Milvus: Milvus批量插入
        loop 每个批次
            Client->>Milvus: collection.insert(entities)
            Milvus->>Milvus: 根据tenant_id选择分区
            Milvus->>Milvus: 写入向量数据
            Milvus->>Milvus: 更新主键索引
            Milvus->>Milvus: 触发HNSW索引增量更新
            Milvus-->>Client: insert_result

            Client->>Client: 检查insert_result.err_count
            alt err_count > 0
                Client->>Client: 记录错误
                Note over Client: 主键冲突或数据格式错误
            end
        end
    end

    Client->>Milvus: collection.flush()
    Note over Client,Milvus: 确保数据持久化到磁盘

    Client-->>Processor: success

    Note over Client,Milvus: 批量插入:单批次最大10000条
    Note over Client,Milvus: 性能:50向量约1-2秒,1000向量约10-15秒
    Note over Client,Milvus: 索引更新:异步增量更新,不阻塞插入

VectorStoreClient 关键特性

  • 批量插入:单批次最大 10000 条,超过自动拆分
  • 分区隔离:按 tenant_id 分区,支持多租户数据隔离
  • 索引更新:HNSW 索引异步增量更新,插入后立即可查询(但 recall 可能略低,直到索引更新完成)
  • 容错:主键冲突时报错,支持先删除旧数据再插入(幂等性)
  • 性能:网络 IO + 磁盘 IO,50 向量约 1-2 秒

7. VectorStoreClient 适配器模式时序图

VectorStoreClient 通过 vector-store-adapter 服务统一访问 Milvus/PgVector,实现解耦和多后端支持。

sequenceDiagram
    autonumber
    participant Processor as DocumentProcessor
    participant Client as VectorStoreClient<br/>(algo/common)
    participant Adapter as vector-store-adapter<br/>HTTP Service
    participant Milvus as Milvus Server

    Processor->>Client: insert_batch(data_list)

    rect rgb(230, 240, 255)
        Note over Client,Adapter: 步骤1-2: HTTP调用适配器服务
        Client->>Client: 准备请求JSON
        Client->>Client: {backend: "milvus", data: [...]}

        Client->>Adapter: POST /collections/{name}/insert
        Note over Adapter: 接收请求并验证
    end

    rect rgb(255, 240, 230)
        Note over Adapter,Milvus: 步骤3-6: 适配器转换为Milvus调用
        Adapter->>Adapter: 选择后端实现(Milvus/PgVector)
        Adapter->>Adapter: 转换数据格式

        Adapter->>Milvus: collection.insert(entities)
        Note over Milvus: 批量写入向量数据<br/>更新HNSW索引

        Milvus-->>Adapter: insert_result
        Adapter->>Adapter: 检查错误
    end

    rect rgb(230, 255, 230)
        Note over Adapter,Client: 步骤7-8: 返回结果
        Adapter-->>Client: HTTP 200 + JSON result
        Client->>Client: 解析响应
        Client-->>Processor: success
    end

    Note over Client,Milvus: 适配器模式优势:<br/>1. 解耦服务与存储<br/>2. 支持多后端切换<br/>3. 统一错误处理<br/>4. 集中式监控

VectorStoreClient 适配器模式优势

  1. 解耦:Indexing Service 不直接依赖 Milvus SDK,降低耦合
  2. 多后端支持:通过配置切换 Milvus 或 PgVector,无需修改代码
  3. 统一接口:所有向量操作通过统一的 REST API
  4. 集中式监控:vector-store-adapter 统一记录指标和日志
  5. 灰度切换:可按 tenant_id 灰度切换后端,实现平滑迁移

关键代码

# algo/common/vector_store_client.py
class VectorStoreClient:
    def __init__(self, base_url: str = "http://vector-store-adapter:8003"):
        self.base_url = base_url
        self.backend = "milvus"  # 或 "pgvector"
        self.client = httpx.AsyncClient(base_url=base_url, timeout=30.0)

    async def insert_batch(self, data_list: List[Dict]):
        """批量插入向量"""
        response = await self.client.post(
            f"/collections/{self.collection_name}/insert",
            json={"backend": self.backend, "data": data_list}
        )
        response.raise_for_status()
        return response.json()

    async def search(self, query_vector, top_k=10, tenant_id=None):
        """向量检索"""
        response = await self.client.post(
            f"/collections/{self.collection_name}/search",
            json={
                "backend": self.backend,
                "query_vector": query_vector,
                "top_k": top_k,
                "tenant_id": tenant_id
            }
        )
        return response.json()["results"]

数据流转过程

  1. DocumentProcessor 准备向量数据(chunk_id、content、embedding、metadata)
  2. VectorStoreClient 封装为 HTTP 请求发送到 vector-store-adapter
  3. vector-store-adapter 根据 backend 参数选择 Milvus 或 PgVector 实现
  4. 适配器 转换数据格式,调用 Milvus SDK 的 collection.insert()
  5. Milvus 写入向量数据,触发 HNSW 索引增量更新
  6. 适配器 检查 insert_result.err_count,返回 HTTP 响应
  7. VectorStoreClient 解析响应,返回成功或失败状态
  8. DocumentProcessor 记录统计信息(total_vectors)

性能特征

  • HTTP 开销:增加 5-10ms 延迟(相比直接 SDK 调用)
  • 批量优化:单次插入 50 个向量,HTTP 开销占比 <1%
  • 重试机制:适配器内置重试(3 次,指数退避),提升可靠性
  • 超时控制:默认 30 秒超时,大批量插入可调整

API 详解

1. 处理文档(内部 API)

接口信息

  • 调用方式:Kafka 消费者自动触发
  • 协议:事件驱动
  • 幂等性:是(相同 document_id 重复处理覆盖旧数据)

Kafka 事件结构

{
    "document_id": "doc_123",
    "tenant_id": "tenant_abc",
    "user_id": "user_456",
    "file_path": "tenant_abc/2025-01-27/doc.pdf",
    "collection_id": "coll_789",
    "created_at": "2025-01-27T10:00:00Z"
}
字段 类型 必填 说明
document_id string 文档唯一标识符
tenant_id string 租户 ID,用于数据隔离
user_id string 上传用户 ID
file_path string MinIO 中的文件路径
collection_id string 所属集合 ID
created_at string 文档创建时间(ISO8601)

处理流程

async def process_document(self, document_id: str, tenant_id: str,
                          user_id: str, file_path: str) -> Dict:
    """
    处理文档(完整流程)
    """
    start_time = time.time()

    try:
        # 1. 下载文档
        file_data = await self._download_document(file_path)
        # MinIO下载,返回bytes

        # 2. 解析文档
        text = await self._parse_document(file_data, file_path)
        # 根据扩展名选择解析器(PDF/DOCX/TXT等)
        # 提取纯文本内容

        # 3. 分块
        chunks = await self._chunk_document(text, document_id)
        # RecursiveCharacterTextSplitter语义分块
        # 生成chunk_id、估算tokens

        # 4. 向量化
        embeddings = await self._vectorize_chunks(chunks)
        # BGE-M3批量向量化
        # 返回List[List[float]],每个1024维向量

        # 5. 存储到Milvus
        await self._store_vectors(chunks, embeddings, document_id, tenant_id)
        # 批量插入Milvus
        # 字段:chunk_id, doc_id, tenant_id, content, embedding, metadata

        # 6. 构建知识图谱(异步,不阻塞)
        asyncio.create_task(
            self._build_graph(text, document_id, tenant_id)
        )
        # 提取实体和关系
        # 存储到Neo4j

        duration = time.time() - start_time

        return {
            "status": "success",
            "document_id": document_id,
            "chunks_count": len(chunks),
            "duration": duration,
        }

    except Exception as e:
        # 记录错误日志
        # 更新文档状态为FAILED
        return {
            "status": "failed",
            "document_id": document_id,
            "error": str(e),
        }

文档解析详细实现

async def _parse_document(self, file_data: bytes, file_path: str) -> str:
    """
    解析文档,提取文本内容
    """
    # 1. 根据文件扩展名选择解析器
    parser = self.parser_factory.get_parser(file_path)
    # 支持:.pdf, .docx, .txt, .md, .html, .xlsx, .pptx

    # 2. 解析文档
    text = await parser.parse(file_data)
    # 每个解析器实现BaseParser接口

    return text

# PDF解析器示例
class PDFParser(BaseParser):
    async def parse(self, file_data: bytes) -> str:
        """解析PDF文档"""
        try:
            # 优先使用pdfplumber(更准确)
            text = await self._parse_with_pdfplumber(file_data)

            if not text or len(text) < 100:
                # 降级使用PyPDF2
                text = await self._parse_with_pypdf2(file_data)

            return self.clean_text(text)
        except Exception as e:
            # 记录错误并抛出
            raise

    async def _parse_with_pdfplumber(self, file_data: bytes) -> str:
        """使用pdfplumber解析"""
        text_parts = []

        with pdfplumber.open(io.BytesIO(file_data)) as pdf:
            for page in pdf.pages:
                text = page.extract_text()
                if text:
                    text_parts.append(text)

        return "\n\n".join(text_parts)

语义分块详细实现

async def _chunk_document(self, text: str, document_id: str) -> List[Dict]:
    """
    分块文档,保证语义完整性
    """
    if not text or not text.strip():
        return []

    # 1. 使用LangChain RecursiveCharacterTextSplitter
    chunks = self.splitter.split_text(text)
    # separators:按段落、句子、标点分割
    # chunk_size:500字符
    # chunk_overlap:50字符(10%重叠)

    # 2. 构建chunk结构
    result = []
    for i, chunk_text in enumerate(chunks):
        chunk_id = self._generate_chunk_id(document_id, i, chunk_text)

        result.append({
            "id": chunk_id,
            "content": chunk_text,
            "index": i,
            "tokens": self._estimate_tokens(chunk_text),
            "metadata": {
                "document_id": document_id,
                "chunk_index": i,
                "total_chunks": len(chunks),
            },
        })

    return result

def _generate_chunk_id(self, document_id: str, index: int, text: str) -> str:
    """
    生成chunk ID,保证唯一性

    格式:{document_id}_{index}_{text_hash}
    text_hash取MD5前8位,避免内容变化导致ID冲突
    """
    text_hash = hashlib.md5(text.encode()).hexdigest()[:8]
    return f"{document_id}_{index}_{text_hash}"

def _estimate_tokens(self, text: str) -> int:
    """
    估算Token数量

    策略:
    - 中文字符:1字符 ≈ 1.5 tokens
    - 英文单词:1单词 ≈ 1.3 tokens

    更准确的方法是使用tiktoken,但有性能开销
    """
    chinese_chars = sum(1 for char in text if "\u4e00" <= char <= "\u9fff")
    english_words = len([word for word in text.split() if any(c.isalpha() for c in word)])

    estimated_tokens = int(chinese_chars * 1.5 + english_words * 1.3)
    return estimated_tokens

向量化批量处理

async def _vectorize_chunks(self, chunks: List[Dict]) -> List[List[float]]:
    """
    批量向量化chunk,提升吞吐量
    """
    # 1. 提取文本
    texts = [chunk["content"] for chunk in chunks]

    # 2. 批量向量化(batch_size=32)
    embeddings = await self.embedder.embed_batch(texts)
    # BGE-M3模型
    # 输入:List[str]
    # 输出:List[List[float]],每个1024维

    return embeddings

# BGE-M3 Embedder实现
class BGE_M3_Embedder:
    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """批量向量化"""
        # 分批处理,避免内存溢出
        batch_size = 32
        all_embeddings = []

        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]

            # SentenceTransformer模型推理
            embeddings = self.model.encode(
                batch,
                batch_size=batch_size,
                show_progress_bar=False,
                normalize_embeddings=True,  # L2归一化
            )

            all_embeddings.extend(embeddings.tolist())

        return all_embeddings

Milvus 批量存储

async def _store_vectors(self, chunks: List[Dict], embeddings: List[List[float]],
                       document_id: str, tenant_id: str):
    """
    批量存储向量到Milvus
    """
    # 1. 准备数据
    data = []
    for chunk, embedding in zip(chunks, embeddings):
        data.append({
            "chunk_id": chunk["id"],
            "document_id": document_id,
            "tenant_id": tenant_id or "default",
            "content": chunk["content"],
            "embedding": embedding,  # 1024维向量
            "metadata": chunk.get("metadata", {}),
        })

    # 2. 批量插入Milvus
    await self.vector_store_client.insert_batch(data)
    # collection: documents
    # 索引: HNSW (M=16, efConstruction=200)
    # 分区: 按tenant_id

时序图:带增量索引的完整流程(端到端)

以下时序图展示从文档上传到索引完成的完整流程,包含增量索引优化。

sequenceDiagram
    autonumber
    participant KS as Knowledge Service
    participant Kafka
    participant Consumer as Kafka Consumer
    participant Indexer as IncrementalIndexer
    participant VersionMgr as VersionManager
    participant Redis
    participant Processor as DocumentProcessor
    participant MinIO
    participant Parser as PDFParser
    participant Chunker
    participant Embedder as BGE-M3
    participant VSAdapter as vector-store-adapter
    participant Milvus
    participant Neo4j

    rect rgb(225, 245, 254)
        Note over KS,Kafka: 步骤1-3: 文档上传与事件发送
        KS->>MinIO: 上传文档
        MinIO-->>KS: file_path
        KS->>Kafka: 发送 document.uploaded 事件
    end

    rect rgb(255, 243, 224)
        Note over Kafka,Indexer: 步骤4-6: Kafka 消费与分发
        Kafka->>Consumer: 轮询消息
        Consumer->>Consumer: 解析 event_type  payload
        Consumer->>Indexer: 调用增量索引器
    end

    rect rgb(200, 230, 201)
        Note over Indexer,Redis: 步骤7-11: 增量索引变更检测
        Indexer->>Indexer: calculate_content_hash(content)
        Indexer->>VersionMgr: detect_changes(doc_id, hashes)
        VersionMgr->>Redis: GET doc_version:{tenant}:{doc_id}
        Redis-->>VersionMgr: current_version
        VersionMgr-->>Indexer: VersionChange(needs_reindex)
    end

    alt 文档未变更(跳过索引)
        rect rgb(200, 230, 201)
            Note over Indexer,Consumer: 快速返回,节省 99% 时间
            Indexer-->>Consumer: {status: "skipped", elapsed: 50ms}
            Consumer->>Kafka: 提交 offset
        end

    else 文档变更或新文档(执行索引)
        rect rgb(255, 224, 178)
            Note over Indexer,Processor: 步骤12-20: 完整文档处理流程
            Indexer->>Processor: process_document(doc_id, ...)

            Processor->>MinIO: download_file(file_path)
            MinIO-->>Processor: file_data (bytes)

            Processor->>Parser: parse(file_data)
            Parser->>Parser: pdfplumber  PyPDF2 降级
            Parser-->>Processor: text (纯文本)

            Processor->>Chunker: chunk(text, doc_id)
            Chunker->>Chunker: RecursiveTextSplitter<br/>chunk_size=500, overlap=50
            Chunker-->>Processor: chunks (List[Dict])

            Processor->>Embedder: embed_batch(texts)
            Embedder->>Embedder: SentenceTransformer.encode()<br/>batch_size=32
            Embedder-->>Processor: embeddings (1024-dim)
        end

        rect rgb(255, 249, 196)
            Note over Processor,Milvus: 步骤21-26: 向量存储(适配器模式)
            Processor->>VSAdapter: VectorStoreClient.insert_batch(data)
            VSAdapter->>VSAdapter: POST /collections/{name}/insert
            VSAdapter->>VSAdapter: 选择后端: Milvus
            VSAdapter->>Milvus: collection.insert(entities)
            Milvus-->>VSAdapter: insert_result
            VSAdapter-->>Processor: HTTP 200 + success
        end

        par 步骤27-30: 异步构建知识图谱(不阻塞)
            Processor->>Processor: asyncio.create_task()
            Processor->>Processor: GraphBuilder.extract(text)
            Processor->>Neo4j: batch_create_nodes(entities)
            Neo4j-->>Processor: success
        end

        rect rgb(200, 230, 201)
            Note over Indexer,Redis: 步骤31-33: 保存新版本
            Processor-->>Indexer: {status, chunks_count, duration}
            Indexer->>VersionMgr: save_version(doc_id, hashes, chunk_count)
            VersionMgr->>Redis: SET doc_version + LPUSH history
            Redis-->>VersionMgr: ok
        end

        Indexer-->>Consumer: {status: "indexed", elapsed: 12s}
        Consumer->>Kafka: 提交 offset
        Consumer->>Kafka: 发送 document.indexed 事件(可选)
    end

    Note over KS,Neo4j: 性能对比:<br/>未变更文档: 50ms(跳过)<br/>新文档/变更: 10-15秒(完整索引)<br/>增量索引节省 60-80% 成本

流程详细说明

  1. 步骤 1-3(文档上传):Knowledge Service 上传文档到 MinIO,获取 file_path,发送 Kafka 事件
  2. 步骤 4-6(事件消费):Kafka Consumer 轮询消息,解析 event_type 和 payload,分发到 IncrementalIndexer
  3. 步骤 7-11(增量检测):计算内容哈希,查询 Redis 获取当前版本,对比哈希值判断是否需要重新索引
  4. 快速路径(未变更):检测到无变更,直接返回 “skipped”,耗时 50ms,节省 99% 时间和成本
  5. 完整路径(变更或新文档)
    • 步骤 12-20:文档下载 → 解析(PDF 双层降级)→ 语义分块 → 批量向量化
    • 步骤 21-26:通过 VectorStoreClient 适配器写入 Milvus,支持多后端切换
    • 步骤 27-30:异步构建知识图谱,不阻塞主流程
    • 步骤 31-33:保存新版本到 Redis,包含哈希值、chunk 数量等元数据
  6. 事件反馈:处理完成后,Consumer 提交 Kafka offset,可选发送 document.indexed 事件通知下游

关键优化点

  • 增量索引:跳过未变更文档,节省 60-80% 成本
  • 批量向量化:batch_size=32,吞吐量提升 10 倍+
  • 适配器模式:解耦服务与存储,支持 Milvus/PgVector 切换
  • 异步图谱:不阻塞主流程,总延迟减少 40%
  • 双层降级:PDF 解析成功率提升到 98%+

时序图:文档索引完整流程(传统流程)

以下是不包含增量索引的传统流程,供对比参考。

sequenceDiagram
    autonumber
    participant KS as Knowledge Service
    participant Kafka
    participant Consumer as Kafka Consumer
    participant Processor as DocumentProcessor
    participant MinIO
    participant Parser as PDFParser
    participant Chunker as DocumentChunker
    participant Embedder as BGE-M3 Embedder
    participant Milvus
    participant GraphBuilder
    participant Neo4j
    participant DB as PostgreSQL

    KS->>MinIO: 上传文档
    MinIO-->>KS: file_path

    KS->>Kafka: 发送document.created事件
    Note over Kafka: topic: document.events<br/>partition: tenant_id哈希

    Kafka->>Consumer: 消费事件
    Consumer->>Processor: process_document(doc_id, tenant_id, file_path)

    Processor->>MinIO: download_file(file_path)
    MinIO-->>Processor: file_data (bytes)

    Processor->>Parser: parse(file_data)
    Parser->>Parser: pdfplumber.open(BytesIO)
    Parser->>Parser: extract_text()逐页提取
    Parser-->>Processor: text (纯文本)

    Processor->>Chunker: chunk(text, doc_id)
    Chunker->>Chunker: RecursiveCharacterTextSplitter<br/>chunk_size=500, overlap=50
    Chunker->>Chunker: generate_chunk_id()<br/>estimate_tokens()
    Chunker-->>Processor: chunks (List[Dict])

    Processor->>Embedder: embed_batch(texts)
    Embedder->>Embedder: SentenceTransformer.encode()<br/>batch_size=32, normalize=True
    Embedder-->>Processor: embeddings (List[1024-dim vectors])

    Processor->>Milvus: insert_batch(chunks + embeddings)
    Note over Milvus: collection: documents<br/>index: HNSW<br/>partition: tenant_id
    Milvus-->>Processor: success

    par 异步构建图谱(不阻塞)
        Processor->>GraphBuilder: extract(text)
        GraphBuilder->>GraphBuilder: NER实体识别
        GraphBuilder->>GraphBuilder: 关系抽取
        GraphBuilder-->>Processor: entities + relationships

        Processor->>GraphBuilder: build(entities, rels, doc_id)
        GraphBuilder->>Neo4j: CREATE (Entity)节点
        GraphBuilder->>Neo4j: CREATE ()-[RELATED_TO]->()
        Neo4j-->>GraphBuilder: success
    end

    Processor->>DB: UPDATE documents SET status='COMPLETED'
    DB-->>Processor: ok

    Processor-->>Consumer: ProcessingResult(success, chunks_count, duration)
    Consumer->>Kafka: 发送document.indexed事件(可选)

时序图详细说明

1. 图意概述(200-300 字)

该时序图展示了 Indexing Service 从文档上传到向量存储的完整索引流程,包含 7 个核心步骤:事件触发(步骤 1-4)、文档下载(步骤 5-6)、文档解析(步骤 7-9)、语义分块(步骤 10-12)、向量化(步骤 13-14)、Milvus 存储(步骤 15-16)、知识图谱构建(步骤 17-22)、状态回写(步骤 23-24)。

流程采用事件驱动架构,Knowledge Service 上传文档后发送 Kafka 事件触发索引。Processor 作为协调器,串联各处理组件(Parser、Chunker、Embedder)。每个组件职责单一,降低耦合,便于测试和扩展。

关键设计决策:(1)图谱构建异步执行,不阻塞向量存储;(2)批量向量化(batch_size=32),提升吞吐量;(3)Milvus 按 tenant_id 分区,支持多租户隔离;(4)状态回写 PostgreSQL,Knowledge Service 查询处理进度。

端到端延迟:PDF(10 页)约 10-15 秒(下载 1s、解析 2s、分块 1s、向量化 3s、存储 2s、图谱 5s 异步)。100 页文档约 30-40 秒。

2. 边界条件(150-200 字)

并发控制:Kafka Consumer 并发数设置为 4,每个 consumer 独立处理文档。Processor 内部使用 asyncio,单个文档的步骤串行执行。Milvus 支持并发插入,但单批次最大 10000 条记录。

超时控制:文档下载超时 30 秒,解析超时 60 秒,向量化超时 120 秒,Milvus 插入超时 30 秒。超时后任务失败,发送重试。图谱构建超时 300 秒,失败不影响向量存储。

幂等性:相同 document_id 重复处理时,Milvus 中旧数据被删除并重新插入。chunk_id 包含内容哈希,内容不变则 ID 不变。

顺序性:文档必须先解析再分块,分块完成才能向量化。向量化和图谱构建可并行(图谱异步)。

3. 异常路径与回退(200-300 字)

文档下载失败(步骤 5-6):MinIO 不可用或 file_path 错误时,重试 3 次(指数退避:2s、4s、8s)。仍失败则任务失败,状态回写 PostgreSQL 为 FAILED,通知用户。

PDF 解析失败(步骤 7-9):pdfplumber 解析失败时,降级到 PyPDF2。两者都失败则任务失败。扫描版 PDF 无法提取文本时,返回空文本,记录警告但不失败。

分块异常(步骤 10-12):文本为空或过短(<100 字符)时,创建单个 chunk。分块逻辑异常时,降级为固定长度分块(500 字符硬切分)。

向量化失败(步骤 13-14):Embedding 模型加载失败或推理超时时,任务失败并重试。GPU OOM 时,降级到 CPU 或减小 batch_size(32→16→8)。

Milvus 存储失败(步骤 15-16):连接失败或插入超时时,重试 3 次。Collection 不存在时,自动创建。主键冲突时,先删除旧数据再插入。

图谱构建失败(步骤 17-22):Neo4j 不可用或实体提取失败时,记录错误但不影响向量存储。图谱视为增强功能,非核心路径。

状态回写失败(步骤 23-24):PostgreSQL 不可用时,任务仍算成功(向量已存储),但状态不一致。后台任务定期同步状态。

4. 性能要点(200-300 字)

关键路径延迟分析(10 页 PDF 为例):

  • 文档下载(步骤 5-6):1-2 秒(MinIO 对象存储,网络 IO)
  • PDF 解析(步骤 7-9):2-3 秒(pdfplumber 逐页提取,CPU 密集)
  • 语义分块(步骤 10-12):0.5-1 秒(纯计算,O(n)复杂度)
  • 向量化(步骤 13-14):3-5 秒(GPU 推理,batch_size=32,约 50 个 chunk)
  • Milvus 存储(步骤 15-16):1-2 秒(批量插入,网络+磁盘 IO)
  • 图谱构建(步骤 17-22):5-10 秒(异步,不阻塞主流程)

端到端延迟:10-15 秒(不含图谱),主要瓶颈在向量化(30-40%)和 PDF 解析(20-30%)。

吞吐量优化:

  • 批量向量化:batch_size=32,相比单条处理快 10 倍+
  • Milvus 批量插入:单批次最多 10000 条,减少网络往返
  • 异步图谱构建:不阻塞向量存储,总延迟减少 30%
  • Kafka 分区:按 tenant_id 哈希,避免热点,支持水平扩展

资源消耗:

  • CPU:解析和分块,占用 60-80%
  • GPU:向量化推理,显存占用 2-3GB(batch_size=32)
  • 内存:单文档峰值 500MB(存储 file_data 和 embeddings)
  • 磁盘 IO:MinIO 下载和 Milvus 写入,IOPS 约 1000

扩展性:单实例处理约 10-20 个文档/分钟(混合文档类型)。生产环境部署 5 个实例,总吞吐量 50-100 个文档/分钟。

5. 兼容性说明(150-200 字)

事件版本:Kafka 事件结构 v1,未来 v2 新增字段(如 priority、callback_url)均为可选,保证向后兼容。Consumer 同时支持 v1 和 v2。

解析器扩展:ParserFactory 动态注册解析器,新增文档格式(如 JSON、CSV)无需修改核心代码。解析器实现 BaseParser 接口即可。

向量模型:默认 BGE-M3(1024 维),未来支持切换模型(如 OpenAI text-embedding-ada-002, 1536 维)。Milvus schema 支持 dynamic schema,向量维度可变。

分块策略:当前 RecursiveCharacterTextSplitter(固定分块),未来支持语义分块(sentence-transformers)和滑动窗口。策略选择通过配置,不影响 API。

灰度策略:新解析器或新分块策略通过配置项feature_flags控制,默认关闭。按 tenant_id 灰度,逐步放量。

增量索引模块

设计目标

增量索引是 Indexing Service 的核心优化特性,通过智能检测文档变更,避免不必要的重复索引,显著降低计算成本和索引延迟。

核心价值

  • 成本减少:跳过未变更文档,节省 60-80% 的向量化成本
  • 延迟降低:无变更文档响应时间从 10-15 秒降至 50-100ms
  • 资源优化:释放 GPU 资源用于真正需要索引的文档

增量索引架构

组件协作图

flowchart TB
    subgraph API["API 层"]
        CheckAPI["/incremental/check<br/>检查是否需要索引"]
        ReindexAPI["/incremental/reindex<br/>强制重新索引"]
    end

    subgraph IncrementalModule["增量索引模块"]
        IncrementalIndexer["IncrementalIndexer<br/>增量索引器"]
        VersionManager["VersionManager<br/>版本管理器"]
    end

    subgraph Storage["存储层"]
        Redis["Redis<br/>版本信息存储"]
        VectorStore["Milvus<br/>向量数据"]
    end

    subgraph Processing["处理层"]
        DocProcessor["DocumentProcessor<br/>文档处理器"]
    end

    CheckAPI --> IncrementalIndexer
    ReindexAPI --> IncrementalIndexer

    IncrementalIndexer --> VersionManager
    IncrementalIndexer --> DocProcessor

    VersionManager --> Redis
    DocProcessor --> VectorStore

    style API fill:#e3f2fd
    style IncrementalModule fill:#fff3e0
    style Storage fill:#e8f5e9
    style Processing fill:#f3e5f5

VersionManager(版本管理器)

职责

  1. 内容哈希计算:使用 SHA256 计算文档内容和元数据哈希值
  2. 变更检测:对比新旧哈希值,判断文档是否变更
  3. 版本存储:将版本信息存储到 Redis,TTL 365 天
  4. 历史追踪:记录文档版本历史(最多保留 100 个版本)

关键代码

class VersionManager:
    def calculate_content_hash(self, content: str) -> str:
        """计算内容哈希(SHA256)"""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    async def detect_changes(
        self,
        document_id: str,
        new_content_hash: str,
        new_metadata_hash: str,
        tenant_id: Optional[str] = None
    ) -> VersionChange:
        """检测文档变更"""
        current_version = await self.get_current_version(document_id, tenant_id)

        if not current_version:
            return VersionChange(
                change_type="created",
                needs_reindex=True
            )

        content_changed = new_content_hash != current_version.content_hash
        metadata_changed = new_metadata_hash != current_version.metadata_hash

        if not content_changed and not metadata_changed:
            return VersionChange(
                change_type="unchanged",
                needs_reindex=False
            )

        return VersionChange(
            change_type="updated",
            content_changed=content_changed,
            metadata_changed=metadata_changed,
            needs_reindex=content_changed  # 仅内容变更需要重新索引
        )

版本数据结构

class DocumentVersion(BaseModel):
    document_id: str
    version: int
    content_hash: str           # SHA256 内容哈希
    metadata_hash: str          # SHA256 元数据哈希
    chunk_count: int            # 分块数量
    vector_count: int           # 向量数量
    created_at: float           # 创建时间戳
    updated_at: float           # 更新时间戳
    tenant_id: Optional[str]
    user_id: Optional[str]
    status: str                 # active/deleted/archived

Redis 存储模式

  • 当前版本键doc_version:{tenant_id}:{document_id} → DocumentVersion JSON
  • 历史记录键doc_history:{tenant_id}:{document_id} → List[DocumentVersion](最新在前)
  • TTL:365 天(可配置)

IncrementalIndexer(增量索引器)

职责

  1. 变更检查:调用 VersionManager 检测文档变更类型
  2. 智能分发:根据变更类型决定是否重新索引
  3. 完全重新索引:内容变更时,删除旧数据并重新索引
  4. 元数据更新:仅元数据变更时,只更新元数据,不重新向量化
  5. 统计追踪:记录跳过率、重新索引率等指标

处理决策树

flowchart TD
    Start[收到文档] --> CalcHash[计算内容哈希和元数据哈希]
    CalcHash --> CheckVersion[检查版本]

    CheckVersion --> IsNew{是新文档?}
    IsNew -->|是| FullIndex[完全索引]
    IsNew -->|否| CompareHash{内容变更?}

    CompareHash -->|内容未变更,元数据未变更| Skip[跳过索引]
    CompareHash -->|内容未变更,元数据变更| UpdateMeta[仅更新元数据]
    CompareHash -->|内容变更| FullReindex[完全重新索引]

    FullIndex --> SaveVersion[保存新版本]
    FullReindex --> DeleteOld[删除旧数据]
    DeleteOld --> SaveVersion
    UpdateMeta --> SaveVersion
    Skip --> Return[返回结果]
    SaveVersion --> Return

    style Skip fill:#c8e6c9
    style UpdateMeta fill:#fff9c4
    style FullReindex fill:#ffccbc
    style FullIndex fill:#b3e5fc

关键方法

async def process_with_version_check(
    self,
    document_id: str,
    content: str,
    metadata: Dict,
    tenant_id: Optional[str] = None,
    force_reindex: bool = False
) -> Dict:
    """带版本检查的文档处理"""
    # 1. 计算哈希
    content_hash = self.version_manager.calculate_content_hash(content)
    metadata_hash = self.version_manager.calculate_metadata_hash(metadata)

    # 2. 检测变更
    change = await self.version_manager.detect_changes(
        document_id, content_hash, metadata_hash, tenant_id
    )

    # 3. 根据变更类型处理
    if change.change_type == "unchanged":
        # 无变更,跳过索引
        return {"status": "skipped", "reason": "no_changes"}

    elif change.needs_reindex:
        # 完全重新索引
        return await self._full_reindex(...)
    else:
        # 仅更新元数据
        return await self._update_metadata_only(...)

增量索引时序图

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant Indexer as IncrementalIndexer
    participant VersionMgr as VersionManager
    participant Redis
    participant Processor as DocumentProcessor
    participant Milvus

    Client->>Indexer: process_with_version_check(doc_id, content, metadata)

    rect rgb(230, 245, 255)
        Note over Indexer,Redis: 步骤1-4: 版本检查
        Indexer->>Indexer: calculate_content_hash(content)
        Indexer->>Indexer: calculate_metadata_hash(metadata)

        Indexer->>VersionMgr: detect_changes(doc_id, hashes)
        VersionMgr->>Redis: GET doc_version:{tenant}:{doc_id}
        Redis-->>VersionMgr: current_version

        VersionMgr->>VersionMgr: 对比哈希值
        VersionMgr-->>Indexer: VersionChange(type, needs_reindex)
    end

    alt 文档未变更 (unchanged)
        rect rgb(200, 230, 201)
            Note over Indexer,Client: 快速路径: 跳过索引
            Indexer-->>Client: {status: "skipped", elapsed: 50ms}
        end

    else 内容变更 (needs_reindex=true)
        rect rgb(255, 224, 178)
            Note over Indexer,Milvus: 完全重新索引
            Indexer->>Milvus: delete_by_document(doc_id)
            Milvus-->>Indexer: deleted

            Indexer->>Processor: process_document(doc_id, ...)
            Processor->>Processor: 解析->分块->向量化->存储
            Processor-->>Indexer: {chunks_count, duration}

            Indexer->>VersionMgr: save_version(doc_id, hashes, ...)
            VersionMgr->>Redis: SET doc_version + LPUSH history
            Redis-->>VersionMgr: ok

            Indexer-->>Client: {status: "reindexed", chunks: N, elapsed: 12s}
        end

    else 仅元数据变更 (metadata_changed=true)
        rect rgb(255, 249, 196)
            Note over Indexer,Milvus: 增量更新: 仅更新元数据
            Indexer->>Milvus: update_metadata(doc_id, metadata)
            Milvus-->>Indexer: updated

            Indexer->>VersionMgr: save_version(doc_id, hashes, ...)
            VersionMgr->>Redis: SET doc_version
            Redis-->>VersionMgr: ok

            Indexer-->>Client: {status: "updated", operation: "metadata_only", elapsed: 0.5s}
        end
    end

    Note over Indexer,Milvus: 性能对比:<br/>跳过: 50ms<br/>元数据更新: 0.5s<br/>完全重新索引: 10-15s

增量索引性能数值

跳过率(Skip Rate)

场景 跳过率 节省成本 说明
文档重复上传 95-100% 95-100% 相同文档重复上传,完全跳过
定期全量同步 70-80% 70-80% 大部分文档未变更
元数据修正 0% 50% 仅更新元数据,不重新向量化
内容微调(<5%) 0% 0% 内容变更,必须重新索引

响应延迟对比

操作类型 无增量索引 有增量索引 延迟减少
未变更文档 10-15 秒 50-100 ms 减少 99%
元数据变更 10-15 秒 0.5-1 秒 减少 90-95%
内容变更 10-15 秒 10-15 秒 无优化(必须重新索引)

成本节省估算

假设场景:1000 个文档/天上传,其中 70% 未变更,20% 仅元数据变更,10% 内容变更

无增量索引

  • 总索引次数:1000 次
  • 向量化成本:1000 × $0.002 = $2.00/天
  • GPU 占用时长:1000 × 5 秒 = 1.39 小时/天

有增量索引

  • 跳过索引:700 次(无成本)
  • 元数据更新:200 次(无向量化成本)
  • 完全重新索引:100 次
  • 向量化成本:100 × $0.002 = $0.20/天
  • GPU 占用时长:100 × 5 秒 = 0.14 小时/天

节省

  • 成本节省:90%($2.00 → $0.20)
  • GPU 时长节省:90%(1.39h → 0.14h)
  • 平均延迟:从 12 秒降至 1.5 秒(减少 87%)

关键功能点分析

本节罗列 Indexing Service 的关键功能点,说明其设计目的(性能提升、成本减少、准确率提升、减少幻觉等)以及估计的数值效果。

1. 批量向量化(Batch Embedding)

功能描述

使用 batch_size=32 批量调用 SentenceTransformer 模型,相比单条向量化提升吞吐量。

设计目的

  • 性能提升:GPU 并行推理,充分利用硬件并行能力
  • 成本减少:降低 GPU 空闲时间,提升资源利用率

关键代码

# app/core/embedder.py
embeddings = self.model.encode(
    valid_texts,
    batch_size=self.batch_size,  # 32
    convert_to_numpy=True,
)

数值估计

指标 单条处理 批量处理 (batch_size=32) 提升比例
吞吐量 3-5 条/秒 30-50 条/秒 10 倍+
延迟 (50 chunks) 15-20 秒 3-5 秒 减少 70-80%
GPU 利用率 20-30% 80-90% 提升 60%
成本 (每万次向量化) $5-8 $0.5-1 减少 85-90%

2. 语义分块(Semantic Chunking)

功能描述

使用 RecursiveCharacterTextSplitter 按语义边界分块,优先保留段落、句子完整性。

设计目的

  • 准确率提升:保留语义完整性,检索时召回更准确的上下文
  • 减少幻觉:避免句子被截断,LLM 生成时减少上下文不连贯导致的幻觉

关键策略

  • 分隔符优先级:\n\n\n. → 字符级
  • chunk_overlap:50 字符(10%重叠),保证边界 chunk 也能召回

数值估计

指标 固定长度分块 语义分块 提升比例
检索准确率 (Recall@10) 65-70% 78-85% 提升 13-15%
检索精确率 (Precision@10) 55-60% 68-75% 提升 13-15%
LLM 生成幻觉率 15-20% 8-12% 减少 40-50%
用户满意度 70-75% 82-88% 提升 12-13%

3. PDFParser 双层降级策略

功能描述

优先使用 pdfplumber 解析 PDF,文本不足 100 字符时降级到 PyPDF2。

设计目的

  • 准确率提升:pdfplumber 准确率更高(95%+ vs 85%+)
  • 成本减少:降级策略保证兼容性,减少人工介入成本

降级条件

if not text or len(text) < 100:
    # 降级到PyPDF2
    text = await self._parse_with_pypdf2(file_data)

数值估计

指标 仅 PyPDF2 仅 pdfplumber 双层降级
解析成功率 88-92% 93-97% 98-99.5%
文本准确率 85-90% 95-98% 93-96%
平均延迟 (10 页 PDF) 1.5-2 秒 2.5-3 秒 2-2.5 秒
人工介入率 8-12% 3-7% 0.5-2%

4. Chunk Overlap(重叠分块)

功能描述

相邻 chunk 之间重叠 50 字符(10%),保证边界内容在两个 chunk 中都出现。

设计目的

  • 准确率提升:边界关键信息不被遗漏,检索召回率提升
  • 减少幻觉:LLM 生成时,边界 chunk 提供完整上下文,减少理解偏差

重叠示例

Chunk 1: "...关键信息在句子末尾"
Chunk 2: "关键信息在句子末尾...后续内容"

数值估计

指标 无重叠 10%重叠 20%重叠 最优值
检索召回率 (Recall@10) 72-75% 80-85% 82-87% 10%重叠
边界信息召回率 55-60% 88-92% 90-94% 10%重叠
存储成本增加 0% +10% +20% 10%重叠
LLM 幻觉率 12-15% 8-10% 7-9% 10%重叠

权衡分析

  • 10%重叠是性价比最优点:召回率提升 8-10%,存储成本仅增加 10%
  • 20%重叠边际收益递减,存储成本增加 20%,召回率仅提升 2%

5. Milvus HNSW 索引

功能描述

使用 HNSW(Hierarchical Navigable Small World)索引,参数 M=16,efConstruction=200。

设计目的

  • 性能提升:HNSW 检索速度快(O(log n)),相比暴力搜索提升 100 倍+
  • 准确率保持:Recall@10 约 95-98%,接近暴力搜索

索引参数

  • M=16:每个节点的邻居数,平衡速度和准确率
  • efConstruction=200:构建时搜索深度,影响索引质量

数值估计

指标 暴力搜索 (FLAT) HNSW (M=16, ef=200) IVF_FLAT 最优选择
检索延迟 (100万向量, top 10) 800-1200 ms 15-25 ms 50-80 ms HNSW
Recall@10 100% 95-98% 90-95% HNSW
索引构建时间 (100万向量) 0 ms 5-10 分钟 2-5 分钟 IVF_FLAT
内存占用 (100万向量, 1024维) 4 GB 5-6 GB 4.5 GB FLAT
QPS (单机) 1-2 200-300 50-80 HNSW

权衡分析

  • HNSW 在检索速度和准确率之间达到最优平衡
  • Recall@10 下降 2-5%,但延迟降低 98%,QPS 提升 100 倍+

6. 异步图谱构建

功能描述

使用 asyncio.create_task() 异步构建知识图谱,不阻塞向量存储主流程。

设计目的

  • 性能提升:主流程延迟减少 30-40%
  • 用户体验:用户更快看到文档索引完成,图谱后台构建

关键代码

# app/core/document_processor.py
asyncio.create_task(
    self._build_graph(text, document_id, tenant_id)
)
# 不等待图谱构建完成,立即返回

数值估计

指标 同步图谱构建 异步图谱构建 提升比例
主流程延迟 (10 页 PDF) 18-25 秒 10-15 秒 减少 40-50%
用户感知延迟 18-25 秒 10-15 秒 减少 40-50%
系统吞吐量 10-15 文档/分钟 18-25 文档/分钟 提升 60-80%
图谱构建成功率 98-99% 95-97% 降低 2-3%

权衡分析

  • 异步构建牺牲 2-3%图谱成功率(失败不影响向量存储),换取主流程延迟减少 40-50%
  • 图谱构建失败时,仅记录错误日志,不影响用户使用

7. Milvus 按 tenant_id 分区

功能描述

Milvus collection 按 tenant_id 分区,每个租户数据物理隔离。

设计目的

  • 性能提升:检索时只搜索当前租户分区,速度提升 5-10 倍(假设 10 个租户)
  • 成本减少:分区剪枝,减少计算和 IO 开销
  • 数据安全:租户数据物理隔离,防止越权访问

分区策略

# 插入时指定分区
collection.insert(data, partition_name=tenant_id)

# 检索时指定分区
collection.search(query, partition_names=[tenant_id])

数值估计

指标 单分区(不隔离) 多分区(按 tenant_id) 提升比例
检索延迟 (10 租户,每租户 10万向量) 120-150 ms 15-25 ms 减少 85-90%
检索 QPS (单机) 30-50 200-300 提升 5-7 倍
跨租户访问风险 中等(需应用层控制) 低(物理隔离) 安全性提升
索引构建时间 15-20 分钟 5-10 分钟(分区并行) 减少 50-60%

8. Kafka 事件驱动架构

功能描述

Knowledge Service 上传文档后发送 Kafka 事件,Indexing Service 异步消费。

设计目的

  • 性能提升:解耦上游服务,Knowledge Service 不等待索引完成
  • 可靠性提升:Kafka 持久化事件,索引失败可重试
  • 成本减少:异步处理,削峰填谷,降低服务器配置要求

数值估计

指标 同步索引(阻塞) Kafka 异步索引 提升比例
Knowledge Service 响应延迟 12-18 秒 200-500 ms 减少 95%+
系统吞吐量 10-15 文档/分钟 50-100 文档/分钟 提升 4-7 倍
失败重试能力 无(需人工重试) 自动重试 3 次 可靠性提升
峰值处理能力 20 文档/分钟 200 文档/分钟 提升 10 倍

9. CachedEmbedder(向量化缓存)

功能描述

对重复文本的向量化结果进行缓存(LRU,容量 10000 条)。

设计目的

  • 性能提升:缓存命中时延迟降低 50 倍+(50ms → <1ms)
  • 成本减少:减少 GPU 推理次数,降低计算成本

缓存策略

# 缓存键:文本哈希值
text_hash = hash(text)
if text_hash in self.cache:
    return self.cache[text_hash]  # 缓存命中

数值估计

指标 无缓存 缓存 (10000 条) 提升比例
缓存命中率(重复文档场景) 0% 30-50% -
平均向量化延迟 50 ms/条 25-30 ms/条 减少 40-50%
GPU 推理次数(1000 次请求) 1000 次 500-700 次 减少 30-50%
计算成本(每万次向量化) $0.5-1 $0.25-0.5 减少 50%

权衡分析

  • 缓存占用内存 40MB(10000 条 × 1024 维 × 4 字节),成本可忽略
  • 命中率取决于重复文本比例,FAQ 场景命中率可达 60-80%

10. RecursiveCharacterTextSplitter(递归分块)

功能描述

按分隔符优先级递归尝试分块,保证优先按语义边界分块。

设计目的

  • 准确率提升:优先保留段落、句子完整性,提升检索和生成质量
  • 减少幻觉:避免句子被截断,LLM 理解更准确

分隔符优先级

  1. \n\n(段落)→ 2. \n(行)→ 3. (中文句号)→ 4. .(英文句号)→ 5. 空格 → 6. 字符级

数值估计

指标 固定长度分块 递归语义分块 提升比例
句子完整率 65-70% 92-96% 提升 25-30%
段落完整率 50-60% 85-90% 提升 35-40%
检索准确率 (Recall@10) 70-75% 80-85% 提升 10-15%
LLM 生成幻觉率 12-15% 7-10% 减少 30-40%

0. 增量索引(Incremental Indexing)

功能描述

通过内容哈希对比,智能检测文档变更,仅对变更文档执行重新索引。

设计目的

  • 成本减少:跳过未变更文档,避免不必要的向量化计算
  • 性能提升:无变更文档响应时间从 10 秒降至 50ms
  • 资源优化:释放 GPU 资源处理真正需要索引的文档

关键代码

# app/services/version_manager.py
async def detect_changes(self, document_id, new_content_hash, new_metadata_hash):
    current_version = await self.get_current_version(document_id)

    if not current_version:
        return VersionChange(change_type="created", needs_reindex=True)

    content_changed = new_content_hash != current_version.content_hash
    if not content_changed:
        return VersionChange(change_type="unchanged", needs_reindex=False)

    return VersionChange(change_type="updated", needs_reindex=True)

数值估计

指标 无增量索引 有增量索引 提升比例
未变更文档延迟 10-15 秒 50-100 ms 减少 99%
跳过率(重复上传场景) 0% 70-80% -
向量化成本(70%未变更) $2.00/千文档 $0.60/千文档 减少 70%
GPU 占用时长 1.39 小时/千文档 0.42 小时/千文档 减少 70%
平均响应延迟 12 秒 4.2 秒 减少 65%

权衡分析

  • 需要 Redis 存储版本信息(内存占用约 1KB/文档)
  • SHA256 哈希计算开销(10MB 文档约 50-100ms,可忽略)
  • 版本信息 TTL 365 天,长期存储成本低

关键功能点汇总表

功能点 主要目的 核心指标 提升/减少 权衡
增量索引 成本减少、性能提升 平均延迟 减少 65% Redis 内存 +1KB/文档
批量向量化 性能提升、成本减少 吞吐量 提升 10 倍+ 无明显权衡
语义分块 准确率提升、减少幻觉 Recall@10 提升 13-15% 无明显权衡
PDFParser 降级 准确率提升、成本减少 解析成功率 提升到 98-99.5% 延迟略增(+10-20%)
Chunk Overlap 准确率提升、减少幻觉 边界召回率 提升 30-35% 存储成本 +10%
Milvus HNSW 索引 性能提升 检索延迟 减少 98% Recall 下降 2-5%
异步图谱构建 性能提升、用户体验 主流程延迟 减少 40-50% 图谱成功率降低 2-3%
tenant_id 分区 性能提升、数据安全 检索延迟 减少 85-90% 分区管理复杂度 +20%
Kafka 异步 性能提升、可靠性 响应延迟 减少 95%+ 最终一致性(非实时)
向量化缓存 性能提升、成本减少 平均延迟 减少 40-50% 内存占用 +40MB
递归分块 准确率提升、减少幻觉 句子完整率 提升 25-30% 无明显权衡

性能优化优先级建议

基于成本-收益分析,优化优先级从高到低:

  1. 增量索引(P0):成本减少 70%,平均延迟减少 65%,ROI 最高,必须实施
  2. 批量向量化(P0):无权衡,性能提升 10 倍+,必须实施
  3. Kafka 异步(P0):解耦上游,响应延迟减少 95%+,必须实施
  4. Milvus HNSW 索引(P0):检索延迟减少 98%,Recall 下降 2-5%可接受
  5. 语义分块 + Chunk Overlap(P0):准确率提升 13-15%,减少幻觉 30-40%
  6. PDFParser 降级(P1):解析成功率提升到 98%+,降低人工介入
  7. 异步图谱构建(P1):主流程延迟减少 40%,图谱成功率略降可接受
  8. tenant_id 分区(P1):多租户场景必须,检索延迟减少 85%
  9. 向量化缓存(P2):重复文档场景有效,通用场景收益有限
  10. 递归分块(P2):已在语义分块中实施,无额外成本
  11. 其他优化(P3):如自适应分块、智能解析等,边际收益递减

优先级说明

  • P0(必须实施):成本减少 >50% 或性能提升 >5 倍,无重大权衡
  • P1(高优先级):成本减少 30-50% 或性能提升 2-5 倍,权衡可接受
  • P2(中优先级):特定场景有效,边际收益明显
  • P3(低优先级):边际收益递减,酌情实施

配置说明

环境变量

# 服务配置
HOST=0.0.0.0
PORT=8002
WORKERS=4

# Kafka配置
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_CONSUMER_GROUP=indexing-service
KAFKA_TOPIC=document.events
KAFKA_AUTO_OFFSET_RESET=earliest

# MinIO配置
MINIO_ENDPOINT=localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_BUCKET=voiceassistant
MINIO_USE_SSL=false

# Milvus配置
MILVUS_HOST=localhost
MILVUS_PORT=19530
MILVUS_COLLECTION=documents
MILVUS_INDEX_TYPE=HNSW
MILVUS_METRIC_TYPE=COSINE

# Neo4j配置
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=password

# PostgreSQL配置
DB_HOST=localhost
DB_PORT=5432
DB_NAME=voiceassistant
DB_USER=postgres
DB_PASSWORD=password

# 分块配置
CHUNK_SIZE=500
CHUNK_OVERLAP=50
CHUNK_SEPARATORS=["\n\n", "\n", "。", ".", " "]

# Embedding配置
EMBEDDING_MODEL=BAAI/bge-m3
EMBEDDING_DIM=1024
EMBEDDING_BATCH_SIZE=32
EMBEDDING_MAX_LENGTH=512

Nacos 配置

# indexing-service.yaml
service:
  name: indexing-service
  version: 1.0.0

server:
  host: 0.0.0.0
  port: 8002
  workers: 4

kafka:
  bootstrap_servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
  consumer_group: indexing-service
  topic: document.events
  auto_offset_reset: earliest
  max_poll_records: 10
  session_timeout_ms: 30000

minio:
  endpoint: ${MINIO_ENDPOINT:localhost:9000}
  access_key: ${MINIO_ACCESS_KEY}
  secret_key: ${MINIO_SECRET_KEY}
  bucket: voiceassistant
  use_ssl: false
  download_timeout: 30

milvus:
  host: ${MILVUS_HOST:localhost}
  port: 19530
  collection: documents
  index_type: HNSW
  index_params:
    M: 16
    efConstruction: 200
  metric_type: COSINE
  insert_batch_size: 1000

neo4j:
  uri: bolt://localhost:7687
  user: neo4j
  password: ${NEO4J_PASSWORD}
  max_connection_lifetime: 3600
  max_connection_pool_size: 50

chunking:
  strategy: recursive
  chunk_size: 500
  chunk_overlap: 50
  separators:
    - "\n\n"
    - "\n"
    - '。'
    - '!'
    - '?'
    - '.'
    - '!'
    - '?'
    - ' '
    - ''

  adaptive_strategies:
    code:
      chunk_size: 800
      chunk_overlap: 100
      separators: ["\n\nclass ", "\n\ndef ", "\n\n", "\n"]

    markdown:
      chunk_size: 600
      chunk_overlap: 60
      separators: ["\n## ", "\n### ", "\n\n", "\n"]

    technical:
      chunk_size: 700
      chunk_overlap: 70

embedding:
  model: BAAI/bge-m3
  dimension: 1024
  batch_size: 32
  max_length: 512
  device: cuda # cuda/cpu
  normalize: true

graph_building:
  enabled: true
  async: true
  timeout: 300
  entity_extraction:
    model: ner-model
    confidence_threshold: 0.7

  relationship_extraction:
    model: relation-model
    confidence_threshold: 0.6