VoiceHelper - 14 - Indexing Service
模块概览
Indexing Service(索引服务)是 VoiceHelper 平台 RAG 能力的前置处理服务,负责将上传的文档解析、分块、向量化,并存储到 Milvus 向量数据库和 Neo4j 知识图谱。该服务作为文档索引流水线的协调者,确保文档从原始格式转换为可检索的结构化数据。
核心职责
文档解析
- 多格式支持:PDF、DOCX、TXT、Markdown、HTML、Excel、PPT
- 文本提取:从二进制文件中提取纯文本内容
- 元数据提取:标题、作者、创建时间等元信息
- 布局保留:保持文档结构(段落、标题层级)
语义分块
- RecursiveCharacterTextSplitter:基于语义边界分块
- 自适应分块:根据文档类型调整策略
- Token 估算:中英文混合场景的 Token 计数
- 重叠策略:chunk_overlap 保证上下文连贯性
向量化
- BGE-M3 Embedding:多语言向量化模型
- 批量处理:batch_size=32,提升吞吐量
- 异步处理:asyncio 并发,降低延迟
- 维度标准化:1024 维向量,支持余弦相似度
向量存储
- Milvus 批量插入:高效写入向量数据库
- 索引优化:HNSW 索引,平衡速度和准确性
- 租户隔离:tenant_id 字段实现多租户数据隔离
- 元数据关联:chunk_id、doc_id、content 联合存储
知识图谱构建
- 实体提取:NER 模型识别人名、地名、机构等
- 关系抽取:识别实体间的语义关系
- Neo4j 存储:图数据库存储实体和关系
- 异步构建:不阻塞主流程,后台任务完成
任务队列
- Kafka 消费者:从 document.created 事件消费任务
- 异步处理:每个任务独立协程执行
- 失败重试:指数退避重试,最多 3 次
- 状态回写:处理结果写回 PostgreSQL
技术架构
综合架构全景图
flowchart TB
subgraph External["外部系统"]
KnowledgeService["Knowledge Service<br/>文档上传"]
RetrievalService["Retrieval Service<br/>向量检索"]
end
subgraph KafkaCluster["Kafka 集群"]
Topic["document.events<br/>文档事件流"]
end
subgraph IndexingService["Indexing Service (本服务)"]
subgraph API["API 层"]
HealthAPI["/health, /readiness"]
StatsAPI["/stats"]
IncrementalAPI["/incremental/*"]
end
subgraph Core["核心处理层"]
Consumer["Kafka Consumer<br/>事件消费"]
IncrementalIndexer["IncrementalIndexer<br/>增量索引协调器"]
VersionManager["VersionManager<br/>版本管理"]
DocumentProcessor["DocumentProcessor<br/>文档处理协调器"]
end
subgraph Modules["功能模块"]
ParserFactory["ParserFactory<br/>解析器工厂"]
Parsers["Parsers<br/>PDF/Word/Excel/..."]
Chunker["DocumentChunker<br/>语义分块"]
Embedder["BGE_M3_Embedder<br/>向量化"]
GraphBuilder["GraphBuilder<br/>图谱构建"]
end
end
subgraph Adapters["适配器层"]
MinIOClient["MinIOClient<br/>文档下载"]
VectorStoreClient["VectorStoreClient<br/>向量存储适配器"]
Neo4jClient["Neo4jClient<br/>图数据库适配器"]
end
subgraph StorageAdapter["Vector-Store-Adapter"]
AdapterAPI["适配器 API<br/>/collections/*/insert"]
MilvusImpl["Milvus 实现"]
PgVectorImpl["PgVector 实现"]
end
subgraph Storage["存储层"]
MinIO["MinIO<br/>对象存储"]
Redis["Redis<br/>版本缓存"]
Milvus["Milvus<br/>向量数据库"]
Neo4j["Neo4j<br/>知识图谱"]
end
%% 外部系统连接
KnowledgeService -->|上传文档| MinIO
KnowledgeService -->|发送事件| Topic
RetrievalService -->|检索向量| Milvus
%% Kafka 事件流
Topic --> Consumer
%% API 调用
External -.->|HTTP| HealthAPI
External -.->|HTTP| IncrementalAPI
%% 核心流程
Consumer --> IncrementalIndexer
IncrementalAPI --> IncrementalIndexer
IncrementalIndexer --> VersionManager
IncrementalIndexer --> DocumentProcessor
VersionManager --> Redis
DocumentProcessor --> ParserFactory
ParserFactory --> Parsers
DocumentProcessor --> Chunker
DocumentProcessor --> Embedder
DocumentProcessor --> GraphBuilder
%% 适配器调用
DocumentProcessor --> MinIOClient
DocumentProcessor --> VectorStoreClient
GraphBuilder --> Neo4jClient
MinIOClient --> MinIO
VectorStoreClient --> AdapterAPI
Neo4jClient --> Neo4j
%% 适配器内部
AdapterAPI --> MilvusImpl
AdapterAPI --> PgVectorImpl
MilvusImpl --> Milvus
%% 样式
style External fill:#e1f5fe
style KafkaCluster fill:#fff3e0
style IndexingService fill:#f3e5f5
style API fill:#e8f5e9
style Core fill:#fff9c4
style Modules fill:#fce4ec
style Adapters fill:#e0f2f1
style StorageAdapter fill:#f1f8e9
style Storage fill:#ede7f6
%% 关键路径标注
linkStyle 2,3,5,6,7,8,9,10 stroke:#f00,stroke-width:2px
架构说明:
- 事件驱动:Knowledge Service 上传文档后发送 Kafka 事件,Indexing Service 异步消费
- 增量索引:IncrementalIndexer 通过 VersionManager 检测变更,避免重复索引
- 模块化设计:解析、分块、向量化、图谱构建各司其职,低耦合高内聚
- 适配器模式:VectorStoreClient 通过 vector-store-adapter 统一访问向量存储,支持多后端切换
- 版本管理:Redis 缓存文档版本信息,支持快速变更检测
- 关键路径(红色标注):Kafka → Consumer → IncrementalIndexer → DocumentProcessor → VectorStoreClient → Milvus
整体分层架构图
flowchart TB
subgraph External["外部触发层"]
Upload["Knowledge Service<br/>文档上传"]
Kafka["Kafka<br/>document.created事件"]
end
subgraph IndexingSvc["Indexing Service"]
Consumer["Kafka Consumer<br/>任务消费"]
Processor["DocumentProcessor<br/>处理协调"]
end
subgraph Parsers["解析器"]
ParserFactory["ParserFactory<br/>解析器工厂"]
PDFParser["PDF Parser"]
DOCXParser["DOCX Parser"]
TXTParser["TXT Parser"]
MarkdownParser["Markdown Parser"]
end
subgraph Processing["处理层"]
Chunker["DocumentChunker<br/>语义分块"]
Embedder["BGE-M3 Embedder<br/>向量化"]
GraphBuilder["GraphBuilder<br/>图谱构建"]
end
subgraph Storage["存储"]
MinIO["MinIO<br/>文档下载"]
Milvus["Milvus<br/>向量存储"]
Neo4j["Neo4j<br/>知识图谱"]
PostgreSQL["PostgreSQL<br/>状态回写"]
end
Upload --> Kafka
Kafka --> Consumer
Consumer --> Processor
Processor --> MinIO
MinIO --> ParserFactory
ParserFactory --> PDFParser
ParserFactory --> DOCXParser
ParserFactory --> TXTParser
ParserFactory --> MarkdownParser
ParserFactory --> Chunker
Chunker --> Embedder
Embedder --> Milvus
Chunker --> GraphBuilder
GraphBuilder --> Neo4j
Processor --> PostgreSQL
style External fill:#e3f2fd
style IndexingSvc fill:#fff3e0
style Parsers fill:#f3e5f5
style Processing fill:#e8f5e9
style Storage fill:#e0f2f1
服务分层架构详图
flowchart LR
subgraph API["API层"]
Health["/health<br/>健康检查"]
Readiness["/readiness<br/>就绪检查"]
Stats["/stats<br/>统计信息"]
Trigger["/trigger<br/>手动触发"]
end
subgraph Infrastructure["基础设施层"]
KafkaConsumer["KafkaConsumer<br/>消息消费"]
MinIOClient["MinIOClient<br/>对象存储"]
Neo4jClient["Neo4jClient<br/>图数据库"]
VectorStoreClient["VectorStoreClient<br/>向量数据库"]
end
subgraph Core["核心业务层"]
DocumentProcessor["DocumentProcessor<br/>文档处理协调器"]
subgraph Parsers2["解析模块"]
ParserFactory2["ParserFactory"]
BaseParser["BaseParser接口"]
PDFParser2["PDFParser"]
WordParser["WordParser"]
ExcelParser["ExcelParser"]
HTMLParser["HTMLParser"]
end
subgraph Processing2["处理模块"]
DocumentChunker2["DocumentChunker<br/>固定分块"]
AdaptiveChunker["AdaptiveChunker<br/>自适应分块"]
BGE_M3_Embedder2["BGE_M3_Embedder<br/>向量化"]
CachedEmbedder["CachedEmbedder<br/>缓存向量化"]
end
subgraph GraphModule["图谱模块"]
EntityExtractor["EntityExtractor<br/>实体提取"]
GraphBuilder2["GraphBuilder<br/>图谱构建"]
end
end
API --> DocumentProcessor
KafkaConsumer --> DocumentProcessor
DocumentProcessor --> MinIOClient
DocumentProcessor --> ParserFactory2
DocumentProcessor --> DocumentChunker2
DocumentProcessor --> BGE_M3_Embedder2
DocumentProcessor --> VectorStoreClient
DocumentProcessor --> GraphBuilder2
ParserFactory2 -.实例化.-> BaseParser
BaseParser -.实现.-> PDFParser2
BaseParser -.实现.-> WordParser
BaseParser -.实现.-> ExcelParser
BaseParser -.实现.-> HTMLParser
DocumentChunker2 -.继承.-> AdaptiveChunker
BGE_M3_Embedder2 -.继承.-> CachedEmbedder
GraphBuilder2 --> EntityExtractor
GraphBuilder2 --> Neo4jClient
style API fill:#e3f2fd
style Infrastructure fill:#fff3e0
style Core fill:#e8f5e9
style Parsers2 fill:#f3e5f5
style Processing2 fill:#fce4ec
style GraphModule fill:#e0f2f1
模块间交互与依赖图
graph TB
subgraph "主流程"
A[main.py] --> B[FastAPI App]
B --> C[DocumentEventConsumer]
B --> D[DocumentProcessor]
end
subgraph "DocumentProcessor依赖"
D --> E[ParserFactory]
D --> F[DocumentChunker]
D --> G[BGE_M3_Embedder]
D --> H[GraphBuilder]
D --> I[MinIOClient]
D --> J[VectorStoreClient]
D --> K[Neo4jClient]
end
subgraph "Parser层级"
E --> L[BaseParser接口]
L --> M[PDFParser]
L --> N[WordParser]
L --> O[ExcelParser]
L --> P[HTMLParser]
L --> Q[MarkdownParser]
L --> R[TextParser]
L --> S[PPTParser]
end
subgraph "Chunker层级"
F --> T[RecursiveCharacterTextSplitter<br/>LangChain]
F --> U[AdaptiveChunker]
end
subgraph "Embedder层级"
G --> V[SentenceTransformer<br/>BAAI/bge-m3]
G --> W[CachedEmbedder]
end
subgraph "GraphBuilder依赖"
H --> X[EntityExtractor]
H --> K
end
subgraph "外部库"
M --> Y[pdfplumber]
M --> Z[PyPDF2]
N --> AA[python-docx]
O --> AB[openpyxl]
P --> AC[BeautifulSoup4]
S --> AD[python-pptx]
end
C -.消费事件.-> D
style A fill:#e3f2fd
style B fill:#e3f2fd
style D fill:#fff3e0
style H fill:#e8f5e9
style E fill:#f3e5f5
style F fill:#fce4ec
style G fill:#e1bee7
架构说明
分层架构详解
API 层(接口层)
Indexing Service 提供 6 个核心 HTTP 端点:
/health:健康检查,返回服务状态和版本号/readiness:就绪检查,验证 Kafka Consumer、DocumentProcessor、MinIO、Milvus、Neo4j 连接状态/stats:统计信息,返回已处理文档数、chunk 数、向量数、图谱节点数、增量索引跳过率等/trigger:手动触发文档处理(仅测试环境使用)/incremental/check:增量索引检查,返回文档是否需要重新索引/incremental/reindex:强制重新索引文档
API 层使用 FastAPI 框架,支持异步请求处理。集成中间件链:
- RateLimitMiddleware:速率限制(默认 60 次/分钟,1000 次/小时)
- RequestIDMiddleware:请求 ID 生成,用于链路追踪
- StructuredLoggingMiddleware:结构化日志记录
- CORSMiddleware:跨域请求支持
Prometheus metrics 通过 /metrics 端点暴露,监控维度包含 tenant_id、document_type 等标签,核心指标:
indexing_documents_processed_total:文档处理总数(按状态和租户分组)indexing_document_processing_seconds:文档处理耗时直方图(P50/P95/P99)indexing_chunks_created_total:分块创建总数indexing_vectors_stored_total:向量存储总数indexing_embedding_seconds:向量化耗时直方图indexing_minio_download_seconds:MinIO 下载耗时直方图indexing_errors_total:错误计数(按错误类型分组)
基础设施层(Infrastructure Layer)
该层封装外部系统的客户端,提供统一的接口:
-
KafkaConsumer:消费 Kafka
document.eventstopic,接收文档上传事件。使用confluent_kafka库,配置自动提交偏移量(5 秒间隔),最大轮询间隔 5 分钟。支持事件类型分发(document.uploaded/updated/deleted)。 -
MinIOClient:对象存储客户端,负责文档下载。连接 MinIO S3 兼容 API,下载超时 30 秒,支持多租户文件路径隔离(
tenant_id/date/filename)。 -
VectorStoreClient:Milvus 向量数据库客户端,批量插入向量。连接 Milvus 19530 端口,支持按 tenant_id 分区,单批次最大 10000 条记录,插入超时 30 秒。
-
Neo4jClient:Neo4j 图数据库客户端,批量创建节点和关系。使用 Bolt 协议连接,支持事务(ACID),连接池最大 50 个连接,生命周期 3600 秒。
核心业务层(Core Layer)
DocumentProcessor(文档处理协调器)
核心协调器,编排 6 个步骤的处理流水线:
- 文档下载(
_download_document):调用 MinIOClient 下载文件,返回 bytes - 文档解析(
_parse_document):调用 ParserFactory 选择解析器,提取纯文本 - 语义分块(
_chunk_document):调用 DocumentChunker 分块,生成 chunk_id 和 metadata - 向量化(
_vectorize_chunks):调用 BGE_M3_Embedder 批量向量化 - 向量存储(
_store_vectors):调用 VectorStoreClient 批量插入 Milvus - 图谱构建(
_build_graph):调用 GraphBuilder 异步构建知识图谱
每个步骤独立可测试,失败时记录错误日志,更新统计信息。统计数据包括:total_processed(总处理数)、total_success(成功数)、total_failed(失败数)、total_chunks(总 chunk 数)、total_vectors(总向量数)。
解析模块(Parser Module)
ParserFactory:工厂模式,根据文件扩展名动态选择解析器。扩展名与解析器的映射关系:
.pdf→ PDFParser.docx,.doc→ WordParser.xlsx,.xls→ ExcelParser.html,.htm→ HTMLParser.md→ MarkdownParser.txt→ TextParser.pptx,.ppt→ PPTParser
BaseParser 接口:定义统一的解析接口,包含 2 个方法:
parse(file_data: bytes) -> str:解析文件,返回纯文本extract_metadata(file_data: bytes) -> Dict:提取元数据(标题、作者、页数等)clean_text(text: str) -> str:清理文本(去除多余空格、特殊字符)
PDFParser 实现:双层降级策略,确保解析成功率
- 主解析器:pdfplumber,准确率高(95%+),支持表格和布局识别
- 备用解析器:PyPDF2,兼容性好,处理加密或复杂布局 PDF
- 降级条件:pdfplumber 提取文本少于 100 字符时,切换到 PyPDF2
- 元数据提取:使用 PyPDF2 读取 PDF 元数据(/Title、/Author、/Pages 等)
处理模块(Processing Module)
DocumentChunker(固定分块器)
使用 LangChain 的 RecursiveCharacterTextSplitter,按语义边界递归分块:
-
分块参数:
- chunk_size:500 字符(平衡上下文长度和检索粒度)
- chunk_overlap:50 字符(10%重叠,保证上下文连贯性)
-
分隔符优先级(从高到低):
\n\n(段落分隔)\n(行分隔)。!?;(中文句号). ! ? ;(英文句号)(空格)- ``(字符级切分,兜底策略)
-
Chunk ID 生成:格式
{document_id}_{index}_{text_hash},text_hash 取 MD5 前 8 位,保证内容变化时 ID 变化,支持增量更新。 -
Token 估算:中文字符 × 1.5 + 英文单词 × 1.3,用于控制 LLM 上下文长度。实际应用中可使用 tiktoken 库精确计算。
AdaptiveChunker(自适应分块器)
继承 DocumentChunker,根据文档类型调整分块策略:
| 文档类型 | chunk_size | chunk_overlap | 分隔符优先级 | 适用场景 |
|---|---|---|---|---|
| code | 800 | 100 | \n\nclass, \n\ndef, \n\n, \n |
代码文件,按类/函数分割 |
| markdown | 600 | 60 | \n##, \n###, \n\n, \n |
Markdown 文档,按标题层级分割 |
| technical | 700 | 70 | \n\n, \n, 。, . |
技术文档,保留术语完整性 |
| general | 500 | 50 | 默认分隔符 | 通用文档 |
BGE_M3_Embedder(向量化模块)
基于 Sentence-Transformers 库加载 BGE-M3 模型:
- 模型:BAAI/bge-m3,多语言向量化模型(支持中英文)
- 维度:1024 维,平衡向量表达能力和存储成本
- 批量处理:batch_size=32,相比单条处理提升吞吐量 10 倍+
- 归一化:L2 归一化,支持余弦相似度计算(范围 [-1, 1])
- 设备:优先 GPU(CUDA),降级到 CPU
批量向量化流程:
- 过滤空文本,替换为单空格
- 分批处理(每批 32 条),避免 GPU 内存溢出
- SentenceTransformer.encode() 推理,返回 numpy 数组
- 转换为 List[List[float]],用于 Milvus 插入
CachedEmbedder(缓存向量化)
继承 BGE_M3_Embedder,添加 LRU 缓存:
- 缓存容量:10000 条,约占用 40MB 内存(1024 维 × 4 字节 × 10000)
- 缓存键:文本哈希值(hash(text))
- 命中率:重复文档或相似文本场景,命中率 30-50%
- 性能提升:缓存命中时,延迟从 50ms 降至 <1ms,提升 50 倍+
- 驱逐策略:简单 FIFO,缓存满时删除第一个元素
图谱模块(Graph Module)
EntityExtractor(实体提取器)
基于 NER 模型提取实体:
- 模型:可选 spaCy、BERT-based NER、LLM-based extraction
- 实体类型:人名(PERSON)、地名(LOCATION)、机构(ORGANIZATION)、时间(DATE)、数量(QUANTITY)等
- 置信度阈值:0.7,过滤低置信度实体,减少噪声
GraphBuilder(图谱构建器)
负责构建知识图谱:
- 实体提取(
extract_entities):调用 EntityExtractor 识别实体 - 关系抽取(
extract_relationships):识别实体间的语义关系(依赖句法分析或关系抽取模型) - 图谱存储(
build):批量创建 Neo4j 节点和关系- 节点:
(Entity {name, type, document_id, tenant_id}) - 关系:
(e1)-[RELATED_TO {type, confidence}]->(e2)
- 节点:
- 异步执行:使用
asyncio.create_task(),不阻塞向量存储
存储层说明
Milvus 向量存储
- Collection:documents,按 tenant_id 分区,支持多租户隔离
- 索引类型:HNSW(Hierarchical Navigable Small World),平衡速度和准确性
- 索引参数:M=16(每个节点的邻居数),efConstruction=200(构建时搜索深度)
- 度量类型:COSINE(余弦相似度),范围 [-1, 1]
- 字段 Schema:
- chunk_id(主键,VARCHAR)
- document_id(VARCHAR)
- tenant_id(VARCHAR,分区键)
- content(VARCHAR,原始文本)
- embedding(FloatVector,1024 维)
- metadata(JSON,扩展字段)
Neo4j 知识图谱
- 节点类型:Entity(实体节点)、Document(文档节点)
- 关系类型:RELATED_TO(实体关联)、BELONGS_TO(文档归属)
- 索引:Entity.name、Entity.type、Document.document_id 建立索引,加速查询
- 事务:批量操作使用事务(ACID),确保一致性
PostgreSQL 状态回写
- 表:documents,存储文档元数据和处理状态
- 字段:document_id、tenant_id、status(PENDING/PROCESSING/COMPLETED/FAILED)、chunks_count、created_at、updated_at
- 更新时机:处理完成或失败时,回写 status 和 chunks_count
- 查询接口:Knowledge Service 查询处理进度,展示给用户
数据模型
领域模型 UML 图
classDiagram
class DocumentProcessor {
-ParserFactory parser_factory
-DocumentChunker chunker
-BGE_M3_Embedder embedder
-VectorStoreClient vector_store_client
-MinIOClient minio_client
-Neo4jClient neo4j_client
-GraphBuilder graph_builder
-dict stats
+process_document(document_id, tenant_id, file_path)
-_download_document()
-_parse_document()
-_chunk_document()
-_vectorize_chunks()
-_store_vectors()
-_build_graph()
+get_stats()
}
class ParserFactory {
-dict parsers
+get_parser(file_path)
+register_parser(extension, parser)
}
class BaseParser {
<<interface>>
+parse(file_data)
+extract_metadata(file_data)
+clean_text(text)
}
class PDFParser {
+parse(file_data)
-_parse_with_pdfplumber()
-_parse_with_pypdf2()
+extract_metadata()
}
class DocumentChunker {
-int chunk_size
-int chunk_overlap
-RecursiveCharacterTextSplitter splitter
+chunk(text, document_id)
-_generate_chunk_id()
-_estimate_tokens()
}
class AdaptiveChunker {
-dict strategies
+chunk(text, document_id, doc_type)
}
class BGE_M3_Embedder {
-string model_name
-int dimension
-SentenceTransformer model
+embed_query(text)
+embed_batch(texts)
}
class GraphBuilder {
-Neo4jClient neo4j_client
+extract(text)
+build(entities, relationships, doc_id)
}
class Chunk {
+string id
+string content
+int index
+int tokens
+dict metadata
}
class ProcessingResult {
+string document_id
+string status
+int chunks_count
+float duration
+string error
+is_success()
}
DocumentProcessor "1" *-- "1" ParserFactory
DocumentProcessor "1" *-- "1" DocumentChunker
DocumentProcessor "1" *-- "1" BGE_M3_Embedder
DocumentProcessor "1" *-- "1" GraphBuilder
DocumentProcessor ..> ProcessingResult : returns
ParserFactory ..> BaseParser : creates
BaseParser <|.. PDFParser : implements
DocumentChunker <|-- AdaptiveChunker : extends
DocumentChunker ..> Chunk : creates
数据结构说明
Chunk 结构
| 字段 | 类型 | 说明 |
|---|---|---|
| id | string | chunk 唯一标识符,格式:{doc_id}_{index}_{hash} |
| content | string | chunk 文本内容,长度 500 字符(可配置) |
| index | int | chunk 在文档中的序号,从 0 开始 |
| tokens | int | 估算的 Token 数量,用于控制 LLM 上下文长度 |
| metadata | dict | 元数据,包含 document_id、chunk_index、total_chunks |
ProcessingResult 结构
| 字段 | 类型 | 说明 |
|---|---|---|
| document_id | string | 文档 ID |
| status | string | 处理状态:success/failed |
| chunks_count | int | 生成的 chunk 数量 |
| duration | float | 处理耗时(秒) |
| error | string | 错误信息(仅 status=failed 时) |
调用链路深度分析
上游接口:Kafka 事件触发
Indexing Service 的主流程由 Kafka 事件驱动,从 Knowledge Service 上传文档开始。
入口点:DocumentEventConsumer
# app/infrastructure/kafka_consumer.py
class DocumentEventConsumer:
async def start(self):
"""启动Kafka消费者"""
self.consumer = Consumer(self.config)
self.consumer.subscribe([self.topic]) # document.events
self._running = True
await self._consume_loop()
关键代码路径:
- main.py 启动阶段:
# main.py (行 56-89)
@asynccontextmanager
async def lifespan(app: FastAPI):
kafka_consumer = DocumentEventConsumer()
document_processor = DocumentProcessor()
# 异步启动Kafka消费者
asyncio.create_task(kafka_consumer.start())
yield
# 关闭资源
await kafka_consumer.stop()
await document_processor.cleanup()
- 消息消费循环:
# app/infrastructure/kafka_consumer.py (行 94-120)
async def _consume_loop(self):
while self._running:
msg = self.consumer.poll(timeout=1.0)
if msg and not msg.error():
await self._handle_message(msg)
- 事件分发:
# app/infrastructure/kafka_consumer.py (行 121-153)
async def _handle_message(self, msg):
event = json.loads(msg.value().decode("utf-8"))
event_type = event.get("event_type") # document.uploaded
payload = event.get("payload", {})
# 分发到注册的处理器
handlers = self.handlers.get(event_type, [])
for handler in handlers:
await handler(payload)
模块内部时序图
1. Kafka 消费与事件分发时序图
sequenceDiagram
autonumber
participant Main as main.py
participant App as FastAPI App
participant Consumer as DocumentEventConsumer
participant Kafka as Kafka Broker
participant Handler as document_uploaded_handler
participant Processor as DocumentProcessor
Main->>App: uvicorn.run(app)
App->>App: lifespan startup
App->>Consumer: __init__()
App->>Processor: __init__()
App->>Consumer: asyncio.create_task(start())
Consumer->>Kafka: subscribe("document.events")
loop 消费循环
Consumer->>Kafka: poll(timeout=1.0)
Kafka-->>Consumer: message
Consumer->>Consumer: _handle_message(msg)
Consumer->>Consumer: json.loads(msg.value())
Consumer->>Consumer: 提取event_type和payload
Consumer->>Handler: await handler(payload)
Handler->>Processor: process_document(doc_id, tenant_id, file_path)
Processor-->>Handler: ProcessingResult
Handler-->>Consumer: success
end
Note over Consumer,Kafka: 轮询间隔1秒,最大轮询间隔5分钟<br/>自动提交偏移量(5秒间隔)
时序图说明:
- 步骤 1-5(初始化):FastAPI 应用启动时,初始化 Consumer 和 Processor,异步启动消费循环
- 步骤 6-7(订阅):Consumer 连接 Kafka,订阅
document.eventstopic - 步骤 8-14(消费循环):轮询消息(1 秒超时),解析 JSON,分发到处理器
- 并发控制:消费循环串行处理,但 DocumentProcessor 内部使用 asyncio 并发
- 容错机制:poll 超时或解析失败时,记录错误日志,继续下一轮
- 性能特征:单个 Consumer 吞吐量约 10-20 条/秒(受 DocumentProcessor 处理速度限制)
2. DocumentProcessor 主流程时序图
sequenceDiagram
autonumber
participant Processor as DocumentProcessor
participant MinIO as MinIOClient
participant Parser as ParserFactory
participant PDF as PDFParser
participant Chunker as DocumentChunker
participant Embedder as BGE_M3_Embedder
participant Vector as VectorStoreClient
participant Graph as GraphBuilder
participant Neo4j as Neo4jClient
Processor->>Processor: process_document(doc_id, tenant_id, file_path)
Processor->>Processor: start_time = time.time()
rect rgb(230, 240, 255)
Note over Processor,MinIO: 步骤1:下载文档
Processor->>MinIO: _download_document(file_path)
MinIO->>MinIO: download_file(file_path)
MinIO-->>Processor: file_data (bytes)
end
rect rgb(255, 240, 230)
Note over Processor,PDF: 步骤2:解析文档
Processor->>Parser: _parse_document(file_data, file_path)
Parser->>Parser: get_parser(file_path)
Parser->>PDF: parse(file_data)
PDF->>PDF: _parse_with_pdfplumber(file_data)
alt pdfplumber成功且文本>100字符
PDF-->>Parser: text
else 降级到PyPDF2
PDF->>PDF: _parse_with_pypdf2(file_data)
PDF-->>Parser: text
end
Parser->>Parser: clean_text(text)
Parser-->>Processor: text (纯文本)
end
rect rgb(230, 255, 230)
Note over Processor,Chunker: 步骤3:语义分块
Processor->>Chunker: _chunk_document(text, doc_id)
Chunker->>Chunker: splitter.split_text(text)
loop 遍历每个chunk
Chunker->>Chunker: _generate_chunk_id(doc_id, index, text)
Chunker->>Chunker: _estimate_tokens(text)
end
Chunker-->>Processor: chunks (List[Dict])
end
rect rgb(255, 230, 255)
Note over Processor,Embedder: 步骤4:批量向量化
Processor->>Embedder: _vectorize_chunks(chunks)
Embedder->>Embedder: embed_batch(texts)
loop 分批处理 (batch_size=32)
Embedder->>Embedder: model.encode(batch)
end
Embedder-->>Processor: embeddings (List[1024-dim])
end
rect rgb(230, 255, 255)
Note over Processor,Vector: 步骤5:存储向量
Processor->>Vector: _store_vectors(chunks, embeddings, doc_id, tenant_id)
Vector->>Vector: 准备数据 (chunk_id, content, embedding, metadata)
Vector->>Vector: insert_batch(data)
Vector-->>Processor: success
end
par 步骤6:异步构建图谱(不阻塞)
Processor->>Graph: asyncio.create_task(_build_graph(...))
Graph->>Graph: extract(text)
Graph->>Graph: _extract_entities(text)
Graph->>Graph: _extract_relationships(text, entities)
Graph->>Neo4j: batch_create_nodes(entities)
Graph->>Neo4j: batch_create_relationships(relationships)
Neo4j-->>Graph: success
end
Processor->>Processor: 更新统计信息
Processor->>Processor: duration = time.time() - start_time
Processor-->>Processor: return {status, doc_id, chunks_count, duration}
调用链路详解:
步骤 1:文档下载(_download_document)
- 调用栈:
DocumentProcessor.process_document()→DocumentProcessor._download_document()→MinIOClient.download_file() - 关键代码:
# app/core/document_processor.py (行 111-114)
async def _download_document(self, file_path: str) -> bytes:
logger.info(f"Downloading document: {file_path}")
return await self.minio_client.download_file(file_path)
- 性能特征:网络 IO,延迟 1-2 秒(10MB 文件),取决于 MinIO 与服务间的网络带宽
- 容错:下载失败时抛出异常,外层捕获并记录到 failed 统计
步骤 2:文档解析(_parse_document)
- 调用栈:
DocumentProcessor._parse_document()→ParserFactory.get_parser()→PDFParser.parse() - 关键代码:
# app/core/document_processor.py (行 116-128)
async def _parse_document(self, file_data: bytes, file_path: str) -> str:
logger.info(f"Parsing document: {file_path}")
parser = self.parser_factory.get_parser(file_path)
text = await parser.parse(file_data)
logger.info(f"Parsed document, text length: {len(text)} chars")
return text
- PDFParser 降级策略:
# app/core/parsers/pdf_parser.py (行 18-33)
async def parse(self, file_data: bytes, **kwargs) -> str:
try:
# 主解析器:pdfplumber
text = await self._parse_with_pdfplumber(file_data)
if not text or len(text) < 100:
# 降级到PyPDF2
logger.warning("pdfplumber failed, trying PyPDF2")
text = await self._parse_with_pypdf2(file_data)
return self.clean_text(text)
except Exception as e:
logger.error(f"Failed to parse PDF: {e}")
raise
- 性能特征:CPU 密集,10 页 PDF 约 2-3 秒,100 页约 15-20 秒
- 准确率:pdfplumber 准确率 95%+,PyPDF2 准确率 85%+(降级场景)
步骤 3:语义分块(_chunk_document)
- 调用栈:
DocumentProcessor._chunk_document()→DocumentChunker.chunk()→RecursiveCharacterTextSplitter.split_text() - 关键代码:
# app/core/document_processor.py (行 130-138)
async def _chunk_document(self, text: str, document_id: str) -> List[Dict]:
logger.info(f"Chunking document: {document_id}")
chunks = await self.chunker.chunk(text, document_id)
logger.info(f"Created {len(chunks)} chunks")
return chunks
- 分块实现:
# app/core/chunker.py (行 40-77)
async def chunk(self, text: str, document_id: str) -> List[Dict]:
if not text or not text.strip():
return []
# LangChain分块
chunks = self.splitter.split_text(text)
# 构建结果
result = []
for i, chunk_text in enumerate(chunks):
chunk_id = self._generate_chunk_id(document_id, i, chunk_text)
result.append({
"id": chunk_id,
"content": chunk_text,
"index": i,
"tokens": self._estimate_tokens(chunk_text),
"metadata": {
"document_id": document_id,
"chunk_index": i,
"total_chunks": len(chunks),
},
})
return result
- 性能特征:纯计算,O(n)复杂度,10000 字符约 0.5-1 秒
- 分块效果:500 字符/chunk,10 页 PDF(约 5000 字)生成 10-15 个 chunks
步骤 4:批量向量化(_vectorize_chunks)
- 调用栈:
DocumentProcessor._vectorize_chunks()→BGE_M3_Embedder.embed_batch()→SentenceTransformer.encode() - 关键代码:
# app/core/document_processor.py (行 140-152)
async def _vectorize_chunks(self, chunks: List[Dict]) -> List[List[float]]:
logger.info(f"Vectorizing {len(chunks)} chunks")
texts = [chunk["content"] for chunk in chunks]
embeddings = await self.embedder.embed_batch(texts)
logger.info(f"Generated {len(embeddings)} embeddings")
return embeddings
- 批量向量化实现:
# app/core/embedder.py (行 57-84)
async def embed_batch(self, texts: List[str]) -> List[List[float]]:
if not texts:
return []
# 过滤空文本
valid_texts = [text if text and text.strip() else " " for text in texts]
# 批量生成向量
embeddings = self.model.encode(
valid_texts,
batch_size=self.batch_size, # 32
show_progress_bar=len(valid_texts) > 100,
convert_to_numpy=True,
)
return embeddings.tolist()
- 性能特征:GPU 推理,batch_size=32,50 个 chunks 约 3-5 秒(GPU),CPU 约 30-50 秒
- 吞吐量提升:批量处理相比单条提升 10 倍+(GPU 并行)
步骤 5:向量存储(_store_vectors)
- 调用栈:
DocumentProcessor._store_vectors()→VectorStoreClient.insert_batch() - 关键代码:
# app/core/document_processor.py (行 154-175)
async def _store_vectors(self, chunks: List[Dict], embeddings: List[List[float]],
document_id: str, tenant_id: str = None):
logger.info(f"Storing {len(embeddings)} vectors to Milvus")
# 准备数据
data = []
for chunk, embedding in zip(chunks, embeddings):
data.append({
"chunk_id": chunk["id"],
"document_id": document_id,
"tenant_id": tenant_id or "default",
"content": chunk["content"],
"embedding": embedding,
"metadata": chunk.get("metadata", {}),
})
# 批量插入
await self.vector_store_client.insert_batch(data)
- 性能特征:网络 IO + 磁盘 IO,50 个向量约 1-2 秒,单批次最大 10000 条
- 索引更新:插入后 Milvus 异步更新 HNSW 索引,不阻塞插入操作
步骤 6:图谱构建(_build_graph,异步)
- 调用栈:
asyncio.create_task(DocumentProcessor._build_graph())→GraphBuilder.extract()→GraphBuilder.build() - 关键代码:
# app/core/document_processor.py (行 77-80)
asyncio.create_task(
self._build_graph(text, document_id, tenant_id)
)
# app/core/document_processor.py (行 176-196)
async def _build_graph(self, text: str, document_id: str, tenant_id: str = None):
try:
entities, relationships = await self.graph_builder.extract(text)
await self.graph_builder.build(
entities=entities,
relationships=relationships,
document_id=document_id,
tenant_id=tenant_id,
)
except Exception as e:
logger.error(f"Error building graph: {e}", exc_info=True)
- 性能特征:异步执行,不阻塞主流程,10 页 PDF 约 5-10 秒
- 容错:图谱构建失败不影响向量存储,仅记录错误日志
3. PDFParser 解析流程时序图
sequenceDiagram
autonumber
participant Processor as DocumentProcessor
participant Factory as ParserFactory
participant PDF as PDFParser
participant Plumber as pdfplumber
participant PyPDF as PyPDF2
Processor->>Factory: get_parser(file_path)
Factory->>Factory: 提取扩展名: .pdf
Factory->>PDF: __init__()
Factory-->>Processor: PDFParser实例
Processor->>PDF: parse(file_data)
rect rgb(230, 255, 230)
Note over PDF,Plumber: 主解析器:pdfplumber
PDF->>Plumber: _parse_with_pdfplumber(file_data)
Plumber->>Plumber: pdfplumber.open(BytesIO(file_data))
loop 遍历每一页
Plumber->>Plumber: page.extract_text()
Plumber->>Plumber: text_parts.append(text)
end
Plumber->>Plumber: "\n\n".join(text_parts)
Plumber-->>PDF: text
PDF->>PDF: 检查文本长度
alt 文本长度 >= 100字符
Note over PDF: pdfplumber成功
else 文本过短或为空
rect rgb(255, 230, 230)
Note over PDF,PyPDF: 降级解析器:PyPDF2
PDF->>PyPDF: _parse_with_pypdf2(file_data)
PyPDF->>PyPDF: PyPDF2.PdfReader(BytesIO(file_data))
loop 遍历每一页
PyPDF->>PyPDF: page.extract_text()
PyPDF->>PyPDF: text_parts.append(text)
end
PyPDF->>PyPDF: "\n\n".join(text_parts)
PyPDF-->>PDF: text
end
end
end
PDF->>PDF: clean_text(text)
PDF->>PDF: 去除多余空格和特殊字符
PDF-->>Processor: 清理后的text
Note over PDF,PyPDF: 降级策略:pdfplumber失败或文本<100字符时切换
Note over PDF,PyPDF: 准确率:pdfplumber 95%+,PyPDF2 85%+
Note over PDF,PyPDF: 性能:10页PDF 2-3秒,100页PDF 15-20秒
PDFParser 关键特性:
- 双层降级:pdfplumber → PyPDF2,保证解析成功率 98%+
- 降级阈值:文本长度 < 100 字符触发降级
- 性能开销:pdfplumber 稍慢但准确率高,PyPDF2 更快但兼容性有限
- 错误处理:两个解析器都失败时,抛出异常,外层记录 failed 统计
4. DocumentChunker 分块流程时序图
sequenceDiagram
autonumber
participant Processor as DocumentProcessor
participant Chunker as DocumentChunker
participant Splitter as RecursiveCharacterTextSplitter
Processor->>Chunker: chunk(text, document_id)
Chunker->>Chunker: 检查文本是否为空
alt 文本为空
Chunker-->>Processor: []
end
rect rgb(230, 240, 255)
Note over Chunker,Splitter: LangChain递归分块
Chunker->>Splitter: split_text(text)
Splitter->>Splitter: 尝试分隔符: "\n\n"
alt 分块成功且长度<=chunk_size
Note over Splitter: 段落级分块
else 继续尝试下一个分隔符
Splitter->>Splitter: 尝试分隔符: "\n"
alt 分块成功
Note over Splitter: 行级分块
else 继续尝试
Splitter->>Splitter: 尝试句号: "。", "."
alt 分块成功
Note over Splitter: 句子级分块
else 最终兜底
Splitter->>Splitter: 字符级硬切分
end
end
end
Splitter->>Splitter: 应用chunk_overlap (50字符)
Splitter-->>Chunker: chunks (List[str])
end
rect rgb(255, 240, 230)
Note over Chunker: 构建Chunk结构
loop 遍历每个chunk_text
Chunker->>Chunker: _generate_chunk_id(doc_id, index, text)
Chunker->>Chunker: MD5哈希前8位
Chunker->>Chunker: _estimate_tokens(text)
Chunker->>Chunker: 中文字符 × 1.5 + 英文单词 × 1.3
Chunker->>Chunker: 构建chunk对象
Note over Chunker: {id, content, index, tokens, metadata}
end
end
Chunker-->>Processor: chunks (List[Dict])
Note over Chunker,Splitter: 分块策略:优先语义边界,兜底字符切分
Note over Chunker,Splitter: chunk_size=500, overlap=50 (10%重叠)
Note over Chunker,Splitter: 10页PDF(5000字) → 10-15 chunks
DocumentChunker 关键特性:
- 递归分块:按分隔符优先级递归尝试,保证语义完整性
- 重叠策略:10%重叠(50/500),保证上下文连贯,检索时边界 chunk 也能召回
- Chunk ID 生成:
{doc_id}_{index}_{hash},hash 保证内容变化时 ID 变化 - Token 估算:简单估算公式,实际应用可使用 tiktoken 精确计算
- 性能:纯计算,O(n)复杂度,10000 字符约 0.5-1 秒
5. BGE_M3_Embedder 向量化流程时序图
sequenceDiagram
autonumber
participant Processor as DocumentProcessor
participant Embedder as BGE_M3_Embedder
participant Model as SentenceTransformer
participant GPU as CUDA Device
Processor->>Embedder: embed_batch(texts)
Embedder->>Embedder: 过滤空文本
loop 遍历texts
alt text为空或空白
Embedder->>Embedder: 替换为单空格 " "
end
end
rect rgb(230, 240, 255)
Note over Embedder,GPU: 批量向量化(batch_size=32)
loop 分批处理
Embedder->>Embedder: 提取batch (32条)
Embedder->>Model: encode(batch, batch_size=32)
Model->>Model: 文本tokenize
Model->>Model: 截断或padding到max_length (512)
Model->>GPU: 前向推理
GPU->>GPU: BERT模型推理
GPU->>GPU: Pooling (CLS或Mean)
GPU->>GPU: L2归一化
GPU-->>Model: embeddings (32 × 1024)
Model-->>Embedder: batch_embeddings
Embedder->>Embedder: all_embeddings.extend(batch)
end
end
Embedder->>Embedder: embeddings.tolist()
Embedder-->>Processor: List[List[float]]
Note over Embedder,GPU: 批量处理:32条/批,相比单条提升10倍+
Note over Embedder,GPU: 性能:50 chunks约3-5秒(GPU),30-50秒(CPU)
Note over Embedder,GPU: 显存占用:2-3GB (batch_size=32)
BGE_M3_Embedder 关键特性:
- 批量处理:batch_size=32,GPU 并行推理,吞吐量提升 10 倍+
- 归一化:L2 归一化,支持余弦相似度(范围 [-1, 1])
- 设备选择:优先 GPU(CUDA),降级到 CPU
- 内存管理:分批处理,避免 GPU OOM(batch_size 太大时)
- 性能瓶颈:向量化是主流程中最耗时的步骤(占 30-40%)
6. VectorStoreClient 存储流程时序图
sequenceDiagram
autonumber
participant Processor as DocumentProcessor
participant Client as VectorStoreClient
participant Milvus as Milvus Server
Processor->>Client: insert_batch(data)
Client->>Client: 准备插入数据
loop 遍历data
Client->>Client: 构建记录
Note over Client: {chunk_id, doc_id, tenant_id,<br/>content, embedding, metadata}
end
Client->>Client: 检查batch大小
alt batch_size > 10000
Client->>Client: 拆分为多个批次
end
rect rgb(230, 240, 255)
Note over Client,Milvus: Milvus批量插入
loop 每个批次
Client->>Milvus: collection.insert(entities)
Milvus->>Milvus: 根据tenant_id选择分区
Milvus->>Milvus: 写入向量数据
Milvus->>Milvus: 更新主键索引
Milvus->>Milvus: 触发HNSW索引增量更新
Milvus-->>Client: insert_result
Client->>Client: 检查insert_result.err_count
alt err_count > 0
Client->>Client: 记录错误
Note over Client: 主键冲突或数据格式错误
end
end
end
Client->>Milvus: collection.flush()
Note over Client,Milvus: 确保数据持久化到磁盘
Client-->>Processor: success
Note over Client,Milvus: 批量插入:单批次最大10000条
Note over Client,Milvus: 性能:50向量约1-2秒,1000向量约10-15秒
Note over Client,Milvus: 索引更新:异步增量更新,不阻塞插入
VectorStoreClient 关键特性:
- 批量插入:单批次最大 10000 条,超过自动拆分
- 分区隔离:按 tenant_id 分区,支持多租户数据隔离
- 索引更新:HNSW 索引异步增量更新,插入后立即可查询(但 recall 可能略低,直到索引更新完成)
- 容错:主键冲突时报错,支持先删除旧数据再插入(幂等性)
- 性能:网络 IO + 磁盘 IO,50 向量约 1-2 秒
7. VectorStoreClient 适配器模式时序图
VectorStoreClient 通过 vector-store-adapter 服务统一访问 Milvus/PgVector,实现解耦和多后端支持。
sequenceDiagram
autonumber
participant Processor as DocumentProcessor
participant Client as VectorStoreClient<br/>(algo/common)
participant Adapter as vector-store-adapter<br/>HTTP Service
participant Milvus as Milvus Server
Processor->>Client: insert_batch(data_list)
rect rgb(230, 240, 255)
Note over Client,Adapter: 步骤1-2: HTTP调用适配器服务
Client->>Client: 准备请求JSON
Client->>Client: {backend: "milvus", data: [...]}
Client->>Adapter: POST /collections/{name}/insert
Note over Adapter: 接收请求并验证
end
rect rgb(255, 240, 230)
Note over Adapter,Milvus: 步骤3-6: 适配器转换为Milvus调用
Adapter->>Adapter: 选择后端实现(Milvus/PgVector)
Adapter->>Adapter: 转换数据格式
Adapter->>Milvus: collection.insert(entities)
Note over Milvus: 批量写入向量数据<br/>更新HNSW索引
Milvus-->>Adapter: insert_result
Adapter->>Adapter: 检查错误
end
rect rgb(230, 255, 230)
Note over Adapter,Client: 步骤7-8: 返回结果
Adapter-->>Client: HTTP 200 + JSON result
Client->>Client: 解析响应
Client-->>Processor: success
end
Note over Client,Milvus: 适配器模式优势:<br/>1. 解耦服务与存储<br/>2. 支持多后端切换<br/>3. 统一错误处理<br/>4. 集中式监控
VectorStoreClient 适配器模式优势:
- 解耦:Indexing Service 不直接依赖 Milvus SDK,降低耦合
- 多后端支持:通过配置切换 Milvus 或 PgVector,无需修改代码
- 统一接口:所有向量操作通过统一的 REST API
- 集中式监控:vector-store-adapter 统一记录指标和日志
- 灰度切换:可按 tenant_id 灰度切换后端,实现平滑迁移
关键代码:
# algo/common/vector_store_client.py
class VectorStoreClient:
def __init__(self, base_url: str = "http://vector-store-adapter:8003"):
self.base_url = base_url
self.backend = "milvus" # 或 "pgvector"
self.client = httpx.AsyncClient(base_url=base_url, timeout=30.0)
async def insert_batch(self, data_list: List[Dict]):
"""批量插入向量"""
response = await self.client.post(
f"/collections/{self.collection_name}/insert",
json={"backend": self.backend, "data": data_list}
)
response.raise_for_status()
return response.json()
async def search(self, query_vector, top_k=10, tenant_id=None):
"""向量检索"""
response = await self.client.post(
f"/collections/{self.collection_name}/search",
json={
"backend": self.backend,
"query_vector": query_vector,
"top_k": top_k,
"tenant_id": tenant_id
}
)
return response.json()["results"]
数据流转过程:
- DocumentProcessor 准备向量数据(chunk_id、content、embedding、metadata)
- VectorStoreClient 封装为 HTTP 请求发送到 vector-store-adapter
- vector-store-adapter 根据 backend 参数选择 Milvus 或 PgVector 实现
- 适配器 转换数据格式,调用 Milvus SDK 的
collection.insert() - Milvus 写入向量数据,触发 HNSW 索引增量更新
- 适配器 检查 insert_result.err_count,返回 HTTP 响应
- VectorStoreClient 解析响应,返回成功或失败状态
- DocumentProcessor 记录统计信息(total_vectors)
性能特征:
- HTTP 开销:增加 5-10ms 延迟(相比直接 SDK 调用)
- 批量优化:单次插入 50 个向量,HTTP 开销占比 <1%
- 重试机制:适配器内置重试(3 次,指数退避),提升可靠性
- 超时控制:默认 30 秒超时,大批量插入可调整
API 详解
1. 处理文档(内部 API)
接口信息
- 调用方式:Kafka 消费者自动触发
- 协议:事件驱动
- 幂等性:是(相同 document_id 重复处理覆盖旧数据)
Kafka 事件结构
{
"document_id": "doc_123",
"tenant_id": "tenant_abc",
"user_id": "user_456",
"file_path": "tenant_abc/2025-01-27/doc.pdf",
"collection_id": "coll_789",
"created_at": "2025-01-27T10:00:00Z"
}
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| document_id | string | 是 | 文档唯一标识符 |
| tenant_id | string | 是 | 租户 ID,用于数据隔离 |
| user_id | string | 是 | 上传用户 ID |
| file_path | string | 是 | MinIO 中的文件路径 |
| collection_id | string | 是 | 所属集合 ID |
| created_at | string | 是 | 文档创建时间(ISO8601) |
处理流程
async def process_document(self, document_id: str, tenant_id: str,
user_id: str, file_path: str) -> Dict:
"""
处理文档(完整流程)
"""
start_time = time.time()
try:
# 1. 下载文档
file_data = await self._download_document(file_path)
# MinIO下载,返回bytes
# 2. 解析文档
text = await self._parse_document(file_data, file_path)
# 根据扩展名选择解析器(PDF/DOCX/TXT等)
# 提取纯文本内容
# 3. 分块
chunks = await self._chunk_document(text, document_id)
# RecursiveCharacterTextSplitter语义分块
# 生成chunk_id、估算tokens
# 4. 向量化
embeddings = await self._vectorize_chunks(chunks)
# BGE-M3批量向量化
# 返回List[List[float]],每个1024维向量
# 5. 存储到Milvus
await self._store_vectors(chunks, embeddings, document_id, tenant_id)
# 批量插入Milvus
# 字段:chunk_id, doc_id, tenant_id, content, embedding, metadata
# 6. 构建知识图谱(异步,不阻塞)
asyncio.create_task(
self._build_graph(text, document_id, tenant_id)
)
# 提取实体和关系
# 存储到Neo4j
duration = time.time() - start_time
return {
"status": "success",
"document_id": document_id,
"chunks_count": len(chunks),
"duration": duration,
}
except Exception as e:
# 记录错误日志
# 更新文档状态为FAILED
return {
"status": "failed",
"document_id": document_id,
"error": str(e),
}
文档解析详细实现
async def _parse_document(self, file_data: bytes, file_path: str) -> str:
"""
解析文档,提取文本内容
"""
# 1. 根据文件扩展名选择解析器
parser = self.parser_factory.get_parser(file_path)
# 支持:.pdf, .docx, .txt, .md, .html, .xlsx, .pptx
# 2. 解析文档
text = await parser.parse(file_data)
# 每个解析器实现BaseParser接口
return text
# PDF解析器示例
class PDFParser(BaseParser):
async def parse(self, file_data: bytes) -> str:
"""解析PDF文档"""
try:
# 优先使用pdfplumber(更准确)
text = await self._parse_with_pdfplumber(file_data)
if not text or len(text) < 100:
# 降级使用PyPDF2
text = await self._parse_with_pypdf2(file_data)
return self.clean_text(text)
except Exception as e:
# 记录错误并抛出
raise
async def _parse_with_pdfplumber(self, file_data: bytes) -> str:
"""使用pdfplumber解析"""
text_parts = []
with pdfplumber.open(io.BytesIO(file_data)) as pdf:
for page in pdf.pages:
text = page.extract_text()
if text:
text_parts.append(text)
return "\n\n".join(text_parts)
语义分块详细实现
async def _chunk_document(self, text: str, document_id: str) -> List[Dict]:
"""
分块文档,保证语义完整性
"""
if not text or not text.strip():
return []
# 1. 使用LangChain RecursiveCharacterTextSplitter
chunks = self.splitter.split_text(text)
# separators:按段落、句子、标点分割
# chunk_size:500字符
# chunk_overlap:50字符(10%重叠)
# 2. 构建chunk结构
result = []
for i, chunk_text in enumerate(chunks):
chunk_id = self._generate_chunk_id(document_id, i, chunk_text)
result.append({
"id": chunk_id,
"content": chunk_text,
"index": i,
"tokens": self._estimate_tokens(chunk_text),
"metadata": {
"document_id": document_id,
"chunk_index": i,
"total_chunks": len(chunks),
},
})
return result
def _generate_chunk_id(self, document_id: str, index: int, text: str) -> str:
"""
生成chunk ID,保证唯一性
格式:{document_id}_{index}_{text_hash}
text_hash取MD5前8位,避免内容变化导致ID冲突
"""
text_hash = hashlib.md5(text.encode()).hexdigest()[:8]
return f"{document_id}_{index}_{text_hash}"
def _estimate_tokens(self, text: str) -> int:
"""
估算Token数量
策略:
- 中文字符:1字符 ≈ 1.5 tokens
- 英文单词:1单词 ≈ 1.3 tokens
更准确的方法是使用tiktoken,但有性能开销
"""
chinese_chars = sum(1 for char in text if "\u4e00" <= char <= "\u9fff")
english_words = len([word for word in text.split() if any(c.isalpha() for c in word)])
estimated_tokens = int(chinese_chars * 1.5 + english_words * 1.3)
return estimated_tokens
向量化批量处理
async def _vectorize_chunks(self, chunks: List[Dict]) -> List[List[float]]:
"""
批量向量化chunk,提升吞吐量
"""
# 1. 提取文本
texts = [chunk["content"] for chunk in chunks]
# 2. 批量向量化(batch_size=32)
embeddings = await self.embedder.embed_batch(texts)
# BGE-M3模型
# 输入:List[str]
# 输出:List[List[float]],每个1024维
return embeddings
# BGE-M3 Embedder实现
class BGE_M3_Embedder:
async def embed_batch(self, texts: List[str]) -> List[List[float]]:
"""批量向量化"""
# 分批处理,避免内存溢出
batch_size = 32
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
# SentenceTransformer模型推理
embeddings = self.model.encode(
batch,
batch_size=batch_size,
show_progress_bar=False,
normalize_embeddings=True, # L2归一化
)
all_embeddings.extend(embeddings.tolist())
return all_embeddings
Milvus 批量存储
async def _store_vectors(self, chunks: List[Dict], embeddings: List[List[float]],
document_id: str, tenant_id: str):
"""
批量存储向量到Milvus
"""
# 1. 准备数据
data = []
for chunk, embedding in zip(chunks, embeddings):
data.append({
"chunk_id": chunk["id"],
"document_id": document_id,
"tenant_id": tenant_id or "default",
"content": chunk["content"],
"embedding": embedding, # 1024维向量
"metadata": chunk.get("metadata", {}),
})
# 2. 批量插入Milvus
await self.vector_store_client.insert_batch(data)
# collection: documents
# 索引: HNSW (M=16, efConstruction=200)
# 分区: 按tenant_id
时序图:带增量索引的完整流程(端到端)
以下时序图展示从文档上传到索引完成的完整流程,包含增量索引优化。
sequenceDiagram
autonumber
participant KS as Knowledge Service
participant Kafka
participant Consumer as Kafka Consumer
participant Indexer as IncrementalIndexer
participant VersionMgr as VersionManager
participant Redis
participant Processor as DocumentProcessor
participant MinIO
participant Parser as PDFParser
participant Chunker
participant Embedder as BGE-M3
participant VSAdapter as vector-store-adapter
participant Milvus
participant Neo4j
rect rgb(225, 245, 254)
Note over KS,Kafka: 步骤1-3: 文档上传与事件发送
KS->>MinIO: 上传文档
MinIO-->>KS: file_path
KS->>Kafka: 发送 document.uploaded 事件
end
rect rgb(255, 243, 224)
Note over Kafka,Indexer: 步骤4-6: Kafka 消费与分发
Kafka->>Consumer: 轮询消息
Consumer->>Consumer: 解析 event_type 和 payload
Consumer->>Indexer: 调用增量索引器
end
rect rgb(200, 230, 201)
Note over Indexer,Redis: 步骤7-11: 增量索引变更检测
Indexer->>Indexer: calculate_content_hash(content)
Indexer->>VersionMgr: detect_changes(doc_id, hashes)
VersionMgr->>Redis: GET doc_version:{tenant}:{doc_id}
Redis-->>VersionMgr: current_version
VersionMgr-->>Indexer: VersionChange(needs_reindex)
end
alt 文档未变更(跳过索引)
rect rgb(200, 230, 201)
Note over Indexer,Consumer: 快速返回,节省 99% 时间
Indexer-->>Consumer: {status: "skipped", elapsed: 50ms}
Consumer->>Kafka: 提交 offset
end
else 文档变更或新文档(执行索引)
rect rgb(255, 224, 178)
Note over Indexer,Processor: 步骤12-20: 完整文档处理流程
Indexer->>Processor: process_document(doc_id, ...)
Processor->>MinIO: download_file(file_path)
MinIO-->>Processor: file_data (bytes)
Processor->>Parser: parse(file_data)
Parser->>Parser: pdfplumber → PyPDF2 降级
Parser-->>Processor: text (纯文本)
Processor->>Chunker: chunk(text, doc_id)
Chunker->>Chunker: RecursiveTextSplitter<br/>chunk_size=500, overlap=50
Chunker-->>Processor: chunks (List[Dict])
Processor->>Embedder: embed_batch(texts)
Embedder->>Embedder: SentenceTransformer.encode()<br/>batch_size=32
Embedder-->>Processor: embeddings (1024-dim)
end
rect rgb(255, 249, 196)
Note over Processor,Milvus: 步骤21-26: 向量存储(适配器模式)
Processor->>VSAdapter: VectorStoreClient.insert_batch(data)
VSAdapter->>VSAdapter: POST /collections/{name}/insert
VSAdapter->>VSAdapter: 选择后端: Milvus
VSAdapter->>Milvus: collection.insert(entities)
Milvus-->>VSAdapter: insert_result
VSAdapter-->>Processor: HTTP 200 + success
end
par 步骤27-30: 异步构建知识图谱(不阻塞)
Processor->>Processor: asyncio.create_task()
Processor->>Processor: GraphBuilder.extract(text)
Processor->>Neo4j: batch_create_nodes(entities)
Neo4j-->>Processor: success
end
rect rgb(200, 230, 201)
Note over Indexer,Redis: 步骤31-33: 保存新版本
Processor-->>Indexer: {status, chunks_count, duration}
Indexer->>VersionMgr: save_version(doc_id, hashes, chunk_count)
VersionMgr->>Redis: SET doc_version + LPUSH history
Redis-->>VersionMgr: ok
end
Indexer-->>Consumer: {status: "indexed", elapsed: 12s}
Consumer->>Kafka: 提交 offset
Consumer->>Kafka: 发送 document.indexed 事件(可选)
end
Note over KS,Neo4j: 性能对比:<br/>未变更文档: 50ms(跳过)<br/>新文档/变更: 10-15秒(完整索引)<br/>增量索引节省 60-80% 成本
流程详细说明:
- 步骤 1-3(文档上传):Knowledge Service 上传文档到 MinIO,获取 file_path,发送 Kafka 事件
- 步骤 4-6(事件消费):Kafka Consumer 轮询消息,解析 event_type 和 payload,分发到 IncrementalIndexer
- 步骤 7-11(增量检测):计算内容哈希,查询 Redis 获取当前版本,对比哈希值判断是否需要重新索引
- 快速路径(未变更):检测到无变更,直接返回 “skipped”,耗时 50ms,节省 99% 时间和成本
- 完整路径(变更或新文档):
- 步骤 12-20:文档下载 → 解析(PDF 双层降级)→ 语义分块 → 批量向量化
- 步骤 21-26:通过 VectorStoreClient 适配器写入 Milvus,支持多后端切换
- 步骤 27-30:异步构建知识图谱,不阻塞主流程
- 步骤 31-33:保存新版本到 Redis,包含哈希值、chunk 数量等元数据
- 事件反馈:处理完成后,Consumer 提交 Kafka offset,可选发送 document.indexed 事件通知下游
关键优化点:
- 增量索引:跳过未变更文档,节省 60-80% 成本
- 批量向量化:batch_size=32,吞吐量提升 10 倍+
- 适配器模式:解耦服务与存储,支持 Milvus/PgVector 切换
- 异步图谱:不阻塞主流程,总延迟减少 40%
- 双层降级:PDF 解析成功率提升到 98%+
时序图:文档索引完整流程(传统流程)
以下是不包含增量索引的传统流程,供对比参考。
sequenceDiagram
autonumber
participant KS as Knowledge Service
participant Kafka
participant Consumer as Kafka Consumer
participant Processor as DocumentProcessor
participant MinIO
participant Parser as PDFParser
participant Chunker as DocumentChunker
participant Embedder as BGE-M3 Embedder
participant Milvus
participant GraphBuilder
participant Neo4j
participant DB as PostgreSQL
KS->>MinIO: 上传文档
MinIO-->>KS: file_path
KS->>Kafka: 发送document.created事件
Note over Kafka: topic: document.events<br/>partition: tenant_id哈希
Kafka->>Consumer: 消费事件
Consumer->>Processor: process_document(doc_id, tenant_id, file_path)
Processor->>MinIO: download_file(file_path)
MinIO-->>Processor: file_data (bytes)
Processor->>Parser: parse(file_data)
Parser->>Parser: pdfplumber.open(BytesIO)
Parser->>Parser: extract_text()逐页提取
Parser-->>Processor: text (纯文本)
Processor->>Chunker: chunk(text, doc_id)
Chunker->>Chunker: RecursiveCharacterTextSplitter<br/>chunk_size=500, overlap=50
Chunker->>Chunker: generate_chunk_id()<br/>estimate_tokens()
Chunker-->>Processor: chunks (List[Dict])
Processor->>Embedder: embed_batch(texts)
Embedder->>Embedder: SentenceTransformer.encode()<br/>batch_size=32, normalize=True
Embedder-->>Processor: embeddings (List[1024-dim vectors])
Processor->>Milvus: insert_batch(chunks + embeddings)
Note over Milvus: collection: documents<br/>index: HNSW<br/>partition: tenant_id
Milvus-->>Processor: success
par 异步构建图谱(不阻塞)
Processor->>GraphBuilder: extract(text)
GraphBuilder->>GraphBuilder: NER实体识别
GraphBuilder->>GraphBuilder: 关系抽取
GraphBuilder-->>Processor: entities + relationships
Processor->>GraphBuilder: build(entities, rels, doc_id)
GraphBuilder->>Neo4j: CREATE (Entity)节点
GraphBuilder->>Neo4j: CREATE ()-[RELATED_TO]->()边
Neo4j-->>GraphBuilder: success
end
Processor->>DB: UPDATE documents SET status='COMPLETED'
DB-->>Processor: ok
Processor-->>Consumer: ProcessingResult(success, chunks_count, duration)
Consumer->>Kafka: 发送document.indexed事件(可选)
时序图详细说明
1. 图意概述(200-300 字)
该时序图展示了 Indexing Service 从文档上传到向量存储的完整索引流程,包含 7 个核心步骤:事件触发(步骤 1-4)、文档下载(步骤 5-6)、文档解析(步骤 7-9)、语义分块(步骤 10-12)、向量化(步骤 13-14)、Milvus 存储(步骤 15-16)、知识图谱构建(步骤 17-22)、状态回写(步骤 23-24)。
流程采用事件驱动架构,Knowledge Service 上传文档后发送 Kafka 事件触发索引。Processor 作为协调器,串联各处理组件(Parser、Chunker、Embedder)。每个组件职责单一,降低耦合,便于测试和扩展。
关键设计决策:(1)图谱构建异步执行,不阻塞向量存储;(2)批量向量化(batch_size=32),提升吞吐量;(3)Milvus 按 tenant_id 分区,支持多租户隔离;(4)状态回写 PostgreSQL,Knowledge Service 查询处理进度。
端到端延迟:PDF(10 页)约 10-15 秒(下载 1s、解析 2s、分块 1s、向量化 3s、存储 2s、图谱 5s 异步)。100 页文档约 30-40 秒。
2. 边界条件(150-200 字)
并发控制:Kafka Consumer 并发数设置为 4,每个 consumer 独立处理文档。Processor 内部使用 asyncio,单个文档的步骤串行执行。Milvus 支持并发插入,但单批次最大 10000 条记录。
超时控制:文档下载超时 30 秒,解析超时 60 秒,向量化超时 120 秒,Milvus 插入超时 30 秒。超时后任务失败,发送重试。图谱构建超时 300 秒,失败不影响向量存储。
幂等性:相同 document_id 重复处理时,Milvus 中旧数据被删除并重新插入。chunk_id 包含内容哈希,内容不变则 ID 不变。
顺序性:文档必须先解析再分块,分块完成才能向量化。向量化和图谱构建可并行(图谱异步)。
3. 异常路径与回退(200-300 字)
文档下载失败(步骤 5-6):MinIO 不可用或 file_path 错误时,重试 3 次(指数退避:2s、4s、8s)。仍失败则任务失败,状态回写 PostgreSQL 为 FAILED,通知用户。
PDF 解析失败(步骤 7-9):pdfplumber 解析失败时,降级到 PyPDF2。两者都失败则任务失败。扫描版 PDF 无法提取文本时,返回空文本,记录警告但不失败。
分块异常(步骤 10-12):文本为空或过短(<100 字符)时,创建单个 chunk。分块逻辑异常时,降级为固定长度分块(500 字符硬切分)。
向量化失败(步骤 13-14):Embedding 模型加载失败或推理超时时,任务失败并重试。GPU OOM 时,降级到 CPU 或减小 batch_size(32→16→8)。
Milvus 存储失败(步骤 15-16):连接失败或插入超时时,重试 3 次。Collection 不存在时,自动创建。主键冲突时,先删除旧数据再插入。
图谱构建失败(步骤 17-22):Neo4j 不可用或实体提取失败时,记录错误但不影响向量存储。图谱视为增强功能,非核心路径。
状态回写失败(步骤 23-24):PostgreSQL 不可用时,任务仍算成功(向量已存储),但状态不一致。后台任务定期同步状态。
4. 性能要点(200-300 字)
关键路径延迟分析(10 页 PDF 为例):
- 文档下载(步骤 5-6):1-2 秒(MinIO 对象存储,网络 IO)
- PDF 解析(步骤 7-9):2-3 秒(pdfplumber 逐页提取,CPU 密集)
- 语义分块(步骤 10-12):0.5-1 秒(纯计算,O(n)复杂度)
- 向量化(步骤 13-14):3-5 秒(GPU 推理,batch_size=32,约 50 个 chunk)
- Milvus 存储(步骤 15-16):1-2 秒(批量插入,网络+磁盘 IO)
- 图谱构建(步骤 17-22):5-10 秒(异步,不阻塞主流程)
端到端延迟:10-15 秒(不含图谱),主要瓶颈在向量化(30-40%)和 PDF 解析(20-30%)。
吞吐量优化:
- 批量向量化:batch_size=32,相比单条处理快 10 倍+
- Milvus 批量插入:单批次最多 10000 条,减少网络往返
- 异步图谱构建:不阻塞向量存储,总延迟减少 30%
- Kafka 分区:按 tenant_id 哈希,避免热点,支持水平扩展
资源消耗:
- CPU:解析和分块,占用 60-80%
- GPU:向量化推理,显存占用 2-3GB(batch_size=32)
- 内存:单文档峰值 500MB(存储 file_data 和 embeddings)
- 磁盘 IO:MinIO 下载和 Milvus 写入,IOPS 约 1000
扩展性:单实例处理约 10-20 个文档/分钟(混合文档类型)。生产环境部署 5 个实例,总吞吐量 50-100 个文档/分钟。
5. 兼容性说明(150-200 字)
事件版本:Kafka 事件结构 v1,未来 v2 新增字段(如 priority、callback_url)均为可选,保证向后兼容。Consumer 同时支持 v1 和 v2。
解析器扩展:ParserFactory 动态注册解析器,新增文档格式(如 JSON、CSV)无需修改核心代码。解析器实现 BaseParser 接口即可。
向量模型:默认 BGE-M3(1024 维),未来支持切换模型(如 OpenAI text-embedding-ada-002, 1536 维)。Milvus schema 支持 dynamic schema,向量维度可变。
分块策略:当前 RecursiveCharacterTextSplitter(固定分块),未来支持语义分块(sentence-transformers)和滑动窗口。策略选择通过配置,不影响 API。
灰度策略:新解析器或新分块策略通过配置项feature_flags控制,默认关闭。按 tenant_id 灰度,逐步放量。
增量索引模块
设计目标
增量索引是 Indexing Service 的核心优化特性,通过智能检测文档变更,避免不必要的重复索引,显著降低计算成本和索引延迟。
核心价值:
- 成本减少:跳过未变更文档,节省 60-80% 的向量化成本
- 延迟降低:无变更文档响应时间从 10-15 秒降至 50-100ms
- 资源优化:释放 GPU 资源用于真正需要索引的文档
增量索引架构
组件协作图
flowchart TB
subgraph API["API 层"]
CheckAPI["/incremental/check<br/>检查是否需要索引"]
ReindexAPI["/incremental/reindex<br/>强制重新索引"]
end
subgraph IncrementalModule["增量索引模块"]
IncrementalIndexer["IncrementalIndexer<br/>增量索引器"]
VersionManager["VersionManager<br/>版本管理器"]
end
subgraph Storage["存储层"]
Redis["Redis<br/>版本信息存储"]
VectorStore["Milvus<br/>向量数据"]
end
subgraph Processing["处理层"]
DocProcessor["DocumentProcessor<br/>文档处理器"]
end
CheckAPI --> IncrementalIndexer
ReindexAPI --> IncrementalIndexer
IncrementalIndexer --> VersionManager
IncrementalIndexer --> DocProcessor
VersionManager --> Redis
DocProcessor --> VectorStore
style API fill:#e3f2fd
style IncrementalModule fill:#fff3e0
style Storage fill:#e8f5e9
style Processing fill:#f3e5f5
VersionManager(版本管理器)
职责:
- 内容哈希计算:使用 SHA256 计算文档内容和元数据哈希值
- 变更检测:对比新旧哈希值,判断文档是否变更
- 版本存储:将版本信息存储到 Redis,TTL 365 天
- 历史追踪:记录文档版本历史(最多保留 100 个版本)
关键代码:
class VersionManager:
def calculate_content_hash(self, content: str) -> str:
"""计算内容哈希(SHA256)"""
return hashlib.sha256(content.encode('utf-8')).hexdigest()
async def detect_changes(
self,
document_id: str,
new_content_hash: str,
new_metadata_hash: str,
tenant_id: Optional[str] = None
) -> VersionChange:
"""检测文档变更"""
current_version = await self.get_current_version(document_id, tenant_id)
if not current_version:
return VersionChange(
change_type="created",
needs_reindex=True
)
content_changed = new_content_hash != current_version.content_hash
metadata_changed = new_metadata_hash != current_version.metadata_hash
if not content_changed and not metadata_changed:
return VersionChange(
change_type="unchanged",
needs_reindex=False
)
return VersionChange(
change_type="updated",
content_changed=content_changed,
metadata_changed=metadata_changed,
needs_reindex=content_changed # 仅内容变更需要重新索引
)
版本数据结构:
class DocumentVersion(BaseModel):
document_id: str
version: int
content_hash: str # SHA256 内容哈希
metadata_hash: str # SHA256 元数据哈希
chunk_count: int # 分块数量
vector_count: int # 向量数量
created_at: float # 创建时间戳
updated_at: float # 更新时间戳
tenant_id: Optional[str]
user_id: Optional[str]
status: str # active/deleted/archived
Redis 存储模式:
- 当前版本键:
doc_version:{tenant_id}:{document_id}→ DocumentVersion JSON - 历史记录键:
doc_history:{tenant_id}:{document_id}→ List[DocumentVersion](最新在前) - TTL:365 天(可配置)
IncrementalIndexer(增量索引器)
职责:
- 变更检查:调用 VersionManager 检测文档变更类型
- 智能分发:根据变更类型决定是否重新索引
- 完全重新索引:内容变更时,删除旧数据并重新索引
- 元数据更新:仅元数据变更时,只更新元数据,不重新向量化
- 统计追踪:记录跳过率、重新索引率等指标
处理决策树:
flowchart TD
Start[收到文档] --> CalcHash[计算内容哈希和元数据哈希]
CalcHash --> CheckVersion[检查版本]
CheckVersion --> IsNew{是新文档?}
IsNew -->|是| FullIndex[完全索引]
IsNew -->|否| CompareHash{内容变更?}
CompareHash -->|内容未变更,元数据未变更| Skip[跳过索引]
CompareHash -->|内容未变更,元数据变更| UpdateMeta[仅更新元数据]
CompareHash -->|内容变更| FullReindex[完全重新索引]
FullIndex --> SaveVersion[保存新版本]
FullReindex --> DeleteOld[删除旧数据]
DeleteOld --> SaveVersion
UpdateMeta --> SaveVersion
Skip --> Return[返回结果]
SaveVersion --> Return
style Skip fill:#c8e6c9
style UpdateMeta fill:#fff9c4
style FullReindex fill:#ffccbc
style FullIndex fill:#b3e5fc
关键方法:
async def process_with_version_check(
self,
document_id: str,
content: str,
metadata: Dict,
tenant_id: Optional[str] = None,
force_reindex: bool = False
) -> Dict:
"""带版本检查的文档处理"""
# 1. 计算哈希
content_hash = self.version_manager.calculate_content_hash(content)
metadata_hash = self.version_manager.calculate_metadata_hash(metadata)
# 2. 检测变更
change = await self.version_manager.detect_changes(
document_id, content_hash, metadata_hash, tenant_id
)
# 3. 根据变更类型处理
if change.change_type == "unchanged":
# 无变更,跳过索引
return {"status": "skipped", "reason": "no_changes"}
elif change.needs_reindex:
# 完全重新索引
return await self._full_reindex(...)
else:
# 仅更新元数据
return await self._update_metadata_only(...)
增量索引时序图
sequenceDiagram
autonumber
participant Client as 客户端
participant Indexer as IncrementalIndexer
participant VersionMgr as VersionManager
participant Redis
participant Processor as DocumentProcessor
participant Milvus
Client->>Indexer: process_with_version_check(doc_id, content, metadata)
rect rgb(230, 245, 255)
Note over Indexer,Redis: 步骤1-4: 版本检查
Indexer->>Indexer: calculate_content_hash(content)
Indexer->>Indexer: calculate_metadata_hash(metadata)
Indexer->>VersionMgr: detect_changes(doc_id, hashes)
VersionMgr->>Redis: GET doc_version:{tenant}:{doc_id}
Redis-->>VersionMgr: current_version
VersionMgr->>VersionMgr: 对比哈希值
VersionMgr-->>Indexer: VersionChange(type, needs_reindex)
end
alt 文档未变更 (unchanged)
rect rgb(200, 230, 201)
Note over Indexer,Client: 快速路径: 跳过索引
Indexer-->>Client: {status: "skipped", elapsed: 50ms}
end
else 内容变更 (needs_reindex=true)
rect rgb(255, 224, 178)
Note over Indexer,Milvus: 完全重新索引
Indexer->>Milvus: delete_by_document(doc_id)
Milvus-->>Indexer: deleted
Indexer->>Processor: process_document(doc_id, ...)
Processor->>Processor: 解析->分块->向量化->存储
Processor-->>Indexer: {chunks_count, duration}
Indexer->>VersionMgr: save_version(doc_id, hashes, ...)
VersionMgr->>Redis: SET doc_version + LPUSH history
Redis-->>VersionMgr: ok
Indexer-->>Client: {status: "reindexed", chunks: N, elapsed: 12s}
end
else 仅元数据变更 (metadata_changed=true)
rect rgb(255, 249, 196)
Note over Indexer,Milvus: 增量更新: 仅更新元数据
Indexer->>Milvus: update_metadata(doc_id, metadata)
Milvus-->>Indexer: updated
Indexer->>VersionMgr: save_version(doc_id, hashes, ...)
VersionMgr->>Redis: SET doc_version
Redis-->>VersionMgr: ok
Indexer-->>Client: {status: "updated", operation: "metadata_only", elapsed: 0.5s}
end
end
Note over Indexer,Milvus: 性能对比:<br/>跳过: 50ms<br/>元数据更新: 0.5s<br/>完全重新索引: 10-15s
增量索引性能数值
跳过率(Skip Rate)
| 场景 | 跳过率 | 节省成本 | 说明 |
|---|---|---|---|
| 文档重复上传 | 95-100% | 95-100% | 相同文档重复上传,完全跳过 |
| 定期全量同步 | 70-80% | 70-80% | 大部分文档未变更 |
| 元数据修正 | 0% | 50% | 仅更新元数据,不重新向量化 |
| 内容微调(<5%) | 0% | 0% | 内容变更,必须重新索引 |
响应延迟对比
| 操作类型 | 无增量索引 | 有增量索引 | 延迟减少 |
|---|---|---|---|
| 未变更文档 | 10-15 秒 | 50-100 ms | 减少 99% |
| 元数据变更 | 10-15 秒 | 0.5-1 秒 | 减少 90-95% |
| 内容变更 | 10-15 秒 | 10-15 秒 | 无优化(必须重新索引) |
成本节省估算
假设场景:1000 个文档/天上传,其中 70% 未变更,20% 仅元数据变更,10% 内容变更
无增量索引:
- 总索引次数:1000 次
- 向量化成本:1000 × $0.002 = $2.00/天
- GPU 占用时长:1000 × 5 秒 = 1.39 小时/天
有增量索引:
- 跳过索引:700 次(无成本)
- 元数据更新:200 次(无向量化成本)
- 完全重新索引:100 次
- 向量化成本:100 × $0.002 = $0.20/天
- GPU 占用时长:100 × 5 秒 = 0.14 小时/天
节省:
- 成本节省:90%($2.00 → $0.20)
- GPU 时长节省:90%(1.39h → 0.14h)
- 平均延迟:从 12 秒降至 1.5 秒(减少 87%)
关键功能点分析
本节罗列 Indexing Service 的关键功能点,说明其设计目的(性能提升、成本减少、准确率提升、减少幻觉等)以及估计的数值效果。
1. 批量向量化(Batch Embedding)
功能描述:
使用 batch_size=32 批量调用 SentenceTransformer 模型,相比单条向量化提升吞吐量。
设计目的:
- 性能提升:GPU 并行推理,充分利用硬件并行能力
- 成本减少:降低 GPU 空闲时间,提升资源利用率
关键代码:
# app/core/embedder.py
embeddings = self.model.encode(
valid_texts,
batch_size=self.batch_size, # 32
convert_to_numpy=True,
)
数值估计:
| 指标 | 单条处理 | 批量处理 (batch_size=32) | 提升比例 |
|---|---|---|---|
| 吞吐量 | 3-5 条/秒 | 30-50 条/秒 | 10 倍+ |
| 延迟 (50 chunks) | 15-20 秒 | 3-5 秒 | 减少 70-80% |
| GPU 利用率 | 20-30% | 80-90% | 提升 60% |
| 成本 (每万次向量化) | $5-8 | $0.5-1 | 减少 85-90% |
2. 语义分块(Semantic Chunking)
功能描述:
使用 RecursiveCharacterTextSplitter 按语义边界分块,优先保留段落、句子完整性。
设计目的:
- 准确率提升:保留语义完整性,检索时召回更准确的上下文
- 减少幻觉:避免句子被截断,LLM 生成时减少上下文不连贯导致的幻觉
关键策略:
- 分隔符优先级:
\n\n→\n→。→.→ 字符级 - chunk_overlap:50 字符(10%重叠),保证边界 chunk 也能召回
数值估计:
| 指标 | 固定长度分块 | 语义分块 | 提升比例 |
|---|---|---|---|
| 检索准确率 (Recall@10) | 65-70% | 78-85% | 提升 13-15% |
| 检索精确率 (Precision@10) | 55-60% | 68-75% | 提升 13-15% |
| LLM 生成幻觉率 | 15-20% | 8-12% | 减少 40-50% |
| 用户满意度 | 70-75% | 82-88% | 提升 12-13% |
3. PDFParser 双层降级策略
功能描述:
优先使用 pdfplumber 解析 PDF,文本不足 100 字符时降级到 PyPDF2。
设计目的:
- 准确率提升:pdfplumber 准确率更高(95%+ vs 85%+)
- 成本减少:降级策略保证兼容性,减少人工介入成本
降级条件:
if not text or len(text) < 100:
# 降级到PyPDF2
text = await self._parse_with_pypdf2(file_data)
数值估计:
| 指标 | 仅 PyPDF2 | 仅 pdfplumber | 双层降级 |
|---|---|---|---|
| 解析成功率 | 88-92% | 93-97% | 98-99.5% |
| 文本准确率 | 85-90% | 95-98% | 93-96% |
| 平均延迟 (10 页 PDF) | 1.5-2 秒 | 2.5-3 秒 | 2-2.5 秒 |
| 人工介入率 | 8-12% | 3-7% | 0.5-2% |
4. Chunk Overlap(重叠分块)
功能描述:
相邻 chunk 之间重叠 50 字符(10%),保证边界内容在两个 chunk 中都出现。
设计目的:
- 准确率提升:边界关键信息不被遗漏,检索召回率提升
- 减少幻觉:LLM 生成时,边界 chunk 提供完整上下文,减少理解偏差
重叠示例:
Chunk 1: "...关键信息在句子末尾"
Chunk 2: "关键信息在句子末尾...后续内容"
数值估计:
| 指标 | 无重叠 | 10%重叠 | 20%重叠 | 最优值 |
|---|---|---|---|---|
| 检索召回率 (Recall@10) | 72-75% | 80-85% | 82-87% | 10%重叠 |
| 边界信息召回率 | 55-60% | 88-92% | 90-94% | 10%重叠 |
| 存储成本增加 | 0% | +10% | +20% | 10%重叠 |
| LLM 幻觉率 | 12-15% | 8-10% | 7-9% | 10%重叠 |
权衡分析:
- 10%重叠是性价比最优点:召回率提升 8-10%,存储成本仅增加 10%
- 20%重叠边际收益递减,存储成本增加 20%,召回率仅提升 2%
5. Milvus HNSW 索引
功能描述:
使用 HNSW(Hierarchical Navigable Small World)索引,参数 M=16,efConstruction=200。
设计目的:
- 性能提升:HNSW 检索速度快(O(log n)),相比暴力搜索提升 100 倍+
- 准确率保持:Recall@10 约 95-98%,接近暴力搜索
索引参数:
- M=16:每个节点的邻居数,平衡速度和准确率
- efConstruction=200:构建时搜索深度,影响索引质量
数值估计:
| 指标 | 暴力搜索 (FLAT) | HNSW (M=16, ef=200) | IVF_FLAT | 最优选择 |
|---|---|---|---|---|
| 检索延迟 (100万向量, top 10) | 800-1200 ms | 15-25 ms | 50-80 ms | HNSW |
| Recall@10 | 100% | 95-98% | 90-95% | HNSW |
| 索引构建时间 (100万向量) | 0 ms | 5-10 分钟 | 2-5 分钟 | IVF_FLAT |
| 内存占用 (100万向量, 1024维) | 4 GB | 5-6 GB | 4.5 GB | FLAT |
| QPS (单机) | 1-2 | 200-300 | 50-80 | HNSW |
权衡分析:
- HNSW 在检索速度和准确率之间达到最优平衡
- Recall@10 下降 2-5%,但延迟降低 98%,QPS 提升 100 倍+
6. 异步图谱构建
功能描述:
使用 asyncio.create_task() 异步构建知识图谱,不阻塞向量存储主流程。
设计目的:
- 性能提升:主流程延迟减少 30-40%
- 用户体验:用户更快看到文档索引完成,图谱后台构建
关键代码:
# app/core/document_processor.py
asyncio.create_task(
self._build_graph(text, document_id, tenant_id)
)
# 不等待图谱构建完成,立即返回
数值估计:
| 指标 | 同步图谱构建 | 异步图谱构建 | 提升比例 |
|---|---|---|---|
| 主流程延迟 (10 页 PDF) | 18-25 秒 | 10-15 秒 | 减少 40-50% |
| 用户感知延迟 | 18-25 秒 | 10-15 秒 | 减少 40-50% |
| 系统吞吐量 | 10-15 文档/分钟 | 18-25 文档/分钟 | 提升 60-80% |
| 图谱构建成功率 | 98-99% | 95-97% | 降低 2-3% |
权衡分析:
- 异步构建牺牲 2-3%图谱成功率(失败不影响向量存储),换取主流程延迟减少 40-50%
- 图谱构建失败时,仅记录错误日志,不影响用户使用
7. Milvus 按 tenant_id 分区
功能描述:
Milvus collection 按 tenant_id 分区,每个租户数据物理隔离。
设计目的:
- 性能提升:检索时只搜索当前租户分区,速度提升 5-10 倍(假设 10 个租户)
- 成本减少:分区剪枝,减少计算和 IO 开销
- 数据安全:租户数据物理隔离,防止越权访问
分区策略:
# 插入时指定分区
collection.insert(data, partition_name=tenant_id)
# 检索时指定分区
collection.search(query, partition_names=[tenant_id])
数值估计:
| 指标 | 单分区(不隔离) | 多分区(按 tenant_id) | 提升比例 |
|---|---|---|---|
| 检索延迟 (10 租户,每租户 10万向量) | 120-150 ms | 15-25 ms | 减少 85-90% |
| 检索 QPS (单机) | 30-50 | 200-300 | 提升 5-7 倍 |
| 跨租户访问风险 | 中等(需应用层控制) | 低(物理隔离) | 安全性提升 |
| 索引构建时间 | 15-20 分钟 | 5-10 分钟(分区并行) | 减少 50-60% |
8. Kafka 事件驱动架构
功能描述:
Knowledge Service 上传文档后发送 Kafka 事件,Indexing Service 异步消费。
设计目的:
- 性能提升:解耦上游服务,Knowledge Service 不等待索引完成
- 可靠性提升:Kafka 持久化事件,索引失败可重试
- 成本减少:异步处理,削峰填谷,降低服务器配置要求
数值估计:
| 指标 | 同步索引(阻塞) | Kafka 异步索引 | 提升比例 |
|---|---|---|---|
| Knowledge Service 响应延迟 | 12-18 秒 | 200-500 ms | 减少 95%+ |
| 系统吞吐量 | 10-15 文档/分钟 | 50-100 文档/分钟 | 提升 4-7 倍 |
| 失败重试能力 | 无(需人工重试) | 自动重试 3 次 | 可靠性提升 |
| 峰值处理能力 | 20 文档/分钟 | 200 文档/分钟 | 提升 10 倍 |
9. CachedEmbedder(向量化缓存)
功能描述:
对重复文本的向量化结果进行缓存(LRU,容量 10000 条)。
设计目的:
- 性能提升:缓存命中时延迟降低 50 倍+(50ms → <1ms)
- 成本减少:减少 GPU 推理次数,降低计算成本
缓存策略:
# 缓存键:文本哈希值
text_hash = hash(text)
if text_hash in self.cache:
return self.cache[text_hash] # 缓存命中
数值估计:
| 指标 | 无缓存 | 缓存 (10000 条) | 提升比例 |
|---|---|---|---|
| 缓存命中率(重复文档场景) | 0% | 30-50% | - |
| 平均向量化延迟 | 50 ms/条 | 25-30 ms/条 | 减少 40-50% |
| GPU 推理次数(1000 次请求) | 1000 次 | 500-700 次 | 减少 30-50% |
| 计算成本(每万次向量化) | $0.5-1 | $0.25-0.5 | 减少 50% |
权衡分析:
- 缓存占用内存 40MB(10000 条 × 1024 维 × 4 字节),成本可忽略
- 命中率取决于重复文本比例,FAQ 场景命中率可达 60-80%
10. RecursiveCharacterTextSplitter(递归分块)
功能描述:
按分隔符优先级递归尝试分块,保证优先按语义边界分块。
设计目的:
- 准确率提升:优先保留段落、句子完整性,提升检索和生成质量
- 减少幻觉:避免句子被截断,LLM 理解更准确
分隔符优先级:
\n\n(段落)→ 2.\n(行)→ 3.。(中文句号)→ 4..(英文句号)→ 5. 空格 → 6. 字符级
数值估计:
| 指标 | 固定长度分块 | 递归语义分块 | 提升比例 |
|---|---|---|---|
| 句子完整率 | 65-70% | 92-96% | 提升 25-30% |
| 段落完整率 | 50-60% | 85-90% | 提升 35-40% |
| 检索准确率 (Recall@10) | 70-75% | 80-85% | 提升 10-15% |
| LLM 生成幻觉率 | 12-15% | 7-10% | 减少 30-40% |
0. 增量索引(Incremental Indexing)
功能描述:
通过内容哈希对比,智能检测文档变更,仅对变更文档执行重新索引。
设计目的:
- 成本减少:跳过未变更文档,避免不必要的向量化计算
- 性能提升:无变更文档响应时间从 10 秒降至 50ms
- 资源优化:释放 GPU 资源处理真正需要索引的文档
关键代码:
# app/services/version_manager.py
async def detect_changes(self, document_id, new_content_hash, new_metadata_hash):
current_version = await self.get_current_version(document_id)
if not current_version:
return VersionChange(change_type="created", needs_reindex=True)
content_changed = new_content_hash != current_version.content_hash
if not content_changed:
return VersionChange(change_type="unchanged", needs_reindex=False)
return VersionChange(change_type="updated", needs_reindex=True)
数值估计:
| 指标 | 无增量索引 | 有增量索引 | 提升比例 |
|---|---|---|---|
| 未变更文档延迟 | 10-15 秒 | 50-100 ms | 减少 99% |
| 跳过率(重复上传场景) | 0% | 70-80% | - |
| 向量化成本(70%未变更) | $2.00/千文档 | $0.60/千文档 | 减少 70% |
| GPU 占用时长 | 1.39 小时/千文档 | 0.42 小时/千文档 | 减少 70% |
| 平均响应延迟 | 12 秒 | 4.2 秒 | 减少 65% |
权衡分析:
- 需要 Redis 存储版本信息(内存占用约 1KB/文档)
- SHA256 哈希计算开销(10MB 文档约 50-100ms,可忽略)
- 版本信息 TTL 365 天,长期存储成本低
关键功能点汇总表
| 功能点 | 主要目的 | 核心指标 | 提升/减少 | 权衡 |
|---|---|---|---|---|
| 增量索引 | 成本减少、性能提升 | 平均延迟 | 减少 65% | Redis 内存 +1KB/文档 |
| 批量向量化 | 性能提升、成本减少 | 吞吐量 | 提升 10 倍+ | 无明显权衡 |
| 语义分块 | 准确率提升、减少幻觉 | Recall@10 | 提升 13-15% | 无明显权衡 |
| PDFParser 降级 | 准确率提升、成本减少 | 解析成功率 | 提升到 98-99.5% | 延迟略增(+10-20%) |
| Chunk Overlap | 准确率提升、减少幻觉 | 边界召回率 | 提升 30-35% | 存储成本 +10% |
| Milvus HNSW 索引 | 性能提升 | 检索延迟 | 减少 98% | Recall 下降 2-5% |
| 异步图谱构建 | 性能提升、用户体验 | 主流程延迟 | 减少 40-50% | 图谱成功率降低 2-3% |
| tenant_id 分区 | 性能提升、数据安全 | 检索延迟 | 减少 85-90% | 分区管理复杂度 +20% |
| Kafka 异步 | 性能提升、可靠性 | 响应延迟 | 减少 95%+ | 最终一致性(非实时) |
| 向量化缓存 | 性能提升、成本减少 | 平均延迟 | 减少 40-50% | 内存占用 +40MB |
| 递归分块 | 准确率提升、减少幻觉 | 句子完整率 | 提升 25-30% | 无明显权衡 |
性能优化优先级建议
基于成本-收益分析,优化优先级从高到低:
- 增量索引(P0):成本减少 70%,平均延迟减少 65%,ROI 最高,必须实施
- 批量向量化(P0):无权衡,性能提升 10 倍+,必须实施
- Kafka 异步(P0):解耦上游,响应延迟减少 95%+,必须实施
- Milvus HNSW 索引(P0):检索延迟减少 98%,Recall 下降 2-5%可接受
- 语义分块 + Chunk Overlap(P0):准确率提升 13-15%,减少幻觉 30-40%
- PDFParser 降级(P1):解析成功率提升到 98%+,降低人工介入
- 异步图谱构建(P1):主流程延迟减少 40%,图谱成功率略降可接受
- tenant_id 分区(P1):多租户场景必须,检索延迟减少 85%
- 向量化缓存(P2):重复文档场景有效,通用场景收益有限
- 递归分块(P2):已在语义分块中实施,无额外成本
- 其他优化(P3):如自适应分块、智能解析等,边际收益递减
优先级说明:
- P0(必须实施):成本减少 >50% 或性能提升 >5 倍,无重大权衡
- P1(高优先级):成本减少 30-50% 或性能提升 2-5 倍,权衡可接受
- P2(中优先级):特定场景有效,边际收益明显
- P3(低优先级):边际收益递减,酌情实施
配置说明
环境变量
# 服务配置
HOST=0.0.0.0
PORT=8002
WORKERS=4
# Kafka配置
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_CONSUMER_GROUP=indexing-service
KAFKA_TOPIC=document.events
KAFKA_AUTO_OFFSET_RESET=earliest
# MinIO配置
MINIO_ENDPOINT=localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_BUCKET=voiceassistant
MINIO_USE_SSL=false
# Milvus配置
MILVUS_HOST=localhost
MILVUS_PORT=19530
MILVUS_COLLECTION=documents
MILVUS_INDEX_TYPE=HNSW
MILVUS_METRIC_TYPE=COSINE
# Neo4j配置
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=password
# PostgreSQL配置
DB_HOST=localhost
DB_PORT=5432
DB_NAME=voiceassistant
DB_USER=postgres
DB_PASSWORD=password
# 分块配置
CHUNK_SIZE=500
CHUNK_OVERLAP=50
CHUNK_SEPARATORS=["\n\n", "\n", "。", ".", " "]
# Embedding配置
EMBEDDING_MODEL=BAAI/bge-m3
EMBEDDING_DIM=1024
EMBEDDING_BATCH_SIZE=32
EMBEDDING_MAX_LENGTH=512
Nacos 配置
# indexing-service.yaml
service:
name: indexing-service
version: 1.0.0
server:
host: 0.0.0.0
port: 8002
workers: 4
kafka:
bootstrap_servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
consumer_group: indexing-service
topic: document.events
auto_offset_reset: earliest
max_poll_records: 10
session_timeout_ms: 30000
minio:
endpoint: ${MINIO_ENDPOINT:localhost:9000}
access_key: ${MINIO_ACCESS_KEY}
secret_key: ${MINIO_SECRET_KEY}
bucket: voiceassistant
use_ssl: false
download_timeout: 30
milvus:
host: ${MILVUS_HOST:localhost}
port: 19530
collection: documents
index_type: HNSW
index_params:
M: 16
efConstruction: 200
metric_type: COSINE
insert_batch_size: 1000
neo4j:
uri: bolt://localhost:7687
user: neo4j
password: ${NEO4J_PASSWORD}
max_connection_lifetime: 3600
max_connection_pool_size: 50
chunking:
strategy: recursive
chunk_size: 500
chunk_overlap: 50
separators:
- "\n\n"
- "\n"
- '。'
- '!'
- '?'
- '.'
- '!'
- '?'
- ' '
- ''
adaptive_strategies:
code:
chunk_size: 800
chunk_overlap: 100
separators: ["\n\nclass ", "\n\ndef ", "\n\n", "\n"]
markdown:
chunk_size: 600
chunk_overlap: 60
separators: ["\n## ", "\n### ", "\n\n", "\n"]
technical:
chunk_size: 700
chunk_overlap: 70
embedding:
model: BAAI/bge-m3
dimension: 1024
batch_size: 32
max_length: 512
device: cuda # cuda/cpu
normalize: true
graph_building:
enabled: true
async: true
timeout: 300
entity_extraction:
model: ner-model
confidence_threshold: 0.7
relationship_extraction:
model: relation-model
confidence_threshold: 0.6