GraphRAG-00-总览
0. 摘要
0.1 项目目标
GraphRAG 是由 Microsoft 开发的基于图的检索增强生成(Retrieval-Augmented Generation)系统。项目核心目标包括:
- 从非结构化文本中抽取结构化知识图谱(实体、关系、社区)
- 基于图结构提供多种查询模式(全局搜索、局部搜索、DRIFT 搜索、基础搜索)
- 支持大规模文档集的索引与增量更新
- 提供灵活的配置系统与多种 LLM 提供商集成
0.2 核心能力边界
核心能力:
- 索引构建:文本分块、实体抽取、关系抽取、社区发现、社区报告生成、文本嵌入
- 查询引擎:全局搜索(Global Search)、局部搜索(Local Search)、DRIFT 搜索、基础搜索(Basic Search)
- 增量更新:支持基于现有索引的增量更新,避免全量重建
- 多模型支持:支持 OpenAI、Azure OpenAI、LiteLLM 等多种 LLM 提供商
- 向量存储:支持 Azure AI Search、LanceDB、CosmosDB 等向量数据库
- 提示词调优:自动根据输入数据生成定制化提示词
非目标:
- 不提供实时流式数据处理
- 不提供分布式并行计算(索引构建在单机异步执行)
- 不提供图数据库持久化(输出为 Parquet 文件)
- 不提供 Web UI 界面(仅提供 CLI 和 Python API)
0.3 运行环境
-
语言:Python 3.10-3.12
-
运行时:asyncio 异步执行框架
-
核心依赖:
- LLM 调用:
fnllm、openai、litellm - 图计算:
networkx、graspologic - 数据处理:
pandas、pyarrow - 向量计算:
numpy、umap-learn - 自然语言处理:
nltk、spacy、tiktoken - 向量存储:
azure-search-documents、lancedb、azure-cosmos
- LLM 调用:
-
部署形态:
- 单体应用:通过 CLI 或 Python API 调用
- 索引输出:Parquet 文件存储在本地文件系统、Azure Blob Storage 或 CosmosDB
- 查询服务:可集成到用户应用中作为库使用
1. 整体架构图
flowchart TB
subgraph CLI["命令行入口层"]
Init[graphrag init]
Index[graphrag index]
Update[graphrag update]
Query[graphrag query]
PromptTune[graphrag prompt-tune]
end
subgraph API["API 层"]
IndexAPI[build_index]
QueryAPI[global_search<br/>local_search<br/>drift_search<br/>basic_search]
PromptAPI[generate_indexing_prompts]
end
subgraph Core["核心业务层"]
direction TB
subgraph IndexEngine["索引引擎"]
Pipeline[Pipeline Factory]
Workflows[Workflows]
Operations[Operations]
end
subgraph QueryEngine["查询引擎"]
GlobalSearch[Global Search]
LocalSearch[Local Search]
DriftSearch[DRIFT Search]
BasicSearch[Basic Search]
end
subgraph ConfigSystem["配置系统"]
ConfigLoader[Config Loader]
Validation[Validation]
end
end
subgraph DataLayer["数据层"]
DataModel[Data Model<br/>Entity/Relationship/Community]
Storage[Storage<br/>File/Blob/CosmosDB]
Cache[Cache<br/>File/JSON/Memory]
VectorStore[Vector Store<br/>Azure AI/LanceDB/CosmosDB]
end
subgraph AI["AI 服务层"]
LLM[Language Model<br/>OpenAI/Azure/LiteLLM]
Embedding[Embedding Model]
Tokenizer[Tokenizer]
PromptSystem[Prompt System]
end
subgraph Support["支撑层"]
Callbacks[Callbacks]
Logger[Logger]
Factory[Factory]
end
CLI --> API
API --> Core
Core --> DataLayer
Core --> AI
Core --> Support
IndexEngine --> Workflows
Workflows --> Operations
QueryEngine -.查询.-> DataModel
IndexEngine -.写入.-> DataModel
Operations --> LLM
Operations --> Embedding
QueryEngine --> LLM
QueryEngine --> VectorStore
ConfigSystem --> ConfigLoader
1.1 架构图说明
1.1.1 组件职责
命令行入口层(CLI):
graphrag init:初始化项目,生成默认配置文件和目录结构graphrag index:构建知识图谱索引graphrag update:增量更新现有索引graphrag query:执行查询操作graphrag prompt-tune:根据数据自动生成定制化提示词
API 层:
- 提供编程接口供外部应用集成
build_index:异步索引构建接口,返回PipelineRunResult列表- 查询接口:支持流式和非流式两种模式
generate_indexing_prompts:提示词调优接口
核心业务层:
-
索引引擎:
Pipeline Factory:根据配置生成工作流管道(Standard/Fast/Update 模式)Workflows:工作流定义(如extract_graph、create_communities、generate_text_embeddings)Operations:原子操作(如 LLM 抽取、图算法、嵌入计算)
-
查询引擎:
Global Search:Map-Reduce 模式,适合高层次宏观问题Local Search:基于实体和关系的局部上下文检索DRIFT Search:结合局部和全局策略的混合搜索Basic Search:基于向量相似度的简单检索
-
配置系统:
- 支持 YAML 格式配置文件
- 环境变量注入(通过
.env文件) - CLI 参数覆盖
- Pydantic 模型校验
数据层:
- Data Model:定义核心数据结构(Entity、Relationship、Community、TextUnit、CommunityReport、Covariate、Document)
- Storage:抽象存储接口,支持本地文件系统、Azure Blob Storage、CosmosDB
- Cache:LLM 调用缓存,减少重复请求(支持文件缓存、JSON 缓存、内存缓存)
- Vector Store:向量嵌入存储与检索
AI 服务层:
- Language Model:统一的 LLM 调用接口,支持多种提供商
- Embedding Model:文本嵌入模型
- Tokenizer:Token 计数与文本切分
- Prompt System:提示词模板管理
支撑层:
- Callbacks:工作流生命周期回调(进度、错误、日志)
- Logger:日志记录(标准输出、文件、Blob)
- Factory:工厂模式封装对象创建
1.1.2 数据流与控制流
索引构建数据流:
原始文档 → 文本分块 → 实体抽取 → 关系抽取 → 图合并 → 社区发现 → 社区报告生成 → 嵌入计算 → Parquet 文件
查询执行数据流:
用户查询 → 查询嵌入 → 向量检索 → 上下文构建 → LLM 生成 → 响应返回
控制流特征:
- 索引构建:异步管道执行,支持并发 LLM 调用
- 查询执行:异步流式生成(可选)
- 错误处理:每个工作流独立错误捕获,不中断整体流程
- 回调机制:通过 Callbacks 接口通知外部系统
1.1.3 扩展性与状态管理
扩展性:
- 通过自定义 Workflow 扩展索引流程
- 通过自定义 Operation 添加新的数据处理逻辑
- 通过 Factory 模式注册新的存储后端
- 通过 LLM Provider 接口集成新的模型提供商
状态管理:
- 索引状态:存储在 Parquet 文件中,无内存状态
- 查询状态:无状态设计,每次查询独立执行
- 增量更新:读取历史索引,合并新数据,输出到新目录
1.1.4 高可用与容错
高可用:
- 无中心化服务,索引输出可分发到多个查询节点
- 存储层支持 Azure Blob Storage 和 CosmosDB 的高可用特性
容错:
- LLM 调用失败:支持重试策略(指数退避)
- 工作流失败:记录错误但继续执行其他工作流
- 缓存机制:减少 LLM 调用失败的影响
2. 全局时序图(索引构建主流程)
sequenceDiagram
autonumber
participant User as 用户/CLI
participant API as API (build_index)
participant Factory as Pipeline Factory
participant Pipeline as Pipeline Runner
participant WF_Load as Workflow: Load Documents
participant WF_Extract as Workflow: Extract Graph
participant WF_Community as Workflow: Create Communities
participant WF_Report as Workflow: Community Reports
participant WF_Embed as Workflow: Generate Embeddings
participant LLM as LLM Service
participant Storage as Storage Layer
participant Cache as Cache Layer
User->>API: build_index(config, method)
API->>Factory: create_pipeline(config, method)
Factory-->>API: Pipeline(workflows)
API->>Pipeline: run_pipeline(pipeline, config, callbacks)
Note over Pipeline: 工作流 1: 加载输入文档
Pipeline->>WF_Load: run_workflow(config, context)
WF_Load->>Storage: 读取原始文档 (CSV/Text/JSON)
Storage-->>WF_Load: DataFrame(documents)
WF_Load->>WF_Load: 文本分块 (text_splitting)
WF_Load->>Storage: 写入 text_units.parquet
WF_Load-->>Pipeline: text_units DataFrame
Note over Pipeline: 工作流 2: 实体和关系抽取
Pipeline->>WF_Extract: run_workflow(config, context)
WF_Extract->>Storage: 读取 text_units.parquet
Storage-->>WF_Extract: text_units DataFrame
loop 每个 text_unit(异步并发)
WF_Extract->>Cache: 检查缓存
alt 缓存命中
Cache-->>WF_Extract: 缓存的抽取结果
else 缓存未命中
WF_Extract->>LLM: 调用实体抽取提示词
LLM-->>WF_Extract: 实体和关系列表 (JSON)
WF_Extract->>Cache: 存储到缓存
end
end
WF_Extract->>WF_Extract: 合并和去重实体/关系
WF_Extract->>Storage: 写入 entities.parquet, relationships.parquet
WF_Extract-->>Pipeline: entities, relationships
Note over Pipeline: 工作流 3: 社区发现
Pipeline->>WF_Community: run_workflow(config, context)
WF_Community->>Storage: 读取 entities.parquet, relationships.parquet
Storage-->>WF_Community: 图数据
WF_Community->>WF_Community: Leiden 算法聚类
WF_Community->>Storage: 写入 communities.parquet
WF_Community-->>Pipeline: communities
Note over Pipeline: 工作流 4: 社区报告生成
Pipeline->>WF_Report: run_workflow(config, context)
WF_Report->>Storage: 读取 communities.parquet
Storage-->>WF_Report: communities DataFrame
loop 每个社区(异步并发)
WF_Report->>Cache: 检查缓存
alt 缓存命中
Cache-->>WF_Report: 缓存的报告
else 缓存未命中
WF_Report->>LLM: 调用社区报告生成提示词
LLM-->>WF_Report: 社区报告 (JSON)
WF_Report->>Cache: 存储到缓存
end
end
WF_Report->>Storage: 写入 community_reports.parquet
WF_Report-->>Pipeline: community_reports
Note over Pipeline: 工作流 5: 文本嵌入
Pipeline->>WF_Embed: run_workflow(config, context)
WF_Embed->>Storage: 读取待嵌入数据
Storage-->>WF_Embed: 文本列表
loop 每个文本批次
WF_Embed->>Cache: 检查缓存
alt 缓存命中
Cache-->>WF_Embed: 缓存的嵌入向量
else 缓存未命中
WF_Embed->>LLM: 调用嵌入 API
LLM-->>WF_Embed: 嵌入向量数组
WF_Embed->>Cache: 存储到缓存
end
end
WF_Embed->>Storage: 写入嵌入列到 Parquet
WF_Embed-->>Pipeline: 嵌入完成
Pipeline-->>API: List[PipelineRunResult]
API-->>User: 索引构建完成
2.1 时序图说明
2.1.1 入口与鉴权
- 入口:用户通过 CLI(
graphrag index)或直接调用graphrag.api.build_index进入 - 鉴权:无内置鉴权机制,由用户在配置中提供 API Key(通过环境变量或配置文件)
- 配置加载:读取 YAML 配置文件,解析环境变量,构建
GraphRagConfig对象
2.1.2 幂等性
- LLM 调用幂等:通过缓存机制实现,相同输入返回缓存结果
- 工作流幂等:每次运行完全重新计算,无状态累积
- 增量更新模式:读取历史索引,仅处理新文档,最后合并输出
2.1.3 回退策略
-
LLM 调用失败:
- 重试机制:指数退避,最多重试 10 次
- 降级策略:若 LLM 提取失败,记录错误但继续处理其他文本单元
-
工作流失败:
- 独立错误捕获:每个工作流的错误不影响其他工作流
- 错误聚合:最终返回所有工作流的错误列表
-
存储失败:
- Blob Storage 重试:Azure SDK 内置重试
- 本地文件系统:不重试,直接报错
2.1.4 重试点
- LLM API 调用:在
language_model层实现重试 - 向量存储写入:在
vector_store层实现重试 - Blob Storage 读写:在
storage层依赖 Azure SDK 重试
2.1.5 超时设定
- LLM 调用超时:默认 180 秒(可配置
request_timeout) - 工作流超时:无整体超时限制(依赖 LLM 调用超时累积)
- 向量存储操作超时:依赖各存储后端的默认设置
2.1.6 资源上界
- 并发 LLM 请求:通过
concurrent_requests配置(默认 25) - 内存使用:由 Pandas DataFrame 大小决定,取决于文档数量和文本长度
- 文件 I/O:批量写入 Parquet 文件,单次写入大小无限制
3. 全局时序图(查询执行流程)
sequenceDiagram
autonumber
participant User as 用户/CLI
participant API as Query API
participant Factory as Search Factory
participant Search as Search Engine
participant Context as Context Builder
participant Vector as Vector Store
participant LLM as LLM Service
participant Storage as Storage Layer
User->>API: local_search(config, query, ...)
API->>Storage: 读取索引文件<br/>(entities/relationships/text_units/...)
Storage-->>API: DataFrames
API->>API: 数据模型转换<br/>(DataFrame → Entity/Relationship objects)
API->>Factory: get_local_search_engine(config, data...)
Factory->>Factory: 创建 LLM 实例
Factory->>Factory: 创建 Embedding 实例
Factory->>Factory: 创建 Tokenizer 实例
Factory->>Factory: 创建 Context Builder 实例
Factory-->>API: LocalSearch 实例
API->>Search: stream_search(query)
Note over Search,Context: 步骤 1: 构建上下文
Search->>Context: build_context(query, ...)
Context->>Context: 生成查询嵌入
Context->>Vector: 向量相似度搜索 (top-k entities)
Vector-->>Context: 匹配的实体列表
Context->>Context: 扩展相关关系
Context->>Context: 获取相关文本单元
Context->>Context: 获取相关社区报告
Context->>Context: Token 预算分配<br/>(text_unit_prop, community_prop)
Context->>Context: 截断上下文以适应 token 限制
Context-->>Search: ContextResult(context_text, entities, ...)
Note over Search,LLM: 步骤 2: LLM 生成答案
Search->>Search: 构建系统提示词 + 上下文 + 查询
alt 流式模式
loop 流式生成 token
Search->>LLM: 调用 LLM (streaming=true)
LLM-->>Search: token chunk
Search-->>API: yield token chunk
API-->>User: 流式输出 token
end
else 非流式模式
Search->>LLM: 调用 LLM (streaming=false)
LLM-->>Search: 完整响应文本
Search-->>API: 响应文本 + 上下文数据
API-->>User: 响应结果
end
3.1 查询流程说明
3.1.1 查询类型
Local Search(局部搜索):
- 适用场景:特定实体或概念的详细信息查询
- 上下文来源:相关实体、关系、文本单元、社区报告
- 上下文构建:基于实体嵌入的向量相似度检索
Global Search(全局搜索):
- 适用场景:高层次、宏观的主题性问题
- 上下文来源:社区报告(按层级选择)
- 执行模式:Map-Reduce(并行 Map 每个社区报告,Reduce 合并答案)
DRIFT Search(混合搜索):
- 适用场景:需要结合局部细节和全局视角的问题
- 执行模式:先局部搜索,根据结果动态选择社区,再执行全局 Reduce
Basic Search(基础搜索):
- 适用场景:简单的文本相似度检索
- 上下文来源:仅文本单元(text_units)
- 无图结构信息
3.1.2 上下文构建策略
Token 预算分配:
max_context_tokens:总上下文 token 限制(默认 5000)text_unit_prop:分配给文本单元的比例(默认 0.5)community_prop:分配给社区报告的比例(默认 0.1)
实体选择:
top_k_mapped_entities:向量检索返回的实体数量(默认 10)top_k_relationships:扩展的关系数量(默认 10)
排序与优先级:
- 实体按向量相似度排序
- 关系按权重排序
- 文本单元按关联度排序
- 社区报告按排名排序
4. 模块边界与交互矩阵
4.1 模块清单
| 序号 | 模块名称 | 目录路径 | 职责 | 对外 API |
|---|---|---|---|---|
| 01 | API | graphrag/api/ |
对外编程接口 | build_index, global_search, local_search, drift_search, basic_search, generate_indexing_prompts |
| 02 | CLI | graphrag/cli/ |
命令行接口 | 命令:init, index, update, query, prompt-tune |
| 03 | Config | graphrag/config/ |
配置系统 | load_config, create_graphrag_config |
| 04 | Data Model | graphrag/data_model/ |
数据模型定义 | Entity, Relationship, Community, CommunityReport, TextUnit, Document, Covariate |
| 05 | Index | graphrag/index/ |
索引构建引擎 | run_pipeline, PipelineFactory |
| 06 | Query | graphrag/query/ |
查询引擎 | get_local_search_engine, get_global_search_engine, get_drift_search_engine, get_basic_search_engine |
| 07 | Language Model | graphrag/language_model/ |
LLM 调用封装 | ModelManager, ChatModel, EmbeddingModel |
| 08 | Storage | graphrag/storage/ |
存储抽象层 | PipelineStorage, FilePipelineStorage, BlobPipelineStorage, CosmosDBPipelineStorage |
| 09 | Cache | graphrag/cache/ |
LLM 缓存 | PipelineCache, JsonPipelineCache, MemoryPipelineCache |
| 10 | Vector Stores | graphrag/vector_stores/ |
向量存储 | BaseVectorStore, AzureAISearch, LanceDB, CosmosDB |
| 11 | Callbacks | graphrag/callbacks/ |
回调机制 | WorkflowCallbacks, QueryCallbacks |
| 12 | Logger | graphrag/logger/ |
日志系统 | init_loggers, BlobWorkflowLogger |
| 13 | Prompt Tune | graphrag/prompt_tune/ |
提示词调优 | 生成定制化提示词 |
| 14 | Prompts | graphrag/prompts/ |
提示词模板 | 默认提示词模板 |
| 15 | Tokenizer | graphrag/tokenizer/ |
分词器 | get_tokenizer, TiktokenTokenizer |
| 16 | Factory | graphrag/factory/ |
工厂模式 | 统一对象创建接口 |
4.2 模块交互矩阵
| 调用方 → 被调方 | Config | Data Model | Storage | Cache | Vector Stores | LLM | Index | Query | Callbacks | Logger |
|---|---|---|---|---|---|---|---|---|---|---|
| API | 读取 | 转换 | 读取 | - | - | - | 调用 | 调用 | 传递 | 初始化 |
| CLI | 加载 | - | - | - | - | - | 通过 API | 通过 API | 创建 | 初始化 |
| Index | 读取 | 写入 | 读写 | 读写 | 写入 | 调用 | - | - | 触发 | 记录 |
| Query | 读取 | 读取 | 读取 | - | 查询 | 调用 | - | - | 触发 | - |
| LLM | 读取 | - | - | 读写 | - | - | - | - | 触发 | - |
| Storage | 读取路径 | - | - | - | - | - | - | - | - | - |
| Cache | 读取 | - | 可选存储 | - | - | - | - | - | - | - |
交互说明:
- 同步调用:API → Index, API → Query, Index → LLM, Query → LLM
- 异步消息:无(项目内无消息队列)
- 共享存储:Index 和 Query 通过 Parquet 文件共享数据
- 订阅/发布:Callbacks 机制(观察者模式)
4.3 数据一致性与错误语义
数据一致性:
- 最终一致性:索引构建完成后,Parquet 文件与 Vector Store 达到一致
- 无跨模块事务:每个工作流独立提交
错误语义:
- LLM 调用失败:返回
None或抛出LLMException,由调用方决定重试或跳过 - 存储写入失败:直接抛出异常,中断工作流
- 查询失败:返回错误信息,不影响其他查询
5. 关键设计与权衡
5.1 数据一致性
设计选择:
- 采用最终一致性模型,索引构建和查询分离
- 索引输出为不可变 Parquet 文件,避免并发写入冲突
权衡:
- ✅ 优点:简化系统设计,无需分布式锁
- ❌ 缺点:无法实时更新,需要增量索引模式
5.2 事务边界
设计选择:
- 无跨工作流事务,每个工作流独立提交数据
- Vector Store 和 Parquet 文件分别写入,不保证原子性
权衡:
- ✅ 优点:工作流失败不影响其他工作流,便于部分重试
- ❌ 缺点:Vector Store 和文件数据可能短暂不一致
5.3 并发策略
设计选择:
- LLM 调用通过
asyncio.Semaphore控制并发度(默认 25) - 工作流串行执行,工作流内部操作并发执行
权衡:
- ✅ 优点:避免 LLM API 速率限制,充分利用并发能力
- ❌ 缺点:内存占用随并发度线性增长
5.4 缓存策略
设计选择:
- LLM 调用结果缓存到文件或内存
- 缓存键基于 LLM 参数和输入内容的哈希
权衡:
- ✅ 优点:大幅减少重复 LLM 调用,降低成本
- ❌ 缺点:缓存失效策略简单,无法自动检测模型更新
5.5 图算法选择
设计选择:
- 社区发现采用 Leiden 算法(通过
graspologic库) - 图布局采用 UMAP 降维(可选)
权衡:
- ✅ 优点:Leiden 算法在大规模图上性能优秀,支持层级社区
- ❌ 缺点:参数调优复杂,结果可能不稳定
6. 性能关键路径与可观测性
6.1 性能关键路径
索引构建:
-
实体抽取(最耗时):
- 瓶颈:LLM API 调用延迟
- 优化:并发调用、缓存、批处理
-
社区发现:
- 瓶颈:图算法计算复杂度 O(E log V)
- 优化:限制图规模、调整分辨率参数
-
嵌入计算:
- 瓶颈:嵌入 API 调用延迟
- 优化:批量调用(batch size)、缓存
查询执行:
-
向量检索:
- 瓶颈:向量相似度计算
- 优化:使用专用向量数据库(LanceDB、Azure AI Search)
-
上下文构建:
- 瓶颈:数据关联与 token 计数
- 优化:预计算 token 数、缓存 tokenizer
-
LLM 生成:
- 瓶颈:LLM 推理延迟
- 优化:流式生成、模型选择(gpt-4o-mini)
6.2 性能指标与阈值建议
| 指标 | 定义 | 建议阈值 | 监控方法 |
|---|---|---|---|
| 索引构建时长 | 从开始到完成的总时间 | < 1小时 (1000 documents) | 工作流 Callbacks 统计 |
| LLM 调用 P95 延迟 | LLM API 95 分位延迟 | < 5秒 | LLM Provider 日志 |
| 缓存命中率 | 缓存命中次数 / 总请求 | > 60% | Cache Layer 统计 |
| 查询响应时间 P95 | 查询完成 95 分位时间 | < 10秒 (Local), < 30秒 (Global) | Query Callbacks |
| 向量检索延迟 | 向量 top-k 搜索时间 | < 100ms | Vector Store 内部统计 |
| 内存峰值 | 索引构建内存占用 | < 16GB (10K documents) | 系统监控 |
6.3 可观测性指标
日志:
- 工作流启动/完成/失败事件
- LLM 调用请求/响应/错误
- 缓存命中/未命中
- 存储读写操作
Callbacks 事件:
on_workflow_start(workflow_name)on_workflow_end(workflow_name, result)on_error(workflow_name, error)on_llm_call(prompt, response, tokens)
输出统计(PipelineRunResult):
total_runtime:总运行时间num_documents:处理的文档数num_text_units:生成的文本单元数num_entities:抽取的实体数num_relationships:抽取的关系数num_communities:发现的社区数errors:错误列表
7. 配置项与可变参数
7.1 核心配置项
| 配置项 | 默认值 | 影响 | 建议 |
|---|---|---|---|
| chunks.size | 300 | 文本单元大小 | 300-600 (取决于文档类型) |
| chunks.overlap | 100 | 分块重叠 | 100-150 |
| extract_graph.concurrent_requests | 25 | LLM 并发度 | 根据 API 速率限制调整 |
| extract_graph.entity_types | 多种类型 | 抽取的实体类型 | 根据领域定制 |
| cluster_graph.max_cluster_size | 10 | 社区最大规模 | 10-50 |
| local_search.max_context_tokens | 5000 | 上下文 token 限制 | 根据模型上下文窗口调整 |
| local_search.top_k_entities | 10 | 检索实体数 | 10-20 |
| cache.type | file | 缓存类型 | 生产环境使用 file |
| storage.type | file | 存储类型 | 云端使用 blob |
| vector_store.{name}.container_name | - | 向量存储容器 | 根据后端配置 |
7.2 模型选择
| 场景 | 推荐模型 | Token 限制 | 成本 |
|---|---|---|---|
| 实体抽取 | gpt-4o-mini, gpt-4-turbo | 输入: 2K, 输出: 1K | 中等 |
| 社区报告 | gpt-4o-mini | 输入: 8K, 输出: 2K | 中等 |
| 查询生成 | gpt-4o-mini, gpt-4-turbo | 输入: 10K, 输出: 2K | 高 |
| 嵌入 | text-embedding-3-small | 输入: 8K | 低 |
8. 模块依赖关系图
graph TB
API --> Index
API --> Query
API --> Config
CLI --> API
CLI --> Config
Index --> Workflows
Index --> Storage
Index --> Cache
Index --> LLM
Index --> DataModel
Index --> Callbacks
Query --> SearchEngines
Query --> ContextBuilder
Query --> VectorStores
Query --> LLM
Query --> DataModel
Workflows --> Operations
Operations --> LLM
Operations --> Cache
SearchEngines --> ContextBuilder
ContextBuilder --> VectorStores
LLM --> Tokenizer
LLM --> Cache
Config --> EnvReader
Config --> Validation
Storage --> ConfigPaths
Cache --> Storage
VectorStores --> Storage
Logger -.日志.-> 所有模块
Factory -.创建.-> Storage
Factory -.创建.-> Cache
Factory -.创建.-> VectorStores
9. 典型使用场景
9.1 场景 1:初始化项目并构建索引
import asyncio
from graphrag.config import load_config
from graphrag.api import build_index
# 1. 初始化项目(CLI)
# 命令: graphrag init --root ./my_project
# 2. 准备输入数据
# 将文档放入 ./my_project/input/
# 3. 加载配置
config = load_config(
root_dir="./my_project",
config_filepath="./my_project/settings.yaml"
)
# 4. 构建索引
async def main():
results = await build_index(
config=config,
method="standard", # 或 "fast"
verbose=True
)
for result in results:
print(f"Workflow: {result.workflow}")
print(f"Runtime: {result.total_runtime}s")
if result.errors:
print(f"Errors: {result.errors}")
asyncio.run(main())
9.2 场景 2:执行局部搜索查询
import asyncio
import pandas as pd
from graphrag.config import load_config
from graphrag.api import local_search
# 1. 加载配置
config = load_config(root_dir="./my_project")
# 2. 加载索引数据
entities = pd.read_parquet("./my_project/output/entities.parquet")
relationships = pd.read_parquet("./my_project/output/relationships.parquet")
text_units = pd.read_parquet("./my_project/output/text_units.parquet")
communities = pd.read_parquet("./my_project/output/communities.parquet")
community_reports = pd.read_parquet("./my_project/output/community_reports.parquet")
covariates = None # 可选
# 3. 执行查询
async def main():
response, context = await local_search(
config=config,
entities=entities,
relationships=relationships,
text_units=text_units,
communities=communities,
community_reports=community_reports,
covariates=covariates,
community_level=2,
response_type="Multiple Paragraphs",
query="What are the main themes in the documents?"
)
print("Response:", response)
print("Context entities:", len(context.get("entities", [])))
asyncio.run(main())
9.3 场景 3:增量更新索引
import asyncio
from graphrag.config import load_config
from graphrag.api import build_index
# 1. 将新文档添加到 ./my_project/input/
# 2. 加载配置
config = load_config(root_dir="./my_project")
# 3. 执行增量更新
async def main():
results = await build_index(
config=config,
method="standard",
is_update_run=True, # 关键参数
verbose=True
)
print("Update completed")
asyncio.run(main())
10. 系统级关键场景
10.1 场景:冷启动(首次索引构建)
前置条件:
- 项目已初始化(
graphrag init) - 输入文档已就位
- 配置文件已调整
执行步骤:
- 加载配置 → 验证配置项 → 初始化日志
- 创建 Pipeline(Standard 或 Fast 模式)
- 依次执行工作流:
- 加载文档 → 文本分块 → 实体抽取 → 关系抽取 → 社区发现 → 报告生成 → 嵌入计算
- 输出 Parquet 文件到
output/目录 - 向量嵌入写入 Vector Store
约束与风险:
- 时间:与文档数量和 LLM API 延迟正相关
- 成本:大量 LLM 调用(实体抽取、报告生成)
- 风险:LLM API 速率限制导致失败
10.2 场景:峰值压测(高并发查询)
前置条件:
- 索引已构建完成
- Vector Store 已就绪
执行步骤:
- 多个客户端并发发起查询请求
- 每个查询独立执行:
- 向量检索 → 上下文构建 → LLM 生成
- 返回查询结果
约束与风险:
- 瓶颈:LLM API 速率限制
- 优化:使用流式生成降低感知延迟
- 风险:Vector Store 查询延迟增加
10.3 场景:异常恢复(LLM 调用失败)
异常类型:
- LLM API 超时
- LLM API 速率限制(429 错误)
- LLM API 服务不可用(500 错误)
恢复策略:
- 重试:指数退避,最多 10 次
- 缓存:已成功的调用结果被缓存,重启后跳过
- 降级:部分失败不中断整体流程,记录错误后继续
数据一致性:
- 部分失败的工作流不会污染已成功的输出
- 下次运行会重新处理失败的部分