GraphRAG-00-总览

0. 摘要

0.1 项目目标

GraphRAG 是由 Microsoft 开发的基于图的检索增强生成(Retrieval-Augmented Generation)系统。项目核心目标包括:

  • 从非结构化文本中抽取结构化知识图谱(实体、关系、社区)
  • 基于图结构提供多种查询模式(全局搜索、局部搜索、DRIFT 搜索、基础搜索)
  • 支持大规模文档集的索引与增量更新
  • 提供灵活的配置系统与多种 LLM 提供商集成

0.2 核心能力边界

核心能力

  • 索引构建:文本分块、实体抽取、关系抽取、社区发现、社区报告生成、文本嵌入
  • 查询引擎:全局搜索(Global Search)、局部搜索(Local Search)、DRIFT 搜索、基础搜索(Basic Search)
  • 增量更新:支持基于现有索引的增量更新,避免全量重建
  • 多模型支持:支持 OpenAI、Azure OpenAI、LiteLLM 等多种 LLM 提供商
  • 向量存储:支持 Azure AI Search、LanceDB、CosmosDB 等向量数据库
  • 提示词调优:自动根据输入数据生成定制化提示词

非目标

  • 不提供实时流式数据处理
  • 不提供分布式并行计算(索引构建在单机异步执行)
  • 不提供图数据库持久化(输出为 Parquet 文件)
  • 不提供 Web UI 界面(仅提供 CLI 和 Python API)

0.3 运行环境

  • 语言:Python 3.10-3.12

  • 运行时:asyncio 异步执行框架

  • 核心依赖

    • LLM 调用:fnllmopenailitellm
    • 图计算:networkxgraspologic
    • 数据处理:pandaspyarrow
    • 向量计算:numpyumap-learn
    • 自然语言处理:nltkspacytiktoken
    • 向量存储:azure-search-documentslancedbazure-cosmos
  • 部署形态

    • 单体应用:通过 CLI 或 Python API 调用
    • 索引输出:Parquet 文件存储在本地文件系统、Azure Blob Storage 或 CosmosDB
    • 查询服务:可集成到用户应用中作为库使用

1. 整体架构图

flowchart TB
    subgraph CLI["命令行入口层"]
        Init[graphrag init]
        Index[graphrag index]
        Update[graphrag update]
        Query[graphrag query]
        PromptTune[graphrag prompt-tune]
    end

    subgraph API["API 层"]
        IndexAPI[build_index]
        QueryAPI[global_search<br/>local_search<br/>drift_search<br/>basic_search]
        PromptAPI[generate_indexing_prompts]
    end

    subgraph Core["核心业务层"]
        direction TB
        
        subgraph IndexEngine["索引引擎"]
            Pipeline[Pipeline Factory]
            Workflows[Workflows]
            Operations[Operations]
        end
        
        subgraph QueryEngine["查询引擎"]
            GlobalSearch[Global Search]
            LocalSearch[Local Search]
            DriftSearch[DRIFT Search]
            BasicSearch[Basic Search]
        end
        
        subgraph ConfigSystem["配置系统"]
            ConfigLoader[Config Loader]
            Validation[Validation]
        end
    end

    subgraph DataLayer["数据层"]
        DataModel[Data Model<br/>Entity/Relationship/Community]
        Storage[Storage<br/>File/Blob/CosmosDB]
        Cache[Cache<br/>File/JSON/Memory]
        VectorStore[Vector Store<br/>Azure AI/LanceDB/CosmosDB]
    end

    subgraph AI["AI 服务层"]
        LLM[Language Model<br/>OpenAI/Azure/LiteLLM]
        Embedding[Embedding Model]
        Tokenizer[Tokenizer]
        PromptSystem[Prompt System]
    end

    subgraph Support["支撑层"]
        Callbacks[Callbacks]
        Logger[Logger]
        Factory[Factory]
    end

    CLI --> API
    API --> Core
    Core --> DataLayer
    Core --> AI
    Core --> Support
    IndexEngine --> Workflows
    Workflows --> Operations
    QueryEngine -.查询.-> DataModel
    IndexEngine -.写入.-> DataModel
    Operations --> LLM
    Operations --> Embedding
    QueryEngine --> LLM
    QueryEngine --> VectorStore
    ConfigSystem --> ConfigLoader

1.1 架构图说明

1.1.1 组件职责

命令行入口层(CLI)

  • graphrag init:初始化项目,生成默认配置文件和目录结构
  • graphrag index:构建知识图谱索引
  • graphrag update:增量更新现有索引
  • graphrag query:执行查询操作
  • graphrag prompt-tune:根据数据自动生成定制化提示词

API 层

  • 提供编程接口供外部应用集成
  • build_index:异步索引构建接口,返回 PipelineRunResult 列表
  • 查询接口:支持流式和非流式两种模式
  • generate_indexing_prompts:提示词调优接口

核心业务层

  • 索引引擎

    • Pipeline Factory:根据配置生成工作流管道(Standard/Fast/Update 模式)
    • Workflows:工作流定义(如 extract_graphcreate_communitiesgenerate_text_embeddings
    • Operations:原子操作(如 LLM 抽取、图算法、嵌入计算)
  • 查询引擎

    • Global Search:Map-Reduce 模式,适合高层次宏观问题
    • Local Search:基于实体和关系的局部上下文检索
    • DRIFT Search:结合局部和全局策略的混合搜索
    • Basic Search:基于向量相似度的简单检索
  • 配置系统

    • 支持 YAML 格式配置文件
    • 环境变量注入(通过 .env 文件)
    • CLI 参数覆盖
    • Pydantic 模型校验

数据层

  • Data Model:定义核心数据结构(Entity、Relationship、Community、TextUnit、CommunityReport、Covariate、Document)
  • Storage:抽象存储接口,支持本地文件系统、Azure Blob Storage、CosmosDB
  • Cache:LLM 调用缓存,减少重复请求(支持文件缓存、JSON 缓存、内存缓存)
  • Vector Store:向量嵌入存储与检索

AI 服务层

  • Language Model:统一的 LLM 调用接口,支持多种提供商
  • Embedding Model:文本嵌入模型
  • Tokenizer:Token 计数与文本切分
  • Prompt System:提示词模板管理

支撑层

  • Callbacks:工作流生命周期回调(进度、错误、日志)
  • Logger:日志记录(标准输出、文件、Blob)
  • Factory:工厂模式封装对象创建

1.1.2 数据流与控制流

索引构建数据流

原始文档 → 文本分块 → 实体抽取 → 关系抽取 → 图合并 → 社区发现 → 社区报告生成 → 嵌入计算 → Parquet 文件

查询执行数据流

用户查询 → 查询嵌入 → 向量检索 → 上下文构建 → LLM 生成 → 响应返回

控制流特征

  • 索引构建:异步管道执行,支持并发 LLM 调用
  • 查询执行:异步流式生成(可选)
  • 错误处理:每个工作流独立错误捕获,不中断整体流程
  • 回调机制:通过 Callbacks 接口通知外部系统

1.1.3 扩展性与状态管理

扩展性

  • 通过自定义 Workflow 扩展索引流程
  • 通过自定义 Operation 添加新的数据处理逻辑
  • 通过 Factory 模式注册新的存储后端
  • 通过 LLM Provider 接口集成新的模型提供商

状态管理

  • 索引状态:存储在 Parquet 文件中,无内存状态
  • 查询状态:无状态设计,每次查询独立执行
  • 增量更新:读取历史索引,合并新数据,输出到新目录

1.1.4 高可用与容错

高可用

  • 无中心化服务,索引输出可分发到多个查询节点
  • 存储层支持 Azure Blob Storage 和 CosmosDB 的高可用特性

容错

  • LLM 调用失败:支持重试策略(指数退避)
  • 工作流失败:记录错误但继续执行其他工作流
  • 缓存机制:减少 LLM 调用失败的影响

2. 全局时序图(索引构建主流程)

sequenceDiagram
    autonumber
    participant User as 用户/CLI
    participant API as API (build_index)
    participant Factory as Pipeline Factory
    participant Pipeline as Pipeline Runner
    participant WF_Load as Workflow: Load Documents
    participant WF_Extract as Workflow: Extract Graph
    participant WF_Community as Workflow: Create Communities
    participant WF_Report as Workflow: Community Reports
    participant WF_Embed as Workflow: Generate Embeddings
    participant LLM as LLM Service
    participant Storage as Storage Layer
    participant Cache as Cache Layer

    User->>API: build_index(config, method)
    API->>Factory: create_pipeline(config, method)
    Factory-->>API: Pipeline(workflows)
    
    API->>Pipeline: run_pipeline(pipeline, config, callbacks)
    
    Note over Pipeline: 工作流 1: 加载输入文档
    Pipeline->>WF_Load: run_workflow(config, context)
    WF_Load->>Storage: 读取原始文档 (CSV/Text/JSON)
    Storage-->>WF_Load: DataFrame(documents)
    WF_Load->>WF_Load: 文本分块 (text_splitting)
    WF_Load->>Storage: 写入 text_units.parquet
    WF_Load-->>Pipeline: text_units DataFrame
    
    Note over Pipeline: 工作流 2: 实体和关系抽取
    Pipeline->>WF_Extract: run_workflow(config, context)
    WF_Extract->>Storage: 读取 text_units.parquet
    Storage-->>WF_Extract: text_units DataFrame
    
    loop 每个 text_unit(异步并发)
        WF_Extract->>Cache: 检查缓存
        alt 缓存命中
            Cache-->>WF_Extract: 缓存的抽取结果
        else 缓存未命中
            WF_Extract->>LLM: 调用实体抽取提示词
            LLM-->>WF_Extract: 实体和关系列表 (JSON)
            WF_Extract->>Cache: 存储到缓存
        end
    end
    
    WF_Extract->>WF_Extract: 合并和去重实体/关系
    WF_Extract->>Storage: 写入 entities.parquet, relationships.parquet
    WF_Extract-->>Pipeline: entities, relationships
    
    Note over Pipeline: 工作流 3: 社区发现
    Pipeline->>WF_Community: run_workflow(config, context)
    WF_Community->>Storage: 读取 entities.parquet, relationships.parquet
    Storage-->>WF_Community: 图数据
    WF_Community->>WF_Community: Leiden 算法聚类
    WF_Community->>Storage: 写入 communities.parquet
    WF_Community-->>Pipeline: communities
    
    Note over Pipeline: 工作流 4: 社区报告生成
    Pipeline->>WF_Report: run_workflow(config, context)
    WF_Report->>Storage: 读取 communities.parquet
    Storage-->>WF_Report: communities DataFrame
    
    loop 每个社区(异步并发)
        WF_Report->>Cache: 检查缓存
        alt 缓存命中
            Cache-->>WF_Report: 缓存的报告
        else 缓存未命中
            WF_Report->>LLM: 调用社区报告生成提示词
            LLM-->>WF_Report: 社区报告 (JSON)
            WF_Report->>Cache: 存储到缓存
        end
    end
    
    WF_Report->>Storage: 写入 community_reports.parquet
    WF_Report-->>Pipeline: community_reports
    
    Note over Pipeline: 工作流 5: 文本嵌入
    Pipeline->>WF_Embed: run_workflow(config, context)
    WF_Embed->>Storage: 读取待嵌入数据
    Storage-->>WF_Embed: 文本列表
    
    loop 每个文本批次
        WF_Embed->>Cache: 检查缓存
        alt 缓存命中
            Cache-->>WF_Embed: 缓存的嵌入向量
        else 缓存未命中
            WF_Embed->>LLM: 调用嵌入 API
            LLM-->>WF_Embed: 嵌入向量数组
            WF_Embed->>Cache: 存储到缓存
        end
    end
    
    WF_Embed->>Storage: 写入嵌入列到 Parquet
    WF_Embed-->>Pipeline: 嵌入完成
    
    Pipeline-->>API: List[PipelineRunResult]
    API-->>User: 索引构建完成

2.1 时序图说明

2.1.1 入口与鉴权

  • 入口:用户通过 CLI(graphrag index)或直接调用 graphrag.api.build_index 进入
  • 鉴权:无内置鉴权机制,由用户在配置中提供 API Key(通过环境变量或配置文件)
  • 配置加载:读取 YAML 配置文件,解析环境变量,构建 GraphRagConfig 对象

2.1.2 幂等性

  • LLM 调用幂等:通过缓存机制实现,相同输入返回缓存结果
  • 工作流幂等:每次运行完全重新计算,无状态累积
  • 增量更新模式:读取历史索引,仅处理新文档,最后合并输出

2.1.3 回退策略

  • LLM 调用失败

    • 重试机制:指数退避,最多重试 10 次
    • 降级策略:若 LLM 提取失败,记录错误但继续处理其他文本单元
  • 工作流失败

    • 独立错误捕获:每个工作流的错误不影响其他工作流
    • 错误聚合:最终返回所有工作流的错误列表
  • 存储失败

    • Blob Storage 重试:Azure SDK 内置重试
    • 本地文件系统:不重试,直接报错

2.1.4 重试点

  • LLM API 调用:在 language_model 层实现重试
  • 向量存储写入:在 vector_store 层实现重试
  • Blob Storage 读写:在 storage 层依赖 Azure SDK 重试

2.1.5 超时设定

  • LLM 调用超时:默认 180 秒(可配置 request_timeout
  • 工作流超时:无整体超时限制(依赖 LLM 调用超时累积)
  • 向量存储操作超时:依赖各存储后端的默认设置

2.1.6 资源上界

  • 并发 LLM 请求:通过 concurrent_requests 配置(默认 25)
  • 内存使用:由 Pandas DataFrame 大小决定,取决于文档数量和文本长度
  • 文件 I/O:批量写入 Parquet 文件,单次写入大小无限制

3. 全局时序图(查询执行流程)

sequenceDiagram
    autonumber
    participant User as 用户/CLI
    participant API as Query API
    participant Factory as Search Factory
    participant Search as Search Engine
    participant Context as Context Builder
    participant Vector as Vector Store
    participant LLM as LLM Service
    participant Storage as Storage Layer

    User->>API: local_search(config, query, ...)
    API->>Storage: 读取索引文件<br/>(entities/relationships/text_units/...)
    Storage-->>API: DataFrames
    
    API->>API: 数据模型转换<br/>(DataFrame → Entity/Relationship objects)
    
    API->>Factory: get_local_search_engine(config, data...)
    Factory->>Factory: 创建 LLM 实例
    Factory->>Factory: 创建 Embedding 实例
    Factory->>Factory: 创建 Tokenizer 实例
    Factory->>Factory: 创建 Context Builder 实例
    Factory-->>API: LocalSearch 实例
    
    API->>Search: stream_search(query)
    
    Note over Search,Context: 步骤 1: 构建上下文
    Search->>Context: build_context(query, ...)
    Context->>Context: 生成查询嵌入
    Context->>Vector: 向量相似度搜索 (top-k entities)
    Vector-->>Context: 匹配的实体列表
    
    Context->>Context: 扩展相关关系
    Context->>Context: 获取相关文本单元
    Context->>Context: 获取相关社区报告
    
    Context->>Context: Token 预算分配<br/>(text_unit_prop, community_prop)
    Context->>Context: 截断上下文以适应 token 限制
    Context-->>Search: ContextResult(context_text, entities, ...)
    
    Note over Search,LLM: 步骤 2: LLM 生成答案
    Search->>Search: 构建系统提示词 + 上下文 + 查询
    
    alt 流式模式
        loop 流式生成 token
            Search->>LLM: 调用 LLM (streaming=true)
            LLM-->>Search: token chunk
            Search-->>API: yield token chunk
            API-->>User: 流式输出 token
        end
    else 非流式模式
        Search->>LLM: 调用 LLM (streaming=false)
        LLM-->>Search: 完整响应文本
        Search-->>API: 响应文本 + 上下文数据
        API-->>User: 响应结果
    end

3.1 查询流程说明

3.1.1 查询类型

Local Search(局部搜索)

  • 适用场景:特定实体或概念的详细信息查询
  • 上下文来源:相关实体、关系、文本单元、社区报告
  • 上下文构建:基于实体嵌入的向量相似度检索

Global Search(全局搜索)

  • 适用场景:高层次、宏观的主题性问题
  • 上下文来源:社区报告(按层级选择)
  • 执行模式:Map-Reduce(并行 Map 每个社区报告,Reduce 合并答案)

DRIFT Search(混合搜索)

  • 适用场景:需要结合局部细节和全局视角的问题
  • 执行模式:先局部搜索,根据结果动态选择社区,再执行全局 Reduce

Basic Search(基础搜索)

  • 适用场景:简单的文本相似度检索
  • 上下文来源:仅文本单元(text_units)
  • 无图结构信息

3.1.2 上下文构建策略

Token 预算分配

  • max_context_tokens:总上下文 token 限制(默认 5000)
  • text_unit_prop:分配给文本单元的比例(默认 0.5)
  • community_prop:分配给社区报告的比例(默认 0.1)

实体选择

  • top_k_mapped_entities:向量检索返回的实体数量(默认 10)
  • top_k_relationships:扩展的关系数量(默认 10)

排序与优先级

  • 实体按向量相似度排序
  • 关系按权重排序
  • 文本单元按关联度排序
  • 社区报告按排名排序

4. 模块边界与交互矩阵

4.1 模块清单

序号 模块名称 目录路径 职责 对外 API
01 API graphrag/api/ 对外编程接口 build_index, global_search, local_search, drift_search, basic_search, generate_indexing_prompts
02 CLI graphrag/cli/ 命令行接口 命令:init, index, update, query, prompt-tune
03 Config graphrag/config/ 配置系统 load_config, create_graphrag_config
04 Data Model graphrag/data_model/ 数据模型定义 Entity, Relationship, Community, CommunityReport, TextUnit, Document, Covariate
05 Index graphrag/index/ 索引构建引擎 run_pipeline, PipelineFactory
06 Query graphrag/query/ 查询引擎 get_local_search_engine, get_global_search_engine, get_drift_search_engine, get_basic_search_engine
07 Language Model graphrag/language_model/ LLM 调用封装 ModelManager, ChatModel, EmbeddingModel
08 Storage graphrag/storage/ 存储抽象层 PipelineStorage, FilePipelineStorage, BlobPipelineStorage, CosmosDBPipelineStorage
09 Cache graphrag/cache/ LLM 缓存 PipelineCache, JsonPipelineCache, MemoryPipelineCache
10 Vector Stores graphrag/vector_stores/ 向量存储 BaseVectorStore, AzureAISearch, LanceDB, CosmosDB
11 Callbacks graphrag/callbacks/ 回调机制 WorkflowCallbacks, QueryCallbacks
12 Logger graphrag/logger/ 日志系统 init_loggers, BlobWorkflowLogger
13 Prompt Tune graphrag/prompt_tune/ 提示词调优 生成定制化提示词
14 Prompts graphrag/prompts/ 提示词模板 默认提示词模板
15 Tokenizer graphrag/tokenizer/ 分词器 get_tokenizer, TiktokenTokenizer
16 Factory graphrag/factory/ 工厂模式 统一对象创建接口

4.2 模块交互矩阵

调用方 → 被调方 Config Data Model Storage Cache Vector Stores LLM Index Query Callbacks Logger
API 读取 转换 读取 - - - 调用 调用 传递 初始化
CLI 加载 - - - - - 通过 API 通过 API 创建 初始化
Index 读取 写入 读写 读写 写入 调用 - - 触发 记录
Query 读取 读取 读取 - 查询 调用 - - 触发 -
LLM 读取 - - 读写 - - - - 触发 -
Storage 读取路径 - - - - - - - - -
Cache 读取 - 可选存储 - - - - - - -

交互说明

  • 同步调用:API → Index, API → Query, Index → LLM, Query → LLM
  • 异步消息:无(项目内无消息队列)
  • 共享存储:Index 和 Query 通过 Parquet 文件共享数据
  • 订阅/发布:Callbacks 机制(观察者模式)

4.3 数据一致性与错误语义

数据一致性

  • 最终一致性:索引构建完成后,Parquet 文件与 Vector Store 达到一致
  • 无跨模块事务:每个工作流独立提交

错误语义

  • LLM 调用失败:返回 None 或抛出 LLMException,由调用方决定重试或跳过
  • 存储写入失败:直接抛出异常,中断工作流
  • 查询失败:返回错误信息,不影响其他查询

5. 关键设计与权衡

5.1 数据一致性

设计选择

  • 采用最终一致性模型,索引构建和查询分离
  • 索引输出为不可变 Parquet 文件,避免并发写入冲突

权衡

  • ✅ 优点:简化系统设计,无需分布式锁
  • ❌ 缺点:无法实时更新,需要增量索引模式

5.2 事务边界

设计选择

  • 无跨工作流事务,每个工作流独立提交数据
  • Vector Store 和 Parquet 文件分别写入,不保证原子性

权衡

  • ✅ 优点:工作流失败不影响其他工作流,便于部分重试
  • ❌ 缺点:Vector Store 和文件数据可能短暂不一致

5.3 并发策略

设计选择

  • LLM 调用通过 asyncio.Semaphore 控制并发度(默认 25)
  • 工作流串行执行,工作流内部操作并发执行

权衡

  • ✅ 优点:避免 LLM API 速率限制,充分利用并发能力
  • ❌ 缺点:内存占用随并发度线性增长

5.4 缓存策略

设计选择

  • LLM 调用结果缓存到文件或内存
  • 缓存键基于 LLM 参数和输入内容的哈希

权衡

  • ✅ 优点:大幅减少重复 LLM 调用,降低成本
  • ❌ 缺点:缓存失效策略简单,无法自动检测模型更新

5.5 图算法选择

设计选择

  • 社区发现采用 Leiden 算法(通过 graspologic 库)
  • 图布局采用 UMAP 降维(可选)

权衡

  • ✅ 优点:Leiden 算法在大规模图上性能优秀,支持层级社区
  • ❌ 缺点:参数调优复杂,结果可能不稳定

6. 性能关键路径与可观测性

6.1 性能关键路径

索引构建

  1. 实体抽取(最耗时):

    • 瓶颈:LLM API 调用延迟
    • 优化:并发调用、缓存、批处理
  2. 社区发现

    • 瓶颈:图算法计算复杂度 O(E log V)
    • 优化:限制图规模、调整分辨率参数
  3. 嵌入计算

    • 瓶颈:嵌入 API 调用延迟
    • 优化:批量调用(batch size)、缓存

查询执行

  1. 向量检索

    • 瓶颈:向量相似度计算
    • 优化:使用专用向量数据库(LanceDB、Azure AI Search)
  2. 上下文构建

    • 瓶颈:数据关联与 token 计数
    • 优化:预计算 token 数、缓存 tokenizer
  3. LLM 生成

    • 瓶颈:LLM 推理延迟
    • 优化:流式生成、模型选择(gpt-4o-mini)

6.2 性能指标与阈值建议

指标 定义 建议阈值 监控方法
索引构建时长 从开始到完成的总时间 < 1小时 (1000 documents) 工作流 Callbacks 统计
LLM 调用 P95 延迟 LLM API 95 分位延迟 < 5秒 LLM Provider 日志
缓存命中率 缓存命中次数 / 总请求 > 60% Cache Layer 统计
查询响应时间 P95 查询完成 95 分位时间 < 10秒 (Local), < 30秒 (Global) Query Callbacks
向量检索延迟 向量 top-k 搜索时间 < 100ms Vector Store 内部统计
内存峰值 索引构建内存占用 < 16GB (10K documents) 系统监控

6.3 可观测性指标

日志

  • 工作流启动/完成/失败事件
  • LLM 调用请求/响应/错误
  • 缓存命中/未命中
  • 存储读写操作

Callbacks 事件

  • on_workflow_start(workflow_name)
  • on_workflow_end(workflow_name, result)
  • on_error(workflow_name, error)
  • on_llm_call(prompt, response, tokens)

输出统计PipelineRunResult):

  • total_runtime:总运行时间
  • num_documents:处理的文档数
  • num_text_units:生成的文本单元数
  • num_entities:抽取的实体数
  • num_relationships:抽取的关系数
  • num_communities:发现的社区数
  • errors:错误列表

7. 配置项与可变参数

7.1 核心配置项

配置项 默认值 影响 建议
chunks.size 300 文本单元大小 300-600 (取决于文档类型)
chunks.overlap 100 分块重叠 100-150
extract_graph.concurrent_requests 25 LLM 并发度 根据 API 速率限制调整
extract_graph.entity_types 多种类型 抽取的实体类型 根据领域定制
cluster_graph.max_cluster_size 10 社区最大规模 10-50
local_search.max_context_tokens 5000 上下文 token 限制 根据模型上下文窗口调整
local_search.top_k_entities 10 检索实体数 10-20
cache.type file 缓存类型 生产环境使用 file
storage.type file 存储类型 云端使用 blob
vector_store.{name}.container_name - 向量存储容器 根据后端配置

7.2 模型选择

场景 推荐模型 Token 限制 成本
实体抽取 gpt-4o-mini, gpt-4-turbo 输入: 2K, 输出: 1K 中等
社区报告 gpt-4o-mini 输入: 8K, 输出: 2K 中等
查询生成 gpt-4o-mini, gpt-4-turbo 输入: 10K, 输出: 2K
嵌入 text-embedding-3-small 输入: 8K

8. 模块依赖关系图

graph TB
    API --> Index
    API --> Query
    API --> Config
    
    CLI --> API
    CLI --> Config
    
    Index --> Workflows
    Index --> Storage
    Index --> Cache
    Index --> LLM
    Index --> DataModel
    Index --> Callbacks
    
    Query --> SearchEngines
    Query --> ContextBuilder
    Query --> VectorStores
    Query --> LLM
    Query --> DataModel
    
    Workflows --> Operations
    Operations --> LLM
    Operations --> Cache
    
    SearchEngines --> ContextBuilder
    ContextBuilder --> VectorStores
    
    LLM --> Tokenizer
    LLM --> Cache
    
    Config --> EnvReader
    Config --> Validation
    
    Storage --> ConfigPaths
    Cache --> Storage
    VectorStores --> Storage
    
    Logger -.日志.-> 所有模块
    Factory -.创建.-> Storage
    Factory -.创建.-> Cache
    Factory -.创建.-> VectorStores

9. 典型使用场景

9.1 场景 1:初始化项目并构建索引

import asyncio
from graphrag.config import load_config
from graphrag.api import build_index

# 1. 初始化项目(CLI)
# 命令: graphrag init --root ./my_project

# 2. 准备输入数据
# 将文档放入 ./my_project/input/

# 3. 加载配置
config = load_config(
    root_dir="./my_project",
    config_filepath="./my_project/settings.yaml"
)

# 4. 构建索引
async def main():
    results = await build_index(
        config=config,
        method="standard",  # 或 "fast"
        verbose=True
    )
    
    for result in results:
        print(f"Workflow: {result.workflow}")
        print(f"Runtime: {result.total_runtime}s")
        if result.errors:
            print(f"Errors: {result.errors}")

asyncio.run(main())

9.2 场景 2:执行局部搜索查询

import asyncio
import pandas as pd
from graphrag.config import load_config
from graphrag.api import local_search

# 1. 加载配置
config = load_config(root_dir="./my_project")

# 2. 加载索引数据
entities = pd.read_parquet("./my_project/output/entities.parquet")
relationships = pd.read_parquet("./my_project/output/relationships.parquet")
text_units = pd.read_parquet("./my_project/output/text_units.parquet")
communities = pd.read_parquet("./my_project/output/communities.parquet")
community_reports = pd.read_parquet("./my_project/output/community_reports.parquet")
covariates = None  # 可选

# 3. 执行查询
async def main():
    response, context = await local_search(
        config=config,
        entities=entities,
        relationships=relationships,
        text_units=text_units,
        communities=communities,
        community_reports=community_reports,
        covariates=covariates,
        community_level=2,
        response_type="Multiple Paragraphs",
        query="What are the main themes in the documents?"
    )
    
    print("Response:", response)
    print("Context entities:", len(context.get("entities", [])))

asyncio.run(main())

9.3 场景 3:增量更新索引

import asyncio
from graphrag.config import load_config
from graphrag.api import build_index

# 1. 将新文档添加到 ./my_project/input/

# 2. 加载配置
config = load_config(root_dir="./my_project")

# 3. 执行增量更新
async def main():
    results = await build_index(
        config=config,
        method="standard",
        is_update_run=True,  # 关键参数
        verbose=True
    )
    
    print("Update completed")

asyncio.run(main())

10. 系统级关键场景

10.1 场景:冷启动(首次索引构建)

前置条件

  • 项目已初始化(graphrag init
  • 输入文档已就位
  • 配置文件已调整

执行步骤

  1. 加载配置 → 验证配置项 → 初始化日志
  2. 创建 Pipeline(Standard 或 Fast 模式)
  3. 依次执行工作流:
    • 加载文档 → 文本分块 → 实体抽取 → 关系抽取 → 社区发现 → 报告生成 → 嵌入计算
  4. 输出 Parquet 文件到 output/ 目录
  5. 向量嵌入写入 Vector Store

约束与风险

  • 时间:与文档数量和 LLM API 延迟正相关
  • 成本:大量 LLM 调用(实体抽取、报告生成)
  • 风险:LLM API 速率限制导致失败

10.2 场景:峰值压测(高并发查询)

前置条件

  • 索引已构建完成
  • Vector Store 已就绪

执行步骤

  1. 多个客户端并发发起查询请求
  2. 每个查询独立执行:
    • 向量检索 → 上下文构建 → LLM 生成
  3. 返回查询结果

约束与风险

  • 瓶颈:LLM API 速率限制
  • 优化:使用流式生成降低感知延迟
  • 风险:Vector Store 查询延迟增加

10.3 场景:异常恢复(LLM 调用失败)

异常类型

  • LLM API 超时
  • LLM API 速率限制(429 错误)
  • LLM API 服务不可用(500 错误)

恢复策略

  1. 重试:指数退避,最多 10 次
  2. 缓存:已成功的调用结果被缓存,重启后跳过
  3. 降级:部分失败不中断整体流程,记录错误后继续

数据一致性

  • 部分失败的工作流不会污染已成功的输出
  • 下次运行会重新处理失败的部分